性能文章>问题排查|为啥RocketMQ广播消费每次启动都会从头开始消费?>

问题排查|为啥RocketMQ广播消费每次启动都会从头开始消费?原创

2年前
334012

现象

最近听到有项目反馈,使用公司内部封装的消息sdk,在使用RocketMQ广播消费时,每一次版本发布,消费者重启后,都会从最早位点开始消费,造成严重的消息消费积压,不仅对消息服务的负载产生极大影响,也会让发版本后消息出来不及时。

那这是什么原因引起的呢?

问题排查

熟悉RocketMQ的小伙伴应该知道,消费组在重启时优先会去查询上一次的消费位点,只要能查询到有效的位点,则会从查询到的位点开始消费,如果查询不到有效位点,则按照消费组设置的ConsumeFromWhere策略去查询位点,其可选值:

  • CONSUME_FROM_FIRST_OFFSET 从最早位点开始消费

  • CONSUME_FROM_LAST_OFFSET

    从最新位点开始消费

  • CONSUME_FROM_TIMESTAMP 从指定时间开始消费

现在的现象是从最早消费,基本可以认为是消费者在启动时并没有查询到有效位点,但RocketMQ广播消费一样会存储消费进度,只是存储在消费端本地,那为什么会读取不到消费进度文件呢?

注意:如果在容器环境中使用RocketMQ的广播消费,进度文件不能存储在容器本身,应该需要引用外部存储文件,即容器创建后,可以统一访问该共享目录,否则肯定会出现本文出现的问题。

为了破解该问题,首先我们要知道RocketMQ广播消费消费进度文件的存储目录,由于本文遇到的环境是虚拟机环境,结合RocketMQ广播消费的核心实现类LocalFileOffsetStore中如下代码:

结合上述两段核心代码,我们可以得知RocketMQ广播消费模式,消息消费进度文件的存储根目录为 /{用户主目录}/.rocketmq_offsets,子目录为/mqClientId/{consumerGroupName}/offsets.json。其中mqClient为消费者clientId,consumerGroupName为消费最名称。

于是立马登录服务器,进入到.rocketmq_offsets文件夹,看到下面存在很多子文件夹,如下图所示:

不进引起了我的注意,按照上述的定义,一个消费者一个客户端拥有自己的专属文件夹,从这里看,显然是消费组在启动时就会创建一个新的文件夹,故加载不到之前存储的消费进度信息,综合来看,问题就出在mqClientId上,也就是这个mqClinetId并不唯一,每一次启动时会变化,最终查看内部代码,发现消费组在构建mqClientID时确实存在问题,因为会取当前时间戳,代码如下:

破案了。那原生使用RocketMQ的广播消费,是否会存在同样的问题呢?答案是不会,因为clientId的默认生成规则如下所示:

也就是默认的mqClientId是IP地址 +  @ DEFAULT,这个无论启动多少次,也不会变化。

助,、评论

 

点赞收藏
丁威
请先登录,查看1条精彩评论吧
快去登录吧,你将获得
  • 浏览更多精彩评论
  • 和开发者讨论交流,共同进步

为你推荐

Redis stream 用做消息队列完美吗?

Redis stream 用做消息队列完美吗?

Netty源码解析:writeAndFlush

Netty源码解析:writeAndFlush

2
1