图解并发包中锁的通用实现原创
导读
这篇文章我们来聊聊Java并发包中锁的实现。因为这其中涉及到了一点数据结构和线程挂起、唤醒等处理流程,我将源码中的关键逻辑绘制成图片的格式,方便大家有一个更加直观的理解。
阅读完本篇文章,你将了解到:
-
抽象同步器AQS的实现原理
-
ReentrantLock实现原理
-
非公平锁和公平锁实现的区别
-
基于这些内容,您也可以自己进一步探索可中断锁的实现原理
-
AQS的核心是state字段以及双端等待队列
-
如何优雅的中断一个线程
1、包结构计算
以下内容是基于JDK 1.8进行分析的。
我们查看下java.util.concurrent.locks
包下面,发现主要包含如下类:
我们来构建他们的UML图:
如上图,抛开内部类,抽象类,接口,主要实现了三把锁:ReentrantLock
,StampedLock
,ReentrantReadWriteLock
。最常用的就是ReentrantLock
了,关于ReentrantLock
的详细说明以及使用案例:ReentrantLock介绍与使用 https://www.itzhai.com/cpj/introduction-and-use-of-reentrantlock.html 或者公众号发送 ReentrantLock。
我们可以发现ReentrantLock和ReentrantReadWriteLock顶层都是AbstractQueueSynchronizer类。我们先来介绍下AbstractQueuedSynchronizer
类。
AbstractQueuedSynchronizer
AbstractQueuedSynchronizer
,简写为AQS
,抽象队列同步器。它是一个用于构建锁和同步器的框架,许多同步器都可以通过AQS很容易并且高效的构造出来,以下都是通过ASQ构造出来的:ReentrantLock
,Semaphore
,CountDownLatch
,ReentrantReadWriteLock
,SynchronousQueue
,FutureTask
。
接下来,我们先来看看这个抽象同步队列的原理。
2.1、AQS原理
说到AQS,我们必须要先知道它是干嘛的,然后再去研究它。那我直接先讲重点了:AQS是通过队列来辅助实现线程同步的。线程并发争夺state资源,争夺失败的则进入等待队列(同步队列)并进入阻塞状态,在state资源被释放之后,从队列头唤醒被阻塞的线程节点,进行state资源的竞争。
这样势必会涉及很频繁的队列入队出队操作,以及线程的阻塞唤醒操作。这些操作恰恰是最难编写,最容易出错的,为此AQS把这些操作做了封装,以模板的方式提供出来,我们可以通过实现模板的相关方法,实现不一样的锁或者同步器。
AQS使用了模板方法,把同步队列都封装起来了,同时提供了以下五个未实现的方法,用于子类的重写:
AQS数据结构
AQS同步器数据结构
如上图,AQS中:
-
state
:所有线程通过通过CAS尝试给state设值,当state>0时表示被线程占用;同一个线程多次获取state,会叠加state的值,从而实现了可重入; -
exclusiveOwnerThread
:在独占模式下该属性会用到,当线程尝试以独占模式成功给state设值,该线程会把自己设置到exclusiveOwnerThread变量中,表明当前的state被当前线程独占了; -
等待队列(同步队列)
:等待队列中存放了所有争夺state失败的线程,是一个双向链表
结构。state被某一个线程占用之后,其他线程会进入等待队列;一旦state被释放(state=0),则释放state的线程会唤醒等待队列中的线程继续尝试cas设值state; -
head
:指向等待队列的头节点,延迟初始化,除了初始化之外,只能通过setHead方法进行修改; -
tail
:指向等待队列的队尾,延迟初始化,只能通过enq方法修改tail,该方法主要是往队列后面添加等待节点。
等待队列中的节点结构是怎样子的呢?下面我们来看看。
AQS队列节点数据结构
-
pre
:指向队列中的上一个节点; -
waitStatus
:节点的等待状态,初始化为0,表示正常同步等待: -
CANCELLED
:1 节点因超时或者被中断而取消时设置为取消状态; -
SIGNAL
:-1 指示当前节点被释放后,需要调用unpark通知后面节点,如果后面节点发生竞争导致获取锁失败,也会将当前节点设置为SIGNAL; -
CONDITION
:-2 指示该线程正在进行条件等待,条件队列中会用到; -
PROPAGATE
:-3 共享模式下释放节点时设置的状态,表示无限传播下去。 -
thread
:当前节点操作的线程; -
nextWaiter
:该字段在Condition条件等待中会用到,指向条件队列的下一个节点。或者链接到SHARED常量,表示节点正在以共享模式等待; -
next
:指向队列中的下一个节点。
如果想要了解AQS的实现,您需要先知道以下这些内容,因为源码中会大量使用:
LockSupport.park(Object blocker)和LockSupport.unpark(Thread thread)
AQS中线程的阻塞和唤醒基本上都使用这两个方法实现的。其底层都是依赖Unsafe实现的。
LockSupport是用来创建锁和其他同步类的基本线程阻塞
的原语。
此类与使用它的每个线程关联一个许可(permit: 0表示无许可,1 表示有许可),如果有许可,将立刻返回对park()的调用,并且在此过程化消耗掉它。否则,park()会导致线程进入阻塞;调用 unpark() 可使许可证可用,如果尚不可用。不过与信号量不同的是,许可证不会累加,最多只有一个。
该类中常见的两个方法两个方法:
-
park(Object blocker)
:实现线程的阻塞。除非有许可,否则出于线程调度目的将阻塞线程;如果有许可,则将许可消耗,然后线程往下继续执行; -
unpark(Thread thread)
:实现解除线程的阻塞。如果线程在park方法上被阻塞,则调用该方法将取消阻塞。否则,许可变为1,保证下一次调用park方法不会阻塞。
这两个方法底层是调用了Unsafe中的park和unpark的native方法。
具体底层实现,可以参考这里[1]
cas
我们知道,计算机中提供了cas相关指令,这是一种乐观的并发策略,需要硬件指令集的发展才能支持,实现了:操作+冲突检测的原子性。
IA64 和 X86 使用cmpxchg指令完成CAS功能。
cas 内存位置 旧预期值 新值
CAS存在ABA问题,可以使用版本号进行控制,保证其正确性。
JDK中的CAS,相关类:
Unsafe
里面的compareAndSwapInt()
以及compareAndSwapLong()
等几个方法包装提供。只有启动类加载器加载的class才能访问他,或者通过反射获取。
详细说明:一文带你彻底理解同步和锁的本质(干货) #2.1.5、基于硬件指令
interrupt
相关阅读:如何优雅的中断线程 https://www.itzhai.com/cpj/how-to-interrupt-threads-gracefully.html,或者公众号发送 线程中断。
为了分析AQS的实现原理,我们先挑一个方法来分析。
2.2、AQS中的一般处理流程
为了弄清楚AQS中是如何进行队列同步的,我们先从一个简单的独占加锁方法说起。
2.2.1、public final void acquire(int arg)
这个方法是使用独占模式获取锁,忽略中断。通过至少调用一次tryAcquire成功返回来实现。否则,线程将排队,并可能反复阻塞和解除阻塞,并调用tryAcquire直到成功。
我们先看一下这个方法的入口代码:
1public final void acquire(int arg) {
2 if (!tryAcquire(arg) && // 尝试获取锁,这里是一个在AQS中未实现的方法,具体由子类实现
3 acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) // 获取不到锁,则 1.添加到等待队列 2.不断循环等待重试
4 selfInterrupt();
5}
tryAcquire
一开始,会尝试调用AQS中未实现的方法tryAcquire()
尝试获取锁,获取成功则表示获取锁了,该方法的实现一般通过CAS进行设置state尝试获取锁:
不同的锁可以有不同的tryAcquire()
实现,所以,你可以看到ReentrantLock锁里面会有非公平锁和公平锁的实现方式。
ReentrantLock公平锁的实现代码在获取锁之前多了一个判断:!hasQueuedPredecessors(),这个是判断如果当前线程节点之前没有其他节点了,那么我们才可以尝试获取锁,这就是公平锁的体现。
addWaiter
获取锁失败之后,则会进入这一步,这里会尝试把线程节点追加到等待队列后面,是通过CAS进行追加的,追加失败的情况下,会循环重试,直至追加成功为止。如果追加的时候,发现head节点还不存在,则先初始化一个head节点,然后追加上去:
1private Node addWaiter(Node mode) {
2 // 将当期线程构造成Node节点
3 Node node = new Node(Thread.currentThread(), mode);
4 // Try the fast path of enq; backup to full enq on failure
5 Node pred = tail;
6 if (pred != null) {
7 // 将原来尾节点设置为新节点的上一个节点
8 node.prev = pred;
9 // 尝试用新节点取代原来的尾节点
10 if (compareAndSetTail(pred, node)) {
11 // 取代成功,则将原来尾指针的下一个节点指向新节点
12 pred.next = node;
13 return node;
14 }
15 }
16 // 如果当前尾指针为空,则调用enq方法
17 enq(node);
18 return node;
19}
acquireQueued
加入等待队列之后,会执行该方法,不断循环地判断当前线程节点是否在head后面一位,如果是则调用tryAcquire()获取锁,如果获取成功,则把线程节点作为Node head,并把原Node head的next设置为空,断开原来的Node head。注意这个Node head只是占位作用,每次处理的都是Node head的下一个节点:
1final boolean acquireQueued(final Node node, int arg) {
2 boolean failed = true;
3 try {
4 boolean interrupted = false;
5 for (;;) {
6 // 获取该节点的上一个节点,判断是否头节点,如果是则尝试获取锁
7 final Node p = node.predecessor();
8 if (p == head && tryAcquire(arg)) {
9 // 获取锁成功,把当前节点变为头节点
10 setHead(node);
11 p.next = null; // help GC
12 failed = false;
13 return interrupted;
14 }
15 // 判断是否需要阻塞线程,该方法中会把取消状态的节点移除掉,并且把当前节点的前一个节点设置为SIGNAL
16 if (shouldParkAfterFailedAcquire(p, node) &&
17 parkAndCheckInterrupt())
18 interrupted = true;
19 }
20 } finally {
21 if (failed)
22 cancelAcquire(node);
23 }
24}
如果当前节点的pre不是head,或者争抢失败,则会将前面节点的状态设置为SIGNAL。
如果前面的节点状态大于0,表示节点被取消,这个时候会把该节点从队列中移除掉。
下图为尝试CAS争抢锁,但失败了,然后把head节点状态设置为SIGNAL:
然后再会循环一次尝试获取锁,如果获取失败了,就调用LockSupport.park(this)
挂起线程。
那么时候才会触发唤起线程呢?这个时候我们得先看看释放锁是怎么做的了。
大家看AQS的源码的时候,可以发现这里的线程阻塞与唤醒基本上是用一个循环+LockSupport.park+LockSupport.unpark实现的,大家知道为什么要这样做?
相关阅读:如何优雅的挂起线程,或者公众号发送:线程挂起
2.2.2、public final boolean release(int arg)
入口代码如下:
1public final boolean release(int arg) {
2 if (tryRelease(arg)) { // 尝试释放锁
3 Node h = head;
4 if (h != null && h.waitStatus != 0) // 如果头节点waitStatus不为0,则唤醒后续线程节点继续处理
5 unparkSuccessor(h);
6 return true;
7 }
8 return false;
9}
tryRelease()具体由子类实现。一般处理流程是让state减1。
如果释放锁成功,并且头节点waitStatus!=0,那么会调用unparkSuccessor()
通知唤醒后续的线程节点进行处理。
注意:在遍历队列查找唤醒下一个节点的过程中,如果发现下一个节点状态是CANCELLED
那么就会忽略这个节点,然后从队列尾部向前遍历,找到与头结点最近的没有被取消的节点进行唤醒操作。
唤醒之后,节点对应的线程2又从acquireQueued()
方法的阻塞处醒来继续参与争抢锁。并且争抢成功了,那么会把head节点的下一个节点设置为null,让自己所处的节点变为head节点:
这样一个AQS独占式、非中断的抢占锁的流程就结束了。
2.2.3、完整流程
最后我们再以另一个维度的流程来演示下这个过程。
首先有4个线程争抢锁,线程1,成功了,其他三个失败了,分别依次入等待队列:
线程2、线程3依次入队列:
现在突然发生了点事情,假设线程3用的是带有超时时间的tryLock,超过了等待时间,线程3状态变为取消状态了,这个时候,线程4追加到等待队列中后,发现前一个节点的状态是1取消状态,那么会执行操作把线程3节点从队列中移除掉:
最后,线程1释放了锁,然后把head节点ws设置为0,并且找到了离head最靠近的一个waitStatus<=0的线程并唤醒,然后参与竞争获取锁:
最终,线程2获取到了锁,然后把自己变为了Head节点,并取代了原来的Head节点:
接着就一直这样循环,我就不再画图了,聪明的你应该对此了如指掌了。
3、使用AQS实现的锁
好了,有了这个AQS,我们就可以很快速的构造属于自己的锁了。
我们来分别构造一个独占不可中断公平锁和非公平锁吧?不必了,其实ReentrantLock正是这样一个锁:
发现里面分别有一个公平锁和非公平锁的实现。相信经过本文介绍,你可以很快看懂他们的源码,并知道实现原理了。
除此之外,ReentrantLock同时提供了以下几个常用的API:
-
lock()
: 调用该方法会使锁计数器加1,如果共享资源最初是空闲的,则将锁定并授予线程; -
unlock()
: 调用该方法使锁计数器减1,当计数达到0的时候,将释放资源; -
tryLock()
: 如果资源没有被任何其他线程占用,那么该方法返回true,并且锁计数器加1。如果资源不是空闲的,则该方法返回false。这个时候线程不会阻塞,而是直接退出返回结果; -
lockInterruptible()
: 该方法使得资源空闲时允许该线程在获取资源时被其他线程中断。也就是说:如果当前线程正在等待锁,但其他线程请求该锁,则当前线程将被中断并立即返回,不会继续等待获取锁;
感兴趣可以看我的这篇文章进一步了解:https://www.itzhai.com/cpj/introduction-and-use-of-reentrantlock.html,或者公众号发送:ReentrantLock。
3.1、组合大于继承原则
我们可以看到,ReentrantLock是通过委托AQS的子类FairSync
和NonfairSync
来调用AQS的方法,而不是直接扩展AQS,这样做可以避免ASQ中的方法污染了锁的API,破坏锁接口的简洁性。
结语
好了,我们今天就讲到这里了,最后,我留下两个课堂作业给大家思考下:
1、ReentrantLock的公平锁是怎么实现的?如何做到公平的?
2、ReentrantLock的非公平锁是怎么实现的?为什么说它是非公平的?
3、ReentrantLock的可中断锁是如何实现的?interrupt()函数执行原理是什么?
4、ReentrantLock的可超时的锁是如何实现的?
相信聪明的你很快可以找到答案。
本文为arthinking基于相关技术资料和官方文档撰写而成,确保内容的准确性,如果你发现了有何错漏之处,烦请高抬贵手帮忙指正,万分感激。大家可以关注我的博客:itzhai.com
获取更多文章;