Kafka 确认机制
总览¶
Kafka 的消息确认机制可以分为生产者端和消费者端两个方面:
机制 | 生产者端 | 消费者端 |
---|---|---|
确认目标 | Broker 确认消息持久化 | 消费者确认消息已处理 |
实现方式 | acks=0/1/all |
偏移量提交(自动/手动) |
可靠性影响 | 保证消息不丢失 | 保证消息不重复/不丢失 |
生产者端¶
消息构造¶
sarama 客户端提供了消息格式:sarama.ProducerMessage,其可导出字段如下:
type ProducerMessage struct {
Topic string // The Kafka topic for this message.
// The partitioning key for this message. Pre-existing Encoders include
// StringEncoder and ByteEncoder.
Key Encoder
// The actual message to store in Kafka. Pre-existing Encoders include
// StringEncoder and ByteEncoder.
Value Encoder
// The headers are key-value pairs that are transparently passed
// by Kafka between producers and consumers.
Headers []RecordHeader
// This field is used to hold arbitrary data you wish to include so it
// will be available when receiving on the Successes and Errors channels.
// Sarama completely ignores this field and is only to be used for
// pass-through data.
Metadata interface{}
// Below this point are filled in by the producer as the message is processed
// Offset is the offset of the message stored on the broker. This is only
// guaranteed to be defined if the message was successfully delivered and
// RequiredAcks is not NoResponse.
Offset int64
// Partition is the partition that the message was sent to. This is only
// guaranteed to be defined if the message was successfully delivered.
Partition int32
// Timestamp can vary in behaviour depending on broker configuration, being
// in either one of the CreateTime or LogAppendTime modes (default CreateTime),
// and requiring version at least 0.10.0.
//
// When configured to CreateTime, the timestamp is specified by the producer
// either by explicitly setting this field, or when the message is added
// to a produce set.
//
// When configured to LogAppendTime, the timestamp assigned to the message
// by the broker. This is only guaranteed to be defined if the message was
// successfully delivered and RequiredAcks is not NoResponse.
Timestamp time.Time
}
其中,Topic 和 Value 是必须规定的
当使用 config.Producer.Partitioner = sarama.NewHashPartitioner 规定了 Partitioner 之后,会根据 Key 进行 hash 分区;当 Key 为空时,会进行随机分区;另外,也可以直接设置 msg 的 Partition 字段来确定写入的 partition
消息发送¶
注意,Kafka 的 broker 端的写入分为内存和磁盘持久化两个部分。其中,磁盘持久化是后台线程异步进行的,默认为 5 秒刷一次盘。这里可能会涉及到 broker 宕机带来的一些一致性的问题
- sarama.SyncProducer
- syncProducer.SendMessage() 完成后才会返回
- sarama.AsyncProducer
- asyncProducer.Input() <- msg ,可以启动一个后台线程监控 asyncProducer.Sucesses() 通道
在 sarama 库中,SyncProducer 基于 AsyncProducer 实现。需要注意的是,在使用 AsyncProducer,且 config.Producer.Return.Successes 以及 config.Producer.Return.Errors = true 时,需要启动协程来监听 asyncProducer.Successes 以及 asyncProducer.Errors 这两个 chan,否则 chan 可能被填满,导致 chan<- 阻塞
基础的异步生产者例子:
可以通过配置中的 requiredAcks 来配置确认机制,:
- ack = 0 时表示直接返回成功,不需要确认机制;
- ack = 1 时表示 leader broker 持久化到本地磁盘后算发送成功(否则重试)
- ack = all 表示需要所有 in-sync 副本全部确认才算发送成功
注意,ack 的配置会影响重试的场景。在设置了 retries 参数后(默认最大重试次数为 3 ),生产者侧的客户端会因为生产者遇到可恢复错误,如网络超时,分区不可用问题时,自动重试。
当配置 ack = 0 时,broker 未收到消息不会导致重试
func main() {
config := sarama.NewConfig()
config.Producer.Return.Successes = true
config.Producer.RequiredAcks = sarama.WaitForAll
brokers := []string{"localhost:9092"}
producer, _ := sarama.NewAsyncProducer(brokers, config)
defer producer.Close()
// 处理成功和失败的回调
go func() {
for {
select {
case success := <-producer.Successes():
log.Printf("Message sent to partition %d at offset %d\n",
success.Partition, success.Offset)
case err := <-producer.Errors():
// 可以通过 Metadata 拿到消息标识
}
}
}()
// 发送消息
// 这里没有规定 partition,则使用默认的负载均衡策略
msg := &sarama.ProducerMessage{
Topic: "topic_name",
Value: sarama.StringEncoder("msg"),
}
producer.Input() <- message
}
关于 partition,上面的例子中没有规定 message key 或者指定写入的分区,则 sarama 客户端会默认使用 RR 的负载均很策略确定写入的 partition,也可以通过规定 key 让 sarama 客户端对 key 取 hash 进行路由
msg := &sarama.ProducerMessage{
Topic: "topic_name",
Key: sarama.StringEncoder("user123"), // 设置 Key
Value: sarama.StringEncoder("msg"),
}
消费者端¶
表面上看,sarama 这里使用的是 handler 回调来处理消息,似乎与 Kafka 消费者的拉模型相矛盾,实则不然,sarama 是采用的 for range 监听消息 chan,在每个 / 每组消息到来时主动调用 handler(底层是 go 的 netpoller,使用极少协程非阻塞地监控大量的 socket)
基本实现¶
sarama 提供了一个消费者组的接口,其定义了三个需要实现的基本方法
type ConsumerGroupHandler interface {
Setup(ConsumerGroupSession) error
Cleanup(ConsumerGroupSession) error
ConsumeClaim(ConsumerGroupSession, ConsumerGroupClaim) error
}
这里的 ConsumerGroupClaim 中的 Message() 方法就是返回一个 channel,使用时可以直接用 for range 去监听这个 channel,拿到消息
手动提交偏移量的例子:
- 基于 ConsumerGroupSession 提供的 MarkMessage 方法
type ConsumerGroupHandler struct{}
func (ConsumerGroupHandler) Setup(_ sarama.ConsumerGroupSession) error { return nil }
func (ConsumerGroupHandler) Cleanup(_ sarama.ConsumerGroupSession) error { return nil }
func (h ConsumerGroupHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
for msg := range claim.Messages() {
sess.MarkMessage(msg, "") // 手动提交偏移量
}
return nil
}
func main() {
config := sarama.NewConfig()
config.Consumer.Return.Errors = true
config.Version = sarama.V2_6_0_0
brokers := []string{"localhost:9092"}
topic := "topic_name"
// 创建消费者组
consumer, err := sarama.NewConsumerGroup(brokers, "group_id", config)
if err != nil {
log.Fatalf("Error creating consumer group: %v", err)
}
defer consumer.Close()
handler := ConsumerGroupHandler{}
ctx := context.Background()
// 消费,将 handler 传入
err = consumer.Consume(ctx, []string{topic}, handler)
if err != nil {
log.Fatalf("Error consuming messages: %v", err)
}
}
消费者组的 group_id 一般设置为服务名称,一个服务实例作为消费者组的一个消费者;不同服务间的消费相互独立,各自维护 __consumer_offsets 主题下的偏移量
实现重试与确认机制¶
可以在 ConsumeClaim 方法中(也就是在封装层),去实现重试机制。
在上一个例子的 ConsumerGroupHandler 中,去定义一个 handler 方法,在业务上调用这个 handler 时,可以选择分别返回 true / false,表示业务层面对于消息消费成功的确认与否
type MessageInfo struct {
Topic string
Data []byte
Headers map[string]string
}
type ConsumerHandler func(msg *MessageInfo) (bool, error)
type ConsumerGroupHandler struct {
handler ConsumerHandler
}
接下来在 ConsumeClaim 中方法中去封装重试逻辑:当 handler 返回错误时进行重试
也就是说,业务层去定义具体的 handler 方法,并根据业务执行的情况选择返回 true / false;在封装层固定地去调用这个 handler,但是此外还加入了重试的逻辑
// 单消息消费者的持续消费
func (h ConsumerGroupHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
for msg := range claim.Messages() {
m := &MessageInfo{
Topic: msg.Topic,
Data: msg.Value,
Headers: convert(msg.Headers),
}
for {
ack, err := h.handler(msg)
if ack && err == nil {
sess.MarkMessage(msg, "") // 确认偏移量
break
}
}
}
}
其他的实现方案:在消费者端,当消息无法被正常消费时,可以在业务层手动实现重试 + 死信队列的机制,本质上就是定义额外的 retry topic 和 dlq topic
这样做需要考虑消息的顺序性,因为重试主题会打乱原来需要顺序消费的消息处在同一个 partition 的前提