java 线程池(线程的复用)

本文所说的“核心线程”、“非核心线程”是一个虚拟的概念,是为了方便描述而虚拟出来的概念,在代码中并没有哪个线程被标记为“核心线程”或“非核心线程”,所有线程都是一样的只是当线程池中的线程多于指定的核心线程数量时,会将多出来的线程销毁掉,池中只保留指定个数的线程那些被销毁的线程是随机的,可能是第一个创建的线程,也可能是最后一个创建的线程,或其它时候创建的线程。一开始我以为会有一些线程被标记为“核心线程”,而其它的则是“非核心线程”,在销毁多余线程的时候只销毁那些“非核心线程”,而“核心线程”不被销毁。这种理解是错误的。

在ThreadPollExcutor类中,有一个字段 private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); 是对线程池的运行状态和线程池中有效线程的数量进行控制的, 它包含两部分信息: 线程池的运行状态 (runState) 和线程池内有效线程的数量 (workerCount),还有几个对ctl进行计算的方法:

// 获取运行状态

privatestaticintrunStateOf(intc)    {returnc & ~CAPACITY; }

// 获取活动线程数

privatestaticintworkerCountOf(intc)  {returnc & CAPACITY; }

以上两个方法在源码中经常用到,结合我们的目标,对运行状态的一些判断及处理可以不用去管,而对当前活动线程数要加以关注等等。

下面将遵循这些原则来分析源码。

解惑

当我们要向线程池添加一个任务时是调用ThreadPollExcutor对象的execute(Runnable command)方法来完成的,所以先来看看ThreadPollExcutor类中的execute(Runnable command)方法的源码:

publicvoid execute(Runnable command) {

    if(command ==null)

        thrownew NullPointerException();

    intc = ctl.get();

    if(workerCountOf(c) < corePoolSize) {

        if(addWorker(command,true))

            return;

        c = ctl.get();

    }

    if(isRunning(c) && workQueue.offer(command)) {

        intrecheck = ctl.get();

        if(! isRunning(recheck) && remove(command))

            reject(command);

        elseif(workerCountOf(recheck) == 0)

            addWorker(null,false);

    }

    elseif(!addWorker(command,false))

        reject(command);

}


按照我们在分析方法中提到的一些原则,去掉一些相关性不强的代码,看看核心代码是怎样的。

// 为分析而简化后的代码publicvoid execute(Runnable command) {

    intc = ctl.get();

    if(workerCountOf(c) < corePoolSize) {

        // 如果当前活动线程数小于corePoolSize,则新建一个线程放入线程池中,并把任务添加到该线程中if(addWorker(command,true))

        return;

        c = ctl.get();

    }

    // 如果当前活动线程数大于等于corePoolSize,则尝试将任务放入缓存队列if (workQueue.offer(command)) {

        intrecheck = ctl.get();

        if(workerCountOf(recheck) == 0)

        addWorker(null,false);

    }else {

        // 缓存已满,新建一个线程放入线程池,并把任务添加到该线程中(此时新建的线程相当于非核心线程)addWorker(command,false)

    }

}


这样一看,逻辑应该清晰很多了。

如果 当前活动线程数 < 指定的核心线程数,则创建并启动一个线程来执行新提交的任务(此时新建的线程相当于核心线程);

如果 当前活动线程数 >= 指定的核心线程数,且缓存队列未满,则将任务添加到缓存队列中;

如果 当前活动线程数 >= 指定的核心线程数,且缓存队列已满,则创建并启动一个线程来执行新提交的任务(此时新建的线程相当于非核心线程);

接下来看 addWorker(Runnable firstTask, boolean core)方法

private boolean addWorker(Runnable firstTask,boolean core) {

    retry:

    for (;;) {

        intc = ctl.get();

        intrs = runStateOf(c);

        // Check if queue empty only if necessary.if(rs >= SHUTDOWN &&        ! (rs == SHUTDOWN &&        firstTask ==null&&        ! workQueue.isEmpty()))

        returnfalse;

        for (;;) {

            intwc = workerCountOf(c);

            if(wc >= CAPACITY ||            wc >= (core ? corePoolSize : maximumPoolSize))

            returnfalse;

            if (compareAndIncrementWorkerCount(c))

            break retry;

            c = ctl.get();// Re-read ctlif(runStateOf(c) != rs)

            continue retry;

            // else CAS failed due to workerCount change; retry inner loop        }

    }

    booleanworkerStarted =false;

    booleanworkerAdded =false;

    Worker w =null;

    try {

        w =new Worker(firstTask);

        finalThread t = w.thread;

        if(t !=null) {

            finalReentrantLock mainLock =this.mainLock;

            mainLock.lock();

            try {

                // Recheck while holding lock.

                // Back out on ThreadFactory failure or if

                // shut down before lock acquired.intrs = runStateOf(ctl.get());

                if(rs < SHUTDOWN ||                (rs == SHUTDOWN && firstTask ==null)) {

                    if(t.isAlive())// precheck that t is startablethrownew IllegalThreadStateException();

                    workers.add(w);

                    ints = workers.size();

                    if(s > largestPoolSize)

                    largestPoolSize = s;

                    workerAdded =true;

                }

            } finally {

                mainLock.unlock();

            }

            if (workerAdded) {

                t.start();

                workerStarted =true;

            }

        }

    } finally {

        if(! workerStarted)

            addWorkerFailed(w);

    }

    return workerStarted;

}


同样,我们也来简化一下:

// 为分析而简化后的代码private boolean addWorker(Runnable firstTask,boolean core) {

    intwc = workerCountOf(c);

    if(wc >= (core ? corePoolSize : maximumPoolSize))

    // 如果当前活动线程数 >= 指定的核心线程数,不创建核心线程

    // 如果当前活动线程数 >= 指定的最大线程数,不创建非核心线程 returnfalse;

    booleanworkerStarted =false;

    booleanworkerAdded =false;

    Worker w =null;

    try {

        // 新建一个Worker,将要执行的任务作为参数传进去w =new Worker(firstTask);

        finalThread t = w.thread;

        if(t !=null) {

            workers.add(w);

            workerAdded =true;

            if (workerAdded) {

                // 启动刚刚新建的那个worker持有的线程,等下要看看这个线程做了啥                t.start();

                workerStarted =true;

            }

        }

    } finally {

        if(! workerStarted)

            addWorkerFailed(w);

    }

    return workerStarted;

}


看到这里,我们大概能猜测到,addWorker方法的功能就是新建一个线程并启动这个线程,要执行的任务应该就是在这个线程中执行。为了证实我们的这种猜测需要再来看看Worker这个类。

private final class Workerextends AbstractQueuedSynchronizerimplements Runnable{

    // ....}

Worker(Runnable firstTask) {

    setState(-1);// inhibit interrupts until runWorkerthis.firstTask = firstTask;

    this.thread = getThreadFactory().newThread(this);

}


从上面的Worker类的声明可以看到,它实现了Runnable接口,以及从它的构造方法中可以知道待执行的任务赋值给了它的变量firstTask,并以它自己为参数新建了一个线程赋值给它的变量thread,那么运行这个线程的时候其实就是执行Worker的run()方法,来看一下这个方法:

public void run() {

    runWorker(this);

}

final void runWorker(Worker w) {

    Thread wt = Thread.currentThread();

    Runnable task = w.firstTask;

    w.firstTask =null;

    w.unlock(); // allow interruptsbooleancompletedAbruptly =true;

    try {

        while(task !=null|| (task = getTask()) !=null) {

            w.lock();

            // If pool is stopping, ensure thread is interrupted;

            // if not, ensure thread is not interrupted. This

            // requires a recheck in second case to deal with

            // shutdownNow race while clearing interruptif((runStateAtLeast(ctl.get(), STOP) ||            (Thread.interrupted() &&            runStateAtLeast(ctl.get(), STOP))) &&            !wt.isInterrupted())

                wt.interrupt();

            try {

                beforeExecute(wt, task);

                Throwable thrown =null;

                try {

                    task.run();

                } catch (RuntimeException x) {

                    thrown = x;throw x;

                } catch (Error x) {

                    thrown = x;throw x;

                } catch (Throwable x) {

                    thrown = x;thrownew Error(x);

                } finally {

                    afterExecute(task, thrown);

                }

            } finally {

                task =null;

                w.completedTasks++;

                w.unlock();

            }

        }

        completedAbruptly =false;

    } finally {

        processWorkerExit(w, completedAbruptly);

    }

}


在run()方法中只调了一下 runWorker(this) 方法,再来简化一下这个 runWorker() 方法

// 为分析而简化后的代码

final void runWorker(Worker w) {

    Runnable task = w.firstTask;

    w.firstTask =null;

    while(task !=null|| (task = getTask()) !=null) {

        try {

            task.run();

        } finally {

            task =null;

        }

    }

}


很明显,runWorker()方法里面执行了我们新建Worker对象时传进去的待执行的任务,到这里为止貌似这个worker的run()方法就执行完了,既然执行完了那么这个线程也就没用了,只有等待虚拟机销毁了。那么回顾一下我们的目标:Java线程池中的核心线程是如何被重复利用的?好像并没有重复利用啊,新建一个线程,执行一个任务,然后就结束了,销毁了。没什么特别的啊,难道有什么地方漏掉了,被忽略了?再仔细看一下runWorker()方法的代码,有一个while循环,当执行完firstTask后task==null了,那么就会执行判断条件 (task = getTask()) != null,我们假设这个条件成立的话,那么这个线程就不止只执行一个任务了,可以执行多个任务了,也就实现了重复利用了。答案呼之欲出了,接着看getTask()方法

private Runnable getTask() {

    lean timedOut =false;// Did the last poll() time out?for (;;) {

        intc = ctl.get();

        intrs = runStateOf(c);

        // Check if queue empty only if necessary.if(rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {

            decrementWorkerCount();

            returnnull;

        }

        intwc = workerCountOf(c);

        // Are workers subject to culling?booleantimed = allowCoreThreadTimeOut || wc > corePoolSize;

        if((wc > maximumPoolSize || (timed && timedOut))

            && (wc > 1 || workQueue.isEmpty())) {

            if (compareAndDecrementWorkerCount(c))

                returnnull;

            continue;

        }

        try {

            Runnable r = timed ?            workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :

            workQueue.take();

            if(r !=null)

            return r;

            timedOut =true;

            } catch (InterruptedException retry) {

                timedOut =false;

            }

    }

}


老规矩,简化一下代码来看:

// 为分析而简化后的代码

private Runnable getTask() {

    booleantimedOut =false;

    for (;;) {

        intc = ctl.get();

        intwc = workerCountOf(c);

        // timed变量用于判断是否需要进行超时控制。

        // allowCoreThreadTimeOut默认是false,也就是核心线程不允许进行超时;

        // wc > corePoolSize,表示当前线程池中的线程数量大于核心线程数量;

        // 对于超过核心线程数量的这些线程,需要进行超时控制booleantimed = allowCoreThreadTimeOut || wc > corePoolSize;

        if(timed && timedOut) {

            // 如果需要进行超时控制,且上次从缓存队列中获取任务时发生了超时,那么尝试将workerCount减1,即当前活动线程数减1,

            // 如果减1成功,则返回null,这就意味着runWorker()方法中的while循环会被退出,其对应的线程就要销毁了,也就是线程池中少了一个线程了if (compareAndDecrementWorkerCount(c))

            returnnull;

            continue;

        }

        try {

            Runnable r = timed ?            workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :

            workQueue.take();

            // 注意workQueue中的poll()方法与take()方法的区别

            //poll方式取任务的特点是从缓存队列中取任务,最长等待keepAliveTime的时长,取不到返回null

            //take方式取任务的特点是从缓存队列中取任务,若队列为空,则进入阻塞状态,直到能取出对象为止if(r !=null)

            return r;

            timedOut =true;

            } catch (InterruptedException retry) {

                timedOut =false;

        }

    }

}


从以上代码可以看出,getTask()的作用是

如果当前活动线程数大于核心线程数,当去缓存队列中取任务的时候,如果缓存队列中没任务了,则等待keepAliveTime的时长,此时还没任务就返回null,这就意味着runWorker()方法中的while循环会被退出,其对应的线程就要销毁了,也就是线程池中少了一个线程了。因此只要线程池中的线程数大于核心线程数就会这样一个一个地销毁这些多余的线程。

如果当前活动线程数小于等于核心线程数,同样也是去缓存队列中取任务,但当缓存队列中没任务了,就会进入阻塞状态,直到能取出任务为止,因此这个线程是处于阻塞状态的,并不会因为缓存队列中没有任务了而被销毁。这样就保证了线程池有N个线程是活的,可以随时处理任务,从而达到重复利用的目的。

小结

通过以上的分析,应该算是比较清楚地解答了“线程池中的核心线程是如何被重复利用的”这个问题,同时也对线程池的实现机制有了更进一步的理解:

  当有新任务来的时候,先看看当前的线程数有没有超过核心线程数,如果没超过就直接新建一个线程来执行新的任务,如果超过了就看看缓存队列有没有满,没满就将新任务放进缓存队列中,满了就新建一个线程来执行新的任务,如果线程池中的线程数已经达到了指定的最大线程数了,那就根据相应的策略拒绝任务。

  当缓存队列中的任务都执行完了的时候,线程池中的线程数如果大于核心线程数,就销毁多出来的线程,直到线程池中的线程数等于核心线程数。此时这些线程就不会被销毁了,它们一直处于阻塞状态,等待新的任务到来


图片来源于:https://www.cnblogs.com/linguanh/p/8000063.html

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 205,132评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 87,802评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,566评论 0 338
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,858评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,867评论 5 368
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,695评论 1 282
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,064评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,705评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 42,915评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,677评论 2 323
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,796评论 1 333
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,432评论 4 322
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,041评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,992评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,223评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,185评论 2 352
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,535评论 2 343