为什么要用线程池
关于为什么要使用多线程,请参考【多线程与并发】:线程的创建、状态、方法中的最后一点。
那为什么要使用线程池呢?
①降低资源消耗:对象的创建和销毁是非常耗时的操作(线程也是一个对象)。通过重复利用已创建的线程降低线程创建和销毁造成的消耗;
②提高响应速度:当任务到达时,任务可以不需要等待线程创建就能立即执行;
③提高线程的可管理性:线程是稀缺资源,如果无限制地创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一分配、调优和监控。
线程池工作原理
这里是指jdk中的线程池实现。
1、线程池的主要处理流程
2、创建线程池
//ThreadPoolExecutor的构造器
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
参数解释
corePoolSize(线程池的基本大小):当提交一个任务时,线程池会创建一个线程来执行任务,即使其他空闲的基本线程能够执行新任务也会创建线程,直到需要执行的任务数大于线程池基本大小时就不再创建。如果调用了线程池的prestartAllCoreTreads()方法,线程池就会提前创建并启动所有基本线程。
maximumPoolSize(线程最大数量):线程池允许创建的最大线程数。如果队列已满,并且已创建的线程数小于最大线程数,则线程池会再创建新的线程执行任务。但如果使用了无解的任务队列,该参数没有效果。
keepAliveTime(线程活动保持时间):线程池的工作线程空闲后,保持存活的时间。如果任务很多,且每个任务执行时间较短,可调大该值。
TimeUnit(线程活动保持时间的单位):keepAliveTime的时间度量单位。可选天、小时、分钟、毫秒、微妙、纳秒。
BlockingQueue<Runnable>(任务队列):用于保存等待执行的任务的阻塞嘟列,可以选择以下几个阻塞队列
ArrayBlockingQueue:基于数组结构的有界阻塞队列
LinkedBlockingQueue:基于链表机构的阻塞队列,吞吐量通常高于ArrayBlockingQueue,静态工厂方法Executors.newFixedThreadPool()使用该队列。
SynchronousQueue:一个不存储元素的阻塞队列。每个插入操作必须等到另一个线程调用移除,否则插入操作一直处于阻塞状态,吞吐量通常高于LinkedBlockingQueue,静态工厂方法Executors.newCachedThreadPool()使用该队列。
PriorityBlockingQueue:具有优先级的无限阻塞队列。
ThreadFactory:创建线程的工厂。
RejectedExecutionHandler:饱和策略,即队列和线程池都满了,对于新提交的任务无法执行,这时采取的处理新来的任务的方法,有4种策略可选(也可以自定义策略---实现RejectedExecutionHandler接口,如记录日志或持久化不能处理的任务):
CallerRunsPolicy:使用调用者所在的线程来运行任务。
AbortPolicy:直接抛出
RejectedExecutionException
异常。(默认策略)DiscardPolicy:对新任务直接丢弃,不做任何事情
DiscardOldestPolicy:丢掉队列里最近(
the oldest unhandled
)的一个任务,并执行当前新任务。
3、向线程池提交任务
有两种方式将任务提交给线程池来执行
-
execute()
用于提交不需要返回值的任务,所以无法判断任务是否被线程执行成功。
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(10, 20,
30, TimeUnit.DAYS, new LinkedBlockingQueue<Runnable>());
threadPool.execute(new Runnable() {
public void run() {
//do something
}
});
-
submit()
提交需要返回值的任务。线程池会返回一个Future类型的对象,通过这个对象可以判断任务是否执行成功,并且可以通过Future对象的get()方法来获取返回值。但get()方法会阻塞当前线程直到任务完成,使用get(long timeout, TimeUnit unit)方法会阻塞当前线程一段时间后立即返回(此时任务可能还没有执行完)。
//有结果的任务
class TaskWithResult implements Callable<String>{
@Override
public String call() throws Exception {
return "返回:我是实现有结果的任务";
}
}
@org.junit.Test
public void test() throws ExecutionException, InterruptedException {
ExecutorService threadPool = Executors.newFixedThreadPool(3);
Future<String> future = threadPool.submit(new TaskWithResult());
System.out.println(future.isDone() ? "执行完了" : "没执行完呢");
System.out.println(future.get());
System.out.println(future.isDone() ? "执行完了" : "没执行完呢");
}
//输出为:
没执行完呢
返回:我是实现有结果的任务
执行完了
4、关闭线程池
调用线程池的两个方法来关闭shutdown()或者shutdownNow():遍历线程池中的工作线程,然后逐个调用线程的interupt()方法中断线程,所以无法响应中断的任务可能永远无法终止。
- shutdownNow()
不允许添加新的任务。立刻关闭线程池。不管池中是否还存在正在运行的任务。关闭顺序是先尝试关闭当前正在运行的任务。然后返回待完成任务的清单。已经运行的任务则不返回。(首先将线程池的状态设置为STOP,然后尝试终止所有的线程(包括正在执行任务或暂停任务的),并返回等待执行任务的列表;) - shutdown()
不允许添加新的任务,等池中所有的任务执行完毕之后再关闭线程池。
(只是将线程池的状态设置为SHUTDOWN,然后中断所有没有正在执行任务的线程。)
//todo 有待验证shutdown()和shutdownNow()的区别
线程池框架Executor
1、简介
Executor是(since)JDK1.5实现的线程池技术。
先看Executor框架的主要类与接口
Executor主要可以分为3个部分:
-
任务对象的创建:实现
Runnable
接口或实现Callable
接口 -
任务的执行:接口ExecutorService、两个实现类
ThreadPoolExecutor
和ScheduledThreadPoolExecutor
- 异步计算的结果:接口Future以及实现类FutureTask
2、任务对象
- 两种方式创建任务对象:实现
Runnable
接口或实现Callable
接口。 - 两者的区别:Runnable不会返回结果,Callable可以返回结果;Callable的call方法可抛出异常,而Runnable的run方法不能抛出异常
- Runnable可以包装为Callable:通过Executors工具类提供的两个方法
public static <T> Callable<T> callable(Runnable task, T result);
public static Callable<Object> callable(Runnable task);
3、任务的执行
任务的执行是由两个实现类完成的:ThreadPoolExecutor
和ScheduledThreadPoolExecutor
。
前面介绍线程池工作过程就是以ThreadPoolExecutor
为例进行的。在实际使用中,通常使用工具类Executors
创建不同类型的ThreadPoolExecutor
。
而ScheduledThreadPoolExecutor
是ThreadPoolExecutor
类的子类,相当于特定功能的扩展:在给定的延迟之后运行任务或者定期执行任务。它与Timer的功能类似,但更强大、更灵活。Timer对应的是单个后台线程,而ScheduledThreadPoolExecutor
可以指定多个线程数。
-
ThreadPoolExecutor
工具类Executors
可以创建3中类型的ThreadPoolExecutor
,分别如下。 - FixedThreadPool:可重用、固定线程数的线程池。
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
可以看出,
①FixedThreadPool的corePoolSize和maxmumPoolSize都被设置为
nThreads。
②keepAliveTime设为0,表示某工作线程一旦空闲,就立即关闭该工作线程。
③使用无界队列LinkedBlockingQueue,当线程池中的线程数达到corePoolSize后,新任务将会在无界队列中等待,因此线程数永远不会超过corePoolSize。
FixedThreadPool适用于为了满足资源管理的需求,而需要限制当前线程数量的场景,比如负载较重的服务器。
- SingleThreadExecutor:只有一个线程的线程池。
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
参数与FixedThreadPool的区别仅在于corePoolSize和maxmumPoolSize均为1,keepAliveTime和使用的阻塞队列都一样,特性类似,可以概括为:当有新任务时,如果线程池中没有线程,则创建一个线程,之后来的任务都存储在无界队列LinkedBlockingQueue中。该线程一直从队列中取任务执行。假如任务都执行完毕,立即终止该线程。
SingleThreadExecutor适用于需要保证顺序地执行各个任务,并且在任意时间点,不会有多个线程是活动的场景。
- CachedThreadPool:根据需要创建新的线程
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
①使用无容量队列SynchronousQueue,但maxmumPoolSize无界。如果提交任务的速度大于线程处理任务的速度,将会不断创建新线程,极端情况会因为创建过多线程而耗尽CPU资源。
②keepAliveTime为60s,空闲线程超过该时间将会终止。
③执行完任务的某线程会执行SynchronousQueue.poll()从队列中取任务,这个取的动作会持续60s,如果在60s内有新的任务,则执行新的任务,没有任务则终止线程。因此长时间保持空闲的CachedThreadPool不会占用任何资源。
④当有任务提交时,a.如果当前线程池为空或者已创建的线程都正在处理任务,则CachedThreadPool会创建新线程来执行该任务。b.如果当前线程池有空闲的线程(正在执行阻塞方法SynchronousQueue.poll()),则将任务交给该等待任务的空闲线程来执行。
CachedThreadPool适用于执行很多的短期异步任务的小程序或者是负载较轻的服务器。
- ScheduledThreadPoolEecutor
先记住它是用来执行定期任务或者在给定延迟时间之后执行任务。其他待深入。
4、异步结果的获取
主要是通过接口Future和实现类FutureTask。
-
Future
Future代表了一个异步计算的结果。
public interface Future<V> {
//取消当前任务,如果任务已经完成,就会取消失败,返回false;
//如果取消成功,并且在调用该方法之前对应的任务还没有开始,
//则该任务永远也不会执行。如果任务正在执行,
//参数mayInterruptIfRunning设为true则表示将正在执行该任务的线程终止,
//参数mayInterruptIfRunning设为false则表示会等待任务完成。
//该方法返回true或false之后,之后的isDone()方法会返回true;
//如果该方法返回true,之后的isCancelled()方法也会返回true,
boolean cancel(boolean mayInterruptIfRunning);
//
//对应的任务是否在完成之前就取消了
boolean isCancelled();
//任务是否完成
boolean isDone();
//
//获取计算结果,如果任务还没执行完成,则会阻塞当前线程(调用该方法所在的线程),直到任务完成
V get() throws InterruptedException, ExecutionException;
//最多等待timeout就尝试取回结果
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
JDK文档上给出的示例:
interface ArchiveSearcher { String search(String target); }
class App {
ExecutorService executor = ...
ArchiveSearcher searcher = ...
public void showSearch(final String target)throws InterruptedException {
Future<String> future= executor.submit(new Callable<String>() {
public String call() {
return searcher.search(target);
}
});
displayOtherThings(); // do other things while searching
try {
displayText(future.get()); // use future
} catch (ExecutionException ex) { cleanup(); return; }
}
}
-
FutureTask
FutureTask就像它的名字一样,既有Future的特点(实现Future接口),又具有任务的特点(实现Runnable接口)。更直白的理解是,FutureTask就是一种特殊的任务的描述类,利用FutureTask创建的任务可以获取计算结果。
FutureTask表示一个可取消的异步计算,并通过实现Future接口来开始或取消一个计算、查看计算是否完成、获取计算结果。如果计算还没有完成,调用FutureTask的get方法会阻塞当前线程(调用get方法所在的线程)。
FutureTask可以用来包装Callable和Runnable对象,因为实现了Runnable接口,所以FutureTask可以提交给Executor来执行(不提交就调用自己的run方法,也可以执行计算)。
FutureTask<String> future = new FutureTask<String>(new Callable<String>() {
public String call() {
return searcher.search(target);
}});
executor.execute(future);
作为一个独立的类,该类提供了很多protected的方法,以便创建你自己的定制任务类。
//todo 具体实现
说明:内容大多数摘抄自《Java并发编程的艺术》