


以前代码里面 Executors使用的多一些,ThreadPoolExecutor也有使用,但是对其原理和代码了解的却不多。最近通过看源码,了解了一下。



     * Creates a new {@code ThreadPoolExecutor} with the given initial
     * parameters.
     * @param corePoolSize the number of threads to keep in the pool, even
     *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
     * @param maximumPoolSize the maximum number of threads to allow in the
     *        pool
     * @param keepAliveTime when the number of threads is greater than
     *        the core, this is the maximum time that excess idle threads
     *        will wait for new tasks before terminating.
     * @param unit the time unit for the {@code keepAliveTime} argument
     * @param workQueue the queue to use for holding tasks before they are
     *        executed.  This queue will hold only the {@code Runnable}
     *        tasks submitted by the {@code execute} method.
     * @param threadFactory the factory to use when the executor
     *        creates a new thread
     * @param handler the handler to use when execution is blocked
     *        because the thread bounds and queue capacities are reached
     * @throws IllegalArgumentException if one of the following holds:<br>
     *         {@code corePoolSize < 0}<br>
     *         {@code keepAliveTime < 0}<br>
     *         {@code maximumPoolSize <= 0}<br>
     *         {@code maximumPoolSize < corePoolSize}
     * @throws NullPointerException if {@code workQueue}
     *         or {@code threadFactory} or {@code handler} is null
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;


  • corePoolSize 线程池核心线程数大小,初始化是核心线程数也是0,除非先调用prestartCoreThread或者prestartAllCoreThreads先创建核心线程;在没有设置allowCoreThreadTimeOut为true情况下,核心线程不会销毁
  • maximumPoolSize 线程池线程数最大值,达到最大值后线程池不会再增加线程执行任务
  • keepAliveTime 线程池空闲时,线程存活的时间
  • TimeUnit 时间单位
  • ThreadFactory 线程工厂
  • BlockingQueue 任务队列
  • RejectedExecutionHandler 任务拒绝策略;负责处理当线程饱后、线程池正在关闭时的新提交任务;


  • (1)、CallerRunsPolicy,由调用execute方法提交任务的线程来执行这个任务;
  • (2)、AbortPolicy,抛出异常RejectedExecutionException拒绝提交任务;
  • (3)、DiscardPolicy,直接抛弃任务,不做任何处理;
  • (4)、DiscardOldestPolicy,去除任务队列中的第一个任务,重新提交;


ThreadPoolExecutor有如下 5种状态:

    private static final int RUNNING    = -1 << COUNT_BITS;
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
    private static final int STOP       =  1 << COUNT_BITS;
    private static final int TIDYING    =  2 << COUNT_BITS;
    private static final int TERMINATED =  3 << COUNT_BITS;


private static final int COUNT_BITS = Integer.SIZE - 3;  //Integer.SIZE=32








private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); //使用AtomicInteger 当然是为了保证多线程同步问题

ctl 可以理解为control(控制),初始值为线程数0,状态RUNNING;


线程池执行一个线程有两种方法: execute 和submit。下面看一下execute。

public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
         * Proceed in 3 steps:
         * 1. If fewer than corePoolSize threads are running, try to
         * start a new thread with the given command as its first
         * task.  The call to addWorker atomically checks runState and
         * workerCount, and so prevents false alarms that would add
         * threads when it shouldn't, by returning false.
         * 2. If a task can be successfully queued, then we still need
         * to double-check whether we should have added a thread
         * (because existing ones died since last checking) or that
         * the pool shut down since entry into this method. So we
         * recheck state and if necessary roll back the enqueuing if
         * stopped, or start a new thread if there are none.
         * 3. If we cannot queue task, then we try to add a new
         * thread.  If it fails, we know we are shut down or saturated
         * and so reject the task.
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
            c = ctl.get();
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        else if (!addWorker(command, false))


  • 1、调用ThreadPoolExecutor的execute提交线程,首先检查CorePool,如果CorePool内的线程小于CorePoolSize,新创建线程执行任务。
  • 2、如果当前CorePool内的线程大于等于CorePoolSize,那么将线程加入到BlockingQueue。
  • 3、如果不能加入BlockingQueue,在小于MaxPoolSize的情况下创建线程执行任务。
  • 4、如果线程数大于等于MaxPoolSize,那么执行拒绝策略。


     * Checks if a new worker can be added with respect to current
     * pool state and the given bound (either core or maximum). If so,
     * the worker count is adjusted accordingly, and, if possible, a
     * new worker is created and started, running firstTask as its
     * first task. This method returns false if the pool is stopped or
     * eligible to shut down. It also returns false if the thread
     * factory fails to create a thread when asked.  If the thread
     * creation fails, either due to the thread factory returning
     * null, or due to an exception (typically OutOfMemoryError in
     * Thread.start()), we roll back cleanly.
     * @param firstTask the task the new thread should run first (or
     * null if none). Workers are created with an initial first task
     * (in method execute()) to bypass queuing when there are fewer
     * than corePoolSize threads (in which case we always start one),
     * or when the queue is full (in which case we must bypass queue).
     * Initially idle threads are usually created via
     * prestartCoreThread or to replace other dying workers.
     * @param core if true use corePoolSize as bound, else
     * maximumPoolSize. (A boolean indicator is used here rather than a
     * value to ensure reads of fresh values after checking other pool
     * state).
     * @return true if successful
    private boolean addWorker(Runnable firstTask, boolean core) {
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;

            for (;;) {
                int wc = workerCountOf(c);
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop

        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int rs = runStateOf(ctl.get());

                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                } finally {
                if (workerAdded) {
                    workerStarted = true;
        } finally {
            if (! workerStarted)
        return workerStarted;


 if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;


    if (rs >= SHUTDOWN &&
                (rs == !SHUTDOWN ||
                   firstTask == !null ||
                        return false;



  • 如果rs>=SHUTDOWN,同时不等于SHUTDOWN,即为SHUTDOWN以上的状态,那么不接受新线程。
  • 如果rs>=SHUTDOWN,同时等于SHUTDOWN(其实就是SHUTDOWN),同时firstTask!=null,那么拒绝新线程;
  • 如果rs>=SHUTDOWN,同时等于SHUTDOWN(其实就是SHUTDOWN),同时firstTask ==null,那么可能是新增加线程消耗Queue中的线程。但是同时还要检测workQueue是否isEmpty(),如果为Empty,那么队列已空,不需要增加消耗线程,如果队列没有空那么运行增加first=null的Worker。



SHUTDOWN状态时,是不允许向workQueue中增加线程的,isRunning(c) && workQueue.offer(command) 每次在offer之前都要做状态检测,也就是线程池状态变为>=SHUTDOWN时不允许新线程进入线程池了。


