Java并发编程解析 | 基于JDK源码解析Java领域中的并发锁,我们需要特别关注哪些内容?原创
写在开头
在并发编程领域,有两大核心问题:一个是互斥,即同一时刻只允许一个线程访问共享资源;另一个是同步,即线程之间如何通信、协作。<br />主要原因是,对于多线程实现实现并发,一直以来,多线程都存在2个问题:
- 线程之间内存共享,需要通过加锁进行控制,但是加锁会导致性能下降,同时复杂的加锁机制也会增加编程编码难度
- 过多线程造成线程之间的上下文切换,导致效率低下
因此,在并发编程领域中,一直有一个很重要的设计原则: “ 不要通过内存共享来实现通信,而应该通过通信来实现内存共享。”<br />简单来说,就是尽可能通过消息通信,而不是内存共享来实现进程或者线程之间的同步。
关健术语
<br />本文用到的一些关键词语以及常用术语,主要如下:
- 并发(Concurrent): 在操作系统中,是指一个时间段中有几个程序都处于已启动运行到运行完毕之间,且这几个程序都是在同一个处理机上运行。
- 并行(Parallel): 当系统有一个以上CPU时,当一个CPU执行一个进程时,另一个CPU可以执行另一个进程,两个进程互不抢占CPU资源,可以同时进行。
- 信号量(Semaphore): 是在多线程环境下使用的一种设施,是可以用来保证两个或多个关键代码段不被并发调用,也是作系统用来解决并发中的互斥和同步问题的一种方法。
- 信号量机制(Semaphores): 用来解决同步/互斥的问题的,它是1965年,荷兰学者 Dijkstra提出了一种卓有成效的实现进程互斥与同步的方法。
- 管程(Monitor) : 一般是指管理共享变量以及对共享变量的操作过程,让它们支持并发的一种机制。
- 互斥(Mutual Exclusion):一个公共资源同一时刻只能被一个进程或线程使用,多个进程或线程不能同时使用公共资源。即就是同一时刻只允许一个线程访问共享资源的问题。
- 同步(Synchronization):两个或两个以上的进程或线程在运行过程中协同步调,按预定的先后次序运行。即就是线程之间如何通信、协作的问题。
- 对象池(Object Pool): 指的是一次性创建出 N 个对象,之后所有的线程重复利用这 N 个对象,当然对象在被释放前,也是不允许其他线程使用的, 一般指保存实例对象的容器。
基本概述
在Java领域中,我们可以将锁大致分为基于Java语法层面(关键词)实现的锁和基于JDK层面实现的锁。
在Java领域中, 尤其是在并发编程领域,对于多线程并发执行一直有两大核心问题:同步和互斥。其中:
- 互斥(Mutual Exclusion):一个公共资源同一时刻只能被一个进程或线程使用,多个进程或线程不能同时使用公共资源。即就是同一时刻只允许一个线程访问共享资源的问题。
- 同步(Synchronization):两个或两个以上的进程或线程在运行过程中协同步调,按预定的先后次序运行。即就是线程之间如何通信、协作的问题。
针对对于这两大核心问题,利用管程是能够解决和实现的,因此可以说,管程是并发编程的万能钥匙。<br />虽然,Java在基于语法层面(synchronized 关键字)实现了对管程技术,但是从使用方式和性能上来说,内置锁(synchronized 关键字)的粒度相对过大,不支持超时和中断等问题。<br />为了弥补这些问题,从JDK层面对其“重复造轮子”,在JDK内部对其重新设计和定义,甚至实现了新的特性。<br />在Java领域中,从JDK源码分析来看,基于JDK层面实现的锁大致主要可以分为以下4种方式:
- 基于Lock接口实现的锁:JDK1.5版本提供的ReentrantLock类
- 基于ReadWriteLock接口实现的锁:JDK1.5版本提供的ReentrantReadWriteLock类
- 基于AQS基础同步器实现的锁:JDK1.5版本提供的并发相关的同步器Semaphore,CyclicBarrier以及CountDownLatch等
- 基于自定义API操作实现的锁:JDK1.8版本中提供的StampedLock类
从阅读源码不难发现,在Java SDK 并发包主要通过AbstractQueuedSynchronizer(AQS)实现多线程同步机制的封装与定义,而通过Lock 和 Condition 两个接口来实现管程,其中 Lock 用于解决互斥问题,Condition 用于解决同步问题。
一.AQS基础同步器基本理论
在Java领域中,同步器是专门为多线程并发设计的同步机制,主要是多线程并发执行时线程之间通过某种共享状态来实现同步,只有当状态满足这种条件时线程才往下执行的一种同步机制。
<br />一个标准的AQS同步器主要有同步状态机制,等待队列,条件队列,独占模式,共享模式等五大核心要素组成。<br />在Java领域中,JDK的JUC(java.util.concurrent.)包中提供了各种并发工具,但是大部分同步工具的实现基于AbstractQueuedSynchronizer类实现,其内部结构主要如下:
- 同步状态机制(Synchronization Status):主要用于实现锁(Lock)机制,是指同步状态,其要求对于状态的更新必须原子性的
- 等待队列(Wait Queue):主要用于存放等待线程获取到的锁资源,并且把线程维护到一个Node(节点)里面和维护一个非阻塞的CHL Node FIFO(先进先出)队列,主要是采用自旋锁+CAS操作来保证节点插入和移除的原子性操作。
- 条件队列(Condition Queue):用于实现锁的条件机制,一般主要是指替换“等待-通知”工作机制,主要是通过ConditionObject对象实现Condition接口提供的方法实现。
- 独占模式(Exclusive Mode):主要用于实现独占锁,主要是基于静态内部类Node的常量标志EXCLUSIVE来标识该节点是独占模式
- 共享模式(Shared Mode):主要用于实现共享锁,主要是基于静态内部类Node的常量标志SHARED来标识该节点是共享模式
我们可以得到一个比较通用的并发同步工具基础模型,大致包含如下几个内容,其中:<br />
- 条件变量(Conditional Variable): 利用线程间共享的变量进行同步的一种工作机制
- 共享变量((Shared Variable)):一般指对象实体对象的成员变量和属性
- 阻塞队列(Blocking Queue):共享变量(Shared Variable)及其对共享变量的操作统一封装
- 等待队列(Wait Queue):每个条件变量都对应有一个等待队列(Wait Queue),内部需要实现入队操作(Enqueue)和出队操作(Dequeue)方法
- 变量状态描述机(Synchronization Status):描述条件变量和共享变量之间状态变化,又可以称其为同步状态
- 工作模式(Operation Mode): 线程资源具有排他性,因此定义独占模式和共享模式两种工作模式
综上所述,条件变量和等待队列的作用是解决线程之间的同步问题;共享变量与阻塞队列的作用是解决线程之间的互斥问题。
二. JDK显式锁统一概念模型
在并发编程领域,有两大核心问题:一个是互斥,即同一时刻只允许一个线程访问共享资源;另一个是同步,即线程之间如何通信、协作。
综合Java领域中的并发锁的各种实现与应用分析来看,一把锁或者一种锁,基本上都会包含以下几个方面:
- 锁的同步器工作机制:主要是考虑共享模式还是独享模式,是否支持超时机制,以及是否支持超时机制?
- 锁的同步器工作模式:主要是基于AQS基础同步器封装内部同步器,是否考虑公平/非公平模式?
- 锁的状态变量机制: 主要锁的状态设置,是否共享状态变量?
- 锁的队列封装定义:主要是指等待队列和条件队列,是否需要条件队列或者等待队列定义?
- 锁的底层实现操作: 主要是指底层CL锁和CAS操作,是否需要考虑自旋锁或者CAS操作实例对象方法?
- 锁的组合实现新锁: 主要是基于独占锁和共享锁,是否考虑对应API自定义操作实现?
综上所述,大致可以根据上述这些方向,我们便可以清楚🉐️知道Java领域中各种锁实现的基本理论时和实现思想。
三.ReentrantLock(可重入锁)的设计与实现
在Java领域中,ReentrantLock(可重入锁)是针对于Java多线程并发控制中对一个线程可以多次对某个锁进行加锁操作,主要是基于内置的AQS基础抽象队列同步器实现的一种并发控制工具类。
<br />一般来说,对于同一个线程是否可以重复占有同一个锁对象的角度来分,大致主要可以分为可重入锁与不可重入锁。其中:
- 可重入锁:一个线程可以多次抢占同一个锁,也就意味着能够支持一个线程对资源的重复加锁,或者说,一个线程可以多次进入同一个锁所同步的临界区代码块。
- 不可重入锁:一个线程只能抢占一次同一个锁,也就意味着在同一时刻只能有一个线程获取到锁,而其他获取锁的线程只能等待,只有拥有锁的线程释放了锁后,其他的线程才能够获取锁。
ReentrantLock是JDK中显式锁一个主要基于Lock接口API实现的基础实现类,拥有与内置锁(synchronized)相同的并发性和内存语义,同时提供了限时抢占、可中断抢占等一些高级锁特性。<br />除此之外,ReentrantLock基于内置的AQS基础抽象队列同步器实现,在线程参与锁资源竞争比较激烈的场景下,能表现出比内置锁较佳的性能。<br />而且,ReentrantLock是一种独占锁,在独占模式下只能逐一使用锁,也就是说,任意时刻最多只会有一个线程持有锁的控制权。
1. 设计思想
ReentrantLock类最早是在JDK1.5版本提供的,从设计思想上来看,主要包括同步器工作模式,获取锁方法,释放锁方法以及定义Condition队列方法等4个核心要素。其中:
- 实现Lock接口 :主要基于Lock接口API实现对应方法,拥有与内置锁(synchronized)相同的并发性和内存语义,用于支持和解决解决互斥问题。
- 同步器工作模式:基于AQS基础抽象队列同步器封装内置实现一个静态的内置同步器抽象类,然后基于这个抽象类分别实现了公平同步器和非公平同步器,用来指定和描述同步器工作模式是公平模式还是非公平模式。
- 公平/非公平模式:主要描述的是多个线程在同时获取锁时是否按照先到先得的顺序获取锁,如果是则为公平模式,否则为非公平模式。
- 获取锁方法:主要定义了一个lock()方法来获取锁,表示假如锁已经被其他线程占有或持有,其当前获取锁的线程则进入等待状态。
- 释放锁方法:主要定义了一个unlock()方法来释放锁,表示假如锁已经被其他线程放弃或释放,其当前获取锁的线程则获得该锁。
- 定义Condition队列操作方法: 主要是基于Condition接口来定义一个方法实现锁的条件机制,用于支持线程的阻塞和唤醒功能即就是解决同步问题,也就是我们说的线程间的通信方式。
- 定义等待队列操作方法: 主要是依据条件队列来时进行对应的操作,间接适配AQS基础同步器中对于等待队列的功能,保证获取锁的顺序的公平性
2. 基本实现
在ReentrantLock类的JDK1.8版本中,对于ReentrantLock的基本实现如下:
public class ReentrantLock implements Lock, java.io.Serializable {
private static final long serialVersionUID = 7373984872572414699 L;
/**
* ReentrantLock锁-定义支持同步器实现
*/
private final Sync sync;
/**
* ReentrantLock锁-基于AQS定义支持同步器实现
*/
abstract static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = -5179523762034025860 L;
/**
* ReentrantLock锁-定义支持同步器Sync获取锁方法
*/
abstract void lock();
//......其他方法代码
}
/**
* ReentrantLock锁-构造同步器默认工作模式(默认非公平模式)
*/
public ReentrantLock() {
sync = new NonfairSync();
}
/**
* ReentrantLock锁-构造同步器指定工作模式(可选公平/非公平模式)
*/
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
/**
* ReentrantLock锁-获取锁(普通模式)
*/
public void lock() {
sync.lock();
}
/**
* ReentrantLock锁-释放锁
*/
public void unlock() {
sync.release(1);
}
/**
* ReentrantLock锁-创建锁的条件机制
*/
public Condition newCondition() {
return sync.newCondition();
}
//......其他方法代码
}
- 内部同步器:基于AQS基础同步器封装和定义了一个静态内部Sync抽象类,其中抽象了一个内置锁lock()方法
- 同步器工作模式:提供了 2个构造方法,其中无参数构造方法表示的是默认的工作模式,有参数构造方法主要依据参数来实现指定的工作模式
- 获取锁: 主要是提供了lock()方法,调用的静态内部Sync抽象类内置锁lock()方法,而本质上是AQS同步器中的acquire()方法
- 释放锁: 主要是提供了unlock()方法,而本质上是调用的AQS同步器中的release()方法
- 创建条件队列: 主要是基于Condition接口定义了newCondition() 方法,调用的静态内部Sync抽象类ewCondition()方法,而本质上是调用的AQS同步器中的ConditionObject中的newCondition()方法
2.1 基于AQS同步器封装静态内部Sync抽象类
abstract static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = -5179523762034025860L;
/**
* ReentrantLock锁-内部同步器Sync的内置加锁方法
*/
abstract void lock();
/**
* ReentrantLock锁-内部同步器Sync的非公平获取锁
*/
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
/**
* ReentrantLock锁-内部同步器Sync的尝试释放
*/
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
/**
* ReentrantLock锁-内部同步器Sync的检查线程是否独占
*/
protected final boolean isHeldExclusively() {
// While we must in general read state before owner,
// we don't need to do so to check if current thread is owner
return getExclusiveOwnerThread() == Thread.currentThread();
}
/**
* ReentrantLock锁-内部同步器Sync的条件机制
*/
final ConditionObject newCondition() {
return new ConditionObject();
}
/**
* ReentrantLock锁-内部同步器Sync的判断锁持有者
*/
final Thread getOwner() {
return getState() == 0 ? null : getExclusiveOwnerThread();
}
/**
* ReentrantLock锁-内部同步器Sync的独占状态
*/
final int getHoldCount() {
return isHeldExclusively() ? getState() : 0;
}
/**
* ReentrantLock锁-内部同步器Sync的是否被锁
*/
final boolean isLocked() {
return getState() != 0;
}
/**
* ReentrantLock锁-内部同步器Sync的流化处理对象
*/
private void readObject(java.io.ObjectInputStream s)
throws java.io.IOException, ClassNotFoundException {
s.defaultReadObject();
setState(0); // reset to unlocked state
}
}
- Sync类:FairSync公平同步器和NonfairSync非公平同步器的抽象父类。
- 校验isHeldExclusively()方法: ReentrantLock锁是属于独占模式,需要当前锁持有线程与当前线程是否一致
- nonfairTryAcquire()方法: 一般主要用于非公平模式获取锁,其核心是compareAndSetState方法和setExclusiveOwnerThread方法
- tryRelease()方法: 其公平/非公平模式都是通过ryRelease()来释放锁操作
- newCondition()方法: 基于AQS同步器的ConditionObject对象封装实现,提供给 ReentrantLock类使用
- 私有readObject()方法:对于输入的对象进行流化处理
特别需要注意的是,我们需要重点关注nonfairTryAcquire()方法和tryRelease()方法,其中:
- 获取非公平锁 nonfairTryAcquire()方法:主要是用于获取AQS的状态变量status,其默认取值范围是0和1,其中,0表示未被加锁,1表示已经被加锁
- 如果状态变量status=0,使用compareAndSetState方法进行CAS原子修改操作,把状态变量修改为1,并且通过setExclusiveOwnerThread设置当前线程为锁的持有线程
- 如果状态变量status=1,表示当前线程为锁的持有线程,正在进入锁重入操作,状态变量累加1,超过重入次数时,会抛出throw new Error(“Maximum lock count exceeded”)
- 释放锁tryRelease()方法:主要是检查当前线程是否为锁持有线程,随后AQS同步器状态变量减1,如果不是 throw new IllegalMonitorStateException()
- 如果状态变量status=0,表示锁已经释放成功,通过setExclusiveOwnerThread设置锁的持有线程为null,也就是置空锁的持有线程
- 如果状态变量status !=0, 需要状态变量递减1即可,直到锁已经释放成功
2.2 基于Sync抽象类封装FairSync公平同步器
static final class FairSync extends Sync {
private static final long serialVersionUID = -3000897897090466540L;
/**
* ReentrantLock锁-公平模式-获取锁
*/
final void lock() {
acquire(1);
}
/**
* ReentrantLock锁-公平模式-尝试获取锁
*/
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}
- 实现方式: 主要基于AQS封装的内部静态抽象Sync同步类实现,使用的AQS的独占模式。
- 主要方法: 主要提供了lock()和tryAcquire()方法,其严格意义上来说,仅仅只是实现了tryAcquire()方法,但是最关键的使用hasQueuedPredecessors来保证了锁的公平性。
- 锁获取方式: 主要是采用完全通过队列来实现实现公平机制,即就是检查是否存在等待队列,如果队列之中已经存在其他线程,直接放弃操作。
2.3 基于Sync抽象类封装NonfairSync非公平同步器
static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;
/**
* ReentrantLock锁-非公平模式-获取锁
*/
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
/**
* ReentrantLock锁-非公平模式-尝试获取锁
*/
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}
- 实现方式: 主要基于AQS封装的内部静态抽象Sync同步类实现,使用的AQS的独占模式。
- 主要方法: 主要提供了lock()和tryAcquire()方法,其严格意义上来说,仅仅只是实现了tryAcquire()方法,直接调用了Sync同步类的nonfairTryAcquire()方法。
- 锁获取方式: 主要是采用闯入策略来打破锁的公平,也就是一般准备获取锁的线程会先尝试获取锁,失败之后才进入队列中。
3. 具体实现
在ReentrantLock类的JDK1.8版本中,对于ReentrantLock的具体实现如下:
public class ReentrantLock implements Lock, java.io.Serializable {
private static final long serialVersionUID = 7373984872572414699 L;
/** Synchronizer providing all implementation mechanics */
private final Sync sync;
/**
* ReentrantLock锁-基于AQS定义支持同步器实现
*/
abstract static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = -5179523762034025860 L;
/**
* ReentrantLock锁-定义支持同步器Sync获取锁方法
*/
abstract void lock();
//......其他方法代码
}
/**
* ReentrantLock锁-构造同步器默认工作模式(默认非公平模式)
*/
public ReentrantLock() {
sync = new NonfairSync();
}
/**
* ReentrantLock锁-构造同步器指定工作模式(可选公平/非公平模式)
*/
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
/**
* ReentrantLock锁-获取锁(普通模式)
*/
public void lock() {
sync.lock();
}
/**
* ReentrantLock锁-释放锁
*/
public void unlock() {
sync.release(1);
}
/**
* ReentrantLock锁-创建锁的条件机制
*/
public Condition newCondition() {
return sync.newCondition();
}
/**
* ReentrantLock锁-获取锁(支持可中断机制)
*/
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}
/**
* ReentrantLock锁-尝试获取锁(普通模式)
*/
public boolean tryLock() {
return sync.nonfairTryAcquire(1);
}
/**
* ReentrantLock锁-尝试获取锁(支持超时)
*/
public boolean tryLock(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireNanos(1, unit.toNanos(timeout));
}
/**
* ReentrantLock锁-统计当前线程所持有数量
*/
public int getHoldCount() {
return sync.getHoldCount();
}
/**
* ReentrantLock锁-检测当前线程是否独占
*/
public boolean isHeldByCurrentThread() {
return sync.isHeldExclusively();
}
/**
* ReentrantLock锁-检测是否被加锁
*/
public boolean isLocked() {
return sync.isLocked();
}
/**
* ReentrantLock锁-检测是否公平模式
*/
public final boolean isFair() {
return sync instanceof FairSync;
}
/**
* ReentrantLock锁-获取当前锁持有线程
*/
protected Thread getOwner() {
return sync.getOwner();
}
/**
* ReentrantLock锁-检测轮询线程是否存在队列中
*/
public final boolean hasQueuedThreads() {
return sync.hasQueuedThreads();
}
/**
* ReentrantLock锁-检测线程是否存在队列中
*/
public final boolean hasQueuedThread(Thread thread) {
return sync.isQueued(thread);
}
/**
* ReentrantLock锁-获取队列数量
*/
public final int getQueueLength() {
return sync.getQueueLength();
}
/**
* ReentrantLock锁-获取队列中的所有线程
*/
protected Collection < Thread > getQueuedThreads() {
return sync.getQueuedThreads();
}
/**
* ReentrantLock锁-检测存在条件队列是否入队状态
*/
public boolean hasWaiters(Condition condition) {
if (condition == null)
throw new NullPointerException();
if (!(condition instanceof AbstractQueuedSynchronizer.ConditionObject))
throw new IllegalArgumentException("not owner");
return sync.hasWaiters((AbstractQueuedSynchronizer.ConditionObject) condition);
}
/**
* ReentrantLock锁-获取等待队列的长度
*/
public int getWaitQueueLength(Condition condition) {
if (condition == null)
throw new NullPointerException();
if (!(condition instanceof AbstractQueuedSynchronizer.ConditionObject))
throw new IllegalArgumentException("not owner");
return sync.getWaitQueueLength((AbstractQueuedSynchronizer.ConditionObject) condition);
}
/**
* ReentrantLock锁-获取等待队列的线程对象
*/
protected Collection < Thread > getWaitingThreads(Condition condition) {
if (condition == null)
throw new NullPointerException();
if (!(condition instanceof AbstractQueuedSynchronizer.ConditionObject))
throw new IllegalArgumentException("not owner");
return sync.getWaitingThreads((AbstractQueuedSynchronizer.ConditionObject) condition);
}
}
- 获取锁的方法:主要提供了一般模式lock()方法,支持可中断机制lockInterruptibly()方法,无参数tryLock()方法以及有参数的支持超时机制的tryLock(long timeout, TimeUnit unit)方法
- 释放锁的方法:主要是unlock()方法,间接调用是内部同步器中的release()方法
- 条件队列操作:主要提供了获取队列中的线程对象getQueuedThreads(),检测队列入队hasWaiters(Condition condition) 方法,以及
- 等待队列操作:主要提供了获取队列中的线程对象getWaitingThreads(Condition condition),检测队列入队hasQueuedThread(Thread thread) 方法,以及获取队列长度getQueueLength()方法和getWaitingThreads(Condition condition)方法
- 其他检测判断:主要有判断是否公平模式isFair()方法,是否当前线程独占isHeldByCurrentThread()方法,以及是否加锁 isLocked()等
需要注意的是,在JDK1.8版本之后,对于ReentrantLock的实现有些细微的变化,感兴趣的可自行参考相关版本的源码进行对比分析。<br />综上所述,从一定意义上讲,ReentrantLock是一种可重入的独占(互斥)锁,属于AQS基础抽象队列同步器中独占模式孵化的产物,支持公平模式与非公平模式,默认采用非公平模式。
四.ReentrantReadWriteLock(读写锁)的设计与实现
在Java领域中,ReentrantReadWriteLock(读写锁)是针对于Java多线程并发控制中引入一个共享锁定义读操作与独占锁定义读操作等场景共同组合构成一把锁来提高并发,主要是基于内置的AQS基础抽象队列同步器实现的一种并发控制工具类。
<br />通过ReentrantReadWriteLock类能获取读锁和写锁,它的读锁是可以多线程共享的共享锁,而它的写锁是排他锁,在被占时不允许其他线程再抢占操作。
1. 设计思想
<br />一般来说,在一些特殊的场景中,比如对于数据的读和写操作,为提高并发性能,总会引入共享锁和独享锁来共同组成一把锁,通常情况下,我们把这类锁成为读写锁(ReadWriteLock) 。<br />简单来说,就是主要考虑读和写操作,读操作不会修改数据,可以利用多个线程进行读操作,一般采用共享锁实现;而写操作会改变数据本身,只能允许一个线程进行操作,因此采用独享锁实现。<br />读写锁(ReadWriteLock) 最大的一个特点就是在内部维护一对锁,一把读锁(ReadLock) ,一把写锁(WriteLock) 。其中,对于线程持有的情况来说,简单可以总结为“读共享,写独占”。
1.1 读写锁的基本理论
虽然读写锁(ReadWriteLock) 之间是有关系的:同一时刻不允许读锁和写锁同时被抢占,二者之间是互斥的。<br />假设现在有N个线程,主要从T(1),T(2),…,一直到T(N)个线程,在读写锁的操作情况如下,其中:
- 多读模式(多读共享):T(N) 个线程可以同时把持并获取读锁,假设T(1)线程成功获取并持有读锁,T(2)线程和后续的T(N)个线程依然可以成功获取读锁,即使T(1)线程没有释放持有的读锁。
- 读写模式(读写互斥):假设T(1)线程成功获取并持有读锁,T(2)线程和后续的T(N)个线程便不能成功获取且持有写锁,除非T(1)线程已经释放持有的读锁。
- 独写模式(单写独占):假设T(1)线程成功获取并持有写锁,T(2)线程和后续的T(N)个线程便不能成功获取且持有读锁和写锁,只能等待等待T(1)线程释放完持有的写锁,才能继续往下执行。
从一定意义上讲,根据读写锁操作的情况的性质分析,获取读锁和写锁的条件可以大致总结为:
- 获取读锁的条件:当前任何线程没有成功获取且已经持有写锁的情况,才可能获取并持有读锁。
- 获取写锁的条件:当前任何线程没有成功获取且已经持有写锁和读锁的情况,才可能获取并持有写锁。
但是在某些情况下,可能存在某个线程已经获取并持有读锁,希望能够获取写锁,并且在已经释放读锁时,通常情况下我们称之为读写锁的升级。<br />当然,有升级就会有降级,与之对应的就是读写锁的降级,主要描述的是某个线程已经获取并持有写锁,希望能够获取读锁,并且已经释放写锁。<br />一般来说,对于读写锁的升级与降级,我们一般需要注意的以下两个问题,其中:
- 读写锁的升级:指的是读锁升级为写锁的情况,需要满足某个线程必须是唯一拥有读锁的线程的条件,否则无法升级。
- 读写锁的降级:指的是写锁降级为读锁的情况,没有什么条件限制,写锁是独占锁,其持有线程是唯一的且不会存在读锁持有线程的情况,可以直接平滑升级读锁。
1.2 读写锁的实现思想
ReentrantReadWriteLock最早是在JDK1.5版本中提供的,从设计思想上来看,主要包括同步器的工作模式,读锁和写锁等3个核心要素。其中:
- 实现ReadWriteLock接口 :主要基于ReadWriteLock接口API实现对应方法,主要是实现writeLock()方法和readLock() 方法,其中writeLock()方法表示获取写锁,readLock() 方法表示获取读锁。
- 同步器的工作模式:基于AQS基础抽象队列同步器封装内置实现一个静态的内置同步器抽象类,然后基于这个抽象类分别实现了公平同步器和非公平同步器,用来指定和描述同步器工作模式是公平模式还是非公平模式。
- 公平/非公平模式:主要描述的是多个线程在同时获取锁时是否按照先到先得的顺序获取锁,如果是则为公平模式,否则为非公平模式。
- 内置两个静态公有内部类:定义了读锁和写锁静态公有内部类,并且都支持公平/非公平模式,本质都是基于AQS基础抽象队列同步器实现。
- 维护共享状态变量: 主要是基于一个AQS基础抽象队列同步器来实现读锁和写锁,要求共用一个共享状态变量。
2. 基本实现
在ReentrantReadWriteLock类在JDK1.8版本中,对于ReentrantReadWriteLock的基本实现如下:
public class ReentrantReadWriteLock
implements ReadWriteLock, java.io.Serializable {
private static final long serialVersionUID = -6992448646407690164L;
/** ReentrantReadWriteLock锁-内部ReadLock类 */
private final ReentrantReadWriteLock.ReadLock readerLock;
/** ReentrantReadWriteLock锁-内部WriteLock类 */
private final ReentrantReadWriteLock.WriteLock writerLock;
/** ReentrantReadWriteLock锁-内部同步器 */
final Sync sync;
/** ReentrantReadWriteLock锁-基于AQS封装内部同步器 */
abstract static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 6317671515068378041L;
/** ReentrantReadWriteLock锁-共用状态变量封装-begin*/
/** ReentrantReadWriteLock锁-共用状态变量封装-共享状态移动位数16 */
static final int SHARED_SHIFT = 16;
/** ReentrantReadWriteLock锁-共用状态变量封装-读锁每次加锁的状态大小*/
static final int SHARED_UNIT = (1 << SHARED_SHIFT);
/** ReentrantReadWriteLock锁-共用状态变量封装-读锁每次加锁的最大次数*/
static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1;
/** ReentrantReadWriteLock锁-共用状态变量封装-写锁的掩码*/
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
/** ReentrantReadWriteLock锁-共用状态变量封装-本地存储读锁次数*/
private transient ThreadLocalHoldCounter readHolds;
/** ReentrantReadWriteLock锁-共用状态变量封装-读锁的状态码值*/
private transient HoldCounter cachedHoldCounter;
/** ReentrantReadWriteLock锁-共用状态变量封装-线程变量*/
private transient Thread firstReader = null;
/** ReentrantReadWriteLock锁-共用状态变量封装-首次读锁次数*/
private transient int firstReaderHoldCount;
/** ReentrantReadWriteLock锁-共用状态变量封装-end*/
Sync() {
readHolds = new ThreadLocalHoldCounter();
setState(getState()); // ensures visibility of readHolds
}
/** ReentrantReadWriteLock锁-读锁标记*/
abstract boolean readerShouldBlock();
/** ReentrantReadWriteLock锁-读锁标记*/
abstract boolean writerShouldBlock();
//... 其他代码
}
/** ReentrantReadWriteLock锁-无参数构造(默认非公平模式) */
public ReentrantReadWriteLock() {
this(false);
}
/** ReentrantReadWriteLock锁-有参数构造(可选公平/非公平模式) */
public ReentrantReadWriteLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
readerLock = new ReadLock(this);
writerLock = new WriteLock(this);
}
/** ReentrantReadWriteLock锁-获取写锁 */
public ReentrantReadWriteLock.WriteLock writeLock() { return writerLock; }
/** ReentrantReadWriteLock锁-获取读锁 */
public ReentrantReadWriteLock.ReadLock readLock() { return readerLock; }
/** ReentrantReadWriteLock锁-实例化Unsafe支持 */
private static final sun.misc.Unsafe UNSAFE;
/** ReentrantReadWriteLock锁-线程偏移量 */
private static final long TID_OFFSET;
/** ReentrantReadWriteLock锁-获取线程变量 */
static final long getThreadId(Thread thread) {
return UNSAFE.getLongVolatile(thread, TID_OFFSET);
}
/** ReentrantReadWriteLock锁-反射机制实例化Unsafe */
static {
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class<?> tk = Thread.class;
TID_OFFSET = UNSAFE.objectFieldOffset
(tk.getDeclaredField("tid"));
} catch (Exception e) {
throw new Error(e);
}
}
}
- 内部同步器:基于AQS基础同步器封装和定义了一个静态内部Sync抽象类
- 同步器工作模式:提供了 2个构造方法,其中无参数构造方法表示的是默认的工作模式,有参数构造方法主要依据参数来实现指定的工作模式
- 公平/非公平模式:主要是基于Sync抽象类封装NonfairSync非公平同步器和封装NonfairSync非公平同步器来实现。
- 内置两个内部类:主要是实现了ReadLock类和WriteLock类,其中ReadLock类对应着读锁,WriteLock类对应着写锁。
2.1 基于AQS同步器封装静态内部Sync抽象类
/** ReentrantReadWriteLock锁-基于AQS封装内部同步器 */
abstract static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 6317671515068378041L;
/** ReentrantReadWriteLock锁-共用状态变量封装-begin */
/** ReentrantReadWriteLock锁-共用状态变量封装-共享状态移动位数16 */
static final int SHARED_SHIFT = 16;
/** ReentrantReadWriteLock锁-共用状态变量封装-读锁每次加锁的状态大小*/
static final int SHARED_UNIT = (1 << SHARED_SHIFT);
/** ReentrantReadWriteLock锁-共用状态变量封装-读锁每次加锁的最大次数*/
static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1;
/** ReentrantReadWriteLock锁-共用状态变量封装-写锁的掩码*/
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
/** ReentrantReadWriteLock锁-共用状态变量封装-本地存储读锁次数*/
private transient ThreadLocalHoldCounter readHolds;
/** ReentrantReadWriteLock锁-共用状态变量封装-读锁的状态码值*/
private transient HoldCounter cachedHoldCounter;
/** ReentrantReadWriteLock锁-共用状态变量封装-线程变量*/
private transient Thread firstReader = null;
/** ReentrantReadWriteLock锁-共用状态变量封装-首次读锁次数*/
private transient int firstReaderHoldCount;
/** ReentrantReadWriteLock锁-共用状态变量封装-end*/
Sync() {
readHolds = new ThreadLocalHoldCounter();
setState(getState()); // ensures visibility of readHolds
}
/** ReentrantReadWriteLock锁-读锁标记*/
abstract boolean readerShouldBlock();
/** ReentrantReadWriteLock锁-读锁标记*/
abstract boolean writerShouldBlock();
/** ReentrantReadWriteLock锁-独占模式获取读锁*/
protected final boolean tryRelease(int releases) {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
int nextc = getState() - releases;
boolean free = exclusiveCount(nextc) == 0;
if (free)
setExclusiveOwnerThread(null);
setState(nextc);
return free;
}
/** ReentrantReadWriteLock锁-独占模式释放锁*/
protected final boolean tryAcquire(int acquires) {
Thread current = Thread.currentThread();
int c = getState();
int w = exclusiveCount(c);
if (c != 0) {
// (Note: if c != 0 and w == 0 then shared count != 0)
if (w == 0 || current != getExclusiveOwnerThread())
return false;
if (w + exclusiveCount(acquires) > MAX_COUNT)
throw new Error("Maximum lock count exceeded");
// Reentrant acquire
setState(c + acquires);
return true;
}
if (writerShouldBlock() ||
!compareAndSetState(c, c + acquires))
return false;
setExclusiveOwnerThread(current);
return true;
}
/** ReentrantReadWriteLock锁-共享模式释放锁*/
protected final boolean tryReleaseShared(int unused) {
Thread current = Thread.currentThread();
if (firstReader == current) {
// assert firstReaderHoldCount > 0;
if (firstReaderHoldCount == 1)
firstReader = null;
else
firstReaderHoldCount--;
} else {
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
int count = rh.count;
if (count <= 1) {
readHolds.remove();
if (count <= 0)
throw unmatchedUnlockException();
}
--rh.count;
}
for (;;) {
int c = getState();
int nextc = c - SHARED_UNIT;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
/** ReentrantReadWriteLock锁-共享模式获取锁*/
protected final int tryAcquireShared(int unused) {
Thread current = Thread.currentThread();
int c = getState();
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
return -1;
int r = sharedCount(c);
if (!readerShouldBlock() &&
r < MAX_COUNT &&
compareAndSetState(c, c + SHARED_UNIT)) {
if (r == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
}
return 1;
}
return fullTryAcquireShared(current);
}
/** ReentrantReadWriteLock锁-共享模式获取锁*/
final int fullTryAcquireShared(Thread current) {
HoldCounter rh = null;
for (;;) {
int c = getState();
if (exclusiveCount(c) != 0) {
if (getExclusiveOwnerThread() != current)
return -1;
// else we hold the exclusive lock; blocking here
// would cause deadlock.
} else if (readerShouldBlock()) {
// Make sure we're not acquiring read lock reentrantly
if (firstReader == current) {
// assert firstReaderHoldCount > 0;
} else {
if (rh == null) {
rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current)) {
rh = readHolds.get();
if (rh.count == 0)
readHolds.remove();
}
}
if (rh.count == 0)
return -1;
}
}
if (sharedCount(c) == MAX_COUNT)
throw new Error("Maximum lock count exceeded");
if (compareAndSetState(c, c + SHARED_UNIT)) {
if (sharedCount(c) == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
if (rh == null)
rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
cachedHoldCounter = rh; // cache for release
}
return 1;
}
}
}
/** ReentrantReadWriteLock锁-判断是否独占模式*/
protected final boolean isHeldExclusively() {
return getExclusiveOwnerThread() == Thread.currentThread();
}
// Methods relayed to outer class
/** ReentrantReadWriteLock锁-定义条件变量*/
final ConditionObject newCondition() {
return new ConditionObject();
}
/** ReentrantReadWriteLock锁-获取当前锁的持有者*/
final Thread getOwner() {
// Must read state before owner to ensure memory consistency
return ((exclusiveCount(getState()) == 0) ?
null :
getExclusiveOwnerThread());
}
/** ReentrantReadWriteLock锁-获取读锁次数统计*/
final int getReadLockCount() {
return sharedCount(getState());
}
/** ReentrantReadWriteLock锁-判断是否是写锁*/
final boolean isWriteLocked() {
return exclusiveCount(getState()) != 0;
}
/** ReentrantReadWriteLock锁-获取写锁持有次数统计*/
final int getWriteHoldCount() {
return isHeldExclusively() ? exclusiveCount(getState()) : 0;
}
/** ReentrantReadWriteLock锁-获取读锁次持有数统计*/
final int getReadHoldCount() {
if (getReadLockCount() == 0)
return 0;
Thread current = Thread.currentThread();
if (firstReader == current)
return firstReaderHoldCount;
HoldCounter rh = cachedHoldCounter;
if (rh != null && rh.tid == getThreadId(current))
return rh.count;
int count = readHolds.get().count;
if (count == 0) readHolds.remove();
return count;
}
/** ReentrantReadWriteLock锁-获取读锁*/
final boolean tryReadLock() {
Thread current = Thread.currentThread();
for (;;) {
int c = getState();
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
return false;
int r = sharedCount(c);
if (r == MAX_COUNT)
throw new Error("Maximum lock count exceeded");
if (compareAndSetState(c, c + SHARED_UNIT)) {
if (r == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
}
return true;
}
}
}
/** ReentrantReadWriteLock锁-获取写锁*/
final boolean tryWriteLock() {
Thread current = Thread.currentThread();
int c = getState();
if (c != 0) {
int w = exclusiveCount(c);
if (w == 0 || current != getExclusiveOwnerThread())
return false;
if (w == MAX_COUNT)
throw new Error("Maximum lock count exceeded");
}
if (!compareAndSetState(c, c + 1))
return false;
setExclusiveOwnerThread(current);
return true;
}
/** ReentrantReadWriteLock锁-流处理*/
private void readObject(java.io.ObjectInputStream s)
throws java.io.IOException, ClassNotFoundException {
s.defaultReadObject();
readHolds = new ThreadLocalHoldCounter();
setState(0); // reset to unlocked state
}
/** ReentrantReadWriteLock锁-获取状态*/
final int getCount() { return getState(); }
}
- 实现方式: 主要是基于AQS基础同步器来实现,其中封装抽象了readerShouldBlock()方法和writerShouldBlock()方法,用于标记当前请求的线程是获取什么类型的锁。其中:
- readerShouldBlock()方法:依据标记返回的是true,标记线程获取的是ReadLock锁
- writerShouldBlock()方法:依据标记返回的是true,标记线程获取的是WriteLock锁
- 维护共用状态变量: 对于共享状态变量的实现,主要是在内部的同步器中Sync类中定义HoldCounter类和ThreadLocalHoldCounter类实现的,其中定义了一堆常量,统计读写锁状态值的sharedCount()方法和exclusiveCount()方法。
- 读写锁的主要方法: 主要是提供了tryReadLock()方法和tryWriteLock()方法,其中:
- tryReadLock()方法:获取读锁方法,核心处理是自旋+compareAndSetState()方法来处理
- tryWriteLock()方法:获取写锁方法,核心处理是通过compareAndSetState()方法来处理
- 读写锁的获取方式:基于AQS基础同步器来实现对于共享模式和独享模式两种情况,都提供了对应的方法。其中:
- tryAcquire()方法:独享模式获取锁方法,这里主要针对WriteLock锁,核心处理是通过AQS基础同步器中compareAndSetState()方法来处理,实现状态变量的操作
- tryAcquireShared() 方法:共享模式获取锁方法,这里主要对应ReadLock锁,核心处理是通过AQS基础同步器中compareAndSetState()方法来处理,实现状态变量的操作
- 读写锁的释放方式:基于AQS基础同步器来实现对于共享模式和独享模式两种情况,都提供了对应的方法。其中:
- tryRelease(int releases) 方法:独享模式释放锁方法
- tryReleaseShared(int unused) 方法:共享模式释放锁方法,核心处理是自旋+compareAndSetState()方法来处理
- 其他方法:主要还提供了一些比较常规的方法,其中:
- getCount() 方法:主要是基于AQS基础同步器来获取状态变量
- getOwner()方法:主要是用于获取当前锁的持有者,一般是线程对象,根据_exclusiveCount_(getState()) == 0来判断,条件成立就默认为null; 否则,通过AQS基础同步器中的getExclusiveOwnerThread()方法来获取。
- getReadLockCount() 方法:用于获取某个线程对于ReadLock锁的数量,主要是统计次数
- isWriteLocked() 方法:用于判断线程是否已经获取并持有WriteLock锁
- getWriteHoldCount()方法:用于获取线程对于WriteLock锁的持有情况,主要是统计次数
- getReadHoldCount() 方法:用于获取线程对于WriteLock锁的持有情况,主要是统计次数
- isHeldExclusively() 方法:用于判断是否独占模式
- unmatchedUnlockException()方法:封装一个异常处理信息,主要是指定IllegalMonitorStateException
2.2 基于Sync抽象类封装共享状态变量
/** ReentrantReadWriteLock锁-基于AQS封装内部同步器 */
abstract static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 6317671515068378041L;
/** ReentrantReadWriteLock锁-共用状态变量封装-begin*/
/** ReentrantReadWriteLock锁-共用状态变量封装-共享状态移动位数16 */
static final int SHARED_SHIFT = 16;
/** ReentrantReadWriteLock锁-共用状态变量封装-读锁每次加锁的状态大小*/
static final int SHARED_UNIT = (1 << SHARED_SHIFT);
/** ReentrantReadWriteLock锁-共用状态变量封装-读锁每次加锁的最大次数*/
static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1;
/** ReentrantReadWriteLock锁-共用状态变量封装-写锁的掩码*/
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
/** ReentrantReadWriteLock锁-共用状态变量封装-本地存储读锁次数*/
private transient ThreadLocalHoldCounter readHolds;
/** ReentrantReadWriteLock锁-共用状态变量封装-读锁的状态码值*/
private transient HoldCounter cachedHoldCounter;
/** ReentrantReadWriteLock锁-共用状态变量封装-线程变量*/
private transient Thread firstReader = null;
/** ReentrantReadWriteLock锁-共用状态变量封装-首次读锁次数*/
private transient int firstReaderHoldCount;
/** ReentrantReadWriteLock锁-共用状态变量封装-end*/
/** ReentrantReadWriteLock锁-共用状态变量封装-构造方法*/
Sync() {
readHolds = new ThreadLocalHoldCounter();
setState(getState()); // ensures visibility of readHolds
}
/** ReentrantReadWriteLock锁-共用状态变量封装-读锁的状态码值*/
static int sharedCount(int c) { return c >>> SHARED_SHIFT; }
/** ReentrantReadWriteLock锁-共用状态变量封装-写锁的状态码值*/
static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
/** ReentrantReadWriteLock锁-共用状态变量-统计计数器 */
static final class HoldCounter {
int count = 0;
// Use id, not reference, to avoid garbage retention
final long tid = getThreadId(Thread.currentThread());
}
/** ReentrantReadWriteLock锁-共用状态变量-本地存储统计副本 */
static final class ThreadLocalHoldCounter
extends ThreadLocal<HoldCounter> {
public HoldCounter initialValue() {
return new HoldCounter();
}
}
//... 其他代码
}
- 实现方式: 主要是在内部的同步器中Sync类中定义HoldCounter类和ThreadLocalHoldCounter类实现的。其中:
- HoldCounter类:主要是定义了一个计数器count和一个线程编号tid变量,其中计数器count默认值为0,而线程编号tid通过_getThreadId_(Thread.currentThread())方法来获取。
- ThreadLocalHoldCounter类:基于ThreadLocal和HoldCounter来提供了initialValue()方法,主要是实例话化HoldCounter类。
- 共用状态常量:主要是在内部的同步器中Sync类中定义了相关常量,其中:
- SHARED_SHIFT: 主要用于标记位移的位数,默认采用 整型16位
- SHARED_UNIT:表示读锁加锁操作时每次对应的状态值大小,将1左移动16位正好对应高16位的1.
- MAX_COUNT:表示读锁能执行加锁操作的最大次数,一般为16个二进制的1
- EXCLUSIVE_MASK:写锁的掩码,一般为16个二进制的1
- 主要方法:主要提供了统计读写锁状态值的sharedCount()方法和exclusiveCount()方法,其中:
- sharedCount()方法:获取读锁的状态码值,根据目标参数(targetParam)左移16位即可得到
- exclusiveCount()方法:获取写锁的状态码值,根据目标参数(targetParam)同写锁的掩码做逻辑与(&)运算便可得到。
一般来说,AQS基础同步器的共享状态变量是整型的32位,要基于一个AQS基础同步器实现读写锁的共享一个共享变量。<br />其中,最公平的方式设计方式就是读锁与写锁各自占用16位,就意味着读锁占用的是高16位,写锁占用的是低16位的。
但是,在获取读写锁的状态值的时候,还会涉及一些额外的计算,这样的设计方式可能会需要用到位移和逻辑与操作等。
2.3 基于Sync抽象类封装FairSync公平同步器
/** ReentrantReadWriteLock锁-基于Sync抽象类封装FairSync公平同步器 */
static final class FairSync extends Sync {
private static final long serialVersionUID = -2274990926593161451L;
/** ReentrantReadWriteLock锁- 实现writerShouldBlock方法*/
final boolean writerShouldBlock() {
return hasQueuedPredecessors();
}
/** ReentrantReadWriteLock锁- 实现readerShouldBlock方法*/
final boolean readerShouldBlock() {
return hasQueuedPredecessors();
}
}
- 实现方式:基于Sync抽象类封装FairSync公平同步器,表示支持公平模式
- 主要方法:主要实现实现writerShouldBlock()方法和readerShouldBlock()方法,其中:
- writerShouldBlock()方法:通过hasQueuedPredecessors()实现
- readerShouldBlock()方法:通过apparentlyFirstQueuedIsExclusive()实现
2.4 基于Sync抽象类封装NonfairSync非公平同步器
/** ReentrantReadWriteLock锁-基于Sync抽象类封装FairSync公平同步器 */
static final class NonfairSync extends Sync {
private static final long serialVersionUID = -8159625535654395037L;
/** ReentrantReadWriteLock锁- 实现writerShouldBlock方法*/
final boolean writerShouldBlock() {
return false; // writers can always barge
}
/** ReentrantReadWriteLock锁- 实现readerShouldBlock方法*/
final boolean readerShouldBlock() {
return apparentlyFirstQueuedIsExclusive();
}
}
- 实现方式:基于Sync抽象类封装NonfairSync非公平同步器,表示支持非公平模式
- 主要方法:主要实现实现writerShouldBlock()方法和readerShouldBlock()方法,其中:
- writerShouldBlock()方法:默认返回false
- readerShouldBlock()方法:通过apparentlyFirstQueuedIsExclusive()实现
2.5 基于Lock接口实现ReadLock读锁内部类
/** ReentrantReadWriteLock锁-基于Lock接口实现ReadLock读锁内部类*/
public static class ReadLock implements Lock, java.io.Serializable {
private static final long serialVersionUID = -5992448646407690164L;
/** ReentrantReadWriteLock锁-ReadLock读锁内部类-同步器 */
private final Sync sync;
/** ReentrantReadWriteLock锁-ReadLock读锁内部类-内部构造方法*/
protected ReadLock(ReentrantReadWriteLock lock) {
sync = lock.sync;
}
/** ReentrantReadWriteLock锁-ReadLock读锁内部类-获取锁方法(默认共享模式)*/
public void lock() {
sync.acquireShared(1);
}
/** ReentrantReadWriteLock锁-ReadLock读锁内部类-获取锁方法(支持中断机制)*/
public void lockInterruptibly() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
/** ReentrantReadWriteLock锁-ReadLock读锁内部类-尝试获取锁(一般模式)*/
public boolean tryLock() {
return sync.tryReadLock();
}
/** ReentrantReadWriteLock锁-ReadLock读锁内部类-尝试获取锁(支持超时机制)*/
public boolean tryLock(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}
/** ReentrantReadWriteLock锁-ReadLock读锁内部类-释放锁*/
public void unlock() {
sync.releaseShared(1);
}
/** ReentrantReadWriteLock锁-ReadLock读锁内部类-条件变量*/
public Condition newCondition() {
throw new UnsupportedOperationException();
}
public String toString() {
int r = sync.getReadLockCount();
return super.toString() +
"[Read locks = " + r + "]";
}
}
- 实现方式:主要是基于Lock接口来实现的ReadLock锁,同时通过构造方法指定了一个内部同步器Sync,其对应的方法都是基于AQS基础同步器的共享模式来实现的。
- 获取锁方式:主要提供了4个方法来实现锁的获取,其中:
- 无参数的lock()方法: 获取锁的一般模式,主要是基于AQS基础同步器中的acquireShared(int arg)方法来实现,其核心处理逻辑是doAcquireShared(int arg)方法
- 无参数的lockInterruptibly()方法:获取可中断锁的模式,主要是基于AQS基础同步器中的acquireSharedInterruptibly(int arg)方法来实现,其核心处理逻辑是doAcquireSharedInterruptibly(int arg)方法
- 无参数的tryLock()方法:尝试获取ReadLock锁,主要是基于AQS基础同步器中的tryReadLock()方法来实现,其核心处理逻辑是自旋+compareAndSetState()方法的加持CAS操作的。
- 有参数的ryLock()方法:尝试获取ReadLock锁,支持超时机制,主要是基于AQS基础同步器中的tryAcquireSharedNanos(int arg, long nanosTimeout)方法来实现,其核心处理逻辑是在doAcquireSharedNanos(int arg, long nanosTimeout)方法,主要是自旋+shouldParkAfterFailedAcquire()方法的加持CAS操作的。
- 释放锁方式:主要提供了一个unlock()方法来实现ReadLock 的释放,其中本质是基于AQS基础同步器中的releaseShared(int arg) 方法,其中核心处理逻辑是doReleaseShared()的方法,其核心处理是自旋+compareAndSetWaitStatus()方法来加持CAS操作的。
2.6 基于Lock接口实现WriteLock写锁内部类
/** ReentrantReadWriteLock锁-基于Lock接口实现WriteLock写锁内部类*/
public static class WriteLock implements Lock, java.io.Serializable {
private static final long serialVersionUID = -4992448646407690164L;
/** ReentrantReadWriteLock锁-WriteLock写锁内部类-同步器*/
private final Sync sync;
/** ReentrantReadWriteLock锁-WriteLock写锁内部类-内部构造方法*/
protected WriteLock(ReentrantReadWriteLock lock) {
sync = lock.sync;
}
/** ReentrantReadWriteLock锁-WriteLock写锁内部类-获取锁方法(独占模式)*/
public void lock() {
sync.acquire(1);
}
/** ReentrantReadWriteLock锁-WriteLock写锁内部类-获取锁方法(可中断)*/
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}
/** ReentrantReadWriteLock锁-WriteLock写锁内部类-获取锁方法(一般模式)*/
public boolean tryLock( ) {
return sync.tryWriteLock();
}
/** ReentrantReadWriteLock锁-WriteLock写锁内部类-获取锁方法(支持超时机制)*/
public boolean tryLock(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireNanos(1, unit.toNanos(timeout));
}
/** ReentrantReadWriteLock锁-WriteLock写锁内部类-释放锁*/
public void unlock() {
sync.release(1);
}
/** ReentrantReadWriteLock锁-WriteLock写锁内部类-条件变量*/
public Condition newCondition() {
return sync.newCondition();
}
public String toString() {
Thread o = sync.getOwner();
return super.toString() + ((o == null) ?
"[Unlocked]" :
"[Locked by thread " + o.getName() + "]");
}
/** ReentrantReadWriteLock锁-WriteLock写锁内部类-是否独占判断*/
public boolean isHeldByCurrentThread() {
return sync.isHeldExclusively();
}
/** ReentrantReadWriteLock锁-WriteLock写锁内部类-统计数量*/
public int getHoldCount() {
return sync.getWriteHoldCount();
}
}
- 实现方式:主要是基于Lock接口来实现的WriteLock锁,同时通过构造方法指定了一个内部同步器Sync,其对应的方法都是基于AQS基础同步器的独占模式来实现的。
- 获取锁式:主要提供了4个方法来实现锁的获取,其中:
- 无参数的lock()方法: 获取WriteLock锁的一般模式,主要是基于AQS基础同步器中的acquire(int arg)方法来实现,其核心处理逻辑是acquireQueued(final Node node, int arg)方法
- 无参数的lockInterruptibly()方法:获取WriteLock锁可中断锁的模式,主要是基于AQS基础同步器中的acquireInterruptibly(int arg)方法来实现,其核心处理逻辑是doAcquireInterruptibly(int arg)方法
- 无参数的tryLock()方法:尝试获取WriteLock锁,主要是基于AQS基础同步器中的tryReadLock()方法来实现,其核心处理逻辑是自旋+compareAndSetState()方法的加持CAS操作的。
- 有参数的ryLock()方法:尝试获取WriteLock锁,支持超时机制,主要是基于AQS基础同步器中的tryAcquireNanos(int arg, long nanosTimeout)方法来实现,其核心处理逻辑是在doAcquireNanos(int arg, long nanosTimeout)方法,主要是自旋+shouldParkAfterFailedAcquire()方法的加持CAS操作的。
- 释放锁方式:主要提供了一个unlock()方法来实现ReadLock 的释放,其中本质是基于AQS基础同步器中unparkSuccessor(Node node)方法,主要是通过_compareAndSetWaitStatus_()方法来加持CAS操作的。
3. 具体实现
public class ReentrantReadWriteLock
implements ReadWriteLock, java.io.Serializable {
private static final long serialVersionUID = -6992448646407690164L;
/** ReentrantReadWriteLock锁-内部ReadLock类 */
private final ReentrantReadWriteLock.ReadLock readerLock;
/** ReentrantReadWriteLock锁-内部WriteLock类 */
private final ReentrantReadWriteLock.WriteLock writerLock;
/** ReentrantReadWriteLock锁-内部同步器 */
final Sync sync;
/** ReentrantReadWriteLock锁-实例化Unsafe支持 */
private static final sun.misc.Unsafe UNSAFE;
/** ReentrantReadWriteLock锁-线程偏移量 */
private static final long TID_OFFSET;
/** ReentrantReadWriteLock锁-基于AQS封装内部同步器 */
abstract static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 6317671515068378041L;
static final int SHARED_SHIFT = 16;
static final int SHARED_UNIT = (1 << SHARED_SHIFT);
static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1;
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
private transient ThreadLocalHoldCounter readHolds;
private transient HoldCounter cachedHoldCounter;
private transient Thread firstReader = null;
private transient int firstReaderHoldCount;
Sync() {
readHolds = new ThreadLocalHoldCounter();
setState(getState()); // ensures visibility of readHolds
}
abstract boolean readerShouldBlock();
abstract boolean writerShouldBlock();
//... 其他代码
}
/** ReentrantReadWriteLock锁-无参数构造(默认非公平模式) */
public ReentrantReadWriteLock() {
this(false);
}
/** ReentrantReadWriteLock锁-有参数构造(可选公平/非公平模式) */
public ReentrantReadWriteLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
readerLock = new ReadLock(this);
writerLock = new WriteLock(this);
}
/** ReentrantReadWriteLock锁-获取写锁 */
public ReentrantReadWriteLock.WriteLock writeLock() { return writerLock; }
/** ReentrantReadWriteLock锁-获取读锁 */
public ReentrantReadWriteLock.ReadLock readLock() { return readerLock; }
/** ReentrantReadWriteLock锁-获取线程变量 */
public final boolean isFair() {
return sync instanceof FairSync;
}
/** ReentrantReadWriteLock锁-获取线程变量 */
protected Thread getOwner() {
return sync.getOwner();
}
/** ReentrantReadWriteLock锁-获取线程变量 */
public int getReadLockCount() {
return sync.getReadLockCount();
}
/** ReentrantReadWriteLock锁-获取线程变量 */
public boolean isWriteLocked() {
return sync.isWriteLocked();
}
/** ReentrantReadWriteLock锁-获取线程变量 */
public boolean isWriteLockedByCurrentThread() {
return sync.isHeldExclusively();
}
/** ReentrantReadWriteLock锁-获取线程变量 */
public int getWriteHoldCount() {
return sync.getWriteHoldCount();
}
/** ReentrantReadWriteLock锁-获取线程变量 */
public int getReadHoldCount() {
return sync.getReadHoldCount();
}
/** ReentrantReadWriteLock锁-获取线程变量 */
protected Collection<Thread> getQueuedWriterThreads() {
return sync.getExclusiveQueuedThreads();
}
/** ReentrantReadWriteLock锁-获取线程变量 */
protected Collection<Thread> getQueuedReaderThreads() {
return sync.getSharedQueuedThreads();
}
/** ReentrantReadWriteLock锁-获取线程变量 */
public final boolean hasQueuedThreads() {
return sync.hasQueuedThreads();
}
/** ReentrantReadWriteLock锁-获取线程变量 */
public final boolean hasQueuedThread(Thread thread) {
return sync.isQueued(thread);
}
/** ReentrantReadWriteLock锁-获取线程变量 */
public final int getQueueLength() {
return sync.getQueueLength();
}
/** ReentrantReadWriteLock锁-获取线程变量 */
protected Collection<Thread> getQueuedThreads() {
return sync.getQueuedThreads();
}
/** ReentrantReadWriteLock锁-获取线程变量 */
public boolean hasWaiters(Condition condition) {
if (condition == null)
throw new NullPointerException();
if (!(condition instanceof AbstractQueuedSynchronizer.ConditionObject))
throw new IllegalArgumentException("not owner");
return sync.hasWaiters((AbstractQueuedSynchronizer.ConditionObject)condition);
}
/** ReentrantReadWriteLock锁-获取线程变量 */
protected Collection<Thread> getWaitingThreads(Condition condition) {
if (condition == null)
throw new NullPointerException();
if (!(condition instanceof AbstractQueuedSynchronizer.ConditionObject))
throw new IllegalArgumentException("not owner");
return sync.getWaitingThreads((AbstractQueuedSynchronizer.ConditionObject)condition);
}
/** ReentrantReadWriteLock锁-获取线程变量 */
public int getWaitQueueLength(Condition condition) {
if (condition == null)
throw new NullPointerException();
if (!(condition instanceof AbstractQueuedSynchronizer.ConditionObject))
throw new IllegalArgumentException("not owner");
return sync.getWaitQueueLength((AbstractQueuedSynchronizer.ConditionObject)condition);
}
/** ReentrantReadWriteLock锁-获取线程变量 */
static final long getThreadId(Thread thread) {
return UNSAFE.getLongVolatile(thread, TID_OFFSET);
}
/** ReentrantReadWriteLock锁-反射机制实例化Unsafe */
static {
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class<?> tk = Thread.class;
TID_OFFSET = UNSAFE.objectFieldOffset
(tk.getDeclaredField("tid"));
} catch (Exception e) {
throw new Error(e);
}
}
}
- 读写锁同步器:主要提供了2个构造方法来实现读写锁的管理,其中:
- 无参数构造方法:默认非公平模式,主要是通过this关键字来指定的
- 有参数构造方法:可选公平/非公平模式,依据指定传入公平标记fair来实例化NonfairSync非公平同步器和FairSync公平同步器,其中,当fair=true时,是公平平模式;否则,当fair=false时为非公平模式。同时,实例化ReadLock和WriteLock对象。
- 读锁主要方法:对于读锁的操作,一般我们只需要关注readLock()方法和类似getReadXX() 方法,其中:
- readLock()方法:主要用于获取和实现读锁ReadLock
- getReadHoldCount() 方法:主要用于统计某个线程对于读锁的持有情况
- getReadLockCount() 方法:主要用于统计某个线程对于读锁的获取的次数
- 写锁主要方法:对于读锁的操作,一般我们只需要关注readLock()方法和类似包含xWriteXX() 方法,其中:
- writeLock() 方法:主要用于获取和实现写锁WriteLock
- getWriteHoldCount()方法:主要用于统计当前线程对于写锁WriteLock的持有情况
- isWriteLocked()方法:主要用于判断某个线程对于写锁WriteLock的是否加锁
- isWriteLockedByCurrentThread() 方法:主要用于判断当前线程对于写锁WriteLock的是否加锁
- 条件队列操作方法:还提供了一系列的对于条件变量队列操作方法,其中:
- getQueuedWriterThreads() 方法:主要用于获取线程等待获取写锁WriteLock的情况
- getQueuedReaderThreads() 方法:主要用于获取线程等待获取读锁ReadLock的情况
- getQueuedThreads()方法:主要用于获取线程等待获取读锁ReadLock和写锁WriteLock的情况
- getQueueLength() 方法:主要用于获取线程等待获取读锁ReadLock和写锁WriteLock的个数
- 等待队列操作方法: 还提供了一系列的对于等待队列操作方法,其中:
- getWaitingThreads() 方法:主要依据Condition来用于获取等待队列中所有的线程的对象
- getWaitQueueLength() 方法:主要依据Condition来用于获取等待队列中所有的线程对于写锁WriteLock的相关的个数
- 其他方法:除此之外,还提供了一些队列操作的常规方法,其中:
- hasQueuedThread() 方法:主要依据单个Thread对象用于获取线程是否有获取读锁ReadLock和写锁WriteLock的情况
- hasQueuedThreads() 方法:主要用于获取多个线程是否有获取读锁ReadLock和写锁WriteLock的情况
- hasWaiters()方法:主要依据Condition来用于判断等待队列中所有的线程对于写锁WriteLock的相关情况
- isFair() 方法:用于判断是否公平模式
- getThreadId()方法:获取线程编号ID,主要是通过指定_UNSAFE_.getLongVolatile(thread, TID_OFFSET)实现。
<br />综上所述,ReentrantReadWriteLock锁是基于AQS基础同步器的共享模式和独享模式共同孵化的产物,支持公平/非公平模式,其中的ReadLock和WriteLock是基于同一个AQS基础同步器来实现,维护了共用状态变量机制。
五.StampedLock(印戳锁)的设计与实现
在Java领域中,StampedLock(印戳锁)是针对于Java多线程并发控制中引入一个共享锁定义读操作与独占锁定义读操作等场景共同组合构成一把锁来提高并发,主要是基于自定义API操作实现的一种并发控制工具类。
1. 设计思想
<br />StampedLock(印戳锁)是对ReentrantReadWriteLock读写锁的一 种改进,主要的改进为:在没有写只有读的场景下,StampedLock支持 不用加读锁而是直接进行读操作,最大程度提升读的效率,只有在发 生过写操作之后,再加读锁才能进行读操作。<br />一般来说,StampedLock 里的写锁和悲观读锁加锁成功之后,都会返回一个 stamp;然后解锁的时候,需要传入这个 stamp。
1.1 印戳锁的基本理论
虽然基于AQS基础同步器实现了各种锁,但是由于采用的自旋锁+CAS操作方式会导致如下两个问题:
- CAS恶性空自旋会浪费大量的CPU资源
- 在SMP架构的CPU上会导致“总线风暴”问题
解决CAS恶性空自旋的有效方式之一是以空间换时间,较为常见的 方案有两种:分散操作热点和使用队列削峰。<br />基于这个基础,在JDK1.8版本中,基于使用队列削峰的方式,自定义API操作,提供了StampedLock(印戳锁)的实现。<br />简单来说,StampedLock(印戳锁)提供了三种锁的实现模式,其中:
- 悲观读锁:与ReadWriteLock的读锁类似,多个线程可以同 时获取悲观读锁,悲观读锁是一个共享锁。
- 乐观读锁:相当于直接操作数据,不加任何锁,连读锁都不 要。
- 写锁:与ReadWriteLock的写锁类似,写锁和悲观读锁是互 斥的。虽然写锁与乐观读锁不会互斥,但是在数据被更新之后,之前 通过乐观读锁获得的数据已经变成了脏数据。
1.1 印戳锁的实现思想
StampedLock(印戳锁)与其他显式锁不同的是,主要是是最早在JDK1.8版本中提供的,从设计思想上来看,主要包括共享状态变量机制,内置的等待数据队列,读锁视图,写锁视图以及读写锁视图等5个核心要素。其中:
- 共享状态变量机制:主要是在内部封装一些静态私有的常量,用于描述各个模式之间的状态描述等。
- 内置的等待数据队列:主要是自定义实现一个基于CLH锁的等待队列
- 读锁视图:基于Lock接口实现一个对应读锁的视图
- 写锁视图:基于Lock接口实现一个对应写锁的视图
- 读写锁视图:基于ReadWriteLock接口实现一个包含读锁和写锁的视图
2. 基本实现
在StampedLock(印戳锁)类的JDK1.8版本中,对于StampedLock的基本实现如下:
/** StampedLock锁-最早在JDK1.8中实现的 */
public class StampedLock implements java.io.Serializable {
private static final long serialVersionUID = -6001602636862214147L;
/** StampedLock锁-自旋控制的最大允许核心线程数 */
private static final int NCPU = Runtime.getRuntime().availableProcessors();
/** StampedLock锁-等待队列自旋控制的最大自旋阈值 */
private static final int SPINS = (NCPU > 1) ? 1 << 6 : 0;
/** StampedLock锁-等待队列头节点自旋控制的最大自旋阈值 */
private static final int HEAD_SPINS = (NCPU > 1) ? 1 << 10 : 0;
/** StampedLock锁-等待队列头节点自旋控制的最大自旋阈值 */
private static final int MAX_HEAD_SPINS = (NCPU > 1) ? 1 << 16 : 0;
/** StampedLock锁-进入阻塞之前的最大重试次数 */
private static final int OVERFLOW_YIELD_RATE = 7; // must be power 2 - 1
//... 其他锁资源的状态常量
/** StampedLock锁-CLH队列的头部(head)节点 */
private transient volatile WNode whead;
/** StampedLock锁-CLH队列的尾部(tail)节点 */
private transient volatile WNode wtail;
/** StampedLock锁-读锁视图*/
transient ReadLockView readLockView;
/** StampedLock锁-写锁视图*/
transient WriteLockView writeLockView;
/** StampedLock锁-读写锁视图 */
transient ReadWriteLockView readWriteLockView;
/** StampedLock锁-锁的最原始的状态初始值 */
private static final long ORIGIN = WBIT << 1;
/** StampedLock锁-各种锁的同步状态变量 */
private transient volatile long state;
/** StampedLock锁-读锁的溢出的拓展标记 */
private transient int readerOverflow;
/** StampedLock锁-构造方法 */
public StampedLock() {
state = ORIGIN;
}
/** StampedLock锁-实例化ReadLock方法 */
public Lock asReadLock() {
ReadLockView v;
return ((v = readLockView) != null ? v :
(readLockView = new ReadLockView()));
}
/** StampedLock锁-实例化WriteLock方法 */
public Lock asWriteLock() {
WriteLockView v;
return ((v = writeLockView) != null ? v :
(writeLockView = new WriteLockView()));
}
/** StampedLock锁-实例化ReadWriteLock方法 */
public ReadWriteLock asReadWriteLock() {
ReadWriteLockView v;
return ((v = readWriteLockView) != null ? v :
(readWriteLockView = new ReadWriteLockView()));
}
/** StampedLock锁-获取ReadLock方法 */
public long readLock() {
long s = state, next; // bypass acquireRead on common uncontended case
return ((whead == wtail && (s & ABITS) < RFULL &&
U.compareAndSwapLong(this, STATE, s, next = s + RUNIT)) ?
next : acquireRead(false, 0L));
}
/** StampedLock锁-获取WriteLock方法 */
public long writeLock() {
long s, next; // bypass acquireWrite in fully unlocked case only
return ((((s = state) & ABITS) == 0L &&
U.compareAndSwapLong(this, STATE, s, next = s + WBIT)) ?
next : acquireWrite(false, 0L));
}
//... 其他代码
}
2.1 共享状态变量机制
/** StampedLock锁-最早在JDK1.8中实现的 */
public class StampedLock implements java.io.Serializable {
/** StampedLock锁-自旋控制的最大允许核心线程数 */
private static final int NCPU = Runtime.getRuntime().availableProcessors();
/** StampedLock锁-等待队列自旋控制的最大自旋阈值 */
private static final int SPINS = (NCPU > 1) ? 1 << 6 : 0;
/** StampedLock锁-等待队列头节点自旋控制的最大自旋阈值 */
private static final int HEAD_SPINS = (NCPU > 1) ? 1 << 10 : 0;
/** StampedLock锁-等待队列头节点自旋控制的最大自旋阈值 */
private static final int MAX_HEAD_SPINS = (NCPU > 1) ? 1 << 16 : 0;
/** StampedLock锁-进入阻塞之前的最大重试次数 */
private static final int OVERFLOW_YIELD_RATE = 7; // must be power 2 - 1
// Values for lock state and stamp operations
/** StampedLock锁-读锁移动的位数 */
private static final long RUNIT = 1L;
/** StampedLock锁-写锁移动的位数 */
private static final long WBIT = 1L << LG_READERS;
/** StampedLock锁-读锁移动的位数 */
private static final long RBITS = WBIT - 1L;
/** StampedLock锁-读锁移动的位数 */
private static final long RFULL = RBITS - 1L;
/** StampedLock锁-读写锁移动的位数 */
private static final long ABITS = RBITS | WBIT;
/** StampedLock锁-读写锁移动的位数 */
private static final long SBITS = ~RBITS;
// Special value from cancelled acquire methods so caller can throw IE
/** StampedLock锁-线程对象中断标识 */
private static final long INTERRUPTED = 1L;
// Values for node status; order matters
/** StampedLock锁-最早在JDK1.8中实现的 */
private static final int WAITING = -1;
/** StampedLock锁-最早在JDK1.8中实现的 */
private static final int CANCELLED = 1;
// Modes for nodes (int not boolean to allow arithmetic)
/** StampedLock锁-用于表示在队列之中是读模式 */
private static final int RMODE = 0;
/** StampedLock锁-用于表示在队列之中是写模式 */
private static final int WMODE = 1;
//... 其他代码
// Unsafe mechanics
/** StampedLock锁-实例化Unsafe对象 */
private static final sun.misc.Unsafe U;
/** StampedLock锁-状态 */
private static final long STATE;
/** StampedLock锁-头部节点 */
private static final long WHEAD;
/** StampedLock锁-尾部节点 */
private static final long WTAIL;
/** StampedLock锁-后继节点 */
private static final long WNEXT;
/** StampedLock锁-节点状态 */
private static final long WSTATUS;
/** StampedLock锁-节点链表 */
private static final long WCOWAIT;
/** StampedLock锁-中断标识 */
private static final long PARKBLOCKER;
static {
try {
U = sun.misc.Unsafe.getUnsafe();
Class<?> k = StampedLock.class;
Class<?> wk = WNode.class;
STATE = U.objectFieldOffset
(k.getDeclaredField("state"));
WHEAD = U.objectFieldOffset
(k.getDeclaredField("whead"));
WTAIL = U.objectFieldOffset
(k.getDeclaredField("wtail"));
WSTATUS = U.objectFieldOffset
(wk.getDeclaredField("status"));
WNEXT = U.objectFieldOffset
(wk.getDeclaredField("next"));
WCOWAIT = U.objectFieldOffset
(wk.getDeclaredField("cowait"));
Class<?> tk = Thread.class;
PARKBLOCKER = U.objectFieldOffset
(tk.getDeclaredField("parkBlocker"));
} catch (Exception e) {
throw new Error(e);
}
}
}
对于StampedLock锁中对于各种资源的标记,其封装了一系列的常量,主要可以分为以下几个方面,其中:
- 核心资源常量标识:是对线程操作资源的提供的常量封装,其中:
- NCPU:自旋控制的核心线程数量,主要通过Runtime.getRuntime().availableProcessors()获取设置。
- SPINS:等待队列自旋控制的最大自旋阈值,主要通过 (_NCPU _> 1) ? 1 << 6 : 0获取设置
- HEAD_SPINS: 等待队列头节点自旋控制的自旋阈值,主要通过 (_NCPU _> 1) ? 1 << 10 : 0获取设置
- MAX_HEAD_SPINS:等待队列头节点自旋控制的最大自旋阈值,主要通过(_NCPU _> 1) ? 1 << 16 : 0获取设置
- OVERFLOW_YIELD_RATE:线程让步操作等待的自旋阈值,默认值为7
- LG_READERS:读锁溢出的最大阈值,默认值为7
- _INTERRUPTED:线程中断标识,_默认值为1L
- 锁状态值设置标识:
- ORIGIN:锁状态的初始值,默认值为WBIT << 1,如果分配失败默认设置为0
- 锁状态的操作标识:
- RUNIT:读锁移动的位数,默认值为 1
- WBIT:写锁移动的位数,默认值为1L << LG_READERS
- RBITS:读锁移动的位数_,_默认值为_WBIT _- 1L
- RFULL:移动的位数,默认值为_RBITS _- 1L
- ABITS:锁移动的位数,默认值为 _RBITS _| WBIT
- SBITS:锁移动的位数,默认值为 ~RBITS
- 等待队列节点标识:
- WAITING:等待状态的初始值,默认值为-1
- CANCELLED:取消状态的初始值,默认值为1
- 读写锁的模式标识:
- RMODE:读锁模式,默认值为0
- WMODE:写锁模式,默认值为1
- CAS操作状态标识:封装了CAS操作状态标识,还通过反射实例化了Unsafe对象实例。
2.2 内置的等待队列WNode
/** StampedLock锁-最早在JDK1.8中实现的 */
public class StampedLock implements java.io.Serializable {
/** Wait nodes */
static final class WNode {
/** StampedLock锁-队列前驱节点 */
volatile WNode prev;
/** StampedLock锁-队列后驱节点 */
volatile WNode next;
/** StampedLock锁-锁的存储列表 */
volatile WNode cowait; // list of linked readers
/** StampedLock锁-线程对象 */
volatile Thread thread; // non-null while possibly parked
/** StampedLock锁-锁的状态 */
volatile int status; // 0, WAITING, or CANCELLED
/** StampedLock锁-锁的模式 */
final int mode; // RMODE or WMODE
WNode(int m, WNode p) { mode = m; prev = p; }
}
/** Head of CLH queue */
/** StampedLock锁-头部节点 */
private transient volatile WNode whead;
/** Tail (last) of CLH queue */
/** StampedLock锁-尾部节点 */
private transient volatile WNode wtail;
}
对于StampedLock锁对于等待队列的实现,主要包含以下几个方面的内容,其中:<br />
- 封装了一个等待队列WNode的静态内部类,其中:
- prev:等待队列的前驱节点
- next:等待队列的后驱节点
- cowait:表示依据锁标记存储当前线程入队的情况,队列锁列表
- thread: 线程对象,一般都是当前获取锁的线程
- status:用于表示锁的状态变量,对应着常量0,WAITING(-1), CANCELLED(1),其中,0表示正常状态,WAITING(-1)为等待状态,CANCELLED(1)为取消状态。
- mode:用于表示锁的模式,对应着常量RMODE和WMODE,其中RMODE为写模式,WMOD为读模式
- 构造方法WNode(int m, WNode p):用于实例化WNode对象,实现一个等待队列
- 实例化等待队列对象,主要封装一个头部节点whead和尾部节点wtail的对象
2.3 共用的读锁核心处理逻辑
首先,对于StampedLock锁的读锁视图与写锁视图的队列操作,有一个核心的处理逻辑:
/** StampedLock锁-最早在JDK1.8中实现的 */
public class StampedLock implements java.io.Serializable {
/** StampedLock锁-取消入队列 */
private long cancelWaiter(WNode node, WNode group, boolean interrupted) {
if (node != null && group != null) {
Thread w;
node.status = CANCELLED;
// unsplice cancelled nodes from group
for (WNode p = group, q; (q = p.cowait) != null;) {
if (q.status == CANCELLED) {
U.compareAndSwapObject(p, WCOWAIT, q, q.cowait);
p = group; // restart
}
else
p = q;
}
if (group == node) {
for (WNode r = group.cowait; r != null; r = r.cowait) {
if ((w = r.thread) != null)
U.unpark(w); // wake up uncancelled co-waiters
}
for (WNode pred = node.prev; pred != null; ) { // unsplice
WNode succ, pp; // find valid successor
while ((succ = node.next) == null ||
succ.status == CANCELLED) {
WNode q = null; // find successor the slow way
for (WNode t = wtail; t != null && t != node; t = t.prev)
if (t.status != CANCELLED)
q = t; // don't link if succ cancelled
if (succ == q || // ensure accurate successor
U.compareAndSwapObject(node, WNEXT,
succ, succ = q)) {
if (succ == null && node == wtail)
U.compareAndSwapObject(this, WTAIL, node, pred);
break;
}
}
if (pred.next == node) // unsplice pred link
U.compareAndSwapObject(pred, WNEXT, node, succ);
if (succ != null && (w = succ.thread) != null) {
succ.thread = null;
U.unpark(w); // wake up succ to observe new pred
}
if (pred.status != CANCELLED || (pp = pred.prev) == null)
break;
node.prev = pp; // repeat if new pred wrong/cancelled
U.compareAndSwapObject(pp, WNEXT, pred, succ);
pred = pp;
}
}
}
WNode h; // Possibly release first waiter
while ((h = whead) != null) {
long s; WNode q; // similar to release() but check eligibility
if ((q = h.next) == null || q.status == CANCELLED) {
for (WNode t = wtail; t != null && t != h; t = t.prev)
if (t.status <= 0)
q = t;
}
if (h == whead) {
if (q != null && h.status == 0 &&
((s = state) & ABITS) != WBIT && // waiter is eligible
(s == 0L || q.mode == RMODE))
release(h);
break;
}
}
return (interrupted || Thread.interrupted()) ? INTERRUPTED : 0L;
}
}
其次,对于StampedLock锁的读锁视图的实现作来看,主要核心处理如下:
/** StampedLock锁-最早在JDK1.8中实现的 */
public class StampedLock implements java.io.Serializable {
/** StampedLock锁-获取读锁 */
private long acquireRead(boolean interruptible, long deadline) {
WNode node = null, p;
for (int spins = -1;;) {
WNode h;
if ((h = whead) == (p = wtail)) {
for (long m, s, ns;;) {
if ((m = (s = state) & ABITS) < RFULL ?
U.compareAndSwapLong(this, STATE, s, ns = s + RUNIT) :
(m < WBIT && (ns = tryIncReaderOverflow(s)) != 0L))
return ns;
else if (m >= WBIT) {
if (spins > 0) {
if (LockSupport.nextSecondarySeed() >= 0)
--spins;
}
else {
if (spins == 0) {
WNode nh = whead, np = wtail;
if ((nh == h && np == p) || (h = nh) != (p = np))
break;
}
spins = SPINS;
}
}
}
}
if (p == null) { // initialize queue
WNode hd = new WNode(WMODE, null);
if (U.compareAndSwapObject(this, WHEAD, null, hd))
wtail = hd;
}
else if (node == null)
node = new WNode(RMODE, p);
else if (h == p || p.mode != RMODE) {
if (node.prev != p)
node.prev = p;
else if (U.compareAndSwapObject(this, WTAIL, p, node)) {
p.next = node;
break;
}
}
else if (!U.compareAndSwapObject(p, WCOWAIT,
node.cowait = p.cowait, node))
node.cowait = null;
else {
for (;;) {
WNode pp, c; Thread w;
if ((h = whead) != null && (c = h.cowait) != null &&
U.compareAndSwapObject(h, WCOWAIT, c, c.cowait) &&
(w = c.thread) != null) // help release
U.unpark(w);
if (h == (pp = p.prev) || h == p || pp == null) {
long m, s, ns;
do {
if ((m = (s = state) & ABITS) < RFULL ?
U.compareAndSwapLong(this, STATE, s,
ns = s + RUNIT) :
(m < WBIT &&
(ns = tryIncReaderOverflow(s)) != 0L))
return ns;
} while (m < WBIT);
}
if (whead == h && p.prev == pp) {
long time;
if (pp == null || h == p || p.status > 0) {
node = null; // throw away
break;
}
if (deadline == 0L)
time = 0L;
else if ((time = deadline - System.nanoTime()) <= 0L)
return cancelWaiter(node, p, false);
Thread wt = Thread.currentThread();
U.putObject(wt, PARKBLOCKER, this);
node.thread = wt;
if ((h != pp || (state & ABITS) == WBIT) &&
whead == h && p.prev == pp)
U.park(false, time);
node.thread = null;
U.putObject(wt, PARKBLOCKER, null);
if (interruptible && Thread.interrupted())
return cancelWaiter(node, p, true);
}
}
}
}
for (int spins = -1;;) {
WNode h, np, pp; int ps;
if ((h = whead) == p) {
if (spins < 0)
spins = HEAD_SPINS;
else if (spins < MAX_HEAD_SPINS)
spins <<= 1;
for (int k = spins;;) { // spin at head
long m, s, ns;
if ((m = (s = state) & ABITS) < RFULL ?
U.compareAndSwapLong(this, STATE, s, ns = s + RUNIT) :
(m < WBIT && (ns = tryIncReaderOverflow(s)) != 0L)) {
WNode c; Thread w;
whead = node;
node.prev = null;
while ((c = node.cowait) != null) {
if (U.compareAndSwapObject(node, WCOWAIT,
c, c.cowait) &&
(w = c.thread) != null)
U.unpark(w);
}
return ns;
}
else if (m >= WBIT &&
LockSupport.nextSecondarySeed() >= 0 && --k <= 0)
break;
}
}
else if (h != null) {
WNode c; Thread w;
while ((c = h.cowait) != null) {
if (U.compareAndSwapObject(h, WCOWAIT, c, c.cowait) &&
(w = c.thread) != null)
U.unpark(w);
}
}
if (whead == h) {
if ((np = node.prev) != p) {
if (np != null)
(p = np).next = node; // stale
}
else if ((ps = p.status) == 0)
U.compareAndSwapInt(p, WSTATUS, 0, WAITING);
else if (ps == CANCELLED) {
if ((pp = p.prev) != null) {
node.prev = pp;
pp.next = node;
}
}
else {
long time;
if (deadline == 0L)
time = 0L;
else if ((time = deadline - System.nanoTime()) <= 0L)
return cancelWaiter(node, node, false);
Thread wt = Thread.currentThread();
U.putObject(wt, PARKBLOCKER, this);
node.thread = wt;
if (p.status < 0 &&
(p != h || (state & ABITS) == WBIT) &&
whead == h && node.prev == p)
U.park(false, time);
node.thread = null;
U.putObject(wt, PARKBLOCKER, null);
if (interruptible && Thread.interrupted())
return cancelWaiter(node, node, true);
}
}
}
}
}
然后,对于StampedLock锁的写锁视图的实现作来看,主要核心处理如下:
/** StampedLock锁-最早在JDK1.8中实现的 */
public class StampedLock implements java.io.Serializable {
/** StampedLock锁-获取写锁 */
private long acquireWrite(boolean interruptible, long deadline) {
WNode node = null, p;
for (int spins = -1;;) { // spin while enqueuing
long m, s, ns;
if ((m = (s = state) & ABITS) == 0L) {
if (U.compareAndSwapLong(this, STATE, s, ns = s + WBIT))
return ns;
}
else if (spins < 0)
spins = (m == WBIT && wtail == whead) ? SPINS : 0;
else if (spins > 0) {
if (LockSupport.nextSecondarySeed() >= 0)
--spins;
}
else if ((p = wtail) == null) { // initialize queue
WNode hd = new WNode(WMODE, null);
if (U.compareAndSwapObject(this, WHEAD, null, hd))
wtail = hd;
}
else if (node == null)
node = new WNode(WMODE, p);
else if (node.prev != p)
node.prev = p;
else if (U.compareAndSwapObject(this, WTAIL, p, node)) {
p.next = node;
break;
}
}
for (int spins = -1;;) {
WNode h, np, pp; int ps;
if ((h = whead) == p) {
if (spins < 0)
spins = HEAD_SPINS;
else if (spins < MAX_HEAD_SPINS)
spins <<= 1;
for (int k = spins;;) { // spin at head
long s, ns;
if (((s = state) & ABITS) == 0L) {
if (U.compareAndSwapLong(this, STATE, s,
ns = s + WBIT)) {
whead = node;
node.prev = null;
return ns;
}
}
else if (LockSupport.nextSecondarySeed() >= 0 &&
--k <= 0)
break;
}
}
else if (h != null) { // help release stale waiters
WNode c; Thread w;
while ((c = h.cowait) != null) {
if (U.compareAndSwapObject(h, WCOWAIT, c, c.cowait) &&
(w = c.thread) != null)
U.unpark(w);
}
}
if (whead == h) {
if ((np = node.prev) != p) {
if (np != null)
(p = np).next = node; // stale
}
else if ((ps = p.status) == 0)
U.compareAndSwapInt(p, WSTATUS, 0, WAITING);
else if (ps == CANCELLED) {
if ((pp = p.prev) != null) {
node.prev = pp;
pp.next = node;
}
}
else {
long time; // 0 argument to park means no timeout
if (deadline == 0L)
time = 0L;
else if ((time = deadline - System.nanoTime()) <= 0L)
return cancelWaiter(node, node, false);
Thread wt = Thread.currentThread();
U.putObject(wt, PARKBLOCKER, this);
node.thread = wt;
if (p.status < 0 && (p != h || (state & ABITS) != 0L) &&
whead == h && node.prev == p)
U.park(false, time); // emulate LockSupport.park
node.thread = null;
U.putObject(wt, PARKBLOCKER, null);
if (interruptible && Thread.interrupted())
return cancelWaiter(node, node, true);
}
}
}
}
}
最后,综合对于StampedLock锁的读锁和写锁的获取和释放等操作来看,主要核心处理都会调用以下2个方法,其中:
/** StampedLock锁-最早在JDK1.8中实现的 */
public class StampedLock implements java.io.Serializable {
/** StampedLock锁-读锁溢出递增处理方法 */
private long tryIncReaderOverflow(long s) {
// assert (s & ABITS) >= RFULL;
if ((s & ABITS) == RFULL) {
if (U.compareAndSwapLong(this, STATE, s, s | RBITS)) {
++readerOverflow;
state = s;
return s;
}
}
else if ((LockSupport.nextSecondarySeed() &
OVERFLOW_YIELD_RATE) == 0)
Thread.yield();
return 0L;
}
/** StampedLock锁-读锁溢出递减处理方法 */
private long tryDecReaderOverflow(long s) {
// assert (s & ABITS) >= RFULL;
if ((s & ABITS) == RFULL) {
if (U.compareAndSwapLong(this, STATE, s, s | RBITS)) {
int r; long next;
if ((r = readerOverflow) > 0) {
readerOverflow = r - 1;
next = s;
}
else
next = s - RUNIT;
state = next;
return next;
}
}
else if ((LockSupport.nextSecondarySeed() &
OVERFLOW_YIELD_RATE) == 0)
Thread.yield();
return 0L;
}
}
- tryIncReaderOverflow()方法:主要是实现对于锁获取自旋时最大重试次数的递增运算。其中:
- 对于满足_stamp_ >= RFULL条件时,利用compareAndSwapLong()方法来实现CAS操作加持修改状态值。对于readerOverflow作自增运算后返回一个_stamp,可能存在更新和释放操作。_
- 否则,利用LockSupport.nextSecondarySeed() 判断,对于线程做让步处理,默认返回0
- tryDecReaderOverflow()方法:主要是实现对于锁获取自旋时最大重试次数的递减运算。其中:
- 对于满足_stamp_ == RFULL条件时,利用compareAndSwapLong()方法来实现CAS操作加持修改状态值。对于readerOverflow>0做递减运算后返回一个_stamp,可能存在更新和释放操作。_
- 否则,利用LockSupport.nextSecondarySeed() 判断,对于线程做让步处理,默认返回0
2.4 基于Lock接口实现的ReadLockView
/** StampedLock锁-最早在JDK1.8中实现的 */
public class StampedLock implements java.io.Serializable {
/** StampedLock锁-ReadLockView */
final class ReadLockView implements Lock {
/** StampedLock锁-获取锁 */
public void lock() { readLock(); }
/** StampedLock锁-获取可中断锁 */
public void lockInterruptibly() throws InterruptedException {
readLockInterruptibly();
}
/** StampedLock锁-尝试获取锁 */
public boolean tryLock() { return tryReadLock() != 0L; }
/** StampedLock锁-尝试获取可超时锁 */
public boolean tryLock(long time, TimeUnit unit)
throws InterruptedException {
return tryReadLock(time, unit) != 0L;
}
/** StampedLock锁-释放 */
public void unlock() { unstampedUnlockRead(); }
/** StampedLock锁-不支持条件变量定义 */
public Condition newCondition() {
throw new UnsupportedOperationException();
}
}
/** StampedLock锁-实例化ReadLock方法 */
public Lock asReadLock() {
ReadLockView v;
return ((v = readLockView) != null ? v :
(readLockView = new ReadLockView()));
}
/** StampedLock锁-实例化ReadLock方法 */
public long readLock() {
long s = state, next; // bypass acquireRead on common uncontended case
return ((whead == wtail && (s & ABITS) < RFULL &&
U.compareAndSwapLong(this, STATE, s, next = s + RUNIT)) ?
next : acquireRead(false, 0L));
}
/** StampedLock锁-实例化ReadLock方法 */
public long tryReadLock() {
for (;;) {
long s, m, next;
if ((m = (s = state) & ABITS) == WBIT)
return 0L;
else if (m < RFULL) {
if (U.compareAndSwapLong(this, STATE, s, next = s + RUNIT))
return next;
}
else if ((next = tryIncReaderOverflow(s)) != 0L)
return next;
}
}
/** StampedLock锁-实例化ReadLock方法 */
public long tryReadLock(long time, TimeUnit unit)
throws InterruptedException {
long s, m, next, deadline;
long nanos = unit.toNanos(time);
if (!Thread.interrupted()) {
if ((m = (s = state) & ABITS) != WBIT) {
if (m < RFULL) {
if (U.compareAndSwapLong(this, STATE, s, next = s + RUNIT))
return next;
}
else if ((next = tryIncReaderOverflow(s)) != 0L)
return next;
}
if (nanos <= 0L)
return 0L;
if ((deadline = System.nanoTime() + nanos) == 0L)
deadline = 1L;
if ((next = acquireRead(true, deadline)) != INTERRUPTED)
return next;
}
throw new InterruptedException();
}
/** StampedLock锁-释放锁方法 */
final void unstampedUnlockRead() {
// 自旋操作
for (;;) {
long s, m; WNode h;
if ((m = (s = state) & ABITS) == 0L || m >= WBIT)
throw new IllegalMonitorStateException();
else if (m < RFULL) {
if (U.compareAndSwapLong(this, STATE, s, s - RUNIT)) {
if (m == RUNIT && (h = whead) != null && h.status != 0)
release(h);
break;
}
}
else if (tryDecReaderOverflow(s) != 0L)
break;
}
}
}
对于ReadLock的实现,主要包含以下几个方面的内容,其中:
- 基本实现方式:基于Lock接口实现,提供了对应的锁获取和释放操作方法,其中:
- lock()方法:一般模式,主要通过StampedLock类中readLock()方法实现
- lockInterruptibly()方法:可中断模式,主要通过StampedLock类中readLockInterruptibly()方法实现
- 无参数tryLock() 方法:尝试获取锁,主要依据StampedLock类中tryReadLock() != 0L来实现
- 有参数tryLock() 方法:尝试获取锁,主要依据StampedLock类中tryReadLock(long time, TimeUnit unit)!= 0L来实现
- unlock()方法:锁的释放,主要通过StampedLock类中unstampedUnlockRead()方法实现
- newCondition() 方法:不支持条件变量的定义,默认设置抛出UnsupportedOperationException
- 对应处理方法:主要是在StampedLock外层实现的操作方法,其中:
- readLock()方法:读锁的实现,主要核心逻辑在acquireRead()方法
- tryReadLock()方法:尝试获取读锁,核心处理逻辑是根据对应的条件返回对应的锁的_stamp,否则抛出_InterruptedException。
- readLockInterruptibly()方法:读锁的可中断机制实现,核心处理逻辑是判断线程是否中断以及利用acquireRead方法验证,条件成立时,返回锁的_stamp,否则抛出_InterruptedException。
- unstampedUnlockRead()方法:释放锁,核心处理逻辑自旋操作+compareAndSwapLong实现。
2.5 基于Lock接口实现的WriteLockView
/** StampedLock锁-最早在JDK1.8中实现的 */
public class StampedLock implements java.io.Serializable {
final class WriteLockView implements Lock {
public void lock() { writeLock(); }
public void lockInterruptibly() throws InterruptedException {
writeLockInterruptibly();
}
public boolean tryLock() { return tryWriteLock() != 0L; }
public boolean tryLock(long time, TimeUnit unit)
throws InterruptedException {
return tryWriteLock(time, unit) != 0L;
}
public void unlock() { unstampedUnlockWrite(); }
public Condition newCondition() {
throw new UnsupportedOperationException();
}
}
/** StampedLock锁-实例化WriteLock方法 */
public Lock asWriteLock() {
WriteLockView v;
return ((v = writeLockView) != null ? v :
(writeLockView = new WriteLockView()));
}
}
对于WriteLockView的实现,主要包含以下几个方面的内容,其中:
- 基本实现方式:基于Lock接口实现,提供了对应的锁获取和释放操作方法,其中:
- lock()方法:一般模式,主要通过StampedLock类中WriteLock()方法实现
- lockInterruptibly()方法:可中断模式,主要通过StampedLock类中writeLockInterruptibly()方法实现
- 无参数tryLock() 方法:尝试获取锁,主要依据StampedLock类中tryWriteLock() != 0L来实现
- 有参数tryLock() 方法:尝试获取锁,主要依据StampedLock类中tryWriteLock(long time, TimeUnit unit) != 0L来实现
- unlock()方法:锁的释放,主要通过StampedLock类中unstampedUnlockWrite()方法实现
- newCondition() 方法:不支持条件变量的定义,默认设置抛出UnsupportedOperationException
- 核心处理方法:主要是在StampedLock外层实现的操作方法,其中:
- writeLock()方法:写锁的实现,主要核心逻辑在acquireWrite()方法
- tryWriteLock()方法:尝试获取写锁,核心处理逻辑是根据对应的条件返回对应的锁的_stamp,否则抛出_InterruptedException。
- writeLockInterruptibly()方法:写锁的可中断机制实现,核心处理逻辑是判断线程是否中断以及利用acquireWrite方法验证,条件成立时,返回锁的_stamp,否则抛出_InterruptedException。
- unstampedUnlockWrite()方法:释放锁,核心处理逻辑主要是通过调用release(WNode h) 方法实现。
2.6 基于ReadWriteLock接口实现ReadWriteLockView
/** StampedLock锁-最早在JDK1.8中实现的 */
public class StampedLock implements java.io.Serializable {
/** StampedLock锁-实例化ReadWriteLock方法 */
final class ReadWriteLockView implements ReadWriteLock {
/** StampedLock锁-ReadLock方法 */
public Lock readLock() { return asReadLock(); }
/** StampedLock锁-WriteLock方法 */
public Lock writeLock() { return asWriteLock(); }
}
/** StampedLock锁-实例化ReadWriteLock方法 */
public ReadWriteLock asReadWriteLock() {
ReadWriteLockView v;
return ((v = readWriteLockView) != null ? v :
(readWriteLockView = new ReadWriteLockView()));
}
}
对于ReadWriteLockView的实现,主要包含两个部分,其中:
- 基于ReadWriteLock接口实现,主要是实现readLock()和writeLock()方法
- 在asReadWriteLock()方法中,实例化ReadWriteLockView对象
3. 具体实现
对于StampedLock的具体实现,我们可以从如下几个方面拆解开来分析:
- 共享锁ReadLock锁获取操作实现: 需要区分悲观读锁和乐观读锁的获取个有不同,一般有默认获取方式和尝试获取两种方式。
- 独占锁WriteLock写锁获取操作实现: 写锁与悲观读锁互斥,一般有默认获取方式和尝试获取两种方式
- 共享锁ReadLock锁释放操作实现: 一般分为全释放和半释放ReadLock锁操作两种方式
- 独占锁WriteLock锁释放操作实现:一般分为全释放和半释放WriteLock锁操作两种方式
接下来,我们便从具体的代码中来分析以上内容的基本实现,以方便我们正确认识和了解StampedLock锁。
3.1 共享锁ReadLock读锁获取操作实现
/** StampedLock锁-最早在JDK1.8中实现的 */
public class StampedLock implements java.io.Serializable {
/** StampedLock锁-悲观读锁-尝试获取锁(默认模式,不支持超时机制) */
public long tryReadLock() {
// 锁自旋转+compareAndSwapLong来CAS操作加持
for (;;) {
long s, m, next;
// [1].直接返回0
if ((m = (s = state) & ABITS) == WBIT)
return 0L;
// [2].compareAndSwapLong来CAS操作加持
else if (m < RFULL) {
if (U.compareAndSwapLong(this, STATE, s, next = s + RUNIT))
return next;
}
// [3].尝试获取读锁溢出处理
else if ((next = tryIncReaderOverflow(s)) != 0L)
return next;
}
}
/** StampedLock锁-悲观读锁-尝试获取锁(指定模式,支持超时机制) */
public long tryReadLock(long time, TimeUnit unit)
throws InterruptedException {
long s, m, next, deadline;
long nanos = unit.toNanos(time);
if (!Thread.interrupted()) {
if ((m = (s = state) & ABITS) != WBIT) {
if (m < RFULL) {
if (U.compareAndSwapLong(this, STATE, s, next = s + RUNIT))
return next;
}
else if ((next = tryIncReaderOverflow(s)) != 0L)
return next;
}
if (nanos <= 0L)
return 0L;
if ((deadline = System.nanoTime() + nanos) == 0L)
deadline = 1L;
if ((next = acquireRead(true, deadline)) != INTERRUPTED)
return next;
}
throw new InterruptedException();
}
/** StampedLock锁-乐观读锁-尝试获取锁 */
public long tryOptimisticRead() {
long s;
return (((s = state) & WBIT) == 0L) ? (s & SBITS) : 0L;
}
}
对于读锁的获取来说,都属于是共享锁,主要提供了以下几种方式:
- 无参数tryReadLock()方法:悲观读锁的获取方式,默认模式,不支持超时机制
- 有参数tryReadLock()方法:悲观读锁的获取方式,指定参数模式,支持超时机制
- 无参数tryOptimisticRead()方法:乐观读锁的获取方式,没有加锁操作
3.2 独占锁WriteLock写锁获取操作实现
/** StampedLock锁-最早在JDK1.8中实现的 */
public class StampedLock implements java.io.Serializable {
/** StampedLock锁-获取写锁操作(不支持超时机制) */
public long tryWriteLock() {
long s, next;
return ((((s = state) & ABITS) == 0L &&
U.compareAndSwapLong(this, STATE, s, next = s + WBIT)) ?
next : 0L);
}
/** StampedLock锁-获取写锁操作(支持超时机制) */
public long tryWriteLock(long time, TimeUnit unit)
throws InterruptedException {
long nanos = unit.toNanos(time);
if (!Thread.interrupted()) {
long next, deadline;
if ((next = tryWriteLock()) != 0L)
return next;
if (nanos <= 0L)
return 0L;
if ((deadline = System.nanoTime() + nanos) == 0L)
deadline = 1L;
if ((next = acquireWrite(true, deadline)) != INTERRUPTED)
return next;
}
throw new InterruptedException();
}
}
对于写锁的获取来说,都属于是独占锁,主要提供了以下几种方式:
- 无参数tryWriteLock()方法:默认模式,不支持超时机制
- 有参数tryWriteLock()方法:指定模式,依据参数来实现,支持超时机制
3.3 共享锁ReadLock释放操作实现
/** StampedLock锁-最早在JDK1.8中实现的 */
public class StampedLock implements java.io.Serializable {
/** StampedLock锁-释放锁操作 */
public void unlock(long stamp) {
long a = stamp & ABITS, m, s; WNode h;
while (((s = state) & SBITS) == (stamp & SBITS)) {
if ((m = s & ABITS) == 0L)
break;
else if (m == WBIT) {
if (a != m)
break;
state = (s += WBIT) == 0L ? ORIGIN : s;
if ((h = whead) != null && h.status != 0)
release(h);
return;
}
else if (a == 0L || a >= WBIT)
break;
else if (m < RFULL) {
if (U.compareAndSwapLong(this, STATE, s, s - RUNIT)) {
if (m == RUNIT && (h = whead) != null && h.status != 0)
release(h);
return;
}
}
else if (tryDecReaderOverflow(s) != 0L)
return;
}
throw new IllegalMonitorStateException();
}
/** StampedLock锁-释放读锁 */
public void unlockRead(long stamp) {
long s, m; WNode h;
for (;;) {
if (((s = state) & SBITS) != (stamp & SBITS) ||
(stamp & ABITS) == 0L || (m = s & ABITS) == 0L || m == WBIT)
throw new IllegalMonitorStateException();
if (m < RFULL) {
if (U.compareAndSwapLong(this, STATE, s, s - RUNIT)) {
if (m == RUNIT && (h = whead) != null && h.status != 0)
release(h);
break;
}
}
else if (tryDecReaderOverflow(s) != 0L)
break;
}
}
/** StampedLock锁-悲观读锁-转换升级并释放处理 */
public long tryConvertToReadLock(long stamp) {
long a = stamp & ABITS, m, s, next; WNode h;
while (((s = state) & SBITS) == (stamp & SBITS)) {
if ((m = s & ABITS) == 0L) {
if (a != 0L)
break;
else if (m < RFULL) {
if (U.compareAndSwapLong(this, STATE, s, next = s + RUNIT))
return next;
}
else if ((next = tryIncReaderOverflow(s)) != 0L)
return next;
}
else if (m == WBIT) {
if (a != m)
break;
state = next = s + (WBIT + RUNIT);
if ((h = whead) != null && h.status != 0)
release(h);
return next;
}
else if (a != 0L && a < WBIT)
return stamp;
else
break;
}
return 0L;
}
/** StampedLock锁-乐观读锁-转换升级并释放处理 */
public long tryConvertToOptimisticRead(long stamp) {
long a = stamp & ABITS, m, s, next; WNode h;
U.loadFence();
for (;;) {
if (((s = state) & SBITS) != (stamp & SBITS))
break;
if ((m = s & ABITS) == 0L) {
if (a != 0L)
break;
return s;
}
else if (m == WBIT) {
if (a != m)
break;
state = next = (s += WBIT) == 0L ? ORIGIN : s;
if ((h = whead) != null && h.status != 0)
release(h);
return next;
}
else if (a == 0L || a >= WBIT)
break;
else if (m < RFULL) {
if (U.compareAndSwapLong(this, STATE, s, next = s - RUNIT)) {
if (m == RUNIT && (h = whead) != null && h.status != 0)
release(h);
return next & SBITS;
}
}
else if ((next = tryDecReaderOverflow(s)) != 0L)
return next & SBITS;
}
return 0L;
}
public boolean tryUnlockRead() {
long s, m; WNode h;
while ((m = (s = state) & ABITS) != 0L && m < WBIT) {
if (m < RFULL) {
if (U.compareAndSwapLong(this, STATE, s, s - RUNIT)) {
if (m == RUNIT && (h = whead) != null && h.status != 0)
release(h);
return true;
}
}
else if (tryDecReaderOverflow(s) != 0L)
return true;
}
return false;
}
/** StampedLock锁-尝试释放读锁 */
public boolean tryUnlockRead() {
long s, m; WNode h;
while ((m = (s = state) & ABITS) != 0L && m < WBIT) {
if (m < RFULL) {
if (U.compareAndSwapLong(this, STATE, s, s - RUNIT)) {
if (m == RUNIT && (h = whead) != null && h.status != 0)
release(h);
return true;
}
}
else if (tryDecReaderOverflow(s) != 0L)
return true;
}
return false;
}
}
对于读锁的释放来说,主要提供了以下几种方式:
- unlock() 方法:依据锁的状态status来匹配对应锁的stamp,然后释放锁操作
- unlockRead()方法: 依据锁的状态status来匹配对应读锁的stamp,然后释放锁操作
- tryUnlockRead()方法:释放当前持有的读锁,会设置一个stamp然后返回true,否则,返回false
- tryConvertToReadLock()方法:依据锁的状态status来匹配对应读锁的stamp,然后根据对应情况处理。其中:
- 单写锁模式:一般返回一个对应读锁的stamp
- 悲观读模式:直接返回对应读锁的stamp
- 乐观读模式:需要获取一个读锁,然后是立即返回对应读锁的stamp
- tryConvertToOptimisticRead(): 依据锁的状态status来匹配对应读锁的stamp,然后转换升级处理释放。其中:
- 悲观读模式:属于一般读锁模式,返回的是检测到对应读锁的stamp
- 乐观读模式:需要返回通过验证的对应读锁的stamp
3.4 独占锁WriteLock写锁释放操作实现
/** StampedLock锁-最早在JDK1.8中实现的 */
public class StampedLock implements java.io.Serializable {
/** StampedLock锁-锁释放方法(一般方法) */
public void unlock(long stamp) {
long a = stamp & ABITS, m, s; WNode h;
while (((s = state) & SBITS) == (stamp & SBITS)) {
if ((m = s & ABITS) == 0L)
break;
else if (m == WBIT) {
if (a != m)
break;
state = (s += WBIT) == 0L ? ORIGIN : s;
if ((h = whead) != null && h.status != 0)
release(h);
return;
}
else if (a == 0L || a >= WBIT)
break;
else if (m < RFULL) {
if (U.compareAndSwapLong(this, STATE, s, s - RUNIT)) {
if (m == RUNIT && (h = whead) != null && h.status != 0)
release(h);
return;
}
}
else if (tryDecReaderOverflow(s) != 0L)
return;
}
throw new IllegalMonitorStateException();
}
/** StampedLock锁-写锁释放方法 */
public void unlockWrite(long stamp) {
WNode h;
if (state != stamp || (stamp & WBIT) == 0L)
throw new IllegalMonitorStateException();
state = (stamp += WBIT) == 0L ? ORIGIN : stamp;
if ((h = whead) != null && h.status != 0)
release(h);
}
/** StampedLock锁-写锁转换升级处理并释放锁 */
public long tryConvertToWriteLock(long stamp) {
long a = stamp & ABITS, m, s, next;
while (((s = state) & SBITS) == (stamp & SBITS)) {
if ((m = s & ABITS) == 0L) {
if (a != 0L)
break;
if (U.compareAndSwapLong(this, STATE, s, next = s + WBIT))
return next;
}
else if (m == WBIT) {
if (a != m)
break;
return stamp;
}
else if (m == RUNIT && a != 0L) {
if (U.compareAndSwapLong(this, STATE, s,
next = s - RUNIT + WBIT))
return next;
}
else
break;
}
return 0L;
}
/** StampedLock锁-unlockWrite的核心实现 */
private void release(WNode h) {
if (h != null) {
WNode q; Thread w;
U.compareAndSwapInt(h, WSTATUS, WAITING, 0);
if ((q = h.next) == null || q.status == CANCELLED) {
for (WNode t = wtail; t != null && t != h; t = t.prev)
if (t.status <= 0)
q = t;
}
if (q != null && (w = q.thread) != null)
U.unpark(w);
}
}
}
对于写锁的释放来说,主要提供了以下种方式:
- unlock() 方法:依据锁的状态status来匹配对应锁的stamp,然后释放锁操作
- unlockWrite()方法:依据锁的状态status来匹配对应写锁的stamp,然后释放锁操作
- tryUnlockWrite()方法:释放当前持有的写锁,会设置一个stamp然后返回true,否则,返回false
- tryConvertToWriteLock()方法:依据锁的状态status来匹配stamp,根据对应锁的做升级处理。其中:
- 单写锁模式:直接返回对应的写锁标记stamp
- 读写锁模式:需要释放读锁锁,并返回对应的写锁标记stamp
- 乐观读模式:直接返回对应的写锁标记stamp
综上所述,StampedLock锁本质上依然是一种读写锁,只是没有基于AQS基础同步器来实现,是自定义封装API操作实现的。
六.CountDownLatch(闭锁)的设计与实现
在Java领域中,CountDownLatch(闭锁)是针对于Java多线程并发控制中倒计数器的具体数量,主要是采用递减计数方式的倒计数器思想和基于AQS基础同步器来实现的一种同步器工具类。
CountDownLatch(闭锁)是Java多线程并发中最常见的一种同步器,从锁的性质上来看,属于共享锁,其功能相当于一个多线程环境下的倒数门闩。<br />CountDownLatch通过定义一个倒计数器,在并发环境下由线程进行递减1操作,当计数值变为0之后,被await方法阻塞的线程将会唤醒。<br />通过CountDownLatch可以实现线程间的计数同步。
1. 设计思想
一般来说,通过定义一个倒计数器,为了让某个线程或者多个线程在某个运行节点上等待N个条件都满足后,才让所有的线程继续往下执行,其中倒计数器的数量则为N,每满足一个条件,倒计数器就依次逐渐递减1,直到N-1=0的时,所有等待的线程才往下继续执行。<br />CountDownLatch类最早是在JDK1.5版本提供的,从设计思想上来看,主要包括倒计数器的同步器,控制阻塞等待的方法,倒计数器的递减操作方法等3个核心要素。其中:
- 倒计数器的同步器:基于AQS基础抽象队列同步器封装内置实现一个静态的内置同步类,主要用于设置倒计数器的初始值以及定制AQS基础同步器的获取和释放共享锁。
- 倒计数器的初始值: 一般在构建CountDownLatch类时指定,表示的是需要等待条件的个数,即就是倒计数器的具体的资源数量Source(N)。
- 控制线程阻塞等待的方法:定义一个控制线程阻塞等待的方法,当倒计数器的具体的资源数量 Source(N)>0时,调用方法使其线程进入阻塞等待状态。
- 倒计数器的递减操作方法:定义一个倒计数器的递减操作方法,调用方法就会把倒计数器递减1,当倒计数器的具体的资源数量 Source(N)-1=0时,所有等待的线程才往下继续执行。
简单来说,CountDownLatch主要是让某个线程或者多个线程,等待其他线程完成某件事情或者某个任务结束之后才能继续执行。
2. 基本实现
在CountDownLatch类的JDK1.8版本中,对于CountDownLatch的基本实现如下:
public class CountDownLatch {
private final Sync sync;
/**
* CountDownLatch锁-构造一个倒计数器
*/
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}
/**
* CountDownLatch锁-基于AQS定义支持同步器实现
*/
private static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = -5179523762034025860 L;
//......其他方法代码
}
/**
* CountDownLatch锁-线程等待方法
*/
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
/**
* CountDownLatch锁-倒计数器递减操作
*/
public void countDown() {
sync.releaseShared(1);
}
//... 其他代码
}
- 倒计数器同步器:基于AQS基础定义支持同步器实现一个静态私有化的同步器Sync类,其中定义了获取和释放共享锁的两个方法
- 线程等待方法:主要是提供了一个await()方法,其本质是调用的是AQS基础同步器中的acquireSharedInterruptibly(int arg)方法,否则throws InterruptedException异常
- 倒计数器递减操作方法: 主要是提供了一个countDown()方法,其本质是调用的是AQS基础同步器中的releaseShared(int arg) 方法
2.1 基于AQS同步器封装静态内部Sync抽象类
/**
* CountDownLatch锁-基于AQS同步器封装一个内部的同步器
*/
private static final class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 4982264981922014374L;
Sync(int count) {
setState(count);
}
int getCount() {
return getState();
}
/**
* CountDownLatch锁-获取共享锁方法
*/
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
/**
* CountDownLatch锁-释放共享锁方法
*/
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
}
- 实现方式: 主要基于AQS封装的内部静态抽象Sync同步类实现,使用的AQS的共享模式
- 主要方法: 主要定制适配提供了tryAcquireShared()和tryReleaseShared()方法,即就是tryAcquireShared()用于获取共享锁,tryReleaseShared()方法用于释放共享锁,其中:
- 获取共享锁tryAcquireShared()方法:首先获取状态变量status,这里是指倒计数器中的数量,当status=0时,返回值=1,表示获取锁成功;否则,status !=0 时,返回值=-1,表示获取共享锁失败进行入队。
- 释放共享锁tryReleaseShared()方法: 通过自旋来实现递减操作,其中会获取状态变量status,将其递减1后使用compareAndSetState(c, nextc)方法通过CAS修改状态值
- 锁获取方式: 主要是利用getCount()来获取倒计数器中的数量,同时还可以利用构造方法指导一个倒计数器中的数量。
3. 具体实现
public class CountDownLatch {
private final Sync sync;
/**
* CountDownLatch锁-基于AQS基础同步器实现一个内部同步器
*/
private static final class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 4982264981922014374 L;
Sync(int count) {
setState(count);
}
int getCount() {
return getState();
}
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c - 1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
}
/**
* CountDownLatch锁-构造一个倒计数器
*/
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}
/**
* CountDownLatch锁-基于AQS定义支持同步器实现
*/
private static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = -5179523762034025860 L;
//......其他方法代码
}
/**
* CountDownLatch锁-线程等待方法
*/
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
/**
* CountDownLatch锁-返回当前计数器
*/
public long getCount() {
return sync.getCount();
}
/**
* CountDownLatch锁-线程等待方法(支持超时机制)
*/
public boolean await(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}
/**
* CountDownLatch锁-倒计数器递减操作
*/
public void countDown() {
sync.releaseShared(1);
}
}
- 倒计数初始值:通过构造方法CountDownLatch(int count)指定一个倒计数器的初始值,其必须大于0,否则会throw new IllegalArgumentException(“count < 0”)
- 线程等待方法: 主要提供了await() 方法和await(long timeout, TimeUnit unit)方法,其中:
- 无参数await() 方法: 一般默认的方法,其本质是调用AQS同步器中的acquireSharedInterruptibly()方法,主要表示支持中断机制
- 有参数await(long timeout, TimeUnit unit)方法: 是用于实现超时机制,其本质是调用AQS同步器中的tryAcquireSharedNanos(int arg, long nanosTimeout)方法
- 倒计数递减操作方法:主要是countDown() 方法, 其本质是调用AQS同步器中的releaseShared(int arg) 方法,核心实现是AQS基础同步器的doReleaseShared方法。
- 其他方法: 主要是getCount() 方法,用来获取倒计数个数,其本质是调用AQS同步器中getCount()方法,来获取状态变量
综上所述,从一定意义上讲,CountDownLatch是一种共享锁,属于AQS基础抽象队列同步器中共享模式孵化的产物,没有支持公平模式与非公平模式的实现。
七.CyclicBarrier(循环屏障)的设计与实现
在Java领域中,CyclicBarrier(循环屏障)是针对于Java多线程并发控制中倒计数器的线程数量,主要是采用递减计数方式的倒计数器思想和基于AQS基础同步器实现的ReentrantLock锁来实现的一种同步器工具类。
CyclicBarrier(循环屏障)是Java中通过对线程预定义设置一个屏障,只有当到达屏障的线程数量到达指定的最大屏障时,屏障才会让这些线程通过执行。<br />从一定意义上来讲,这里的屏障本质上还是一个倒计数器,倒计数器的最大限度支持的数量就是我们为线程设置屏障大小,其工作原理与CountDownLatch(闭锁)类似,都是通过让线程阻塞等待时,倒计数器执行递减1运算。<br />但是与CountDownLatch不同是,CyclicBarrier(循环屏障)是基于ReentrantLock(可重入锁)来实现的,更准确的说,CyclicBarrier是对ReentrantLock的应用实例。
1. 设计思想
一般来说,通过定义一个倒计数器,为了让某个线程或者多个线程在某个运行节点上约束N个线程,需要让指定数量的线程共同到达某一个节点之后,这些线程才会一起被执行。<br />CyclicBarrier(循环屏障)最早是在JDK1.5版本中提供的,从设计思想上来看,主要包括倒计数器的最大屏障,控制阻塞等待的方法,倒计数器的递减操作方法,和触发点线程任务等4个核心要素。其中:
- 倒计数器的同步器: 主要基于ReentrantLock来实现控制线程对象,其本质还是基于AQS基础同步器实现。
- 倒计数器的最大屏障数量:一般是在构建CyclicBarrier(循环屏障)对象是预定义设置,表示需要在某个运行节点上约束的线程数量。
- 控制线程阻塞等待的方法:定义一个方法,使得实现阻塞线程让其进入等待状态。
- 倒计数器的递减操作方法:定义一个方法,使得让倒计数器进行递减1运算,直到达到屏障时,等待的线程才继续执行。
- 触发点线程任务:一般指的是当指定数量的线程达到设置的屏障时,才会去触发执行的任务。
简单来说,CyclicBarrier(循环屏障)是让多个线程互相等待,直到达到一个同步的运行节点。再继续一起执行。
2. 基本实现
在CyclicBarrier类的JDK1.8版本中,对于CountDownLatch的基本实现如下:
public class CyclicBarrier {
/** CyclicBarrier锁—屏障lock实体 */
private final ReentrantLock lock = new ReentrantLock();
/** CyclicBarrier锁—屏障条件队列 */
private final Condition trip = lock.newCondition();
/** CyclicBarrier锁—屏障最大值 */
private final int parties;
/** CyclicBarrier锁—屏障触发线程任务目标 */
private final Runnable barrierCommand;
/** CyclicBarrier锁—当前计数器的最大值屏障实例 */
private Generation generation = new Generation();
/** CyclicBarrier锁—当前计数器的最大值屏障实例 */
private int count;
/** CyclicBarrier锁—屏障实例 */
private static class Generation {
boolean broken = false;
}
/** CyclicBarrier锁—构造一个屏障实例(不带触发任务的) */
public CyclicBarrier(int parties) {
this(parties, null);
}
/** CyclicBarrier锁—构造一个屏障实例(带触发任务的) */
public CyclicBarrier(int parties, Runnable barrierAction) {
if (parties <= 0) throw new IllegalArgumentException();
this.parties = parties;
this.count = parties;
this.barrierCommand = barrierAction;
}
/** CyclicBarrier锁—无参数构造一个等待方法(默认模式) */
public int await() throws InterruptedException, BrokenBarrierException {
try {
return dowait(false, 0L);
} catch (TimeoutException toe) {
throw new Error(toe); // cannot happen
}
}
/** CyclicBarrier锁—有参数构造一个等待方法(支持超时机制) */
public int await(long timeout, TimeUnit unit)
throws InterruptedException,
BrokenBarrierException,
TimeoutException {
return dowait(true, unit.toNanos(timeout));
}
/** CyclicBarrier锁—更新状态变量 */
private void nextGeneration() {
// signal completion of last generation
trip.signalAll();
// set up next generation
count = parties;
generation = new Generation();
}
/** CyclicBarrier锁—阻塞屏障 */
private void breakBarrier() {
generation.broken = true;
count = parties;
trip.signalAll();
}
//...其他代码
}
- 预定义设置屏障最大值: 主要是通过变量parties来实现预定义设置屏障最大值
- 设置当前屏障数量:主要是通过变量count来实现
- 控制线程的对象实例: 主要是通过ReentrantLock和Condition来控制线程间通信
- 触发目标任务对象: 主要是通过Runable来定义barrierCommand变量
- 提供了两个构造方法:都需要预定义指定屏障最大值parties,其中一个需要传入barrierAction触发点任务
- 线程阻塞等待方法:主要提供了2个await()方法,其中:
- 无参数await()方法:默认处理方式,不支持超时机制,其核心处理逻辑在dowait(boolean timed, long nanos)方法中实现
- 有参数await()方法:指定参数处理,支持超时机制,其核心处理逻辑在dowait(boolean timed, long nanos)方法中实现
- 屏障设置关健方法:主要是breakBarrier() 来实现,其中:
- 通知到达屏障的所有线程:主要是通过Condition中的signalAll()来通知屏障中所有线程已经满足条件
- 屏障设置:默认预定义设置屏障最大值与设置当前屏障数相同,主要设置count = parties
- 更新屏障状态:主要是通过generation.broken = true来实现
- 更新屏障的状态:主要是提供了nextGeneration() 方法,表示已经到达预定义设置屏障最大值,其中:
- 通知到达屏障的所有线程:主要是通过Condition中的signalAll()来通知屏障中所有线程已经满足条件
- 准备下一轮屏障设置:意味着预定义设置屏障最大值与设置当前屏障数相同,主要设置count = parties
- 重置屏障状态:主要是通过generation = new Generation()来实现
一般来说,假设我们允许控制的最大线程数量为N,预定义设置屏障最大值为Parties(N), 当前屏障的线程数量为Current(N) ,当前屏障中的等待线程数量为Waiting(N),那么我们会得到一个计算公式:<br />[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-n5fsK6da-1663745683568)(https://cdn.nlark.com/yuque/__latex/beffa3cd95a206bbbd5949193e551881.svg#card=math&code=Waiting(N) %3D Parties(N) - Current(N) &id=yeZuU)]
2.1 构造Generation屏障实例标记
private static class Generation {
boolean broken = false;
}
主要是构造了一个静态私有化的Generation类,其中定义了一个broken变量来作为屏障标记,默认初始值为false,表示还没达到屏障最大值。
2.1 线程阻塞等待核心dowait方法
private int dowait(boolean timed, long nanos)
throws InterruptedException, BrokenBarrierException,
TimeoutException {
// [1].实例化构建ReentrantLock的对象
final ReentrantLock lock = this.lock;
// [2].通过lock()获取锁或者说加锁操作
lock.lock();
try {
// [3].实例化构建Generation屏障实例对象
final Generation g = generation;
// [4].判断Generation屏障实例标记状态
if (g.broken)
throw new BrokenBarrierException();
// [5].判断Thread是包含中断标志位
if (Thread.interrupted()) {
breakBarrier();
throw new InterruptedException();
}
// [6].对倒计数器的屏障数量递减1运算
int index = --count;
// [7].依据结果index == 0表示当前指定的线程数量到达屏障最大值,需要触发Runnable任务
if (index == 0) { // tripped
boolean ranAction = false;
try {
final Runnable command = barrierCommand;
if (command != null)
command.run();
ranAction = true;
// 进行下一轮屏障设置
nextGeneration();
return 0;
} finally {
if (!ranAction)
breakBarrier();
}
}
// [7].自旋操作
for (;;) {
try {
// 判断是否超时
if (!timed)
trip.await();
else if (nanos > 0L)
// 进行下一轮屏障设置
nanos = trip.awaitNanos(nanos);
} catch (InterruptedException ie) {
if (g == generation && ! g.broken) {
breakBarrier();
throw ie;
} else {
// 是否发生线程中断
Thread.currentThread().interrupt();
}
}
if (g.broken)
throw new BrokenBarrierException();
if (g != generation)
return index;
// 如果等待时间超过指定超时时间,throw new TimeoutException
if (timed && nanos <= 0L) {
breakBarrier();
throw new TimeoutException();
}
}
} finally {
// 最后释放锁操作
lock.unlock();
}
}
- 加锁操作: 实例化构建ReentrantLock的对象,通过lock()方法进行加锁操作
- 判断屏障实例标记状态:实例化构建Generation实例标记,判断屏障实例标记状态是否一致,如果不一致则throw new BrokenBarrierException();
- 判断当前线程是否被中断: 判断Thread是包含中断标志位,如果中断throw new InterruptedException()并调用breakBarrier()重新设置屏障
- 屏障倒计数器递减运算:对倒计数器的屏障数量递减1运算,即就是对当前倒计数器的当前值减去1
- 触发节点线程任务: 当前倒计数器的当前值为0时,需要触发Runnable任务,并调用nextGeneration方法开启下一**作;否则,当前倒计数器的当前值不为0时,调用awaitNanos(nanos)方法进入等待状态
- 自旋操作判断超时: 如果使用了超时参数,调用awaitNanos(nanos)方法进入等待状态,其中如果发生中断则调用Thread.currentThread().interrupt()设置中断标记。如果等待时间> 指定超时时间,抛出throw new TimeoutException()异常
- 释放锁: 通过unlock()方法进行解锁操作,并释放锁
3. 具体实现
在CyclicBarrier类的JDK1.8版本中,对于CyclicBarrier的具体实现如下:
public class CyclicBarrier {
/** CyclicBarrier锁—屏障lock实体 */
private final ReentrantLock lock = new ReentrantLock();
/** CyclicBarrier锁—屏障条件队列 */
private final Condition trip = lock.newCondition();
/** CyclicBarrier锁—屏障最大值 */
private final int parties;
/** CyclicBarrier锁—屏障触发线程任务目标 */
private final Runnable barrierCommand;
/** CyclicBarrier锁—当前计数器的最大值屏障实例 */
private Generation generation = new Generation();
/** CyclicBarrier锁—当前计数器的最大值屏障实例 */
private int count;
/** CyclicBarrier锁—屏障实例 */
private static class Generation {
boolean broken = false;
}
/** CyclicBarrier锁—构造一个屏障实例(不带触发任务的) */
public CyclicBarrier(int parties) {
this(parties, null);
}
/** CyclicBarrier锁—构造一个屏障实例(带触发任务的) */
public CyclicBarrier(int parties, Runnable barrierAction) {
if (parties <= 0) throw new IllegalArgumentException();
this.parties = parties;
this.count = parties;
this.barrierCommand = barrierAction;
}
/** CyclicBarrier锁—无参数构造一个等待方法(默认模式) */
public int await() throws InterruptedException, BrokenBarrierException {
try {
return dowait(false, 0L);
} catch (TimeoutException toe) {
throw new Error(toe); // cannot happen
}
}
/** CyclicBarrier锁—有参数构造一个等待方法(支持超时机制) */
public int await(long timeout, TimeUnit unit)
throws InterruptedException,
BrokenBarrierException,
TimeoutException {
return dowait(true, unit.toNanos(timeout));
}
/** CyclicBarrier锁—更新状态变量 */
private void nextGeneration() {
// signal completion of last generation
trip.signalAll();
// set up next generation
count = parties;
generation = new Generation();
}
/** CyclicBarrier锁—阻塞屏障 */
private void breakBarrier() {
generation.broken = true;
count = parties;
trip.signalAll();
}
/** CyclicBarrier锁—阻塞屏障 */
private int dowait(boolean timed, long nanos)
throws InterruptedException, BrokenBarrierException,
TimeoutException {
// [1].实例化构建ReentrantLock的对象
final ReentrantLock lock = this.lock;
// [2].通过lock()获取锁或者说加锁操作
lock.lock();
try {
// [3].实例化构建Generation屏障实例对象
final Generation g = generation;
// [4].判断Generation屏障实例标记状态是否为true
if (g.broken)
throw new BrokenBarrierException();
// [5].判断Thread是包含中断标志位
if (Thread.interrupted()) {
breakBarrier();
throw new InterruptedException();
}
// [6].对倒计数器的屏障数量递减1运算
int index = --count;
// [7].依据结果index == 0表示当前指定的线程数量到达屏障最大值,需要触发Runnable任务
if (index == 0) { // tripped
boolean ranAction = false;
try {
final Runnable command = barrierCommand;
if (command != null)
command.run();
ranAction = true;
// 进行下一轮屏障设置
nextGeneration();
return 0;
} finally {
if (!ranAction)
breakBarrier();
}
}
// [7].自旋操作
for (;;) {
try {
// 判断是否超时
if (!timed)
trip.await();
else if (nanos > 0L)
nanos = trip.awaitNanos(nanos);
} catch (InterruptedException ie) {
if (g == generation && ! g.broken) {
breakBarrier();
throw ie;
} else {
Thread.currentThread().interrupt();
}
}
if (g.broken)
throw new BrokenBarrierException();
if (g != generation)
return index;
// 如果等待时间超过指定超时时间,throw new TimeoutException
if (timed && nanos <= 0L) {
breakBarrier();
throw new TimeoutException();
}
}
} finally {
// 最后释放锁操作
lock.unlock();
}
}
/** CyclicBarrier锁—获取当前等屏障等待数量 */
public int getNumberWaiting() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return parties - count;
} finally {
lock.unlock();
}
}
/** CyclicBarrier锁—获取当前等屏障数量 */
public int getParties() {
return parties;
}
/** CyclicBarrier锁—判断当前屏障 */
public boolean isBroken() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return generation.broken;
} finally {
lock.unlock();
}
}
/** CyclicBarrier锁—重置屏障数量 */
public void reset() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
breakBarrier(); // break the current generation
nextGeneration(); // start a new generation
} finally {
lock.unlock();
}
}
}
主要需要注意如下几个方法,都是基于ReentrantLock来实现加锁和解锁操作的,其中:
- getNumberWaiting()方法: 获取当前屏障中等待的线程数量
- reset() 方法:当一轮屏障操作结束,需要重置屏障中最大线程数量
- isBroken() 方法:判断是否到达屏障最大值
综上所述,从一定意义上讲,CyclicBarrier是一种可重入锁,属于ReentrantLock的应用实例,其中加锁和解锁操作都是独占模式的。
八.Semaphore(信号量)的设计与实现
在Java领域中,Semaphore(信号量)是针对于Java多线程并发控制中实现对公共资源的线程数量进行并发同时访问控制,主要是采用指定一个最大许可数的思想和基于AQS基础同步器来实现的一种同步器工具类。
Semaphore可以用来控制在同一时刻访问共享资源的线程数量,通过协调各个线程以保证共享资源的合理使用。<br />Semaphore维护了一组虚拟许可,它的数量可以通过构造器的参数指定。<br />线程在访问共享资源前,必须调用Semaphore的acquire()方法获得许可,如果许可数量为0,该线程就一直阻塞。<br />线程在访问共享资源后,必须调用Semaphore的release()方法释放许可。
1. 设计思想
一般来说,通过定义一个倒计数器,为了控制最多N个线程同时访问公共资源,其计数器的最大值Max(N)是被许可的最多N个线程数量,即就是许可的最大值N。<br />Semaphore类最早是在JDK1.5版本提供的,从设计思想上来看,主要包括倒计数器的最大许可数,同步器工作模式,获取锁方法,释放锁方法等4个核心要素。其中:
- 同步器工作模式:基于AQS基础抽象队列同步器封装内置实现一个静态的内置同步器抽象类,然后基于这个抽象类分别实现了公平同步器和非公平同步器,用来指定和描述同步器工作模式是公平模式还是非公平模式。
- 公平/非公平模式:主要描述的是多个线程在同时获取锁时是否按照先到先得的顺序获取锁,如果是则为公平模式,否则为非公平模式。
- 获取锁方法:主要定义了一个lock()方法来获取锁,表示假如锁已经被其他线程占有或持有,其当前获取锁的线程则进入等待状态。
- 释放锁方法:主要定义了一个unlock()方法来释放锁,表示假如锁已经被其他线程放弃或释放,其当前获取锁的线程则获得该锁。
2. 基本实现
在JDK1.8版本中,对于Semaphore的基本实现如下:
public class Semaphore implements java.io.Serializable {
private static final long serialVersionUID = -3222578661600680210 L;
/**
* Semaphore锁- 封装同步器
*/
private final Sync sync;
/**
* Semaphore锁- 封装同步器
*/
abstract static class Sync extends AbstractQueuedSynchronizer {
//....其他代码
}
/**
* Semaphore锁- 构造一个令牌许可(默认非公模式)
*/
public Semaphore(int permits) {
sync = new NonfairSync(permits);
}
/**
* Semaphore锁- 构造一个令牌许可(可选公平/非公模式)
*/
public Semaphore(int permits, boolean fair) {
sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}
/**
* Semaphore锁- 获取锁方法(默认一个且可中断机制)
*/
public void acquire() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
/**
* Semaphore锁- 获取锁方法(可选指定多个且可中断机制)
*/
public void acquire(int permits) throws InterruptedException {
if (permits < 0) throw new IllegalArgumentException();
sync.acquireSharedInterruptibly(permits);
}
/**
* Semaphore锁- 获取锁方法(默认多个且不可中断机制)
*/
public void acquireUninterruptibly() {
sync.acquireShared(1);
}
/**
* Semaphore锁- 获取锁方法(指定多个且不可中断机制)
*/
public void acquireUninterruptibly(int permits) {
if (permits < 0) throw new IllegalArgumentException();
sync.acquireShared(permits);
}
/**
* Semaphore锁-释放锁方法(默认一个)
*/
public void release() {
sync.releaseShared(1);
}
/**
* Semaphore锁-释放锁方法(可选指定多个)
*/
public void release(int permits) {
if (permits < 0) throw new IllegalArgumentException();
sync.releaseShared(permits);
}
}
- 内部同步器:基于AQS基础同步器封装和定义了一个静态内部Sync抽象类,其中抽象了一个内置锁lock()方法
- 同步器工作模式:主要提供了 2个构造方法,其中无参数构造方法表示的是默认的工作模式,有参数构造方法主要依据参数来实现指定的工作模式
- 获取锁方法: 主要提供了3个基于acquire方法,用于获取锁共享锁,其中:
- 无参数acquire()方法:获取共享锁的一般模式,默认指定一个许可和支持可中断机制
- 有参数acquire()方法:获取共享锁的指定模式,可选指定多个许可且支持可中断机制
- 无参数acquireUninterruptibly()方法:获取共享锁的指定模式,默认指定一个许可且不支持可中断机制
- 释放锁方法: 主要是提供了2个release()方法用于释放锁共享锁,其中:
- 无参数release()方法:释放共享锁的一般模式,默认指定一个许可和支持可中断机制
- 有参数release()方法:释放共享锁的指定模式,可选指定多个许可且支持可中断机制
2.1 基于AQS同步器封装静态内部Sync抽象类
/**
* Semaphore锁- 基于AQS基础同步器封装同步器
*/
abstract static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 1192457210091910933 L;
Sync(int permits) {
setState(permits);
}
final int getPermits() {
return getState();
}
/**
* Semaphore锁- 非公平模式获取共享锁
*/
final int nonfairTryAcquireShared(int acquires) {
for (;;) {
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
/**
* Semaphore锁- 释放共享锁
*/
protected final boolean tryReleaseShared(int releases) {
for (;;) {
int current = getState();
int next = current + releases;
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
if (compareAndSetState(current, next))
return true;
}
}
/**
* Semaphore锁- 自旋+compareAndSetState通过CAS操作计算操作令牌许可数
*/
final void reducePermits(int reductions) {
for (;;) {
int current = getState();
int next = current - reductions;
if (next > current) // underflow
throw new Error("Permit count underflow");
if (compareAndSetState(current, next))
return;
}
}
/**
* Semaphore锁- 自旋+compareAndSetState通过CAS操作重置令牌许可数
*/
final int drainPermits() {
for (;;) {
int current = getState();
if (current == 0 || compareAndSetState(current, 0))
return current;
}
}
}
- 实现方式:主要是基于AQS基础同步器封装了一个静态的的Sync抽象类,通过构造方法指定一个最大的令牌许可数量
- 主要方法:主要是看共享锁的获取nonfairTryAcquireShared()方法和释放锁tryReleaseShared()方法,其中:
- 获取锁nonfairTryAcquireShared()方法:非公平模式下获取共享锁,利用自旋+compareAndSetState()方法通过CAS操作,保证并发修改令牌许可数量
- 释放锁tryReleaseShared(i)方法: 公平/非公平模式下释放共享锁,利用自旋+compareAndSetState()方法通过CAS操作释放,会把释放的令牌许可数量增加到当前剩余的令牌许可数量中。
- 令牌许可操作方法:主要提供了drainPermits() 方法 和reducePermits() 方法,其中:
- drainPermits() 方法:主要是利用自旋+compareAndSetState()方法通过CAS操作重置令牌许可数
- reducePermits() 方法:主要是自旋+compareAndSetState)方法通过CAS操作递减计算操作令牌许可数
- 获取锁方式:令牌许可数量QS基础同步器状态变量对应,通过getPermits() 方法来获取令牌许可数量,本质是调用AQS基础同步器中的getState()来获取状态变量。
特别指出的是,这里的非公平模式主要描述的是,在令牌许可数量允许的情况下,让所有线程进行自旋操作,其实就是不关心线程到来的顺序,将全部线程放到一起去参与竞争令牌许可。<br />其中,主要还利用compareAndSetState方法来进行CAS操作,保证修改令牌许可数量的原子性操作。<br />一般来说,假设我们允许控制的最大线程数量为N,剩余令牌许可数量为Remanent(N), 当前可用令牌许可数量为Current(N) , 消耗令牌许可数量为Reduction(N),那么我们会得到一个计算公式:<br />[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-5YG2FPer-1663745683570)(https://cdn.nlark.com/yuque/__latex/4f16e6cc9c06549a2db50b6670b1afad.svg#card=math&code=Remanent(N) %3D Current(N) - Reduction(N)&id=dFFnP)]<br />即就意味着,剩余令牌许可数量等于当前可用令牌许可数量与消耗令牌许可数量之差。<br />由此可见,在公平/非公平模式下,我们对于对于获取锁和释放锁时,对于剩余令牌许可数量Remanent(N)计算都满足以下公式:
- 首先,在线程在访问共享资源前,我们可以允许的最大值为Available(N),自旋获取锁的数量为Acquires(N),那么我们在获取锁时:
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-3G61Q5pq-1663745683570)(https://cdn.nlark.com/yuque/__latex/794ee8aa87f2994ac9e53788a43f1a89.svg#card=math&code=Remanent(N) %3D Available(N) - Acquires(N)&id=xcVLl)]
- 其次,在线程在访问共享资源后,自旋释放锁的数量为Releases(N),那么我们在释放锁时:
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-tXCL8Ses-1663745683570)(https://cdn.nlark.com/yuque/__latex/7a0a68bf7a89436faddfc22f75f4c6cc.svg#card=math&code=Remanent(N) %3D Current(N) %2B Releases(N)&id=DjDlt)]<br />当然,需要注意的的一个问题,就是当剩余令牌许可数量Remanent(N) < 0时,表示当前线程会进入阻塞等待状态。
2.2 基于Sync抽象类封装FairSync公平同步器
/**
* Semaphore锁- 基于Sync抽象类封装FairSync公平同步器
*/
static final class FairSync extends Sync {
private static final long serialVersionUID = 2014338818796000944 L;
/**
* Semaphore锁- Semaphore锁- 通过构造方法指定令牌许可
*/
FairSync(int permits) {
super(permits);
}
/**
* Semaphore锁- Semaphore锁- 公平模式释放共享锁
*/
protected int tryAcquireShared(int acquires) {
for (;;) {
if (hasQueuedPredecessors())
return -1;
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
}
- 实现方式: 主要是在基于静态内部Sync抽象类来实现,构造了一个可指定大小的的令牌许可
- 主要方法: 主要是提供了一个tryAcquireShared方法,其中利用hasQueuedPredecessors()来保证公平性
- 工作机制: 通过基于AQS基础同步器中的等待队列来实现公平机制
需要注意的是,在未达到最大的令牌许可数量时,所有线程都不会进入等待队列中。
2.3 基于Sync抽象类封装NonfairSync非公平同步器
/**
* Semaphore锁- 基于Sync抽象类封装NonfairSync非公平同步器
*/
static final class NonfairSync extends Sync {
private static final long serialVersionUID = -2694183684443567898 L;
/**
* Semaphore锁- Semaphore锁- 通过构造方法指定令牌许可
*/
NonfairSync(int permits) {
super(permits);
}
/**
* Semaphore锁- Semaphore锁- 非公平模式释放共享锁
*/
protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires);
}
}
- 实现方式: 主要是在基于静态内部Sync抽象类来实现,构造了一个可指定大小的的令牌许可
- 主要方法: 主要是提供了一个tryAcquireShared方法,其中主要是调用了静态内部Sync抽象类nonfairTryAcquireShared方法。
- 工作机制: 通过自旋操作让所有线程竞争获取令牌许可,本质还是采用了AQS基础同步器中闯入策略到打破公平的
3. 具体实现
在JDK1.8版本中,对于Semaphore的具体实现如下:
public class Semaphore implements java.io.Serializable {
private static final long serialVersionUID = -3222578661600680210 L;
/**
* Semaphore锁- 封装同步器
*/
private final Sync sync;
/**
* Semaphore锁- 基于AQS基础同步器封装同步器
*/
abstract static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 1192457210091910933 L;
Sync(int permits) {
setState(permits);
}
final int getPermits() {
return getState();
}
/**
* Semaphore锁- 非公平模式获取共享锁
*/
final int nonfairTryAcquireShared(int acquires) {
for (;;) {
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
/**
* Semaphore锁- 释放共享锁
*/
protected final boolean tryReleaseShared(int releases) {
for (;;) {
int current = getState();
int next = current + releases;
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
if (compareAndSetState(current, next))
return true;
}
}
/**
* Semaphore锁- 自旋+compareAndSetState通过CAS操作计算操作令牌许可数
*/
final void reducePermits(int reductions) {
for (;;) {
int current = getState();
int next = current - reductions;
if (next > current) // underflow
throw new Error("Permit count underflow");
if (compareAndSetState(current, next))
return;
}
}
/**
* Semaphore锁- 自旋+compareAndSetState通过CAS操作重置令牌许可数
*/
final int drainPermits() {
for (;;) {
int current = getState();
if (current == 0 || compareAndSetState(current, 0))
return current;
}
}
}
/**
* Semaphore锁- 基于Sync抽象类封装FairSync公平同步器
*/
static final class FairSync extends Sync {
private static final long serialVersionUID = 2014338818796000944 L;
/**
* Semaphore锁- Semaphore锁- 通过构造方法指定令牌许可
*/
FairSync(int permits) {
super(permits);
}
/**
* Semaphore锁- Semaphore锁- 公平模式释放共享锁
*/
protected int tryAcquireShared(int acquires) {
for (;;) {
if (hasQueuedPredecessors())
return -1;
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
}
/**
* Semaphore锁- 基于Sync抽象类封装NonfairSync非公平同步器
*/
static final class NonfairSync extends Sync {
private static final long serialVersionUID = -2694183684443567898 L;
/**
* Semaphore锁- Semaphore锁- 通过构造方法指定令牌许可
*/
NonfairSync(int permits) {
super(permits);
}
/**
* Semaphore锁- Semaphore锁- 非公平模式释放共享锁
*/
protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires);
}
}
/**
* Semaphore锁- 构造一个令牌许可(默认非公模式)
*/
public Semaphore(int permits) {
sync = new NonfairSync(permits);
}
/**
* Semaphore锁- 构造一个令牌许可(可选公平/非公模式)
*/
public Semaphore(int permits, boolean fair) {
sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}
/**
* Semaphore锁- 获取锁方法(默认一个且可中断机制)
*/
public void acquire() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
/**
* Semaphore锁- 获取锁方法(可选指定多个且可中断机制)
*/
public void acquire(int permits) throws InterruptedException {
if (permits < 0) throw new IllegalArgumentException();
sync.acquireSharedInterruptibly(permits);
}
/**
* Semaphore锁- 获取锁方法(默认多个且不可中断机制)
*/
public void acquireUninterruptibly() {
sync.acquireShared(1);
}
/**
* Semaphore锁- 获取锁方法(指定多个且不可中断机制)
*/
public void acquireUninterruptibly(int permits) {
if (permits < 0) throw new IllegalArgumentException();
sync.acquireShared(permits);
}
/**
* Semaphore锁-释放锁方法(默认一个)
*/
public void release() {
sync.releaseShared(1);
}
/**
* Semaphore锁-释放锁方法(可选指定多个)
*/
public void release(int permits) {
if (permits < 0) throw new IllegalArgumentException();
sync.releaseShared(permits);
}
/**
* Semaphore锁-尝试获取锁方法(默认一个)
*/
public boolean tryAcquire() {
return sync.nonfairTryAcquireShared(1) >= 0;
}
/**
* Semaphore锁-尝试获取锁方法(可选指定多个)
*/
public boolean tryAcquire(int permits) {
if (permits < 0) throw new IllegalArgumentException();
return sync.nonfairTryAcquireShared(permits) >= 0;
}
/**
* Semaphore锁-尝试获取锁方法(可选指定多个并且支持超时机制)
*/
public boolean tryAcquire(int permits, long timeout, TimeUnit unit)
throws InterruptedException {
if (permits < 0) throw new IllegalArgumentException();
return sync.tryAcquireSharedNanos(permits, unit.toNanos(timeout));
}
/**
* Semaphore锁-尝试获取锁方法(默认一个并且支持超时机制)
*/
public boolean tryAcquire(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}
/**
* Semaphore锁-统计可以令牌许可数
*/
public int availablePermits() {
return sync.getPermits();
}
/**
* Semaphore锁-重置令牌许可数
*/
public int drainPermits() {
return sync.drainPermits();
}
/**
* Semaphore锁-递减计算令牌许可数
*/
protected void reducePermits(int reduction) {
if (reduction < 0) throw new IllegalArgumentException();
sync.reducePermits(reduction);
}
/**
* Semaphore锁-判断是否公平模式
*/
public boolean isFair() {
return sync instanceof FairSync;
}
/**
* Semaphore锁-判断队列中是否存在线程对象
*/
public final boolean hasQueuedThreads() {
return sync.hasQueuedThreads();
}
/**
* Semaphore锁-获取队列长度
*/
public final int getQueueLength() {
return sync.getQueueLength();
}
/**
* Semaphore锁-获取队列的线程对象
*/
protected Collection < Thread > getQueuedThreads() {
return sync.getQueuedThreads();
}
}
- 信号量同步器: 主要是提供了2个构造方法来实现令牌许可的管理,其中:
- 默认非公平模式:依据指定传入的令牌许可数量permits直接实例化NonfairSync非公平同步器
- 可选公平/非公平模式:依据指定传入的令牌许可数量permits和公平标记fair来实例化NonfairSync非公平同步器和FairSync公平同步器,其中,当fair=true时,是公平平模式,否则为非公平模式
- 支持可中断机制:主要是提供了2个acquire()方法来获取锁,其中:
- 无参数acquire()方法:一般模式获取共享锁,主要是基于AQS基础同步器中的acquireSharedInterruptibly(int arg)来实现,其核心逻辑是doAcquireSharedInterruptibly(int arg)来操纵。
- 有参数acquire()方法:依据指定传入的令牌许可数量permits来判断,当permits< 0时,直接throw new IllegalArgumentException();否则,直接调用acquireSharedInterruptibly(permits)方法。
- 支持不可中断机制:主要是提供了2个acquireUninterruptibly() 方法,其中:
- 无参数acquireUninterruptibly() 方法:一般模式获取共享锁,主要是基于AQS基础同步器中acquireShared(int arg)方法来实现,其核心逻辑是doAcquireShared(int arg) 来操纵。
- 有参数acquireUninterruptibly() 方法:依据指定传入的令牌许可数量permits来判断,当permits< 0时,直接throw new IllegalArgumentException();否则,直接调用acquireShared(int arg)方法。
- 非公平模式获取锁方式: 主要提供了2个tryAcquire() 方法,其中:
- 无参数tryAcquire() 方法:非公平模式尝试获取共享锁,直接调用的是非公平同步器中的nonfairTryAcquireShared(int acquires) 方法。
- 有参数tryAcquire() 方法:依据指定传入的令牌许可数量permits来判断,当permits< 0时,直接throw new IllegalArgumentException();否则,直接调用nonfairTryAcquireShared(int acquires) 方法。
- 公平模式获取锁方式:主要提供了2个tryAcquire() 方法,支持超时机制。其中:
- 无参数tryAcquire() 方法:公平模式尝试获取共享锁,依据指定传入的令牌许可数量permits来判断,当permits< 0时,直接throw new IllegalArgumentException();否则,直接调用的是AQS基础同步器中的tryAcquire(int permits, long timeout, TimeUnit unit)方法,其核心逻辑是tryAcquireSharedNanos(int arg, long nanosTimeout)来操纵。
- 有参数tryAcquire() 方法:公平模式尝试获取共享锁,默认支持一个许可,直接调用的是AQS基础同步器中的tryAcquire(1,long timeout, TimeUnit unit)方法,其核心逻辑是tryAcquireSharedNanos(int arg, long nanosTimeout)来操纵。
- 释放锁操作方式:主要提供了2个release()方法,其中:
- 无参数release() 方法:公平/非公平模式示范锁操作,默认支持一个许可,主要是直接调用AQS基础同步器中的releaseShared(int arg) 方法
- 有参数release() 方法:公平/非公平模式示范锁操作,依据指定传入的令牌许可数量permits来判断,当permits< 0时,直接throw new IllegalArgumentException();否则,主要是直接调用AQS基础同步器中的releaseShared(int arg) 方法
- 令牌许可操作方法:主要提供了availablePermits() 方法,reducePermits(int reduction)方法 以及drainPermits() 方法,其中:
- availablePermits() 方法:获取可用的令牌许可数量,主要是调用内部同步器中getPermits()方法。
- reducePermits()方法:计算剩余可用令牌许可数量,依据指定传入的令牌许可数量reduction来判断,当reduction< 0时,直接throw new IllegalArgumentException();否则,调用内部同步器中reducePermits()方法。
- drainPermits() 方法:重置可用令牌许可数量,主要是调用内部同步器中drainPermits()方法。
- 队列操作方法:主要提供了hasQueuedThreads()方法,getQueuedThreads() 方法以及getQueueLength() 方法,其中:
- hasQueuedThreads()方法:主要是用于获取队列中是否存在等待获取令牌许可的线程对象,主要是直接使用AQS基础同步器的hasQueuedThreads()来实现。
- getQueuedThreads() 方法:主要是用于获取队列中等待获取令牌许可的线程对象,主要是直接使用AQS基础同步器的getQueuedThreads()来实现。
- getQueueLength() 方法:主要是用于获取队列中等待获取令牌许可的数量,主要是直接使用AQS基础同步器的getQueueLength()来实现。
综上所述,从一定意义上讲,Semaphore是一种共享锁,属于AQS基础抽象队列同步器中共享模式孵化的产物,支持公平模式与非公平模式,默认是使用非公平模式。
写在最后
通过对Java领域中,JDK内部提供的各种锁的实现来看,一直围绕的核心主要还是基于AQS基础同步器来实现的,但是AQS基础同步器不是一种非它不可的技术标准规范,更多的只是一套技术参考指南。<br />但是,实际上,Java对于锁的实现与运用远远不止这些,还有相位器(Phaser)和交换器(Exchanger),以及在Java JDK1.8版本之前并发容器ConcurrentHashMap中使用的分段锁(Segment)。<br />不论是何种实现和应用,在Java并发编程领域来讲,都是围绕线程安全问题的角度去考虑的,只是针对于各种各样的业务场景做的具体的实现。<br />一定意义上来讲,对线程加锁只是并发编程的实现方式之一,相对于实际应用来说,Java领域中的锁都只是一种单一应用的锁,只是给我们掌握Java并发编程提供一种思想没,三言两语也不可能详尽。<br />到此为止,这算是对于Java领域中并发锁的最终章,文中表述均为个人看法和个人理解,如有不到之处,忘请谅解也请给予批评指正。<br />最后,技术研究之路任重而道远,愿我们熬的每一个通宵,都撑得起我们想在这条路上走下去的勇气,未来仍然可期,与各位程序编程君共勉!