Android中,系统为我们提供了4种标准线程池:
- FixedThreadPool
- SingleThreadExecutor
- CachedThreadPool
- ScheduledThreadPool
但是,需求是无止境的,我们总是会有一些需求,4种线程池都不能非常完美的满足到。所以,我们需要自己配置线程池。不难发现,4个标准线程池都是由ThreadPoolExecutor
配置不同的参数生成的,所以我们通过阅读一下ThreadPoolExecutor
的源码来学习如何建立自己的线程池。
有意思的是,ThreadPoolExecutor
类代码总共2000行,注释就占了大概有1000行。因此,我们只需要认真地阅读它的注释,就可以慢慢了解它的工作原理。
我们知道创建和销毁线程的实例都是代价比较大的操作。当我们开发中,需要执行大量后台任务是,我们需要大量的线程。此时,为了尽可能的减少开销,我们尝试将使用过的线程不再销毁而是停掉它保存在内存中,等到其他任务需要使用后台线程时,再将它拿出来用,这样就避免了一部分的线程的创建和销毁的过程,这就需要用到线程池。
为了弄懂Android为我们提供的4种标准线程池在使用上有什么区别,我们首先要理清几个概念:
核心线程数和最大线程数
在线程池中,corePoolSize
,maximumPoolSize
,工作队列的长度共同决定了:
- 当我有一个新任务时,如果工作中的线程,少于核心线程(corePoolSize)。无论有没有闲置的线程都会创建一个线程在处理请求。
- 当我有一个新任务时,如果工作中的线程,大于等于核心线程(corePoolSize),且小于最大线程(maximumPoolSize),且工作队列未满,则提交任务到工作队列等待。
- 当我有一个新任务时,如果工作中的线程,大于等于核心线程(corePoolSize),且小于最大线程(maximumPoolSize),且工作队列已满,则开启非核心线程
- 当我有一个新任务时,如果工作中的线程,大于等于最大线程(maximumPoolSize)时,则拒绝线程请求。
这里可能比较难理解,我们用一个现实生活中的场景来比喻一下。比如我们去银行取钱,银行一开始最多只会开4个核心柜台,即核心线程数。即使柜台闲着了,也不会关掉。
当需要取钱的人数,超过4人时,就需要开始排队了(即工作队列)。如果人数再增多,队伍都排满了,银行会打开临时柜台(非核心线程)。临时柜台与核心柜台不同,如果没人排队了,就会关掉。但是临时柜台也是有限的,如果超过临时柜台的上限(maximumPoolSize),银行就会关门了(拒绝线程请求)。
默认情况下,核心线程只有在有新任务来时,才会被创建出来。但我们也可以重写prestartCoreThread
和prestartAllCoreThreads
。比如,如果希望在创建线程池时就把所有的线程创建好,那就需要重写这两个方法了。
创建新的线程
创建新线程,使用ThreadFactory
方法。如果没有特指,ThreadPoolExecutor
会使用defaultThreadFactory()
。用这个方法创建的线程,所有的线程会处在相同的ThreadGroup中,并且拥有相同的线程优先级NORM_PRIORITY
和相同的线程状态——非守护状态。
通过应用不同的的ThreadFactory
,你可以自定义线程的名字、线程组、守护状态等等。如果ThreadFactory
创建线程失败返回了null,executor
将会持续,但是可能不会再执行任何线程。
Keep-alive times
如果线程池中含有数量超过核心线程数(corePoolSize)的线程,多余的线程如果空闲时间超过了Keep-alive times就会被终止掉。
BlockingQueue
在线程池中BlockingQueue有三种排队策略。
直接切换
一种好的默认选择SynchronousQueue
将任务交给线程,但是不保留它们。也就是说,如果核心线程数(corePoolSize)已满,则不会在队列中等待,会直接开新的临时线程。这个策略的好处是,不会引起互锁。直接切换,需要没有边界的最大线程数去避免新线程的创建。这也反过来承认了,如果任务的到达速度超过了它的处理速度,临时线程的数量可能会无限增长。
无边界队列(LinkedBlockingQueue)
用无边界队列,当核心线程被占满时,任务一定会在队列中进行排队。因此,不会有额外的线程创建。这个适用于线程之间互不影响,互相没有依赖的情况。例如Web页的服务器中。这种方式可以处理瞬态突发请求。同时,这个也会出现任务的到达速度超过了它的处理速度的情况,这个队列的长度可能会无限增长。
有边界队列(ArrayBlockingQueue)
有边界的队列在我们使用有限的最大线程数时,可以帮助我们避免资源的浪费,但是这也表示,它非常难以协调和控制。队列的长度和最大线程的数量可以互相交换:用大的队列长度,小的最大线程数,可以减少CPU使用、系统资源消耗和上下文切换开销,但这会导致人为的低效率。如果任务频繁阻塞,系统可能能够为更多的任务安排时间,除非你允许。如果用较小的队列长度,通常就需要较大的最大线程数。这样做,可以保持CPU更忙碌,但同时,这也会遇到不可接受的调度,而造成额外的线程开销。因此也有可能降低效率。
拒绝任务
当新任务用execute
提交时,可能会被拒绝。被拒绝有以下几种情况:
-
Executor
已经被关闭 -
Executor
使用了有限的等待队列与最大线程数,并且它们饱和了
在这些情况下,RejectedExecutionHandler.rejectedExecution(Runnable, ThreadPoolExecutor)
会被调起。这里Android提供了4种预定义的拒绝策略。
ThreadPoolExecutor.AbortPolicy
这个是默认策略,它会抛出一个异常RejectedExecutionException
ThreadPoolExecutor.CallerRunsPolicy
这个策略会让调用execute的线程自己执行这个任务。这提供了一种简单的反馈控制机制,其将降低提交新任务的速率。
我们可以看一下它的源码,非常简单:
public static class CallerRunsPolicy implements RejectedExecutionHandler {
/**
* Creates a {@code CallerRunsPolicy}.
*/
public CallerRunsPolicy() { }
/**
* Executes task r in the caller's thread, unless the executor
* has been shut down, in which case the task is discarded.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
r.run();
}
}
}
ThreadPoolExecutor.DiscardPolicy
这个策略会将不能执行的任务,简单地抛弃。
源码中就是什么也不做:
public static class DiscardPolicy implements RejectedExecutionHandler {
/**
* Creates a {@code DiscardPolicy}.
*/
public DiscardPolicy() { }
/**
* Does nothing, which has the effect of discarding task r.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
}
}
ThreadPoolExecutor.DiscardOldestPolicy
这个策略如果线程池没有关闭,线程池会丢掉队列头部的元素。然后任务再次请求。如果还不行,再丢掉头部,也就是说,这个过程会重复直到成功为止。
public static class DiscardOldestPolicy implements RejectedExecutionHandler {
/**
* Creates a {@code DiscardOldestPolicy} for the given executor.
*/
public DiscardOldestPolicy() { }
/**
* Obtains and ignores the next task that the executor
* would otherwise execute, if one is immediately available,
* and then retries execution of task r, unless the executor
* is shut down, in which case task r is instead discarded.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
e.getQueue().poll();
e.execute(r);
}
}
}
同时, 我们也可以去使用自定义的RejectedExecutionHandler
。如果拒绝策略被设定在只在特定容量和排队策略下生效,需要开发者格外谨慎。
钩子(Hook methods)
这个类提供了可以重写的方法
beforeExecute
,afterExecute
会在每个任务的调用前和调用后进行调用。这个方法可以控制任务的执行环境。比如,重新初始化ThreadLocals
,收集统计信息,或是添加Log信息。此外,terminated
可以被重写,在线程池完全终止时执行一些特殊操作。
如果钩子或回调方法抛出异常,内部工作线程可能反过来失败并突然终止。
队列维护
getQueue
方法可以用于访问工作中的等待队列,用于监听和调试。除此之外,为别的目的使用这个方法强烈不推荐。当有大量排队的任务将要被取消时,remove(Runnable )
和purge
两个方法可用于协助回收储存。
最终
一个线程池,如果不再被引用,且其中没有其他线程,将会被自动关闭。如果你想确保,即使用户没有调用shutdown
未被引用的线程池依然能正确地关闭,那么,你必须安排那些没有用过的最终会被关闭。为了达到这个目的,你可以设置一个大概的keep-alive
时间,用下限为0的核心线程数,或者设置allowCoreThreadTimeOut
,允许核心线程会终止。
扩展实例
大部分关于ThreadPoolExecutor的实例重写了一个或多个方法。比如,这里有一个小例子添加了简单的暂停和继续功能。
class PausableThreadPoolExecutor extends ThreadPoolExecutor {
private boolean isPaused;
private ReentrantLock pauseLock = new ReentrantLock();
private Condition unpaused = pauseLock.newCondition();
public PausableThreadPoolExecutor(...) { super(...); }
protected void beforeExecute(Thread t, Runnable r) {
super.beforeExecute(t, r);
pauseLock.lock();
try {
while (isPaused) unpaused.await();
} catch (InterruptedException ie) {
t.interrupt();
} finally {
pauseLock.unlock();
}
}
public void pause() {
pauseLock.lock();
try {
isPaused = true;
} finally {
pauseLock.unlock();
}
}
public void resume() {
pauseLock.lock();
try {
isPaused = false;
unpaused.signalAll();
} finally {
pauseLock.unlock();
}
}
}
}
上面的代码可以看到,我们用一个Condition
unpaused在调用pause
方法后让线程进入闲置状态。调用resume
方法时让线程再次被唤醒。我们可以看到,所有方法在进入时都有加锁,那么beforeExecute
被锁定后,resume
方法如何调用成功的呢?
这里需要补充一些知识。ReetrantLock的锁,在Conditon
调用了await()后,就不再持有锁了。任何线程都可以进入。所以我们在这里调resume时再次加锁,ReetranlLock的锁会+1。
以上,谢谢阅读。