跳转至

Kafka 确认机制

总览

Kafka 的消息确认机制可以分为生产者端和消费者端两个方面:

机制 生产者端 消费者端
确认目标 Broker 确认消息持久化 消费者确认消息已处理
实现方式 acks=0/1/all 偏移量提交(自动/手动)
可靠性影响 保证消息不丢失 保证消息不重复/不丢失

生产者端

注意,Kafka 的 broker 端的写入分为内存和磁盘持久化两个部分。其中,磁盘持久化是后台线程异步进行的,默认为 5 秒刷一次盘。这里可能会涉及到 broker 宕机带来的一些一致性的问题

  • sarama.SyncProducer
    • syncProducer.SendMessage() 完成后才会返回
  • sarama.AsyncProducer
    • asyncProducer.Input() <- msg ,可以启动一个后台线程监控 asyncProducer.Sucesses() 通道

基础的异步生产者例子:

可以通过配置中的 requiredAcks 来配置确认机制,:

  • ack = 0 时表示直接返回成功,不需要确认机制;
  • ack = 1 时表示 leader broker 持久化到本地磁盘后算发送成功(否则重试)
  • ack = all 表示需要所有 in-sync 副本全部确认才算发送成功

注意,ack 的配置会影响重试的场景。在设置了 retries 参数后(默认最大重试次数为 3 ),生产者侧的客户端会因为生产者遇到可恢复错误,如网络超时,分区不可用问题时,自动重试。

当配置 ack = 0 时,broker 未收到消息不会导致重试

func main() {
    config := sarama.NewConfig()
    config.Producer.Return.Successes = true
    config.Producer.RequiredAcks = sarama.WaitForAll

    brokers := []string{"localhost:9092"}
    producer, _ := sarama.NewAsyncProducer(brokers, config)
    defer producer.Close()

    // 处理成功和失败的回调
    go func() {
        for {
            select {
            case success := <-producer.Successes():
                log.Printf("Message sent to partition %d at offset %d\n", 
                    success.Partition, success.Offset)
            case err := <-producer.Errors():
                // 可以通过 Metadata 拿到消息标识
            }
        }
    }()

    // 发送消息
    // 这里没有规定 partition,则使用默认的负载均衡策略
    msg := &sarama.ProducerMessage{
        Topic: "topic_name",
        Value: sarama.StringEncoder("msg"),
    }
    producer.Input() <- message
}

关于 partition,上面的例子中没有规定 message key 或者指定写入的分区,则 sarama 客户端会默认使用 RR 的负载均很策略确定写入的 partition,也可以通过规定 key 让 sarama 客户端对 key 取 hash 进行路由

msg := &sarama.ProducerMessage{
    Topic: "topic_name",
    Key:   sarama.StringEncoder("user123"), // 设置 Key
    Value: sarama.StringEncoder("msg"),
}

消费者端

sarama 提供了一个消费者组的接口,其定义了三个需要实现的基本方法

type ConsumerGroupHandler interface {
    Setup(ConsumerGroupSession) error
    Cleanup(ConsumerGroupSession) error
    ConsumeClaim(ConsumerGroupSession, ConsumerGroupClaim) error
}

这里的 ConsumerGroupClaim 中的 Message() 方法就是返回一个 channel,使用时可以直接用 for range 去监听这个 channel,拿到消息

手动提交偏移量的例子:

  • 基于 ConsumerGroupSession 提供的 MarkMessage 方法
type ConsumerGroupHandler struct{}

func (ConsumerGroupHandler) Setup(_ sarama.ConsumerGroupSession) error   { return nil }
func (ConsumerGroupHandler) Cleanup(_ sarama.ConsumerGroupSession) error { return nil }
func (h ConsumerGroupHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
    for msg := range claim.Messages() {
        sess.MarkMessage(msg, "") // 手动提交偏移量
    }
    return nil
}

func main() {
    config := sarama.NewConfig()
    config.Consumer.Return.Errors = true
    config.Version = sarama.V2_6_0_0

    brokers := []string{"localhost:9092"}
    topic := "topic_name"

    // 创建消费者组
    consumer, err := sarama.NewConsumerGroup(brokers, "group_id", config)
    if err != nil {
        log.Fatalf("Error creating consumer group: %v", err)
    }
    defer consumer.Close()

    handler := ConsumerGroupHandler{}
    ctx := context.Background()
    // 消费,将 handler 传入
    err = consumer.Consume(ctx, []string{topic}, handler)
    if err != nil {
        log.Fatalf("Error consuming messages: %v", err)
    }
}

消费者组的 group_id 一般设置为服务名称,一个服务实例作为消费者组的一个消费者;不同服务间的消费相互独立,各自维护 __consumer_offsets 主题下的偏移量

特别地,在消费者端,当消息无法被正常消费时,可以在业务层手动实现重试 + 死信队列的机制,本质上就是定义额外的 retry topic 和 dlq topic

image-20250808224327105

这样做需要考虑消息的顺序性,因为重试主题会打乱原来需要顺序消费的消息处在同一个 partition 的前提