《Redis Cookbook》¶
Warning
本项目正在进行中,如果有任何条目是空白或者不完全,都是可以理解的。
目录(以第一个字的中文拼音排序):
h¶
缓存(cache)¶
应用
缓存用于保存需要大量计算的操作结果,或者需要快速访问的数据。
比如一个数量庞大的排序,一个耗时的搜索,网站主页内容,等等。
定义
最简单的缓存实现一般只有两个操作,一个是set
,另一个是get
。
更新缓存有两种常用策略:
- 手动更新
- 自动过期
通常我们可以在数据更新的时候手动更新缓存,比如每当有新数据插入的时候,我们就执行一次set
,让新缓存覆盖旧缓存。
另一方面,我们可以用expire
操作给缓存设定一个过期时间,当缓存过期时它被自动删除,然后在每次查找数据的时候查看缓存是否存在,如果缓存不存在则重建缓存,并为缓存设定过期时间,然后返回结果;如果缓存存在,则直接取出缓存里的结果并返回。
缓存的删除操作delete
通常是可有可无的,其一是因为你可以直接用set
覆盖旧缓存,其二是可以让缓存自动过期,所以通常不需要删除缓存,只是偶尔在调试的时候会用上。
实现
缓存可以用Redis的string_struct或hash_struct来实现。
字符串实现的优点是可以为每个缓存分别设置过期时间,缺点是比哈希表实现占用更多的空间。
而在哈希表实现中,每个哈希表只能共享同一个过期时间(也即是,放在同一个哈希表中的所有缓存会同时过期)。但是你可以利用这一特点为缓存分类,比如你可以将所有排序操作的缓存放到名为sort_cache
的哈希表中,而将所有搜索操作的缓存放到名为search_cache
的哈希表中,然后分别为sort_cache
和search_cache
设置不同的过期时间。
并且哈希表实现比字符串实现更节省空间。
See also
我们通常不对缓存的数量进行限制,如果你需要限制缓存的数量(比如只允许最多100个缓存),请参考日志(log)。
如果你需要实现一些复杂的缓存算法,比如Most Recently Used(MRU)或Least Recently Used(LRU)请使用sorted_set_struct。
关于哈希表比字符串更节约空间的讨论,请参考Redis官方的Memory optimization文档。
字符串实现¶
# file: ./h/cache/string_implement.py
from redis import Redis
def set(name, value, ttl=None, client=Redis()):
if ttl:
client.setex(name, value, ttl)
else:
client.set(name, value)
def get(name, client=Redis()):
return client.get(name)
def delete(name, client=Redis()):
client.delete(name)
# test:
if __name__ == "__main__":
from time import sleep
key = 'phone'
value = '10086'
expire_time = 3
set(key, value)
assert get(key) == value
delete(key)
assert get(key) == None
set(key, value, expire_time)
assert get(key) == value
sleep(expire_time * 2)
assert get(key) == None
哈希表实现¶
哈希表实现比字符串实现提供更多功能,因此也相对复杂一些。
我们用category
参数给缓存分类,并增加expire
操作来设置整个哈希表的过期时间,ttl
函数返回哈希表的剩余生存时间,size
则返回给定类型的分类缓存的数量。
# file: ./h/cache/hash_implement.py
from redis import Redis
def set(category, name, value, client=Redis()):
client.hset(category, name, value)
def get(category, name, client=Redis()):
return client.hget(category, name)
def delete(category, name, client=Redis()):
client.hdel(category, name)
def expire(category, ttl, client=Redis()):
client.expire(category, ttl)
def ttl(category, client=Redis()):
return client.ttl(category)
def size(category, client=Redis()):
return client.hlen(category)
# test:
if __name__ == "__main__":
from time import sleep
category = 'greet'
key = 'morning'
value = 'good morning!'
expire_time = 3
set(category, key, value)
assert get(category, key) == value
assert size(category) == 1
delete(category, key)
assert get(category, key) == None
assert size(category) == 0
set(category, key,value)
expire(category, expire_time)
assert ttl(category) != None
sleep(expire_time * 2)
assert get(category, key) == None
实例:用Python装饰器为函数加上缓存¶
Python中有一个方便好用的特性,就是它的装饰器(decorator)机制,可以无缝地为特定的函数加上新的功能。
我们可以将装饰器、函数和我们的缓存实现三者集合起来,为指定的函数提供方便且通用的缓存机制。
比如现在有一个函数search
,这个搜索非常耗时,所以我们想给它加上个缓存,我们利用装饰器cache
,为search
加上缓存机制。
@cache
def search(key):
pass
这样,search
函数就会在每次执行时查找缓存,如果缓存不命中,就执行一次搜索,将结果保存到缓存并返回。如果搜索命中,则直接返回缓存作为结果。
以下就是cache
装饰器的实现方法:
# file: ./h/cache/example.py
from functools import wraps
from string_implement import set, get
def make_unique_id(function, args, kwargs):
return function.__name__ + repr(args) + repr(kwargs)
def cache(function):
@wraps(function)
def _(*args, **kwargs):
id = make_unique_id(function, *args, **kwargs)
cache = get(id)
if cache:
return cache
else:
result = function(*args, **kwargs)
set(result)
return result
return _
J¶
计数器(counter)¶
应用
计数器一般用于访问量、下载量、投票数等各种计数用途,和自增唯一id(autoincrementing unique identifier)不同的是,计数器的值不但可以被增加,还可以被清零(比如发现有作弊行为),或者被减少(比如部分计数无效),所以计数器生成的值也不是唯一的。
定义
一个计数器,至少应该拥有以下四个操作:
- 增加数值
- 减少数值
- 清零
- 查看当前数值
实现
计数器可以用以下两种方式实现:
- String类函数,INCR,INCRBY,DECR,DECRBY,还有GET和SET。
- Hash类函数,HINCRBY,HSET和HGET。
String实现¶
# coding:utf-8
# file: ./j/counter/string_implement.py
from redis import Redis
INITIAL_VALUE = 0
class Counter:
def __init__(self, name, client=Redis()):
self.name = name
self.client = client
def incr(self, increment=1):
# redis-py 用 incr 代替 incrby,所以可以指定增量
value = self.client.incr(self.name, increment)
return int(value)
def decr(self, decrement=1):
# redis-py 用 decr 代替 decrby,所以可以指定减量
value = self.client.decr(self.name, decrement)
return int(value)
def set(self, value):
self.client.set(self.name, value)
def get(self):
value = self.client.get(self.name)
return INITIAL_VALUE if value is None else int(value)
def reset(self):
self.set(INITIAL_VALUE)
Hash实现¶
Hash实现和String实现稍有不同,Hash实现还需提供一个key
作Hash的键。另外,Hash只有HINCRBY而没有HDECRBY命令,但是我们可以通过代码0-decrement
将负数作为“增量”,传入HINCRBY命令,来达到做减法的效果。
# file: ./j/counter/hash_implement.py
from redis import Redis
INITIALI_VALUE = 0
KEY = 'counter'
class Counter:
def __init__(self, field, client=Redis(), key=KEY):
self.key = key
self.field = field
self.client = client
def incr(self, increment=1):
value = self.client.hincrby(self.key, self.field, increment)
return int(value)
def decr(self, decrement=1):
value = self.client.hincrby(self.key, self.field, 0-decrement)
return int(value)
def set(self, value):
self.client.hset(self.key, self.field, value)
def get(self):
value = self.client.hget(self.key, self.field)
return INITIALI_VALUE if value is None else int(value)
def reset(self):
self.set(INITIALI_VALUE)
r¶
日志(log)¶
应用
适用于各种单纯记录用途,要求按写入先后排序,但不必严格按时间排序的文本,比如事件日志、服务器日志、博客文章,等等。
See also
如果你需要按时间作参数排序或处理你的条目,可以使用时间线(timeline)。
定义
一个日志对象通常需要写入、读出以及截断操作。
写入操作append
只是单纯将条目追加到日志后面,读出操作read
可以指定一个区间(比如最新100条,最旧200条等),或是用一个迭代器逐条读取。
length
操作查看日志的条目数量。
截断功能keep
接受一个数值,用来保证日志文件不会太大,比如只保留最新100条,只保留最新500条,等等。有时候还需要清空整个日志,所以clear
方法也是有必要的。
条目的位置一般来说是无关紧要的,比如你通常不需要知道第101个条目的内容,所以不需要下标索引类(index)方法。
实现
日志可以用Redis的list_struct来实现,LPUSH或RPUSH实现写入,LRANGE实现读出,LTRIM负责截断。
因为list_struct结构的特性,新条目默认总是被追加到末尾(最左边或最右边),所以你不需要向日志提供时间值作参数,尽管时间值常常是日志内容的一部分。
在这个例子中,我们将最新的日志条目追加到列表最左边(使用LPUSH),而旧的日志条目则被“挤”到右边的。
# coding:utf-8
# file: ./r/log/list_implement.py
from redis import Redis
# Redis列表的边界下标
LEFTMOST = 0
RIGHTMOST = -1
class Log:
def __init__(self, name, client=Redis()):
self.name = name
self.client = client
def append(self, content):
return self.client.lpush(self.name, content)
def read(self, start=LEFTMOST, stop=RIGHTMOST):
return self.client.lrange(self.name, start, stop)
def length(self):
return self.client.llen(self.name)
def clear(self):
# 因为del是Python的保留字
# 所以redis-py用delete代替del命令
self.client.delete(self.name)
def keep(self, size):
# 只保留log[0:size-1]范围内的条目
self.client.ltrim(self.name, LEFTMOST, size-1)
s¶
时间线(timeline)¶
应用
在有些应用程序中,数据通常带有一个时间值,程序以时间为单位操作数据(可以是一个时间范围,或者是单独的一个时间点):比如求一个博客在6月至8月的所有日志,或者是今天写博客的文章数。
还有一些微博客,只显示最近两天的内容,等等。
定义
一个时间线对象最少有两个属性
- 时间值
- 数据内容
针对时间线对象的操作通常都是一些范围型的操作:比如求某个时间点起到另一个时间点内的所有数据,或者是统计某个时间点起到另一个时间点内的数据数目,等等。
实现
在Redis中我们可以用sorted_set_struct表示时间线。
当增加一个新条目时,我们将条目内容作为有序集元素的member
参数,使用当前时间的UnixTime格式,作为有序集元素的score
值。
比如一条在2011年8月22日时47分7秒发出的信息,会被储存为:
ZADD tweet 1313981227.681918 "hello my friend"
其中tweet
是时间线的key
,1313981227.681918
是UnixTime格式的UTC时间,而"hello my friend"
则是条目内容。
这样一来,就可以用ZREVRANGE进行按条目数读取(比如读出最新10条数据),使用ZREVRANGEBYSCORE进行时间范围型的读取操作(比如读出2011年8月20日到2011年8月22日的所有数据),用ZREM对单条数据进行删除,用ZREMRANGEBYRANK和ZREMRANGEBYSCORE进行时间范围型的删除操作。
# coding: utf-8
# file: ./s/timeline/sorted_set_implement.py
from redis import Redis
from time import time
# Redis有序集边界
LEFTMOST = 0
RIGHTMOST = -1
class Timeline:
def __init__(self, name, client=Redis()):
self.name = name
self.client = client
def append(self, content):
# time()函数生成当前时间的unixtime值
self.client.zadd(self.name, score=time(), value=content)
def range(self, start=LEFTMOST, stop=RIGHTMOST, display_time=False):
# 使用zrevrange命令,读出最新的数据
return self.client.zrevrange(self.name, start, stop, withscores=display_time)
def range_between_time(self, min, max, display_time=False):
# min和max也必须是unixtime值
# 注意zrevrangebyscore命令参数的摆放是max先而min后
return self.client.zrevrangebyscore(self.name, max, min, withscores=display_time)
def delete(self, content):
return self.client.zrem(self.name, content)
def delete_between_time(self, min, max):
# min和max也必须是unixtime值
return self.client.zremrangebyscore(self.name, min=min, max=max)
def length(self):
return self.client.zcard(self.name)
t¶
TAG系统¶
tag在互联网应用里尤其多见,如果以传统的关系型数据库来设计有点不伦不类。我们以查找书的例子来看看redis在这方面的优势。
关系型数据库的设计¶
两张表,一张book
的明细,一张tag
表,表示每本书的tag
,一本书可以有多个tag
。
mysql> select * from book;
+------+-------------------------------+----------------+
| id | name | author |
+------+-------------------------------+----------------+
| 1 | The Ruby Programming Language | Mark Pilgrim |
| 1 | Ruby on rail | David Flanagan |
| 1 | Programming Erlang | Joe Armstrong |
+------+-------------------------------+----------------+
mysql> select * from tag;
+---------+---------+
| tagname | book_id |
+---------+---------+
| ruby | 1 |
| ruby | 2 |
| web | 2 |
| erlang | 3 |
+---------+---------+
假如有如此需求,查找即是ruby又是web方面的书籍,如果以关系型数据库会怎么处理?
select b.name, b.author from tag t1, tag t2, book b
where t1.tagname = 'web' and t2.tagname = 'ruby' and t1.book_id = t2.book_id and b.id = t1.book_id
tag
表自关联2次再与book
关联,这个sql还是比较复杂的,如果要求即ruby,但不是web方面的书籍呢?
关系型数据其实并不太适合这些集合操作。
REDIS的设计¶
首先book的数据肯定要存储的,和上面一样。
SET book:1:name ”The Ruby Programming Language”
SET book:2:name ”Ruby on rail”
SET book:3:name ”Programming Erlang”
SET book:1:author ”Mark Pilgrim”
SET book:2:author ”David Flanagan”
SET book:3:author ”Joe Armstrong”
tag
表我们使用集合来存储数据,因为集合擅长求交集、并集
SADD tag:ruby 1
SADD tag:ruby 2
SADD tag:web 2
SADD tag:erlang 3
那么,即属于ruby又属于web的书?
inter_list = redis.sinter("tag.web", "tag:ruby")
即属于ruby,但不属于web的书?
inter_list = redis.sdiff("tag.ruby", "tag:web")
属于ruby和属于web的书的合集?
inter_list = redis.sunion("tag.ruby", "tag:web")
简单到不行阿。
从以上2个例子可以看出在某些场景里,关系型数据库是不太适合的,你可能能够设计出满足需求的系统,但总是感觉的怪怪的,有种生搬硬套的感觉。
尤其登录系统这个例子,频繁的为业务建立索引。放在一个复杂的系统里,ddl(创建索引)有可能改变执行计划。导致其它的sql采用不同的执行计划,业务复杂的老系统,这个问题是很难预估的,sql千奇百怪。要求DBA对这个系统里所有的sql都了解,这点太难了。这个问题在oracle里尤其严重,每个DBA估计都碰到过。对于MySQL这类系统,ddl又不方便(虽然现在有online ddl的方法)。碰到大表,DBA凌晨爬起来在业务低峰期操作,这事我没少干过。而这种需求放到redis里就很好处理,DBA仅仅对容量进行预估即可。
未来的OLTP系统应该是kv和关系型的紧密结合。
y¶
用户登录系统¶
记录用户登录信息的一个系统, 我们简化业务后只留下一张表。
关系型数据库的设计¶
mysql> select * from login;
+---------+----------------+-------------+---------------------+
| user_id | name | login_times | last_login_time |
+---------+----------------+-------------+---------------------+
| 1 | ken thompson | 5 | 2011-01-01 00:00:00 |
| 2 | dennis ritchie | 1 | 2011-02-01 00:00:00 |
| 3 | Joe Armstrong | 2 | 2011-03-01 00:00:00 |
+---------+----------------+-------------+---------------------+
user_id
表的主键,name
表示用户名,login_times
表示该用户的登录次数,每次用户登录后,login_times
会自增,而last_login_time
更新为当前时间。
REDIS的设计¶
关系型数据转化为KV数据库,我的方法如下:
key 表名:主键值:列名
value 列值
一般使用冒号做分割符,这是不成文的规矩。比如在php-admin for redis系统里,就是默认以冒号分割,于是user:1
user:2
等key会分成一组。于是以上的关系数据转化成kv数据后记录如下:
SET login:1:login_times 5
SET login:2:login_times 1
SET login:3:login_times 2
SET login:1:last_login_time 2011-1-1
SET login:2:last_login_time 2011-2-1
SET login:3:last_login_time 2011-3-1
SET login:1:name ”ken thompson“
SET login:2:name “dennis ritchie”
SET login:3:name ”Joe Armstrong“
这样在已知主键的情况下,通过GET、set就可以获得或者修改用户的登录次数和最后登录时间和姓名。
一般用户是无法知道自己的id
的,只知道自己的用户名,所以还必须有一个从name
到id
的映射关系,这里的设计与上面的有所不同。
SET "login:ken thompson:id" 1
SET "login:dennis ritchie:id" 2
SET "login: Joe Armstrong:id" 3
这样每次用户登录的时候业务逻辑如下(python版),r是redis对象,name是已经获知的用户名。
#获得用户的id
uid = r.get("login:%s:id" % name)
#自增用户的登录次数
ret = r.incr("login:%s:login_times" % uid)
#更新该用户的最后登录时间
ret = r.set("login:%s:last_login_time" % uid, datetime.datetime.now())
如果需求仅仅是已知id
,更新或者获取某个用户的最后登录时间,登录次数,关系型和kv数据库无啥区别:一个通过btree pk,一个通过hash,效果都很好。
假设有如下需求,查找最近登录的N个用户。开发人员看看,还是比较简单的,一个sql搞定:
select * from login order by last_login_time desc limit N
DBA了解需求后,考虑到以后表如果比较大,所以在last_login_time
上建个索引。执行计划从索引leafblock 的最右边开始访问N条记录,再回表N次,效果很好。
过了两天,又来一个需求,需要知道登录次数最多的人是谁。同样的关系型如何处理?DEV说简单:
select * from login order by login_times desc limit N
DBA一看,又要在login_time
上建立一个索引。有没有觉得有点问题呢,表上每个字段上都有素引。
关系型数据库的数据存储的的不灵活是问题的源头,数据仅有一种储存方法,那就是按行排列的堆表。统一的数据结构意味着你必须使用索引来改变sql的访问路径来快速访问某个列的,而访问路径的增加又意味着你必须使用统计信息来辅助,于是一大堆的问题就出现了。
没有索引,没有统计计划,没有执行计划,这就是kv数据库。
redis里如何满足以上的需求呢?对于求最新的N条数据的需求,链表的后进后出的特点非常适合。我们在上面的登录代码之后添加一段代码,维护一个登录的链表,控制他的长度,使得里面永远保存的是最近的N个登录用户。
#把当前登录人添加到链表里
ret = r.lpush("login:last_login_times", uid)
#保持链表只有N位
ret = redis.ltrim("login:last_login_times", 0, N-1)
这样需要获得最新登录人的id
,如下的代码即可:
last_login_list = r.lrange("login:last_login_times", 0, N-1)
另外,求登录次数最多的人,对于排序,积分榜这类需求,sorted_set_struct非常的适合,我们把用户和登录次数统一存储在一个sorted_set_struct里。
ZADD login:login_times 5 1
ZADD login:login_times 1 2
ZADD login:login_times 2 3
这样假如某个用户登录,额外维护一个sorted_set_struct,代码如下:
#对该用户的登录次数自增1
ret = r.zincrby("login:login_times", 1, uid)
那么如何获得登录次数最多的用户呢?逆序排列取的排名第N的用户即可:
ret = r.zrevrange("login:login_times", 0, N-1)
可以看出,DEV需要添加2行代码,而DBA不需要考虑索引什么的。
z¶
自增唯一id(autoincrementing unique identifier)¶
应用
自增唯一id最常见的应用就是作为关系型数据库的主键,因为主键必须确保每个数据项都有唯一id。
它也可以在不支持自增唯一id的数据库中(比如MongoDB)用来替代唯一id(uniqueidentifier,通常是一个哈希值),为用户提供更好的URL:比如将/topic/4e491e229f328b0cd900010d
修改为/topic/10086
。
定义
一个自增唯一id对象最重要的是保证值的唯一性,要做到这一点,自增id的自增incr
操作必须是一个原子操作,它应该能在一个原子时间内完成以下两件事:
- 增加id值
- 返回当前id值
并且它也没有减法decr
和清零reset
等操作,因为这些操作破坏了唯一性。
get
操作一般只用于内部检查,比如观察值是否溢出,但在一般情况下,自增唯一id对象应该只有一个incr
操作。
See also
如果你需要一个非唯一的,带incr
、reset
和decr
等操作的计数对象,请参考计数器(counter)。
实现
自增唯一id可以用以下两种方式实现:
- String类函数: INCR和GET。
- Hash类函数:HINCRBY和HGET。
String实现¶
# file: ./z/auid/string_implement.py
from redis import Redis
INITIAL_VALUE = 0
class Auid:
def __init__(self, name, client=Redis()):
self.name = name
self.client = client
def incr(self):
value = self.client.incr(self.name)
return int(value)
def get(self):
value = self.client.get(self.name)
return INITIAL_VALUE if value is None else int(value)
Hash实现¶
# file: ./z/auid/hash_implement.py
from redis import Redis
INITIA_VALUE = 0
INCREMENT = 1
KEY = 'auid'
class Auid:
def __init__(self, name, client=Redis(), key=KEY):
self.key = key
self.name = name
self.client = client
def incr(self):
value = self.client.hincrby(self.key, self.name, INCREMENT)
return int(value)
def get(self):
value = self.client.hget(self.key, self.name)
return INITIA_VALUE if value is None else int(value)
关于Redis cookbook¶
本项目的目标是展示Redis数据库的各种用法。
贡献本项目¶
如果你有任何关于Redis的新“菜式”,或者对现有“菜式”有更好的实现方法和见解,你可以通过发表issue或者直接提交代码来贡献本项目。
本项目正文是使用sphinx格式写成的,关于sphinx格式,具体可以参考sphinx的官方网站。不熟悉sphinx格式的朋友,可以直接联系本人,通过邮件或其他方式提交你的菜谱(转换格式之类的苦差就交给我吧)。
本项目菜谱实现主要使用Python语言,客户端使用redis-py,但也欢迎其他任何编程语言。
为了保持程序的正确性和严谨性,请在提交代码之前测试,并在提交代码时连测试一并提交。
贡献者名单¶
hotteran ruan撰写了“TAG系统”和“用户登录系统”两个菜式。