性能文章>使用MQ如何保障消息100%被消费>

使用MQ如何保障消息100%被消费转载

2年前
429314

前言:

MQ可通过消息的收发,使多个系统之间不局限于同步调用,通过异步调用更好地实现解耦,流量削峰等。我们平时都在使用MQ,但使用技术框架只是第一步,去弄明白它的底层原理、深挖技术真相,才是每一位IT从业者的基操。

 

正文:

大家好,我是Jensen。


在MQ的整个消息生产消费过程中,如何保障消息100%被消费?


这里说明一点,想要回答好这个的问题,最好还是要有金字塔思维——金字塔思维就是从不同维度上来思考问题的一种方式,不重不漏,集体穷尽。


MQ作为异步通讯的消息中间件,其功能除了解耦生产者与消费者,还能用于大流量的削峰填谷,解决业务的最终一致性问题,那么消息的“可靠性”就显得尤为重要了,比如说商品出库后的库存数据通过MQ同步到财务系统,如果消息的可靠性没有保障,那财务系统的存货成本分析数据就无法有效支撑财务团队。


准确来说,我们需要保障MQ消息的可靠性,需要从三个层面/维度解决:生产者100%投递、MQ持久化、消费者100%消费,这里的100%消费指的是消息不少消费,也不多消费。

AE1BAB44-F424-43F3-9334-FD1C1DF450BE.png



由于MQ是基础网络通讯的中间件,网络通讯必然因丢包、网络抖动等原因产生数据丢失,MQ组件本身也会由于宕机或软件崩溃而中止服务,从而造成数据丢失,那么我们就需要从这两个根本原因着手补偿,这里科普一下RabbitMQ和Kafka是怎么解决的。


PART1:RabbitMQ


这里我必须先提一提RabbitMQ的消息协议——AMQP(Advanced Message Queuing Protocol,高级消息队列协议),在面试时我经常问候选人一个问题:RabbitMQ用的是什么消息协议?大部分候选人是回答不出来AMQP的,更不用说AMQP模型是如何设计的了。

在服务器中,三个主要功能模块连接成一个处理链完成预期的功能:

 

4FE4A753-7108-46A4-BD55-BE0C98BCC2F0.png


1、Exchange:接收发布应用程序发送的消息,并根据一定的规则将这些消息路由到消息队列
2、Queue:存储消息,直到这些消息被消费者安全处理完为止
3、Binding:定义了exchange和queue之间的关联,提供路由规则


使用这个模型我们可以很容易地模拟出存储转发队列和主题订阅这些典型的消息中间件概念。

接下来我们看看RabbitMQ的消息确认机制是如何保障消息可靠性的。


一、生产者端

  • 通过API将信道(channel)设置为confirm模式,则每条消息会被分配一个唯—ID
  • 如果消息投递成功,也就是说消息已经到达broker了,信道会发送ack给生产者,回调ConfirmCallback接口,带上唯一ID
  • 如果发生错误导致消息丢失,比如通过某个RoutingKey无法路由到某个Queue,则会发送nack给生产者,回调ReturnCallback接口,并带上唯一ID和异常信息
  • ack和nack只有一个被触发,只触发一次,而且是异步执行,意味着生产者不需要等待,可以继续发送新消息

二、消费者端


声明队列时,指定noack=false, 表示消费者不会自动提交ack,broker会等待消费者手动返回ack、才会删除消息,否则立刻删除


broker的ack没有超时机制,只会判断链接是否断开,如果断开了(比如消费者处理消息过程中宕机),消息会被重新发送,所以消费者要做好消息幂等性处理


此外,RabbitMQ除了消息确认机制,还有另一种方式——使用事务消息:消息生产端发送commit命令,MQ同步返回commit ok命令,这种方式由于需要同步阻塞等待MQ返回是否投递成功,才能执行别的操作,性能较差,因此不推荐使用。

 

三、MQ本身

通常来说,消息是在内存中存储通讯的,而基于内存的都是会有数据丢失的问题产生,服务一重启,数据就随之销毁。


在RabbitMQ中对数据的持久化有三方面:交换机持久化、队列持久化、消息持久化。


1、交换机持久化:exchange_declare创建交换机时通过参数durable=true指定,如:channel.exchangeDeclare(exchangeName, “direct/topic/header/fanout”, true);第三个参数就是设置durable值

2、队列持久化:queue_declare创建队列时通过参数durable=true指定,如:channel.queueDeclare("queue.persistent.name", true, false, false, null),第二个参数就是设置durable值

3、消息持久化:new AMPQMessage创建消息时通过参数指定,如:channel.basicPublish("exchange.persistent", "persistent", MessageProperties.PERSISTENT_TEXT_PLAIN, "persistent_test_message".getBytes()),或者设置参数deliveryMode=2来指定:AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();builder.deliveryMode(2);

上面只是说了API层的实现,那RabbitMQ底层又是怎么做消息持久化的呢?
如果指定了持久化参数,它们会以append的方式写文件,会根据文件大小(默认16M)自动切割,生成新的文件,RabbitMQ启动时会创建两个进程,一个负责持久化消息的存储,另一个负责非持久化消息的存储(当内存不够时会用到)。


消息存储时,会在一个叫ets的表中记录消息在文件中的映射以及相关信息(包括ID、偏移量、有效数据、左边文件、右边文件),消息读取时根据该信息到文件中读取,同时更新信息。

消息删除时只从ets删除,变为垃圾数据,当垃圾数据超出比例(默认50%),并且文件数达到3个,就会触发垃圾回收:锁定左右两个文件,整理左边文件有效数据、将左边文件有效数据写入左边,更新文件信息,删除右边,完成合并;当一个文件的有效数据等于0时,删除该文件。
写入文件前先写入buffer缓冲区,如果buffer已满,则写入文件,注意,此时只是操作系统的页存,还没落盘。

每隔25ms刷一次磁盘(比如Linux中的fsync命令),不管buffer(fd的读、写缓存区)满没满,都将buffer和页存中的数据落盘。


还有另外一种落盘机制:每次消息写入后,如果没有后续写入请求,则直接刷盘。

 

PART2:Kafka


Kafka在MQ领域以性能高、吞吐能力强、消息堆积能力强等等优势称著,常常用于日志收集、消息系统、用户活动跟踪、运营指标、流式处理等等场景,讲之前先简单聊聊Kafka的架构设计:

6EFF2578-1247-4D20-8FAC-297B400A1D63.png

  • Consumer Group:消费者组,消费者组内每个消费者负责消费不同分区的数据,提高消费能力,这是逻辑上的一个订阅者。
  • Topic:可以理解为一个队列,Topic将消息分类,生产者和消费者面向的是同一个Topic。
  • Partition:为了实现扩展性,提高并发能力,一个Topic以多个Partition的方式分布到多个Broker上,每个Partition是一个有序的队列,一个Topic的每个Partition都有若干个副本(Replica),一个Leader和若干个Follower;生产者发送数据的对象,以及消费者消费数据的对象,都是通过Leader,Follower负责实时从Leader中同步数据,保持和Leader数据的同步;当Leader发生故障时,某个Follower还会成为新的Leader。

一、生产者端

 

Kafka消息发送端有个ACK机制。


设置ack参数:ack=0,表示不重试,Kafka不需要返回ack,极有可能各种原因造成丢失;ack=1,表示Leader写入成功就返回ack了,Follower不一定同步成功;ack=all或ack=-1,表示ISR列表中的所有Follower同步完成再返回ack。


设置参数unclean.leader.election.enable: false,禁止选举ISR以外的Follower为Leader,只能从ISR列表中的节点中选举Leader;可能会牺牲Kafka的可用性,但是能够提高消息的可靠性。


重试机制,设置tries > 1,表示消息重发次数。


设置最小同步副本数min.insync.replicas > 1,没满足该值前,Kafka不提供读写服务,写操作会异常。


通过设置最小同步副本数和ACK机制,可以让MQ在性能与可靠性上达到平衡。

二、消费者端


手工提交offset(偏移量):Kafka消费者在拉取消息后,默认会自动提交offset,由于消费者每次都会根据offset来消费消息的,如果消费者处理业务失败,实际上我们是要重新消费的,所以我们要在消息处理成功后再手工提交offset,确认消息能够成功消费。


同样地,消费者的业务代码也要做好幂等性校验。


三、MQ本身


很简单,通过减小broker刷盘间隔来实现高可靠。

要深究其原理,得从Kafka的持久化机制来看。

  • 磁盘的顺序读写:与RabbitMQ不同,Kafka是基于磁盘读写的,那为什么Kafka的吞吐量还这么大呢?原因是Kafka的读写是用顺序读写的,不需要寻址随机读写,而由于是用磁盘来写数据,消息堆积能力必然比内存型的RabbitMQ更强
  • 利用了操作系统的零拷贝技术:避免CPU将数据从一块存储拷贝到另外一块存储,关于零拷贝这里不详述,与Java应用不同,Kafka的消息不需要在用户缓冲区处理磁盘数据再返回,所以才能用零拷贝技术
  • 分区分段+索引:Kafka的消息实际上分布存储在一个一个小的segment中的,每次文件读写也是直接操作segment,为了进一步优化查询,Kafka又默认为分段后的数据文件建立了索引文件(就是文件系统上的.index文件),这种分区分段+索引的设计,不仅提升了数据读取的效率,同时也提高了数据操作的并行度(类似ConcurrentHashMap的分段锁机制)。
  • 批量压缩&批量读写:多条消息一起压缩进行传输(比如gzip格式)与读写,节省带宽;
  • 直接操作page cache:虽然Kafka是Java写的,也基于JVM运行,但Kafka的消息读写是直接操作操作系统页存的,而不是在JVM的堆内存,这样就避免JVM的GC耗时及对象创建耗时,且读写速度更高,JVM进程重启缓存也不会丢失

理解了Kafka的持久化机制是直接读写页存+定时刷盘的方式,我们只需要设置刷盘策略即可在性能与可靠性上权衡。

Kafka提供3个参数来优化刷盘机制:
log.flush.interval.messages //多少条消息刷盘1次
log.flush.interval.ms //隔多长时间刷盘1次
log.flush.scheduler.interval.ms //周期性的刷盘。


PART3:

总结一下


关于框架类的知识,最重要是得掌握技术框架的底层实现原理、适用场景,其他细节场景会比较多,但起码我们得掌握基本面。

那怎么才算掌握呢?起码能通过框架的特性,根据需要实现一个简易版本,比如说自己实现一个Spring框架、实现一个MQ组件等等。由浅入深,化难为易。

更多思考:

MQ是我们经常使用的中间件,如何解决在工作中遇到的MQ问题,大家可以阅读以下几篇内容:

一次 RocketMQ 顺序消费延迟的问题定位

记一次由 MQ SDK 太简陋引发的生产事故

最后感谢Jensen的分享~

点赞收藏
分类:
嘿小黑
请先登录,查看1条精彩评论吧
快去登录吧,你将获得
  • 浏览更多精彩评论
  • 和开发者讨论交流,共同进步

为你推荐

Redis stream 用做消息队列完美吗?

Redis stream 用做消息队列完美吗?

Netty源码解析:writeAndFlush

Netty源码解析:writeAndFlush

4
1