ThreadPoolExecutor(3) —— 干活的人 Worker

一、前言

前一篇文章,大体说明了一下线程池如何添加一个新的Worker去执行任务。本篇来详细分析 Worker 本身。

二、Worker 的结构

2.1 Worker 整体结构

    private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable
    {
        private static final long serialVersionUID = 6138294804551838833L;

        /** Worker所绑定的执行任务的线程. */
        final Thread thread;
        /** 初始化时需要执行的任务,有可能为空 */
        Runnable firstTask;
        /** 完成任务数 */
        volatile long completedTasks;

        /**
         * 通过给定的任务(有可能为空)来创建初始化,初始化时会创建一条线程进行绑定
         */
        Worker(Runnable firstTask) {
            setState(-1); // inhibit interrupts until runWorker
            this.firstTask = firstTask;
            this.thread = getThreadFactory().newThread(this);
        }

        /** 实现Runnable接口的run方法  */
        public void run() {
            runWorker(this);
        }

        protected boolean isHeldExclusively() {
            return getState() != 0;
        }

        protected boolean tryAcquire(int unused) {
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        protected boolean tryRelease(int unused) {
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }

        public void lock()        { acquire(1); }
        public boolean tryLock()  { return tryAcquire(1); }
        public void unlock()      { release(1); }
        public boolean isLocked() { return isHeldExclusively(); }

        void interruptIfStarted() {
            Thread t;
            if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                }
            }
        }
    }

可以看到 Worker 继承了AbstractQueuedSynchronizer,并实现了Runnable。

  • 继承了 AQS,说明 Worker 本身是个锁,而且在tryAcquire以及其他对AQS方法的实现,都说明了它不支持重入。因为参数都写死为1,如果是重入功能的锁的话,会支持累加(此处可能说的不详细,如果不明白可以参考 AQS 系列文章 ReentrantLock(一) —— AQS简介)。
  • 实现 Runnable,说明 Worker 本身是个可执行的任务类,它与它自身的thread 属性相互绑定(this.thread = getThreadFactory().newThread(this))

2.2 runWorker 方法

runWorker 方法是ThreadPoolExecutor的方法。

/**
 * 执行 Worker
 */
final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    // 调用unlock()是为了让外部可以中断
    w.unlock(); // allow interrupts
    // 这个变量用于判断是否进入过自旋(while循环)
    boolean completedAbruptly = true;
    try {
        // 这儿是自旋
        // 1. 如果firstTask不为null,则执行firstTask;
        // 2. 如果firstTask为null,则调用getTask()从队列获取任务。
        // 3. 阻塞队列的特性就是:当队列为空时,当前线程会被阻塞等待
        while (task != null || (task = getTask()) != null) {
            // 这儿对worker进行加锁,是为了达到下面的目的
            // 1. 降低锁范围,提升性能
            // 2. 保证每个worker执行的任务是串行的
            w.lock();
            // If pool is stopping, ensure thread is interrupted;
            // if not, ensure thread is not interrupted.  This
            // requires a recheck in second case to deal with
            // shutdownNow race while clearing interrupt
            // 如果线程池正在停止,则对当前线程进行中断操作
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            // 执行任务,且在执行前后通过`beforeExecute()`和`afterExecute()`来扩展其功能。
            // 这两个方法在当前类里面为空实现。
            try {
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    afterExecute(task, thrown);
                }
            } finally {
                // 帮助gc
                task = null;
                // 已完成任务数加一 
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        // 自旋操作被退出,说明线程池正在结束
        processWorkerExit(w, completedAbruptly);
    }
}

总结一下runWorker方法的执行过程:

  1. while循环中,不断地通过getTask()方法从workerQueue中获取任务
  2. 如果线程池正在停止,则中断线程。否则调用
  3. 调用task.run()执行任务;
  4. 如果task为null则跳出循环,执行processWorkerExit()方法,销毁线程workers.remove(w);

这个流程图非常经典:


Worker 的执行

2.3 processWorkerExit 方法

private void processWorkerExit(Worker w, boolean completedAbruptly) {
    // 这个变量用于表示是否进入过自旋。
    // 1. 如果没有进入过,该值为false
    // 2. 进入过,该值为true
    // 只有进入过自旋,worker的数量才需要减一
    if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
        decrementWorkerCount();

    // 通过全局锁的方式移除worker
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        completedTaskCount += w.completedTasks;
        workers.remove(w);
    } finally {
        mainLock.unlock();
    }

    // 尝试终止线程池
    tryTerminate();

    int c = ctl.get();
    // 如果线程池状态为`SHUTDOWN`或`RUNNING`,
    // 则通过调用`addWorker()`来创建线程,辅助完成对阻塞队列中任务的处理。
    if (runStateLessThan(c, STOP)) {
        if (!completedAbruptly) {
            int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
            if (min == 0 && ! workQueue.isEmpty())
                min = 1;
            if (workerCountOf(c) >= min)
                return; // replacement not needed
        }
        addWorker(null, false);
    }
}

2.4 Worker 是如何被启动的

在上一篇 ThreadPoolExecutor(二) —— 线程池源码分析 中,ThreadPoolExecutor 成功将 Worker 添加到集合中后,调用的是 Worker 中 thread 的 start 方法(t.start())。我们知道 Thread 的 start 方法是会启动一个内存中的线程单元,并执行 run 方法:

public
class Thread implements Runnable {
    //此处的 target 将会是 Worker 实例
    public Thread(ThreadGroup group, Runnable target, String name,
                  long stackSize) {
        init(group, target, name, stackSize);
    }

    @Override
    public void run() {
        if (target != null) {
            target.run();
        }
    }
}

而在 Thread 执行 run 方法时,实际上调用的是它自身的 target 的run方法,此处的 target 就是与 Thread 绑定的 Worker 实例。我们再看一下两者绑定的过程。
在 Worker 初始化时,会通过 ThreadFactory 创建一个 Thread 实例:

   Worker(Runnable firstTask) {
       setState(-1); // inhibit interrupts until runWorker
       this.firstTask = firstTask;
       // 在创建 Thread 实例并赋值时,Worker 将自己作为参数传入线程工厂的方法内
       this.thread = getThreadFactory().newThread(this);
   }

此处 getThreadFactory 方法,返回的实际实例时 DefaultThreadFactory:

    /**
     * The default thread factory
     */
    static class DefaultThreadFactory implements ThreadFactory {
        private static final AtomicInteger poolNumber = new AtomicInteger(1);
        private final ThreadGroup group;
        private final AtomicInteger threadNumber = new AtomicInteger(1);
        private final String namePrefix;

        DefaultThreadFactory() {
            SecurityManager s = System.getSecurityManager();
            group = (s != null) ? s.getThreadGroup() :
                                  Thread.currentThread().getThreadGroup();
            namePrefix = "pool-" +
                          poolNumber.getAndIncrement() +
                         "-thread-";
        }

        public Thread newThread(Runnable r) {
            Thread t = new Thread(group, r,
                                  namePrefix + threadNumber.getAndIncrement(),
                                  0);
            if (t.isDaemon())
                t.setDaemon(false);
            if (t.getPriority() != Thread.NORM_PRIORITY)
                t.setPriority(Thread.NORM_PRIORITY);
            return t;
        }
    }

此处执行了它的 newThread 方法,其中 Runnable 对象 r,就是 Worker 对象,在此处将 Worker 对象传入 Thread 的构造方法中,与 Thread 完成绑定。

所以,在线程池直接调用 Thread 的 start 方法,可以直接启动 Worker,执行 Worker 的 run 方法。

三、线程是如何在线程池中运作的

1、在线程池成功的 addWorker ,并且成功启动了 Worker 对应的 Thread以后,这个Thread就开始运作,运作的第一个任务,是 Worker 对象中的firstTask。
2、当 firstTask运作结束,会通过 getTask() 方法从队列中获取任务。在这里获取到的 task,无需与当前的 Thread 对象有什么绑定关系,只需要在当前 Thread 中执行这个 task 的 run 方法即可。
3、getTask 是从队列中获取 task 的核心逻辑,其中包含对线程数的判断以及是否允许核心线程数超时的判断。这些判断会影响从队列中获取task等待的时长,当然还有些比较细的内容需要额外的去说。
4、当当前的 Thread 从 getTask 方法中获取的 task 为空时,就说明这个线程已经没用了,就会消亡

三、有关 Worker 的一些疑问

3.1 为什么 Worker 要继承 AQS

  • 上面已提到,ThreadPoolExecutor 需要的是不能重入的锁
  • runWorker 方法是在 ThreadPoolExecutor 中的,它的参数是需要执行 Worker。而runWorker 方法是支持多线程,只是不支持同一条线程(也就是同一个Worker)出现并发执行的情况,所以让 Worker 自己来上自己的锁。当然 Worker 也可以不用自己去实现 AQS,可以自己有一个 lock 的属性,初始化时创建一个 lock对象。此处集成 AQS 可能也是为了更简洁,更优雅。

3.2 为什么 Worker 要实现 Runnable

  • Worker 实现Runnable 接口,是为了可以作为 Runnable 类型的参数,与 Thread 进行绑定,在 Thread 启动时,会启动 Worker 的 run 方法。

3.3 为什么一定要与 Thread 绑定?为何在 ThreadPoolExecutor 启动 Worker 执行任务要调用 Worker 的 Thread,而不是 Worker 本身呢?

  • 因为只有调用 Thread 的 start 方法,才会在内存中启动一条新的线程单元,如果直接执行 Worker 的 run 方法,那仅仅是主线程执行了 run 方法,并没有启动一条新线程

3.4 为什么不能启动 Worker 方法的run,然后 Worker 中 run 的内容,是启动 Worker 对象持有的 Thread 对象的 start 方法呢?这样做也可以启动一条新的线程单元啊?

  • 如此问题所描述的那样,如果 ThreadPoolExecutor 直接启动 Worker,并且 Worker 将 ThreadPoolExecutor 和 Worker 中持有的 Thread 对象隔离,想想其实也没啥问题,也有可能我没有想到问题的关键所在。如果有哪位大佬知道这么写的原因,还请下方留言~
  • 我能想到的,可能是类结构,或者代码写法上的偏好。当然也可能作者在写的时候,把 Worker 类就看做一个可执行的任务类,它的存在仅仅是对 Thread 的一层包装。这样想的话,Worker 确实有必要去实现 Runnable 接口。

3.5 在 runWorker 方法中,只要当前 Worker 完成了所有任务,就跳出了 while 循环,并执行 finally 中的移除过程,那核心的线程也会被移除吗?

  • 此处有待更新

3.6 Worker为什么不使用ReentrantLock来实现呢?

  • tryAcquire方法它是不允许重入的,而ReentrantLock是允许重入的。对于线程来说,如果线程正在执行是不允许其它锁重入进来的。线程只需要两个状态,一个是独占锁,表明正在执行任务;一个是不加锁,表明是空闲状态。

四、总结

本篇文章介绍了 ThreadPoolExecutor 中真正去执行任务的对象 —— Worker,Worker 与 Thread 之间的关系,以及 ThreadPoolExecutor 是如何去启动 Worker 的。再加上个人愚见,如果有理解错误或者缺失的地方,还请下方留言,大家一起交流,一起学习,一起成长。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 202,723评论 5 476
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,080评论 2 379
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 149,604评论 0 335
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,440评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,431评论 5 364
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,499评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,893评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,541评论 0 256
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,751评论 1 296
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,547评论 2 319
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,619评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,320评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,890评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,896评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,137评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,796评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,335评论 2 342