Redis SortedSet

Using Redis SortedSet + Lua scripts + Spark

Redis Quick Start

cd redis-3.2.9
make
src/redis-server
src/redis-cli

➜ redis-3.2.9 src/redis-cli
127.0.0.1:6379> set hello "world"
OK
127.0.0.1:6379> get hello
"world"

Spark-Redis

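import org.apache.spark.{SparkConf, SparkContext}
import com.redislabs.provider.redis._ // adds toRedisKV, toRedisZSET, ... to SparkContext

val conf: SparkConf = new SparkConf().setAppName("VelocityIdNumberX_Redis").setMaster("local")
conf.set("redis.host", "localhost").set("redis.port", "6379")
val sc = new SparkContext(conf)

val wc = sc.parallelize(List(
  ("hello", "1"),
  ("world", "2"),
  ("redis", "2")
))
sc.toRedisKV(wc)                        // plain string keys
sc.toRedisZSET(wc, "zkey1")             // sorted sets: member -> score
sc.toRedisZSET(wc, "zkey2")
sc.toRedisHASH(wc, "zmap1")             // hashes: field -> value
sc.toRedisHASH(wc, "zmap2")
sc.toRedisLIST(wc.map(_._1), "zlist1")  // lists of the keys only
sc.toRedisLIST(wc.map(_._1), "zlist2")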

Inspect the records in Redis

127.0.0.1:6379> get hello
"1"
127.0.0.1:6379> get world
"2"
127.0.0.1:6379> zrange zkey1 0 -1 WITHSCORES
1) "hello"
2) "1"
3) "redis"
4) "2"
5) "world"
6) "2"

127.0.0.1:6379> HMGET zmap1 hello redis
1) "1"
2) "2"
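
In the ZRANGE output above, `redis` comes before `world` even though both have score 2: a sorted set orders members by score and breaks ties by comparing the members as strings. A minimal plain-Python model of that ordering follows; the `zrange_withscores` helper is hypothetical and does not talk to Redis.

```python
def zrange_withscores(zset):
    """Return (member, score) pairs in sorted-set order: by score, then by member."""
    return sorted(zset.items(), key=lambda kv: (kv[1], kv[0]))

# Same data that sc.toRedisZSET(wc, "zkey1") wrote above.
zkey1 = {"hello": 1.0, "world": 2.0, "redis": 2.0}
ordered = zrange_withscores(zkey1)
print(ordered)
```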

Code for reading

val resultSet:RDD[(String, String)] = sc.fromRedisKV("*o*")
resultSet.toDF().show

Result:

+-----+-----+
|   _1|   _2|
+-----+-----+
|hello|world|
|world|    2|
+-----+-----+
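
The pattern passed to fromRedisKV is a glob, so `*o*` selects every key that contains the letter o. That is why `hello` and `world` show up and `redis` does not. The sketch below uses Python's `fnmatch` as a stand-in for Redis' own matcher, which is an approximation that should be adequate for a simple pattern like this one.

```python
from fnmatch import fnmatchcase

# Keys written by the Spark job above (strings, sorted sets, hashes, lists).
keys = ["hello", "world", "redis", "zkey1", "zkey2",
        "zmap1", "zmap2", "zlist1", "zlist2"]

# Keep only the keys that match the glob pattern *o*.
matched = [k for k in keys if fnmatchcase(k, "*o*")]
print(matched)
```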

pipeline

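import redis.clients.jedis.Jedis

// Data to write to Redis: RDD[(String, String, String)] = (hash name, field, value)
val rdd = sc.parallelize(List(
  ("map1", "key1", "111"),
  ("map1", "key2", "112"),
  ("map1", "key3", "113"),
  ("map2", "key1", "114")
))
// Use a pipeline to batch the commands more efficiently:
// HSET map1 key1 value1
// HSET map1 key2 value2
rdd.foreachPartition { iter =>
  val redis = new Jedis("localhost", 6379, 400000)
  val ppl = redis.pipelined()
  iter.foreach { row =>
    val mapKey = row._1
    val key = row._2
    val value = row._3
    // hset takes plain strings; hmset would need a java.util.Map, not a Scala Map
    ppl.hset(mapKey, key, value)

    // ZADD key score member
    ppl.zadd(mapKey.replace("map", "set"), value.toDouble, key)
  }
  ppl.sync()     // send all buffered commands and wait for the replies
  redis.close()  // release the connection opened for this partition
}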

Result:

127.0.0.1:6379> zrange set1 0 -1
1) "key1"
2) "key2"
3) "key3"
127.0.0.1:6379> zrange set2 0 -1
1) "key1"
127.0.0.1:6379> HMGET map1 key1 key2 key3
1) "111"
2) "112"
3) "113"
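
The point of the pipeline is that commands are buffered on the client and sent in one batch when `sync()` is called, instead of paying one network round trip per command. The toy model below illustrates the idea only; `FakePipeline` is a hypothetical class, not part of Jedis, and it keeps the "server" state in a local dict.

```python
class FakePipeline:
    """Hypothetical stand-in for a Redis pipeline: buffer commands, send one batch."""

    def __init__(self):
        self.buffer = []       # commands queued but not yet sent
        self.round_trips = 0   # number of batches sent to the "server"
        self.hashes = {}       # server-side state: hash name -> {field: value}

    def hset(self, name, field, value):
        # Nothing is sent yet; the command is only queued.
        self.buffer.append((name, field, value))

    def sync(self):
        # One round trip applies every buffered command.
        for name, field, value in self.buffer:
            self.hashes.setdefault(name, {})[field] = value
        self.buffer.clear()
        self.round_trips += 1

rows = [("map1", "key1", "111"), ("map1", "key2", "112"),
        ("map1", "key3", "113"), ("map2", "key1", "114")]
ppl = FakePipeline()
for name, field, value in rows:
    ppl.hset(name, field, value)
ppl.sync()
print(ppl.round_trips, ppl.hashes)
```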

Business Requirements

Raw data: (dimension1, dimension2, dimension3, partner_code, timestamp), converted to:

RDD(dimension, partner_code, timestamp) = {
  (dimension1, partner_code, timestamp),
  (dimension2, partner_code, timestamp),
  (dimension3, partner_code, timestamp)
}
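
The conversion above is a fan-out: each raw row yields one output row per dimension, all carrying the same partner code and timestamp. A minimal sketch in plain Python follows; the `explode_dimensions` name and the sample values are hypothetical, and in Spark the same function would typically be applied through a flatMap.

```python
def explode_dimensions(record):
    """Turn (dim1, dim2, dim3, partner_code, timestamp) into one row per dimension."""
    *dimensions, partner_code, timestamp = record
    return [(dim, partner_code, timestamp) for dim in dimensions]

# Hypothetical sample row; the dimension values are made up for illustration.
raw = ("mob_15959086950", "id_0001", "card_0002", "koudai", 1453708980)
rows = explode_dimensions(raw)
for row in rows:
    print(row)
```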

td_cross_partner

Redis & Lua

Lua script

local link_id = redis.call("INCR", KEYS[1])
redis.call("HSET", KEYS[2], link_id, ARGV[1])
return link_id

Redis client commands

redis-cli EVAL "$(cat incr-and-stor.lua)" 2 links:counter links:urls http://spark.apache.org/
redis-cli EVAL "$(cat incr-and-stor.lua)" 2 links:counter links:urls http://kafka.apache.org/

The equivalent Lua script, with the keys and argument substituted

local link_id = redis.call("INCR", "links:counter")
redis.call("HSET", "links:urls", link_id, "http://spark.apache.org/")
return link_id

First record

link_id = INCR links:counter
HSET links:urls link_id http://spark.apache.org/

Second record

link_id = INCR links:counter
HSET links:urls link_id http://kafka.apache.org/

links:urls is the name of the map, link_id is the key within the map, and the last argument, the URL, is the value stored under that key. Expressed as a Java Map:

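// links:urls is not a valid Java identifier, so the map is named linksUrls here
Map<Integer, String> linksUrls = new HashMap<>();
linksUrls.put(1, "http://spark.apache.org/");
linksUrls.put(2, "http://kafka.apache.org/");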

A Lua script that implements LPOPRPUSH:

local redis = require 'redis'
local client = redis.connect('127.0.0.1', 6379)

function lpoprpush(source, destination)
  -- Inside a Redis script, LPOP on an empty or missing list yields false (not nil),
  -- so the script tests the value's truthiness.
  local script = [[
    local item = redis.call('lpop', KEYS[1])
    if item then
      redis.call('rpush', KEYS[2], item)
      return item
    end
  ]]

  return client:eval(script, 2, source, destination)
end
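
The script moves one item from the head of the source list to the tail of the destination list and does nothing when the source is empty. The in-memory model below mirrors that behaviour with `collections.deque`; the `lists` dictionary and the list names `todo` and `done` are hypothetical.

```python
from collections import deque

def lpoprpush(lists, source, destination):
    """Pop the head of `source` and append it to the tail of `destination`.

    Returns the moved item, or None when `source` is empty or missing.
    """
    src = lists.get(source)
    if not src:
        return None
    item = src.popleft()
    lists.setdefault(destination, deque()).append(item)
    return item

lists = {"todo": deque(["a", "b"]), "done": deque()}
# Third call finds an empty source list and moves nothing.
moved = [lpoprpush(lists, "todo", "done") for _ in range(3)]
print(moved, list(lists["done"]))
```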

Data Insertion Flow

The script's key is the SortedSet key; its arguments are, in order, the score and the member.

local score = redis.call('zscore', KEYS[1], ARGV[2])
if score then
  if(tonumber(score) < tonumber(ARGV[1])) then
    redis.call('zadd', KEYS[1], ARGV[1], ARGV[2])
  end
else
  redis.call('zadd', KEYS[1], ARGV[1], ARGV[2])
end

Verify the data:

src/redis-cli EVAL "$(cat insert-attr-partner.lua)" 1 15959086950 1453708983 koudai 

127.0.0.1:6379> ZRANGE 15959086950 0 -1 WITHSCORES
1) "koudai"
2) "1453708983"

src/redis-cli EVAL "$(cat insert-attr-partner.lua)" 1 15959086950 1453708980 jiedaibao

127.0.0.1:6379> ZRANGE 15959086950 0 -1 WITHSCORES
1) "jiedaibao"
2) "1453708980"
3) "koudai"
4) "1453708983"

Pass an older timestamp: no update
src/redis-cli EVAL "$(cat insert-attr-partner.lua)" 1 15959086950 1453708900 koudai

127.0.0.1:6379> ZRANGE 15959086950 0 -1 WITHSCORES
1) "jiedaibao"
2) "1453708980"
3) "koudai"
4) "1453708983"

Pass a newer timestamp: the score is updated
src/redis-cli EVAL "$(cat insert-attr-partner.lua)" 1 15959086950 1453709000 koudai

127.0.0.1:6379> ZRANGE 15959086950 0 -1 WITHSCORES
1) "jiedaibao"
2) "1453708980"
3) "koudai"
4) "1453709000"

A new key: 15959086951
src/redis-cli EVAL "$(cat insert-attr-partner.lua)" 1 15959086951 1453708980 koudai

127.0.0.1:6379> ZRANGE 15959086950 0 -1 WITHSCORES
1) "jiedaibao"
2) "1453708980"
3) "koudai"
4) "1453709000"

127.0.0.1:6379> ZRANGE 15959086951 0 -1 WITHSCORES
1) "koudai"
2) "1453708980"
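
The walkthrough above boils down to one rule: a member's score is only ever replaced by a strictly larger one, and each key keeps its own set of members. The plain-Python model below replays the same sequence against a dict. The `upsert_if_newer` helper is hypothetical, and jiedaibao is replayed with the score that the ZRANGE output reports.

```python
def upsert_if_newer(zsets, key, score, member):
    """Store `score` for `member` under `key` unless an equal or newer score exists."""
    members = zsets.setdefault(key, {})
    current = members.get(member)
    if current is None or current < score:
        members[member] = score

zsets = {}
upsert_if_newer(zsets, "15959086950", 1453708983, "koudai")
upsert_if_newer(zsets, "15959086950", 1453708980, "jiedaibao")  # score as reported by ZRANGE
upsert_if_newer(zsets, "15959086950", 1453708900, "koudai")     # older: ignored
upsert_if_newer(zsets, "15959086950", 1453709000, "koudai")     # newer: stored
upsert_if_newer(zsets, "15959086951", 1453708980, "koudai")     # separate key
print(zsets)
```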

Lua script with a return value

local score = redis.call('zscore', KEYS[1], ARGV[2])
if score then
  if(tonumber(score) < tonumber(ARGV[1])) then
    redis.call('zadd', KEYS[1], ARGV[1], ARGV[2])
  else return -1
  end
else
  redis.call('zadd', KEYS[1], ARGV[1], ARGV[2])
end
return 1

Load the script and run it with EVALSHA

src/redis-cli SCRIPT LOAD "$(cat attr-partner.lua)"
9afb46620b42e84770d64b4ca4c102b626bb20fc

src/redis-cli EVALSHA 9afb46620b42e84770d64b4ca4c102b626bb20fc 1 15959086951 1453708980 koudai

Once the script is loaded, it can be invoked directly by its SHA value, without passing the script body to EVAL.

➜  redis-3.0.2  src/redis-cli EVALSHA 9afb46620b42e84770d64b4ca4c102b626bb20fc 1 15959086951 1453708980 koudai
(integer) -1
➜ redis-3.0.2 src/redis-cli EVALSHA 9afb46620b42e84770d64b4ca4c102b626bb20fc 1 15959086951 1453709111 koudai
(integer) 1
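
The identifier returned by SCRIPT LOAD is the SHA-1 digest of the script text, so a client can compute it locally instead of parsing the command's reply. The sketch below shows the computation with Python's `hashlib`. The script body is a placeholder rather than the script from this article, and since the digest depends on the exact bytes of the file (including whitespace), the value shown earlier is not reproduced here.

```python
import hashlib

def script_sha1(source: str) -> str:
    """Hex SHA-1 digest of a script body, the identifier format EVALSHA expects."""
    return hashlib.sha1(source.encode("utf-8")).hexdigest()

script = "return 1"  # placeholder body, not the article's script
digest = script_sha1(script)
print(digest)
```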

Data Query Flow

get-attr-partner.lua

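-- os.time() is not available in Redis' Lua sandbox; ask the server for the time instead
local now = tonumber(redis.call('TIME')[1])
-- Multiplier kept from the original; its unit has to match the unit of the stored scores
local start = now - tonumber(ARGV[1]) * 3600000 * 30 * 365
return redis.call('zrangebyscore', KEYS[1], start, now)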
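
The query is a time-window lookup: take the current time, subtract the window, and return every member whose score falls inside that range. The model below makes the window explicit in seconds, which is an assumption based on the second-resolution scores used in the insertion examples. Both helper names and the value of `now` are hypothetical.

```python
def zrangebyscore(members, low, high):
    """Members whose score lies in [low, high], ordered by score, then by member."""
    hits = [(score, m) for m, score in members.items() if low <= score <= high]
    return [m for score, m in sorted(hits)]

def partners_in_window(members, now, window_seconds):
    """Partners seen within the last `window_seconds` before `now`."""
    return zrangebyscore(members, now - window_seconds, now)

# State left behind by the insertion walkthrough; `now` is a hypothetical query time.
members = {"jiedaibao": 1453708980, "koudai": 1453709000}
now = 1453709010
recent = partners_in_window(members, now, 20)    # only scores >= 1453708990
wider = partners_in_window(members, now, 3600)   # both members fall inside the hour
print(recent, wider)
```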

RedisClient

The client hands out Jedis instances from a JedisPool.

import org.apache.commons.pool2.impl.GenericObjectPoolConfig
import redis.clients.jedis.JedisPool

object RedisClient extends Serializable {
  val redisPort = 6379
  val redisTimeout = 30000
  lazy val localPool = new JedisPool(new GenericObjectPoolConfig(), "localhost", redisPort, redisTimeout)
}

Spark-Redis

Each partition needs to work with its own Redis connection.

case class AttributePartner(attribute: String, timestamp: Long, partner: String)

val testRdd = sc.parallelize(List(
  AttributePartner("mob_15959086950", 1453708980L, "koudai"),
  AttributePartner("mob_15959086950", 1453709000L, "koudai"),
  AttributePartner("mob_15959086950", 1453709000L, "jiedaibao"),
  AttributePartner("mob_15959086950", 1453700800L, "jiedaibao")
), 2)

val script =
  """
    |local score = redis.call('zscore', KEYS[1], ARGV[2])
    |if score then
    |  if(tonumber(score) < tonumber(ARGV[1])) then
    |    redis.call('zadd', KEYS[1], ARGV[1], ARGV[2])
    |  else return -1
    |  end
    |else
    |  redis.call('zadd', KEYS[1], ARGV[1], ARGV[2])
    |end
    |return 1
  """.stripMargin

def testLocalJedis(): Unit = {
  testRdd.foreachPartition { partitionOfRecords =>
    // Borrow one connection per partition and always hand it back to the pool.
    val jedis = RedisClient.localPool.getResource
    try {
      partitionOfRecords.foreach { record =>
        jedis.eval(script, 1, record.attribute, record.timestamp.toString, record.partner)
      }
    } finally {
      jedis.close() // for a pooled connection, close() returns it to the pool
    }
  }
}
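
Because the records are spread over two partitions, the order in which they reach Redis is not fixed. The script tolerates this: it keeps the largest timestamp per member, so any arrival order ends in the same state. The sketch below checks that claim by replaying the four test records in every possible order against an in-memory model; `apply_script` is a hypothetical stand-in for the Lua script.

```python
from itertools import permutations

def apply_script(state, attribute, timestamp, partner):
    """Model of the Lua script: keep the newest timestamp per (attribute, partner)."""
    members = state.setdefault(attribute, {})
    current = members.get(partner)
    if current is not None and current >= timestamp:
        return -1  # existing score is equal or newer: nothing is written
    members[partner] = timestamp
    return 1

records = [
    ("mob_15959086950", 1453708980, "koudai"),
    ("mob_15959086950", 1453709000, "koudai"),
    ("mob_15959086950", 1453709000, "jiedaibao"),
    ("mob_15959086950", 1453700800, "jiedaibao"),
]

# Replay the records in all 24 possible orders and collect the final states.
final_states = []
for order in permutations(records):
    state = {}
    for record in order:
        apply_script(state, *record)
    final_states.append(state)

print(final_states[0])
```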

Data Filtering

The project is already live, but a particular test member has to be filtered out.

➜  redis-3.0.2  src/redis-cli
127.0.0.1:6379> keys *
1) "15959086951"
2) "15959086950"
3) "mob_15959086950"
4) "mob_15959086951"
5) "mykey"
127.0.0.1:6379> keys mob*
1) "mob_15959086950"
2) "mob_15959086951"
127.0.0.1:6379> zrange mob_15959086950 0 -1
1) "jiedaibao"
2) "koudai"
127.0.0.1:6379> zadd mob_15959086950 1111111 test
(integer) 1
127.0.0.1:6379> zrange mob_15959086950 0 -1
1) "test"
2) "jiedaibao"
3) "koudai"

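The key itself must not be deleted; instead, every key has to be scanned for entries that contain the member. The following removes one member from a specific key.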

127.0.0.1:6379> ZREM mob_15959086950 test
(integer) 1
127.0.0.1:6379> zrange mob_15959086950 0 -1
1) "jiedaibao"
2) "koudai"

When iterating over all keys, if the keys are not all of the same type, calling the same delete API on each of them fails with:

WRONGTYPE Operation against a key holding the wrong kind of value

So a more robust approach is to check each key's type first.

127.0.0.1:6379> type mob_15959086951
zset
127.0.0.1:6379> type 15959086950
zset
127.0.0.1:6379> type mykey
string

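Iterating over all keys also visits the keys that do not contain the member. It would be better to touch only the keys that actually contain it, but there is currently no such API.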
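
Put together, the cleanup is a loop over all keys that skips anything that is not a sorted set and removes the member where it is present. The sketch below models that loop in memory, with sorted sets represented as dicts and other types left as they are. The `remove_member_everywhere` helper and the snapshot contents are hypothetical; against a real server the same steps would map to TYPE and ZREM calls.

```python
def remove_member_everywhere(db, member):
    """Remove `member` from every sorted set in `db`; return the keys that changed."""
    changed = []
    for key, value in db.items():
        # Sorted sets are modelled as dicts; anything else (e.g. a string) is
        # skipped, which mirrors checking TYPE before calling ZREM.
        if not isinstance(value, dict):
            continue
        if value.pop(member, None) is not None:
            changed.append(key)
    return sorted(changed)

# Hypothetical snapshot; scores other than the test member's are made up.
db = {
    "mob_15959086950": {"test": 1111111, "jiedaibao": 1453709000, "koudai": 1453709000},
    "mob_15959086951": {"koudai": 1453708980},
    "mykey": "some string",
}
changed = remove_member_everywhere(db, "test")
print(changed)
```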

