其他操作
1. 发布订阅
Redis提供了发布订阅功能,可以用于消息的传输,Redis的发布订阅机制包括三个部分,发布者,订阅者和Channel。
发布者和订阅者都是Redis客户端,Channel则为Redis服务器端,发布者将消息发送到某个的频道,订阅了这个频道的订阅者就能接收到这条消息。
1. Subscribe
订阅channel
// 订阅channel1这个channel
sub := rdb.Subscribe(ctx, "channel1")
// sub.Channel() 返回go channel,可以循环读取redis服务器发过来的消息
for msg := range sub.Channel() {
// 打印收到的消息
fmt.Println(msg.Channel)
fmt.Println(msg.Payload)
}
//或者
for {
msg, err := sub.ReceiveMessage(ctx)
if err != nil {
panic(err)
}
fmt.Println(msg.Channel, msg.Payload)
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
2. Publish
将消息发送到指定的channel
// 将"message"消息发送到channel1这个通道上
rdb.Publish(ctx,"channel1","message")
1
2
2
3. PSubscribe
用法跟Subscribe一样,区别是PSubscribe订阅通道(channel)支持模式匹配。
// 订阅channel1这个channel
sub := rdb.PSubscribe(ctx,"ch_user_*")
// 可以匹配ch_user_开头的任意channel
1
2
3
2
3
4. Unsubscribe
取消订阅
// 订阅channel1这个channel
sub := rdb.Subscribe(ctx,"channel1")
// 取消订阅
sub.Unsubscribe(ctx,"channel1")
1
2
3
4
2
3
4
5. PubSubNumSub
查询指定的channel有多少个订阅者
// 查询channel_1通道的订阅者数量
chs, _ := rdb.PubSubNumSub(ctx, "channel_1").Result()
for ch, count := range chs {
fmt.Println(ch) // channel名字
fmt.Println(count) // channel的订阅者数量
}
1
2
3
4
5
6
2
3
4
5
6
2. 事务处理
redis事务可以一次执行多个命令, 并且带有以下两个重要的保证:
- 事务是一个单独的隔离操作:事务中的所有命令都会序列化、按顺序地执行。事务在执行的过程中,不会被其他客户端发送来的命令请求所打断。
- 事务是一个原子操作:事务中的命令要么全部被执行,要么全部都不执行。
1. TxPipeline
以Pipeline的方式操作事务
// 开启一个TxPipeline事务
pipe := rdb.TxPipeline()
// 执行事务操作,可以通过pipe读写redis
incr := pipe.Incr(ctx,"tx_pipeline_counter")
pipe.Expire(ctx,"tx_pipeline_counter", time.Hour)
// 上面代码等同于执行下面redis命令
//
// MULTI
// INCR pipeline_counter
// EXPIRE pipeline_counts 3600
// EXEC
// 通过Exec函数提交redis事务
_, err := pipe.Exec(ctx)
// 提交事务后,我们可以查询事务操作的结果
// 前面执行Incr函数,在没有执行exec函数之前,实际上还没开始运行。
fmt.Println(incr.Val(), err)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
2. watch
redis乐观锁支持,可以通过watch监听一些Key, 如果这些key的值没有被其他人改变的话,才可以提交事务
ctx := context.Background()
// 定义一个回调函数,用于处理事务逻辑
fn := func(tx *redis.Tx) error {
// 先查询下当前watch监听的key的值
v, err := tx.Get(ctx, "key").Int()
if err != nil && err != redis.Nil {
return err
}
// 这里可以处理业务
v++
// 如果key的值没有改变的话,Pipelined函数才会调用成功
_, err = tx.Pipelined(ctx, func(pipe redis.Pipeliner) error {
// 在这里给key设置最新值
pipe.Set(ctx, "key", v, 0)
return nil
})
return err
}
// 使用Watch监听一些Key, 同时绑定一个回调函数fn, 监听Key后的逻辑写在fn这个回调函数里面
// 如果想监听多个key,可以这么写:client.Watch(ctx,fn, "key1", "key2", "key3")
rdb.Watch(ctx, fn, "key")
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24