又踩到Dubbo的坑,但是这次我笑不出来原创
前言
直入主题,线上应用发现,偶发性出现如下异常日志。
当然由于线上具体异常包含信息量过大,秉承让肥朝的粉丝没有难调试的代码的原则,我特意抽取了一个复现的demo放在了git,让你不在现场,一样享受到排查的快乐!但是最近,太多假粉伸手党拿到地址就跑,因此我把地址藏在本文某个角落,因此认真看文的才能找到!(重点)
由于工作性质的原因,上班时间根本抽不出时间做其他事,修bug,都只能下班时间来做,因此周六就到公司搬砖了。
什么是ConcurrentModificationException?
中文意思就是,并发修改异常
。也就是我们常说的fail-fast
(快速失败)。当然肥朝更认为,快速失败
是一种思想,比如Spring会在启动的时候做大量的检查,什么bean找不到,依赖注入错误等等,都会把一些显而易见的错误检查出来,防止在项目跑着跑着期间再失败,也就是提前检查。无论是业务开发,还是基础组件开发,亦或是生活中,这个思想都是可以用到的。
那么,言归正传,这个异常到底什么意思啊。简单说就是,当一个集合在遍历的时候,他的元素也正在被修改。刚学java那会,我们边遍历边删除就会出现这个异常。ConcurrentModificationException
的原理这些网上太多,肥朝就暂且不提。那么我们来看下异常栈。
好了,我们已经找到了RpcContext.getContext().getObjectAttachments()
正在遍历。那么,只要找到谁在修改他就行了啊,就这?
难点分析
很明显,这里面并不存在遍历的同时修改元素,Dubbo的代码还不至于有这个明显的bug。出现ConcurrentModificationException
,就有可能是,A线程在遍历,B线程在修改。
但是肥朝,你说了这么多,我还是没发现这个问题有什么难的啊!
这个问题难点主要在于,在Dubbo里面,RpcContext
是对应一个线程的,你可以简单理解为ThreadLocal
的增强版。也就是说,A线程拿出来的,和B线程拿出来的RpcContext
都不是同一个,何来并发修改同一个之说?当然官方文档给了我一个启示。
会不会有同学在线程开启前拿到RpcContext
,然后在新线程中,做set操作(图中的get操作是没有问题的)。
于是,似乎豁然开朗的我,顺着这条线索,周六加了一天班,把代码翻了个遍,最后发现没有找到。
索然无味还是柳暗花明?
并发这东西,要么不出问题,一旦出问题都是很难找。观察了线上日志,重现概率很小,就一小段日志,并且业务方很忙,也没时间配合你查问题。于是只能顺着源码,把Dubbo的整个请求到响应的过程在脑海中快速过几遍,看看哪个环节有可能出问题,做了无数的假设。随着一次次的假设失败,在即将身体索然无味之际,还真发现了一些蛛丝马迹!(注意,本文所用到的,都是dubbo2.7.6)
我们先来看一下官方文档对RpcContext的介绍
好了,那么我问你,下面这段代码,love能输出什么?
@Service
public class AHelloServiceImpl implements AHelloService {
@Reference
private BHelloService bHelloService;
@Override
public String sayHello() throws Exception{
RpcContext.getContext().setAttachment("我最爱的人是?","肥朝");
bHelloService.sayHello();
String love = RpcContext.getContext().getAttachment("我最爱的人是?");
System.out.println("this is: " + love);
Thread.sleep(10L);
bHelloService.sayHello();
return "欢迎关注微信公众号:肥朝";
}
}
我在图都圈得这么明显了,看得懂中文都知道,发起一次远程调用后,参数会被清空,下面肯定get不到的啦。但是其实是get得到的,不要问肥朝为什么都知道图是有问题的,还特意圈起来骗你,我只想让你知道社会险恶。
源码细节
阅读过源码,和对源码有细节深入思考,效果是很大不一样的。
我们来看一下源码就知道了。文中说的会清除,对应的代码是怎么样的呢?
如果作为正常的客户端调用,那么,在调用后确实是会删除的。但是如果你对源码细节足够熟悉你就会发现,在org.apache.dubbo.rpc.filter.ContextFilter
这个类中。
你不看代码直接听我说也行,这几段代码的意思是,在一个提供者的方法中,canRemove会设置为false的,所以,他们在这个方法体远程调用中,是没办法清空RpcContext的,需要在整体调用完才会清空。
我们再回顾一下案发现场
@Override
public String sayHello() throws Exception{
bHelloService.sayHello();
Thread.sleep(10L);
bHelloService.sayHello();
return "欢迎关注微信公众号:肥朝";
}
从目前得到的信息很明显知道,第一次远程调用,和第二次远程调用,用的是同一个RpcContext
,并且,在第二次远程调用的时候。这个RpcContext
的内容,给人动了手脚了。
那么,究竟是何人所为!我们随着镜头,再次深入源码!既然是RpcContext
给人搞了,那么我们就从这里顺藤摸瓜,这里先省略肥朝的内心戏,我们来看重点。在RpcContext
中发现一段可疑片段
public static void restoreContext(RpcContext oldContext) {
LOCAL.set(oldContext);
}
接着继续顺丰摸瓜,发现调用这段代码的逻辑是
/**
* tmp context to use when the thread switch to Dubbo thread.
*/
private RpcContext tmpContext;
private RpcContext tmpServerContext;
private BiConsumer<Result, Throwable> beforeContext = (appResponse, t) -> {
tmpContext = RpcContext.getContext();
tmpServerContext = RpcContext.getServerContext();
RpcContext.restoreContext(storedContext);
RpcContext.restoreServerContext(storedServerContext);
};
private BiConsumer<Result, Throwable> afterContext = (appResponse, t) -> {
RpcContext.restoreContext(tmpContext);
RpcContext.restoreServerContext(tmpServerContext);
};
public Result whenCompleteWithContext(BiConsumer<Result, Throwable> fn) {
this.responseFuture = this.responseFuture.whenComplete((v, t) -> {
beforeContext.accept(v, t);
fn.accept(v, t);
afterContext.accept(v, t);
});
return this;
}
@Override
public Result invoke(Invocation invocation) throws RpcException {
Result asyncResult;
try {
interceptor.before(next, invocation);
asyncResult = interceptor.intercept(next, invocation);
} catch (Exception e) {
// onError callback
if (interceptor instanceof ClusterInterceptor.Listener) {
ClusterInterceptor.Listener listener = (ClusterInterceptor.Listener) interceptor;
listener.onError(e, clusterInvoker, invocation);
}
throw e;
} finally {
interceptor.after(next, invocation);
}
return asyncResult.whenCompleteWithContext((r, t) -> {
// onResponse callback
if (interceptor instanceof ClusterInterceptor.Listener) {
ClusterInterceptor.Listener listener = (ClusterInterceptor.Listener) interceptor;
if (t == null) {
listener.onMessage(r, clusterInvoker, invocation);
} else {
listener.onError(t, clusterInvoker, invocation);
}
}
});
}
看不懂代码不要怕,肥朝大白话解释一下。你就想象一个Dubbo异步场景,Dubbo异步回调结果的时候,是会开启一个新的线程,那么,这个回调就和当初请求不在一个线程里面了,因此这个回调线程是拿不到当初请求的RpcContext
。但是我们清空RpcContext
是需要在一次请求结束的时候,也就是说,虽然异步回调是另外一个线程了,但是我们仍然需要拿到当初请求时候的RpcContext
来走Filter
,做清空等操作。上面那段代码就是做,切换线程怎么拿回之前的RpcContext
。
听完上面的分析,你是不是明白了点啥?新线程,还能拿到旧的RpcContext
。那么,有这么一个场景,我们在通过提供者方法中,发起两个异步请求,第一个请求走Filter
的onResponse
(响应结果)的时候,我们如果在Filter
做RpcContext.getContext().setAttachment
操作,第二个请求又正好发起,而发起又会经历putAll这步骤,就会出现这个并发修改异常。于是乎,真相大白!
拓展性思考
真相大白就结束了?熟悉肥朝的粉丝都知道,我们遇到问题,要尽量压榨问题的全部价值!比如,你说不要在拦截器中onResponse
方法中用RpcContext.getContext().setAttachment
这样的操作,但是我们确实有类似需要,那到底要怎么写代码又不说,你这样叫我怎么给你转发文章!
我们要知道怎么正确写代码,那直接去抄Dubbo其他拦截器的代码不就知道了?比如
@Activate(group = PROVIDER, order = -10000)
public class ContextFilter implements Filter, Filter.Listener {
@Override
public void onResponse(Result appResponse, Invoker<?> invoker, Invocation invocation) {
// pass attachments to result
appResponse.addObjectAttachments(RpcContext.getServerContext().getObjectAttachments());
}
}
我们很明显看到,你熟悉一下appResponse
的api和他的作用,就很容易知道,有类似需求,代码应该怎么写了。我光告诉你怎么写代码没用啊,我要告诉你,遇到问题,怎么去抄正确代码,让你任何时候,都有得cao!