应用场景
解耦¶
模块 A 需要调用模块 B,但是不需要 B 的回包,即 A 不需要等待 B 把所有任务全部做完。
则 A 只需要将消息传递到一个队列中,B 从队列中取出任务去做。
- 可维护性增加:上游只需要关注消息队列的调用是否异常
- 可靠性增加:下游挂了也不会影响上游的返回
- 响应时间变快:上游对于其调用的想用时间更短(将消息放到队列中之后就返回)
A 不用关心 B 的事情,不再受 B 的影响
削峰¶
削峰用于突发性的大流量。
A 接收到大量数据后,不需要立刻返回给 B,而是将消息传递到一个中转站,B 按照自身的能力从中拉取数据
实际场景¶
秒杀¶
- 当秒杀的商品数量较少时,Redis 的流量过滤功能可以确保只有库存数量的请求打到 MySQL 上
- 当商品数量很多,一瞬间打到 MySQL 上,则需要考虑使用消息队列削峰
如果只有一个标注了
@KafkaListener
的接口,那么默认情况下只有一个消费者实例在处理消息。这意味着消息队列中的消息将按顺序处理,即前一条消息处理完成后,才会开始处理下一条消息。例如使用
TimeUnit.SECONDS.sleep(1);
这行代码来模拟了处理消息的延迟。如果处理每条消息都需要一定的时间,那么后续消息将需要等待前一条消息处理完成后才能被消费。
分发¶
A 需要发送同一条消息给 B、C、D,B/C/D 只要订阅了相关的主题,就都可以收到这条消息
可以看作群发场景下的解耦方案
消费者:
@KafkaListener(topics = "dispatch", groupId = "group1", concurrency = "1", containerFactory = "kafkaManualAckListenerContainerFactory")
public void dispatchForSvr1(ConsumerRecord<?,?> record, Acknowledgement ack, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
Optional message = Optional.ofNullable(record.value());
if(message.isPresent()){
Object msg = message.get();
try {
counterService.incrManyTimes(10000);
ack.acknowledge();
log.info(msg);
} catch(Exception e){
log.error("fail");
}
}
}