【多线程与并发】:线程池与Executor框架


为什么要用线程池

关于为什么要使用多线程,请参考【多线程与并发】:线程的创建、状态、方法中的最后一点。

那为什么要使用线程池呢?

①降低资源消耗:对象的创建和销毁是非常耗时的操作(线程也是一个对象)。通过重复利用已创建的线程降低线程创建和销毁造成的消耗;
②提高响应速度:当任务到达时,任务可以不需要等待线程创建就能立即执行;
③提高线程的可管理性:线程是稀缺资源,如果无限制地创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一分配、调优和监控。


线程池工作原理

这里是指jdk中的线程池实现。

1、线程池的主要处理流程
线程池处理流程.png
2、创建线程池
//ThreadPoolExecutor的构造器
public ThreadPoolExecutor(int corePoolSize,                                
                          int maximumPoolSize,                             
                          long keepAliveTime,                              
                          TimeUnit unit,                                   
                          BlockingQueue<Runnable> workQueue,               
                          ThreadFactory threadFactory,                     
                          RejectedExecutionHandler handler) {              
    if (corePoolSize < 0 ||                                                
        maximumPoolSize <= 0 ||                                            
        maximumPoolSize < corePoolSize ||                                  
        keepAliveTime < 0)                                                 
        throw new IllegalArgumentException();                              
    if (workQueue == null || threadFactory == null || handler == null)     
        throw new NullPointerException();                                  
    this.corePoolSize = corePoolSize;                                      
    this.maximumPoolSize = maximumPoolSize;                                
    this.workQueue = workQueue;                                            
    this.keepAliveTime = unit.toNanos(keepAliveTime);                      
    this.threadFactory = threadFactory;                                    
    this.handler = handler;                                                
}                                                                          

参数解释

  • corePoolSize(线程池的基本大小):当提交一个任务时,线程池会创建一个线程来执行任务,即使其他空闲的基本线程能够执行新任务也会创建线程,直到需要执行的任务数大于线程池基本大小时就不再创建。如果调用了线程池的prestartAllCoreTreads()方法,线程池就会提前创建并启动所有基本线程。

  • maximumPoolSize(线程最大数量):线程池允许创建的最大线程数。如果队列已满,并且已创建的线程数小于最大线程数,则线程池会再创建新的线程执行任务。但如果使用了无解的任务队列,该参数没有效果。

  • keepAliveTime(线程活动保持时间):线程池的工作线程空闲后,保持存活的时间。如果任务很多,且每个任务执行时间较短,可调大该值。

  • TimeUnit(线程活动保持时间的单位):keepAliveTime的时间度量单位。可选天、小时、分钟、毫秒、微妙、纳秒。

  • BlockingQueue<Runnable>(任务队列):用于保存等待执行的任务的阻塞嘟列,可以选择以下几个阻塞队列

  • ArrayBlockingQueue:基于数组结构的有界阻塞队列

  • LinkedBlockingQueue:基于链表机构的阻塞队列,吞吐量通常高于ArrayBlockingQueue,静态工厂方法Executors.newFixedThreadPool()使用该队列。

  • SynchronousQueue:一个不存储元素的阻塞队列。每个插入操作必须等到另一个线程调用移除,否则插入操作一直处于阻塞状态,吞吐量通常高于LinkedBlockingQueue,静态工厂方法Executors.newCachedThreadPool()使用该队列。

  • PriorityBlockingQueue:具有优先级的无限阻塞队列。

  • ThreadFactory:创建线程的工厂。

  • RejectedExecutionHandler:饱和策略,即队列和线程池都满了,对于新提交的任务无法执行,这时采取的处理新来的任务的方法,有4种策略可选(也可以自定义策略---实现RejectedExecutionHandler接口,如记录日志或持久化不能处理的任务):

  • CallerRunsPolicy:使用调用者所在的线程来运行任务。

  • AbortPolicy:直接抛出RejectedExecutionException异常。(默认策略)

  • DiscardPolicy:对新任务直接丢弃,不做任何事情

  • DiscardOldestPolicy:丢掉队列里最近(the oldest unhandled)的一个任务,并执行当前新任务。

3、向线程池提交任务

有两种方式将任务提交给线程池来执行

  • execute()
    用于提交不需要返回值的任务,所以无法判断任务是否被线程执行成功。
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(10, 20,
                30, TimeUnit.DAYS, new LinkedBlockingQueue<Runnable>());
threadPool.execute(new Runnable() {
            public void run() {
                //do something
            }
});
  • submit()
    提交需要返回值的任务。线程池会返回一个Future类型的对象,通过这个对象可以判断任务是否执行成功,并且可以通过Future对象的get()方法来获取返回值。但get()方法会阻塞当前线程直到任务完成,使用get(long timeout, TimeUnit unit)方法会阻塞当前线程一段时间后立即返回(此时任务可能还没有执行完)。
 //有结果的任务
class TaskWithResult implements Callable<String>{
        @Override
        public String call() throws Exception {
            return "返回:我是实现有结果的任务";
        }
}
@org.junit.Test
public void test() throws ExecutionException, InterruptedException {
        ExecutorService threadPool = Executors.newFixedThreadPool(3);
        Future<String> future = threadPool.submit(new TaskWithResult());
        System.out.println(future.isDone() ? "执行完了" : "没执行完呢");
        System.out.println(future.get());
        System.out.println(future.isDone() ? "执行完了" : "没执行完呢");
}
//输出为:
没执行完呢
返回:我是实现有结果的任务
执行完了
4、关闭线程池

调用线程池的两个方法来关闭shutdown()或者shutdownNow():遍历线程池中的工作线程,然后逐个调用线程的interupt()方法中断线程,所以无法响应中断的任务可能永远无法终止。

  • shutdownNow()
    不允许添加新的任务。立刻关闭线程池。不管池中是否还存在正在运行的任务。关闭顺序是先尝试关闭当前正在运行的任务。然后返回待完成任务的清单。已经运行的任务则不返回。(首先将线程池的状态设置为STOP,然后尝试终止所有的线程(包括正在执行任务或暂停任务的),并返回等待执行任务的列表;)
  • shutdown()
    不允许添加新的任务,等池中所有的任务执行完毕之后再关闭线程池。
    (只是将线程池的状态设置为SHUTDOWN,然后中断所有没有正在执行任务的线程。)

//todo 有待验证shutdown()和shutdownNow()的区别


线程池框架Executor

1、简介

Executor是(since)JDK1.5实现的线程池技术。

先看Executor框架的主要类与接口

Executor主要类与接口.png
Future.png

Executor主要可以分为3个部分:

  • 任务对象的创建:实现Runnable接口或实现Callable接口
  • 任务的执行:接口ExecutorService、两个实现类ThreadPoolExecutorScheduledThreadPoolExecutor
  • 异步计算的结果:接口Future以及实现类FutureTask

2、任务对象
  • 两种方式创建任务对象:实现Runnable接口或实现Callable接口。
  • 两者的区别:Runnable不会返回结果,Callable可以返回结果;Callable的call方法可抛出异常,而Runnable的run方法不能抛出异常
  • Runnable可以包装为Callable:通过Executors工具类提供的两个方法
public static <T> Callable<T> callable(Runnable task, T result);
public static Callable<Object> callable(Runnable task);

3、任务的执行

任务的执行是由两个实现类完成的:ThreadPoolExecutorScheduledThreadPoolExecutor
前面介绍线程池工作过程就是以ThreadPoolExecutor为例进行的。在实际使用中,通常使用工具类Executors创建不同类型的ThreadPoolExecutor
ScheduledThreadPoolExecutorThreadPoolExecutor类的子类,相当于特定功能的扩展:在给定的延迟之后运行任务或者定期执行任务。它与Timer的功能类似,但更强大、更灵活。Timer对应的是单个后台线程,而ScheduledThreadPoolExecutor可以指定多个线程数。

  • ThreadPoolExecutor
    工具类Executors可以创建3中类型的ThreadPoolExecutor,分别如下。
  • FixedThreadPool:可重用、固定线程数的线程池。
public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
}

可以看出,
①FixedThreadPool的corePoolSize和maxmumPoolSize都被设置为
nThreads。
②keepAliveTime设为0,表示某工作线程一旦空闲,就立即关闭该工作线程。
③使用无界队列LinkedBlockingQueue,当线程池中的线程数达到corePoolSize后,新任务将会在无界队列中等待,因此线程数永远不会超过corePoolSize。
FixedThreadPool适用于为了满足资源管理的需求,而需要限制当前线程数量的场景,比如负载较重的服务器。

  • SingleThreadExecutor:只有一个线程的线程池。
public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }

参数与FixedThreadPool的区别仅在于corePoolSize和maxmumPoolSize均为1,keepAliveTime和使用的阻塞队列都一样,特性类似,可以概括为:当有新任务时,如果线程池中没有线程,则创建一个线程,之后来的任务都存储在无界队列LinkedBlockingQueue中。该线程一直从队列中取任务执行。假如任务都执行完毕,立即终止该线程。
SingleThreadExecutor适用于需要保证顺序地执行各个任务,并且在任意时间点,不会有多个线程是活动的场景。

  • CachedThreadPool:根据需要创建新的线程
public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
}

①使用无容量队列SynchronousQueue,但maxmumPoolSize无界。如果提交任务的速度大于线程处理任务的速度,将会不断创建新线程,极端情况会因为创建过多线程而耗尽CPU资源。
②keepAliveTime为60s,空闲线程超过该时间将会终止。
③执行完任务的某线程会执行SynchronousQueue.poll()从队列中取任务,这个取的动作会持续60s,如果在60s内有新的任务,则执行新的任务,没有任务则终止线程。因此长时间保持空闲的CachedThreadPool不会占用任何资源。
④当有任务提交时,a.如果当前线程池为空或者已创建的线程都正在处理任务,则CachedThreadPool会创建新线程来执行该任务。b.如果当前线程池有空闲的线程(正在执行阻塞方法SynchronousQueue.poll()),则将任务交给该等待任务的空闲线程来执行。
CachedThreadPool适用于执行很多的短期异步任务的小程序或者是负载较轻的服务器。

  • ScheduledThreadPoolEecutor
    先记住它是用来执行定期任务或者在给定延迟时间之后执行任务。其他待深入。

4、异步结果的获取

主要是通过接口Future和实现类FutureTask。

  • Future
    Future代表了一个异步计算的结果。
public interface Future<V> {
//取消当前任务,如果任务已经完成,就会取消失败,返回false;
//如果取消成功,并且在调用该方法之前对应的任务还没有开始,
//则该任务永远也不会执行。如果任务正在执行,
//参数mayInterruptIfRunning设为true则表示将正在执行该任务的线程终止,
//参数mayInterruptIfRunning设为false则表示会等待任务完成。
//该方法返回true或false之后,之后的isDone()方法会返回true;
//如果该方法返回true,之后的isCancelled()方法也会返回true,
boolean cancel(boolean mayInterruptIfRunning);
//
//对应的任务是否在完成之前就取消了
boolean isCancelled();
//任务是否完成
boolean isDone();
//
//获取计算结果,如果任务还没执行完成,则会阻塞当前线程(调用该方法所在的线程),直到任务完成
V get() throws InterruptedException, ExecutionException;
//最多等待timeout就尝试取回结果
V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}

JDK文档上给出的示例:

interface ArchiveSearcher { String search(String target); }
class App {
       ExecutorService executor = ...
       ArchiveSearcher searcher = ...
       public void showSearch(final String target)throws InterruptedException {
             Future<String> future= executor.submit(new Callable<String>() {
                  public String call() {
                      return searcher.search(target);
                  }
              });
             displayOtherThings(); // do other things while searching
             try {
                  displayText(future.get()); // use future
             } catch (ExecutionException ex) { cleanup(); return; }
       }
}
  • FutureTask
    FutureTask继承关系图.png

FutureTask就像它的名字一样,既有Future的特点(实现Future接口),又具有任务的特点(实现Runnable接口)。更直白的理解是,FutureTask就是一种特殊的任务的描述类,利用FutureTask创建的任务可以获取计算结果。
FutureTask表示一个可取消的异步计算,并通过实现Future接口来开始或取消一个计算、查看计算是否完成、获取计算结果。如果计算还没有完成,调用FutureTask的get方法会阻塞当前线程(调用get方法所在的线程)。
FutureTask可以用来包装Callable和Runnable对象,因为实现了Runnable接口,所以FutureTask可以提交给Executor来执行(不提交就调用自己的run方法,也可以执行计算)。

FutureTask<String> future = new FutureTask<String>(new Callable<String>() {
       public String call() {
       return searcher.search(target);
}});
executor.execute(future);

作为一个独立的类,该类提供了很多protected的方法,以便创建你自己的定制任务类。

//todo 具体实现


说明:内容大多数摘抄自《Java并发编程的艺术》

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 206,602评论 6 481
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 88,442评论 2 382
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 152,878评论 0 344
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 55,306评论 1 279
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 64,330评论 5 373
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,071评论 1 285
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,382评论 3 400
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,006评论 0 259
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 43,512评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,965评论 2 325
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,094评论 1 333
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,732评论 4 323
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,283评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,286评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,512评论 1 262
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,536评论 2 354
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,828评论 2 345

推荐阅读更多精彩内容

  • 先看几个概念:线程:进程中负责程序执行的执行单元。一个进程中至少有一个线程。多线程:解决多任务同时执行的需求,合理...
    yeying12321阅读 538评论 0 0
  • 先看几个概念:线程:进程中负责程序执行的执行单元。一个进程中至少有一个线程。 多线程:解决多任务同时执行的需求,合...
    孙福生微博阅读 95,534评论 38 314
  • 一.线程安全性 线程安全是建立在对于对象状态访问操作进行管理,特别是对共享的与可变的状态的访问 解释下上面的话: ...
    黄大大吃不胖阅读 828评论 0 3
  • 从前,有这样一个年轻人,在他上学的时候,没有好好努力也没有抓住好的机遇出人头地。但是他始终没有放弃对美好生活的追求...
    神龙居士阅读 292评论 0 0
  • 在木棉参加了第二次十四天学画打卡,我的作业和老师给大家的点评记录如下: 把大家存在的问题都说一下:1.花瓣凹下去的...
    耕玫竹堂阅读 498评论 0 0