ThreadPoolExecutor

在一个应用程序中,我们需要多次使用线程,也就意味着需要多次创建并销毁线程。而创建并销毁线程的过程势必会消耗内存。而在Java中,内存资源是及其宝贵的,所以,就提出了线程池的概念

构造函数

    public ThreadPoolExecutor(int corePoolSize, // 核心线程数
                              int maximumPoolSize, // 最大线程数(包含核心线程数量)
                              long keepAliveTime, // 大于核心线程数时 淘汰线程的时间
                              TimeUnit unit, // 时间单位
                              BlockingQueue<Runnable> workQueue, // 传输和保存等然任务的阻塞队列
                              ThreadFactory threadFactory, // 用于创建新线程
                              RejectedExecutionHandler handler) { // 线程的淘汰策略
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0) // 这里会对线程数量以及等待时间做一些基本校验
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.acc = System.getSecurityManager() == null ?
                null :
                AccessController.getContext();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }

线程池中的状态

    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    private static final int COUNT_BITS = Integer.SIZE - 3;
    // 初始值 0001 1111 1111 1111 1111 1111 1111 1111
    private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

    // runState is stored in the high-order bits
    private static final int RUNNING    = -1 << COUNT_BITS; // 111
    private static final int SHUTDOWN   =  0 << COUNT_BITS; // 000
    private static final int STOP       =  1 << COUNT_BITS; // 001
    private static final int TIDYING    =  2 << COUNT_BITS; // 010
    private static final int TERMINATED =  3 << COUNT_BITS; // 011

可以看到线程池使用一个AtomicInteger来表示当前线程的情况,其中高三位表示当前线程池的状态,而低29位表示线程池的
需要注意的是 -1的二进制表示 是 1的二进制 取反在加1:

1:   0000 0000 0000 0000 0000 0000 0000 0001
取反: 1111 1111 1111 1111 1111 1111 1111 1110
加1: 1111 1111 1111 1111 1111 1111 1111 1111

线程状态流转:


image.png

运行状态 状态描述
RUNNING 接收新任务,并且也能处理阻塞队列中的任务。
SHUTDOWN 不接收新任务,但是却可以继续处理阻塞队列中的任务。
STOP 不接收新任务,同时也不处理队列任务,并且中断正在进行的任务。
TIDYING 所有任务都已终止,workercount(有效线程数)为0,线程转向 TIDYING 状态将会运行 terminated() 钩子方法。
TERMINATED terminated() 方法调用完成后变成此状态。

execute

execute 是我们提交一个线程的核心方法, 我们以该方法为入口看看线程池的实现:

    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        /*
         * Proceed in 3 steps:
         * 1. If fewer than corePoolSize threads are running, try to
         * start a new thread with the given command as its first
         * task.  The call to addWorker atomically checks runState and
         * workerCount, and so prevents false alarms that would add
         * threads when it shouldn't, by returning false.
         *
         * 2. If a task can be successfully queued, then we still need
         * to double-check whether we should have added a thread
         * (because existing ones died since last checking) or that
         * the pool shut down since entry into this method. So we
         * recheck state and if necessary roll back the enqueuing if
         * stopped, or start a new thread if there are none.
         *
         * 3. If we cannot queue task, then we try to add a new
         * thread.  If it fails, we know we are shut down or saturated
         * and so reject the task.
         */
        int c = ctl.get();
        // 当前池中线程数量小于corePoolSize,基于传入的command创建新的线程
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true)) // 创建新线程
                return;
            c = ctl.get();
        }
        // 大于corePoolSize的话 则入队(阻塞队列的offer方法是不会阻塞的 插入失败直接返回false)
        if (isRunning(c) && workQueue.offer(command)) { 
            // 当前线程池是running状态 并且入队成功
            int recheck = ctl.get();
            // recheck下如果不是running状态 删除掉这个命令 
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0) // 池中没有线程了 创建一个
                addWorker(null, false);
        }
        else if (!addWorker(command, false)) // 尝试创建非核心线程 
            reject(command); // 创建失败 则执行reject 策略
    }

addWorker

首先我们来看下 worker是个什么东西:

    private final class Worker
            extends AbstractQueuedSynchronizer
            implements Runnable {
        /**
         * This class will never be serialized, but we provide a
         * serialVersionUID to suppress a javac warning.
         */
        private static final long serialVersionUID = 6138294804551838833L;

        /**
         * Thread this worker is running in.  Null if factory fails.
         */
        final Thread thread;
        /**
         * Initial task to run.  Possibly null.
         */
        Runnable firstTask;
        /**
         * Per-thread task counter
         */
        volatile long completedTasks;

        /**
         * Creates with given first task and thread from ThreadFactory.
         *
         * @param firstTask the first task (null if none)
         */
        Worker(Runnable firstTask) {
            setState(-1); // inhibit interrupts until runWorker
            this.firstTask = firstTask;
            this.thread = getThreadFactory().newThread(this);
        }

        public void run() {
            runWorker(this);
        }

    }

可以看出, worker 本身继承Runnable,并且也是AQS的子类,所以worker本身也可以完成AQS的一些阻塞操作,这里自己继承AQS而不是使用ReentrangLock的目的就是防止重入。并且,在worker的构造方法中,会使用ThreadFactory创建一个新的线程,传入的Runnable就是自己,所以说线程池中跑的就是一个个Worker,然后通过firstTask保存用户传进来的任务,然后通过run方法,调用到runWorker,从而执行用户任务, 关于runWorker,后面解析。

    private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c); // 获取线程池当前状态

            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN && 
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false; 

            for (;;) {
                int wc = workerCountOf(c);
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }

        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int rs = runStateOf(ctl.get());

                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }

整个方法可以拆分成两部分 :
第一部分:

       retry: // 标识一个循环 如果有多层嵌套循环, 可以直接跳到retry标识的某个循环
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c); // 获取线程池当前状态

            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN && 
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false; 

            for (;;) {
                int wc = workerCountOf(c);
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }

在外层的for循环里,有一个判断:

if (rs >= SHUTDOWN && !(rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty()))
   return false;

转换为 :
rs >= shutdown && (rs != shutdown || firstTask != null || workQueue.isEmpty())

这个判断的意思是:

  1. rs > shoutdown , 也就是STOP,TIDYING,TERMINATED状态直接返回失败
  2. rs >= shutdown && firstTask != null,线程池状态处于 SHUTDOWN,STOP,TIDYING,TERMINATED状态且worker的首个任务不为空时,添加工作线程失败,不接受新任务
  3. rs >= shutdown && workQueue.isEmpty:线程池状态处于 SHUTDOWN,STOP,TIDYING,TERMINATED状态且阻塞队列为空时,添加工作线程失败,不接受新任务。
    所以,最外层的 for 循环是不断的校验当前的线程池状态是否能接受新任务,如果校验通过了之后才能继续往下运行。
    首先做判断:
  if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;

如果已经大于最大容量 或 根据本次操作类型 判断是否已经大于核心线程数 或 最大线程数 如果已经超过 则返回失败。

 if (compareAndIncrementWorkerCount(c))
                    break retry;

这里会增加ctl中记录的线程数,如果增加成功,则跳出外层循环,执行下面要将的第二部分。

c = ctl.get();  // Re-read ctl
if (runStateOf(c) != rs)
    continue retry;

上面增加ctl线程数量失败的话,再次检查当前状态 ,从外层循环再次执行。

当上面增加ctl中记录的线程个数后,就该执行第二部分了:

    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        w = new Worker(firstTask); // 创建一个work对象
        final Thread t = w.thread; // 拿到work对象中的线程
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock(); // 这边会操作workers(HashSet) 所以要加锁
            try {
                // Recheck while holding lock.
                // Back out on ThreadFactory failure or if
                // shut down before lock acquired.
                int rs = runStateOf(ctl.get()); // 获取线程池运行状态

                if (rs < SHUTDOWN || // RUNNING状态
                        (rs == SHUTDOWN && firstTask == null)) { // 这里为了传入任务为空的情况也可以创建
                    if (t.isAlive()) // 如果线程已经被启动了 直接抛出异常
                        throw new IllegalThreadStateException();
                    workers.add(w); // 将新创建的worker添加到workersSet中
                    int s = workers.size();
                    if (s > largestPoolSize) // 维护下largestPoolSize 这个标识
                        largestPoolSize = s;
                    workerAdded = true; // 标识位设置为true
                }
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) {// 如果本次添加worker成功
                t.start(); // 让worker跑起来
                workerStarted = true;  // 设置启动成功的标志位
            }
        }
    } finally {
        if (!workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted; // 返回启动标志位

这边还有个addWorkerFailed方法:

    private void addWorkerFailed(Worker w) {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            if (w != null)
                workers.remove(w); // 从set 中删除该worker
            decrementWorkerCount(); // cas 将ctl中记录的线程数减1
            tryTerminate(); // 判断是否需要终止整个线程池 需要的话 就终止
        } finally {
            mainLock.unlock();
        }
    }

runWorker

这里我们接着上面的Worker ,看下runWroker干了什么,也就是Worker是如何从队列中获取任务执行的(执行worker.thread.start()其实就是执行了这里):

    final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask; // worker 一开始被启动的时候 会传入这个任务
        w.firstTask = null;
        //由于Worker初始化时AQS中state设置为-1,这里要先做一次解锁把state更新为  0,允许线程中断
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            // task为空 或者 从队列中获取任务为空
            // 注意 这里getTask就是从队列中获取任务
            while (task != null || (task = getTask()) != null) {
                w.lock();
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted.  This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
                // 如果线程池运行状态是stopping, 确保线程是中断状态;
                // 如果不是stopping, 确保线程是非中断状态. 
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    beforeExecute(wt, task); // 钩子方法 方便子类做一些标识
                    Throwable thrown = null;
                    try {
                        task.run(); // 执行任务的run方法
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++; // 维护下总完成任务标识
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally
             // 如果能执行到这里 就说明 该worker是时候被废弃了
            processWorkerExit(w, completedAbruptly);
        }
    }

    // 从队列中获取任务
    private Runnable getTask() {
        boolean timedOut = false; // Did the last poll() time out?

        for (;;) { // 循环拿任务
            int c = ctl.get();
            int rs = runStateOf(c); // 得到当前线程池状态

            // Check if queue empty only if necessary.
            // 针对当前线程池状态判断是否要直接停止执行队列中的任务
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount(); // 将worker数量减1
                return null;
            }

            int wc = workerCountOf(c); // 当前线程池中worker总数量

            // Are workers subject to culling?
            // 如果设置了allowCoreThreadTimeOut = true 那么核心线程也是可以被淘汰的
            // timed 用于表示是否需要表示是否需要校验时间(是否已经大于核心线程数)
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

            if ((wc > maximumPoolSize || (timed && timedOut)) // 当前worker数量大于最大线程数量 或者 已经超时
                && (wc > 1 || workQueue.isEmpty())) { // 线程数量大于1 并且队列已经空了
                if (compareAndDecrementWorkerCount(c)) // 线程数量减1 成功 返回空
                    return null;
                continue; // 否则继续循环
            }

            try {
                // 如果需要使用时间判断 啧使用poll 否则使用take 
                // 对列为空的话 take会无止境阻塞 而poll等待时间后 直接返回空
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                if (r != null) // 拿到任务的话直接返回 
                    return r;
                timedOut = true; // 否则下次循环 
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }

submit

然后来看下submit方法:


    /**
     * @throws RejectedExecutionException {@inheritDoc}
     * @throws NullPointerException       {@inheritDoc}
     */
    public Future<?> submit(Runnable task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<Void> ftask = newTaskFor(task, null);
        execute(ftask);
        return ftask;
    }

    /**
     * @throws RejectedExecutionException {@inheritDoc}
     * @throws NullPointerException       {@inheritDoc}
     */
    public <T> Future<T> submit(Runnable task, T result) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task, result);
        execute(ftask);
        return ftask;
    }

    /**
     * @throws RejectedExecutionException {@inheritDoc}
     * @throws NullPointerException       {@inheritDoc}
     */
    public <T> Future<T> submit(Callable<T> task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task);
        execute(ftask);
        return ftask;
    }

    protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
        return new FutureTask<T>(runnable, value);
    }

可以看出 不管是哪种,都是通过RunnableFuture 将其包裹起来,然后通过execute执行,最后将Future对象返回出去。

动态修改核心线程数以及最大线程数

    public void setCorePoolSize(int corePoolSize) {
        if (corePoolSize < 0)
            throw new IllegalArgumentException();
        int delta = corePoolSize - this.corePoolSize;
        this.corePoolSize = corePoolSize;
        if (workerCountOf(ctl.get()) > corePoolSize)
            interruptIdleWorkers(); // 当前已存在的线程数大于新设置的值的话
        else if (delta > 0) { // 增大核心线程数
            // We don't really know how many new threads are "needed".
            // As a heuristic, prestart enough new workers (up to new
            // core size) to handle the current number of tasks in
            // queue, but stop if queue becomes empty while doing so.
            // 防止任务很多 ,这里提前创建一些worker 并start
            int k = Math.min(delta, workQueue.size());
            while (k-- > 0 && addWorker(null, true)) { 
                if (workQueue.isEmpty()) // 如果等待队列为空 则停止添加worker
                    break;
            }
        }
    }

    public void setMaximumPoolSize(int maximumPoolSize) {
        if (maximumPoolSize <= 0 || maximumPoolSize < corePoolSize)
            throw new IllegalArgumentException();
        this.maximumPoolSize = maximumPoolSize;
        if (workerCountOf(ctl.get()) > maximumPoolSize) // 将最大线程数减小的话 
            interruptIdleWorkers();
    }

    private void interruptIdleWorkers() { // 中断闲置线程
        interruptIdleWorkers(false);
    }

    private void interruptIdleWorkers(boolean onlyOne) {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            for (Worker w : workers) {
                Thread t = w.thread;
                // 线程不是中断状态 并且 拿到了worker的锁
                if (!t.isInterrupted() && w.tryLock()) {
                    try {
                        t.interrupt();
                    } catch (SecurityException ignore) {
                    } finally {
                        w.unlock();
                    }
                }
                if (onlyOne)
                    break;
            }
        } finally {
            mainLock.unlock();
        }
    }

这里需要注意的是在worker 中含有任务并且在运行的时候,会上锁(参看runWorker方法),也就是说 ,只有目前没有执行任务的worker ,这里才能拿到锁, 进而设置线程已中断。

参考:
https://www.cnblogs.com/jajian/p/11442929.html
https://mp.weixin.qq.com/s/hduWrrK4B8x8Z3C7RnIhjw
https://mp.weixin.qq.com/s/FJQ5MhB1kMp8lP1NA6q4Vg
https://tech.meituan.com/2020/04/02/java-pooling-pratice-in-meituan.html

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,684评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 87,143评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,214评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,788评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,796评论 5 368
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,665评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,027评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,679评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 41,346评论 1 299
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,664评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,766评论 1 331
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,412评论 4 321
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,015评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,974评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,203评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,073评论 2 350
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,501评论 2 343

推荐阅读更多精彩内容