Java 并发之 AQS 详解(上)原创
文章导读
前言
本文我们来聊一聊 AQS。
文章开始之前,我们先来思考一个问题:Java 中提供了 synchronized 关键字来保证只有一个线程能够访问同步代码块。既然已经提供了synchronized关键字,那为何在 Java 的 SDK 包中,还会提供 Lock 接口呢?这是不是重复造轮子,多此一举呢?
让我们一起带着疑问往下看。
1、Java SDK 为什么要设计 Lock
很多小伙伴可能会听说过,在 Java 1.5 版本中,synchronized 的性能不如Lock,但在 Java 1.6 版本之后,synchronized 做了很多优化,性能提升了不少。那既然 synchronized 关键字的性能已经提升了,那为何还要使用Lock 呢?
我们常说:「避免重复造轮子」,如果有了轮子还是要坚持再造个轮子,那么肯定传统的轮子在某些应用场景中不能很好的解决问题。
如果我们向更深层次思考的话,就不难想到了:我们使用 synchronized 加锁是无法主动释放锁的,这就会涉及到死锁问题。
死锁问题
不知各位是否还记得,产生死锁的四个必要条件:
- 互斥:在一段时间内某资源仅被一个线程所占有。此时若有其他线程请求该资源,则请求线程只能等待。
- 不可剥夺:线程所获得的资源在未使用完毕之前,不能被其他线程强行夺走,即只能由获得该资源的线程自己来释放(只能是主动释放)。
- 请求与保持:线程已经保持了至少一个资源,但又提出了新的资源请求,而该资源已被其他线程占有,此时请求线程被阻塞,但对自己已获得的资源保持不放。
- 循环等待:在发生死锁时必然存在一个进程等待队列 {P1,P2,…,Pn},其中 P1 等待 P2 占有的资源,P2 等待 P3 占有的资源,…,Pn 等待 P1 占有的资源,形成一个进程等待环路,环路中每一个进程所占有的资源同时被另一个申请,也就是前一个进程占有后一个进程所申请的资源。
只要系统发生死锁,这些条件必然成立,而只要上述条件之一不满足,就不会发生死锁。
synchronized 的局限性
如果我们的程序使用 synchronized 关键字发生了死锁,主要是因为无法破坏「不可剥夺」 这个条件。
要想破坏这个条件,就需要具备申请不到进一步资源,就释放已有资源的能力。
很显然,这个能力是 synchronized 不具备的,使用 synchronized 如果线程申请不到资源就会进入阻塞状态,我们做什么也改变不了它的状态,这是 synchronized 的致命弱点。
对于「不可剥夺」这个条件,我们希望占用部分资源的线程进一步申请其他资源时,如果申请不到,可以主动释放它占有的资源,这样不可抢占这个条件就破坏掉了,就可以防止死锁的产生。
旧轮子有弱点,新轮子就要解决这些问题,所以要具备不会阻塞的功能,下面我们就来看下 Lock 是如何解决死锁问题的。
显式锁 Lock
一般来说,破坏「不可剥夺」条件有三种方式:
- 能够响应中断:synchronized 的问题是,持有锁 A 后,如果尝试获取锁 B 失败,那么线程就进入阻塞状态,一旦发生死锁,就没有任何机会来唤醒阻塞的线程。但如果阻塞状态的线程能够响应中断信号,也就是说当我们给阻塞的线程发送中断信号的时候,能够唤醒它,那它就有机会释放曾经持有的锁 A。这样就破坏了不可剥夺条件了。
- 支持超时:如果线程在一段时间之内没有获取到锁,不是进入阻塞状态,而是直接返回,那这个线程也有机会释放曾经持有的锁。这样也能破坏不可剥夺条件。
- 非阻塞地获取锁:如果尝试获取锁失败,并不进入阻塞状态,同样是直接返回,那这个线程也有机会释放曾经持有的锁。这样也能破坏不可剥夺条件。
体现在 Lock 接口上,就是 Lock 接口提供的三个方法,如下所示。
// 支持中断的API
void lockInterruptibly() throws InterruptedException;
// 支持超时的API
boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
// 支持非阻塞获取锁的API
boolean tryLock();
好的方案有了,但鱼和熊掌不可兼得,Lock 多了 synchronized 不具备的特性,自然不会像 synchronized 那样一个关键字三种玩法走遍全天下,在使用上也相对复杂了。
Lock 使用范式
public synchronized void normalSyncMethod() {
Lock lock = new ReentrantLock();
lock.lock();
try {
} finally {
lock.unlock();
}
}
规范1:lock.unlock() 放到 finally语句块,保证当前线程执行过程中出现异常时,锁依然能被释放掉,避免死锁的产生。
规范2:在 try{}外面获取锁,在 try-finally 外加锁,如果因为异常导致加锁失败,try-finally 块中的代码不会执行。如果在 try{} 代码块中加锁失败,finally 中的代码无论如何都会执行,但是由于当前线程加锁失败并没有持有 lock 对象锁 ,所以程序会抛出异常。
Lock 是怎样起到锁的作用呢?
如果你熟悉 synchronized,你知道程序编译成 CPU 指令后,在临界区会有 moniterenter 和 moniterexit 指令的出现,可以理解成进出临界区的标识。
从范式上来看:
- lock.lock()获取锁,“等同于” synchronized 的 moniterenter指令
- lock.unlock()释放锁,“等同于” synchronized 的 moniterexit 指令
那 Lock 是怎么做到的呢?
这里先简单说明一下,这样一会到源码分析时,你可以远观设计轮廓,近观实现细节,会变得越发轻松。
其实很简单,比如 ReentrantLock 内部维护了一个 volatile 修饰的变量 state,通过 CAS 来进行读写(最底层还是交给硬件来保证原子性和可见性),如果 CAS 更改成功,即获取到锁,线程进入到 try 代码块继续执行;如果没有更改成功,线程会被【挂起】,不会向下执行。
但 Lock 是一个接口,里面根本没有 state 这个变量的存在:
那它又是怎么处理这个 state 呢?很显然需要一点设计的加成了,接口定义行为,具体都是需要实现类的。
Lock 接口的实现类基本都是通过【聚合】了一个【队列同步器】的子类完成线程访问控制的。
那什么是队列同步器呢?我们下面开始进入正题。
2、队列同步器
队列式的同步器 (AbstractQueuedSynchronizer) 简称:AQS。它提供了「把锁分配给谁」这一问题的一种解决方案,使得锁的开发人员可以将精力放在「如何加解锁上」,避免陷于把锁进行分配而带来的种种细节陷阱之中。
例如 JUC 中,如 CountDownLatch、Semaphore、ReentrantLock、ReentrantReadWriteLock 等并发工具,均是借助 AQS 完成他们的所需要的锁分配问题。
AQS 框架
我们通过下面的架构图来整体了解一下 AQS 框架:
- 上图中有颜色的为 Method,无颜色的为 Attribution。
- 总的来说,AQS 框架共分为五层,自上而下由浅入深,从 AQS 对外暴露的 API 到底层基础数据。
- AQS 是一个抽象类,主要是通过继承的方式来使用,它本身没有实现任何的同步接口,仅仅是定义了同步状态的获取以及释放的方法来提供自定义的同步组件。
- 当有自定义同步器接入时,只需重写第一层所需要的部分方法即可,不需要关注底层具体的实现流程。
- 当自定义同步器进行加锁或者解锁操作时,先经过第一层的 API 进入 AQS 内部方法,然后经过第二层进行锁的获取,接着对于获取锁失败的流程,进入第三层和第四层的等待队列处理,而这些处理方式均依赖于第五层的基础数据提供层。
下面我们会从整体到细节,从流程到方法逐一剖析 AQS 框架。
AQS 原理概述
AQS 核心思想是:
- 如果被请求的共享资源空闲,则将当前请求资源的线程设置为有效的工作线程,并且将共享资源设置为锁定状态。
- 如果被请求的共享资源被占用,那么就需要一套线程阻塞等待以及被唤醒时锁分配的机制。这个机制主要用的是 CLH 队列的变体实现的,将暂时获取不到锁的线程加入到队列中。
主要原理图如下:
AQS 使用一个 volatile 的 int 类型的成员变量来表示同步状态,通过内置的 FIFO 队列来完成资源获取的排队工作,通过 CAS 完成对 state 值的修改。
下面先来看下 AQS 中最基本的数据结构——Node,Node 即为上面 CLH 变体队列中的节点。
AQS 数据结构
AQS 内部维护了一个同步队列,用于管理同步状态。
- 当线程获取同步状态失败时,就会将当前线程以及等待状态等信息构造成一个 Node 节点,将其加入到同步队列中尾部,阻塞该线程;
- 当同步状态被释放时,会唤醒同步队列中「首节点」的线程获取同步状态。
为了将上述步骤弄清楚,我们需要来看一看 Node 结构。
乍一看有点杂乱,我们解释一下几个方法和属性值的含义:
- waitStatus:当前节点在队列中的状态
- thread:表示处于该节点的线程
- prev:前驱指针
- predecessor:返回前驱节点,没有的话抛出 npe
- nextWaiter:指向下一个处于 CONDITION 状态的节点
- next:后继指针
线程两种锁的模式:
- SHARED:表示线程以共享的模式等待锁
- EXCLUSIVE:表示线程正在以独占的方式等待锁
waitStatus 有下面几个枚举值:
- 0:当一个 Node 被初始化的时候的默认值
- CANCELLED(1):当前节点取消获取锁。当等待超时或被中断(响应中断),会触发变更为此状态,进入该状态后节点状态不再变化;
- SIGNAL(-1):后面节点等待当前节点唤醒;
- CONDITION(-2):condition 中使用,当前线程阻塞在 condition。如果其他线程调用了 condition 的 signal 方法,这个结点将从等待队列转移到同步队列队尾,等待获取同步锁;
- PROPAGATE(-3):当前线程处在 SHARED 情况下,该字段才会使用。
在了解数据结构后,接下来了解一下 AQS 的同步状态—— state。
同步状态 state
AQS 中维护了一个名为 state 的字段,意为同步状态,是由 volatile 修饰的,用于展示当前临界资源的获锁情况。
private volatile int state;
下面提供了几个访问这个字段的方法:
- protected final int getState():获取 state 的值
- protected final void setState(int newState):设置 state 的值
- protected final boolean compareAndSetState(int expect, int update):使用 CAS 方式更新 state
这几个方法都是 final 修饰的,说明子类中无法重写它们。我们可以通过修改 state 字段表示的同步状态来实现多线程的「独占模式」和「共享模式」(加锁过程)。
前置知识基本铺垫完毕,我们下面以 ReentrantLock 为例, 分析 AQS 如何实现独占锁。
3、从 ReentrantLock 的实现看 AQS 的原理及应用
ReentrantLock 意思为可重入锁,指的是一个线程能够对一个临界资源重复加锁。ReentrantLock 支持公平锁和非公平锁,公平锁和非公平锁在底层是相同的,这里以非公平锁为例进行分析 AQS 获取独占锁的方式。
独占式获取同步状态
在非公平锁中,有一段这样的代码:
static final class NonfairSync extends ReentrantLock.Sync {
...
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
...
}
看下 acquire(1) 方法。
public final void acquire(int arg) {
// 调用自定义同步器重写的 tryAcquire 方法
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
再看一下 tryAcquire 方法:
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
这里只是 AQS 的简单实现,具体获取锁的实现方法是由各自的公平锁和非公平锁单独实现的。
如果该方法返回了 true,则说明当前线程获取锁成功,就不用往后执行了;如果获取失败,就需要加入到等待队列中。
下面我们分析下线程是何时以及怎样被加入进等待队列中的。
线程加入等待队列
加入队列的时机:当执行 acquire(1) 时,会通过 tryAcquire 获取锁。在这种情况下,如果获取锁失败,就会调用 addWaiter 加入到等待队列中去。
如何加入队列:通过调用 addWaiter 方法构造 Node 节点(Node.EXCLUSIVE 独占式)并安全的加入到同步队列的「尾部」。
具体实现方法如下:
private Node addWaiter(Node mode) {
// 构造Node节点,包含当前线程信息以及节点模式【独占/共享】
Node node = new Node(Thread.currentThread(), mode);
// 新建变量 pred 将指针指向tail指向的节点
Node pred = tail;
// 如果尾节点不为空
if (pred != null) {
// 新加入的节点前驱节点指向尾节点
node.prev = pred;
// 因为如果多个线程同时获取同步状态失败都会执行这段代码
// 所以,通过CAS方式确保安全的设置当前节点为最新的尾节点
if (compareAndSetTail(pred, node)) {
// 曾经的尾节点的后继节点指向当前节点
pred.next = node;
// 返回新构建的节点
return node;
}
}
// 如果tail为空,或者CAS设置tail失败,需要一个入队操作
enq(node);
return node;
}
主要的流程如下:
- 通过当前的线程和锁模式新建一个结点 Node;
- 判断当前队列中的尾结点是否为空,如果不为空,将新结点的 prev 指针指向尾结点;
- 通过 compareAndSetTail 方法,完成尾节点的设置。
如果 pred 指针为空或者 cas 失败(当前 pred 指针和 tail 指向的位置不同,说明被别的线程已经修改),调用 enq 将节点添加到 AQS 队列。
private Node enq(final Node node) {
// 通过“死循环”确保节点被正确添加,最终将其设置为尾节点之后才会返回
for (;;) {
Node t = tail;
// 如果是第一次添加到队列,那么tail=null
if (t == null) { // Must initialize
// 构建一个哨兵结点(空结点),并将头部指针指向它,
if (compareAndSetHead(new Node()))
// 此时队列中只一个头结点,所以tail也指向它
tail = head;
} else {
// 进行第二次循环时,tail不为null,进入else区域。
// 将当前线程的Node结点的prev指向tail,然后使用CAS将tail指向Node
node.prev = t;
// 将新节点加入到队列尾节点
if (compareAndSetTail(t, node)) {
//t此时指向tail,所以可以CAS成功,将tail重新指向Node。
//此时t为更新前的tail的值,即指向空的头结点,t.next=node,就将头结点的后续结点指向Node,返回头结点
t.next = node;
return t;
}
}
}
}
主要的流程如下:
- 如果没有被初始化,需要进行初始化一个头结点出来。但请注意,初始化的头结点并不是当前线程节点,而是调用了无参构造函数的节点;
- 如果经历了初始化或者并发导致队列中有元素,则与之前的方法相同。
其实,addWaiter 就是一个在双端链表添加尾节点的操作,需要注意的是,双端链表的头结点是一个无参构造函数的头结点,也叫哨兵结点或者虚结点。
有些同学可能会有疑问,为什么会有哨兵结点?
哨兵,顾名思义,是用来解决国家之间边界问题的,不直接参与生产活动。同样,计算机科学中提到的哨兵,也用来解决边界问题,如果没有边界,指定环节,按照同样算法可能会在边界处发生异常,比如要继续向下分析的 acquireQueued() 方法。
等待队列中线程出队列时机
到目前为止,通过 addwaiter 方法构造了一个 AQS 队列,并且将线程添加到了队列的节点中。返回的是一个包含该线程的 Node。而这个 Node 会作为参数,进入到 acquireQueued() 方法中。acquireQueued() 方法可以对排队中的线程进行「获锁」操作。
总的来说,一个线程获取锁失败了,被放入等待队列,acquireQueued() 会把放入队列中的线程不断去获取锁,直到获取成功或者不再需要获取(中断)。
下面我们从「何时出队列?」和「如何出队列?」两个方向来分析一下acquireQueued() 源码。
final boolean acquireQueued(final Node node, int arg) {
// 标记是否成功拿到资源
boolean failed = true;
try {
//记等待过程中是否中断过
boolean interrupted = false;
// 开始自旋,要么获取锁,要么中断
for (;;) {
// 获取当前节点的前驱节点,若为null即刻抛npe
final Node p = node.predecessor();
// 如果p是头结点,说明当前节点在真实数据队列的首部,就尝试获取锁(别忘了头结点是虚节点)
if (p == head && tryAcquire(arg)) {
// 获取锁成功,头指针移动到当前node,把当前节点置为虚节点
setHead(node);
// 将哨兵节点的后继节点置为空,方便GC
p.next = null; // help GC
failed = false;
// 返回中断标识
return interrupted;
}
// 当前节点的前驱节点不是头节点【或者】当前节点的前驱节点是头节点但获取同步状态失败
//这个时候就要判断当前node是否要被阻塞(被阻塞条件:前驱节点的waitStatus为-1),防止无限循环浪费资源。具体两个方法下面细细分析
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
大概流程:
- 获取当前节点的 prev 节点;
- 如果 prev 节点为 head 节点,那么它就有资格去争抢锁,调用 tryAcquire 抢占锁;
- 抢占锁成功以后,把获得锁的节点设置为 head,并且移除原来的初始化head 节点。
注:setHead 方法是把当前节点置为虚节点,但并没有修改 waitStatus,因为它是一直需要用的数据。
获取同步状态成功会返回可以理解了,但是如果失败就会一直陷入到“死循环”中浪费资源吗?
很显然不是,shouldParkAfterFailedAcquire(p,node) 和 parkAndCheckInterrupt() 就会将线程获取同步状态失败的线程挂起,我们继续向下看。
从上面的分析可以看出,只有队列的第二个节点可以有机会争用锁,如果成功获取锁,则此节点晋升为头节点。
对于第三个及以后的节点,if (p == head) 条件不成立,首先进行shouldParkAfterFailedAcquire(p,node) 操作,shouldParkAfterFailedAcquire 方法是靠前驱节点判断当前线程是否应该被阻塞。
它首先判断一个节点的前置节点的状态是否为 Node.SIGNAL,如果是,说明此节点已经将状态设置-如果锁释放,则应当通知它,所以它可以安全的阻塞了,返回 true。
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
// 获取前驱节点的状态
int ws = pred.waitStatus;
// 如果是 SIGNAL 状态,代表此节点可以挂起,直接返回 true,准备继续调用 parkAndCheckInterrupt 方法
//因为前置节点状态为SIGNAL在适当状态 会唤醒后继节点
if (ws == Node.SIGNAL)
return true;
// ws 大于0说明是CANCELLED状态,
if (ws > 0) {
do {
//循环向前查找取消节点,把取消节点从队列中剔除
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
// 将当前节点的前驱节点设置为设置为 SIGNAL 状态,用于后续唤醒操作
// 程序第一次执行到这返回为false,还会进行外层第二次循环,最终从代码第7行返回
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
到这里你也许有个问题:
这个地方设置前驱节点为 SIGNAL 状态到底有什么作用?
保留这个问题,我们陆续揭晓。
如果前驱节点的 waitStatus 是 SIGNAL 状态,即 shouldParkAfterFailedAcquire 方法会返回 true ,程序会继续向下执行 parkAndCheckInterrupt 方法,用于将当前线程挂起,阻塞调用栈,返回当前线程的中断状态。
private final boolean parkAndCheckInterrupt() {
// 线程挂起,程序不会继续向下执行
LockSupport.park(this);
// 根据 park 方法 API描述,程序在下述三种情况会继续向下执行
// 1. 被 unpark
// 2. 被中断(interrupt)
// 3. 其他不合逻辑的返回才会继续向下执行
// 因上述三种情况程序执行至此,返回当前线程的中断状态,并清空中断状态
// 如果由于被中断,该方法会返回 true
return Thread.interrupted();
}
上述方法的流程图如下:
从上图可以看出,跳出当前循环的条件是当前置结点是头结点,且当前线程获取锁成功。
为了防止因死循环导致 CPU 资源被浪费,我们会判断前置节点的状态来决定是否要将当前线程挂起,具体挂起流程用流程图表示如下(shouldParkAfterFailedAcquire 流程):
被唤醒的程序会继续执行 acquireQueued 方法里的循环,如果获取同步状态成功,则会返回 interrupted = true 的结果。
从队列中释放结点的疑虑打消了,那么又有新问题了:
shouldParkAfterFailedAcquire 中取消结点是怎么生成的呢?什么时候会把一个结点的 waitStatus 设置为 -1?
是在什么时间释放结点通知到被挂起的线程呢?
CANCELLED 状态结点生成
acquireQueued 方法中的 finally 代码:
if (failed)
cancelAcquire(node);
这段代码被执行的条件是 failed 为 true,正常情况下,如果跳出循环,failed 的值为 false,如果不能跳出循环貌似怎么也不能执行到这里,所以只有不正常的情况才会执行到这里,也就是会发生异常,才会执行到此处。
通过查看 try 代码块,有两个方法会抛出异常:
- node.processor() 方法
- 自己重写的 tryAcquire() 方法
经过分析这两处地方执行抛异常的概率很小,目前没有分析出来可以执行 cancelAcquire 方法的时机。
有一种说法:在 ReentrantLock 中,不管是公平锁还是非公平锁,cancelAcquire 都不会运行。有可能道格李老爷子是保持代码统一,反正用到的时候会用到,用不到的时候也不会被执行到。
通过 cancelAcquire 方法,将 Node 的状态标记为 CANCELLED。接下来,我们逐行来分析这个方法的原理:
private void cancelAcquire(Node node) {
// 将无效节点过滤
if (node == null)
return;
// 设置该节点不关联任何线程,也就是虚节点
node.thread = null;
Node pred = node.prev;
// 通过前驱节点,跳过取消状态的node
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;
// 获取过滤后的前驱节点的后继节点
Node predNext = pred.next;
// 把当前node的状态设置为CANCELLED
node.waitStatus = Node.CANCELLED;
// 如果当前节点是尾节点,将从后往前的第一个非取消状态的节点设置为尾节点
// 更新失败的话,则进入else,如果更新成功,将tail的后继节点设置为null
if (node == tail && compareAndSetTail(node, pred)) {
compareAndSetNext(pred, predNext, null);
} else {
int ws;
// 如果当前节点不是head的后继节点,1:判断当前节点前驱节点的是否为SIGNAL,2:如果不是,则把前驱节点设置为SINGAL看是否成功
// 如果1和2中有一个为true,再判断当前节点的线程是否为null
// 如果上述条件都满足,把当前节点的前驱节点的后继指针指向当前节点的后继节点
if (pred != head && ((ws = pred.waitStatus) == Node.SIGNAL || (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) && pred.thread != null) {
Node next = node.next;
if (next != null && next.waitStatus <= 0)
compareAndSetNext(pred, predNext, next);
} else {
// 如果当前节点是head的后继节点,或者上述条件不满足,那就唤醒当前节点的后继节点
unparkSuccessor(node);
}
node.next = node; // help GC
}
}
看到这个注释你可能有些乱了,其核心目的就是从等待队列中移除 CANCELLED 的节点,并重新拼接整个队列。
当前的流程:
获取当前节点的前驱节点,如果前驱节点的状态是 CANCELLED,那就一直往前遍历,找到第一个 waitStatus <= 0 的节点,将找到的 pred 节点和当前 Node 关联,将当前 Node 设置为 CANCELLED。
根据当前节点的位置,考虑以下三种情况:
- 当前节点是尾节点
- 当前节点是 head 的后继节点。
- 当前节点不是 head 的后继节点,也不是尾节点。
总结来看,其实设置 CANCELLED 状态节点只是有三种情况,我们通过画图来分析一下:
当前节点是尾节点。
当前节点是 Head 的后继节点。
当前节点不是 Head 的后继节点,也不是尾节点。
程序继续向调用栈上层返回,最终回到 AQS 的模版方法 acquire。
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
你也许会有疑惑:
程序已经成功获取到同步状态并返回了,怎么会有个自我中断呢?这个问题我们留到下一篇解答。
至此,获取同步状态的过程就结束了,我们简单的用流程图说明一下整个过程。
我们上面还没有说明 SIGNAL 的作用, SIGNAL 状态信号到底是干什么用的?这就涉及到锁的释放了。由于篇幅原因我们下篇文章继续分析。