《Java核心技术系列一》ThreadPoolExecutor 源码剖析

该系列统一使用java8的源码进行讲解。

由于线程的创建于销毁是存在开销的,为了避免频繁的创建与销毁线程,Java采用了池化技术来管理线程资源。只要涉及到多线程、异步的场景,基本就会有线程池的存在。因此掌握好线程池实现原理对程序员来说非常的重要,也是通往高级程序员以及架构师的必经之路。 本文主要从以下几个方面对线程池技术进行讲解。

  • 剖析线程池的源码实现
  • 讲解使用线程池的注意事项
  • 线程池的变异使用方式(Tomcat与Netty如何使用线程池)
  • 面试中的线程池问答

一. 源码剖析

为了使线程池可以适用于多种场景,对于线程池的创建提供了多个参数,进行控制。各个参数的含义必须要非常的明确。

1.1 构造方法

  • corePoolSize 核心线程数
  • maximumPoolSize 最大线程数
  • keepAliveTime 保活时间
  • unit 保活时间的单位
  • workQueue 任务队列
  • threadFactory 线程工厂
  • handler 拒绝策略

结合参数描述一下线程的工作原理,以新来一个任务为例:

1. 新来任务后,如果线程数<corePoolSize,则创建线程执行(即便存在空闲的线程),否则执行2
2. 如果workQueue没有达到最大值则扔进阻塞队列,否则执行3
3. 如果线程数<maximumPoolSize,则创建线程执行,否则执行4
4. 按照指定的拒绝策略handler处理新来的任务

除了上面步骤提到的参数外,还有

  1. keepAliveTime, unit 保活时间,如果Worker阻塞在从workQueue中获取任务的时间超过该时间,且线程数>corePoolSize,那么就会对该Worker进行销毁,避免过多的线程阻塞,浪费资源。
  2. threadFactory 线程工厂,用于创建线程对象

连接了各个参数的含义,看下构造函数的源码:

    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.acc = System.getSecurityManager() == null ?
                null :
                AccessController.getContext();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }

源码中只是进行参数取值范围控制,并赋值。

1.2 execute 提交任务

创建好线程池之后,我们就需要往线程池中提交任务,提交任务有两个方法(低级的面试也会问这两个方法有什么区别):

  1. submit() 有返回值,返回Future对象(Future后面再将)
  2. execute() 无返回值

其中 submit也只是任务包装成Future之后,调用execute,所以这里我们只需要看execute方法的实现即可。

    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        int c = ctl.get();
        //线程数小于核心线程数则新增worker执行
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
       //否则,扔到阻塞队列
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            //扔进阻塞队列后判断状态,如果线程池状态处于非运行状态,则执行拒绝策略handler
            if (! isRunning(recheck) && remove(command))
                reject(command);
            //如果运行着,但是没有worker,那么新增worker执行,为什么会出现这种情况?
            //因为有参数可以控制核心线程数也可以在超时的情况下被销毁:allowCoreThreadTimeOut这个参数控制
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        // 队列慢,则新增worker执行任务
        else if (!addWorker(command, false))
            //worker也达到上限,则执行拒绝策略
            reject(command);
    }

其中,ctl一个线程安全的AtomicInteger变量,用一个整数来记录了线程池的状态(高三位)和目前线程池中线程(Worker)的个数(低29位)
举例说明:ctl的值为:
1000 0000 0000 0000 0000 0000 0000 0001 高三位100代表线程池处于运行状态,低29位为1,说明目前线程池中只有1个线程。
workerCountOf(c) 返回的就是低29位表示的数,即线程个数
isRuning(c) 就是判断高3位是否为100,100位运行状态
然后上面的代码逻辑就是我们一开始整理的新来一个任务时,线程池的执行逻辑。非常的重要,几乎每次面试都会被问。

1.3 Worker 线程池中的工作者

线程池中的工作者是Worker,Worker不仅对Thread进行了包装,还继承了AbstractQueuedSynchronizer(AQS相关的知识简单讲,后面会有文章细讲)实现了Runnable,下面我们就带着问题一起来认识下Worker。

1.3.1. Worker为什么要实现Runnable接口?

Worker中封装了Thread,也就是在构造Worker的时候,会创建Thread对象,Thread对象又要关联一个任务去执行,那这个任务就是Worker自己本身。也就是说:Worker中的线程对象Thread执行的是Worker的run方法。这样的话,thread一旦执行,执行的就是Worker的run方法,看下Worker的构造方法:

        Worker(Runnable firstTask) {
            setState(-1); // inhibit interrupts until runWorker
            this.firstTask = firstTask;
            this.thread = getThreadFactory().newThread(this);
        }

从构造方法中即可以看出,thread一旦启动,调用的就是Worker的run方法。

1.3.2 Worker为什么还实现了AbstractQueuedSynchronizer

这里主要是为了实现Worker的中断。从1.3.1 Worker的构造函数中可以看到,设置状态为-1, 相当于给Worker加了一把锁。那什么时候会解锁呢?简单看下runWork方法(也就是Worker的run方法),代码如下:

final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        //...省略
}

其中unlock()方法就是解锁,unlock方法会调用Worker的release方法,将state的值+1,这样state值就为0了。因为Worker创建并不代表Thread执行,只有Thread线程真正执行了,才会响应中断。此外,在执行每一个task的过程中也不允许中断。响应中断的方法如下:

      void interruptIfStarted() {
            Thread t;
            if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                }
            }
        }

首先会判断getState(), 这个state就是AQS的值,当Thread线程开始执行后,该值就会变为0,那么在这个中断方法中就可以进入进行中断了。

1.3.3 Worker线程都做了哪些事情

这就要看runWork方法了,代码如下:

    final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            //首次task不为null执行自己的任务,此后从workQueue中去任务
            while (task != null || (task = getTask()) != null) {
                //上锁,不允许中断
                w.lock();
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }

当worker创建时,firstTask是被赋值了的,所以先执行自己的任务,此后所有的任务都是通过getTask()从workQueue中获取。拿到任务后先lock加锁,然后通过调用task.run方法执行任务,执行完成后,解锁。
从这里可以看出来,在一个任务任务的执行过程中是不需要中断的
通过getTask方法,如果返回的是null,那么就要执行processWorkerExit,对该Worker进行退出

1.3.4 getTask只是从workerQueue中获取任务吗?

getTask除了从workerQueue中获取任务外,还会对worker的等待时间进行判断,释放掉多余的worker。
看下getTask的实现:

    private Runnable getTask() {
        boolean timedOut = false; // Did the last poll() time out?

        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // 线程池关闭状态下,如果workQueue空,则减少Worker
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();
                return null;
            }

            int wc = workerCountOf(c);

            // 判断是否需要因为worker数>corePoolSize 而销毁worker
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
           //超时,且要多余1个线程,且目前没有任务需要处理,则进行销毁Worker
            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
                //仅仅从数量上-1,销毁Worker的事情让runWork方法去做
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }

            try {
                //去队列中获取数据,如果需要考虑超时,则按照超时返回的策略去获取任务
               //如果不需要考虑超时,则直接使用take方法阻塞在workQueue上
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                //任务存在,直接将任务返回,执行任务
                if (r != null)
                    return r;
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }

getTask方法会根据当前线程池的状态,去判断该Worker是否需要有限超时从workQueue中获取任务,这样可以让getTask提前退出,销毁多余的Worker。从这里也可以看出来并不会说先创建的线程就是核心线程,线程池只关心线程的数量,不关心哪些线程是因为<corePoolSize创建的,哪些是因为>=corePoolSize创建的,在销毁的时候是随机销毁的。

1.4 Worker何时被启动的

当一个新任务被提交到线程池后,有三种情况会创建新的worker并启动worker

  1. 线程数<corePoolSize时
  2. 线程数>=corePoolSize,且workQueue满时
  3. 任务添加到阻塞队列后,发现线程数为0时

会调用addWorker方法完成Worker的新增,代码如下:

   private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        //先通过死循环,保证在ctl上把worker数加上
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;

            for (;;) {
                int wc = workerCountOf(c);
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }

        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
           //构造一个worker
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                //保证线程安全,只能有一个线程执行这段代码
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int rs = runStateOf(ctl.get());

                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
               //添加成功后,通过线程启动worker
                if (workerAdded) {
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }

addWorker做了几件事情

  1. 在死循环中完成对ctl数值+1,这里为什么不用加锁?因为这里使用的是cas操作,属于乐观锁,不需要加锁也能保证线程安全的修改ctl
  2. 创建worker,并加锁将worker放到workers列表中,然后通过执行线程的start方法,调用Worker的run方法,然后执行runWork方法,Worker就开始工作了

到此,关于线程池的核心源码部分就基本完成了,关于更细致的源码剖析,线程池各个状态的转换细节可以参考我的另一篇简书上的文章 https://www.jianshu.com/p/a52f438c16be,有关线程池相关的剩余部分限于篇幅问题,放在下一篇中继续剖析。如有问题欢迎大家指正,我们一起学习,共同进步。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,530评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 86,403评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,120评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,770评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,758评论 5 367
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,649评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,021评论 3 398
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,675评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,931评论 1 299
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,659评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,751评论 1 330
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,410评论 4 321
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,004评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,969评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,203评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,042评论 2 350
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,493评论 2 343