Kafka 确认机制
总览¶
Kafka 的消息确认机制可以分为生产者端和消费者端两个方面:
机制 | 生产者端 | 消费者端 |
---|---|---|
确认目标 | Broker 确认消息持久化 | 消费者确认消息已处理 |
实现方式 | acks=0/1/all |
偏移量提交(自动/手动) |
可靠性影响 | 保证消息不丢失 | 保证消息不重复/不丢失 |
生产者端¶
注意,Kafka 的 broker 端的写入分为内存和磁盘持久化两个部分。其中,磁盘持久化是后台线程异步进行的,默认为 5 秒刷一次盘。这里可能会涉及到 broker 宕机带来的一些一致性的问题
- sarama.SyncProducer
- syncProducer.SendMessage() 完成后才会返回
- sarama.AsyncProducer
- asyncProducer.Input() <- msg ,可以启动一个后台线程监控 asyncProducer.Sucesses() 通道
基础的异步生产者例子:
可以通过配置中的 requiredAcks 来配置确认机制,:
- ack = 0 时表示直接返回成功,不需要确认机制;
- ack = 1 时表示 leader broker 持久化到本地磁盘后算发送成功(否则重试)
- ack = all 表示需要所有 in-sync 副本全部确认才算发送成功
注意,ack 的配置会影响重试的场景。在设置了 retries 参数后(默认最大重试次数为 3 ),生产者侧的客户端会因为生产者遇到可恢复错误,如网络超时,分区不可用问题时,自动重试。
当配置 ack = 0 时,broker 未收到消息不会导致重试
func main() {
config := sarama.NewConfig()
config.Producer.Return.Successes = true
config.Producer.RequiredAcks = sarama.WaitForAll
brokers := []string{"localhost:9092"}
producer, _ := sarama.NewAsyncProducer(brokers, config)
defer producer.Close()
// 处理成功和失败的回调
go func() {
for {
select {
case success := <-producer.Successes():
log.Printf("Message sent to partition %d at offset %d\n",
success.Partition, success.Offset)
case err := <-producer.Errors():
// 可以通过 Metadata 拿到消息标识
}
}
}()
// 发送消息
// 这里没有规定 partition,则使用默认的负载均衡策略
msg := &sarama.ProducerMessage{
Topic: "topic_name",
Value: sarama.StringEncoder("msg"),
}
producer.Input() <- message
}
关于 partition,上面的例子中没有规定 message key 或者指定写入的分区,则 sarama 客户端会默认使用 RR 的负载均很策略确定写入的 partition,也可以通过规定 key 让 sarama 客户端对 key 取 hash 进行路由
msg := &sarama.ProducerMessage{
Topic: "topic_name",
Key: sarama.StringEncoder("user123"), // 设置 Key
Value: sarama.StringEncoder("msg"),
}
消费者端¶
sarama 提供了一个消费者组的接口,其定义了三个需要实现的基本方法
type ConsumerGroupHandler interface {
Setup(ConsumerGroupSession) error
Cleanup(ConsumerGroupSession) error
ConsumeClaim(ConsumerGroupSession, ConsumerGroupClaim) error
}
这里的 ConsumerGroupClaim 中的 Message() 方法就是返回一个 channel,使用时可以直接用 for range 去监听这个 channel,拿到消息
手动提交偏移量的例子:
- 基于 ConsumerGroupSession 提供的 MarkMessage 方法
type ConsumerGroupHandler struct{}
func (ConsumerGroupHandler) Setup(_ sarama.ConsumerGroupSession) error { return nil }
func (ConsumerGroupHandler) Cleanup(_ sarama.ConsumerGroupSession) error { return nil }
func (h ConsumerGroupHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
for msg := range claim.Messages() {
sess.MarkMessage(msg, "") // 手动提交偏移量
}
return nil
}
func main() {
config := sarama.NewConfig()
config.Consumer.Return.Errors = true
config.Version = sarama.V2_6_0_0
brokers := []string{"localhost:9092"}
topic := "topic_name"
// 创建消费者组
consumer, err := sarama.NewConsumerGroup(brokers, "group_id", config)
if err != nil {
log.Fatalf("Error creating consumer group: %v", err)
}
defer consumer.Close()
handler := ConsumerGroupHandler{}
ctx := context.Background()
// 消费,将 handler 传入
err = consumer.Consume(ctx, []string{topic}, handler)
if err != nil {
log.Fatalf("Error consuming messages: %v", err)
}
}
消费者组的 group_id 一般设置为服务名称,一个服务实例作为消费者组的一个消费者;不同服务间的消费相互独立,各自维护 __consumer_offsets 主题下的偏移量
特别地,在消费者端,当消息无法被正常消费时,可以在业务层手动实现重试 + 死信队列的机制,本质上就是定义额外的 retry topic 和 dlq topic
这样做需要考虑消息的顺序性,因为重试主题会打乱原来需要顺序消费的消息处在同一个 partition 的前提