知其然而知所以然
Java线程池早已称为面试官必考的知识,作为被面试的你想必不想被这个简单的问题难倒吧,来一起学习探索吧~
线程池是什么?
通俗可以理解为一个维护线程的池子。
为什么需要线程池?为什么不直接创建使用线程?
进程需要执行任务时,会将任务交给线程来处理,所以我们需要创建一个线程,处理完任务后,线程需要销毁,完成一个线程的生命周期。 如果需要执行多个任务,那么就需要创建多个线程去执行,由于创建线程和销毁的过程比较耗时,所以我们可以用一个池子来维护这些线程,需要的时候去池子里拿,用完了还给池子,线程不会销毁,省去了线程重复创建和销毁的耗时操作。
基本使用
Executors
JDK为我们提供了Executors工具来创建和使用线程池,我们可以通过以下方法来创建不同的线程池:
- newFixedThreadPool
可以设置固定大小线程的线程池,也是就核心线程数是固定的
/**
* 创建一个固定大小的线程池
* @param nThreads 线程个数
*/
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
- newSingleThreadExecutor
/**
* 创建只有一个线程的线程池
*/
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
- newCachedThreadPool
线程缓存器,因为线程池没有初始核心线程,同时使用的是SynchronousQueue阻塞队列,所以在有任务处理的时候才会创建线程,线程空闲后还能存活60秒
/**
* 创建一个类似缓存的线程池
*/
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
细心的我们发现以上三个方法创建线程池最终都会通过ThreadPoolExecutor的构造方法来创建
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
threadFactory, defaultHandler);
}
所以搞清楚这个构造函数每个参数的作用非常重要。
实现原理
1. 原理流程
线程池的组成
上图可以看到线程池的主要组成,在使用过程中我们也是通过调整线程数和队列以及其他的参数来定制符合我们场景的线程池。
核心线程数:线程池创建成功之后会创建核心线程数,随着线程池的关闭时销毁,伴随着线程池的整个生命周期。
非核心线程数:核心线程数和队列资源不足时会创建非核心线程数,执行完任务一定时间之后会被销毁。
最大线程数:最大线程数 = 核心线程数 + 非核心线程数。
阻塞队列:当核心线程不足以执行任务时,会将任务先丢到阻塞队列里等待,起到缓冲的作用。
执行流程
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
*
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false.
*
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
*
* 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
*/
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
以下流程图解读了以上代码执行的流程,这也是线程池执行流程的核心
生命周期
状态 | 描述 |
---|---|
RUNNING | 能接受新提交的任务,并且也能处理阻塞队列中的任务 |
SHUTDOWN | 关闭状态,不在接受新的任务,但却可以继续处理阻塞队列中的任务 |
STOP | 关闭状态,不在接受新的任务,也不在继续处理队列中的任务,中断正在处理任务的线程 |
TIDYING | 所有的任务都已经终止,workCount(有效线程数)为0 |
TERMINATED | 在terminated( ) 方法执行完后进入该状态 |
2. 参数介绍
以下为创建线程池最终的构造方法,我们想创建一个符合业务的线程池,就需要对下列的参数了如指掌。
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
corePoolSize 核心线程数
maximumPoolSize 线程池最大线程数
keepAliveTime 非核心线程数非活跃时存活的时间
unit keepAliveTime值的单位
workQueue 阻塞队列
名称 | 说明 |
---|---|
ArrayBlockingQueue | 有界阻塞消息队列,创建时指定队列大小,创建后不能修改 |
LinkedBlockingQueue | 无界阻塞队列,其实也是有界队列,最大值为Integer.MAX |
PriorityBlockingQueue | 优先级阻塞队列 |
SynchronousQueue | 只有一个容量的阻塞队列 |
threadFactory 线程工厂
handler 拒绝策略
- AbortPolicy(默认策略) 不在接受新的任务,当新的任务来时总是抛异常。
/**
* A handler for rejected tasks that throws a
* {@code RejectedExecutionException}.
*/
public static class AbortPolicy implements RejectedExecutionHandler {
/**
* Creates an {@code AbortPolicy}.
*/
public AbortPolicy() { }
/**
* Always throws RejectedExecutionException.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
* @throws RejectedExecutionException always
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException("Task " + r.toString() +
" rejected from " +
e.toString());
}
}
- CallerRunsPolicy 由调用线程去执行任务
/**
* A handler for rejected tasks that runs the rejected task
* directly in the calling thread of the {@code execute} method,
* unless the executor has been shut down, in which case the task
* is discarded.
*/
public static class CallerRunsPolicy implements RejectedExecutionHandler {
/**
* Creates a {@code CallerRunsPolicy}.
*/
public CallerRunsPolicy() { }
/**
* Executes task r in the caller's thread, unless the executor
* has been shut down, in which case the task is discarded.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
r.run();
}
}
}
- DiscardPolicy 默默的将任务丢弃,啥也不做
/**
* A handler for rejected tasks that silently discards the
* rejected task.
*/
public static class DiscardPolicy implements RejectedExecutionHandler {
/**
* Creates a {@code DiscardPolicy}.
*/
public DiscardPolicy() { }
/**
* Does nothing, which has the effect of discarding task r.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
}
}
- DiscardOldestPolicy 丢弃队列中长时间未处理的任务(靠近表头的任务),并尝试将新的任务放入队列中
/**
* A handler for rejected tasks that discards the oldest unhandled
* request and then retries {@code execute}, unless the executor
* is shut down, in which case the task is discarded.
*/
public static class DiscardOldestPolicy implements RejectedExecutionHandler {
/**
* Creates a {@code DiscardOldestPolicy} for the given executor.
*/
public DiscardOldestPolicy() { }
/**
* Obtains and ignores the next task that the executor
* would otherwise execute, if one is immediately available,
* and then retries execution of task r, unless the executor
* is shut down, in which case task r is instead discarded.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
e.getQueue().poll();
e.execute(r);
}
}
}
阅读到这,你对线程池一定有深刻的印象,接下来我们看一下实际开发正确使用的姿势。
实际应用
对于新手来说,使用线程池的方式可能如下:
public static void main(String[] args) {
ExecutorService executorService = Executors.newFixedThreadPool(10);
for (int i = 0; i < 100; i++) {
int finalI = i;
executorService.submit(new Runnable() {
@Override
public void run() {
System.out.println(finalI);
}
});
}
}
我们通过Executors工具创建一个固定大小为10的线程池,使用的是无界的LinkedBlockingQueue阻塞队列,以上代码看起来没有任何问题,很快执行结束。
我们稍微改动一下:
新增sleep模拟耗时的操作。
public static void main(String[] args) {
ExecutorService executorService = Executors.newFixedThreadPool(10);
final int[] i = {0};
while (true){
executorService.submit(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(i[0]++);
}
});
}
}
以上代码一直执行,最终会出现产生内存溢出的问题。
因为newFixedThreadPool使用的是LinkedBlockingQueue(无界阻塞队列),当线程池源源不断的接受新的任务,自己出来不过来的时候,会将任务暂存到队列里,由于处理耗时,所以阻塞队列里的数据会越来越多,最终把内存打爆。
根据阿里的代码规范,不允许通过Executors来创建线程池,必须通过new ThreadPoolExcutor构造方法来创建线程池,如果你的Idea安装了阿里的代码检查插件,会看到如下提示:
所以正确的姿势是:
public static void main(String[] args) {
ThreadPoolExecutor executor = new ThreadPoolExecutor(10, 10, 60,
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(100),
new ThreadPoolExecutor. DiscardPolicy());
while (true){
executor.submit(new Runnable() {
@Override
public void run() {
System.out.println("running");
}
});
}
}
我们创建了核心线程数为10,且最大线程数为10,阻塞队列容量为100,拒绝策略为丢弃新的任务 的线程池,实际使用时还得需要根据业务场景和部署的机器来设置线程数。
不推荐使用Executors来创建线程池,通过ThreadPoolExecutor来创建。
题外话
当一个服务里有一些比较耗时的操作,比如批量导入、导出等耗时的IO操作,我们需要将这些操作的线程池和其他业务的线程池区分开,否则我们在进行导入导出操作时,会影响到其他的业务。所以我们会定制多个不同的线程池来处理不同类型的任务。
以上为个人的理解,如果差错,烦请不吝啬指出。