性能文章>【全网首发】日常Bug排查-mq不消费>

【全网首发】日常Bug排查-mq不消费原创

2周前
348234

日常Bug排查-mq不消费

日常Bug排查系列都是一些简单Bug排查,笔者将在这里介绍一些排查Bug的简单技巧,同时顺便积累素材^_^。

问题现场

同事反馈应用出现了一个诡异的问题。mq在应用启动10分钟之后就开始堆积。反复重启了几次都是如此。

 

直接jstack

这种问题的分析思路还是比较直接的,直接jstack一下看看mq消费线程在干嘛。

kafka-cosume-thread#topic123# 293 waiting on com.google.common.util.concurrent.SettableFuture@ddb2dcd
java.lang.Thread.State: WAITING
@ sun.misc.Unsafe.park(Native Method)
@ java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
@ com.google.common.util.concurrent.AbstractFuture.get(AbstractFuture.java:469)
@ com.google.common.util.concurrent.AbstractFuture$TrustedFuture.get(AbstractFuture.java:76)
@ com.google.common.util.concurrent.Uninterruptibles.getUninterruptibly(Uninterruptibles.java:142)
@ com.google.common.cache.LocalCache$LoadingValueReference.waitForValue(LocalCache.java:3661)
@ com.google.common.cache.LocalCache$Segment.waitForLoadingValue(LocalCache.java:2315)
@ com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2202)
@ com.google.common.cache.LocalCache.get(LocalCache.java:4053)
@ com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:4057)
@ com.google.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4986)
@ com.google.common.cache.LocalCache$LocalLoadingCache.getUnchecked(LocalCache.java:4992)
@ xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx.业务代码

可以看到都是waiting在google guava cache的get上。翻一下guava cache的源代码:

LocalCache.java
V get(K key, int hash, CacheLoader<? super K, V> loader) throws ExecutionException {
.......
// 如果此值已经触发loading动作,则等待loading动作完成
if (valueReference.isLoading()) {
return waitForLoadingValue(e, key, valueReference);
}
.......
}

 

 

进一步分析

那么mq消费线程在等待谁呢?笔者搜索了整个stack,没有找到guavaCache的load函数相关的线程。对应guavaCache的业务代码大致如下

LocalCache.java
private final LoadingCache<String, List<String>> CACHE = CacheBuilder.newBuilder()
.maximumSize(10000)
.expireAfterWrite(10, TimeUnit.MINUTES)
.refreshAfterWrite(5, TimeUnit.MINUTES)
.build(new CacheLoader<String, List<String>>() {
@Override
public List<String> load(@NotNull String key) throws Exception {
return doBusiness(key);
}

@Override
public ListenableFuture<List<String>> reload(@NotNull String key, @NotNull List<String> oldValue) throws Exception {
return listeningExecutorService.submit(() -> {
try {
return doBusiness(key);
} catch (Exception e) {
LogUtil.warn(log, "刷新缓存异常");
return oldValue;
}
});
}
});

而JVM的Stack中没有任何关于doBusiness这个函数的信息。也就是没有任何loading动作在执行。mq消费线程在空等

 

 

 

分析源代码

可以看到上面那段guavaCache的expire超时时间正好是10分钟,而这个10分钟和应用启动后10分钟无响应时间一致。由此可见,一旦expire了就会导致直接卡死。而应用刚启动首次刷新的时候触发getCache确不会,这个事情有点诡异。仔细观察上面那段代码,refreshAfterWrite的超时时间是5分钟,如果按照正常逻辑,应该不会在10分钟的时候才开始刷新。由此可见refresh的时候也卡住了。而refresh是采用线程池的,可以继续从jstack中看看refresh的线程池在做什么。

"common-pool-thread-9" #XXX waiting on java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject@4b709279
java.lang.Thread.State: TIMED_WAITING
@ sun.misc.Unsafe.park(Native Method)
@ java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
@ java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078)
@ java.util.concurrent.LinkedBlockingQueue.poll(LinkedBlockingQueue.java:467)
@ XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX.run(XXXTask.java)
@ java.lang.Thread.run(Thread.java:748)

而这段XXXTask是个无限循环

class XXXTask {
void run(){
while(running){
// do something
}
}
}

笔者看到这里就直接明白了。guavaCache的refreshWrite和xxxTask共用了一个线程池。xxxTask是个无限循环,将线程池的所有线程占住。这样guavaCache的refreshWrite就永远躺在线程池的blockingQueue里面没有任何运行的机会。如下图所示:

 

为什么一开始没有卡住

因为一开始在没有任何Cache时候,加载动作是在用户线程里的。因为当时还没有任何其它的加载任务,也就是valueReference.isLoading = false

LocalCache.java
V get(K key, int hash, CacheLoader<? super K, V> loader) throws ExecutionException {
.......
// 这边是false,不走这个分支
if (valueReference.isLoading()) {
return waitForLoadingValue(e, key, valueReference);
}
.......
// 在mq消费线程触发
return lockedGetOrLoad(key, hash, loader);
}

这时候再mq线程就执行了doGetCache动作,所以不会被其它任务抢占。而由于5min之后触发了线程的refresh,这时候valueReference.isLoading变为了true。并提交了reloadTask到线程池。
但线程池又被其它任务占满,导致valueRefercence.isLoading一直为true。

 


由于没有达到expire时间,所以这时候一直返回旧值。

在设置的expire时间之后,由于这时旧值已经过期,又由于valueReference.isLoading,所以mq线程(也就是getCache的线程一直卡住)。

对应的代码如下所示:

 

LocalCache.java
V get(K key, int hash, CacheLoader<? super K, V> loader) throws ExecutionException {
.......
if ( e != null) {
ReferenceEntry<K, V> e = getEntry(key, hash);
if (e != null) {
long now = map.ticker.read();
V value = getLiveValue(e, now);
if (value != null) {
// 未expire之前返回旧值
recordRead(e, now);
statsCounter.recordHits(1);
// 到refresh时间往线程池提交任务,设置valueReference.isLoading=true,并返回旧值
return scheduleRefresh(e, key, hash, value, now, loader);
}
ValueReference<K, V> valueReference = e.getValueReference();
// expire了之后等待refresh的reloadTask返回,一直被block住
if (valueReference.isLoading()) {
return waitForLoadingValue(e, key, valueReference);
}
}
}
......

关于guavaCache的reload

事实上,业务系统中关于guavaCache的写法中除了公用一个线程池之外,其load和reload的处理还是非常值得借鉴的。如果不重载reload,那么假设后端存储挂了,在缓存expire之后,业务系统就再也无法获取到缓存的内容。而reload的重载可以在后端存储中返回旧值,这样能够有效的避免了后端存储挂了之后的容错问题。但是,由于对于隔离性的考虑不足进而踩了这个坑。

总结

我们还是需要对各种资源(尤其是线程这样宝贵的资源)做好隔离。这样才能有效的避免其它任务的干扰。遇到类似问题时,从线程stack入手是个非常好的排查方向。

 

点赞收藏
分类:标签:
巡山小汪

关注微信公众号《解Bug之路》,有问题请在公众号中咨询:) 无论多么艰苦的时刻,都不要忘记,辉煌的未来,在你的眼中闪耀!

请先登录,查看3条精彩评论吧
快去登录吧,你将获得
  • 浏览更多精彩评论
  • 和开发者讨论交流,共同进步

为你推荐

对 Pulsar 集群的压测与优化

对 Pulsar 集群的压测与优化

【全网首发】MQ系列11:如何保证消息可靠性传输

【全网首发】MQ系列11:如何保证消息可靠性传输

RabbitMQ、RocketMQ、Kafka性能为何差距如此大?

RabbitMQ、RocketMQ、Kafka性能为何差距如此大?

Topic太多!RocketMQ炸了!

Topic太多!RocketMQ炸了!

4
3