队列
go-zero中使用的队列组件为
go-queue
,是gozero官方实现的基于Kafka
和Beanstalkd
的消息队列框架
1. kafka
涉及到几个概念:
- Producer: 消息生产者,将消息发送到broker中
- Consumer:消息消费者,从broker获取消息
- Broker:kafka服务器,一个集群由多个broker组成
- Topic:消息队列的名称
- Partition:分区,存放消息的单位,每一个分区中的消息队列是有序的,一个topic消息会分布在不同的broker上以及不同的Partition中
- Offset:偏移量,消息在队列中的位置,是Partition中每条消息的标识
- Consumer Group:消费者组,组内的消费者共同消费一个topic中所有分区的数据,可以提供消费能力
2. 配置说明
type KqConf struct {
service.ServiceConf
Brokers []string
Group string
Topic string
Offset string `json:",options=first|last,default=last"`
Conns int `json:",default=1"`
Consumers int `json:",default=8"`
Processors int `json:",default=8"`
MinBytes int `json:",default=10240"` // 10K
MaxBytes int `json:",default=10485760"` // 10M
Username string `json:",optional"`
Password string `json:",optional"`
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
2
3
4
5
6
7
8
9
10
11
12
13
14
- Brokers: kafka 的多个 Broker 节点
- Group:消费者组
- Topic:订阅的 Topic 主题
- Offset:如果新的 topic kafka 没有对应的 offset 信息,或者当前的 offset 无效了(历史数据被删除),那么需要指定从头(
first
)消费还是从尾(last
)部消费 - Conns: 一个 kafka queue 对应可对应多个 consumer,Conns 对应 kafka queue 数量,可以同时初始化多个 kafka queue,默认只启动一个
- Consumers : go-queue 内部是起多个 goroutine 从 kafka 中获取信息写入进程内的 channel,这个参数是控制此处的 goroutine 数量(⚠️ 并不是真正消费时的并发 goroutine 数量)
- Processors: 当 Consumers 中的多个 goroutine 将 kafka 消息拉取到进程内部的 channel 后,我们要真正消费消息写入我们自己逻辑,go-queue 内部通过此参数控制当前消费的并发 goroutine 数量
- MinBytes: fetch 一次返回的最小字节数,如果不够这个字节数就等待.
- MaxBytes: fetch 一次返回的最大字节数,如果第一条消息的大小超过了这个限制仍然会继续拉取保证 consumer 的正常运行.因此并不是一个绝对的配置,消息的大小还需要受到 broker 的
message.max.bytes
限制,以及 topic 的max.message.bytes
的限制 - Username: kafka 的账号
- Password:kafka 的密码
3. 入门案例
kafka:
image: 'bitnami/kafka:3.6.2'
container_name: kafka
restart: always
ulimits:
nofile:
soft: 65536
hard: 65536
environment:
- TZ=Asia/Shanghai
- KAFKA_CFG_NODE_ID=0
- KAFKA_CFG_PROCESS_ROLES=controller,broker
- KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka:9093
- KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093,EXTERNAL://:9094
- KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092,EXTERNAL://localhost:9094
- KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,EXTERNAL:PLAINTEXT
- KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
ports:
- '9092:9092'
- '9094:9094'
volumes:
- ./volumes/kafka:/bitnami/kafka
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
3.1 配置
KqPusherConf:
Name: log-producer
Brokers:
- 127.0.0.1:9094
Topic: login-log
Group: login-log-group
KqConsumerConf:
Name: log-consumer
Brokers:
- 127.0.0.1:9094
Group: login-login-group
Topic: login-log
1
2
3
4
5
6
7
8
9
10
11
12
2
3
4
5
6
7
8
9
10
11
12
type Config struct {
rest.RestConf
MysqlConfig MysqlConfig
Auth Auth
RedisConfig redis.RedisConf
KqPusherConf kq.KqConf
KqConsumerConf kq.KqConf
}
1
2
3
4
5
6
7
8
2
3
4
5
6
7
8
3.2 注入svc
type ServiceContext struct {
Config config.Config
Conn sqlx.SqlConn
RedisClient *redis.Redis
KqPusherClient *kq.Pusher
}
func NewServiceContext(c config.Config) *ServiceContext {
sqlConn := db.NewMysql(c.MysqlConfig)
redisClient := db.NewRedis(c.RedisConfig)
KqPusherClient := kq.NewPusher(
c.KqPusherConf.Brokers,
c.KqPusherConf.Topic,
)
return &ServiceContext{
Config: c,
Conn: sqlConn,
RedisClient: redisClient,
KqPusherClient: KqPusherClient,
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
3.3 日志生产者
go func() {
//记录登录日志
logData := map[string]any{
"user_id": u.Id,
"ip": "127.0.0.1",
"device": "pc",
"status": "success",
"msg": "登录成功",
"time": time.Now().Format("2006-01-02 15:04:05"),
}
marshal, _ := json.Marshal(logData)
//key能保证 消息进入当前用户的消息 进入一个分区 保证顺序性
//如果不填key 默认按照轮训方式 安排分区
err = l.svcCtx.KqPusherClient.PushWithKey(context.Background(), fmt.Sprintf("login_log_%d", u.Id), string(marshal))
if err != nil {
l.Logger.Error(err)
}
}()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
3.4 日志消费者
package queue
import (
"context"
"github.com/zeromicro/go-zero/core/logx"
"user-api/internal/svc"
)
type LoginLogConsumer struct {
ctx context.Context
svcCtx *svc.ServiceContext
}
func NewLoginLogConsumer(ctx context.Context, svcCtx *svc.ServiceContext) *LoginLogConsumer {
return &LoginLogConsumer{
ctx: ctx,
svcCtx: svcCtx,
}
}
func (*LoginLogConsumer) Consume(ctx context.Context, key, val string) error {
logx.Infof("login log key: %s, val: %s \n", key, val)
return nil
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
package queue
import (
"context"
"github.com/zeromicro/go-queue/kq"
"github.com/zeromicro/go-zero/core/service"
"user-api/internal/config"
"user-api/internal/svc"
)
func Consumers(c config.Config, ctx context.Context, svcCtx *svc.ServiceContext) []service.Service {
return []service.Service{
kq.MustNewQueue(c.KqConsumerConf, NewLoginLogConsumer(ctx, svcCtx)),
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
//启动消费者
group := service.NewServiceGroup()
defer group.Stop()
for _, mq := range queue.Consumers(c, context.Background(), ctx) {
group.Add(mq)
}
group.Add(server)
group.Start()
1
2
3
4
5
6
7
8
2
3
4
5
6
7
8
4. 高级使用
4.1 参数配置
kq.NewPusher 第三个参数是 options,就是支持传递的可选参数
- chunkSize : 由于效率问题,kq client 是批量提交,批量消息体达到此大小才会提交给 kafka。
- flushInterval:间隔多久提交一次。即使未达到 chunkSize 但是达到了这个间隔时间也会向 kafka 提交
- allowAutoTopicCreation:是否自动创建topic
KqPusherClient := kq.NewPusher(
c.KqPusherConf.Brokers,
c.KqPusherConf.Topic,
kq.WithChunkSize(1024),
kq.WithFlushInterval(time.Second),
kq.WithAllowAutoTopicCreation(),
)
1
2
3
4
5
6
7
2
3
4
5
6
7
当然,consumer 中在 mqs.go 中 kq.MustNewQueue 初始化时候这个参数也是可选参数
- commitInterval : 提交给 kafka broker 间隔时间,默认是 1s
- queueCapacity:kafka 内部队列长度
- maxWait:从 kafka 批量获取数据时,等待新数据到来的最大时间。
- metrics:上报消费每个消息消费时间,默认会内部初始化,一般也不需要指定
func Consumers(c config.Config, ctx context.Context, svcCtx *svc.ServiceContext) []service.Service {
return []service.Service{
kq.MustNewQueue(
c.KqConsumerConf,
NewLoginLogConsumer(ctx, svcCtx),
kq.WithCommitInterval(time.Second),
),
}
}
1
2
3
4
5
6
7
8
9
2
3
4
5
6
7
8
9
4.2 负载策略
KqPusherClient := kq.NewPusher(
c.KqPusherConf.Brokers,
c.KqPusherConf.Topic,
kq.WithChunkSize(1024),
kq.WithFlushInterval(time.Second),
kq.WithAllowAutoTopicCreation(),
//配置负载策略,有hash、roundRobin、leastBytes,也可以自定义实现
kq.WithBalancer(&kafka.Hash{}),
)
1
2
3
4
5
6
7
8
9
2
3
4
5
6
7
8
9
4.3 延迟队列
如果使用kafka来实现:
- 设置消息发送时,延迟时间
- 收到消息后,判断当前时间和延迟时间
- 如果不到延迟时间,将消息重新发回队列进行处理