队列

go-zero中使用的队列组件为go-queue,是gozero官方实现的基于KafkaBeanstalkd 的消息队列框架

1. kafka

Overview of Kafka architecture

A Kafka topic with three partitions and a replication factor of 3

Consumer group reading from a partitioned topic

涉及到几个概念:

  • Producer: 消息生产者,将消息发送到broker中
  • Consumer:消息消费者,从broker获取消息
  • Broker:kafka服务器,一个集群由多个broker组成
  • Topic:消息队列的名称
  • Partition:分区,存放消息的单位,每一个分区中的消息队列是有序的,一个topic消息会分布在不同的broker上以及不同的Partition中
  • Offset:偏移量,消息在队列中的位置,是Partition中每条消息的标识
  • Consumer Group:消费者组,组内的消费者共同消费一个topic中所有分区的数据,可以提供消费能力

2. 配置说明

type KqConf struct {
   service.ServiceConf
   Brokers    []string
   Group      string
   Topic      string
   Offset     string `json:",options=first|last,default=last"`
   Conns      int    `json:",default=1"`
   Consumers  int    `json:",default=8"`
   Processors int    `json:",default=8"`
   MinBytes   int    `json:",default=10240"`    // 10K
   MaxBytes   int    `json:",default=10485760"` // 10M
   Username   string `json:",optional"`
   Password   string `json:",optional"`
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
  • Brokers: kafka 的多个 Broker 节点
  • Group:消费者组
  • Topic:订阅的 Topic 主题
  • Offset:如果新的 topic kafka 没有对应的 offset 信息,或者当前的 offset 无效了(历史数据被删除),那么需要指定从头(first)消费还是从尾(last)部消费
  • Conns: 一个 kafka queue 对应可对应多个 consumer,Conns 对应 kafka queue 数量,可以同时初始化多个 kafka queue,默认只启动一个
  • Consumers : go-queue 内部是起多个 goroutine 从 kafka 中获取信息写入进程内的 channel,这个参数是控制此处的 goroutine 数量(⚠️ 并不是真正消费时的并发 goroutine 数量)
  • Processors: 当 Consumers 中的多个 goroutine 将 kafka 消息拉取到进程内部的 channel 后,我们要真正消费消息写入我们自己逻辑,go-queue 内部通过此参数控制当前消费的并发 goroutine 数量
  • MinBytes: fetch 一次返回的最小字节数,如果不够这个字节数就等待.
  • MaxBytes: fetch 一次返回的最大字节数,如果第一条消息的大小超过了这个限制仍然会继续拉取保证 consumer 的正常运行.因此并不是一个绝对的配置,消息的大小还需要受到 broker 的message.max.bytes限制,以及 topic 的max.message.bytes的限制
  • Username: kafka 的账号
  • Password:kafka 的密码

3. 入门案例

  kafka:
    image: 'bitnami/kafka:3.6.2'
    container_name: kafka
    restart: always
    ulimits:
      nofile:
        soft: 65536
        hard: 65536
    environment:
      - TZ=Asia/Shanghai
      - KAFKA_CFG_NODE_ID=0
      - KAFKA_CFG_PROCESS_ROLES=controller,broker
      - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka:9093
      - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093,EXTERNAL://:9094
      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092,EXTERNAL://localhost:9094
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,EXTERNAL:PLAINTEXT
      - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
    ports:
      - '9092:9092'
      - '9094:9094'
    volumes:
      - ./volumes/kafka:/bitnami/kafka
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22

3.1 配置

KqPusherConf:
  Name: log-producer
  Brokers:
    - 127.0.0.1:9094
  Topic: login-log
  Group: login-log-group
KqConsumerConf:
  Name: log-consumer
  Brokers:
    - 127.0.0.1:9094
  Group: login-login-group
  Topic: login-log
1
2
3
4
5
6
7
8
9
10
11
12
type Config struct {
	rest.RestConf
	MysqlConfig    MysqlConfig
	Auth           Auth
	RedisConfig    redis.RedisConf
	KqPusherConf   kq.KqConf
	KqConsumerConf kq.KqConf
}
1
2
3
4
5
6
7
8

3.2 注入svc

type ServiceContext struct {
	Config         config.Config
	Conn           sqlx.SqlConn
	RedisClient    *redis.Redis
	KqPusherClient *kq.Pusher
}

func NewServiceContext(c config.Config) *ServiceContext {
	sqlConn := db.NewMysql(c.MysqlConfig)
	redisClient := db.NewRedis(c.RedisConfig)
	KqPusherClient := kq.NewPusher(
		c.KqPusherConf.Brokers,
		c.KqPusherConf.Topic,
	)
	return &ServiceContext{
		Config:         c,
		Conn:           sqlConn,
		RedisClient:    redisClient,
		KqPusherClient: KqPusherClient,
	}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21

3.3 日志生产者

go func() {
		//记录登录日志
		logData := map[string]any{
			"user_id": u.Id,
			"ip":      "127.0.0.1",
			"device":  "pc",
			"status":  "success",
			"msg":     "登录成功",
			"time":    time.Now().Format("2006-01-02 15:04:05"),
		}
		marshal, _ := json.Marshal(logData)
		//key能保证 消息进入当前用户的消息 进入一个分区 保证顺序性
		//如果不填key 默认按照轮训方式 安排分区
		err = l.svcCtx.KqPusherClient.PushWithKey(context.Background(), fmt.Sprintf("login_log_%d", u.Id), string(marshal))
		if err != nil {
			l.Logger.Error(err)
		}
	}()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18

3.4 日志消费者

package queue

import (
	"context"
	"github.com/zeromicro/go-zero/core/logx"
	"user-api/internal/svc"
)

type LoginLogConsumer struct {
	ctx    context.Context
	svcCtx *svc.ServiceContext
}

func NewLoginLogConsumer(ctx context.Context, svcCtx *svc.ServiceContext) *LoginLogConsumer {
	return &LoginLogConsumer{
		ctx:    ctx,
		svcCtx: svcCtx,
	}
}

func (*LoginLogConsumer) Consume(ctx context.Context, key, val string) error {
	logx.Infof("login log key: %s, val: %s \n", key, val)
	return nil
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
package queue

import (
	"context"
	"github.com/zeromicro/go-queue/kq"
	"github.com/zeromicro/go-zero/core/service"
	"user-api/internal/config"
	"user-api/internal/svc"
)

func Consumers(c config.Config, ctx context.Context, svcCtx *svc.ServiceContext) []service.Service {
	return []service.Service{
		kq.MustNewQueue(c.KqConsumerConf, NewLoginLogConsumer(ctx, svcCtx)),
	}
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
//启动消费者
	group := service.NewServiceGroup()
	defer group.Stop()
	for _, mq := range queue.Consumers(c, context.Background(), ctx) {
		group.Add(mq)
	}
	group.Add(server)
	group.Start()
1
2
3
4
5
6
7
8

4. 高级使用

4.1 参数配置

kq.NewPusher 第三个参数是 options,就是支持传递的可选参数

  • chunkSize : 由于效率问题,kq client 是批量提交,批量消息体达到此大小才会提交给 kafka。
  • flushInterval:间隔多久提交一次。即使未达到 chunkSize 但是达到了这个间隔时间也会向 kafka 提交
  • allowAutoTopicCreation:是否自动创建topic
	KqPusherClient := kq.NewPusher(
		c.KqPusherConf.Brokers,
		c.KqPusherConf.Topic,
		kq.WithChunkSize(1024),
		kq.WithFlushInterval(time.Second),
		kq.WithAllowAutoTopicCreation(),
	)
1
2
3
4
5
6
7

当然,consumer 中在 mqs.go 中 kq.MustNewQueue 初始化时候这个参数也是可选参数

  • commitInterval : 提交给 kafka broker 间隔时间,默认是 1s
  • queueCapacity:kafka 内部队列长度
  • maxWait:从 kafka 批量获取数据时,等待新数据到来的最大时间。
  • metrics:上报消费每个消息消费时间,默认会内部初始化,一般也不需要指定
func Consumers(c config.Config, ctx context.Context, svcCtx *svc.ServiceContext) []service.Service {
	return []service.Service{
		kq.MustNewQueue(
			c.KqConsumerConf,
			NewLoginLogConsumer(ctx, svcCtx),
			kq.WithCommitInterval(time.Second),
		),
	}
}
1
2
3
4
5
6
7
8
9

4.2 负载策略

KqPusherClient := kq.NewPusher(
		c.KqPusherConf.Brokers,
		c.KqPusherConf.Topic,
		kq.WithChunkSize(1024),
		kq.WithFlushInterval(time.Second),
		kq.WithAllowAutoTopicCreation(),
    	//配置负载策略,有hash、roundRobin、leastBytes,也可以自定义实现
		kq.WithBalancer(&kafka.Hash{}),
	)
1
2
3
4
5
6
7
8
9

4.3 延迟队列

如果使用kafka来实现:

  • 设置消息发送时,延迟时间
  • 收到消息后,判断当前时间和延迟时间
  • 如果不到延迟时间,将消息重新发回队列进行处理