1. 线程池的概念
1.1 基本概念
由于线程的生命周期中包括创建、就绪、运行、阻塞、销毁阶段,当我们待处理的任务数目较小时,我们可以自己创建几个线程来处理相应的任务,但当有大量的任务时,由于创建、销毁线程需要很大的开销,运用线程池这些问题就大大的缓解了。
1.2 使用线程池的好处
1.2.1 使用new Thread()创建线程的弊端
每次通过new Thread()创建对象性能不佳。
线程缺乏统一管理,可能无限制新建线程,相互之间竞争,及可能占用过多系统资源导致死机或oom。
缺乏更多功能,如定时执行、定期执行、线程中断。
1.2.2 使用Java线程池的好处
重用存在的线程,减少对象创建、消亡的开销,提升性能。
可有效控制最大并发线程数,提高系统资源的使用率,同时避免过多资源竞争,避免堵塞。
提供定时执行、定期执行、单线程、并发数控制等功能。
2. Java 中有哪几种线程池
2.1 CachedThreadPool
优点:
工作线程的创建数量几乎没有限制(其实也有限制的,数目为Interger. MAX_VALUE), 这样可灵活的往线程池中添加线程。
如果长时间没有往线程池中提交任务,即如果工作线程空闲了指定的时间(默认为1分钟),则该工作线程将自动终止。终止后,如果你又提交了新的任务,则线程池重新创建一个工作线程。
缺点:
在使用CachedThreadPool时,一定要注意控制任务的数量,否则,由于大量线程同时运行,很有会造成系统瘫痪。
2.2 FixedThreadPool
创建一个指定工作线程数量的线程池。每当提交一个任务就创建一个工作线程,如果工作线程数量达到线程池初始的最大数,则将提交的任务存入到池队列中。定长线程池的大小最好根据系统资源进行设置如Runtime.getRuntime().availableProcessors()
优点:
FixedThreadPool是一个典型且优秀的线程池,它具有线程池提高程序效率和节省创建线程时所耗的开销的优点。
缺点:
但是,在线程池空闲时,即线程池中没有可运行任务时,它不会释放工作线程,还会占用一定的系统资源。
2.3 SingleThreadExecutor
创建一个单线程化的Executor,即只创建唯一的工作者线程来执行任务,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。如果这个线程异常结束,会有另一个取代它,保证顺序执行。单工作线程最大的特点是可保证顺序地执行各个任务,并且在任意给定的时间不会有多个线程是活动的。
2.4 ScheduleThreadPool
创建一个定长的线程池,而且支持定时的以及周期性的任务执行,支持定时及周期性任务执行。
3. 如何实现自定义的线程池
3.1 线程池具体实现解析
当我们使用 线程池的时候,可以使用 newCachedThreadPool()或者 newFixedThreadPool(int)等方法,其实我们深入到这些方法里面,就可以看到它们的是实现方式是这样的。
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
可以看到,线程池的具体实现是调用 ThreadPoolExecutor 的构造方法实现的,那么下面来具体看看 ThreadPoolExecutor 构造方法的参数。
3.2 ThreadPoolExecutor 参数详解
先来看看 ThreadPoolExecutor 的构造方法:
通过查看 ThreadPoolExecutor 源码可以看到,该类有四个构造方法,头三个构造方法,其实都是调用的第四个构造方法,所以我们就解释一下第四个构造方法的参数含义。
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
corePoolSize:核心线程池的大小,在线程池被创建之后,其实里面是没有线程的。(当然,调用 prestartAllCoreThreads() 或者 prestartCoreThread() 方法会预创建线程,而不用等着任务的到来)。当有任务进来的时候,才会创建线程。当线程池中的线程数量达到corePoolSize之后,就把任务放到 缓存队列当中。(就是 workQueue )。
maximumPoolSize:最大线程数量是多少。它标志着这个线程池的最大线程数量。如果没有最大数量,当创建的线程数量达到了 某个极限值,到最后内存肯定就爆掉了。
keepAliveTime:当线程没有任务时,最多保持的时间,超过这个时间就被终止了。默认情况下,只有 线程池中线程数量 大于 corePoolSize 时,keepAliveTime 值才会起作用。也就说说,只有在线程池线程数量超出 corePoolSize 了。我们才会把超时的空闲线程给停止掉。否则就保持线程池中有 corePoolSize 个线程就可以了。默认值是60秒。
Unit:参数keepAliveTime的时间单位,就是 TimeUnit类当中的几个属性。
-
workQueue:用来存储待执行任务的队列,不同的线程池它的队列实现方式不同(因为这关系到排队策略的问题)比如有以下几种:
ArrayBlockingQueue:基于数组的队列,创建时需要指定大小。
LinkedBlockingQueue:基于链表的队列,如果没有指定大小,则默认值是 Integer.MAX_VALUE。(newFixedThreadPool和newSingleThreadExecutor使用的就是这种队列),吞吐量通常要高于ArrayBlockingQuene。
SynchronousQueue:一个不存储元素的阻塞队列,每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态,吞吐量通常要高于LinkedBlockingQuene(newCachedThreadPool使用的就是这种队列)。
threadFactory:线程工厂,用来创建线程。通过自定义的线程工厂可以给每个新建的线程设置一个具有识别度的线程名。
-
Handler:拒绝执行任务时的策略,一般来讲有以下四种策略:
ThreadPoolExecutor.AbortPolicy 丢弃任务,并抛出 RejectedExecutionException 异常。
ThreadPoolExecutor.CallerRunsPolicy:该任务被线程池拒绝,由调用 execute 方法的线程执行该任务。
ThreadPoolExecutor.DiscardOldestPolicy : 抛弃队列最前面的任务,然后重新尝试执行任务。
ThreadPoolExecutor.DiscardPolicy,丢弃任务,不过也不抛出异常。
3.3 线程池的处理流程
下图是提交任务给线程池之后, 线程池的处理流程图
如果当前线程池线程数目小于 corePoolSize(核心池还没满呢),那么就创建一个新线程去处理任务。
如果核心池已经满了,来了一个新的任务后,会尝试将其添加到任务队列中,如果成功,则等待空闲线程将其从队列中取出并且执行,如果队列已经满了,则继续下一步。
此时,如果线程池线程数量 小于 maximumPoolSize,则创建一个新线程执行任务,否则,那就说明线程池到了最大饱和能力了,没办法再处理了,此时就按照拒绝策略来处理。(就是构造函数当中的 Handler 对象)。
如果线程池的线程数量大于 corePoolSize,则当某个线程的空闲时间超过了 keepAliveTime,那么这个线程就要被销毁了,直到线程池中线程数量不大于 corePoolSize 为止。
4. 线程池的实现原理
4.1 线程池的处理流程
一个线程从被提交(submit)到执行共经历以下流程:
判断核心线程池中的线程是否都在执行任务,如果不是则新建一个工作线程执行任务,如果都在执行任务则进入到第二个流程
判断工作队列是否已满,如果工作队列没有满,则新提交的任务存储在工作队列中,如果满了则进入到第三个流程
判断线程池内部的线程是否都处于工作状态,如果不是则新建一个工作线程执行任务,如果都在执行任务,则交给任务饱和度策略来处理这个任务。
线程池在执行excute方法时,主要有以下四种情况:
如果当前线程数少于 coolPoolSize ,那么新建一个工作线程执行任务。(需要获得全局锁)
如果当前线程数等于或大于 coolPoolSize , 那么将创建的线程放入到 BlockQueue 中。
如果如果无法将任务放入到 BlockQueue(已满),那么新建一个线程执行任务。(需要获得全局锁)
如果新建的线程数大于了当前线程池的 maxiumPoolSize , 那么任务会被拒绝,交给 RejectedExecutionHandler.rejectedExecution() 方法处理。
4.2 ThreadPoolExecutor 源码分析
4.2.1 定义的几个变量
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
// Packing and unpacking ctl
private static int runStateOf(int c) { return c & ~CAPACITY; }
private static int workerCountOf(int c) { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }
在分析源码前有必要理解一个变量 ctl 。这是 Java 大神们为了把工作线程数量和线程池状态放在一个int类型变量里存储而设置的一个原子类型的变量。 在 ctl 中,低位的 29 位表示工作线程的数量,高位用来表示 RUNNING、SHUTDOWN、STOP 等状态。 因此一个线程池的数量也就变成了 (2^29)-1 ,大约 500 million ,而不是 (2^31)-1 , 2billion 。上面定义的三个方法只是为了计算得到线程池的状态和工作线程的数量。
4.2.2 Execute 方法
public void execute(Runnable command) {
//如果提交了空的任务 抛出异常
if (command == null)
throw new NullPointerException();
int c = ctl.get();//获取当前线程池的状态
//检查当前工作线程数量是否小于核心线程数量
if (workerCountOf(c) < corePoolSize) {
//通过addWorker方法提交任务
if (addWorker(command, true))
return;
c = ctl.get();//如果提交失败 需要二次检查状态
}
//向工作线程提交任务
if (isRunning(c) && workQueue.offer(command)) {
// 再次检查状态
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
//扩容失败 则拒绝任务
else if (!addWorker(command, false))
reject(command);
}
从源码中可以看到提交任务的这一过程基本与第二个图的四个流程是一致的,需要检查的是当前工作线程的数量与核心线程数量的关系,来决定提交任务的方式或者是拒绝任务提交。而具体任务的提交工作是在addWorker方法中。在这里面看到了recheck这样的变量,这是在执行了一些动作失败后再次检查线程池的状态,因为在这期间可能有线程池关闭获得线程池饱和等状态的改变。
4.2.3 addWorker 方法
这个方法是任务提交的一个核心方法。在里面完成了状态检查、新建任务、执行任务等一系列动作。它有两个参数,第一个参数是提交的任务,第二个参数是一个标识符,标识在检查工作线程数量的时候是应该与 corePoolSize 对比还是应该 maximumPoolSize 对比。
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
//死循环更新状态
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);//获取运行状态
//检查线程池是否处于关闭状态
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
//获取当前工作线程数量
int wc = workerCountOf(c);
//如果已经超过corePoolSize获取maximumPoolSize 返回false
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
//CAS增加一个工作线程
if (compareAndIncrementWorkerCount(c))
break retry;
//再次获取状态
c = ctl.get(); // Re-read ctl
//如果状态更新失败 则循环更新
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);//初始化一个工作线程
final Thread t = w.thread;
if (t != null) {
//获得锁
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // SHUTDOWN以后的状态和SHUTDOWN状态下firstTask为null,不可新增线程
throw new IllegalThreadStateException();
workers.add(w); //添加工作这到hashset中保存
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s; //记录最大线程数
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
//工作线程启动 执行第一个任务 就是新提交的任务
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
这个方法可以分为两个阶段来看,第一个阶段是判断是否有必要新增一个工作线程,如果有则利用CAS更新工作线程的数量;第二部分是将提交的任务封装成一个工作线程Worker然后加入到线程池的容器中,开始执行新提交的任务。这个Worker在执行完任务后,还会循环地获取工作队列里的任务来执行。下面来看一下Worker的构造方法就能更好地理解上面的代码了
4.2.4 runWorker 方法
在addWorker方法快要结束的地方,调用了t.start()方法,我们知道它实际执行的就是Worker对象的run()方法,而worker的run()方法是这样定义的:
/** Delegates main run loop to outer runWorker */
public void run() {
runWorker(this);
}
它实际上是将自己委托给线程池的runWorker方法
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
//不断地从blockingQueue获取任务
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
//执行beforeExecute方法
beforeExecute(wt, task);
Throwable thrown = null;
try {
//调用Runable的run方法
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
// 执行aferExecute方法
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
这个方法呢也比较好理解,它在不断执行我们提交的任务的run方法。而这个任务可能是我们新提交的,也有可能是从等待队列中获取的。这样就实现了线程池的完整逻辑。
4.2.5 shutdown,shutdownNow 方法
shutdown 方法
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess(); //这个方法校验线程访问许可,不是很理解,后面有时间再单独解析;
advanceRunState(SHUTDOWN); //转换线程池状态为SHUTDOWN
interruptIdleWorkers(); //中断所有空闲的线程
onShutdown(); // 空实现方法,是做shutdown清理操作的
} finally {
mainLock.unlock();
}
tryTerminate(); //尝试结束线程池(设置状态为TERMINATED)
}
shutdownNow 方法
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();//同上
advanceRunState(STOP);//转换线程池状态到STOP
interruptWorkers();//中断所有线程
tasks = drainQueue();//获取到任务队列所有任务,并清空队列
} finally {
mainLock.unlock();
}
tryTerminate();//同上
return tasks;
}
由上可知,两个关闭方法的区别:
shutdown 设置状态为 SHUTDOWN,而 shutdownNow 设置状态为 STOP
shutdown 值中断空闲的线程,已提交的任务可以继续被执行,而 shutdownNow 中断所有线程
shutdown 无返回值,shutdownNow 返回任务队列中还未执行的任务
虽然有 shutdown 和 shutdownNow 方法,但是还是不能满足一个需求:就是需要知道等待所有任务已完成线程池结束,这里 ThreadPoolExecutor 提供了 awaitTermination 方法满足这个需求:
public boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException {
long nanos = unit.toNanos(timeout);
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (;;) {
if (runStateAtLeast(ctl.get(), TERMINATED))
return true;
if (nanos <= 0)
return false;
nanos = termination.awaitNanos(nanos);
}
} finally {
mainLock.unlock();
}
}
这个方法两个入参,设置等待超时时间。
如果状态已经是 TERMINATED 返回 true ,表示已关闭。
否则一直等到 termination 的 signalAll 至超时或者当前线程中断。超时后都线程池都没有关闭,返回 false。
5. 代码练习
5.1 当线程池满后处理剩余任务的 handler
ThreadPoolExecutor.CallerRunsPolicy:该任务被线程池拒绝,由调用 execute 方法的线程执行该任务。
public class ThreadPoolDemo {
public static void main(String[] args) {
ThreadPoolExecutor exec = new ThreadPoolExecutor(3, 5, 2,
TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(5), new ThreadPoolExecutor.CallerRunsPolicy());
for (int i = 0; i < 15; i++) {
exec.execute(new Task());
}
exec.shutdown();
}
}
class Task implements Runnable {
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + " >>> start.");
// mock run for a while
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
运行结果:可以看到有两个线程交给了 main 方法处理。
pool-1-thread-1 >>> start.
pool-1-thread-2 >>> start.
pool-1-thread-3 >>> start.
main >>> start.
pool-1-thread-5 >>> start.
pool-1-thread-4 >>> start.
main >>> start.
pool-1-thread-4 >>> start.
pool-1-thread-3 >>> start.
pool-1-thread-2 >>> start.
pool-1-thread-1 >>> start.
pool-1-thread-5 >>> start.
pool-1-thread-4 >>> start.
pool-1-thread-5 >>> start.
pool-1-thread-1 >>> start.
Process finished with exit code 0
5.2 自定义线程池和线程工厂
5.2.1 自定义线程池,实现计时和统计功能
public class MyThreadPoolExecutor extends ThreadPoolExecutor {
private final ThreadLocal<Long> startTime = new ThreadLocal<Long>();
private final Logger log = Logger.getLogger("MyThreadPoolExecutor");
private final AtomicLong numTasks = new AtomicLong(1);
private final AtomicLong totalTime = new AtomicLong();
public MyThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
}
/**
* 任务执行前
*/
protected void beforeExecute(Thread t,Runnable r){
super.beforeExecute(t, r);
log.fine(String.format("Thread %s: start %s",t,r));
startTime.set((long) (System.nanoTime()/Math.pow(10, 9)));
}
/**
* 任务执行后
* @param r 任务
* @param t 执行任务的线程
*/
protected void afterExecutor(Runnable r,Throwable t){
try {
Long endTime = (long) (System.nanoTime() / Math.pow(10,9));
Long taskTime = endTime - startTime.get();
numTasks.incrementAndGet();
totalTime.addAndGet(taskTime);
log.fine(String.format("Thread %s: end%s,time=%ds", taskTime));
} finally {
super.afterExecute(r, t);
}
}
protected void terminated () {
try {
log.info(String.format("Terminated: avg time=%ds", totalTime.get() / numTasks.get()));
} finally {
super.terminated();
}
}
}
5.2.2 自定义线程工厂,自定义线程名称
class MyThreadFactory implements ThreadFactory {
private String name;
private int counter;
public MyThreadFactory(String name) {
this.name = name;
this.counter = 1;
}
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
t.setName("[Task ID + " + (counter++) + ": " + name + "]");
return t;
}
}
public class ThreadPoolDemo {
public static void main(String[] args) {
ThreadPoolExecutor exec = new ThreadPoolExecutor(3, 5, 2,
TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(5), new MyThreadFactory("Test Thread"), new ThreadPoolExecutor.CallerRunsPolicy());
for (int i = 0; i < 15; i++) {
exec.execute(new Task());
}
exec.shutdown();
}
}