Executor框架
在了解这个框架之前,首先看看这个框架有哪些相关的主要类
- Executor
public interface Executor {
void execute(Runnable command);
}
- ExecutorService
扩展Executor接口,增加了submit task的方式,�提供停止线程执行的方法shutdown。
public interface ExecutorService extends Executor {
void shutdown();
boolean isShutdown();
<T> Future<T> submit(Callable<T> task);
Future<?> submit(Runnable task);
...
}
- AbstractExecutorService
抽象类,ExecutorService的子类,使用FutureTask实现了submit,是众多线程池实现类的基类,但execute相关代码未实现
public abstract class AbstractExecutorService implements ExecutorService {
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
return new FutureTask<T>(runnable, value);
}
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
return new FutureTask<T>(callable);
}
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
RunnableFuture<Void> ftask = newTaskFor(task, null);
execute(ftask);
return ftask;
}
public <T> Future<T> submit(Runnable task, T result) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task, result);
execute(ftask);
return ftask;
}
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task);
execute(ftask);
return ftask;
}
...
}
- Callable
public interface Callable<V> {
V call() throws Exception;
}
- Future
这个接口弥补了Thread的缺陷,因为Thread不能对自身进行控制,也不能获得线程执行的结果,而这个接口可以获取当前状态,控制结束,同时get返回值
public interface Future<V> {
boolean isDone();
boolean cancel(boolean mayInterruptIfRunning);
boolean isCancelled();
V get() throws InterruptedException, ExecutionException;
V get(long timeout, TimeUnit unit)throws InterruptedException, ExecutionException, TimeoutException;
...
}
- RunnableFuture
public interface RunnableFuture<V> extends Runnable, Future<V> {
void run();
}
- FutureTask
Future接口实现类,传入runnable通过适配器模式转换成callable,注意awaitDone方法,是通过阻塞线程的方式获取返回结果。
public class FutureTask<V> implements RunnableFuture<V> {
public FutureTask(Callable<V> callable) {
if (callable == null)
throw new NullPointerException();
this.callable = callable;
this.state = NEW; // ensure visibility of callable
}
public FutureTask(Runnable runnable, V result) {
this.callable = Executors.callable(runnable, result);
this.state = NEW; // ensure visibility of callable
}
public V get() throws InterruptedException, ExecutionException {
int s = state;
if (s <= COMPLETING)
s = awaitDone(false, 0L);
return report(s);
}
public boolean isCancelled() {
return state >= CANCELLED;
}
public boolean isDone() {
return state != NEW;
}
...
}
ThreadPoolExecutor
AbstractExecutorService的子类,拥有execute方法,构造器如下:
public ThreadPoolExecutor(
int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
corePoolSize 指的是保留的线程池大小。
maximumPoolSize 指的是线程池的最大大小。
keepAliveTime 指的是空闲线程结束的超时时间。
unit 是一个枚举,表示 keepAliveTime 的单位。
workQueue 表示存放任务的队列。
threadFactory 表示创建Thread的工厂
handler 表示当工作队列满时的处理方法:
- CallerRunsPolicy:如果发现线程池还在运行,就直接运行这个线程,由用户线程执行而不是由线程池执行
- DiscardOldestPolicy:在线程池的等待队列中,将头取出一个抛弃,然后将当前线程放进去.
- DiscardPolicy:直接抛弃任务
- AbortPolicy(默认):抛出一个RejectedExecutionException异常
举个栗子:
假设队列大小为 10,corePoolSize 为 3,maximumPoolSize 为 6,那么当加入 20 个任务时,执行的顺序就是这样的:首先执行任务 1、2、3,然后任务 4~13 被放入队列。这时候队列满了,任务 14、15、16 会被马上执行,而任务 17~20 则会抛出异常。最终顺序是:1、2、3、14、15、16、4、5、6、7、8、9、10、11、12、13。
execute方法源码:
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
1、当调用 execute() 方法添加一个任务时,线程池会做如下判断:
a. 如果正在运行的线程数量小于 corePoolSize,那么马上创建线程运行这个任务;
b. 如果正在运行的线程数量大于或等于 corePoolSize,那么将这个任务放入队列。
c. 如果这时候队列满了,而且正在运行的线程数量小于 maximumPoolSize,那么还是要创建线程运行这个任务;
d. 如果队列满了,而且正在运行的线程数量大于或等于 maximumPoolSize,那么线程池会抛出异常,告诉调用者“我不能再接受任务了”。
2、当一个线程完成任务时,它会从队列中取下一个任务来执行。
3、当一个线程无事可做,超过一定的时间(keepAliveTime)时,线程池会判断,如果当前运行的线程数大于 corePoolSize,那么这个线程就被停掉。所以线程池的所有任务完成后,它最终会收缩到 corePoolSize 的大小。
private boolean addWorker(Runnable firstTask, boolean core) {
Worker w = null;
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
workers.add(w);
t.start();
}
}
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
final Thread thread;
Runnable firstTask;
volatile long completedTasks;
Worker(Runnable firstTask) {
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
/** Delegates main run loop to outer runWorker */
public void run() {
runWorker(this);
}
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) { //如果getTask()返回null,线程退出
w.lock();
try {
beforeExecute(wt, task); //用户可以定义执行前的操作
Throwable thrown = null;
try {
task.run(); //执行用户的业务逻辑
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown); //用户可以定义执行后的清理操作
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
}
addWorker方法通过判断线程池中线程的数量是否达到corePoolSize来判断是否需要新建一个Worker,如果超过corePoolSize,则把task放到队列中。
一旦一个Worker创建,相当于一个线程被start,异步发生在Worker类的run方法中,因为getThreadFactory().newThread(this)传入的是当前Worker(runnable的实现类)
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
try {
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : // 重点
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
getTask是线程阻塞的方法,所以只要队列里有任务,就会循环执行,最后当超过keepAliveTime还没有新的task进入,则线程结束。
简单点可以这样理解,也许有一百个任务需要执行,但只创建了有限个线程进行处理,从任务队列里不停的取出任务,直到将所有的任务都取光,线程结束
Executors
更推荐使用Executors的方法创建和使用线程池。
Executors提供四种线程池,分别为:
newCachedThreadPool
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
创建一个可缓存的线程池,调用execute将重用以前构造的线程(如果线程可用)。如果现有线程没有可用的,则创建一个新线程并添加到池中,终止并从缓存中移除那些已有60 秒钟未被使用的线程。
- 先查看池中有没有以前建立的线程,如果有,就reuse,如果没有,就建一个新的线程加入池中。
- 通常用于执行一些生存期很短的异步型任务,因此在一些面向连接的daemon型Server中用得不多,但对于生存期短的异步任务,它是Executor的首选。
- 能reuse的线程,必须是timeout内的池中线程,缺省timeout是60s,超过这个时长,线程实例将被终止及移出池。
corePoolSize为0,maximumPoolSize为MAX_VALUE,没有限制创建线程的数量,但却不停复用已有线程,有未终止的线程存在,新任务就会进入队列等待。超时时间为60s,意味着线程缓存时间为60s,60s内有新的任务都可以复用现有线程。
newFixedThreadPool
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。
- 其独特之处:任意时间点,最多只能有固定数目的活动线程存在,此时如果有新的线程要建立,只能放在另外的队列中等待,直到当前的线程中某个线程终止直接被移出池子。
- 和 cacheThreadPool 不同,FixedThreadPool没有 IDLE 机制,所以 FixedThreadPool多数针对一些很稳定很固定的正规并发线程,多用于服务器。
corePoolSize和maximumPoolSize的值限制并且相同,限制了最多可能存在的线程数,会有多个线程同时处理任务,但是它的timeout为0,意味着一旦线程结束就会被销毁,如果是短时任务,而任务的频率又不是高并发的前提下,会导致不停的创建和销毁线程,导致资源消耗。
newScheduledThreadPool
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
创建一个定长线程池,支持定时及周期性任务执行。
- 这个池子里的线程可以按 schedule 依次 delay 执行,或周期执行。
- ScheduledExecutorService比Timer更安全,功能更强大。
newSingleThreadExecutor
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序执行。
- 单例线程,任意时间池中只能有一个线程。
- FinalizableDelegatedExecutorService是个代理ExecutorService,只暴露ExecutorService的方法。
- 跟newFixedThreadPool(1)是一样的。