Java 线程池

1、概述

Java中的关于线程池的核心类是Executor,Executor是一个接口,真正的线程池的实现为ThreadPoolExecutor,ThreadPoolExecutor提供了一系列参数来配置线程池,通过不同的参数可以创建不同的线程池。线程池的优点在于:

① 复用线程池中的线程,避免因为线程的创建和销毁所带来的性能开销。

② 能够有效的控制线程池的最大并发数,避免大量的线程之间因互相抢占系统资源而导致的阻塞现象。

③ 能够对线程进行简单的管理,并提供定时执行以及指定间隔循环执行等功能。

2、构造方法及参数

线程池有多种构造方法,其中最复杂的一个是含有7个参数的构造方法:


public ThreadPoolExecutor(int corePoolSize, 

                          int maximumPoolSize, 

                          long keepAliveTime, 

                          TimeUnit unit, 

                          BlockingQueue workQueue,

                          ThreadFactory threadFactory,

                          RejectedExecutionHandler handler)

线程池构造方法

线程池构造方法

① CorePoolSize

线程的核心线程数。默认情况下,核心线程数会在线程中一直存活,即使它们处于闲置状态。如果将ThreadPoolExecutor的allowCoreThreadTimeOut属性设置为true,那么核心线程就会存在超时策略,这个时间间隔有keepAliveTime所决定,当等待时间超过keepAliveTime所指定的时长后,核心线程就会被停止。

② maximumPoolSize

线程池所能容纳的最大线程数。当活动线程数达到这个数值后,后续的新任务将会被阻塞。

③ keepAliveTime

非核心线程闲置时的超时时长,超过这个时长,非核心线程就会被回收,当ThreadPoolExector的allowCoreThreadTimeOut属性设置为True时,keepAliveTime同样会作用于核心线程。

④ unit

用于指定keepAliveTime参数的时间单位,这是一个枚举,常用的有TimeUnit.MILLISECONDS(毫秒)、TimeUnit.SECONDS(秒)以及TimeUnit.MINUTES(分钟)等。


TimeUnit.NANOSECONDS 纳秒

TimeUnit.MICROSECONDS 微秒

TimeUnit.MILLISECONDS 毫秒

TimeUnit.SECONDS    秒

TimeUnit.MINUTES    分钟

TimeUnit.HOURS      小时

TimeUnit.DAYS      天

⑤ workQueue

线程池中的任务队列,通过线程池execute方法提交的Runnable对象会存储在这个参数中。这个任务队列是BlockQueue类型,属于阻塞队列,就是当队列为空的时候,此时取出任务的操作会被阻塞,等待任务加入队列中不为空的时候,才能进行取出操作,而在满队列的时候,添加操作同样被阻塞。

⑥ threadFactory

线程工厂,为线程池提供创建新线程的功能。ThreadFactory是一个接口,它只有一个方法,newThread(Runnable r),用来创建线程。


ThreadFactory factory =new ThreadFactory() {

        //线程安全的Integer操作类

            private final AtomicInteger mCount =new AtomicInteger(1);

            @Override

            public Thread newThread(Runnable r) {

                return new Thread(r, "new Thread #" + mCount.getAndIncrement());

            }

        };

⑦ 饱和策略,当任务队列和线程池都满了时所采取的应对策略,有已下几种:
AbordPolicy: 抛出RejectedExecutionExcepton异常,该策略是默认策略。

CallerRunsPolicy: 用调用者所在的线程来处理任务。此策略提供简单的反馈控制,能够减缓新任务提交速度。

DiscardPolicy: 直接将该任务舍弃。

DiscardOldestPolicy: 丢弃最老的任务,并执行当前任务。

3、线程池执行逻辑

线程池执行逻辑

线程池的执行逻辑如上图所示,每当有一个任务需要由线程池执行,先判断线程池中线程数是否小于核心线程数,是的话开一个核心线程,并在该线程中执行。如果已经大于核心线程数,则判断任务队列是否已满,如果未满的话线程任务入队。如果任务队列已满,则判断线程池中线程数是否大于最大线程数,如果不是的话就开一个非核心线程处理任务。如果已经开到最大线程数,则调用饱和策略。当线程池中数量大于核心线程数时,若某非核心线程数的空闲时间大于闲置超时时长,则回收该空闲线程。

4、常见线程池

4.1 FixedThreadPool

创建方法:


public static ExecutorService newFixedThreadPool(int var0) {

        return new ThreadPoolExecutor(var0, var0, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue());

    }

该线程池从构造方法可以看出,其核心线程池数量和最大线程池数量一致,故其线程都是核心线程,所以不会被回收。同时它的队列是LinkedBlockingQueue,这个队列详细信息之后会介绍,它有一个特点就是其队列的大小没有限制,也就是说超出的线程任务都会在队列中排队等待。也正是这个特点,可以使它的请求任务小于最大线程数时可以更快的响应外界的请求。

4.2 ScheduledThreadPool

创建方法:


public static ScheduledExecutorService newScheduledThreadPool(int var0) {

        return new ScheduledThreadPoolExecutor(var0);

    }

public ScheduledThreadPoolExecutor(int var1) {

        super(var1, 2147483647, 0L, TimeUnit.NANOSECONDS, new ScheduledThreadPoolExecutor.DelayedWorkQueue());

    }

该线程池有自定义的核心线程数,但是其非核心线程数设置的是int的最大值,可以理解为对非核心线程数没有限制。同时keepAliveTime为0,意味着非核心线程一旦执行完毕就会立刻回收。这个线程池适合执行定时任务和具有固定周期的重复任务。

4.3 CachedThreadPool

创建方法:


public static ExecutorService newCachedThreadPool() {

        return new ThreadPoolExecutor(0, 2147483647, 60L, TimeUnit.SECONDS, new SynchronousQueue());

    }

该线程池没有核心线程,并且不限制非核心线程数量,它的keepAliveTime为60秒,超过这个时间就会回收线程。其队列为SynchronousQueue,这个队列的特点是不做数据缓冲,可以粗略理解为这个队列的大小为0,应用到这个线程池里也就是说即一旦有任务过来,会立刻开一个线程执行任务。这个线程池的一般用来执行大量的低耗时任务,当整个线程池闲置,其所有线程都会关闭,从而处于一个低占用资源的状态。

4.4 SingleThreadExecutor

创建方法:


public static ExecutorService newSingleThreadExecutor() {

        return new Executors.FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue()));

    }

该线程池只有且只有一个核心线程,而队列采用的是刚才所说的LinkedBlockingQueue。所以该线程池的线程是一个个执行的,后来的线程都加入到队列中等待。该线程池的意义在于将所有任务都控制在一个线程中,并按照一定顺序执行,这样就不用处理这些任务的同步问题。

5、阻塞队列

上一节已经提到了两个阻塞队列LinkedBlockingQueue和SynchronousQueue了,这一节就来详细介绍一下。

阻塞队列一般用于有生产者和消费者的场景,也就是说生产者往队列里面添加数据,而消费者则从队列中消费数据。这边往往会遇到两种情况:

① 队列中没有数据时,消费者端会被自动阻塞(挂起),直到有数据放入队列,再被唤醒。

② 当队列中填满数据时,生产者端会被自动阻塞(挂起),直到队列中有空位置时,再被唤醒。

阻塞队列提供了四种处理方法:

方法\处理方式 抛出异常 返回特殊值 一直阻塞 超时退出
插入方法 add(e) offer(e) put(e) offer(e,time,unit)
移除方法 remove() poll() take() poll(time,unit)
检查方法 element() peek() 不可用 不可用

抛出异常:是指当阻塞队列满时候,再往队列里插入元素,会抛出IllegalStateException(“Queue full”)异常。当队列为空时,从队列里获取元素时会抛出NoSuchElementException异常 。

返回特殊值:插入方法会返回是否成功,成功则返回true。移除方法,则是从队列里拿出一个元素,如果没有则返回null

一直阻塞 :当阻塞队列满时,如果生产者线程往队列里put元素,队列会一直阻塞生产者线程,直到拿到数据,或者响应中断退出。当队列空时,消费者线程试图从队列里take元素,队列也会阻塞消费者线程,直到队列可用。

超时退出:当阻塞队列满时,队列会阻塞生产者线程一段时间,如果超过一定的时间,生产者线程就会退出。

接下来介绍一下java中的阻塞队列。

5.1 ArrayBlockingQueue

一个用数组实现的有界阻塞队列。此队列按照先进先出(FIFO)的原则对元素进行排序。一旦指定了队列的长度,则队列的大小不能被改变。在生产者消费者例子中,如果生产者生产实体放入队列超过了队列的长度,则在offer(或者put,add)的时候会被阻塞,直到队列的实体数量小于队列的初始size为止。不过可以设置超时时间,超时后队列还未空出位置,则offer失败。如果消费者发现队列里没有可被消费的实体时也会被阻塞,直到有实体被生产出来放入队列位置,不过可以设置等待的超时时间,超过时间后会返回null。

5.2 DelayQueue

一个使用优先级队列实现的无界阻塞队列。队列中的元素必须实现Delayed接口,在创建元素时可以指定多久才能从队列中获取当前元素。只有在延迟期满时才能从队列中提取元素。

5.3 LinkedBlockingQueue

一个用链表实现的有界阻塞队列。可以通过构造方法设置capacity来使得阻塞队列是有界的,也可以不设置,则为无界队列。此队列按照先进先出的原则对元素进行排序。

5.4 PriorityBlockingQueue

无界限队列,相当于PriorityQueue 和 BlockingQueue的结合体。插入的对象必须是可比较的,或者通过构造方法实现插入对象的比较器Comparator。队列里的元素按Comparator comparator比较结果排序。

5.5 SynchronousQueue

无内部容量的阻塞队列,put必须等待take,同样take必须等待put。比较适合两个线程间的数据传递。异步转同步的场景不太适用,因为对于异步线程来说在处理完事务后进行put,但是必须等待put的值被取走。SynchronousQueue的吞吐量高于LinkedBlockingQueue 和 ArrayBlockingQueue。

6、线程池execute()方法分析

线程池中最常用的方法就是其void execute(Runnable command) 方法了,去看一下其源码实现:


public void execute(Runnable command) {

        if (command == null)

            throw new NullPointerException();

        int c = ctl.get();

        if (workerCountOf(c) < corePoolSize) {

            if (addWorker(command, true))

                return;

            c = ctl.get();

        }

        if (isRunning(c) && workQueue.offer(command)) {

            int recheck = ctl.get();

            if (! isRunning(recheck) && remove(command))

                reject(command);

            else if (workerCountOf(recheck) == 0)

                addWorker(null, false);

        }

        else if (!addWorker(command, false))

            reject(command);

    }

官方对该方法是这么注释的:该方法给线程池提交一个将来要执行的任务。这个任务可能在一个新开的线程中执行,或者在一个已经在线程池中的线程执行。如果任务无法提交执行,无论是因为该执行程序已关闭或因其容量已达到,该任务将由当前的RejectedExecutionHandler处理。

上面都是官方的注释,其实已经大致说明白了这个方法的执行流程。和我再3、线程池执行逻辑里面说的差不多。从上面官方注释也可以看出,线程池执行任务时间不是一调用execute()就执行任务的。线程池会根据各项指标状况来决定是唤醒线程池中一个已有的阻塞线程来执行还是new一个Thread来执行任务。具体这些策略和我们创建线程池适合传入的参数有关。前面几节已经说明了。

6.1 execute()处理三步

在execute()函数内官方注释将其分为了3步来进行:

① 如果运行的线程数少于核心线程,开启一个新线程给这个command。调用addWorker()方法,并用原子性的方式检查runState和workerCount,如果addWorker()返回true,则代表任务执行成功,直接返回。否则进入下一步。对应如下代码:


// void execute(Runnable command)

int c = ctl.get();

        if (workerCountOf(c) < corePoolSize) { 

            if (addWorker(command, true)) 

                return; 

            c = ctl.get(); 

        } 

② 这一步将任务入队,如果任务能成功入队,此时任然应该要仔细检查是否应该添加一个线程(因为在上次检查到这次检查之间,可能有线程死亡)或者在进入这个方法时候线程池已经关闭了。所以需要检查状态并且如果有必要的话回滚队列,或者如果没有线程的话新开始一个线程。对应如下代码:

// void execute(Runnable command)

if (isRunning(c) && workQueue.offer(command)) {

            int recheck = ctl.get();

            if (! isRunning(recheck) && remove(command))

                reject(command);

            else if (workerCountOf(recheck) == 0)

                addWorker(null, false);

        }

③ 如果入队失败,那么我们尝试添加一个新线程。 如果失败,我们知道线程池已关闭或饱和,因此拒绝任务。对应如下代码:

// void execute(Runnable command)

else if (!addWorker(command, false))

            reject(command); 

6.2 ctl成员变量

ctl是控制线程池状态的变量,由两部分组成,runState(高位)和workerCount(低28位),这种处理方法类似于Android里测量阶段要用到的MeasureSpec变量。runStateOf(int c) 方法用于获取线程池状态,workerCountOf(int c)方法用于获取线程池正在运行线程数。这样在execute()中调用的方法isRunning(recheck)作用就很好判断了,其作用是判断线程是否处在运行状态。

6.3 addWorker(Runnable firstTask,boolean core)

我们在execute()中多次看到了addWorker()函数,看一下该函数做了什么:

// ThreadPoolExecutor.class
private boolean addWorker(Runnable firstTask, boolean core) {

        retry:

        for (;;) {

            int c = ctl.get();

            int rs = runStateOf(c);

            // 如有必要检查队列是否为空,如果空直接返回false

            if (rs >= SHUTDOWN &&

                ! (rs == SHUTDOWN &&

                  firstTask == null &&

                  ! workQueue.isEmpty()))

                return false;

            for (;;) {

                int wc = workerCountOf(c);

                if (wc >= CAPACITY ||

                    wc >= (core ? corePoolSize : maximumPoolSize))

                    return false;

                if (compareAndIncrementWorkerCount(c))

                    break retry;

                c = ctl.get();  // 重新获取ctl

                if (runStateOf(c) != rs)

                    continue retry;

            }

        }

        boolean workerStarted = false;

        boolean workerAdded = false;

        Worker w = null;

        try {

            w = new Worker(firstTask);

            final Thread t = w.thread;

            if (t != null) {

                final ReentrantLock mainLock = this.mainLock;

                mainLock.lock();

                try {

                    // 保持锁定时重新检查。

                    // 线程生成失败或者在获取锁之前线程池关闭则退出

                    int rs = runStateOf(ctl.get());

                    if (rs < SHUTDOWN ||

                        (rs == SHUTDOWN && firstTask == null)) {

                        if (t.isAlive()) // 再次检查线程是启动的

                            throw new IllegalThreadStateException();

                        workers.add(w);

                        int s = workers.size();

                        if (s > largestPoolSize)

                            largestPoolSize = s;

                        workerAdded = true;

                    }

                } finally {

                    mainLock.unlock();

                }

                if (workerAdded) {

                    t.start();

                    workerStarted = true;

                }

            }

        } finally {

            if (! workerStarted)

                addWorkerFailed(w);

        }

        return workerStarted;

    }

整个方法的作用大致流程就是检查是否可以根据当前池状态和给定的界限(核心线程数或最大线程数)添加并创建新的Worker(Worker内部封装了一个线程)。 如果可以,则相应地更新线程中Worker数量,并且创建并开始新的Worker,运行firstTask作为其第一个任务。 如果线程池停止或被关闭,此方法返回false。 如果线程工厂在被询问时未能创建线程,它也会返回false。 如果线程创建失败,或者由于线程工厂返回null,或者由于异常(通常是Thread.start()中的OutOfMemoryError),则会调用 addWorkerFailed(w) 干净地回滚,将新创建的Worker回收。

Tips: addWorker()中调用的方法compareAndIncrementWorkerCount(c),其作用是比较原子变量并加一,如果改原子变量加一前值没有变就返回true,不然就返回false。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,684评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 87,143评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,214评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,788评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,796评论 5 368
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,665评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,027评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,679评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 41,346评论 1 299
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,664评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,766评论 1 331
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,412评论 4 321
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,015评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,974评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,203评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,073评论 2 350
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,501评论 2 343

推荐阅读更多精彩内容