跳转至

应用场景

解耦

模块 A 需要调用模块 B,但是不需要 B 的回包,即 A 不需要等待 B 把所有任务全部做完。

则 A 只需要将消息传递到一个队列中,B 从队列中取出任务去做。

  1. 可维护性增加:上游只需要关注消息队列的调用是否异常
  2. 可靠性增加:下游挂了也不会影响上游的返回
  3. 响应时间变快:上游对于其调用的想用时间更短(将消息放到队列中之后就返回)

A 不用关心 B 的事情,不再受 B 的影响

削峰

削峰用于突发性的大流量。

A 接收到大量数据后,不需要立刻返回给 B,而是将消息传递到一个中转站,B 按照自身的能力从中拉取数据

实际场景

秒杀

  • 当秒杀的商品数量较少时,Redis 的流量过滤功能可以确保只有库存数量的请求打到 MySQL 上
  • 当商品数量很多,一瞬间打到 MySQL 上,则需要考虑使用消息队列削峰

image-20250214151658218

如果只有一个标注了 @KafkaListener 的接口,那么默认情况下只有一个消费者实例在处理消息。这意味着消息队列中的消息将按顺序处理,即前一条消息处理完成后,才会开始处理下一条消息。

例如使用 TimeUnit.SECONDS.sleep(1); 这行代码来模拟了处理消息的延迟。如果处理每条消息都需要一定的时间,那么后续消息将需要等待前一条消息处理完成后才能被消费。

分发

A 需要发送同一条消息给 B、C、D,B/C/D 只要订阅了相关的主题,就都可以收到这条消息

可以看作群发场景下的解耦方案

消费者:

@KafkaListener(topics = "dispatch", groupId = "group1", concurrency = "1", containerFactory = "kafkaManualAckListenerContainerFactory")
public void dispatchForSvr1(ConsumerRecord<?,?> record, Acknowledgement ack, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
    Optional message = Optional.ofNullable(record.value());
    if(message.isPresent()){
        Object msg = message.get();
        try {
            counterService.incrManyTimes(10000);
            ack.acknowledge();
            log.info(msg);
        } catch(Exception e){
            log.error("fail");
        }
    }
}