RocketMQ Q&A

2021/09/13 RocketMQ 共 3517 字,约 11 分钟
Bob.Zhu

MQ如何选型

MQ描述
RabbitMQerlang开发,对消息堆积的支持并不好,当大量消息积压的时候,会导致 RabbitMQ 的性能急剧下降。每秒钟可以处理几万到十几万条消息。
RocketMQjava开发,面向互联网集群化功能丰富,对在线业务的响应时延做了很多的优化,大多数情况下可以做到毫秒级的响应,每秒钟大概能处理几十万条消息。
KafkaScala开发,面向日志功能丰富,性能最高。当你的业务场景中,每秒钟消息数量没有那么多的时候,Kafka 的时延反而会比较高。所以,Kafka 不太适合在线业务场景。
ActiveMQjava开发,简单,稳定,性能不如前面三个。小型系统用也ok,但是不推荐。推荐用互联网主流的。

消息中间件对比

为什么要使用MQ

因为项目比较大,做了分布式系统,所有远程服务调用请求都是同步执行经常出问题,所以引入了MQ。

作用描述
解耦系统耦合度降低,没有强依赖关系
异步不需要同步执行的远程调用可以有效提高响应时间
削峰请求达到峰值后,后端service还可以保持固定消费速率消费,不会被压垮

PS:想要达到削峰的目的,需要将消息队列消费时的推送模式改为拉取模式。

RocketMQ中的Topic和JMS的queue

queue就是来源于数据结构的FIFO队列。而Topic是个抽象的概念,每个Topic底层对应N个queue,而数据也真实存在queue上的。

消费后会立即删除

RocketMQ Broker中的消息被消费后不会立即删除,每条消息都会持久化到CommitLog中,每个Consumer 连接到Broker后会维持消费进度信息,当有消息消费后只是当前Consumer的消费进度(CommitLog的offset)更新了。 ??

RocketMQ消费模式有几种

消费模型由Consumer决定,消费维度为Topic:

  • 集群消费
    • 一条消息只会被同Group中的一个Consumer消费
    • 多个Group同时消费一个Topic时,每个Group都会有一个Consumer消费到数据
  • 广播消费
    • 消息将对一 个Consumer Group 下的各个 Consumer 实例都消费一遍。即即使这些 Consumer 属于同一个Consumer Group ,消息也会被 Consumer Group 中的每个 Consumer 都消费一次。

消费消息是push还是pull

RocketMQ没有真正意义的push,都是pull,虽然有push类,但实际底层实现采用的是长轮询机制,即拉取方式。 源码如下:

// {@link org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#pullMessage()}
// 看到没,这是一只披着羊皮的狼,名字叫PushConsumerImpl,实际干的确是pull的活。

// 拉取消息,结果放到pullCallback里
this.pullAPIWrapper.pullKernelImpl(pullCallback);

pull的方式可以根据当前consumer消费的情况来pull,不会造成过多的压力而造成瓶颈。

消息顺序

一个RocketMQ服务内部会有多个queue,单个queue里的顺序是典型的FIFO,但多个queue同时消费时 无法保证消息的有序性的。所以,只能通过同一topic、同一个queue来实现。也就是,发消息的时候用 一个线程去发送消息,消费的时候一个线程去消费这个queue里的消息。

那怎么实现同一个线程是发送消息和同一个线程去消费呢?

Rocket MQ给我们提供了MessageQueueSelector接口,可以自己重写里面的接口,实现自己的算法, 举个最简单的例子:判断 i % 2 == 0,那就都放到queue1里,否则放到queue2里。

消息丢失

首先在如下三个部分都可能会出现丢失消息的情况:

  • Producer端
  • Broker端
  • Consumer端

Producer端

  • 采取send()同步发消息,发送结果是同步感知的。
  • 发送失败后可以重试,设置重试次数。默认3次。
  • 集群部署,比如发送失败了的原因可能是当前Broker宕机了,重试的时候会发送到其他Broker上。

Broker端

  • 修改刷盘策略为同步刷盘。默认情况下是异步刷盘的。
  • 集群部署,主从模式,高可用。

Consumer端

  • 完全消费正常后在进行手动ack确认。

消息重复消费

原因

影响消息正常发送和消费的重要原因是网络的不确定性

ACK

正常情况下在consumer真正消费完消息后应该发送ack,通知broker该消息已正常消费,从queue中剔除。 当ack因为网络原因无法发送到broker,broker会认为词条消息没有被消费,此后会开启消息重投机制把消息再次投递到consumer。

消费模式

在CLUSTERING模式下,消息在broker中会保证相同group的consumer消费一次,但是针对不同group的consumer会推送多次

解决方案

总的策略就是考虑并发状态下的幂等处理。

数据库表

处理消息前,使用消息主键在表中带有约束的字段中insert

Map

单机时可以使用map ConcurrentHashMap -> putIfAbsent guava cache

Redis

分布式锁搞起来

消息堆积

  • 查找原因定位问题,是Producer太多了还是Consumer太少了
  • 通过上线更多consumer临时解决消息堆积问题

堆积消息太多

  • 一个临时的topic,queue的数量是堆积的几倍,queue分布到多Broker中
  • 上线一台Consumer做消息的搬运工
  • 上线N台Consumer同时消费临时Topic中的数据
  • 改bug
  • 恢复原来的Consumer,继续消费之前的Topic

堆积时间过长消息超时

RocketMQ中的消息只会在commitLog被删除的时候才会消失,不会超时。也就是说未被消费的消息不会存在超时删除这情况。

堆积的消息会不会进死信队列

不会,消息在消费失败后会进入重试队列(%RETRY%+ConsumerGroup),默认18次,才会进入死信队列(%DLQ%+ConsumerGroup)。

MQ高可用

Producer端高可用

创建topic时,把topic的多个message queue创建在多个broker组上。这样当一个broker组的master不可用后, producer仍然可以给其他组的master发送消息。 rocketmq目前还不支持主从切换,需要手动切换

Broker端高可用

ocketmq是通过broker主从机制来实现高可用的。相同broker名称,不同brokerid的机器组成一个broker组, brokerId=0表明这个broker是master,brokerId>0表明这个broker是slave。

Consumer端高可用

consumer并不能配置从master读还是slave读。当master不可用或者繁忙的时候consumer会被 自动切换到从slave读。这样当master出现故障后,consumer仍然可以从slave读,保证了消息消费的高可用

负载均衡

Producer端负载均衡

producer在发送消息时,默认轮询所有queue,消息就会被发送到不同的queue上。而queue可以分布在不同的broker上

Consumer端负载均衡

默认的分配算法是AllocateMessageQueueAveragely,每个consumer实例平均分配每个consumer queue。 如果consumer数量比message queue还多,则多会来的consumer会被闲置。

刷盘机制

同步刷盘

消息写入内存的PAGECACHE后,立刻通知刷盘线程刷盘,然后等待刷盘完成,刷盘线程执行完成后唤醒等待的线程,给应用返回消息写成功的状态。吞吐量低,但不会造成消息丢失

异步刷盘

消息被写入内存的PAGECACHE,返回写成功状态,当内存里的消息量积累到一定程度时,统一触发写磁盘操作,快速写入 。吞吐量高,当磁盘损坏时,会丢失消息

主从复制

如果一个broker有master和slave时,就需要将master上的消息复制到slave上,复制的方式有两种

同步复制

master和slave均写成功,才返回客户端成功。maste挂了以后可以保证数据不丢失,但是同步复制会增加数据写入延迟,降低吞吐量

异步复制

master写成功,返回客户端成功。拥有较低的延迟和较高的吞吐量,但是当master出现故障后,有可能造成数据丢失

参考资料

文档信息

Search

    Table of Contents