性能文章>JUC之 FutureTask 源码与工作原理分析>

JUC之 FutureTask 源码与工作原理分析原创

10279417

JDK1.5 引入了Future模式,Future代表了一个异步任务的执行结果。Future模式可以理解成:主线程将待执行的任务提交给子线程执行后,可以先获取任务结果的持有者Future。然后主线程可以去执行其他的任务。等待到要关注之前任务的执行结果时,再从Future中获取。下面是用户注册的场景。

image.png

同步实现时,注册接口响应时间150ms= 用户信息保存50ms + 发送短信50ms + 发送邮件50ms

异步实现时,注册接口响应时间100ms= 用户信息保存50ms + max(发送短信50ms, 发送邮件50ms)

Future 接口核心方法

/*尝试取消当前执行的任务*/
boolean cancel(boolean mayInterruptIfRunning);
/*判断当前执行的任务是否被取消*/
boolean isCancelled();
/*判断当前执行的任务是否完成,正常结束、发生异常或任务被取消都认为任务完成*/
boolean isDone();
/*获取当前执行任务对应的结果,将阻塞等待任务执行完*/
V get() throws InterruptedException, ExecutionException;
/*等待一段时间,获取当前执行任务对应的结果,将阻塞等待任务执行完,如果对应时间还未获取结果,
 * 抛出TimeoutException
 */
V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;

后面所有分析全部基于JDK8

在JDK中Future接口的实现类为FutureTask,其UML图如下。

image.png

FutureTask同时实现了Runnable与Future的功能。实现Runnable主要是为了让Executor可以执行,实现Future为了能够持有执行结果的引用

FutureTask的成员变量如下:

/*当前任务的执行状态*/
private volatile int state;
/*当前要执行的任务,执行完后将被设置为null*/
private Callable<V> callable;
/*当前任务的执行结果,当发生异常或被取消时为对应异常,非volatile,由state状态保证线程安全*/
private Object outcome;
/*执行当callable任务的线程引用,为当前线程通过CAS方式原子设置*/
private volatile Thread runner;
/*Treiber椎,用于保存由于调用Future.get方法而阻塞的线程*/
private volatile WaitNode waiters;

先来看一下用于维护当前任务执行状态的state成员变量。

FutureTask内部定义了,任务执行的7种状态。

//任务的初始初始化状态
private static final int NEW          = 0;
//执行结果设置中时,的中间状态
private static final int COMPLETING   = 1;
//任务正常执行完成的最终状态
private static final int NORMAL       = 2;
//任务执行出错的最终状态
private static final int EXCEPTIONAL  = 3;
//调用Future.cancell取消任务的最终状态
private static final int CANCELLED    = 4;
//执行任务的线程被中断的中间状态
private static final int INTERRUPTING = 5;
//执行任务的线程被中断的最终状态
private static final int INTERRUPTED  = 6;

image.png

上图展示了,Future任务状态的扭转图。new状态时通过调用set()方法、setException()方法可以使状态最终分别转变为NORMAL与EXCEPTIONAL; new状态时通过调用cancel(flase)方法、cancel(true)方法可以可以使状态最终分别转变为CANCELLED与INTERRUPTED。

再看一下用于保存由于调用Future.get方法而阻塞的线程的Treiber椎引用成变量waiters。

static final class WaitNode {
     volatile Thread thread;
     volatile WaitNode next;
     WaitNode() { thread = Thread.currentThread(); }
}

每当一个线程调用Future.get去获取任务的执行结果时,如果当前任务还没有执行结束、还没有被取消或者执行中未抛出异常。将产生一个新的WaitNode类型的节点,该节点持有调用Future.get方法的线程的引用,放入到Treiber椎中。FutureTask成变量waiters更新为最新的WaitNode节点。如下图。

image.png

介绍了,FutureTask中最重要的两个成量变state与waiters。现在开始分析FutureTask的源码与工作原理。先从FutureTask的get方法入手。

/** 
 * @throws CancellationException {@inheritDoc} 
 */
 public V get() throws InterruptedException, ExecutionException {
      int s = state;
      //任务执行状态为NEW或COMPLETING时,等待任务执行完成
      if (s <= COMPLETING)
          //等待任务执行完成,具体的等待通过Treiber椎实现
          s = awaitDone(false, 0L);
      /*不是NEW或COMPLETING时,可以任务执行的获取结果,       
       *当调用cancel方法将抛出CancellationException,       
       *当任务执行中出现异常时将抛出ExecutionException       
       */
      return report(s);
 }

FutureTask awaitDone方法:

    /**     
     * Awaits completion or aborts on interrupt or timeout.     
     *     * @param timed true if use timed waits     
     * @param nanos time to wait, if timed     
     * @return state upon completion     
     */
    private int awaitDone(boolean timed, long nanos)
        throws InterruptedException {
        final long deadline = timed ? System.nanoTime() + nanos : 0L;
        WaitNode q = null;
        boolean queued = false;
        for (;;) {
            if (Thread.interrupted()) {
                removeWaiter(q);
                throw new InterruptedException();
            }

            int s = state;
            /*任务已执行完,包括NORMAL(任务正常执行完), EXCEPTIONAL(任务执行出现异常),              
             *CANCELLED(任务被取消执行不中断), INTERRUPTING(任务被取消执行中断),              
             *INTERRUPTED(任务被取消执行中断)             
             */
            if (s > COMPLETING) {
                if (q != null)
                    q.thread = null;
                return s;
            }
            //任务还在执行中
            else if (s == COMPLETING) // cannot time out yet
                //向操作系统发出让出当前线程CUP的调度执行信号
                Thread.yield();
            else if (q == null)
                //状态为NEW时,将当前调用get方法的线程构造成WaitNode做准备下一步放入Treiber stack
                q = new WaitNode();
            else if (!queued)
                //以原子的方式将调用了get方法的线程构造好的WaitNode放入Treiber stack
                queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                     q.next = waiters, q);
            else if (timed) {
                //响应超时的调用get方法
                nanos = deadline - System.nanoTime();
                //已超时返回
                if (nanos <= 0L) {
                    removeWaiter(q);
                    return state;
                }
                //将当前线程挂起,直到调用LockSupport.unpark当前线程或是已超时
                LockSupport.parkNanos(this, nanos);
            }
            else
                //将当前线程挂起,直到调用LockSupport.unpark当前线程
                LockSupport.park(this);
        }
    }

从上面的分析中可以看出Future中任务没有执行完(包括正常执行、执行中抛出现异常、执行任务被取消)时,调用get方法的线程被LockSupport.park方法挂起,操作系统将不会对这个线程进行调度,当前线程被阻塞。而这个阻塞的结束是依靠调用LockSupport.unpark方法。LockSupport.unpark方法只有在任务正常执行完、执行中抛出现异常、执行任务被取消才会被调用。先不看具体何时在什么地方LockSupport.unpark方法被调用。先看看report方法的源码。

    /**     
     * Returns result or throws exception for completed task.     
     *     * @param s completed state value     
     */
    @SuppressWarnings("unchecked")
    private V report(int s) throws ExecutionException {
        /*最终任务执行的结果,如果任务正常执行完成为任务执行结果         
         *如果任务执行被取消为null, 如果任务执行中出现异常为对应的Throwable对象         
         */
        Object x = outcome;
        //任务正常执行完成
        if (s == NORMAL)
            return (V)x;
        /*任务执行被取消,调用cancel(false)产生的状态为CANCELLED,         *调用cancel(true)产生的状态为INTERRUPTED         */
        if (s >= CANCELLED)
            //抛出任务被取消的异常
            throw new CancellationException();
        //任务执行中出现异常抛出,执行中的异常
        throw new ExecutionException((Throwable)x);
    }

上面已将FutureTask的get方法有关的源码分析了一遍。但任务对应的结果outcome是何时在什么被设置呢?下面分析一下这部分,即任务的执行结果outcome如何被设置的,何时被设置的。FutureTask中有一个set方法源码如下。

    /**     
     * Sets the result of this future to the given value unless     
     * this future has already been set or has been cancelled.     
     *     
     * <p>This method is invoked internally by the {@link #run} method     
     * upon successful completion of the computation.     
     *     
     * @param v the value     
     */
    protected void set(V v) {
        //以原子的方式设置任务的执行中间状态
        if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
            //设置任务执行结果
            outcome = v;
            //设置任务为正常执行完
            UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
            //从Treiber stack删除所有阻塞等待的线程,同时将这些挂起的线程唤醒
            finishCompletion();
        }
    }

finishCompletion源码如下。

 private void finishCompletion() {
        // assert state > COMPLETING;
        //遍历Treiber stack 删除所有阻塞等待的线程,同时将这些挂起的线程唤醒
        for (WaitNode q; (q = waiters) != null;) {
            if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
                for (;;) {
                    Thread t = q.thread;
                    if (t != null) {
                        q.thread = null;
                        //唤醒挂起的线程
                        LockSupport.unpark(t);
                    }
                    WaitNode next = q.next;
                    if (next == null)
                        break;
                    q.next = null; // unlink to help gc
                    q = next;
                }
                break;
            }
        }
        //任务执行完后的扩展点,默认实现为空。可以利用这个扩展点实现回调
        done();
        callable = null;        // to reduce footprint
    }

跟踪调用链会发现run方法里面调用了set方法,由于FutureTask实现了Runnable接口,实现了run方法。run源码如下。

    public void run() {
        /*任务执行的状态为非NEW时,         
         *UNSAFE.compareAndSwapObject(this, runnerOffset,null, Thread.currentThread()))         
         *将不执行直接退出。         *任务执行的状态为NEW时,         
         *执行UNSAFE.compareAndSwapObject(this, runnerOffset,null, Thread.currentThread()))         
         *将当前线程设置为callable的runner。         
         *多个线程并发调用run时,只有一个线程会执行,其他线程会直接退出,因为只有一个线程的         
         *UNSAFE.compareAndSwapObject将为true。         
         */
        if (state != NEW ||
            !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                         null, Thread.currentThread()))
            return;
        try {
            Callable<V> c = callable;
            //待执行的任务不空且任务还未执行才执行任务
            if (c != null && state == NEW) {
                V result;
                boolean ran;
                try {
                    //执行任务具体的方法
                    result = c.call();
                    ran = true;
                } catch (Throwable ex) {
                    result = null;
                    ran = false;
                    //设置异常
                    setException(ex);
                }
                if (ran)
                    //成功执行设置任务对应结果
                    set(result);
            }
        } finally {
            // runner must be non-null until state is settled to
            // prevent concurrent calls to run()
            runner = null;
            // state must be re-read after nulling runner to prevent
            // leaked interrupts
            int s = state;
            if (s >= INTERRUPTING)
                handlePossibleCancellationInterrupt(s);
        }
    }

前面曾经提到FutureTask实现Runnable主要是为了让Executor可以执行,实现Future为了能够持有执行结果的引用

ExecutorService扩展了Executor提供了三个重载的submit方法,从而让线程池支持Future模式。

//提交Callable类型的任务
<T> Future<T> submit(Callable<T> task)
//提交Runnable类型的任务,并以result作为任务执行完后的结果
<T> Future<T> submit(Runnable task, T result);
//提交Runnable类型的任务,以null作为任务执行完后的结果
Future<?> submit(Runnable task);

如果提交的任务类型是Runnable在AbstractExecutorService类中,Runnable类型的任务将适配为Callable类型的任务。源码如下,不做具体分析。后面分析线程池源码时会分析。

    public Future<?> submit(Runnable task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<Void> ftask = newTaskFor(task, null);
        execute(ftask);
        return ftask;
    }
    public <T> Future<T> submit(Runnable task, T result) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task, result);
        execute(ftask);
        return ftask;
    }
    //将提交runnable类型的任务适配为Callable类型的任务
    protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
        return new FutureTask<T>(runnable, value);
    }

对于任务的调用记住以下两点便可:

  1. 如果submit给线程池的任务类型为Callable,则线程池内部执行该任务的线程的run方法,会调用FutureTask的run方法,而FutureTask的run方法又会调用提交任务的Callable的call方法。

  2. 如果submit给线程池的任务类型为Runnable,则线程池内部执行该任务的线程的run方法,会调用FutureTask的run方法,而FutureTask的run方法又会调用提交任务适配后的Callable的call方法,而Callable的call方法内部调用Runnable的run方法。

最后看一下FutureTask的cancel方法

    //mayInterruptIfRunning 运行中任务的是否可以被中断
    public boolean cancel(boolean mayInterruptIfRunning) {
        //状态为NEW的任务,才有可能cancel成功
        if (!(state == NEW &&
              UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
                  mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
            return false;
        /* 一旦任务的状态不是NEW下面代码将不执行*/
        try {    // in case call to interrupt throws exception
            if (mayInterruptIfRunning) {
                try {
                    Thread t = runner;
                    if (t != null)
                        //中断当前执行任务的线程
                        t.interrupt();
                } finally { // final state
                    UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
                }
            }
        } finally {
            //从Treiber stack删除所有阻塞等待的线程,同时将这些挂起的线程唤醒
            finishCompletion();
        }
        return true;
    }

上面对Future做了这整体分析。回顾几个重要的知识点:

  1. 当提交任务没有执行完时,Future的内部通过Treiber椎与LockSport.park将调用Future.get方法的线程构成一个WaitNode加入到Treiber椎中挂起,当任务执行完成后(包括正常执行、执行中抛出现异常、执行任务被取消),将遍历Treiber椎中的结点调用LockSport.unpark唤醒所有挂起的线程让其返回任务的执行结果。

  2. FutureTask类提供了done方法做为扩展点,提交任务成功执行后(包括正常执行、执行中抛出现异常、执行任务被取消)done方法将被调用。可以利用这个扩展点实现回调,不用调用isDone与get组合去轮询任务的执行结果,而在任务执行回后会主动调用之前的回调方法。(Google的Guava库利用这个扩展点实现了ListenableFuture,同时Spring-core中也借鉴了Guava的ListenableFuture实现了ListenableFuture。)

  3. 提交的任务,正常执行完、执行中抛出现异常、执行任务被取消,都视为任务执行完成。

  4. 调用线程池的submit方法的任务最终将封装成FutureTask,提交给线程池去执行。对于提交类型为Runnable的任务其会被先适配为Callable类型的任务。

点赞收藏
分类:标签:
叶易_公众号洞悉源码
请先登录,查看4条精彩评论吧
快去登录吧,你将获得
  • 浏览更多精彩评论
  • 和开发者讨论交流,共同进步
17
4