Java 并发之 AQS 详解(下)原创
文章导读
前言
在前面的文章 Java 并发之 AQS 详解(上),我们分析了 AQS 的基本原理,并且基于 ReentrantLock 分析了AQS 对于独占锁的实现。
我们遗留了一个问题:在判断当前结点是否需要挂起时,先判断前驱结点状态是否为 SIGNAL,如果是则挂起当前结点那 SIGNAL 状态信号到底是干什么用的?这就涉及到锁的释放了。
下面我们还是以 ReentrantLock 为例, 分析 AQS 如何实现独占锁的释放。
1、独占式释放同步状态
ReentrantLock 在解锁的时候,并不区分公平锁和非公平锁,所以我们直接看解锁的源码:
public void unlock() {
sync.release(1);
}
可以看到,本质释放锁的地方,是通过框架来完成的。
public final boolean release(int arg) {
// 调用自定义同步器重写的 tryRelease 方法尝试释放同步状态
if (tryRelease(arg)) {
//释放成功,获取头节点
Node h = head;
// 头结点不为空并且头结点的waitStatus不是初始化节点情况,解除线程挂起状态
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
在 ReentrantLock 里面的公平锁和非公平锁的父类 Sync 定义了可重入锁的释放锁机制。
// 方法返回当前锁是不是没有被线程持有
protected final boolean tryRelease(int releases) {
// 减少可重入次数
int c = getState() - releases;
// 当前线程不是持有锁的线程,抛出异常
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
// 如果持有线程全部释放,将当前独占锁所有线程设置为null,并更新state
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
tryRelease 方法判断如果持有的线程全部释放,则返回 true。
我们在回到 release 方法。
这里的判断条件为什么是 h != null && h.waitStatus != 0?
- h == null Head还没初始化。初始情况下,head == null,第一个节点入队,Head会被初始化一个虚拟节点。所以说,这里如果还没来得及入队,就会出现head == null 的情况。
- h != null && waitStatus == 0表明后继节点对应的线程仍在运行中,不需要唤醒。
- h != null && waitStatus < 0 表明后继节点可能被阻塞了,需要唤醒。
在看下 unparkSuccessor 方法,实际是要唤醒头节点的后继节点。
private void unparkSuccessor(Node node) {
// 获取头结点waitStatus
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
// 获取当前节点的下一个节点
Node s = node.next;
// 如果下个节点是null或者下个节点被cancelled,就找到队列最开始的非cancelled的节点
if (s == null || s.waitStatus > 0) {
s = null;
// 就从尾部节点开始找,到队首,找到队列第一个waitStatus<0的节点。
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
// 如果当前节点的下个节点不为空,而且状态<=0,就把当前节点unpark
if (s != null)
LockSupport.unpark(s.thread);
}
有同学可能有疑问:
为什么要从后往前找第一个非 CANCELLED 的节点呢?原因如下。
之前的 addWaiter 方法:
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
节点入队并不是原子操作,也就是说:
node.prev = pred;
compareAndSetTail(pred, node)
这两个地方可以看作是尾结点入队的原子操作,但是此时 pred.next = node; 还没执行,如果这个时候执行了 unparkSuccessor 方法,就没办法从前往后找了,因为后继指针还没有连接起来,所以需要从后往前找。
还有一点原因,在产生 CANCELLED 状态节点的时候,先断开的是 Next 指针,Prev 指针并未断开,因此也是必须要从后往前遍历才能够遍历完全部的 Node。
综上所述,如果是从前往后找,由于极端情况下入队的非原子操作和CANCELLED 节点产生过程中断开 Next 指针的操作,可能会导致无法遍历所有的节点。
所以,同步状态至此就已经成功释放了,之前获取同步状态被挂起的线程就会被唤醒,我们继续分析中断恢复后的执行流程。
2、中断恢复后的执行流程
唤醒后,会执行 return Thread.interrupted();,这个函数返回的是当前执行线程的中断状态,并清除。
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
再回到 acquireQueued 代码,当 parkAndCheckInterrupt 返回 true 或者 false 的时候,interrupted 的值不同,但都会执行下次循环。如果这个时候获取锁成功,就会把当前 interrupted 返回。
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
如果 acquireQueued 为 true,就会执行 selfInterrupt 方法。
static void selfInterrupt() {
Thread.currentThread().interrupt();
}
该方法其实是为了中断线程。但为什么获取了锁以后还要中断线程呢?这里简单分析一下:
因为 LockSupport.park 阻塞线程后,有两种可能被唤醒。
第一种情况,前节点是头节点,释放锁后,会调用 LockSupport.unpark 唤醒当前线程。整个过程没有涉及到中断,最终 acquireQueued 返回 false时,不需要调用 selfInterrupt。
第二种情况,LockSupport.park 支持响应中断请求,能够被其他线程通过interrupt() 唤醒。但这种唤醒并没有用,因为线程前面可能还有等待线程,在 acquireQueued 的循环里,线程会再次被阻塞。parkAndCheckInterrupt 返回的是 Thread.interrupted(),不仅返回中断状态,还会清除中断状态,保证阻塞线程忽略中断。最终 acquireQueued 返回 true 时,真正的中断状态已经被清除,需要调用 selfInterrupt 维持中断状态。
到这里,关于独占式获取/释放锁的流程已经闭环了,但是关于 AQS 的另外两个模版方法还没有介绍:
- lockInterruptibly:响应中断
- tryLock:支持超时
这两个方法底层实现和前面分析的获取独占式锁的逻辑是非常相近的,这里就不在赘述,感兴趣的同学可以研究下。
3、AQS 应用
ReentrantLock 是如何应用的AQS
我们知道 ReentrantLock 支持公平锁和非公平锁,那到底何为公平锁/非公平锁?
乍一看挺奇怪的,怎么里面自定义了三个同步器:其实 NonfairSync,FairSync 只是对 Sync 做了进一步划分。
生活中,排队讲求先来后到视为公平。程序中的公平性也是符合请求锁的绝对时间的,其实就是 FIFO,否则视为不公平
我们来对比一下 ReentrantLock 是如何实现公平锁和非公平锁的。
其实没什么大不了,公平锁就是判断同步队列是否还有先驱节点的存在,只有没有先驱节点才能获取锁;而非公平锁是不管这个事的,能获取到同步状态就可以。
为什么会有公平锁/非公平锁的设计?
主要有两点原因:
- 恢复挂起的线程到真正锁的获取还是有时间差的,从人类的角度来看这个时间微乎其微,但是从 CPU 的角度来看,这个时间差存在的还是很明显的。所以非公平锁能更充分的利用 CPU 的时间片,尽量减少 CPU 空闲状态时间;
- 还有一个就是在使用多线程很重要的考量点是线程切换的开销,想象一下,如果采用非公平锁,当一个线程请求锁获取同步状态,然后释放同步状态,因为不需要考虑是否还有前驱节点,所以刚释放锁的线程在此刻再次获取同步状态的几率就变得非常大,所以就减少了线程的开销。
相信到这里,你也就明白了,为什么 ReentrantLock 默认构造器用的是非公平锁同步器。
public ReentrantLock() {
sync = new NonfairSync();
}
说这么多,非公平锁这么好那么好,为啥还有公平锁呢?
公平锁保证了排队的公平性,非公平锁霸气的忽视这个规则,所以就有可能导致排队的长时间在排队,也没有机会获取到锁,这就是传说中的 「饥饿」。
如何选择公平锁/非公平锁?
如果为了更高的吞吐量,很显然非公平锁是比较合适的,因为节省很多线程切换时间,吞吐量自然就上去了,否则那就用公平锁。
- ReentrantLock 的可重入应用
- ReentrantLock 的可重入性是 AQS 很好的应用之一。在 ReentrantLock 里面,不管是公平锁还是非公平锁,都有一段逻辑。
从上面可以看到,有一个同步状态 state 来控制整体可重入的情况。state 是 volatile 修饰的,用于保证一定的可见性和有序性。
接下来看 state 这个字段主要的过程:
- state 初始化的时候为 0,表示没有任何线程持有锁。
- 当有线程持有该锁时,值就会在原来的基础上 +1,同一个线程多次获得锁是,就会多次 +1,这里就是可重入的概念。
- 解锁也是对这个字段 -1,一直到 0,此线程对锁释放。
JUC 中的应用场景
除了上边 ReentrantLock 的可重入性的应用,AQS 作为并发编程的框架,为很多其他同步工具提供了良好的解决方案。下面列出了 JUC 中的几种同步工具,大体介绍一下 AQS 的应用场景:
- ReentrantLock:使用 AQS 保存锁重复持有的次数。当一个线程获取锁时,ReentrantLock 记录当前获得锁的线程标识,用于检测是否重复获取,以及错误线程试图解锁操作时异常情况的处理。
- Semaphore:使用 AQS 同步状态来保存信号量的当前计数。tryRelease会增加计数,acquireShared 会减少计数。
- CountDownLatch:使用 AQS 同步状态来表示计数。计数为 0 时,所有的 Acquire 操作(CountDownLatch 的 await 方法)才可以通过。
- ReentrantReadWriteLock:使用 AQS 同步状态中的 16 位保存写锁持有的次数,剩下的 16 位用于保存读锁的持有次数。
- ThreadPoolExecutor:Worker 利用 AQS 同步状态实现对独占线程变量的设置(tryAcquire 和 tryRelease)。