MQ如何选型
MQ | 描述 |
---|---|
RabbitMQ | erlang开发,对消息堆积的支持并不好,当大量消息积压的时候,会导致 RabbitMQ 的性能急剧下降。每秒钟可以处理几万到十几万条消息。 |
RocketMQ | java开发,面向互联网集群化功能丰富,对在线业务的响应时延做了很多的优化,大多数情况下可以做到毫秒级的响应,每秒钟大概能处理几十万条消息。 |
Kafka | Scala开发,面向日志功能丰富,性能最高。当你的业务场景中,每秒钟消息数量没有那么多的时候,Kafka 的时延反而会比较高。所以,Kafka 不太适合在线业务场景。 |
ActiveMQ | java开发,简单,稳定,性能不如前面三个。小型系统用也ok,但是不推荐。推荐用互联网主流的。 |
为什么要使用MQ
因为项目比较大,做了分布式系统,所有远程服务调用请求都是同步执行经常出问题,所以引入了MQ。
作用 | 描述 |
---|---|
解耦 | 系统耦合度降低,没有强依赖关系 |
异步 | 不需要同步执行的远程调用可以有效提高响应时间 |
削峰 | 请求达到峰值后,后端service还可以保持固定消费速率消费,不会被压垮 |
PS:想要达到削峰的目的,需要将消息队列消费时的推送模式改为拉取模式。
RocketMQ中的Topic和JMS的queue
queue就是来源于数据结构的FIFO队列。而Topic是个抽象的概念,每个Topic底层对应N个queue,而数据也真实存在queue上的。
消费后会立即删除
RocketMQ Broker中的消息被消费后不会立即删除,每条消息都会持久化到CommitLog中,每个Consumer 连接到Broker后会维持消费进度信息,当有消息消费后只是当前Consumer的消费进度(CommitLog的offset)更新了。 ??
RocketMQ消费模式有几种
消费模型由Consumer决定,消费维度为Topic:
- 集群消费
- 一条消息只会被同Group中的一个Consumer消费
- 多个Group同时消费一个Topic时,每个Group都会有一个Consumer消费到数据
- 广播消费
- 消息将对一 个Consumer Group 下的各个 Consumer 实例都消费一遍。即即使这些 Consumer 属于同一个Consumer Group ,消息也会被 Consumer Group 中的每个 Consumer 都消费一次。
消费消息是push还是pull
RocketMQ没有真正意义的push,都是pull,虽然有push类,但实际底层实现采用的是长轮询机制,即拉取方式。 源码如下:
// {@link org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#pullMessage()}
// 看到没,这是一只披着羊皮的狼,名字叫PushConsumerImpl,实际干的确是pull的活。
// 拉取消息,结果放到pullCallback里
this.pullAPIWrapper.pullKernelImpl(pullCallback);
pull的方式可以根据当前consumer消费的情况来pull,不会造成过多的压力而造成瓶颈。
消息顺序
一个RocketMQ服务内部会有多个queue,单个queue里的顺序是典型的FIFO,但多个queue同时消费时 无法保证消息的有序性的。所以,只能通过同一topic、同一个queue来实现。也就是,发消息的时候用 一个线程去发送消息,消费的时候一个线程去消费这个queue里的消息。
那怎么实现同一个线程是发送消息和同一个线程去消费呢?
Rocket MQ给我们提供了MessageQueueSelector接口,可以自己重写里面的接口,实现自己的算法, 举个最简单的例子:判断 i % 2 == 0
,那就都放到queue1里,否则放到queue2里。
消息丢失
首先在如下三个部分都可能会出现丢失消息的情况:
- Producer端
- Broker端
- Consumer端
Producer端
- 采取send()同步发消息,发送结果是同步感知的。
- 发送失败后可以重试,设置重试次数。默认3次。
- 集群部署,比如发送失败了的原因可能是当前Broker宕机了,重试的时候会发送到其他Broker上。
Broker端
- 修改刷盘策略为同步刷盘。默认情况下是异步刷盘的。
- 集群部署,主从模式,高可用。
Consumer端
- 完全消费正常后在进行手动ack确认。
消息重复消费
原因
影响消息正常发送和消费的重要原因是网络的不确定性
ACK
正常情况下在consumer真正消费完消息后应该发送ack,通知broker该消息已正常消费,从queue中剔除。 当ack因为网络原因无法发送到broker,broker会认为词条消息没有被消费,此后会开启消息重投机制把消息再次投递到consumer。
消费模式
在CLUSTERING模式下,消息在broker中会保证相同group的consumer消费一次,但是针对不同group的consumer会推送多次
解决方案
总的策略就是考虑并发状态下的幂等处理。
数据库表
处理消息前,使用消息主键在表中带有约束的字段中insert
Map
单机时可以使用map ConcurrentHashMap -> putIfAbsent guava cache
Redis
分布式锁搞起来
消息堆积
- 查找原因定位问题,是Producer太多了还是Consumer太少了
- 通过上线更多consumer临时解决消息堆积问题
堆积消息太多
- 一个临时的topic,queue的数量是堆积的几倍,queue分布到多Broker中
- 上线一台Consumer做消息的搬运工
- 上线N台Consumer同时消费临时Topic中的数据
- 改bug
- 恢复原来的Consumer,继续消费之前的Topic
堆积时间过长消息超时
RocketMQ中的消息只会在commitLog被删除的时候才会消失,不会超时。也就是说未被消费的消息不会存在超时删除这情况。
堆积的消息会不会进死信队列
不会,消息在消费失败后会进入重试队列(%RETRY%+ConsumerGroup),默认18次,才会进入死信队列(%DLQ%+ConsumerGroup)。
MQ高可用
Producer端高可用
创建topic时,把topic的多个message queue创建在多个broker组上。这样当一个broker组的master不可用后, producer仍然可以给其他组的master发送消息。 rocketmq目前还不支持主从切换,需要手动切换
Broker端高可用
ocketmq是通过broker主从机制来实现高可用的。相同broker名称,不同brokerid的机器组成一个broker组, brokerId=0表明这个broker是master,brokerId>0表明这个broker是slave。
Consumer端高可用
consumer并不能配置从master读还是slave读。当master不可用或者繁忙的时候consumer会被 自动切换到从slave读。这样当master出现故障后,consumer仍然可以从slave读,保证了消息消费的高可用
负载均衡
Producer端负载均衡
producer在发送消息时,默认轮询所有queue,消息就会被发送到不同的queue上。而queue可以分布在不同的broker上
Consumer端负载均衡
默认的分配算法是AllocateMessageQueueAveragely,每个consumer实例平均分配每个consumer queue。 如果consumer数量比message queue还多,则多会来的consumer会被闲置。
刷盘机制
同步刷盘
消息写入内存的PAGECACHE后,立刻通知刷盘线程刷盘,然后等待刷盘完成,刷盘线程执行完成后唤醒等待的线程,给应用返回消息写成功的状态。吞吐量低,但不会造成消息丢失
异步刷盘
消息被写入内存的PAGECACHE,返回写成功状态,当内存里的消息量积累到一定程度时,统一触发写磁盘操作,快速写入 。吞吐量高,当磁盘损坏时,会丢失消息
主从复制
如果一个broker有master和slave时,就需要将master上的消息复制到slave上,复制的方式有两种
同步复制
master和slave均写成功,才返回客户端成功。maste挂了以后可以保证数据不丢失,但是同步复制会增加数据写入延迟,降低吞吐量
异步复制
master写成功,返回客户端成功。拥有较低的延迟和较高的吞吐量,但是当master出现故障后,有可能造成数据丢失
参考资料
文档信息
- 本文作者:Bob.Zhu
- 本文链接:https://adolphor.github.io/2021/09/13/02-rocketmq-qa/
- 版权声明:自由转载-非商用-非衍生-保持署名(创意共享3.0许可证)