Java并发 ThreadPoolExecutor源码分析

1. ThreadPoolExecutor相关框架图

a. Executors提供newCachedThreadPool, newFixedThreadPool, newScheduledThreadPool, newSingleThreadExecutor创建创建线程池。
b. RunnableAdapterExecutors的内部类,提供Runnable转接为Callable的方法;
c. DefaultThreadFactoryExecutors内部的线程工厂类, PrivilegedThreadFactory重写其newThread方法;
d. Worker重写AQS获取锁的方法,并包装业务逻辑; ThreadPoolExecutor::workers保存所有的工作业务, 当其满时缓存到workQueue;

2.1 线程池中断策略

ThreadPoolExecutor.AbortPolicy: 丢弃任务并抛出
RejectedExecutionException异常。默认
ThreadPoolExecutor.DiscardPolicy:也是丢弃任务,但是不抛出异常。
ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)
ThreadPoolExecutor.CallerRunsPolicy:由调用线程处理该任务

2.2 自定义中断策略

class SelfRejected extends RejectedExecutionHandler

2.3.1 线程池状态

//接收新任务,并且执行缓存任务队列中的任务
private static final int RUNNING    = -1

// 不在接收新的任务,但是会执行缓存中的任务
private static final int SHUTDOWN   =  0

// 不接收新的任务,不执行缓存中的任务,中断正在运行的任务
private static final int STOP       =  1

// 所有任务已经终止,workCount = 0;
private static final int TIDYING    =  2

// terminated 方法调用完成
private static final int TERMINATED =  3

2.3.2 线程池状态切换

RUNNING -> SHUTDOWN
 *    On invocation of shutdown(), perhaps implicitly in finalize()
(RUNNING or SHUTDOWN) -> STOP
 *    On invocation of shutdownNow() // shutdownNow
SHUTDOWN -> TIDYING
 *    When both queue and pool are empty // 缓存队列、线程池均空闲
STOP -> TIDYING
 *    When pool is empty
TIDYING -> TERMINATED
 *    When the terminated() hook method has completed

ctl
    The main pool control state, ctl, is an atomic integer packing two conceptual fields
workerCount
    indicating the effective number of threads // 有效线程数量
runState
    indicating whether running, shutting down etc // 运行状态

2.3.3 其他成员

// 缓存任务阻塞队列
private final BlockingQueue<Runnable> workQueue;

// 线程池主锁
private final ReentrantLock mainLock = new ReentrantLock();

// 工作线程
private final HashSet<Worker> workers = new HashSet<Worker>();

// mainLock上的终止条件量,用于支持awaitTermination
private final Condition termination = mainLock.newCondition();

// 记录曾经创建的最大线程数
private int largestPoolSize;

// 已经完成任务数
private long completedTaskCount;

private volatile ThreadFactory threadFactory;

// 设置默认任务拒绝策略
private static final RejectedExecutionHandler defaultHandler = new AbortPolicy();

3.1 Executors::newFixedThreadPool 创建固定大小线程池

newFixedThreadPool创建固定大小线程池,无非核心线程,超出的线程保存在LinkedBlockingQueue中等待;

public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
    return new ThreadPoolExecutor(nThreads, nThreads, // corePoolSize == maximumPoolSize, 无非核心线程
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>(),
                                  threadFactory);
}

3.2 ThreadPoolExecutor::execute

/**
 * Executes the given task sometime in the future.  The task
 * may execute in a new thread or in an existing pooled thread.
 *
 * If the task cannot be submitted for execution, either because this
 * executor has been shutdown or because its capacity has been reached,
 * the task is handled by the current {@code RejectedExecutionHandler}.
 */
public void execute(Runnable command) {
    /*
     * Proceed in 3 steps:
     *
     * 1. If fewer than corePoolSize threads are running, try to
     * start a new thread with the given command as its first
     * task.  The call to addWorker atomically checks runState and
     * workerCount, and so prevents false alarms that would add
     * threads when it shouldn't, by returning false.
     * 
     * 当工作者线程小于核心线程,则new thread, runnable作为其第一个task,
     * addWorker将检查线程运行时状态 和 工作者总数,
     * 目的是: 当不允许添加线程时,防止假唤醒;
     * 
     * 2. If a task can be successfully queued, then we still need
     * to double-check whether we should have added a thread
     * (because existing ones died since last checking) or that
     * the pool shut down since entry into this method. So we
     * recheck state and if necessary roll back the enqueuing if
     * stopped, or start a new thread if there are none.
     * 
     * 当核心线程已满,线程池处于运行时状态,则向缓存队列workQueue添加;
     *    此时,需要double-check
     *    因为,当调用此方法后线程池可能被shutdown、 线程died
     * 如果,工作者线程为0,添加一个null task的worker
     * 3. If we cannot queue task, then we try to add a new
     * thread.  If it fails, we know we are shut down or saturated
     * and so reject the task.
     * 此时,缓存队列已满,则创建一个线程,添加到works
     */
    int c = ctl.get();
    // 1. 如果工作线程数小于核心线程数,则添加新的线程
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true)) // 3.3.1 
            return; // 添加成功则返回
        c = ctl.get(); // 否则获取线程池状态
    }
    // 2. 工作线程数大于等于核心线程数,则将任务放入缓存任务队列
    // 如果线程池正在运行,而且成功将任务插入缓存任务队列两个条件
    if (isRunning(c) && workQueue.offer(command)) { // 第一次检查
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command)) // 第二次检查
            reject(command); // 如果不处于运行状态,则将任务从任务缓存队列移除
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false); //启动无初始任务的非核心线程 // 3.3.1
    }
    // 3. 任务入队失败,说明任务缓存任务队列已满,尝试添加新的线程处理
    else if (!addWorker(command, false)) // 3.3.1
        reject(command);
}

3.3.1 ThreadPoolExecutor::addWorker

    /**
     * Checks if a new worker can be added with respect to current
     * pool state and the given bound (either core or maximum). If so,
     * the worker count is adjusted accordingly, and, if possible, a
     * new worker is created and started, running firstTask as its
     * first task. This method returns false if the pool is stopped or
     * eligible to shut down. It also returns false if the thread
     * factory fails to create a thread when asked.  If the thread
     * creation fails, either due to the thread factory returning
     * null, or due to an exception (typically OutOfMemoryError in
     * Thread.start()), we roll back cleanly.
     *
     * @param firstTask the task the new thread should run first (or
     * null if none). Workers are created with an initial first task
     * (in method execute()) to bypass queuing when there are fewer
     * than corePoolSize threads (in which case we always start one),
     * or when the queue is full (in which case we must bypass queue).
     * Initially idle threads are usually created via
     * prestartCoreThread or to replace other dying workers.
     *
     * @param core if true use corePoolSize as bound, else
     * maximumPoolSize. (A boolean indicator is used here rather than a
     * value to ensure reads of fresh values after checking other pool
     * state).
     * @return true if successful
     */
    private boolean addWorker(Runnable firstTask, boolean core) {
        // 对 runState进行循环获取和判断,如果不满足添加条件则返回false
        retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c); // 线程池状态

            // Check if queue empty only if necessary.
            // 对线程池状态进行判断,是否适合添加新的线程
            if (rs >= SHUTDOWN && // 线程池关闭
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;

            for (;;) {
                int wc = workerCountOf(c);
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                // CAS 设置成功,则跳出最外层循环
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  // Re-read ctl
                // 如果workerCount没有设置成功,而且runState发生变化,
                // 则继续最外层的循环,对runState重新获取和判断
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }

        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            w = new Worker(firstTask); // 3.4
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    // 获取锁后,对runState进行再次检查
                    int rs = runStateOf(ctl.get());

                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        workers.add(w); // workers是HashSet<Worker>, 包含池中所有worker, 只有当持有才可mainlock访问。
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    t.start(); // 3.4 如果添加成功,则启动线程,执行worker::run
                    workerStarted = true;
                }
            }
        } finally { // 因为中途发生异常而没有让添加的线程启动,则回滚
            if (! workerStarted)
                addWorkerFailed(w); // 3.7 
        }
        return workerStarted;
    }
  1. 在外循环对运行状态进行判断,内循环通过CAS机制对workerCount进行增加,当设置成功,则跳出外循环,否则进行进行内循环重试;
  2. 外循环之后,获取全局锁,再次对运行状态进行判断,符合条件则添加新的工作线程,并启动工作线程,如果在最后对添加线程没有开始运行(可能发生内存溢出,操作系统无法分配线程等等)则对添加操作进行回滚,移除之前添加的线程;

3.3.2 线程池添加任务流程

线程池添加任务流程

核心线程 -> 工作队列 -> 线程池 -> 饱和策略

3.4 worker 线程封装类

private final class Worker extends AbstractQueuedSynchronizer implements Runnable
{
    Worker(Runnable firstTask) { // 5. worker框架参考
        setState(-1); // inhibit interrupts until runWorker
        this.firstTask = firstTask;
        this.thread = getThreadFactory().newThread(this); // 新建thread to save runable
    }

    /** Delegates main run loop to outer runWorker. */
    public void run() {
        runWorker(this); // 3.5 工作线程主函数
    }

    // 同步锁,重写AQS中相关锁的方法
    protected boolean tryAcquire(int unused) {
        if (compareAndSetState(0, 1)) {
            setExclusiveOwnerThread(Thread.currentThread());
            return true;
        }
        return false;
    }
    protected boolean tryRelease(int unused) {
        setExclusiveOwnerThread(null);
        setState(0);
        return true;
    }
    public void lock()        { acquire(1); }
    public boolean tryLock()  { return tryAcquire(1); }
    public void unlock()      { release(1); }
    public boolean isLocked() { return isHeldExclusively(); }
}

3.5 工作线程主函数 ThreadPoolExecutor::runWorker

/**
* Main worker run loop.  Repeatedly gets tasks from queue and
* executes them, while coping with a number of issues:
*
* 1. We may start out with an initial task, in which case we
* don't need to get the first one. Otherwise, as long as pool is
* running, we get tasks from getTask. If it returns null then the
* worker exits due to changed pool state or configuration
* parameters.  Other exits result from exception throws in
* external code, in which case completedAbruptly holds, which
* usually leads processWorkerExit to replace this thread.
*
* 2. Before running any task, the lock is acquired to prevent
* other pool interrupts while the task is executing, and then we
* ensure that unless pool is stopping, this thread does not have
* its interrupt set.
*
* 3. Each task run is preceded by a call to beforeExecute, which
* might throw an exception, in which case we cause thread to die
* (breaking loop with completedAbruptly true) without processing
* the task.
*
* 4. Assuming beforeExecute completes normally, we run the task,
* gathering any of its thrown exceptions to send to afterExecute.
* We separately handle RuntimeException, Error (both of which the
* specs guarantee that we trap) and arbitrary Throwables.
* Because we cannot rethrow Throwables within Runnable.run, we
* wrap them within Errors on the way out (to the thread's
* UncaughtExceptionHandler).  Any thrown exception also
* conservatively causes thread to die.
*
* 5. After task.run completes, we call afterExecute, which may
* also throw an exception, which will also cause thread to
* die. According to JLS Sec 14.20, this exception is the one that
* will be in effect even if task.run throws.
*
* The net effect of the exception mechanics is that afterExecute
* and the thread's UncaughtExceptionHandler have as accurate
* information as we can provide about any problems encountered by
* user code.
*
* @param w the worker
*/
final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
        // 获取任务,如果没有任务可以获取,则此循环终止,
        // 这个工作线程将结束工作,等待被清理前的登记工作
        while (task != null || (task = getTask()) != null) { // 3.8 getTask获取任务
            w.lock(); // 执行任务之前获取工作线程锁
            // If pool is stopping, ensure thread is interrupted;
            // if not, ensure thread is not interrupted.  This
            // requires a recheck in second case to deal with
            // shutdownNow race while clearing interrupt

            // 如果线程池关闭,则确保线程被中断
            // 如果线程池没有关闭,则确保线程不被中断
            // 这就要求在第二种情况下,进行重新检查,处理shutdownNow正在运行同时清除中断
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                beforeExecute(wt, task); // 执行前业务处理
                try {
                    task.run(); // 业务处理
                } catch (RuntimeException x) {
                    ...
                } finally {
                    afterExecute(task, thrown);  // 执行后业务处理
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        // 登记信息,移除结束线程,然后根据情况添加新的线程等
        processWorkerExit(w, completedAbruptly); // 3.6
    }
}

3.6 ThreadPoolExecutor::processWorkerExit

private void processWorkerExit(Worker w, boolean completedAbruptly) {
    if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
        decrementWorkerCount();

    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        completedTaskCount += w.completedTasks;
        workers.remove(w); // 释放worker
    } finally {
        mainLock.unlock();
    }

    tryTerminate(); // 终止线程
}

3.7 ThreadPoolExecutor::addWorkerFailed

/**
 * Rolls back the worker thread creation.
 * - removes worker from workers, if present
 * - decrements worker count
 * - rechecks for termination, in case the existence of this
 *   worker was holding up termination
 */
private void addWorkerFailed(Worker w) {
    // 对线程回滚需要获取全局锁
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        if (w != null)
            workers.remove(w);
        decrementWorkerCount(); // 减少工作线程计数
        tryTerminate(); // 尝试终止线程
    } finally {
        mainLock.unlock();
    }
}

3.8 ThreadPoolExecutor::getTask

    /**
     * Performs blocking or timed wait for a task, depending on
     * current configuration settings, or returns null if this worker
     * must exit because of any of: 
     * //退出业务处理
     * 1. There are more than maximumPoolSize workers (due to
     *    a call to setMaximumPoolSize).
     * 2. The pool is stopped.
     * 3. The pool is shutdown and the queue is empty.
     * 4. This worker timed out waiting for a task, and timed-out
     *    workers are subject to termination (that is,
     *    {@code allowCoreThreadTimeOut || workerCount > corePoolSize})
     *    both before and after the timed wait, and if the queue is
     *    non-empty, this worker is not the last thread in the pool.
     */
        boolean timedOut = false; // Did the last poll() time out?

        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount(); //减少workerCount数量
                return null;
            }

            int wc = workerCountOf(c);

            // Are workers subject to culling?
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
            // 包含对超时的判断,如果发生超时,则说明该worker已经空闲了
            // keepAliveTime时间,则应该返回null,这样会使工作线程正常结束,
            // 并被移除
            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }

            try {
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take(); // workQueue中keepAliveTime内获取任务
                // 如果在keepAliveTime时间内获取到任务则返回
                if (r != null)
                    return r;
                timedOut = true; // 否则将超时标志设置为true
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }

4.1 Executors::newCachedThreadPool

public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE, // 核心线程为0
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>(), // SynchronousQueue
                                  threadFactory);
}

4.2 Executors::newSingleThreadExecutor

FinalizableDelegatedExecutorService 内部封装ThreadPoolExecutor,且只有一个核心线程;

public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1/*corePoolSize*/, 1/*maximumPoolSize*/, // corePoolSize == maximumPoolSize == 1
                                0L/*keepAliveTime*/, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>()));
}

4.3 Executors::FinalizableDelegatedExecutorService

private static class FinalizableDelegatedExecutorService
        extends DelegatedExecutorService {
    FinalizableDelegatedExecutorService(ExecutorService executor) {
        super(executor);
    }
    protected void finalize() {
        super.shutdown();
    }
}

FinalizableDelegatedExecutorService继承自DelegatedExecutorService,并实现finalize方法;
DelegatedExecutorService 封装了ThreadPoolExecutor

5. Worker 相关框架

Worker 相关框架

a. AbstractQueuedSynchronizer 提供了一个基于FIFO队列,可以用于构建锁或者其他相关同步装置的基础框架;
b. Node是AbstractQueuedSynchronizer的内部类,Node保存着线程引用和线程状态的容器,每个线程对同步器的访问,都可以看做是队列中的一个节点;
c. Worker 实现AbstractQueuedSynchronizeracquire相关方法;

AQS 中Node节点

AQS::Node

参考:

AbstractQueuedSynchronizer的介绍和原理分析
Java线程池分析

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 199,064评论 5 466
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 83,606评论 2 376
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 146,011评论 0 328
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 53,550评论 1 269
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 62,465评论 5 359
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 47,919评论 1 275
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,428评论 3 390
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,075评论 0 254
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,208评论 1 294
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,185评论 2 317
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,191评论 1 328
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,914评论 3 316
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,482评论 3 302
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,585评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,825评论 1 255
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,194评论 2 344
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 41,703评论 2 339

推荐阅读更多精彩内容