Redis Time Series

http://www.infoq.com/articles/redis-time-series

Redis自从创建之初就被用于存储和分析时间序列数据. 它从最开始作为一个缓冲区和日志的存储, 发展到现在包含了丰富的数据结构和类型, 并提供了不同的方法来分析Redis中的数据. 这篇文章会介绍使用Redis的最灵活的方法来分析时间序列数据.

竞态条件和事务

在Redis中每个单独的命令都是原子性的,但是多个命令顺序执行则不具有原子性,而且可能存在竞态条件会导致错误的结果. 为了解决这种问题, 这里会使用”事务管道”和”Lua脚本”来防止竞态条件.

如果使用Python客户端访问Redis,在Redis的Connection实例上调用无参数的pipeline()会构造一个”事务管道”(其他客户端中会叫做事务或者MULTI/EXEC事务).
在底层管道会收集传进来的所有命令直到execute方法被调用. 当execute方法调用,客户端会向Redis发送MULTI命令,然后紧跟着所有收集到的命令,最后是EXEC命令.
当这一组命令被Redis执行,Redis会保证原子性地执行这一组命令,而不会被其他命令打断.

使用SortedSet和Hashes进行高级分析

在Redis中存储和分析时间数据最灵活的方式使用Sorted Set和Hash这两种数据结构.

Sorted Set是一种结合了hashTable和有序的Tree的结构. 简而言之,Sorted Set的每个条目是字符串的”member”和浮点类型的”score”.
member作为hash的键,score作为Tree的有序值. 通过这种组合,你可以根据member或score的值直接获取members和scores, 而且还有其他方法可以根据score的值获取有序的members和scores.

存储事件

存储时间序列数据的通常做法是使用一个或多个的Sorted Sets和一些Hashes.
它是很多应用最底层的构建单元,广泛应用在社交网络比如Twitter,新站点Reddit或Hacker News.
它们都基于Redis之上构建了一种完全面向对象映射的应用程序.

举个例子,我们接收一个网站上代表用户活动的事件流,假设所有的事件都有四个共同的属性:id,timestamp,type,user.
并且还有一些其他可变的属性,根据type而不同. 为了存储每一条事件,我们会使用Redis的Hash来存储,它的key来自于事件的id.
为了生成事件id,可以使用不同的数据源(这个数据源负责生成唯一的事件id),这里我们暂时使用Redis的counter计数器生成id.
在64位的平台上使用64位的Redis最多可以生成2^63-1条事件,并且主要限制于机器的可用内存.

当我们准备好了数据,会首先以hash存储,然后插入一个member/score对到Sorted Set中,从event-id(member)映射到event-timestamp(score)

1
2
3
4
5
6
7
8
9
def record_event(conn, event):
id = conn.incr('event:id') # use counter to increment event-id
event['id'] = id
event_key = 'event:{id}'.format(id=id)

pipe = conn.pipeline(True)
pipe.hmset(event_key, event) # hash, key is event-key, value is the whole event object
pipe.zadd('events', **{id: event['timestamp']}) #Sorted Set. member is id, score is timestamp
pipe.execute()

在record_event函数中,我们接收了一条事件,从Redis中计算出新的id,作为这条事件的id, 并生成event的key.
然后开始创建一个管道,存储这条事件到hash-map,添加event-id到event-timestamp到sorted set.
当事务管道执行完成后, 这条事件就被记录并存储在Redis中了.

事件分析

现在我们可以有多种方式来分析时间序列数据. 可以使用ZRANG扫描出最旧和最新的事件id,之后可能把事件记录拉取出来用于分析.
可以结合使用ZRANGEBYSCORELIMIT参数获取某个时间点之前或之后的10条或100条事件.
可以使用ZCOUNT计算一段时间内的事件数量. 或者使用LUA脚本实现,比如下面的例子,计算给定一段时间范围的不同类型的事件数量.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
import json

def count_types(conn, start, end):
counts = count_types_lua(keys=['events'], args=[start, end])
return json.loads(counts)

count_types_lua = conn.register_script('''
local counts = {}
local ids = redis.call('ZRANGEBYSCORE', KEYS[1], ARGV[1], ARGV[2])
for i, id in ipairs(ids) do
local type = redis.call('HGET', 'event:' .. id, 'type')
counts[type] = (counts[type] or 0) + 1
end

return cjson.encode(counts)
''')

count_types函数准备了封装好的lua脚本,lua脚本首先初始化了一个results map(也叫做table),然后使用ZRANGEBYSCORE读取指定时间范围内的事件id列表. 获取到所有id后,读取出这条事件对应的类型,并增加计数器的值,最后返回json的格式串.

性能提升和数据模型


文章目录
  1. 1. 竞态条件和事务
  2. 2. 使用SortedSet和Hashes进行高级分析
    1. 2.1. 存储事件
    2. 2.2. 事件分析
    3. 2.3. 性能提升和数据模型