JDK1.5中引入了线程池,合理地利用线程池能有效的提高程序的运行效率,但不当的使用线程池也会带来致命的危害。作为使用最多的ThreadPoolExecutor,很有必要深入理解的其源码与实现原理。
先看一下ThreadPoolExecutor是如何工作的,暂时不看源码,这样会先有一个比较直观的印象有利于后面深入分析源码。
既然是线程池那么提交任务后一定要创建线程用于执行任务,ThreadPoolExecutor创建线程执行提交任务的流程如下。
简单介绍一下,一个任务提交给线程池后,线程池创建线程来执行提交任务的流程。
下图给出了ThreadPoolExecutor更加直观的整体运行图。图中标注1、2、3、4的分别对应上面分析中的第1、第2、第3、第4步。
结合上图补充几点:
ThreadPoolExecutor的UML类图如上图,其中Executor提供最基础的任务执行的抽象void execute(Runnable command)方法,而ExecutorService在其基础上扩展的管理线程池的一些方法shutdown()、shutdownNow()、isShutdown() 与isTerminated()等,同时增加了用三个重载的submit方法,用于获取任务的执行结果。submit可以提交Callable类型的任务,也可提交Runnable类型的任务。AbstractExecutorService类提供了newTaskFor将提交的Callable与Runnable类型任务转为FutureTask,同时提供了sumbit与invoke的默认实现,具体的任务执行逻辑交由子类ThreadPoolExecutor的execute方法。不管是调用submit还是execute的提交的任务,最终都交由ThreadPoolExecutor的execute方法执行。
execute方法是分析ThreadPoolExecutor源码的入口。
分析execute方法前先看一下ThreadPoolExecutor里面的核心变量与类。
//线程池状态与线程池中有效线程数控制变量,AtomicInteger变量的高3位用于
//保存线程池状态,低29位用于保存线程池中有效线程数。
//程线程对应状态如下:
// 1、RUNNING: 运行中,接收新的任务或处理队列中的任务 值为-536870912
// 2、SHUTDOWN: 关闭,不再接收新的任务,但会处理队列中的任务 值为0
// 3、STOP: 停止,不再接收新的任务,也不处理队列中的任务,并中断正在处理的任务 值为536870912
// 4、TIDYING: 所有任务已结束,队列大小为0,转变为TIDYING状态的线程将会执行terminated() hook 方法 值为1073741824
// 5、TERMINATED: 结束,terminated() 已被执行完 值为1610612736
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
与ctl变量有关的操作方法
//获取线程池的运行状态runState
//CAPACITY 二进制值为: 00011111111111111111111111111111
//~CAPACITY 按位取反为:11100000000000000000000000000000
//ctl&~CAPACITY 低29全为0,得到高3位即线程池的runState
private static int runStateOf(int c) { return c & ~CAPACITY; }
//获取线程池中有效的线程数
private static int workerCountOf(int c) { return c & CAPACITY; }
//根据runState与workerCount计算出ctl的值
private static int ctlOf(int rs, int wc) { return rs | wc; }
//判断线程池是否处于运行中
private static boolean isRunning(int c) {return c < SHUTDOWN;}
其它核心成员变量
//工作队列,提交任务超过corePoolSize时,任务被保证在workQueue中
private final BlockingQueue<Runnable> workQueue;
//处理wokers的锁
private final ReentrantLock mainLock = new ReentrantLock();
//工作作线程集合
private final Hash<Worker> workers = new HashSet<Worker>();
//用于支持awaitTermination方法的条件
private final Condition termination = mainLock.newCondition();
//曾经创建过的最大工作线程数
private int largestPoolSize;
//线程池中已完成的总任务数
private long completedTaskCount;
//线程池创建执行提交任务对应线程时采用的线程工厂
private volatile ThreadFactory threadFactory;
//线程池饱和时,拒绝策略
private volatile RejectedExecutionHandler handler;
//allowCoreThreadTimeOut为true时,无任务时情况下核心线程允许存活时间;
//线程池中超过核心线程数,那部分工作线程,无任务时情况下核心线程允许存活时间。
private volatile long keepAliveTime;
//核心工作线程是以超时的方式还是阻塞的方式尝试从workQueue队列里面获取任务,
//当以超时的方式获取时,如果在指定时间内还没有获取到任务工作线程run方法将执
//行完毕,对应工作线程被GC回收
private volatile boolean allowCoreThreadTimeOut;
//线程池中核心工作线程数
private volatile int corePoolSize;
//线程池中最大工作线程数
private volatile int maximumPoolSize;
// 线程池饱和时,默认拒绝策略 直接抛出异常
private static final RejectedExecutionHandler defaultHandler = new AbortPolicy();
工作线程Worker类,Worker类利用AQS框架实现了一个简单的非重入的互斥锁, 实现互斥锁主要目的是为了中断的时候判断线程是在空闲还是运行,可以看后面shutdown和shutdownNow方法的分析。涉及AQS部分暂时不深入分析,后面再写专关于AQS的文章。
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable {
//用于执行提交任务的线程
final Thread thread;
//第一个要执行的任务,可能为null
Runnable firstTask;
//每个工作线程执行的任务数量
volatile long completedTasks;
Worker(Runnable firstTask) {
//阻止中断,直到运行runWorker方法
setState(-1);
this.firstTask = firstTask;
//利用线程工厂创建工作线程,同时让当前Worker.run方法去执行提交的任务
this.thread = getThreadFactory().newThread(this);
}
//工作线程执行任务的入口,具体执行任务代理给runWorker方法
public void run() {
runWorker(this);
}
// The value 0 represents the unlocked state.
// The value 1 represents the locked state.
protected boolean isHeldExclusively() {
return getState() != 0;
}protected boolean tryAcquire(int unused) {
if (compareAndSetState(0, 1)) {
//设置为排它锁
setExclusiveOwnerThread(Thread.currentThread());
//成功获取锁
return true;
}
//到同步队列中自旋
return false;
}
protected boolean tryRelease(int unused) {
setExclusiveOwnerThread(null);
setState(0);
return true;
}
public void lock() { acquire(1); }
public boolean tryLock() { return tryAcquire(1); }
public void unlock() { release(1); }
public boolean isLocked() { return isHeldExclusively(); }
void interruptIfStarted() {
Thread t;
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
//省略部分源码
}
execute方法执行流程可以概括为
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
// 工作线程数量小于核心线程数,调用addWorker方法创建工作线程。
// 提交任务command作为Worder的第一个任务执行。
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
//工作线程数量大于核心线程数且,线程池在运行则将任务加到队列
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
//重新检查,如果线程池不在RUNNING,删除上一步加入队列的任务
if (! isRunning(recheck) && remove(command))
reject(command);
// 线程池处于RUNNING状态 || 线程池处于非RUNNING状态但是任务移除失败
else if (workerCountOf(recheck) == 0)
// 线程池处于SHUTDOWN状态下,没有活动线程了,但是队列里还有任务没执行这种特殊情况。
// 添加一个null任务是因为SHUTDOWN状态下,线程池不再接受新任务
addWorker(null, false);
}
//1、非RUNNING状态拒绝新的任务
//2、队列满了启动新的线程失败,即工作线程数量大于最大线程数量(workCount > maximumPoolSize)
else if (!addWorker(command, false))
reject(command);
}
addWorker创建了ThreadPoolExecutor中用于执行提交任务的线程,这个过程同时把任务与执行任务的线程封装到Worker对象中。同时addWorker还启动了用于执行任务的线程,而具体任务的执行,则代理给了ThreadPoolExecutor的runWorkers方法。
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
//线程池状态为非RUNNING &&
//(线程池状态为非SHUTDOWN || firstTask不为null || 队列为空) 三者中的一者
//组合一下分别是
//1、线程池状态为非RUNNING && 线程池状态为非SHUTDOWN,
//即,线程池状态为 (STOP || TIDYING || TERMINATED)
//此时线程池不在接受新的任务,通过addWorker新提交的任务会失败
//2、线程池状态为非RUNNING && firstTask不为null
//即,线程池状态为 (SHUTDOWN || STOP || TIDYING || TERMINATED) && firstTask不为null
//此时线程池不在接受新的任务,但有处理队列里的任务,通过addWorker新提交的任务会失败
//3、线程池状态为非RUNNING && 队列为空
//即,线程池状态为(SHUTDOWN || STOP || TIDYING || TERMINATED)&& 队列为空
//此时线程池不在接受新的任务,因为队列中没有任务要处理,
//所以没必要调用addWorker(null, false),创建新的线程去处理工作队列的任务
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
// 线程池状态为RUNNING 或者 (线程池状态为SHUTDOWN状态,且队列中还有任务需要执行)
for (;;) {
int wc = workerCountOf(c);
//工作线程数过大最大值,或者超过核心线程数或超过最大线程数,都返回false
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
//原子方式设置线程池中线程数成功,则跳出重试的循环
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
// 如果线程池的状态发生变化则重试
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
//创建worker对象,worker对象内部
//利用线程工厂创建一个线程去执行提交的任务
//这个线程的target Runnable为 worker本身,
//最终调用worker.run执行提交的任务
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
int rs = runStateOf(ctl.get());
// 重新检测线程池的状态,在获取得锁前一步,线程池可能已被终止
// 线程池状态为RUNNING 或者 (线程池状态为SHUTDOWN状态,且队列中还有任务需要执行)
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
//worker添加成功
if (workerAdded) {
//启动worker内部线程,worker内部线程的
//target Runnable为worker本身,
//将运行worker的run方法,run内部调用ThreadPoolExecutor.runWorkers方法
t.start();
workerStarted = true;
}
}
} finally {
//获取得锁前一步,线程池已被终止导致
//workerAdded失败或线程没start。
if (! workerStarted)
//会调用tryTerminate方法
addWorkerFailed(w);
}
return workerStarted;
}
runWorkers方法首先会执行woker对象中的firstTask,当firstTask执行完后,会通过getTask方法循环地从workerQueue(工作队列)中获取任务去执行。当workerQueue中没有任务,getTask方法会阻塞挂起。runWorkers中在任务执行前调用了beforeExecute扩展点,在任务执行后调用了afterExecute扩展点。最后则调用processWorkerExit方法作一下清理工作。
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
//Worker的构造函数中通过setState(-1)抑制了线程中断,
//这里通过unlock允许中断
w.unlock();
boolean completedAbruptly = true;
try {
//首先执行worker中的firestTask,
//然后循环地从workQueue中拉取任务执行
while (task != null || (task = getTask()) != null) {
w.lock();
//如果线程池处于停止中,
//即线程池处于STOP、TIDYING、TERMINATED状态,
// 要确保线程被中断。如果没有确保不被中断。
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
//任务执行前扩展点
beforeExecute(wt, task);
Throwable thrown = null;
try {
//执行任务
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
//任务执行后扩展点
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
//将worker中workerSet中清除,统计完成任务数
//同时调用tryTerminate方法尝试终止线程池
processWorkerExit(w, completedAbruptly);
}
}
getTask方法主要用于从workQueue中取出任务交给runWorker方法去执行提交的任务,同时完了线程池中核心线程是否要allowCoreThreadTimeOut与线程池中线程数量超过maximunPoolSize时timeOut处理。核心工作线程是以超时的方式还是阻塞的方式尝试从workQueue队列里面获取任务,当以超时的方式获取时,如果在指定时间内还没有获取到任务工作线程run方法将执行完毕,对应工作线程被GC回收。
分析execute方法前先看一下ThreadPoolExecutor里面的核心变量与类。
private Runnable getTask() {
//上一次从workQueue.poll方法是否超时
boolean timedOut = false;
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// 1、线程池为SHUTDOWN以上状态时且工作队列为空时,
// 此时没有任务,直接返回null
// 2、线程池为STOP以上状态时,
// 此时不用处理工作队列中的任务直接返回
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
//工作线程是否要在指的timeout时间内被清理
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
//1、wc > maximumPoolSize && (wc > 1 || workQueue.isEmpty())
//这个种情况按理不会出现??
//2、(timed && timedOut) && (wc > 1 || workQueue.isEmpty())
//影响超时处理
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
//影响timeOut的方式从workQueue中获取任务,
//或者以阻塞的方式从workQueue中获取任务
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
processWorkerExit方法统计了线程池中总执行的任务数,同时尝试终止线程池。另外还加上当线程池的runState为RUNNING或SHUTDOWN时,由于核心线程数允许超时导致线程池中没有线程处理工作队列中任务的逻辑。即通过addWorker(null,false)创建一个新的线程来处理工作队列中的任务。
private void processWorkerExit(Worker w, boolean completedAbruptly) {
//runWokder执行异常时,让ctl中有效线程数量减一,
//runWokder正常执行时,getTask方法中workerCount会被减一
if (completedAbruptly)
//ctl中有效线程数量减一
decrementWorkerCount();
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//统计已完成的任务数
completedTaskCount += w.completedTasks;
//从wordkerSet中去worker
workers.remove(w);
} finally {
mainLock.unlock();
}
//尝试终止线程池
tryTerminate();
int c = ctl.get();
//线程池的runState为RUNNING或SHUTDOWN时
if (runStateLessThan(c, STOP)) {
//runWorker正常执行
if (!completedAbruptly) {
//线程池最小空闲数,允许core thread超时就是0,
//否则就是corePoolSize
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
//任务队列不为空,则至少要一个线程处理任务队列中的任务
if (min == 0 && ! workQueue.isEmpty())
min = 1;
//线程池中有线程处理任务中的任务直接返回
if (workerCountOf(c) >= min)
return; // replacement not needed
}
//线程池中没有线程处理任务队列中的任务,
//创建一个线程处理任务队列中的任务
addWorker(null, false);
}
}
tryTerminate方法会尝试终止线程池,如果线程池还不能终止则直接返回。如果确定可以终止的话,会调用terminated扩展点方法,执行线程池终止前想要做的工作。
final void tryTerminate() {
for (;;) {
int c = ctl.get();
// 1、线程池还处于RUNNING状态直接返回
// 2、线程池状态大于TIDYING,线程池已经停止了或在停止
// 3、线程池为SHUTDOWN状态但是任务队列非空直接返回
if (isRunning(c) ||
runStateAtLeast(c, TIDYING) ||
(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
return;
//线程池中还有工作线程,中断工作线程,退出
if (workerCountOf(c) != 0) {
interruptIdleWorkers(ONLY_ONE);
return;
}
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//cas方式设置ctl状态,成功执行terminated方法
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
try {
//线程池终止前执行的扩展点方法
terminated();
} finally {
ctl.set(ctlOf(TERMINATED, 0));
//通知awaitTermination方法,继续执行
termination.signalAll();
}
return;
}
} finally {
mainLock.unlock();
}
}
}
shutdown方法会先将线程池的状态设置为SHUTDOWN,然后向线程池中的所有线程发出中断信号。最后会调用tryTermiate方法尝试终止线程。处理SHUTDOWN状态的线程池,不接受新的任务,但会执行工作队列中的任务。
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//确保调用者用权限关闭线程池
checkShutdownAccess();
//自旋的方法设置线程池的状态为SHUTDOWN
advanceRunState(SHUTDOWN);
// 注意这里是中断所有空闲的线程:runWorker中等待的线程被中断 → 进入processWorkerExit →
// tryTerminate方法中会保证队列中剩余的任务得到执行。
interruptIdleWorkers();
// hook for ScheduledThreadPoolExecutor
onShutdown();
} finally {
mainLock.unlock();
}
//尝试终止线程池
tryTerminate();
}
shutdownNow方法会先将线程池的状态设置为SHUTDOWN,然后向线程池中的所有线程发出中断信号。最后会调用tryTermiate方法尝试终止线程。处于STOP状态的线程池,不接受新的任务,同时由于调用了drainQueue使得workQueue中任务全被删除,workQueue中的任务不被执行。
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//确保调用者用权限关闭线程池
checkShutdownAccess();
//自旋的方法设置线程池的状态为SHUTDOWN
advanceRunState(STOP);
//中断所有线程池中所有线程
interruptWorkers();
//获取未执行的任务
tasks = drainQueue();
} finally {
mainLock.unlock();
}
//尝试终止线程池
tryTerminate();
return tasks;
}
shutDown方法与shutDownNow方法,最主要的区别在于shutDown调用的是
interruptIdleWorkers()方法,而shutDownNow调用的是interruptWorkers()方法。
interruptIdleWorkers方法只会中断空闲的线程。这点是通过w.tryLock实现的,由于runWorker方法中在worker在执行任务前会先调用worker的lock方法。
所以tryLock方法成功时,当前的worker一定处于空闲状态。
private void interruptIdleWorkers(boolean onlyOne) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers) {
Thread t = w.thread;
//判断worker持有的线程是否已被中断
//且能通过tryLock能获取到锁。
//由于runWorker方法中执行任务时会先lock,
//如果能tryLock说线程不在执行任务,
//保证了中断的肯定是空闲的线程。
if (!t.isInterrupted() && w.tryLock()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
} finally {
w.unlock();
}
}
if (onlyOne)
break;
}
}
finally {
mainLock.unlock();
}
}
ThreadPoolExecutor之interruptWorkers方法
private void interruptWorkers() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers)
//调用worker的interruptIfStarted方法中断
w.interruptIfStarted();
} finally {
mainLock.unlock();
}
}
Worker之interruptIfStarted方法
void interruptIfStarted() {
Thread t;
//state为0表示worker unLock,1表示worker lock,
//不管worker是在runWorker还是idle,全部进行中断
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
prestartCoreThread方法首先判断当前线程池中的线程数是否小于核心线程数,如果小于则调用addWorker创建一个工作线程。该工作线程等待处理后面将要提交的任务
public boolean prestartCoreThread() {
return workerCountOf(ctl.get()) < corePoolSize &&
addWorker(null, true);
}
ThreadPoolExecutor之prestartAllCoreThreads方法
prestartAllCoreThreads与prestartCoreThread方法类似
public int prestartAllCoreThreads() {
int n = 0;
while (addWorker(null, true))
++n;
return n;
}
这也是之前ThreadPoolExecutor更加直观的整体运行图中prestartAllCoreThreads与prestartCoreThread指向线程池中核心线程执行有者Worker的那部分。