性能文章>浅析AbstractQueuedSynchronizer>

浅析AbstractQueuedSynchronizer原创

3月前
224800

最近算是闲来无事,再者就是,很早之前就和几个同行探讨过,国内很多一些资料并没有能够将一些东西讲清楚,还有就是丰富一下自己的阅历。既然对他人的资料不满,不如自己动手写一份试试。本来要鸽了的,但是既然有大佬想看我写出来,而且我最近也对AQS稍微深入了解了一番,那么就选择AQS —— AbstractQueuedSynchronizer作为我的**作,尝试写出一篇可以让别人看得懂的技术文。

AbstractQueuedSynchronizer介绍

AbstractQueuedSynchronizer(后文简称为AQS)是JUC(Java.util.concurrent)包下的locks包的一个类,也算是国内面试中出现频率比较高的一个知识点。对于AQS,从名字就可以知道它是一个抽象类,在我的理解中,抽象类就是用来提供一些规定的共同的动作的实现。该怎么理解呢?抽象,说直白点可能是,对于某个东西,我不知道它具体是什么样的,但是我知道它有一些什么特征和行为。比如说我知道这个世界上有鸡,鸡有很多个品种,但是它们都会啄米啄虫子,并且通常来说都是两个翅膀两个爪子等。这就是对一个抽象类(鸡)的特征(两个翅膀两个爪子)和行为(啄米啄虫子)的描述。

回到AQS,作为一个抽象类,我们可以看到它类名的剩余组成部分:QueuedSynchronizer,拆开来看,Queued为排队的意思,Synchronizer则是同步器的意思,那么它的类名给我们提供的意思就是:抽象的,可排队的同步器。如果我们去看源码,会发现AQS继承自一个AbstractOwnableSynchronizer,Ownable意为可被持有的,所以AQS是可以被获取的。

AbstractQueuedSycnrhonizer组成

随便下载一份JDK1.8+的源代码,找到AQS,就可以通过IntelliJ Idea工具看到它的Structure也就是结构,个人认为比较关键或者说重要的组成包括一个内部类Node和几个字段,包括

  • Node head:AQS内部队列的头节点(作用类似于一个哨兵节点,可以说没有什么实际的意义)

  • Node tail:AQS内部队列的尾节点

  • int state:同步状态的个数,可能这么说会很奇怪,可以简单的理解为可被获取的锁的个数。

  • static long spinForTimeoutThreshold:纳秒数,在调用包含超时时间的API时会用到,超过这个纳秒数的超时时间,那么会被挂起(后面会讲到,涉及API为LockSupport.park),否则会自旋等待而不是挂起。

关于这里的state,是AQS的一个核心变量。AQS是一个状态依赖的类,state就是它所管理的状态,并提供了getState、setState和compareAndSetState等protected方法来进行管理,state可以表示任何的意义,通常依赖于它的子类的实现。比如说,在ReentrantLock中,state表示当前锁持有者已经重复获取锁的次数;在Semaphore中,表示可以被获取的“许可证”的数量。

顺带一提,这是Java大牛Doug Lea的作品,所以通过这些变量名,我们可以很直观的知道它们各自的含义,我也给出了相关的注释。在head和tail的描述中,提到了AQS内部队列,毕竟它的名字就带了个Queued,而它内部队列的实现是CLH队列(Craig,Landin and Hagersten lock queue)的一个变种。数据结构层面来说,就是一种双向链表。这个内部队列是AQS的很重要的基石部分,下面就来看看这个Node内部类是如何来实现这个队列的。

AbstractQueuedSyncronizer.Node

Node内部类也是包含几个字段,包括

  • int waitStatus:直译为等待状态

  • Node next

  • Node prev

  • Thread thread

  • Node nextWaiter:直译为下一个等待的节点

可以看到,也是比较简洁的结构,重点则在于waitStatus这个字段,这个字段用来标识当前Node的状态,Node类中定义了多个常量,用来规定状态的各种值情况,包括

  • CANCELLED:1

  • SIGNAL:-1

  • CONDITION:-2

  • PROPAGATE:-3

其实还包括一个初始状态和过渡状态,值为0,不过源码中没有给出,只是我个人认为其实可以标注出来。当Node被初始化之后,默认的waitStatus就为0。并且在一些状态扭转的过程中,也会先CAS到0然后再CAS到最终状态。所以0在我看来是一个初始状态和中间状态。

还有一个需要注意的地方在于Node的构造方法,有两种方式:

  • Node(Thread thread, Node mode) // used by addWaiter

  • Node(Thread thread, int waitStatus) // used by Condition

可以看到这是两种不同情况下的API,我们暂时只关注第一种即可,第二种涉及到等待队列,暂时不讨论。

属于Node的部分其实也不需要讲太多,只需要知道,它封装了一个线程,并且用waitStatus来标注了被封装的线程的状态(其实这么说是不太准确的,但是为了不造成更多的误解,这么理解就行)。剩下的,就是用熟悉的next、prev来实现一个链表的数据结构;还有一点就是,其实AQS的Node可以组成两种(不是两个)队列,一种是同步队列,一种是等待队列。

同步队列是不涉及Condition的、简单的锁获取时的排队队列;等待队列是关联到具体的Condition的,一个AQS绑定几个Condition,理论上就会有多少个等待队列。

AbstractQueuedSynchronizer关键API

在AQS中,有几个默认抛出UnsupportOperationException的API,也就是要留给它的子类自行实现的方法,包括:

  • tryAcquire & tryRelease

  • tryAcquireShared & tryReleaseShared

  • isHeldExclusively

一般来说,对于独占式锁的操作,涉及到tryAcquire、tryRelease和isHeldExclusively这三个API;对于共享式锁的操作则只涉及tryAcquireShared和tryReleaseShared。在AQS的设计中,本身AQS提供了不带有try前缀的、并且提供了默认实现的API,也就是:

  • acquire & release

  • acquireShared & releaseShared

AQS的acquire方法实现如下:

 public final void acquire(int arg) {
   if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) {
     selfInterrupt();
  }
 }

可以看到,这也是设计模式中的模版方法模式的体现,其他方法也是类似的思路,就不展开了。

并行和并发

在真正看到源码之前,再次介绍一下这两个名词的概念。个人认为,并行(paralleli**)和并发(concurrency)从定义上再到表现上都是不同的,但是由于很多八股文之类的,可能并没有说的很明白,所以在这里加上一部分我自己的理解。

  • 并行:在某一个时间点,比如说你看到这行字的这一瞬间,有多个任务在同时进行。这就是并行。

  • 并发:在某一个时间段,比如说从你点开这篇文章,到你读到这行字的这一时间段内,你可能在边看这篇文章,同时你可能还在电脑上挂着微信、QQ、钉钉等,这一时间段内,你有时候会专注于这篇文章,有时候又会转头过去聊一下天。这就是并发。

被标注上颜色的,我认为就是各自的关键词,也算的上是用来区分它们的特征。在这里只是给出我自己的理解,不一定对,仅供参考即可。

AbstractQueuedSynchronizer的运行逻辑

这部分应该算是一个比较重要的地方了。很多的八股文什么的也都会涉猎到这部分,这里我只讨论几个问题:

  1. 线程获取锁

    1. 成功

    2. 失败

  2. 线程释放锁

这也就是AQS的核心问题,我认为也没有必要再去探究更多,将这两个问题讨论清楚,也基本上可以搞清楚AQS了,并且,这里也只讨论独占锁的情况,对于共享锁,在了解了独占锁之后一般也可以做到触类旁通,不再赘述。


 

线程获取锁

通常,AQS本身的acquire就是一个获取锁的操作,源码在上文。也copy一下来方便看

 public final void acquire(int arg) {
   if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) {
     selfInterrupt();
  }
 }

我们可以看到,是非常简单的逻辑,但是其中涉及到多个方法,将调用链路展开看,就是:

tryAcquire【尝试获取锁】 --> addWaiter【添加等待节点】 --> acquireQueued【在队列中获取】 --> selfInterrupt【自我中断】

我把它们各自的作用都标注好了,可能不够准确,但是应该也大差不差。在深入了解之前,首先看到下面这张图,看起来可能会更加的舒服一点:

这里有个地方可以提一下,就是:我们看到,acquire方法包含了addWaiter方法,addWaiter是用来往队列中添加等待节点的。那么,假设当前有两个线程A和B,假如他们都调用了基于AQS实现的独占锁的方法,最后都落到了这个acquire这里,并且假设A比B稍微早一点调用,那么,由于CPU调度的不确定性,可能会出现B在队列中的次序比A要靠近头节点的情况。也就是说,在某些极端的瞬时并发调用的时候光是通过acquire顺序来判断各个线程所属的Node在等待队列的排列顺序很可能是不准确。因为addWaiter和acquire这两个操作之间并不是一个操作,所以这里会有一个并发问题,可以参考i++这样的问题。所以有些八股文中所说的,线程会按照它们调用lock(其实大部分情况下都是落到acquire上)方法的顺序在AQS内排队,这句话我认为是不够严谨的,当然,也是我认为的,不一定准确,更多的是想让大家自己去试验一下并得出一个自己的结论。

下面回到代码中。

tryAcquire就不讨论了,我们可以简单的理解为就是去获取锁,成功了,那么这个方法就直接返回。失败了,就会走到addWaiter中,打开源码可以看到addWaiter方法,这里贴出代码如下:

 private Node addWaiter(Node mode) {
     Node node = new Node(Thread.currentThread(), mode);
     // Try the fast path of enq; backup to full enq on failure
     Node pred = tail;
     if (pred != null) {
         node.prev = pred;
         if (compareAndSetTail(pred, node)) {
         pred.next = node;
         return node;
        }
    }
     enq(node);
     return node;
 }

也是比较简单的逻辑,将线程封装成一个Node,并表明它是独占式的锁——Node.EXCLUSIVE,然后就是插入到尾节点上。这里需要注意,如果获取到的tail为null,会进到enq方法,enq方法包含了初始化AQS内部队列的逻辑,代码如下:

 private Node enq(final Node node) {
     for (;;) {
         Node t = tail;
         if (t == null) { // Must initialize
             if (compareAndSetHead(new Node()))
                 tail = head;
        } else {
             node.prev = t;
             if (compareAndSetTail(t, node)) {
                 t.next = node;
                 return t;
            }
        }
    }
 }

可以看到,如果tail为null,那么说明当前AQS的等待队列为空,那么就需要调用enq方法进行队列的初始化,enq方法在不涉及Condition的情况下,只会在addWaiter中被调用,所以可以直观的认为enq其实就是初始化队列的一个方法。

在addWaiter方法跳出之后,会走入到acquireQueued方法,这算是一个重点,我们可以先来看它源码的注释如下:

Acquires in exclusive uninterruptible mode for thread already in queue. Used by condition wait methods as well as acquire.

我们暂时只关注前面一句即可:已经处在队列中的线程,在不可被中断的情况下获取独占锁。

看到代码,如下:

 final boolean acquireQueued(final AbstractQueuedSynchronizer.Node node, int arg) {
     boolean failed = true;
     try {
         boolean interrupted = false;
         for (; ; ) {
             final AbstractQueuedSynchronizer.Node p = node.predecessor();
             if (p == head && tryAcquire(arg)) {
                 setHead(node);
                 p.next = null; // help GC
                 failed = false;
                 return interrupted;
            }
             if (shouldParkAfterFailedAcquire(p, node) &&
                     parkAndCheckInterrupt())
                 interrupted = true;
        }
    } finally {
         if (failed)
         cancelAcquire(node);
    }
 }

看到第一个if的条件,判断前驱节点是否为head,如果为head的话,继续tryAcquire,这也是八股文中常见的“前驱节点为首节点时才可以获取锁”,如果获取成功就进行一系列操作,直接返回即可。

可以关注到后面的第二个if,看到shouldParkAfterFailedAcquire这个方法和parkAndCheckInterrupt方法,通过名字就可以知道,第一个方法是判断:在获取锁失败后是否需要挂起;第二个方法则是:挂起并判断中断。

我们进入到shouldParkAfterFailedAcquire方法去,代码如下:

     private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
         int ws = pred.waitStatus;
         if (ws == Node.SIGNAL)
             return true;
         if (ws > 0) {
             do {
                 node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
             pred.next = node;
        } else {
             compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
         return false;
    }

把一些注释去掉后,非常的简洁直观,目的就是:确保自己的前驱节点是正常的节点(不为空且没有被取消,也就是waitStatus不为CANCELLED),并且前驱节点的waitStatus必须为SIGNAL,为SIGNAL则表示当前节点的后继节点处于BLOCK状态,也就是阻塞,所以当前节点在释放或者被取消的时候需要唤醒后继节点。

回到这个方法,就是确保自己能够被前驱节点唤醒了,才能安心的让出自己的CPU,让自己休息。一般来说,这个方法会被调用两次,因为waitStatus的初始值一般为0,第一次会将waitStatus通过CAS设置为SIGNAL,返回false,然后第二次就会返回true。

返回true之后,就会进入到parkAndCheckInterrupt方法,代码如下:

 private final boolean parkAndCheckInterrupt() {
     LockSupport.park(this);
     return Thread.interrupted();
 }

很简单,通过LockSupport将自己挂起,被(前驱节点)唤醒之后,返回当前线程是否被中断的标志。

总结来看,流程不算很长,并且也将一些比较关键的概念串联了起来。总结如下图:

到此,AQS的acquire逻辑就捋完了,顺便提一嘴就是AQS的内部队列,感觉很多八股文并没有将它讲的很清晰,其实是一个很简单的数据结构,主要对于head节点,head的thread字段永远都是为null的。并且,如果同步器(也就是AQS及其实现类)没有出现竞争,也就是,比如说,两个线程thread-A和thread-B分别调用lock,触发到acquire方法,没有出现竞争,那么head是不会被初始化的(这就是所谓的延迟初始化)。head只有在第一次出现竞争,导致线程进入等待的时候会被初始化。并且看到AQS的setHead方法,如下

 private void setHead(Node node) {
   this.head = node;
   node.thread = null;
   node.prev = null;
 }

它会将node的thread和prev都置为空,所以head的thread字段永远为null,所以,有些八股文中说的head代表获取了锁的线程,这种说法是不正确的。并且,在release方法被调用的时候,会出现head的waitStatus=0的情况(涉及到release方法源码,后续再解析),如果后续有线程在排队,那么就只是简单的唤醒后继节点,并不会改变后继节点的waitStatus,也就是说:

  • 如果后继节点为尾节点,那么此时head会被设置为 {waitStatus = 0; thread = null}

  • 如果后继节点不为尾节点,那么此时head会被设置为{waitStatus = -1; thread = null}

同理,对于tail节点的生命周期也是和head类似。最开始都是null,当队列被初始化的时候,tail为当前队列最末尾的一个在等待的线程的Node,一般,tail的waitStatus恒定为0(不考虑Condition队列的情况下)。

一些题外话

在末尾,对于AQS公平锁和非公平锁的情况,我觉得也可以再提一下,这里不会很深入的去探讨。参考ReentrantLock的源码即可。可以看到acquire方法(这个方法在文章内被多次提及,应该可以看得出它的重要性),Doug Lea对它的逻辑编排,首先是尝试tryAcquire,再去考虑排队的问题。如果忘了的话可以翻上去看看我贴出来的源码。所以我认为,AQS其实是一个非公平锁,我这么下定论的依据是它会将tryAcquire放在第一步,一般来说,tryAcquire就是直接决定一个线程是否可以获得锁的方法,所以,我会认为acquire方法是一个非公平的获取锁的方法

但是,当我们看到ReentrantLock的时候,会看到它的FairSync的lock方法是直接去调用了acquire方法的,而关键的tryAcquire方法在FairSync中也被重写了。所以,虽然这里的lock是直接调用acquire方法,但是如果我们去看到它的源码,就可以看到,实际上在FairSync重写的tryAcquire方法中,对能否获取锁进行了严格的限制,所以虽然它直接调用了acquire方法,但是它的实现上仍然是公平锁。

写这么一段,是为了让大家知道,有一些“一般来说”的东西,并不一定和我们想象中的一样,而是需要亲身去看一下才会有更好的理解,就像这篇文章,只是为了抛砖引玉,让大家有去看源码的**,自行去了解更好。

顺便提一下就是,公平锁和非公平锁的唯一区别就在于,它们在尝试获取锁的时候:

  • 公平锁:如果前面有线程获取了锁,那么去排队。

  • 非公平锁:直接去尝试获取锁,不管有没有别的线程正在持有锁,失败之后才会去排队,并且需要提一个小误区,这个误区来自我本人。我之前认为非公平锁会在持有锁的线程释放锁的时候被唤醒,其实,不管公平锁还是非公平锁,只要进了等待队列,那么就会按照等待队列的顺序被唤醒,不会被超前唤醒(除非中断什么的)。

也就是说,只有在第一次获取锁的时候(对应到Java代码,意思是线程第一次调用lock()方法的时候)会不会去判断有没有线程正在持有锁,我认为这就是它们的唯一区别。


 

线程释放锁

释放这里就比较简单了,看到release方法的源码如下:

public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0) {
unparkSuccessor(h);
}
return true;
}
return false;
}

其中,tryRelease是自定义的方法,不赘述。看到它如果成功release了之后有一个对head的判断,这里就对应到了上文说到的,如果没有竞争,head就为null,也就没有线程在等待,直接返回即可。后续对head的waitStatus进行判断是为了感知后续有没有在等待的线程,因为,按照上面的逻辑,在一个节点A被添加到队列的时候,并假设它是最后一个节点,它会将前驱节点B的waitStatus通过CAS设置成-1,A本身的waitStatus则为0,那么,如果它的前驱节点B被设置为了head,会保留B的waitStatus也就是-1,就需要唤醒A。如果后续A获取了锁,进入到release,那么A后续没有节点入队,它的waitStatus仍旧为0,所以不需要唤醒后继节点。

可以看到,释放锁涉及的逻辑只有一个unparkSuccessor,通过名字也可以知道这个方法是唤醒入参节点的后继节点。源码如下:

    private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}

可以看到,也是比较简单的逻辑,注意到入参这里,正常情况下传入的都是head节点,第一个if将入参node的waitStatus修改为0,对应的就是上文说到的head会短暂变为0的情况。然后就是找到下一个正常等待的节点,将它的线程唤醒即可,而不是直接将锁的持有权给到下一个节点。

总结

上文讲解的是我认为的AQS比较核心的API,通过对这两个API的详细逻辑的讲解,我认为可以让读者对AQS的最基础的运行情况有个了解,骨架定好了,剩下的更多都是一些细枝末节的东西,后续如果我有去深入了解,也会再尝试出一篇文章。

 

 

点赞收藏
分类:标签:
monstaxl
请先登录,感受更多精彩内容
快去登录吧,你将获得
  • 浏览更多精彩评论
  • 和开发者讨论交流,共同进步
0
0