如果你要开启线程执行任务,你会怎么做?
- 开启一个线程,然后串行执行所有任务
- 一个任务开启一个线程,任务不会再等待,但是线程的创建销毁是需要时间的;线程会消耗系统资源,尤其是内存;线程数的增加在一定范围内会提高吞吐率,但过多只会降低系统的执行速度。
由于以上几点原因有了Executor,接下来看看Executor的好处。
先来看看几个类:Executors, Executor, ThreadPoolExecutor, ExecutorService, Future, Callable, Runnable, FutureTask,先理清这些类关系。
public interface Executor {
void execute(Runnable command);
}
Executor 接口是任务执行的主要抽象,将任务本身和执行任务分离,一个接收Runnable任务的execute方法。
public interface ExecutorService extends Executor
ExecutorService用来控制线程池的方法;如:shutdown:当调用 shutDown 方法时,线程池会停止接受新的任务,isShutdown, isTerminated...来判断线程池运行情况;他还提供了
<T> Future<T> submit(Callable<T> task);
该方法与Executor.execute一样代表任务执行,不过不同的是submit接收一个Callable而且有返回值,返回一个Future;
那么Callable是什么?
public interface Callable<V> {
V call() throws Exception;
}
Callable是Runnable的替代,它与Runnable一样是任务的抽象,跟Executor(任务执行的抽象)概念区分开,它有返回值,而且能抛异常,能抛异常我们就可以利用这一点来在任务执行过程中判断任务执行情况,是被取消了,或者是出错抛异常了,我们还可以实现他的超时抛异常功能主动通知调用放任务执行超时了;
那么Future?
Future接口代表异步计算的结果,通过Future接口提供的方法可以查看异步计算是否执行完成,或者等待执行结果并获取执行结果,同时还可以取消执行。Future接口的定义如下:
public interface Future<V> {
(只有当你非常清楚线程的中断策略时,才应该为true)
boolean cancel(boolean mayInterruptIfRunning);
boolean isCancelled();
boolean isDone();
V get() throws InterruptedException, ExecutionException;
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
- cancel():cancel()方法用来取消异步任务的执行。如果异步任务已经完成或者已经被取消,或者由于某些原因不能取消,则会返回false。如果任务还没有被执行,则会返回true并且异步任务不会被执行。如果任务已经开始执行了但是还没有执行完成,若mayInterruptIfRunning为true,则会立即中断执行任务的线程并返回true,若mayInterruptIfRunning为false,则会返回true且不会中断任务执行线程。
- isCanceled():判断任务是否被取消,如果任务在结束(正常执行结束或者执行异常结束)前被取消则返回true,否则返回false。
- isDone():判断任务是否已经完成,如果完成则返回true,否则返回false。需要注意的是:任务执行过程中发生异常、任务被取消也属于任务已完成,也会返回true。
- get():获取任务执行结果,如果任务还没完成则会阻塞等待直到任务执行完成。如果任务被取消则会抛出CancellationException异常,如果任务执行过程发生异常则会抛出ExecutionException异常,如果阻塞等待过程中被中断则会抛出InterruptedException异常。
- get(long timeout,Timeunit unit):带超时时间的get()版本,如果阻塞等待过程中超时则会抛出TimeoutException异常
submit返回了Future,我们就可以利用Future来查看任务的状态isCancelled(是否被取消),isDone(是否完成),get阻塞直到任务完成返回任务执行结果;
FutureTask
FutureTask是Future是唯一实现类,它同时实现了Runnable
Exectors工厂类提供了线程池的初始化接口
ThreadPoolExecutor为一些Executor提供了基本实现,这些Executor是由Executors中的newCachedThreadPool, newFixedThreadPool和newScheduledThreadExecutor等工厂方法返回的。ThreadPoolExecutor是一个灵活稳定的线程池,允许进行各种定制。
Executors
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
初始化一个指定线程数的线程池,其中corePoolSize == maximumPoolSize,使用LinkedBlockingQuene作为阻塞队列,不过当线程池没有可执行任务时,也不会释放线程。
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
- 初始化一个可以缓存线程的线程池,默认缓存60s,线程池的线程数可达到Integer.MAX_VALUE,即2147483647,内部使用SynchronousQueue作为阻塞队列;
- 和newFixedThreadPool创建的线程池不同,newCachedThreadPool在没有任务执行时,当线程的空闲时间超过keepAliveTime,会自动释放线程资源,当提交新任务时,如果没有空闲线程,则创建新线程执行任务,会导致一定的系统开销;
所以,使用该线程池时,一定要注意控制并发的任务数,否则创建大量的线程可能导致严重的性能问题。
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
初始化的线程池中只有一个线程,如果该线程异常结束,会重新创建一个新的线程继续执行任务,唯一的线程可以保证所提交任务的顺序执行,内部使用LinkedBlockingQueue作为阻塞队列。
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
初始化的线程池可以在指定的时间内周期性的执行所提交的任务,在实际的业务场景中可以使用该线程池定期的同步数据。
参数
以newFixedThreadPool为例来说说各个参数
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
}
corePoolSize核心线程数,每提交一个任务创建一个线程,直到线程数等于核心线程数,此后提交的任务会被放在阻塞队列中;如果执行了线程池的prestartAllCoreThreads()方法,线程池会提前创建并启动所有核心线程。
maximumPoolSize最大线程数;如果阻塞队列满了,会创建线程执行新提交的任务,前提是当前线程数要小于maximumPoolSize
keepAliveTime线程空闲时的存活时间,即当线程没有任务执行时,继续存活的时间;默认情况下,该参数只在线程数大于corePoolSize时才有用;
unitkeepAliveTime的单位
workQueue
用来保存等待被执行的任务的阻塞队列,且任务必须实现Runable接口,在JDK中提供了如下阻塞队列:
1、ArrayBlockingQueue:基于数组结构的有界阻塞队列,按FIFO排序任务;
2、LinkedBlockingQuene:基于链表结构的阻塞队列,按FIFO排序任务,吞吐量通常要高于ArrayBlockingQuene;
3、SynchronousQuene:一个不存储元素的阻塞队列,每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态,吞吐量通常要高于LinkedBlockingQuene;
4、priorityBlockingQuene:具有优先级的无界阻塞队列;
threadFactory创建线程的工厂
handler线程池的饱和策略,当阻塞队列满了,且没有空闲的工作线程,如果继续提交任务,必须采取一种策略处理该任务,线程池提供了4种策略:
1、AbortPolicy:直接抛出异常,默认策略;
2、CallerRunsPolicy:用调用者所在的线程来执行任务;
3、DiscardOldestPolicy:丢弃阻塞队列中靠最前的任务,并执行当前任务;
4、DiscardPolicy:直接丢弃任务;
参考
《Java并发编程实战》
深入分析java线程池的实现原理
Executor, ExecutorService 和 Executors 间的不同