Java线程池详解2--任务提交及执行

ThreadPoolExecutor如何实现任务的提交和执行的呢?

首先,看一下ThreadPoolExecutor的Worker内部类。

Worker

ThreadPoolExecutor定义了内部类Worker来表征线程池中的工作线程:

// 继承了AQS,并实现了Runnable接口
private final class Worker
    extends AbstractQueuedSynchronizer
    implements Runnable
{

    // 工作线程
    final Thread thread;
    // 待执行的任务
    Runnable firstTask;
    // 当前线程已执行的任务数
    volatile long completedTasks;

    // 构造函数
    Worker(Runnable firstTask) {
        // 调用AQS的setState方法将锁状态设置为-1
        setState(-1); 
        this.firstTask = firstTask;
        // 通过线程工厂创建线程
        // 注意: 创建线程时会将当前worker传入,worker本身也是一个runnable
        this.thread = getThreadFactory().newThread(this);
    }

    // 定义启动函数
    // addWorker()-->t.start()-->t.run()-->worker.run()
    public void run() {
        runWorker(this);
    }

    // 0代表无锁状态
    // 1代表有锁状态
    protected boolean isHeldExclusively() {
        return getState() != 0;
    }
    // 实现AQS的tryAcquire方法
    protected boolean tryAcquire(int unused) {
        // CAS将状态值由0更新为1
        if (compareAndSetState(0, 1)) {
            // 若成功,则将当前线程设置为锁独占线程
            setExclusiveOwnerThread(Thread.currentThread());
            return true;
        }
        return false;
    }
    // 实现AQS的tryRelease方法
    protected boolean tryRelease(int unused) {
        setExclusiveOwnerThread(null);
        // 将状态值为0
        setState(0);
        return true;
    }

    public void lock()        { acquire(1); }
    public boolean tryLock()  { return tryAcquire(1); }
    public void unlock()      { release(1); }
    public boolean isLocked() { return isHeldExclusively(); }
    // 中断已启动线程
    void interruptIfStarted() {
        Thread t;
        // getState() >= 0 代表线程处于非Running状态
        // (t = thread) != null 代表工作线程不为null
        // !t.isInterrupted() 代表当前线程未被中断过
        if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
            try {
                t.interrupt();
            } catch (SecurityException ignore) {
            }
        }
    }
}

为何将线程包装成worker呢?其实主要为了实现工作线程和空闲线程的识别。

  • 正在执行任务的线程为工作线程;
  • 未执行任务的线程为空闲线程。

Worker继承了AQS,并定义了tryAcquire和tryRelease方法。线程需要获取锁才可以执行任务,任务执行完毕后释放锁。

当检测到线程有锁时,则说明该线程为工作线程;反之,当检测到线程无锁时,则说明该线程为空闲线程。

下面从线程执行方法开始跟一下源码:

execute

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    
    int c = ctl.get();
    // worker数目小于corePoolSize
    if (workerCountOf(c) < corePoolSize) {
        // 将command作为firstTask创建1个核心worker
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    // 此时核心线程数已满,尝试创建非核心线程处理command任务
    // 如果线程池状态为running且将当前任务添加到阻塞队列
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        // 如果线程池状态不为running且当前任务已成功移除出阻塞队列,则执行拒绝策略
        if (! isRunning(recheck) && remove(command))
            reject(command);
        // 如果线程池工作线程数目为0,则添加1个非核心工作线程
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    // 如果线程池状态不为running
    // 或者线程池状态为running且当前任务添加到阻塞队列失败(阻塞队列已满),则尝试添加非核心工作线程并处理当前任务
    // 若失败,则执行拒绝策略
    else if (!addWorker(command, false))
        reject(command);
}

归纳一下任务提交流程:

  • 若当前工作线程数目小于corePoolSize,则创建新的核心线程,并将command任务提交给该新建的核心线程执行;
  • 若当前工作线程数目已等于corePoolSize,则将command任务添加到阻塞队列;
  • 若command任务未添加到阻塞队列(阻塞队列已满),则创建新的非核心线程,并将command任务提交给该新建的非核心线程执行。

可以看到,execute方法主要落脚在addWorker方法上。

addWorker

private boolean addWorker(Runnable firstTask, boolean core) {
    // 外层循环,负责判断线程池状态
    retry:
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // 第一种情况: rs >= SHUTDOWN,即线程池的状态为SHUTDOWN、STOP、TIDYING、TERMINATED,此时没必要添加工作线程
        // 第二种情况: 下列3种情况只要满足1种,则没必要要添加工作线程
        // (1) rs != SHUTDOWN(隐含rs >= SHUTDOWN),即线程池状态为STOP、TIDYING、TERMINATED
        // (2) firstTask != null(隐含rs == SHUTDOWN),当线程池状态为SHUTDOWN时,如果firstTask != null,此时添加任务会被拒绝
        // (3) workQueue.isEmpty()(隐含rs == SHUTDOWN && firstTask == null),如果此时任务队列为空,则没必要添加工作线程
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;
        // 内层循环,将Worker数目+1
        for (;;) {
            int wc = workerCountOf(c);
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            // CAS将worker数目+1,成功则跳出retry循环
            if (compareAndIncrementWorkerCount(c))
                break retry;
            // CAS将worker数目+1失败,再次读取ctl
            c = ctl.get();  // Re-read ctl
            // 如果线程池状态发生改变,则跳出内层循环,继续外层循环
            if (runStateOf(c) != rs)
                continue retry;
        }
    }
    // 1. 将线程添加到Workers Set集合
    // 2. 启动线程
    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                int rs = runStateOf(ctl.get());
                // rs < SHUTDOWN,表明线程池状态为RUNNING
                // rs == SHUTDOWN && firstTask == null,当线程池状态为SHUTDOWN,且Worker的初始任务为null,但workQueue中可能有未执行完的任务,此时仍需添加worker
                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive())
                        throw new IllegalThreadStateException();
                    workers.add(w);
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            // 若线程添加成功,则启动该线程
            if (workerAdded) {
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}

概括一下上述流程:

  • 判断线程池当前是否为可以添加worker线程的状态,可以则继续进行下一步,不可以则return false;
    • 线程池状态>shutdown,可能为stop、tinying、terminated,不能添加worker线程;
    • 线程池状态为shutdown,且firstTask不为空,不能添加worker线程,因为shutdown状态的线程池不接收新任务;
    • 线程池状态为shutdown,firstTask为空,且workQueue也为空,不能添加worker线程,因为firstTask为空是为了添加一个没有任务的线程再从workQueue获取Task,而workQueue为空,说明添加无任务线程已经没有意义。
  • 线程池当前线程数量是否超过上限(corePoolSize或maximumPoolSize),超过了return false,没超过则对workerCount+1,继续下一步;
  • 在线程池ReentrantLock保证下,向Workers Set中添加新创建的worker实例,添加完成后解锁;
  • 当worker添加成功后,则启动该线程。

t.start()方法很有意思,因为t为worker持有的线程,t初始化时传入的runnable又为worker本身。

t.start()本质上会调用到Thread的run方法:

@Override
public void run() {
    if (target != null) {
        target.run();
    }
}

Thread的run()方法又会调用到runnable的run()方法,worker继承了Runnable接口,并覆写了run方法:

public void run() {
    runWorker(this);
}

本质上调用的是ThreadPoolExecutor的runWorker方法:

runWorker

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    // 因为初始化后的worker的state值为-1,需要通过unlock()方法将state值置为0,保证worker可执行lock()操作
    w.unlock(); 
    boolean completedAbruptly = true;
    try {
        // 阻塞执行(直到task为空才退出)
        while (task != null || (task = getTask()) != null) {
            // 需要获取worker独占锁,且不重入
            // 执行任务前,获取worker锁,任务执行完毕后,才释放worker锁
            // 只要检测到worker为已获取锁状态,则证明该worker为active状态
            w.lock();
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    // 执行任务
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        // 处理worker退出
        processWorkerExit(w, completedAbruptly);
    }
}

概括一下上述流程:

  • 首先通过unlock()方法将state值置为0(初始化后的worker的state值为-1,无法成功执行lock()操作),保证worker后续可以获取锁以便执行任务;
  • 阻塞获取worker自身持有的task及阻塞队列中的task,然后执行;
  • 当获取不到task时,释放掉worker锁变为空闲线程;
  • 最后执行processWorkerExit方法处理空闲线程。

接着看一下getTask和processWorkerExit方法。

getTask

private Runnable getTask() {
    // poll获取任务是否超时
    boolean timedOut = false;

    for (;;) { ①
        int c = ctl.get();
        int rs = runStateOf(c);

        // 第一种情况: rs的状态为stop、tinying、terminated
        // 第二种情况: rs的状态为shutdown,且workQueue.isEmpty
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {   ②
            // 循环CAS减少worker数量,直到成功
            decrementWorkerCount();
            return null;
        }

        int wc = workerCountOf(c);

        // 是否有超时机制timed
        // 1. allowCoreThreadTimeOut允许核心线程空闲超时后回收
        // 2. wc > corePoolSize代表非核心线程空闲均会超时回收
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;  ③
        
        // 1. wc > maximumPoolSize且wc > 1
        // 2. (timed && timedOut)且wc > 1
        // 3. wc > maximumPoolSize且workQueue.isEmpty()
        // 4. (timed && timedOut)且workQueue.isEmpty()
        if ((wc > maximumPoolSize || (timed && timedOut))    ④
            && (wc > 1 || workQueue.isEmpty())) {  ⑤
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }

        try {
            Runnable r = timed ?  ⑥
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null)
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}

timed标识当前线程是否具有超时机制:

  • wc > corePoolSize时,代表当前worker线程为非核心线程,timed恒等于true,说明非核心线程均具有超时机制;
  • wc <= corePoolSize时,代表当前worker线程为核心线程,核心线程默认不具有超时机制(allowCoreThreadTimeOut默认为false),仅有allowCoreThreadTimeOut配置为true时,核心线程才具有线程机制。

换句话说。只要timed为true,当前worker线程必然具有超时机制。

(1)核心线程

假设某核心线程已将task执行完,且workQueue也为空,线程在runWorker()方法里继续阻塞执行getTask()方法,因为allowCoreThreadTimeOut默认为false,且wc <= corePoolSize,故timed为false。

此时,第一个判断:(wc > maximumPoolSize || (timed && timedOut)为false,直接执行⑥处代码,等待keepAliveTime后,因为workQueue为空,所以超时之后返回null,并将timeOut设置为true,接着继续执行①处循环。

由于timed一直等于false,所以该空闲的核心线程会一直阻塞在①处。

(2)非核心线程

假设某非核心线程已将task执行完,且workQueue也为空,线程在runWorker()方法里继续阻塞执行getTask()方法,因为allowCoreThreadTimeOut默认为false,但此时wc > corePoolSize,故timed为true。

此时,第一个判断:(wc > maximumPoolSize || (timed && timedOut)为false,直接执行⑥处代码,等待keepAliveTime后,因为workQueue为空,所以超时之后返回null,并将timeOut设置为true,接着继续执行①处循环。

继续执行到第一个判断,此时(wc > maximumPoolSize || (timed && timedOut)为true,继续执行第二个判断: (wc > 1 || workQueue.isEmpty()),此时第二个判断为true,尝试将工作线程数减1,若成功,则直接返回null,若失败,则继续执行①处循环,直到工作线程数减1操作成功。

(3)wc > maximumPoolSize

正常情况下,wc不会大于maximumPoolSize,因为添加worker时,会先判断线程数是否超过maximumPoolSize,若超过则不执行addWorker操作。之所以出现wc > maximumPoolSize,可能是某线程执行了setMaximumPoolSize操作,新设置的maximumPoolSize低于现有worker数。

此时当前worker执行getTask操作时,由于wc > maximumPoolSize,循环执行compareAndDecrementWorkerCount操作,直到成功返回null。接着跳出addWorker的while循环,继续执行finally代码块的processWorkerExit操作。

processWorkerExit

private void processWorkerExit(Worker w, boolean completedAbruptly) {
    if (completedAbruptly) 
        decrementWorkerCount();
    // 从Workers Set中移除worker
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        completedTaskCount += w.completedTasks;
        workers.remove(w);
    } finally {
        mainLock.unlock();
    }
    // 任何对线程池有负效益的操作时,都需要尝试终止线程池
    tryTerminate();

    int c = ctl.get();
    // 线程池状态为running或shutdown时,如果不是突然终止的,但当前线程数量少于需要维护的线程数量,则addWorker()
    // 如果corePoolSize为0且workQueue不为空,则创建1个线程逐渐消耗掉workQueue中的任务
    if (runStateLessThan(c, STOP)) {
        if (!completedAbruptly) {
            // allowCoreThreadTimeOut默认为false,min为核心线程数
            int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
            // 如果min为0,即不需要维持核心线程数
            // 但workQueue不为空,至少保持一个线程
            if (min == 0 && ! workQueue.isEmpty())
                min = 1;
            // 如果线程数量大于最小数量,直接返回
            // 否则下面至少要addWorker一个
            if (workerCountOf(c) >= min)
                return;
        }
        addWorker(null, false);
    }
}

如果线程是突然终止的,说明是task执行时出现异常导致的,即run()方法执行时发生异常,那正在工作的线程数量需要减1。

如果不是突然终止的,说明是worker线程没有task可执行,不用减1,因为getTask()方法中已经减1了。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,772评论 6 477
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,458评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,610评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,640评论 1 276
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,657评论 5 365
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,590评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,962评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,631评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,870评论 1 297
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,611评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,704评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,386评论 4 319
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,969评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,944评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,179评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 44,742评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,440评论 2 342