性能文章>【全网首发】Java 技术栈中间件优雅停机方案设计与实现全景图>

【全网首发】Java 技术栈中间件优雅停机方案设计与实现全景图原创

585816

本系列 Netty 源码解析文章基于  4.1.56.Final 版本

本文概要

在上篇文章 我为 Netty 贡献源码 | 且看 Netty 如何应对 TCP 连接的正常关闭,异常关闭,半关闭场景 中笔者为大家详细介绍了 Netty 在处理连接关闭时的完整过程,并详细介绍了 Netty 如何应对 TCP 连接在关闭时会遇到的各种场景。

在连接关闭之后,接下来就轮到 Netty 的谢幕时刻了,本文笔者会为大家详尽 Java 技术栈中间件中关于优雅停机方案的详细设计和实现。

笔者会从日常开发工作中常见的版本发布,服务上下线的场景聊起,引出服务优雅启停的需求,并从这个需求出发,一步一步带大家探究各个中间件里的优雅停机的相关设计。

熟悉笔者文风的读者朋友应该知道,笔者肯定不会只是简单的介绍,要么不讲,要讲就要把整个技术体系的前世今生给大家讲清楚,讲明白。

基于这目的,笔者会先从支持优雅停机的底层技术基石--内核中的信号量开始聊起。

image.png

从内核层我们接着会聊到 JVM 层,在 JVM 层一探优雅停机底层的技术玄机。

image.png

随后我们会从 JVM 层一路奔袭到 Spring 然后到 Dubbo。在这个过程中,笔者还会带大家一起 Shooting Dubbo 在优雅停机下的一个 Bug,并为大家详细介绍修复过程。

image.png

最后由 Dubbo 层的优雅停机,引出我们的主角--Netty 优雅停机的设计与实现:

Reactor优雅关闭总流程.png

下面我们来正式开始本文的内容~~

本文概要.png

1. Java 进程的优雅启停

在我们的日常开发工作中,业务需求的迭代和优化伴随围绕着我们整个开发周期,当我们加班加点完成了业务需求的开发,然后又历经各种艰难险阻通过了测试的验证,最后经过和产品经理的各种纠缠相爱相杀之后,终于到了最最激动人心的时刻程序要部署上线了。

             

上线时的情绪波动.png

那么在程序部署上线的过程中势必会涉及到线上服务的关闭和重启,关于对线上服务的启停这里面有很多的讲究,万万不能简单粗暴的进行关闭和重启,因为此时线上服务可能承载着生产的流量,可能正在进行重要的业务处理流程。

比如:用户正在购买商品,钱已经付了,恰好这时赶上程序上线,如果我们这时简单粗暴的对服务进行关闭,重启,可能就会导致用户付了钱,但是订单未创建或者商品未出现在用户的购物清单中,给用户造成了实质的损失,这是非常严重的后果。

为了保证能在程序上线的过程中做到业务无损,所以线上服务的优雅关闭优雅启动显得就非常非常重要了。

                               

保持优雅很重要.png

1.1 优雅启动

在 Java 程序的运行过程中,程序的运行速度一般会随着程序的运行慢慢的提高,所以从线上表现上来看 Java 程序在运行一段时间后往往会比程序刚启动的时候会快很多。

这是因为 Java 程序在运行过程中,JVM 会不断收集到程序运行时的动态数据,这样可以将高频执行代码通过即时编译成机器码,随后程序运行就直接执行机器码,运行速度完全不输 C 或者 C++ 程序。

同时在程序执行过程中,用到的类会被加载到 JVM 中缓存,这样当程序再次使用到的时候不会触发临时加载,影响程序执行性能。

我们可以将以上几点当做 JVM 带给我们的性能红利,而当应用程序重新启动之后,这些性能红利也就消失了,如果我们让新启动的程序继续承担之前的流量规模,那么就会导致程序在刚启动的时候在没有这些性能红利的加持下直接进入高负荷的运转状态,这就可能导致线上请求大面积超时,对业务造成影响。

所以说优雅地启动一个程序是非常重要的,优雅启动的核心思想就是让程序在刚启动的时候不要承担太大的流量,让程序在低负荷的状态下运行一段时间,使其提升到最佳的运行状态时,在逐步的让程序承担更大的流量处理。

下面我们就来看下常用于优雅启动场景的两个技术方案:

1.1.1 启动预热

启动预热就是让刚刚上线的应用程序不要一下就承担之前的全部流量,而是在一个时间窗口内慢慢的将流量打到刚上线的应用程序上,目的是让 JVM 先缓慢的收集程序运行时的一些动态数据,将高频代码即时编译为机器码。

这个技术方案在众多 RPC 框架的实现中我们都可以看到,服务调用方会从注册中心拿到所有服务提供方的地址,然后从这些地址中通过特定的负载均衡算法从中选取一个服务提供方的发送请求。

为了能够使刚刚上线的服务提供方有时间去预热,所以我们就要从源头上控**务调用方发送的流量,服务调用方在发起 RPC 调用时应该尽量少的去负载均衡到刚刚启动的服务提供方实例。

那么服务调用方如何才能判断哪些是刚刚启动的服务提供方实例呢?

服务提供方在启动成功后会向注册中心注册自己的服务信息,我们可以将服务提供方的真实启动时间包含在服务信息中一起向注册中心注册,这样注册中心就会通知服务调用方有新的服务提供方实例上线并告知其启动时间。

服务调用方可以根据这个启动时间,慢慢的将负载权重增加到这个刚启动的服务提供方实例上。这样就可以解决服务提供方冷启动的问题,调用方通过在一个时间窗口内将请求慢慢的打到提供方实例上,这样就可以让刚刚启动的提供方实例有时间去预热,达到平滑上线的效果。

1.1.2 延迟暴露

启动预热更多的是从服务调用方的角度通过降低刚刚启动的服务提供方实例的负载均衡权重来实现优雅启动。

而延迟暴露则是从服务提供方的角度,延迟暴露服务时间,利用延迟的这段时间,服务提供方可以预先加载依赖的一些资源,比如:缓存数据,spring 容器中的 bean 。等到这些资源全部加载完毕就位之后,我们在将服务提供方实例暴露出去。这样可以有效降低启动前期请求处理出错的概率。

比如我们可以在 dubbo 应用中可以配置服务的延迟暴露时间:

//延迟5秒暴露服务
<dubbo:service delay="5000" /> 

1.2 优雅关闭

优雅关闭需要考虑的问题和处理的场景要比优雅启动要复杂的多,因为一个正常在线上运行的服务程序正在承担着生产的流量,同时也正在进行业务流程的处理。

要对这样的一个服务程序进行优雅关闭保证业务无损还是非常有挑战的,一个好的关闭流程,可以确保我们业务实现平滑的上下线,避免上线之后增加很多不必要的额外运维工作。

下面我们就来讨论下具体应该从哪几个角度着手考虑实现优雅关闭:

1.2.1 切走流量

                                        

image.png

第一步肯定是要将程序承担的现有流量全部切走,告诉服务调用方,我要进行关闭了,请不要在给我发送请求。那么如果进行切流呢??

在 RPC 的场景中,服务调用方通过服务发现的方式从注册中心中动态感知服务提供者的上下线变化。在服务提供方关闭之前,首先就要自己从注册中心中取消注册,随后注册中心会通知服务调用方,有服务提供者实例下线,请将其从本地缓存列表中剔除。这样就可以使得服务调用方之后的 RPC 调用不在请求到下线的服务提供方实例上。

但是这里会有一个问题,就是通常我们的注册中心都是 AP 类型的,它只会保证最终一致性,并不会保证实时一致性,基于这个原因,服务调用方感知到服务提供者下线的事件可能是延后的,那么在这个延迟时间内,服务调用方极有可能会向正在下线的服务发起 RPC 请求。

因为服务提供方已经开始进入关闭流程,那么很多对象在这时可能已经被销毁了,这时如果在收到请求过来,肯定是无法处理的,甚至可能还会抛出一个莫名其妙的异常出来,对业务造成一定的影响。

那么既然这个问题是由于注册中心可能存在的延迟通知引起的,那么我们就很自然的想到了让准备下线的服务提供方主动去通知它的服务调用方。

这种服务提供方主动通知在加上注册中心被动通知的两个方案结合在一起应该就能确保万无一失了吧。

事实上,在大部分场景下这个方案是可行的,但是还有一种极端的情况需要应对,就是当服务提供方通知调用方自己下线的网络请求在到达服务调用方之前的很极限的一个时间内,服务调用者向正在下线的服务提供方发起了 RPC 请求,这种极端的情况,就需要服务提供方和调用方一起配合来应对了。

首先服务提供方在准备关闭的时候,就把自己设置为正在关闭状态,在这个状态下不会接受任何请求,如果这时遇到了上边这种极端情况下的请求,那么就抛出一个 CloseException (这个异常是提供方和调用方提前约定好的),调用方收到这个 CloseException ,则将该服务提供方的节点剔除,并从剩余节点中通过负载均衡选取一个节点进行重试,通过让这个请求快速失败从而保证业务无损。

这三种方案结合在一起,笔者认为就是一个比较完美的切流方案了。

1.2.2 尽量保证业务无损

当把流量全部切走后,可能此时将要关闭的服务程序中还有正在处理的部分业务请求,那么我们就必须得等到这些业务处理请求全部处理完毕,并将业务结果响应给客户端后,在对服务进行关闭。

当然为了保证关闭流程的可控,我们需要引入关闭超时时间限制,当剩下的业务请求处理超时,那么就强制关闭。

为了保证关闭流程的可控,我们只能做到尽可能的保证业务无损而不是百分之百保证。所以在程序上线之后,我们应该对业务异常数据进行监控并及时修复。


通过以上介绍的优雅关闭方案我们知道,当我们将要优雅关闭一个应用程序时,我们需要做好以下两项工作:

  1. 我们首先要做的就是将当前将要关闭的应用程序上承载的生产流量全部切走,保证不会有新的流量打到将要关闭的应用程序实例上。

  2. 当所有的生产流量切走之后,我们还需要保证当前将要关闭的应用程序实例正在处理的业务请求要使其处理完毕,并将业务处理结果响应给客户端。以保证业务无损。当然为了使关闭流程变得可控,我们需要引入关闭超时时间。

以上两项工作就是我们在应用程序将要被关闭时需要做的,那么问题是我们如何才能知道应用程序要被关闭呢?换句话说,我们在应用程序里怎么才能感知到程序进程的关闭事件从而触发上述两项优雅关闭的操作执行呢?

既然我们有这样的需求,那么操作系统内核肯定会给我们提供这样的机制,事实上我们可以通过捕获操作系统给进程发送的信号来获取关闭进程通知,并在相应信号回调中触发优雅关闭的操作。

接下来让我们来看一下操作系统内核提供的信号机制:

2. 内核信号机制

信号是操作系统内核为我们提供用于在进程间通信的机制,内核可以利用信号来通知进程,当前系统所发生的的事件(包括关闭进程事件)。

信号在内核中并没有用特别复杂的数据结构来表示,只是用一个代号一样的数字来标识不同的信号。Linux 提供了几十种信号,分别代表不同的意义。信号之间依靠它们的值来区分

信号可以在任何时候发送给进程,进程需要为这个信号配置信号处理函数。当某个信号发生的时候,就默认执行对应的信号处理函数就可以了。这就相当于一个操作系统的应急手册,事先定义好遇到什么情况,做什么事情,提前准备好,出了事情照着做就可以了。

内核发出的信号就代表当前系统遇到了某种情况,我们需要应对的步骤就封装在对应信号的回调函数中。

信号机制引入的目的就在于:

  • 让应用进程知道当前已经发生了某个特定的事件(比如进程的关闭事件)。

  • 强制进程执行我们事先设定好的信号处理函数(比如封装优雅关闭逻辑)。

通常来说程序一旦启动就会一直运行下去,除非遇到 OOM 或者我们需要重新发布程序时会在运维脚本中调用 kill 命令关闭程序。Kill 命令从字面意思上来说是杀死进程,但是其本质是向进程发送信号,从而关闭进程。

下面我们使用 kill -l 命令查看下 kill 命令可以向进程发送哪些信号:

# kill -l
1) SIGHUP 2) SIGINT 3) SIGQUIT 4) SIGILL 5) SIGTRAP
6) SIGABRT 7) SIGBUS 8) SIGFPE 9) SIGKILL 10) SIGUSR1
11) SIGSEGV 12) SIGUSR2 13) SIGPIPE 14) SIGALRM 15) SIGTERM
16) SIGSTKFLT 17) SIGCHLD 18) SIGCONT 19) SIGSTOP 20) SIGTSTP
21) SIGTTIN 22) SIGTTOU 23) SIGURG 24) SIGXCPU 25) SIGXFSZ
26) SIGVTALRM 27) SIGPROF 28) SIGWINCH 29) SIGIO 30) SIGPWR
31) SIGSYS 34) SIGRTMIN 35) SIGRTMIN+1 36) SIGRTMIN+2 37) SIGRTMIN+3
38) SIGRTMIN+4 39) SIGRTMIN+5 40) SIGRTMIN+6 41) SIGRTMIN+7 42) SIGRTMIN+8
43) SIGRTMIN+9 44) SIGRTMIN+10 45) SIGRTMIN+11 46) SIGRTMIN+12 47) SIGRTMIN+13
48) SIGRTMIN+14 49) SIGRTMIN+15 50) SIGRTMAX-14 51) SIGRTMAX-13 52) SIGRTMAX-12
53) SIGRTMAX-11 54) SIGRTMAX-10 55) SIGRTMAX-9 56) SIGRTMAX-8 57) SIGRTMAX-7
58) SIGRTMAX-6 59) SIGRTMAX-5 60) SIGRTMAX-4 61) SIGRTMAX-3 62) SIGRTMAX-2
63) SIGRTMAX-1 64) SIGRTMAX

笔者这里提取几个常见的信号来简要说明下:

  • SIGINT:信号代号为 2 。比如我们在终端以非后台模式运行一个进程实例时,要想关闭它,我们可以通过 Ctrl+C 来关闭这个前台程序。这个 Ctrl+C 向进程发送的正是 SIGINT 信号。

  • SIGQUIT:信号代号为 3 。比如我们使用 Ctrl+\ 来关闭一个前台进程,此时会向进程发送 SIGQUIT 信号,与 SIGINT 信号不同的是,通过 SIGQUIT 信号终止的进程会在退出时,通过 Core Dump 将当前进程的运行状态保存在 core dump 文件里面,方便后续查看。

  • SIGKILL:信号代号为 9 。通过 kill -9 pid 命令结束进程是非常非常危险的动作,我们应该坚决制止这种关闭进程的行为,因为 SIGKILL 信号是不能被进程捕获和忽略的,只能执行内核定义的默认操作直接关闭进程。而我们的优雅关闭操作是需要通过捕获操作系统信号,从而可以在对应的信号处理函数中执行优雅关闭的动作。由于 SIGKILL 信号不能被捕获,所以优雅关闭也就无法实现。现在大家就赶快检查下自己公司生产环境的运维脚本是否是通过 kill -9 pid 命令来结束进程的,一定要避免用这种方式,因为这种方式是极其无情并且略带残忍的关闭进程行为。

         

image.png
  • SIGSTOP :信号代号为 19 。该信号和 SIGKILL 信号一样都是无法被应用程序忽略和捕获的。向进程发送 SIGSTOP 信号也是无法实现优雅关闭的。通过 Ctrl+Z 来关闭一个前台进程,发送的信号就是 SIGSTOP 信号。

  • SIGTERM:信号代号为 15 。我们通常会使用 kill 命令来关闭一个后台运行的进程,kill 命令发送的默认信号就是 SIGTERM ,该信号也是本文要讨论的优雅关闭的基础,我们通常会使用 kill pid 或者 kill -15 pid 来向后台进程发送 SIGTERM 信号用以实现进程的优雅关闭。大家如果发现自己公司生产环境的运维脚本中使用的是 kill -9 pid 命令来结束进程,那么就要马上换成 kill pid 命令。

以上列举的都是我们常用的一些信号,大家也可以通过 man 7 signal 命令查看每种信号对应的含义:

Signal     Value     Action   Comment
──────────────────────────────────────────────────────────────────────
SIGHUP 1 Term Hangup detected on controlling terminal
or death of controlling process
SIGINT 2 Term Interrupt from keyboard
SIGQUIT 3 Core Quit from keyboard
SIGILL 4 Core Illegal Instruction


SIGABRT 6 Core Abort signal from abort(3)
SIGFPE 8 Core Floating point exception
SIGKILL 9 Term Kill signal
SIGSEGV 11 Core Invalid memory reference
SIGPIPE 13 Term Broken pipe: write to pipe with no
readers
SIGALRM 14 Term Timer signal from alarm(2)
SIGTERM 15 Term Termination signal
SIGUSR1 30,10,16 Term User-defined signal 1
SIGUSR2 31,12,17 Term User-defined signal 2
……

而应用进程对于信号的处理一般分为以下三种方式:

  • 内核定义的默认操作: 系统内核对每种信号都规定了默认操作,比如上面列表 Action 列中的 Term ,就是终止进程的意思。前边介绍的 SIGINT 信号和 SIGTERM 信号的默认操作就是 Term 。Core 的意思是 Core Dump ,即终止进程后会通过 Core Dump 将当前进程的运行状态保存在文件里面,方便我们事后进行分析问题在哪里。前边介绍的 SIGQUIT 信号默认操作就是 Core 。

  • 捕获信号:应用程序可以利用内核提供的系统调用来捕获信号,并将优雅关闭的步骤封装在对应信号的处理函数中。当向进程发送关闭信号 SIGTERM  的时候,在进程内我们可以通过捕获 SIGTERM 信号,随即就会执行我们自定义的信号处理函数。我们从而可以在信号处理函数中执行进程优雅关闭的逻辑。

  • 忽略信号:当我们不希望处理某些信号的时候,就可以忽略该信号,不做任何处理,但是前边介绍的 SIGKILL 信号和 SIGSTOP 是无法被捕获和忽略的,内核会直接执行这两个信号定义的默认操作直接关闭进程。

当我们不希望信号执行内核定义的默认操作时,我们就需要在进程内捕获信号,并注册信号的回调函数来执行我们自定义的信号处理逻辑。

比如我们在本文中要讨论的优雅关闭场景,当进程接收到 SIGTERM 信号时,为了实现进程的优雅关闭,我们并不希望进程执行 SIGTERM 信号的默认操作直接关闭进程,所以我们要在进程中捕获 SIGTERM 信号,并将优雅关闭的操作步骤封装在对应的信号处理函数中。

2.1 如何捕获信号

在介绍完了内核信号的分类以及进程对于信号处理的三种方式之后,下面我们来看下如何来捕获内核信号,并在对应信号回调函数中自定义我们的处理逻辑。

内核提供了 sigaction 系统调用,来供我们捕获信号以及与相应的信号处理函数绑定起来。

int sigaction(int signum, const struct sigaction *act,
                     struct sigaction *oldact)
;
  • int signum:表示我们想要在进程中捕获的信号,比如本文中我们要实现优雅关闭就需要在进程中捕获 SIGTERM 信号,对应的 signum = 15 。

  • struct sigaction *act:内核中会用一个 sigaction 结构体来封装我们自定义的信号处理逻辑。

  • struct sigaction *oldact:这里是为了兼容老的信号处理函数,了解一下就可以了,和本文主线无关。

sigaction 结构体用来封装信号对应的处理函数,以及更加精细化控制信号处理的信息。

struct sigaction {
  __sighandler_t sa_handler;
  unsigned long sa_flags;
        .......
  sigset_t sa_mask; 
};
  • __sighandler_t sa_handler:其实本质上是一个函数指针,用来保存我们为信号注册的信号处理函数,优雅关闭的逻辑就封装在这里

  • long sa_flags:为了更加精细化的控制信号处理逻辑,这个字段保存了一些控制信号处理行为的选项集合。常见的选项有:

    • SA_ONESHOT:意思是我们注册的信号处理函数,仅仅只起一次作用。响应完一次后,就设置回默认行为。

    • SA_NOMASK:表示信号处理函数在执行的过程中会被中断。比如我们进程捕获到一个感兴趣的信号,随后会执行注册的信号处理函数,但是此时进程又收到其他的信号或者和上次相同的信号,此时正在执行的信号处理函数会被中断,从而转去执行最新到来的信号处理函数。如果连续产生多个相同的信号,那么我们的信号处理函数就要做好同步,幂等等措施

    • SA_INTERRUPT:当进程正在执行一个非常耗时的系统调用时,如果此时进程接收到了信号,那么这个系统调用将会被信号中断,进程转去执行相应的信号处理函数。那么当信号处理函数执行完时,如果这里设置了 SA_INTERRUPT ,那么系统调用将不会继续执行并且会返回一个 -EINTR 常量,告诉调用方,这个系统调用被信号中断了,怎么处理你看着办吧。

    • SA_RESTART:当系统调用被信号中断后,相应的信号处理函数执行完毕后,如果这里设置了 SA_RESTART 系统调用将会被自动重新启动。

  • sigset_t sa_mask:这个字段主要指定在信号处理函数正在运行的过程中,如果连续产生多个信号,需要屏蔽哪些信号。也就是说当进程收到屏蔽的信号时,正在进行的信号处理函数不会被中断。

屏蔽并不意味着信号一定丢失,而是暂存,这样可以使相同信号的处理函数,在进程连续接收到多个相同的信号时,可以一个一个的处理。

最终通过 sigaction 函数会调用到底层的系统调用 rt_sigaction 函数,在 rt_sigaction 中会将上边介绍的用户态 struct sigaction 结构拷贝为内核态的 k_sigaction ,然后调用 do_sigaction 函数。

最后在 do_sigaction 函数中将用户要在进程中捕获的信号以及相应的信号处理函数设置到进程描述符 task_struct 结构里。

进程中的信号结构.png

进程在内核中的数据结构 task_struct 中有一个 struct sighand_struct 结构的属性 sighand ,struct sighand_struct 结构中包含一个 k_sigaction 类型的数组 action[] ,这个数组保存的就是进程中需要捕获的信号以及对应的信号处理函数在内核中的结构体 k_sigaction ,数组下标为进程需要捕获的信号。

#include <signal.h>

static void sig_handler(int signum) {

    if (signum == SIGTERM) {

        .....执行优雅关闭逻辑....

    }

}

int main (Void) {

    struct sigaction sa_usr; //定义sigaction结构体
    sa_usr.sa_flags = 0;
    sa_usr.sa_handler = sig_handler;   //设置信号处理函数

    sigaction(SIGTERM, &sa_usr, NULL);//进程捕获信号,注册信号处理函数
        
        ,,,,,,,,,,,,
}

我们可以通过如上简单的示例代码,将 SIGTERM 信号及其对应的自定义信号处理函数注册到进程中,当我们执行 kill -15 pid 命令之后,进程就会捕获到 SIGTERM 信号,随后就可以执行优雅关闭步骤了。

3. JVM 中的 ShutdownHook

在《2. 内核信号机制》小节中为大家介绍的内容是操作系统内核为我们实现进程的优雅关闭提供的最底层系统级别的支持机制,在内核的强力支持下,那么本文的主题 Java 进程的优雅关闭就很容易实现了。

我们要想实现 Java 进程的优雅关闭功能,只需要在进程启动的时候将优雅关闭的操作封装在一个 Thread 中,随后将这个 Thread 注册到 JVM 的 ShutdownHook 中就好了,当 JVM 进程接收到 kill -15 信号时,就会执行我们注册的 ShutdownHook 关闭钩子,进而执行我们定义的优雅关闭步骤。

        Runtime.getRuntime().addShutdownHook(new Thread(){
            @Override
            public void run() {
                .....执行优雅关闭步骤.....
            }
        });

3.1 导致 JVM 退出的几种情况

  1. JVM 进程中最后一个非守护线程退出。

  2. 在程序代码中主动调用 java.lang.System#exit(int status) 方法,会导致 JVM 进程的退出并触发 ShutdownHook 的调用。参数 int status 如果是非零值,则表示本次关闭是在一个非正常情况下的关闭行为。比如:进程发生 OOM 异常或者其他运行时异常。

public static void main(String[] args) {
        try {

           ......进程启动main函数.......

        } catch (RuntimeException e) {
            logger.error(e.getMessage(), e);
            // JVM 进程主动关闭触发调用 shutdownHook
            System.exit(1);
        }
}
  1. 当 JVM 进程接收到第二小节《2.内核信号机制》介绍的那些关闭信号时, JVM 进程会被关闭。由于 SIGKILL 信号和 SIGSTOP 信号不能够被进程捕获和忽略,这两个信号会直接粗暴地关闭 JVM 进程,所以一般我们会发送 SIGTERM 信号,JVM 进程通过捕获 SIGTERM 信号,从而可以执行我们定义的 ShutdownHook 完成优雅关闭的操作。

  2. Native Method 执行过程中发生错误,比如试图访问一个不存在的内存,这样也会导致 JVM 强制关闭,ShutdownHook 也不会运行。

3.2 使用 ShutdownHook 的注意事项

  1. ShutdownHook 其实本质上是一个已经被初始化但是未启动的 Thread ,这些通过 Runtime.getRuntime().addShutdownHook 方法注册的 ShutdownHooks ,在 JVM 进程关闭的时候会被启动 并发执行,但是并 不会保证执行顺序

所以在编写 ShutdownHook 中的逻辑时,我们应该确保程序的线程安全性,并尽可能避免死锁。最好是一个 JVM 进程只注册一个 ShutdownHook 。

  1. 如果我们通过 java.lang.Runtime#runFinalizersOnExit(boolean value) 开启了 finalization-on-exit ,那么当所有 ShutdownHook 运行完毕之后,JVM 在关闭之前将会继续调用所有未被调用的 finalizers 方法。默认 finalization-on-exit 选项是关闭的。

注意:当 JVM 开始关闭并执行上述关闭操作的时候,守护线程是会继续运行的,如果用户使用 java.lang.System#exit(int status) 方法主动发起 JVM 关闭,那么关闭期间非守护线程也是会继续运行的。

  1. 一旦 JVM 进程开始关闭,一般情况下这个过程是不可以被中断的,除非操作系统强制中断或者用户通过调用 java.lang.Runtime#halt(int status) 来强制关闭。
   public void halt(int status) {
        SecurityManager ** = System.getSecurityManager();
        if (** != null) {
            **.checkExit(status);
        }
        Shutdown.halt(status);
    }

java.lang.Runtime#halt(int status) 方法是用来强制关闭正在运行的 JVM 进程的,它会导致我们注册的 ShutdownHook 不会被运行和执行,如果此时 JVM 正在执行 ShutdownHook ,当调用该方法后,JVM 进程将会被强制关闭,并不会等待 ShutdownHook 执行完毕。

  1. 当 JVM 关闭流程开始的时候,就不能在向其注册 ShutdownHook 或者取消注册之前已经注册好的 ShutdownHook 了,否则将会抛出 IllegalStateException异常。

  2. ShutdownHook 中的程序应该尽快的完成优雅关闭逻辑,因为当用户调用 System#exit 方法的时候是希望 JVM 在保证业务无损的情况下尽快完成关闭动作。这里并不适合做一些需要长时间运行的任务或者和用户交互的操作。

如果是因为物理机关闭从而导致的 JVM 关闭,那么操作系统只会允许 JVM 限定的时间内尽快的关闭,超过限定时间操作系统将会强制关闭 JVM 。

  1. ShutdownHook 中可能也会抛出异常,而 ShutdownHook 对于 JVM 来说本质上是一个 Thread ,那么对于 ShutdownHook 中未捕获的异常,JVM 的处理方法和其他普通的线程一样,都是通过调用 ThreadGroup#uncaughtException 方法来处理。此方法的默认实现是将异常的堆栈跟踪打印到 System#err 并终止异常的 ShutdownHook 线程。

注意:这里只会停止异常的 ShutdownHook ,但不会影响其他 ShutdownHook 线程的执行更不会导致 JVM 退出。

  1. 最后也是非常重要的一点是,当 JVM 进程接收到 SIGKILL 信号和 SIGSTOP 信号时,是会强制关闭,并不会执行 ShutdownHook 。另外一种导致 JVM 强制关闭的情况就是 Native Method 执行过程中发生错误,比如试图访问一个不存在的内存,这样也会导致 JVM 强制关闭,ShutdownHook 也不会运行。

3.3 ShutdownHook 执行原理

我们在 JVM 中通过 Runtime.getRuntime().addShutdownHook 添加关闭钩子,当 JVM 接收到 SIGTERM 信号之后,就会调用我们注册的这些 ShutdownHooks 。

本小节介绍的 ShutdownHook 就类似于我们在第二小节《内核信号机制》中介绍的信号处理函数。

大家这里一定会有个疑问,那就是在介绍内核信号机制小节中,我们可以通过系统调用 sigaction 函数向内核注册进程要捕获的信号以及对应的信号处理函数。

int sigaction(int signum, const struct sigaction *act,
                     struct sigaction *oldact)
;

但是在本小节介绍的 JVM 中,我们只是通过 Runtime.getRuntime().addShutdownHook 注册了一个关闭钩子。但是并未注册 JVM 进程所需要捕获的信号。那么 JVM 是怎么捕获关闭信号的呢?

        Runtime.getRuntime().addShutdownHook(new Thread(){
            @Override
            public void run() {
                .....执行优雅关闭步骤.....
            }
        });

事实上,JVM 捕获操作系统信号的部分在 JDK 中已经帮我们处理好了,在用户层我们并不需要关注捕获信号的处理,只需要关注信号的处理逻辑即可。

下面我们就来看一下 JDK 是如何帮助我们将要捕获的信号向内核注册的?

当 JVM 第一个线程被初始化之后,随后就会调用 System#initializeSystemClass 函数来初始化 JDK 中的一些系统类,其中就包括注册 JVM 进程需要捕获的信号以及信号处理函数。

public final class System {

    private static void initializeSystemClass() {

           .......省略.......

            // Setup Java signal handlers for HUP, TERM, and INT (where available).
           Terminator.setup();

           .......省略.......

    }

}

从这里可以看出,JDK 在向 JVM 注册需要捕获的内核信号是在 Terminator 类中进行的。


class Terminator {
    //信号处理函数
    private static SignalHandler handler = null;

    static void setup() {
        if (handler != nullreturn;
        SignalHandler sh = new SignalHandler() {
            public void handle(Signal sig) {
                Shutdown.exit(sig.getNumber() + 0200);
            }
        };
        handler = sh;

        try {
            Signal.handle(new Signal("HUP"), sh);
        } catch (IllegalArgumentException e) {
        }
        try {
            Signal.handle(new Signal("INT"), sh);
        } catch (IllegalArgumentException e) {
        }
        try {
            Signal.handle(new Signal("TERM"), sh);
        } catch (IllegalArgumentException e) {
        }
    }

}

JDK 向我们提供了 sun.misc.Signal#handle(Signal signal, SignalHandler signalHandler) 函数来实现在 JVM 进程中对内核信号的捕获。底层依赖于我们在第二小节介绍的系统调用 sigaction 。

int sigaction(int signum, const struct sigaction *act,
                     struct sigaction *oldact)
;

sun.misc.Signal#handle 函数的参数含义和系统调用函数 sigaction 中的参数含义是一一对应的:

  • Signal signal:表示要捕获的内核信号。从这里我们可以看出 JVM 主要捕获三种信号:SIGHUP(1),SIGINT(2),SIGTERM(15)。

除了上述的这三种信号之外,JVM 如果接收到其他信号,会执行系统内核默认的操作,直接关闭进程,并不会触发 ShutdownHook 的执行。

  • SignalHandler handler:信号响应函数。我们看到这里直接调用了 Shutdown#exit 函数。
    SignalHandler sh = new SignalHandler() {
            public void handle(Signal sig) {
                Shutdown.exit(sig.getNumber() + 0200);
            }
        };

我们这里应该很容易就会猜测出 ShutdownHook 的调用应该就是在 Shutdown#exit 函数中被触发的。

class Shutdown {

    static void exit(int status) {

          ........省略.........

          synchronized (Shutdown.class{
              // 开始 JVM 关闭流程,执行 ShutdownHooks
              sequence();
              // 强制关闭 JVM
              halt(status);
          }

    }

    private static void sequence() {
        synchronized (lock) {
            if (state != HOOKS) return;
        }
        //触发 ShutdownHooks
        runHooks();
        boolean rfoe;
        synchronized (lock) {
            state = FINALIZERS;
            rfoe = runFinalizersOnExit;
        }
        //如果 runFinalizersOnExit = true
        //开始运行所有未被调用过的 Finalizers
        if (rfoe) runAllFinalizers();
    }
}

Shutdown#sequence 函数中的逻辑就是我们在《3.2 使用ShutdownHook的注意事项》小节中介绍的 JVM 关闭时的运行逻辑:在这里会触发所有 ShutdownHook 的并发运行。注意这里并不会保证运行顺序。

当所有 ShutdownHook 运行完毕之后,如果我们通过 java.lang.Runtime#runFinalizersOnExit(boolean value) 开启了 finalization-on-exit 选项,JVM 在关闭之前将会继续调用所有未被调用的 finalizers 方法。默认 finalization-on-exit 选项是关闭的。

3.4 ShutdownHook 的执行

shutdownhook的运行.png

如上图所示,在 JDK 的 Shutdown 类中,包含了一个 Runnable[] hooks 数组,容量为 10 。JDK 中的 ShutdownHook 是以类型来分类的,数组 hooks 每一个槽中存放的是一种特定类型的 ShutdownHook 。

而我们通常在程序代码中通过 Runtime.getRuntime().addShutdownHook 注册的是 Application hooks 类型的 ShutdownHook ,存放在数组 hooks 中索引为 1 的槽中。

当在 Shutdown#sequence 中触发 runHooks() 函数开始运行 JVM 中所有类型的 ShutdownHooks 时,会在 runHooks() 函数中依次遍历数组 hooks 中的 Runnable ,进而开始运行 Runnable 中封装的 ShutdownHooks 。

当遍历到数组 Hooks 的第二个槽(索引为 1 )的时候,Application hooks 类型的 ShutdownHook 得以运行,也就是我们通过 Runtime.getRuntime().addShutdownHook 注册的 ShutdownHook 在这个时候开始运行起来。


    // The system shutdown hooks are registered with a predefined slot.
    // The list of shutdown hooks is as follows:
    // (0) Console restore hook
    // (1) Application hooks
    // (2) DeleteOnExit hook
    private static final int MAX_SYSTEM_HOOKS = 10;
    private static final Runnable[] hooks = new Runnable[MAX_SYSTEM_HOOKS];

    /* Run all registered shutdown hooks
     */

    private static void runHooks() {
        for (int i=0; i < MAX_SYSTEM_HOOKS; i++) {
            try {
                Runnable hook;
                synchronized (lock) {
                    // acquire the lock to make sure the hook registered during
                    // shutdown is visible here.
                    currentRunningHook = i;
                    hook = hooks[i];
                }
                if (hook != null) hook.run();
            } catch(Throwable t) {
                if (t instanceof ThreadDeath) {
                    ThreadDeath td = (ThreadDeath)t;
                    throw td;
                }
            }
        }
    }

下面我们就来看一下,JDK 是如果通过 Runtime.getRuntime().addShutdownHook 函数将我们自定义的 ShutdownHook 注册到 Shutdown 类中的数组 Hooks 里的。

3.5 ShutdownHook 的注册

public class Runtime {

    public void addShutdownHook(Thread hook) {
        SecurityManager ** = System.getSecurityManager();
        if (** != null) {
            **.checkPermission(new RuntimePermission("shutdownHooks"));
        }
        //注意 这里注册的是 Application 类型的 hooks
        ApplicationShutdownHooks.add(hook);
    }

}

从 JDK 源码中我们看到在 Runtime 类中的 addShutdownHook 方法里,JDK 会将我们自定义的 ShutdownHook 封装在 ApplicationShutdownHooks 类中,从这类的命名上看,它里边封装的就是我们在上小节《3.4 ShutdownHook 的执行》提到的 Application hooks 类型的 ShutdownHook ,由用户自定义实现。

class ApplicationShutdownHooks {
    // 存放用户自定义的 Application 类型的 hooks
    private static IdentityHashMap<Thread, Thread> hooks;

    static synchronized void add(Thread hook) {
        if(hooks == null)
            throw new IllegalStateException("Shutdown in progress");

        if (hook.isAlive())
            throw new IllegalArgumentException("Hook already running");

        if (hooks.containsKey(hook))
            throw new IllegalArgumentException("Hook previously registered");

        hooks.put(hook, hook);
    }

    static void runHooks() {
        Collection<Thread> threads;
        synchronized(ApplicationShutdownHooks.class{
            threads = hooks.keySet();
            hooks = null;
        }
        // 顺序启动 shutdownhooks
        for (Thread hook : threads) {
            hook.start();
        }
        // 并发调用 shutdownhooks ,等待所有 hooks 运行完毕退出
        for (Thread hook : threads) {
            try {
                hook.join();
            } catch (InterruptedException x) { }
        }
    }
}

ApplicationShutdownHooks 类中也有一个集合 IdentityHashMap<Thread, Thread> hooks ,专门用来存放由用户自定义的 Application hooks 类型的 ShutdownHook 。通过 ApplicationShutdownHooks#add 方法添加进 hooks 集合中。

然后在 runHooks 方法里挨个启动 ShutdownHook 线程,并发执行。注意这里的 runHooks 方法是 ApplicationShutdownHooks 类中的

在 ApplicationShutdownHooks 类的静态代码块 static{.....} 中会将 runHooks 方法封装成 Runnable 添加进 Shutdown 类中的 hooks 数组中。注意这里 Shutdown#add 方法传递进的索引是 1 。

class ApplicationShutdownHooks {
    /* The set of registered hooks */
    private static IdentityHashMap<Thread, Thread> hooks;

    static {
        try {
            Shutdown.add(1 /* shutdown hook invocation order */,
                false /* not registered if shutdown in progress */,
                new Runnable() {
                    public void run() {
                        runHooks();
                    }
                }
            );
            hooks = new IdentityHashMap<>();
        } catch (IllegalStateException e) {
            // application shutdown hooks cannot be added if
            // shutdown is in progress.
            hooks = null;
        }
    }
}

Shutdownhook的执行.png

Shutdown#add 方法的逻辑就很简单了:

class Shutdown {

    private static final int MAX_SYSTEM_HOOKS = 10;
    private static final Runnable[] hooks = new Runnable[MAX_SYSTEM_HOOKS];

    static void add(int slot, boolean registerShutdownInProgress, Runnable hook) {
        synchronized (lock) {
            if (hooks[slot] != null)
                throw new InternalError("Shutdown hook at slot " + slot + " already registered");

            if (!registerShutdownInProgress) {
                if (state > RUNNING)
                    throw new IllegalStateException("Shutdown in progress");
            } else {
                if (state > HOOKS || (state == HOOKS && slot <= currentRunningHook))
                    throw new IllegalStateException("Shutdown in progress");
            }

            hooks[slot] = hook;
        }
    }
}
  • 参数 Runnable hook 就是在 ApplicationShutdownHooks 中的静态代码块 static{....} 中将 runHooks 方法封装成的 Runnable。

  • 参数 int slot 表示将封装好的 Runnable 放入 hooks 数组中的哪个槽中。这里我们注册的是 Application hooks 类型的 ShutdonwHook ,所以这里的索引为 1 。

  • 参数 registerShutdownInProgress 表示是否允许在 JVM 关闭流程开始之后,继续向 JVM 添加 ShutdownHook 。默认为 false 表示不允许。否则将会抛出 IllegalStateException 异常。这一点笔者在小节《3.2 使用ShutdownHook的注意事项》中强调过。

以上就是 JVM 如何捕获操作系统内核信号,如何注册 ShutdownHook ,以及何时触发 ShutdownHook 的执行的一个全面介绍。

shutdownhook完整触发时机.png

读到这里大家应该彻底明白了为什么不能使用 kill -9 pid 命令来关闭进程了吧,现在赶快去检查一下你们公司生产环境的运维脚本吧!!


俗话说的好 talk is cheap! show me the code! ,在介绍了这么多关于优雅关闭的理论方案和原理之后,我想大家现在一定很好奇究竟我们该如何实现这一套优雅关闭的方案呢?

那么接下来笔者就从一些知名框架源码实现角度,为大家详细阐述一下优雅关闭是如何实现的?

                         

image.png

4. Spring 的优雅关闭机制

前面两个小节中我们从支持优雅关闭最底层的内核信号机制开始聊起然后到 JVM 进程实现优雅关闭的 ShutdwonHook 原理,经过这一系列的介绍,我们现在对优雅关闭在内核层和 JVM 层的相关机制原理有了一定的了解。

那么在真实 Java 应用中,我们到底该如何基于上述机制实现一套优雅关闭方案呢?本小节我们来从 Spring 源码中获取下答案!!

在介绍 Spring 优雅关闭机制源码实现之前,笔者先来带大家回顾下,在 Spring 的应用上下文关闭的时候,Spring 究竟给我们提供了哪些关闭时的回调机制,从而可以让我们在这些回调中编写 Java 应用的优雅关闭逻辑。

4.1 发布 ContextClosedEvent 事件

在 Spring 上下文开始关闭的时候,首先会发布 ContextClosedEvent 事件,注意此时 Spring 容器的 Bean 还没有开始销毁,所以我们可以在该事件回调中执行优雅关闭的操作。

@Component
public class ShutdownListener implements ApplicationListener<ContextClosedEvent{
       @Override
       public void onApplicationEvent(ContextClosedEvent event) {
                  ........优雅关闭逻辑.....
       }
}

4.2 Spring 容器中的 Bean 销毁前回调

当 Spring 开始销毁容器中管理的 Bean 之前,会回调所有实现 DestructionAwareBeanPostProcessor 接口的 Bean 中的 postProcessBeforeDestruction 方法。

@Component
public class DestroyBeanPostProcessor implements DestructionAwareBeanPostProcessor {

    @Override
    public void postProcessBeforeDestruction(Object bean, String beanName) throws BeansException {

             ........Spring容器中的Bean开始销毁前回调.......
    }
}

4.3 回调标注 @PreDestroy 注解的方法

@Component
public class Shutdown {
    @PreDestroy
    public void preDestroy() {
        ......释放资源.......
    }
}

4.4 回调 DisposableBean 接口中的 destroy 方法

@Component
public class Shutdown implements DisposableBean{

    @Override
    public void destroy() throws Exception {
         ......释放资源......
    }

}

4.5 回调自定义的销毁方法

<bean id="Shutdown" class="com.test.netty.Shutdown"  destroy-method="doDestroy"/>
public class Shutdown {

    public void doDestroy() {
        .....自定义销毁方法....
    }
}

4.6 Spring 优雅关闭机制的实现

Spring 相关应用程序本质上也是一个 JVM 进程,所以 Spring 框架想要实现优雅关闭机制也必须依托于我们在本文第三小节中介绍的 JVM 的 ShutdownHook 机制。

在 Spring 启动的时候,需要向 JVM 注册 ShutdownHook ,当我们执行 kill - 15 pid 命令时,随后 Spring 会在 ShutdownHook 中触发上述介绍的五种回调。

下面我们来看一下 Spring 中 ShutdownHook 的注册逻辑:

4.6.1 Spring 中 ShutdownHook 的注册

public abstract class AbstractApplicationContext extends DefaultResourceLoader
  implements ConfigurableApplicationContextDisposableBean 
{

 @Override
 public void registerShutdownHook() {
  if (this.shutdownHook == null) {
   // No shutdown hook registered yet.
   this.shutdownHook = new Thread() {
    @Override
    public void run() {
     synchronized (startupShutdownMonitor) {
      doClose();
     }
    }
   };
   Runtime.getRuntime().addShutdownHook(this.shutdownHook);
  }
 }
}

在 Spring 启动的时候,我们需要调用 AbstractApplicationContext#registerShutdownHook 方法向 JVM 注册 Spring 的 ShutdownHook ,从这段源码中我们看出,Spring 将 doClose() 方法封装在 ShutdownHook 线程中,而 doClose() 方法里边就是 Spring 优雅关闭的逻辑。

这里需要强调的是,当我们在一个纯 Spring 环境下,Spring 框架是不会为我们主动调用 registerShutdownHook 方法去向 JVM 注册 ShutdownHook 的,我们需要手动调用 registerShutdownHook 方法去注册。

public class SpringShutdownHook {

    public static void main(String[] args) throws IOException {
        GenericApplicationContext context = new GenericApplicationContext();
                      ........
        // 注册 Shutdown Hook
        context.registerShutdownHook();
                      ........
    }
}

而在 SpringBoot 环境下,SpringBoot 在启动的时候会为我们调用这个方法去主动注册 ShutdownHook 。我们不需要手动注册。

public class SpringApplication {

 public ConfigurableApplicationContext run(String... args) {

                  ...............省略.................

                  ConfigurableApplicationContext context = null;
                  context = createApplicationContext();
                  refreshContext(context);

                  ...............省略.................
 }

 private void refreshContext(ConfigurableApplicationContext context) {
  refresh(context);
  if (this.registerShutdownHook) {
   try {
    context.registerShutdownHook();
   }
   catch (AccessControlException ex) {
    // Not allowed in some environments.
   }
  }
 }

}

4.6.2 Spring 中的优雅关闭逻辑

 protected void doClose() {
  // 更新上下文状态
  if (this.active.get() && this.closed.compareAndSet(falsetrue)) {
   if (logger.isInfoEnabled()) {
    logger.info("Closing " + this);
   }
            // 取消 JMX 托管
   LiveBeansView.unregisterApplicationContext(this);

   try {
    // 发布 ContextClosedEvent 事件
    publishEvent(new ContextClosedEvent(this));
   }
   catch (Throwable ex) {
    logger.warn("Exception thrown from ApplicationListener handling ContextClosedEvent", ex);
   }

   // 回调 Lifecycle beans,相关 stop 方法
   if (this.lifecycleProcessor != null) {
    try {
     this.lifecycleProcessor.onClose();
    }
    catch (Throwable ex) {
     logger.warn("Exception thrown from LifecycleProcessor on context close", ex);
    }
   }

   // 销毁 bean,触发前面介绍的几种回调
   destroyBeans();

   // Close the state of this context itself.
   closeBeanFactory();

   // Let subclasses do some final clean-up if they wish...
   onClose();

   // Switch to inactive.
   this.active.set(false);
  }
 }

在这里我们可以看出最终是在 AbstractApplicationContext#doClose 方法中触发本小节开始介绍的五种回调:

  1. 发布 ContextClosedEvent 事件。 注意这里是一个同步事件,也就是说 Spring 的 ShutdownHook 线程在这里发布完事件之后会继续同步执行事件的处理,等到事件处理完毕后,才会去执行后面的 destroyBeans() 方法对 IOC 容器中的 Bean 进行销毁。

所以在 ContextClosedEvent 事件监听类中,可以放心地去做优雅关闭相关的操作,因为此时 Spring 容器中的 Bean 还没有被销毁。

  1. destroyBeans() 方法中依次触发剩下的四种回调。

最后结合前边小节中介绍的内容,总结 Spring 的整个优雅关闭流程如下图所示:

Spring优雅关闭机制.png

5. Dubbo 的优雅关闭

本小节优雅关闭部分源码基于 apache dubbo 2.7.7 版本,该版本中的优雅关闭是有 Bug 的,下面让我们一起来 Shooting Bug !

在前边几个小节的内容中,我们从内核提供的底层技术支持开始聊到了 JVM 的 ShutdonwHook ,然后又从 JVM 聊到了 Spring 框架的优雅关闭机制。

在了解了这些内容之后,本小节我们就来看下 dubbo 中的优雅关闭实现,由于现在几乎所有 Java 应用都会采用 Spring 作为开发框架,所以 dubbo 一般是集成在 Spring 框架中供我们使用的,它的优雅关闭和 Spring 有着紧密的联系。

5.1 Dubbo 在 Spring 环境下的优雅关闭

在本文第四小节《4. Spring的优雅关闭机制》的介绍中,我们知道在 Spring 的优雅关闭流程中,Spring 的 ShutdownHook 线程会首先发布 ContextClosedEvent 事件,该事件是一个同步事件,ShutdownHook 线程发布完该事件紧接着就会同步执行该事件的监听器,当在事件监听器中处理完 ContextClosedEvent 事件之后,在回过头来执行 destroyBeans() 方法并依次触发剩下的四种回调来销毁 IOC 容器中的 Bean 。

Spring优雅关闭流程.png

由于在处理 ContextClosedEvent 事件的时候,Dubbo 所依赖的一些关键 bean 这时还没有被销毁,所以 dubbo 定义了一个 DubboBootstrapApplicationListener 用来监听 ContextClosedEvent 事件,并在 onContextClosedEvent 事件处理方法中调用 dubboBootstrap.stop() 方法开启 dubbo 的优雅关闭流程。

public class DubboBootstrapApplicationListener extends OneTimeExecutionApplicationContextEventListener
        implements Ordered 
{

    @Override
    public void onApplicationContextEvent(ApplicationContextEvent event) {
        // 这里是 Spring 的同步事件,publishEvent 和处理 Event 是在同一个线程中
        if (event instanceof ContextRefreshedEvent) {
            onContextRefreshedEvent((ContextRefreshedEvent) event);
        } else if (event instanceof ContextClosedEvent) {
            onContextClosedEvent((ContextClosedEvent) event);
        }
    }

    private void onContextClosedEvent(ContextClosedEvent event) {
        // spring 在 shutdownhook 中会先触发 ContextClosedEvent ,然后在销毁 spring beans
        // 所以这里 dubbo 开始优雅关闭时,依赖的 spring beans 并未销毁
        dubboBootstrap.stop();
    }

}

当服务提供者 ServiceBean 和服务消费者 ReferenceBean 被初始化时,会将 DubboBootstrapApplicationListener 注册到 Spring 容器中。并开始监听 ContextClosedEvent 事件和 ContextRefreshedEvent 事件。

public class ServiceClassPostProcessor implements BeanDefinitionRegistryPostProcessorEnvironmentAware,
        ResourceLoaderAwareBeanClassLoaderAware 
{

    @Override
    public void postProcessBeanDefinitionRegistry(BeanDefinitionRegistry registry) throws BeansException {

        // @since 2.7.5 注册spring启动 关闭事件的listener
        //在事件回调中中调用启动类 DubboBootStrap的start  stop来启动 关闭dubbo应用
        registerBeans(registry, DubboBootstrapApplicationListener.class);
      
                  ........省略.......
    }
}

5.2 Dubbo 优雅关闭流程简介

由于本文的主题是介绍优雅关闭的一整条流程主线,所以这里笔者只是简要介绍 Dubbo 优雅关闭的主流程,相关细节部分笔者会在后续的 dubbo 源码解析系列里为大家详细介绍 Dubbo 优雅关闭的细节。为了避免本文发散太多,我们这里还是聚焦于流程主线。

public class DubboBootstrap extends GenericEventListener {

    public DubboBootstrap stop() throws IllegalStateException {
        destroy();
        return this;
    }

}

这里的核心逻辑其实就是我们在《1.2 优雅关闭》小节中介绍的两大优雅关闭主题:

  • 从当前正在关闭的应用实例上切走现有生产流量。

  • 保证业务无损。

这里大家只需要了解 Dubbo 优雅关闭的主流程即可,相关细节笔者后续会有一篇专门的文章详细为大家介绍。

    public void destroy() {
        if (destroyLock.tryLock()) {
            try {
                DubboShutdownHook.destroyAll();

                if (started.compareAndSet(truefalse)
                        && destroyed.compareAndSet(falsetrue)) {

                    //取消注册
                    unregisterServiceInstance();
                    //取消元数据服务
                    unexportMetadataService();
                    //停止暴露服务
                    unexportServices();
                    //取消订阅服务
                    unreferServices();
                    //注销注册中心
                    destroyRegistries();
                    //关闭服务
                    DubboShutdownHook.destroyProtocols();
                    //销毁注册中心客户端实例
                    destroyServiceDiscoveries();
                    //清除应用配置类以及相关应用模型
                    clear();
                    //关闭线程池
                    shutdown();
                    //释放资源
                    release();
                }
            } finally {
                destroyLock.unlock();
            }
        }
    }

从以上内容可以看出,Dubbo 的优雅关闭依托于 Spring ContextClosedEvent 事件的发布,而 ContextClosedEvent 事件的发布又依托于 Spring ShutdownHook 的注册。

dubbo spring环境优雅关闭.png

从《4.6.1 Spring 中 ShutdownHook 的注册》小节的介绍中我们知道,在 SpringBoot 环境下,SpringBoot 在启动的时候会为我们调用 ApplicationContext#registerShutdownHook 方法去主动注册 ShutdownHook 。我们不需要手动注册。

而在一个纯 Spring 环境下,Spring 框架并不会为我们主动调用 registerShutdownHook 方法去向 JVM 注册 ShutdownHook 的,我们需要手动调用 registerShutdownHook 方法去注册。

所以 Dubbo 这里为了兼容 SpringBoot 环境和纯 Spring 环境下的优雅关闭,引入了 SpringExtensionFactory类 ,只要在 Spring 环境下都会调用 registerShutdownHook 去向 JVM 注册 Spring 的 ShutdownHook 。

public class SpringExtensionFactory implements ExtensionFactory {
    private static final Logger logger = LoggerFactory.getLogger(SpringExtensionFactory.class);

    private static final Set<ApplicationContext> CONTEXTS = new ConcurrentHashSet<ApplicationContext>();

    public static void addApplicationContext(ApplicationContext context) {
        CONTEXTS.add(context);
        if (context instanceof ConfigurableApplicationContext) {
            //在spring启动成功之后设置shutdownHook(兼容非SpringBoot环境)
            ((ConfigurableApplicationContext) context).registerShutdownHook();
        }
    }

}

当服务提供者 ServiceBean 和服务消费者 ReferenceBean 在初始化完成之后,会回调 SpringExtensionFactory#addApplicationContext 方法注册 ShutdownHook 。

public class ServiceBean<Textends ServiceConfig<Timplements InitializingBeanDisposableBean,
        ApplicationContextAwareBeanNameAwareApplicationEventPublisherAware 
{

   @Override
    public void setApplicationContext(ApplicationContext applicationContext) {
        this.applicationContext = applicationContext;
        SpringExtensionFactory.addApplicationContext(applicationContext);
    }

}
public class ReferenceBean<Textends ReferenceConfig<Timplements FactoryBean,
        ApplicationContextAwareInitializingBeanDisposableBean 
{

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) {
        this.applicationContext = applicationContext;
        SpringExtensionFactory.addApplicationContext(applicationContext);
    }

}

以上就是 Dubbo 在 Spring 集成环境下的优雅关闭全流程,下面我们来看下 Dubbo 在非 Spring 环境下的优雅关闭流程。

5.3 Dubbo 在非 Spring 环境下的优雅关闭

在上小节的介绍中我们知道 Dubbo 在 Spring 环境下依托 Spring 的 ShutdownHook ,通过监听 ContextClosedEvent 事件,从而触发 Dubbo 的优雅关闭流程。

而到了非 Spring 环境下,Dubbo 就需要定义自己的 ShutdownHook ,从而引入了 DubboShutdownHook ,直接将优雅关闭流程封装在自己的 ShutdownHook 中执行。

public class DubboBootstrap extends GenericEventListener {

    private DubboBootstrap() {
        configManager = ApplicationModel.getConfigManager();
        environment = ApplicationModel.getEnvironment();

        DubboShutdownHook.getDubboShutdownHook().register();
        ShutdownHookCallbacks.INSTANCE.addCallback(new ShutdownHookCallback() {
            @Override
            public void callback() throws Throwable {
                DubboBootstrap.this.destroy();
            }
        });
    }

}
public class DubboShutdownHook extends Thread {

   public void register() {
        if (registered.compareAndSet(falsetrue)) {
            DubboShutdownHook dubboShutdownHook = getDubboShutdownHook();
            Runtime.getRuntime().addShutdownHook(dubboShutdownHook);
            dispatch(new DubboShutdownHookRegisteredEvent(dubboShutdownHook));
        }
    }

    @Override
    public void run() {
        if (logger.isInfoEnabled()) {
            logger.info("Run shutdown hook now.");
        }

        callback();
        doDestroy();
    }

   private void callback() {
        callbacks.callback();
    }

}

从源码中我们看到,当我们的 Dubbo 应用程序接收到 kill -15 pid 信号时,JVM 捕获到 SIGTERM(15) 信号之后,就会触发 DubboShutdownHook 线程运行,从而通过 callback() 又回调了上小节中介绍的 DubboBootstrap#destroy 方法(dubbo 的整个优雅关闭逻辑全部封装在这里)。

dubbo 非Spring环境下优雅关闭流程.png
public class DubboBootstrap extends GenericEventListener {

    public void destroy() {
        if (destroyLock.tryLock()) {
            try {
                DubboShutdownHook.destroyAll();

                if (started.compareAndSet(truefalse)
                        && destroyed.compareAndSet(falsetrue)) {

                    ........取消注册......
                  
                    ........取消元数据服务........
                  
                    ........停止暴露服务........
                 
                    ........取消订阅服务........
                 
                    ........注销注册中心........
                 
                    ........关闭服务........
                  
                    ........销毁注册中心客户端实例........
                 
                    ........清除应用配置类以及相关应用模型........
                
                    ........关闭线程池........
                 
                    ........释放资源........
                 
                }
            } finally {
                destroyLock.unlock();
            }
        }
    }

}

5.4 啊哈!Bug!

前边我们在《5.1 Dubbo在Spring环境下的优雅关闭》小节和《5.3 Dubbo在非Spring环境下的优雅关闭》小节中介绍的这两个环境的下的优雅关闭方案,当它们在各自的场景下运行的时候是没有任何问题的。

但是当这两种方案结合在一起运行,就出大问题了~~~

还记得笔者在《3.2 使用 ShutdownHook 的注意事项》小节中特别强调的一点:

  • ShutdownHook 其实本质上是一个已经被初始化但是未启动的 Thread ,这些通过 Runtime.getRuntime().addShutdownHook 方法注册的 ShutdownHooks ,在 JVM 进程关闭的时候会被启动 并发执行,但是并不会保证执行顺序

所以在编写 ShutdownHook 中的逻辑时,我们应该确保程序的线程安全性,并尽可能避免死锁。最好是一个 JVM 进程只注册一个 ShutdownHook 。

Dubbo在Spring环境下的优雅关闭Bug.png

那么现在 JVM 中我们注册了两个 ShutdownHook 线程,一个 Spring 的 ShutdownHook ,另一个是 Dubbo 的 ShutdonwHook 。那么这会引出什么问题呢?

经过前边的内容介绍我们知道,无论是在 Spring 的 ShutdownHook 中触发的 ContextClosedEvent 事件还是在 Dubbo 的 ShutdownHook 中执行的 CallBack 。最终都会调用到 DubboBootstrap#destroy 方法执行真正的优雅关闭逻辑。

public class DubboBootstrap extends GenericEventListener {

    private final Lock destroyLock = new ReentrantLock();

    public void destroy() {
        if (destroyLock.tryLock()) {
            try {
                DubboShutdownHook.destroyAll();

                if (started.compareAndSet(truefalse)
                        && destroyed.compareAndSet(falsetrue)) {
                    
                        .......dubbo应用的优雅关闭.......
                 
                }
            } finally {
                destroyLock.unlock();
            }
        }
    }

}

让我们来设想一个这种的场景:当 Spring 的 ShutdownHook 线程和 Dubbo 的 ShutdownHook 线程同时执行并且在同一个时间点来到 DubboBootstrap#destroy 方法中争夺 destroyLock 。

  • Dubbo 的 ShutdownHook 线程获得 destroyLock 进入 destroy() 方法体开始执行优雅关闭逻辑。

  • Spring 的 ShutdownHook 线程没有获得 destroyLock,退出 destroy() 方法。

Dubbo优雅关闭Bug.png

在 Spring 的 ShutdownHook 线程退出 destroy() 方法之后紧接着就会执行 destroyBeans() 方法销毁 IOC 容器中的 Bean ,这里边肯定涉及到一些关键业务 Bean 的销毁,比如:数据库连接池,以及 Dubbo 相关的核心 Bean。

于此同时 Dubbo 的 ShutdownHook 线程开始执行优雅关闭逻辑,《1.2 优雅关闭》小节中我们提到,优雅关闭要保证业务无损。所以需要将剩下正在进行中的业务流程继续处理完毕并将业务处理结果响应给客户端。但是这时依赖的一些业务关键 Bean 已经被销毁,比如数据库连接池,这时执行数据库操作就会抛出 CannotGetJdbcConnectionException 。导致优雅关闭失败,对业务造成了影响。

5.5 Bug 的修复

该 Bug 最终在 apache dubbo 2.7.15 版本中被修复

详情可查看Issue:https://github.com/apache/dubbo/issues/7093

经过上小节的分析,我们知道既然这个 Bug 产生的原因是由于 Spring 的 ShutdownHook 线程和 Dubbo 的 ShutdownHook 线程并发执行所导致的。

那么当我们处于 Spring 环境下的时候,就将 Dubbo 的 ShutdownHook 注销掉即可。

public class SpringExtensionFactory implements ExtensionFactory {
    private static final Logger logger = LoggerFactory.getLogger(SpringExtensionFactory.class);

    private static final Set<ApplicationContext> CONTEXTS = new ConcurrentHashSet<ApplicationContext>();

    public static void addApplicationContext(ApplicationContext context) {
        CONTEXTS.add(context);
        if (context instanceof ConfigurableApplicationContext) {
            // 注册 Spring 的 ShutdownHook
            ((ConfigurableApplicationContext) context).registerShutdownHook();
            // 在 Spring 环境下将 Dubbo 的 ShutdownHook 取消掉
            DubboShutdownHook.getDubboShutdownHook().unregister();
        }
    }
}

而在非 Spring 环境下,我们依然保留 Dubbo 的 ShutdownHook 。

public class DubboBootstrap {

    private DubboBootstrap() {
        configManager = ApplicationModel.getConfigManager();
        environment = ApplicationModel.getEnvironment();

        DubboShutdownHook.getDubboShutdownHook().register();
        ShutdownHookCallbacks.INSTANCE.addCallback(DubboBootstrap.this::destroy);
    }

}

以上内容就是 Dubbo 的整个优雅关闭主线流程,以及优雅关闭 Bug 产生的原因和修复方案。


在 Dubbo 的优雅关闭流程中最终会通过 DubboShutdownHook.destroyProtocols() 关闭底层服务。

public class DubboBootstrap extends GenericEventListener {

    private final Lock destroyLock = new ReentrantLock();

    public void destroy() {
        if (destroyLock.tryLock()) {
            try {
                DubboShutdownHook.destroyAll();

                if (started.compareAndSet(truefalse)
                        && destroyed.compareAndSet(falsetrue)) {
                    
                        .......dubbo应用的优雅关闭.......
                    //关闭服务
                    DubboShutdownHook.destroyProtocols();

                        .......dubbo应用的优雅关闭.......

                }
            } finally {
                destroyLock.unlock();
            }
        }
    }

}

在 Dubbo 服务的销毁过程中,会通过调用 server.close 关闭底层的 Netty 服务。

public class DubboProtocol extends AbstractProtocol {

   @Override
    public void destroy() {
        for (String key : new ArrayList<>(serverMap.keySet())) {
            ProtocolServer protocolServer = serverMap.remove(key);
            RemotingServer server = protocolServer.getRemotingServer();
            server.close(ConfigurationUtils.getServerShutdownTimeout());
             ...........省略........
        }

         ...........省略........
}

最终触发 Netty 的优雅关闭。

public class NettyServer extends AbstractServer implements RemotingServer {

    @Override
    protected void doClose() throws Throwable {
        ..........关闭底层Channel......
        try {
            if (bootstrap != null) {
                // 关闭 Netty 的主从 Reactor 线程组
                bossGroup.shutdownGracefully();
                workerGroup.shutdownGracefully();
            }
        } catch (Throwable e) {
            logger.warn(e.getMessage(), e);
        }
        .........清理缓存Channel数据.......
    }

}

6. Netty 的优雅关闭

通过上小节介绍 dubbo 优雅关闭的相关内容,我们很自然的引出了 Netty 的优雅关闭触发时机,那么在本小节中笔者将为大家详细介绍下 Netty 是如何优雅地装..........优雅地谢幕的~~

             

image.png

在之前的系列文章中,我们围绕下图所展示的 Netty 整个核心框架的运转流程介绍了主从 ReactorGroup 的创建启动运行接收网络连接接收网络数据发送网络数据,以及如何在pipeline中处理相关IO事件的整个源码实现。

netty中的reactor.png

本小节就到了 Netty 优雅谢幕的时刻了,在这谢幕的过程中,Netty 会对它的主从 ReactorGroup ,以及对应 ReactorGroup 中的 Reacto r进行优雅的关闭。下面让我们一起来看下这个优雅关闭的过程~~~

6.1 ReactorGroup 的优雅谢幕


public abstract class AbstractEventExecutorGroup implements EventExecutorGroup {

    static final long DEFAULT_SHUTDOWN_QUIET_PERIOD = 2;
    static final long DEFAULT_SHUTDOWN_TIMEOUT = 15;

   @Override
    public Future<?> shutdownGracefully() {
        return shutdownGracefully(DEFAULT_SHUTDOWN_QUIET_PERIOD, DEFAULT_SHUTDOWN_TIMEOUT, TimeUnit.SECONDS);
    }

}

在 Netty 进行优雅关闭的整个过程中,这里涉及到了两个非常重要的控制参数:

  • gracefulShutdownQuietPeriod:优雅关闭静默期,默认为 2s 。这个参数主要来保证 Netty 整个关闭过程中的优雅。在关闭流程开始后,如果 Reactor 中还有遗留的异步任务需要执行,那么 Netty 就不能关闭,需要把所有异步任务执行完毕才可以。当所有异步任务执行完毕后,Netty 为了实现更加优雅的关闭操作,一定要保障业务无损,这时候就引入了静默期这个概念,如果在这个静默期内,用户没有新的任务向 Reactor 提交那么就开始关闭。如果在这个静默期内,还有用户继续提交异步任务,那么就不能关闭,需要把静默期内用户提交的异步任务执行完毕才可以放心关闭。

  • gracefulShutdownTimeout:优雅关闭超时时间,默认为 15s 。这个参数主要来保证 Netty 整个关闭过程的可控。我们知道一个生产级的优雅关闭方案既要保证优雅做到业务无损,更重要的是要保证关闭流程的可控,不能无限制的优雅下去。导致长时间无法完成关闭动作。于是 Netty 就引入了这个参数,如果优雅关闭超时,那么无论此时有无异步任务需要执行都要开始关闭了。

这两个控制参数是非常重要核心的两个参数,我们在后面介绍 Netty 关闭细节的时候还会为大家详细剖析,这里大家先从概念上大概理解一下。

在介绍完这两个重要核心参数之后,我们接下来看下 ReactorGroup 的关闭流程:

我们都知道 Netty 为了保证整个系统的吞吐量以及保证 Reactor 可以线程安全地,有序地处理各个 Channel 上的 IO 事件。基于这个目的 Netty 将其承载的海量连接分摊打散到不同的 Reactor 上处理。

ReactorGroup 中包含多个 Reactor ,每个 Channel 只能注册到一个固定的 Reactor 上,由这个固定的 Reactor 负责处理该 Channel 上整个生命周期的事件。

一个 Reactor 上注册了多个 Channel ,负责处理注册在其上的所有 Channel 的 IO 事件以及异步任务。

ReactorGroup 的结构如下图所示:

image.png

ReactorGroup 的关闭流程本质上其实是 ReactorGroup 中包含的所有 Reactor 的关闭,当 ReactorGroup 中的所有 Reactor 完成关闭后,ReactorGroup 才算是真正的关闭。


public abstract class MultithreadEventExecutorGroup extends AbstractEventExecutorGroup {

    // Reactor线程组中的Reactor集合
    private final EventExecutor[] children;

    // 关闭future
    private final Promise<?> terminationFuture = new DefaultPromise(GlobalEventExecutor.INSTANCE);

    @Override
    public Future<?> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) {
        for (EventExecutor l: children) {
            l.shutdownGracefully(quietPeriod, timeout, unit);
        }
        return terminationFuture();
    }

    @Override
    public Future<?> terminationFuture() {
        return terminationFuture;
    }

}

  • EventExecutor[] children:数组中存放的是当前 ReactorGroup 中包含的所有 Reactor,类型为 EventExecutor。

  • Promise<?> terminationFuture:ReactorGroup 中的关闭 Future ,用户线程通过这个 terminationFuture 可以知道 ReactorGroup 完成关闭的时机,也可以向 terminationFuture 注册一些 listener 。当 ReactorGroup 完成关闭动作后,会回调用户注册的这些 listener 。大家可以根据各自的业务场景灵活运用。

在 ReactorGroup 的关闭过程中,会挨个触发它所包含的所有 Reactor 的关闭流程。并返回 terminationFuture 给用户线程。

当 ReactorGroup 中的所有 Reactor 完成关闭之后,这个 terminationFuture 会被设置为 success,这样一来用户线程可以感知到 ReactorGroup 已经完成关闭了。

这一点笔者也在《Reactor在Netty中的实现(创建篇)》一文中的第四小节《4. 向Reactor线程组中所有的Reactor注册terminated回调函数》强调过。

在 ReactorGroup 创建的最后一步,会定义 Reactor 关闭的 terminationListener。在 Reactor 的 terminationListener 中会判断当前 ReactorGroup 中的 Reactor 是否全部关闭,如果已经全部关闭,则会设置 ReactorGroup的 terminationFuture 为 success 。

    //记录关闭的Reactor个数,当Reactor全部关闭后,ReactorGroup才可以认为关闭成功
    private final AtomicInteger terminatedChildren = new AtomicInteger();
    //ReactorGroup的关闭future
    private final Promise<?> terminationFuture = new DefaultPromise(GlobalEventExecutor.INSTANCE);

    protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
                                            EventExecutorChooserFactory chooserFactory, Object... args)
 
{

        ........挨个创建Reactor............

        final FutureListener<Object> terminationListener = new FutureListener<Object>() {
            @Override
            public void operationComplete(Future<Object> future) throws Exception {
                if (terminatedChildren.incrementAndGet() == children.length) {
                    //当所有Reactor关闭后 ReactorGroup才认为是关闭成功
                    terminationFuture.setSuccess(null);
                }
            }
        };

        for (EventExecutor e: children) {
            //向每个Reactor注册terminationListener
            e.terminationFuture().addListener(terminationListener);
        }
    }

从以上 ReactorGroup 的关闭流程我们可以看出,ReactorGroup 的关闭逻辑只是挨个去触发它所包含的所有 Reactor 的关闭,Netty 的整个优雅关闭核心其实是在单个 Reactor 的关闭逻辑上。毕竟 Reactor 才是真正驱动 Netty 运转的核心引擎。

6.2 Reactor 的优雅谢幕

Reactor的优雅谢幕流程.png

Reactor 的状态特别重要,从《一文聊透Netty核心引擎Reactor的运转架构》一文中我们知道 Reactor 是在一个 for (;;) {....} 死循环中 996 不停地工作。比如轮询 Channel 上的 IO 就绪事件,处理 IO 就绪事件,执行异步任务就是在这个死循环中完成的。

而 Reactor 在每一次循环任务结束之后,都会先去判断一下当前 Reactor 的状态,如果状态变为准备关闭状态 ST_SHUTTING_DOWN 后,Reactor 就会开启优雅关闭流程。

所以在介绍 Reactor 的关闭流程之前,笔者先来为大家捋一捋 Reactor 中的各种状态。

  • ST_NOT_STARTED = 1:Reactor 的初始状态。在 Reactor 刚被创建出来的时候,状态为 ST_NOT_STARTED 。

  • ST_STARTED = 2:Reactor 的启动状态。当向 Reactor 提交第一个异步任务的时候会触发 Reactor 的启动。启动之后状态变为 ST_STARTED 。

相关细节可在回顾下《详细图解Netty Reactor启动全流程》一文。

  • ST_SHUTTING_DOWN = 3:Reactor 准备开始关闭状态。当 Reactor 的 shutdownGracefully 方法被调用的时候,Reactor 的状态就会变为ST_SHUTTING_DOWN。在这个状态下,用户仍然可以向 Reactor 提交任务。

  • ST_SHUTDOWN = 4:Reactor 停止状态。表示 Reactor 的优雅关闭流程已经结束,此时用户不能在向 Reactor 提交任务,Reactor 会在这个状态下最后一次执行剩余的异步任务。

  • ST_TERMINATED = 5:Reactor 真正的终结状态,该状态表示 Reactor 已经完全关闭了。在这个状态下 Reactor 会设置自己的 terminationFuture 为 Success。进而开始回调上小节末尾提到的 terminationListener 。

在我们了解了 Reactor 的各种状态之后,下面就该来正式开始介绍 Reactor 的关闭流程了:

public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor {

    //Reactor的状态  初始为未启动状态
    private volatile int state = ST_NOT_STARTED;
 
    //Reactor的初始状态,未启动
    private static final int ST_NOT_STARTED = 1;
    //Reactor启动后的状态
    private static final int ST_STARTED = 2;
    //准备正在进行优雅关闭,此时用户仍然可以提交任务,Reactor仍可以执行任务
    private static final int ST_SHUTTING_DOWN = 3;
    //Reactor停止状态,表示优雅关闭结束,此时用户不能在提交任务,Reactor最后一次执行剩余的任务
    private static final int ST_SHUTDOWN = 4;
    //Reactor中的任务已被全部执行完毕,且不在接受新的任务,真正的终止状态
    private static final int ST_TERMINATED = 5;

    //优雅关闭的静默期
    private volatile long gracefulShutdownQuietPeriod;
    //优雅关闭超时时间
    private volatile long gracefulShutdownTimeout;

    //Reactor的关闭Future
    private final Promise<?> terminationFuture = new DefaultPromise<Void>(GlobalEventExecutor.INSTANCE);

    @Override
    public Future<?> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) {

        ......省略参数校验.......

        //此时Reactor的状态为ST_STARTED
        if (isShuttingDown()) {
            return terminationFuture();
        }

        boolean inEventLoop = inEventLoop();
        boolean wakeup;
        int oldState;
        for (;;) {
            if (isShuttingDown()) {
                return terminationFuture();
            }
            int newState;
            //需要唤醒Reactor去执行关闭流程
            wakeup = true;
            oldState = state;
            if (inEventLoop) {
                newState = ST_SHUTTING_DOWN;
            } else {
                switch (oldState) {
                    case ST_NOT_STARTED:
                    case ST_STARTED:
                        newState = ST_SHUTTING_DOWN;
                        break;
                    default:
                        //Reactor正在关闭或者已经关闭
                        newState = oldState;
                        wakeup = false;
                }
            }
            if (STATE_UPDATER.compareAndSet(this, oldState, newState)) {
                break;
            }
        }
        //优雅关闭静默期,在该时间内,用户还是可以向Reactor提交任务并且执行,只要有任务在Reactor中,就不能进行关闭
        //每隔100ms检测是否有任务提交进来,如果在静默期内没有新的任务提交,那么才会进行关闭 保证关闭行为的优雅
        gracefulShutdownQuietPeriod = unit.toNanos(quietPeriod);
        //优雅关闭的最大超时时间,优雅关闭行为不能超过该时间,如果超过的话 不管当前是否还有任务 都要进行关闭
        //保证关闭行为的可控
        gracefulShutdownTimeout = unit.toNanos(timeout);

        //这里需要保证Reactor线程是在运行状态,如果已经停止,那么就不在进行后续关闭行为,直接返回terminationFuture
        if (ensureThreadStarted(oldState)) {
            return terminationFuture;
        }

        //将正在监听IO事件的Reactor从Selector上唤醒,表示要关闭了,开始执行关闭流程
        if (wakeup) {
            //确保Reactor线程在执行完任务之后 不会在selector上停留
            taskQueue.offer(WAKEUP_TASK);
            if (!addTaskWakesUp) {
                //如果此时Reactor正在Selector上阻塞,则可以确保Reactor被及时唤醒
                wakeup(inEventLoop);
            }
        }

        return terminationFuture();
    }

    @Override
    public Future<?> terminationFuture() {
        return terminationFuture;
    }

}

首先在开启关闭流程之前,需要调用 isShuttingDown() 判断一下当前 Reactor 是否已经开始关闭流程或者已经完成关闭。如果已经开始关闭了,这里会直接返回 Reactor 的 terminationFuture 。


    @Override
    public boolean isShuttingDown() {
        return state >= ST_SHUTTING_DOWN;
    }

剩下的逻辑就是不停的在一个 for 循环中通过 CAS 不停的尝试将 Reactor 的当前 ST_STARTED 状态改为 ST_SHUTTING_DOWN 正在关闭状态。

如果通过 inEventLoop() 判断出当前执行线程是 Reactor 线程,那么表示当前 Reactor 的状态只会是 ST_STARTED 运行状态,那么就可以直接将 newState 设置为 ST_SHUTTING_DOWN 。因为只有 Reactor 处于 ST_STARTED 状态的时候才会运行到这里。否则在前边就直接返回 terminationFuture了。

如果当前执行线程为用户线程并不是 Reactor 线程的话,那么此时 Reactor 的状态可能是正在关闭状态或者已经关闭状态,用户线程在重复发起 Reactor 的关闭流程。所以这些异常场景的处理会在 switch(oldState){....} 语句中完成。

            switch (oldState) {
                    case ST_NOT_STARTED:
                    case ST_STARTED:
                        newState = ST_SHUTTING_DOWN;
                        break;
                    default:
                        //Reactor正在关闭或者已经关闭
                        newState = oldState;
                        //当前Reactor已经处于关闭流程中,则无需在唤醒Reactor了
                        wakeup = false;
                }

如果当前 Reactor 还未发起关闭流程,比如状态为 ST_NOT_STARTED 或者 ST_STARTED ,那么直接可以放心的将 newState 设置为 ST_SHUTTING_DOWN 。

如果当前 Reactor 已经处于关闭流程中或者已经完成关闭,比如状态为 ST_SHUTTING_DOWN ,ST_SHUTDOWN 或者 ST_TERMINATED 。则没有必要在唤醒 Reactor 重复执行关闭流程了 wakeup = false。Reactor 的状态维持当前状态不变。

当 Reactor 的状态确定完毕后,则在 for 循环中不断的通过 CAS 修改 Reactor 的当前状态。此时 oldState = ST_STARTED ,newState = ST_SHUTTING_DOWN 。


          if (STATE_UPDATER.compareAndSet(this, oldState, newState)) {
                break;
            }

随后在 Reactor 中设置我们在《6.1 ReactorGroup 的优雅谢幕》小节开始处介绍的控制 Netty 优雅关闭的两个非常重要的核心参数:

  • gracefulShutdownQuietPeriod:优雅关闭静默期,默认为 2s 。当 Reactor 中已经没有异步任务需要在执行时,该静默期开始触发,Netty 在这里会每隔 100ms 检测一下是否有任务提交进来,如果在静默期内没有新的任务提交,那么才会进行关闭,保证关闭行为的优雅。

  • gracefulShutdownTimeout:优雅关闭超时时间,默认为 15s 。优雅关闭行为不能超过该时间,如果超过的话不管当前是否还有任务都要进行关闭,保证关闭行为的可控。

流程走到这里,Reactor 就开始准备执行关闭流程了,那么在进行关闭操作之前,我们需要确保 Reactor 线程此时应该是运行状态,如果此时 Reactor 线程还未开始运行那么就需要让它运行起来执行关闭操作。


        //这里需要保证Reactor线程是在运行状态,如果已经停止,
        //那么就不在进行后续关闭行为,直接返回terminationFuture
        if (ensureThreadStarted(oldState)) {
            return terminationFuture;
        }


    private boolean ensureThreadStarted(int oldState) {
        if (oldState == ST_NOT_STARTED) {
            try {
                doStartThread();
            } catch (Throwable cause) {
                STATE_UPDATER.set(this, ST_TERMINATED);
                terminationFuture.tryFailure(cause);

                if (!(cause instanceof Exception)) {
                    // Also rethrow as it may be an OOME for example
                    PlatformDependent.throwException(cause);
                }
                return true;
            }
        }
        return false;
    }

如果此时 Reactor 线程刚刚执行完异步任务或者正在 Selector 上阻塞,那么我们需要确保 Reactor 线程被及时的唤醒,从而可以直接进入关闭流程。wakeup == true。

这里的 addTaskWakesUp 默认为 false 。表示并不是只有 addTask 方法才能唤醒 Reactor 线程 还有其他方法可以唤醒 Reactor 线程,比如 SingleThreadEventExecutor#execute 方法还有本小节介绍的 SingleThreadEventExecutor#shutdownGracefully 方法都会唤醒 Reactor 线程。

关于 addTaskWakesUp 字段的详细含义和作用,大家可以回顾下《一文聊透 Netty 核心引擎 Reactor 的运转架构》一文中的《1.2.2 Reactor 开始轮询 IO 就绪事件》小节。


     //将正在监听IO事件的Reactor从Selector上唤醒,表示要关闭了,开始执行关闭流程
        if (wakeup) {
            //确保Reactor线程在执行完任务之后 不会在selector上停留
            taskQueue.offer(WAKEUP_TASK);
            if (!addTaskWakesUp) {
                //如果此时Reactor正在Selector上阻塞,则可以确保Reactor被及时唤醒
                wakeup(inEventLoop);
            }
        }

  • 通过 taskQueue.offer(WAKEUP_TASK) 向 Reactor 中添加 WAKEUP_TASK,可以确保 Reactor 在执行完异步任务之后不会在 Selector 上做停留,直接执行关闭操作。

  • 如果此时 Reactor 线程正在 Selector 上阻塞,那么直接调用 wakeup(inEventLoop) 唤醒 Reactor 线程,直接来到关闭流程。

public final class NioEventLoop extends SingleThreadEventLoop {
    @Override
    protected void wakeup(boolean inEventLoop) {
        if (!inEventLoop && nextWakeupNanos.getAndSet(AWAKE) != AWAKE) {
            selector.wakeup();
        }
    }
}

6.3 Reactor 线程的优雅关闭

我们先来通过一张 Reactor 优雅关闭整体流程图来从总体上俯撼一下关闭流程:

Reactor线程优雅关闭流程.png

通过《一文聊透Netty核心引擎Reactor的运转架构》一文的介绍,我们知道 Reacto r是在一个 for 循环中 996 不停地处理 IO 事件以及执行异步任务。如下面笔者提取的 Reactor 运行框架所示:

public final class NioEventLoop extends SingleThreadEventLoop {

    @Override
    protected void run() {
        for (;;) {
            try {
                  .......1.监听Channel上的IO事件.......
                  .......2.处理Channel上的IO事件.......
                  .......3.执行异步任务..........
            } finally {
                try {
                    if (isShuttingDown()) {
                        //关闭Reactor上注册的所有Channel,停止处理IO事件,触发unActive以及unRegister事件
                        closeAll();
                        //注销掉所有Channel停止处理IO事件之后,剩下的就需要执行Reactor中剩余的异步任务了
                        if (confirmShutdown()) {
                            return;
                        }
                    }
                } catch (Error e) {
                    throw (Error) e;
                } catch (Throwable t) {
                    handleLoopException(t);
                }
            }
        }
    }

}

在 Reactor 在每次 for 循环的末尾 finally{....} 语句块中都会通过 isShuttingDown() 方法去检查当前 Reactor 的状态是否是关闭状态,如果是关闭状态则开始正式进入 Reactor 的优雅关闭流程。

我们在本文前边《1.2 优雅关闭》小节中在讨论优雅关闭方案的时候提到,我们要着重从以下两个方面来实施优雅关闭:

  1. 首先需要切走程序承担的现有流量。

  2. 保证现有剩余的任务可以执行完毕,保证业务无损。

Netty 这里实现的优雅关闭同样也遵从这两个要点。

  1. 在优雅关闭流程开始之前首先会调用 closeAll() 方法,将 Reactor 上注册的所有 Channel 全部关闭掉,切掉现有流量。

  2. 随后会调用 confirmShutdown() 方法,将剩余的异步任务执行完毕。在该方法中只要有异步任务需要执行,就不能关闭,保证业务无损。该方法返回值为 true 时表示可以进行关闭。返回 false 时表示不能马上关闭。

6.3.1 切走流量

    private void closeAll() {
        //这里的目的是清理selector中的一些无效key
        selectAgain();
        //获取Selector上注册的所有Channel
        Set<SelectionKey> keys = selector.keys();
        Collection<AbstractNioChannel> channels = new ArrayList<AbstractNioChannel>(keys.size());
        for (SelectionKey k: keys) {
            //获取NioSocketChannel
            Object a = k.attachment();
            if (a instanceof AbstractNioChannel) {
                channels.add((AbstractNioChannel) a);
            } else {
                .........省略......
            }
        }

        for (AbstractNioChannel ch: channels) {
            //关闭Reactor上注册的所有Channel,并在pipeline中触发unActive事件和unRegister事件
            ch.unsafe().close(ch.unsafe().voidPromise());
        }
    }

首先会通过 selectAgain() 最后一次在 Selector 上执行一次非阻塞轮询操作,目的是清除 Selector 上的一些无效 Key 。

关于无效 Key 的清除,详细细节大家可以回看下《一文聊透Netty核心引擎Reactor的运转架构》一文中的《3.1.3 从Selector中移除失效的SelectionKey》小节。

随后通过 selector.keys() 获取在 Selector 上注册的所有 SelectionKey 。进而获取到 Netty 中的 NioSocketChannel 。SelectionKey 与 NioSocketChannel 的对应关系如下图所示:

channel与SelectionKey对应关系.png

最后将注册在 Reactor 上的这些 NioSocketChannel 挨个进行关闭。

Channel 的关闭流程可以回看下笔者的这篇文章 《且看 Netty 如何应对 TCP 连接的正常关闭,异常关闭,半关闭场景》

6.3.2 保证业务无损

该方法中的逻辑是保证 Reactor 进行优雅关闭的核心,Netty 这里为了保证业务无损,采取的是只要有异步任务 Task 或者 ShutdwonHooks 需要执行,就不能关闭,需要等待所有 tasks 或者 ShutdownHooks 执行完毕,才会考虑关闭的事情。

    protected boolean confirmShutdown() {
        if (!isShuttingDown()) {
            return false;
        }

        if (!inEventLoop()) {
            throw new IllegalStateException("must be invoked from an event loop");
        }

        //取消掉所有的定时任务
        cancelScheduledTasks();

        if (gracefulShutdownStartTime == 0) {
            //获取优雅关闭开始时间,相对时间
            gracefulShutdownStartTime = ScheduledFutureTask.nanoTime();
        }

        //这里判断只要有task任务需要执行就不能关闭
        if (runAllTasks() || runShutdownHooks()) {
            if (isShutdown()) {
                // Executor shut down - no new tasks anymore.
                return true;
            }

            /**
             * gracefulShutdownQuietPeriod表示在这段时间内,用户还是可以继续提交异步任务的,Reactor在这段时间内
             * 是会保证这些任务被执行到的。
             *
             * gracefulShutdownQuietPeriod = 0 表示 没有这段静默时期,当前Reactor中的任务执行完毕后,无需等待静默期,执行关闭
             * */

            if (gracefulShutdownQuietPeriod == 0) {
                return true;
            }
            //避免Reactor在Selector上阻塞,因为此时已经不会再去处理IO事件了,专心处理关闭流程
            taskQueue.offer(WAKEUP_TASK);
            return false;
        }

        //此时Reactor中已经没有任务可执行了,是时候考虑关闭的事情了
        final long nanoTime = ScheduledFutureTask.nanoTime();

        //当Reactor中所有的任务执行完毕后,判断是否超过gracefulShutdownTimeout
        //如果超过了 则直接关闭
        if (isShutdown() || nanoTime - gracefulShutdownStartTime > gracefulShutdownTimeout) {
            return true;
        }

        //即使现在没有任务也还是不能进行关闭,需要等待一个静默期,在静默期内如果没有新的任务提交,才会进行关闭
        //如果在静默期内还有任务继续提交,那么静默期将会重新开始计算,进入一轮新的静默期检测
        if (nanoTime - lastExecutionTime <= gracefulShutdownQuietPeriod) {
            taskQueue.offer(WAKEUP_TASK);
            try {
                //gracefulShutdownQuietPeriod内每隔100ms检测一下 是否有任务需要执行
                Thread.sleep(100);
            } catch (InterruptedException e) {
                // Ignore
            }

            return false;
        }

        // 在整个gracefulShutdownQuietPeriod期间内没有任务需要执行或者静默期结束 则无需等待gracefulShutdownTimeout超时,直接关闭
        return true;
    }

在关闭流程开始之前,Netty 首先会调用 cancelScheduledTasks() 方法将 Reactor 中剩余需要执行的定时任务全部取消掉。

记录优雅关闭开始时间 gracefulShutdownStartTime ,这是为了后续判断优雅关闭流程是否超时。

调用 runAllTasks() 方法将 Reactor 中 TaskQueue 里剩余的异步任务全部取出执行。

运行剩余tasks和hooks.png

调用 runShutdownHooks() 方法将用户注册在 Reactor 上的 ShutdownHook 取出执行。

我们可以在用户线程中通过如下方式向 Reactor 中注册 ShutdownHooks :

        NioEventLoop reactor = (NioEventLoop) ctx.channel().eventLoop();
        reactor.addShutdownHook(new Runnable() {
            @Override
            public void run() {
                .....关闭逻辑....
            }
        });

在 Reactor 进行关闭的时候,会取出用户注册的这些 ShutdownHooks 进行运行。

public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor {

   //可以向Reactor添加shutdownHook,当Reactor关闭的时候会被调用
   private final Set<Runnable> shutdownHooks = new LinkedHashSet<Runnable>();

   private boolean runShutdownHooks() {
        boolean ran = false;
        while (!shutdownHooks.isEmpty()) {
            List<Runnable> copy = new ArrayList<Runnable>(shutdownHooks);
            shutdownHooks.clear();
            for (Runnable task: copy) {
                try {
                    //Reactor线程挨个顺序同步执行
                    task.run();
                } catch (Throwable t) {
                    logger.warn("Shutdown hook raised an exception.", t);
                } finally {
                    ran = true;
                }
            }
        }

        if (ran) {
            lastExecutionTime = ScheduledFutureTask.nanoTime();
        }

        return ran;
    }

}

需要注意的是这里的 ShutdownHooks 是 Netty 提供的一种机制并不是我们在《3. JVM 中的 ShutdownHook》小节中介绍的 JVM 中的 ShutdownHooks 。

JVM 中的 ShutdownHooks 是一个 Thread ,JVM 在关闭之前会并发无序地运行。而 Netty 中的 ShutdownHooks 是一个 Runnable ,Reactor 在关闭之前,会由 Reactor 线程同步有序地执行。

这里需要注意的是只要有 tasks 和 hooks 需要执行 Netty 就会一直执行下去直到这些任务全部执行完为止

当 Reactor 没有任何任务需要执行时,这时就会判断当前关闭流程所用时间是否超过了我们前边设定的优雅关闭最大超时时间 gracefulShutdownTimeout 。

nanoTime - gracefulShutdownStartTime > gracefulShutdownTimeout

如果关闭流程因为前边这些任务的执行导致已经超时,那么就直接关闭 Reactor ,退出 Reactor 的工作循环。

如果没有超时,那么这时就会触发前边介绍的优雅关闭的静默期 gracefulShutdownQuietPeriod 。

在静默期中 Reactor 线程会每隔 100ms 检查一下是否有用户提交任务请求,如果有的话,就需要保证将用户提交的这些任务执行完毕。然后静默期将会重新开始计算,进入一轮新的静默期检测。

如果在整个静默期内,没有任何任务提交,则无需等待 gracefulShutdownTimeout 超时,直接关闭 Reactor ,退出 Reactor 的工作循环。

从以上过程我们可以看出 Netty 的优雅关闭至少需要等待一个静默期的时间。还有一点是 Netty 优雅关闭的时间可能会超出 gracefulShutdownTimeout ,因为 Netty 需要保证遗留剩余的任务被执行完毕。当所有任务执行完毕之后,才会去检测是否超时。

6.4 Reactor 的最终关闭流程

当在静默期内没有任何任务提交或者关闭流程超时时,上小节中介绍的 confirmShutdown() 就会返回 true 。随即 Reactor 线程就会退出工作循环。

public final class NioEventLoop extends SingleThreadEventLoop {

    @Override
    protected void run() {
        for (;;) {
            try {
                  .......1.监听Channel上的IO事件.......
                  .......2.处理Channel上的IO事件.......
                  .......3.执行异步任务..........
            } finally {
                try {
                    if (isShuttingDown()) {
                        //关闭Reactor上注册的所有Channel,停止处理IO事件,触发unActive以及unRegister事件
                        closeAll();
                        //注销掉所有Channel停止处理IO事件之后,剩下的就需要执行Reactor中剩余的异步任务了
                        if (confirmShutdown()) {
                            return;
                        }
                    }
                } catch (Error e) {
                    throw (Error) e;
                } catch (Throwable t) {
                    handleLoopException(t);
                }
            }
        }
    }

}

我们在《详细图解 Netty Reactor 启动全流程》一文中的《1.3.3 Reactor 线程的启动》小节中的介绍中提到,Reactor 线程的启动是通过第一个异步任务被提交到 Reactor 中的时候被触发的。在向 Reactor 提交任务的方法 SingleThreadEventExecutor#execute(java.lang.Runnable, boolean) 中会触发下面 doStartThread() 方法的调用,在这里会调用前边提到的 Reactor 工作循环 run() 方法。

在 doStartThread() 方法的 finally{...} 语句块中会完成 Reactor 的最终关闭流程,也就是 Reactor 在退出 run 方法中的 for 循环之后的后续收尾流程。

最终 Reactor 的优雅关闭完整流程如下图所示:

Reactor优雅关闭全流程.png
public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor {

    private void doStartThread() {
        assert thread == null;
        executor.execute(new Runnable() {
            @Override
            public void run() {

                ..........省略.........

                try {
                    //Reactor线程开始轮询处理IO事件,执行异步任务
                    SingleThreadEventExecutor.this.run();
                    //后面的逻辑为用户调用shutdownGracefully关闭Reactor退出循环 走到这里
                    success = true;
                } catch (Throwable t) {
                    logger.warn("Unexpected exception from an event executor: ", t);
                } finally {
                    //走到这里表示在静默期内已经没有用户在向Reactor提交任务了,或者达到优雅关闭超时时间,开始对Reactor进行关闭
                    //如果当前Reactor不是关闭状态则将Reactor的状态设置为ST_SHUTTING_DOWN
                    for (;;) {
                        int oldState = state;
                        if (oldState >= ST_SHUTTING_DOWN || STATE_UPDATER.compareAndSet(
                                SingleThreadEventExecutor.this, oldState, ST_SHUTTING_DOWN)) {
                            break;
                        }
                    }

                    try {
                        for (;;) {
                            //此时Reactor线程虽然已经退出,而此时Reactor的状态为shuttingdown,但任务队列还在
                            //用户在此时依然可以提交任务,这里是确保用户在最后的这一刻提交的任务可以得到执行。
                            if (confirmShutdown()) {
                                break;
                            }
                        }

                        for (;;) {
                            // 当Reactor的状态被更新为SHUTDOWN后,用户提交的任务将会被拒绝
                            int oldState = state;
                            if (oldState >= ST_SHUTDOWN || STATE_UPDATER.compareAndSet(
                                    SingleThreadEventExecutor.this, oldState, ST_SHUTDOWN)) {
                                break;
                            }
                        }

                        // 这里Reactor的状态已经变为SHUTDOWN了,不会在接受用户提交的新任务了
                        // 但为了防止用户在状态变为SHUTDOWN之前,也就是Reactor在SHUTTINGDOWN的时候 提交了任务
                        // 所以此时Reactor中可能还会有任务,需要将剩余的任务执行完毕
                        confirmShutdown();
                    } finally {
                        try {
                            //SHUTDOWN状态下,在将全部的剩余任务执行完毕后,则将Selector关闭
                            cleanup();
                        } finally {
                            // 清理Reactor线程中的threadLocal缓存,并通知相应future。
                            FastThreadLocal.removeAll();

                            //ST_TERMINATED状态为Reactor真正的终止状态
                            STATE_UPDATER.set(SingleThreadEventExecutor.this, ST_TERMINATED);
                            
                            //使得awaitTermination方法返回
                            threadLock.countDown();

                            //统计一下当前reactor任务队列中还有多少未执行的任务,打出日志
                            int numUserTasks = drainTasks();
                            if (numUserTasks > 0 && logger.isWarnEnabled()) {
                                logger.warn("An event executor terminated with " +
                                        "non-empty task queue (" + numUserTasks + ')');
                            }

                            /**
                             * 通知Reactor的terminationFuture成功,在创建Reactor的时候会向其terminationFuture添加Listener
                             * 在listener中增加terminatedChildren个数,当所有Reactor关闭后 ReactorGroup关闭成功
                             * */

                            terminationFuture.setSuccess(null);
                        }
                    }
                }
            }
        });
    }
}

流程走到 doStartThread 方法中的 finally{...} 语句块中的时候,这个时候表示在优雅关闭的静默期内,已经没有任务继续向 Reactor 提交了。或者关闭耗时已经超过了设定的优雅关闭最大超时时间。

现在正式来到了 Reactor 的关闭流程。在流程开始之前需要确保当前 Reactor 的状态为 ST_SHUTTING_DOWN 正在关闭状态。

注意此刻用户线程依然可以向 Reactor 提交任务。当 Reactor 的状态变为 ST_SHUTDOWN 或者 ST_TERMINATED 时,用户向 Reactor 提交的任务就会被拒绝,但是此时 Reactor 的状态为 ST_SHUTTING_DOWN ,依然可以接受用户提交过来的任务。

public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor {
  @Override
  public boolean isShutdown() {
        return state >= ST_SHUTDOWN;
  }

  private void execute(Runnable task, boolean immediate) {
        boolean inEventLoop = inEventLoop();
        addTask(task);
        if (!inEventLoop) {
            startThread();
            //当Reactor的状态为ST_SHUTDOWN时,拒绝用户提交的异步任务,但是在优雅关闭ST_SHUTTING_DOWN状态时还是可以接受用户提交的任务的
            if (isShutdown()) {
                boolean reject = false;
                try {
                    if (removeTask(task)) {
                        reject = true;
                    }
                } catch (UnsupportedOperationException e) {
                }
                if (reject) {
                    reject();
                }
            }
        }

        .........省略........
    }
}

所以 Reactor 从工作循环 run 方法中退出随后流程一路走到这里来的这段时间,用户仍然有可能向 Reactor 提交任务,为了确保关闭流程的优雅,这里会在 for 循环中不停的执行 confirmShutdown() 方法直到所有的任务全部执行完毕。

随后会将 Reactor 的状态改为 ST_SHUTDOWN 状态,此时用户就不能在向 Reactor 提交任务了。如果此时在提交任务就会收到 RejectedExecutionException 异常。

大家这里可能会有疑问,Netty 在 Reactor 的状态变为 ST_SHUTDOWN 之后,又一次调用了 confirmShutdown() 方法,这是为什么呢?

其实这样做的目的是为了防止 Reactor 状态在变为 SHUTDOWN 之前,在这个极限的时间里,用户又向 Reactor 提交了任务,所以还需要最后一次调用 confirmShutdown() 将在这个极限时间内提交的任务执行完毕。

以上逻辑步骤就是真正优雅关闭的精髓所在,确保任务全部执行完毕,保证业务无损。

在我们优雅处理流程介绍完了之后,下面就是关闭 Reactor 的流程了:

Reactor 会在 SHUTDOWN 状态下,将 Selector 进行关闭。

    @Override
    protected void cleanup() {
        try {
            selector.close();
        } catch (IOException e) {
            logger.warn("Failed to close a selector.", e);
        }
    }

清理 Reactor 线程中遗留的所有 ThreadLocal 缓存。

FastThreadLocal.removeAll();

将 Reactor 的状态由 SHUTDOWN 改为 ST_TERMINATED 状态。此时 Reactor 就算真正的关闭了

 STATE_UPDATER.set(SingleThreadEventExecutor.this, ST_TERMINATED);

用户线程可能会调用 Reactor 的 awaitTermination 方法阻塞等待 Reactor 的关闭,当 Reactor 关闭之后会调用 threadLock.countDown() 使得用户线程从 awaitTermination 方法返回。

public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor {

    private final CountDownLatch threadLock = new CountDownLatch(1);

    @Override
    public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
        
         ........省略.......

        //等待Reactor关闭
        threadLock.await(timeout, unit);
        return isTerminated();
    }

    @Override
    public boolean isTerminated() {
        return state == ST_TERMINATED;
    }
}

当这一切处理完毕之后,最后就会设置 Reactor 的 terminationFuture 为 success 。此时注册在 Reactor 的 terminationFuture 上的 listener 就会被回调。

这里还记得我们在《Reactor 在 Netty 中的实现(创建篇)》一文中介绍的,在 ReactorGroup 中的所有 Reactor 被挨个全部创建成功之后,会向所有 Reactor 的 terminationFuture 注册一个 terminationListener 。

在 terminationListener 中检测当前 ReactorGroup 中的所有 Reactor 是否全部完成关闭,如果已经全部关闭,则设置 ReactorGroup 的 terminationFuture 为Success。此刻 ReactorGroup 关闭流程结束,Netty 正式优雅谢幕完毕~~


public abstract class MultithreadEventExecutorGroup extends AbstractEventExecutorGroup {

    //Reactor线程组中的Reactor集合
    private final EventExecutor[] children;
    //记录关闭的Reactor个数,当Reactor全部关闭后,才可以认为关闭成功
    private final AtomicInteger terminatedChildren = new AtomicInteger();
    //ReactorGroup关闭future
    private final Promise<?> terminationFuture = new DefaultPromise(GlobalEventExecutor.INSTANCE);

    protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
                                            EventExecutorChooserFactory chooserFactory, Object... args)
 
{
      
        ........挨个创建Reactor........

        final FutureListener<Object> terminationListener = new FutureListener<Object>() {
            @Override
            public void operationComplete(Future<Object> future) throws Exception {
                if (terminatedChildren.incrementAndGet() == children.length) {
                    //当所有Reactor关闭后 才认为是关闭成功
                    terminationFuture.setSuccess(null);
                }
            }
        };

        for (EventExecutor e: children) {
            e.terminationFuture().addListener(terminationListener);
        }

        ........省略........
    }

}


到现在为止,Netty 的整个优雅关闭流程,笔者就为大家详细介绍完了,下图为整个优雅关闭的完整流程图,大家可以对照下面这副总体流程图在回顾下我们前面介绍的源码逻辑。

Reactor优雅关闭总流程.png

6.5 Reactor 的状态变更流转

在本文的最后,笔者再来带着大家回顾下 Reactor 的状态变更流程。

Reactor的状态变更.png
  • 在 Reactor 被创建出来之后状态为 ST_NOT_STARTED。

  • 随着第一个异步任务的提交 Reactor 开始启动随后状态为 ST_STARTED 。

  • 当调用 shutdownGracefully 方法之后,Reactor 的状态变为 ST_SHUTTING_DOWN 。表示正在进行优雅关闭。此时用户仍可向 Reactor 提交异步任务。

  • 当 Reactor 中遗留的任务全部执行完毕之后,Reactor 的状态变为 ST_SHUTDOWN 。此时如果用户继续向 Reactor 提交异步任务,会被拒绝,并收到 RejectedExecutionException 异常。

  • 当 Selector 完成关闭,并清理掉 Reactor 线程中所有的 TheadLocal 缓存之后,Reactor 的状态变为 ST_TERMINATED 。

总结

到这里关于优雅关闭的前世今生笔者就位大家全部交代完毕了,信息量比较大,需要好好消化一下,很佩服大家能够一口气看到这里。

本文我们从进程优雅启停方案开始聊起,以优雅关闭的实现方案为起点,先是介绍了优雅关闭的底层基石-内核的信号量机制,从内核又聊到了 JVM 的 ShutdownHook 原理以及执行过程,最后通过三个知名的开源框架为案例,分别从 Spring 的优雅关闭机制聊到了 Dubbo 的优雅关闭,最后通过 Dubbo 的优雅关闭引出了 Netty 优雅关闭的详细实现方案,前后呼应。

好了,本文的内容就到这里了,大家辛苦了,相信大家认真看完之后一定会收获很大,我们下篇文章见~~~

 

 

点赞收藏
bin的技术小屋

微信公众号:bin的技术小屋,专注源码解析系列原创技术文章,分享自己的技术感悟

请先登录,查看1条精彩评论吧
快去登录吧,你将获得
  • 浏览更多精彩评论
  • 和开发者讨论交流,共同进步

为你推荐

随机一门技术分享之Netty

随机一门技术分享之Netty

Netty源码解析:writeAndFlush

Netty源码解析:writeAndFlush

6
1