跳转至

Kafka 确认机制

总览

Kafka 的消息确认机制可以分为生产者端和消费者端两个方面:

机制 生产者端 消费者端
确认目标 Broker 确认消息持久化 消费者确认消息已处理
实现方式 acks=0/1/all 偏移量提交(自动/手动)
可靠性影响 保证消息不丢失 保证消息不重复/不丢失

生产者端

消息构造

sarama 客户端提供了消息格式:sarama.ProducerMessage,其可导出字段如下:

type ProducerMessage struct {
    Topic string // The Kafka topic for this message.
    // The partitioning key for this message. Pre-existing Encoders include
    // StringEncoder and ByteEncoder.
    Key Encoder
    // The actual message to store in Kafka. Pre-existing Encoders include
    // StringEncoder and ByteEncoder.
    Value Encoder

    // The headers are key-value pairs that are transparently passed
    // by Kafka between producers and consumers.
    Headers []RecordHeader

    // This field is used to hold arbitrary data you wish to include so it
    // will be available when receiving on the Successes and Errors channels.
    // Sarama completely ignores this field and is only to be used for
    // pass-through data.
    Metadata interface{}

    // Below this point are filled in by the producer as the message is processed

    // Offset is the offset of the message stored on the broker. This is only
    // guaranteed to be defined if the message was successfully delivered and
    // RequiredAcks is not NoResponse.
    Offset int64
    // Partition is the partition that the message was sent to. This is only
    // guaranteed to be defined if the message was successfully delivered.
    Partition int32
    // Timestamp can vary in behaviour depending on broker configuration, being
    // in either one of the CreateTime or LogAppendTime modes (default CreateTime),
    // and requiring version at least 0.10.0.
    //
    // When configured to CreateTime, the timestamp is specified by the producer
    // either by explicitly setting this field, or when the message is added
    // to a produce set.
    //
    // When configured to LogAppendTime, the timestamp assigned to the message
    // by the broker. This is only guaranteed to be defined if the message was
    // successfully delivered and RequiredAcks is not NoResponse.
    Timestamp time.Time
}

其中,Topic 和 Value 是必须规定的

当使用 config.Producer.Partitioner = sarama.NewHashPartitioner 规定了 Partitioner 之后,会根据 Key 进行 hash 分区;当 Key 为空时,会进行随机分区;另外,也可以直接设置 msg 的 Partition 字段来确定写入的 partition

消息发送

注意,Kafka 的 broker 端的写入分为内存和磁盘持久化两个部分。其中,磁盘持久化是后台线程异步进行的,默认为 5 秒刷一次盘。这里可能会涉及到 broker 宕机带来的一些一致性的问题

  • sarama.SyncProducer
    • syncProducer.SendMessage() 完成后才会返回
  • sarama.AsyncProducer
    • asyncProducer.Input() <- msg ,可以启动一个后台线程监控 asyncProducer.Sucesses() 通道

在 sarama 库中,SyncProducer 基于 AsyncProducer 实现。需要注意的是,在使用 AsyncProducer,且 config.Producer.Return.Successes 以及 config.Producer.Return.Errors = true 时,需要启动协程来监听 asyncProducer.Successes 以及 asyncProducer.Errors 这两个 chan,否则 chan 可能被填满,导致 chan<- 阻塞

基础的异步生产者例子:

可以通过配置中的 requiredAcks 来配置确认机制,:

  • ack = 0 时表示直接返回成功,不需要确认机制;
  • ack = 1 时表示 leader broker 持久化到本地磁盘后算发送成功(否则重试)
  • ack = all 表示需要所有 in-sync 副本全部确认才算发送成功

注意,ack 的配置会影响重试的场景。在设置了 retries 参数后(默认最大重试次数为 3 ),生产者侧的客户端会因为生产者遇到可恢复错误,如网络超时,分区不可用问题时,自动重试。

当配置 ack = 0 时,broker 未收到消息不会导致重试

func main() {
    config := sarama.NewConfig()
    config.Producer.Return.Successes = true
    config.Producer.RequiredAcks = sarama.WaitForAll

    brokers := []string{"localhost:9092"}
    producer, _ := sarama.NewAsyncProducer(brokers, config)
    defer producer.Close()

    // 处理成功和失败的回调
    go func() {
        for {
            select {
            case success := <-producer.Successes():
                log.Printf("Message sent to partition %d at offset %d\n", 
                    success.Partition, success.Offset)
            case err := <-producer.Errors():
                // 可以通过 Metadata 拿到消息标识
            }
        }
    }()

    // 发送消息
    // 这里没有规定 partition,则使用默认的负载均衡策略
    msg := &sarama.ProducerMessage{
        Topic: "topic_name",
        Value: sarama.StringEncoder("msg"),
    }
    producer.Input() <- message
}

关于 partition,上面的例子中没有规定 message key 或者指定写入的分区,则 sarama 客户端会默认使用 RR 的负载均很策略确定写入的 partition,也可以通过规定 key 让 sarama 客户端对 key 取 hash 进行路由

msg := &sarama.ProducerMessage{
    Topic: "topic_name",
    Key:   sarama.StringEncoder("user123"), // 设置 Key
    Value: sarama.StringEncoder("msg"),
}

消费者端

表面上看,sarama 这里使用的是 handler 回调来处理消息,似乎与 Kafka 消费者的拉模型相矛盾,实则不然,sarama 是采用的 for range 监听消息 chan,在每个 / 每组消息到来时主动调用 handler(底层是 go 的 netpoller,使用极少协程非阻塞地监控大量的 socket)

基本实现

sarama 提供了一个消费者组的接口,其定义了三个需要实现的基本方法

type ConsumerGroupHandler interface {
    Setup(ConsumerGroupSession) error
    Cleanup(ConsumerGroupSession) error
    ConsumeClaim(ConsumerGroupSession, ConsumerGroupClaim) error
}

这里的 ConsumerGroupClaim 中的 Message() 方法就是返回一个 channel,使用时可以直接用 for range 去监听这个 channel,拿到消息

手动提交偏移量的例子:

  • 基于 ConsumerGroupSession 提供的 MarkMessage 方法
type ConsumerGroupHandler struct{}

func (ConsumerGroupHandler) Setup(_ sarama.ConsumerGroupSession) error   { return nil }
func (ConsumerGroupHandler) Cleanup(_ sarama.ConsumerGroupSession) error { return nil }
func (h ConsumerGroupHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
    for msg := range claim.Messages() {
        sess.MarkMessage(msg, "") // 手动提交偏移量
    }
    return nil
}

func main() {
    config := sarama.NewConfig()
    config.Consumer.Return.Errors = true
    config.Version = sarama.V2_6_0_0

    brokers := []string{"localhost:9092"}
    topic := "topic_name"

    // 创建消费者组
    consumer, err := sarama.NewConsumerGroup(brokers, "group_id", config)
    if err != nil {
        log.Fatalf("Error creating consumer group: %v", err)
    }
    defer consumer.Close()

    handler := ConsumerGroupHandler{}
    ctx := context.Background()
    // 消费,将 handler 传入
    err = consumer.Consume(ctx, []string{topic}, handler)
    if err != nil {
        log.Fatalf("Error consuming messages: %v", err)
    }
}

消费者组的 group_id 一般设置为服务名称,一个服务实例作为消费者组的一个消费者;不同服务间的消费相互独立,各自维护 __consumer_offsets 主题下的偏移量

实现重试与确认机制

可以在 ConsumeClaim 方法中(也就是在封装层),去实现重试机制。

在上一个例子的 ConsumerGroupHandler 中,去定义一个 handler 方法,在业务上调用这个 handler 时,可以选择分别返回 true / false,表示业务层面对于消息消费成功的确认与否

type MessageInfo struct {
    Topic string
    Data []byte
    Headers map[string]string
}
type ConsumerHandler func(msg *MessageInfo) (bool, error)

type ConsumerGroupHandler struct {
    handler ConsumerHandler
}

接下来在 ConsumeClaim 中方法中去封装重试逻辑:当 handler 返回错误时进行重试

也就是说,业务层去定义具体的 handler 方法,并根据业务执行的情况选择返回 true / false;在封装层固定地去调用这个 handler,但是此外还加入了重试的逻辑

// 单消息消费者的持续消费
func (h ConsumerGroupHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
    for msg := range claim.Messages() {
        m := &MessageInfo{
            Topic: msg.Topic,
            Data: msg.Value,
            Headers: convert(msg.Headers),
        }
        for {
            ack, err := h.handler(msg)
            if ack && err == nil {
                sess.MarkMessage(msg, "") // 确认偏移量
                break
            }
        }
    }
}

其他的实现方案:在消费者端,当消息无法被正常消费时,可以在业务层手动实现重试 + 死信队列的机制,本质上就是定义额外的 retry topic 和 dlq topic

image-20250808224327105

这样做需要考虑消息的顺序性,因为重试主题会打乱原来需要顺序消费的消息处在同一个 partition 的前提