问题排查|为啥RocketMQ广播消费每次启动都会从头开始消费?原创
现象
最近听到有项目反馈,使用公司内部封装的消息sdk,在使用RocketMQ广播消费时,每一次版本发布,消费者重启后,都会从最早位点开始消费,造成严重的消息消费积压,不仅对消息服务的负载产生极大影响,也会让发版本后消息出来不及时。
那这是什么原因引起的呢?
问题排查
熟悉RocketMQ的小伙伴应该知道,消费组在重启时优先会去查询上一次的消费位点,只要能查询到有效的位点,则会从查询到的位点开始消费,如果查询不到有效位点,则按照消费组设置的ConsumeFromWhere策略去查询位点,其可选值:
-
CONSUME_FROM_FIRST_OFFSET 从最早位点开始消费
-
CONSUME_FROM_LAST_OFFSET
从最新位点开始消费
-
CONSUME_FROM_TIMESTAMP 从指定时间开始消费
现在的现象是从最早消费,基本可以认为是消费者在启动时并没有查询到有效位点,但RocketMQ广播消费一样会存储消费进度,只是存储在消费端本地,那为什么会读取不到消费进度文件呢?
注意:如果在容器环境中使用RocketMQ的广播消费,进度文件不能存储在容器本身,应该需要引用外部存储文件,即容器创建后,可以统一访问该共享目录,否则肯定会出现本文出现的问题。
为了破解该问题,首先我们要知道RocketMQ广播消费消费进度文件的存储目录,由于本文遇到的环境是虚拟机环境,结合RocketMQ广播消费的核心实现类LocalFileOffsetStore中如下代码:
结合上述两段核心代码,我们可以得知RocketMQ广播消费模式,消息消费进度文件的存储根目录为 /{用户主目录}/.rocketmq_offsets,子目录为/mqClientId/{consumerGroupName}/offsets.json。其中mqClient为消费者clientId,consumerGroupName为消费最名称。
于是立马登录服务器,进入到.rocketmq_offsets文件夹,看到下面存在很多子文件夹,如下图所示:
不进引起了我的注意,按照上述的定义,一个消费者一个客户端拥有自己的专属文件夹,从这里看,显然是消费组在启动时就会创建一个新的文件夹,故加载不到之前存储的消费进度信息,综合来看,问题就出在mqClientId上,也就是这个mqClinetId并不唯一,每一次启动时会变化,最终查看内部代码,发现消费组在构建mqClientID时确实存在问题,因为会取当前时间戳,代码如下:
破案了。那原生使用RocketMQ的广播消费,是否会存在同样的问题呢?答案是不会,因为clientId的默认生成规则如下所示:
也就是默认的mqClientId是IP地址 + @ DEFAULT,这个无论启动多少次,也不会变化。
如果这篇文章对您有所帮助,求一键三连:点赞、转发、评论。关注公众号:「中间件兴趣圈」