大概用Executors直接生成ThreadPoolExecutor的习惯会被认为理所当然,但是在阿里的java开发规范中是禁止的,开发者必须使用ThreadPoolExecutor。其实没有什么好不好,Executors没法满足所有场景罢了。
先贴一张网上找来的图介绍下线程池运行线程的流程
ThreadPoolExecutor
司空见惯,ThreadPoolExecutor也有多个构造方法,并且都指向参数最多的一个:
下面贴出这个构造方法的代码:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
逐一解释
corePoolSize
核心线程数,当线程池初始化以后,不会立即创建所有的核心线程(除非主动调用了prestartAllCoreThreads()),而是有一个任务进来,如果线程数还没有达到corePoolSize,那么就选择新建一个线程,如果达到核心线程数,则放入阻塞队列中。
maximumPoolSize
因为corePoolSize的设置,溢出任务会被放入阻塞队列中,当阻塞队列满了,同时又有新任务进来,那么线程池会进行扩容,但是总的线程数量不会超过maximumPoolSize。
keepAliveTime与unit
keepAliveTime代表当线程空闲以后(前提是之前发生过阻塞队列满,并且增加了线程数量的情况),还能继续存活的时间,unit是时间单位,当时间超过keepAliveTime并且线程处于空闲状态,则被回收,回收的数量为:当前线程数 - corePoolSize。
workQueue
当前已有workQueue个线程在执行任务无法空闲,此时如果新加入任务,则放入workQueue(阻塞队列中),下面介绍几种阻塞队列。
队列 | 说明 |
---|---|
ArrayBlockingQueue | 基于数组结构的有界阻塞队列,按FIFO排序任务 |
LinkedBlockingQuene | 基于链表结构的阻塞队列,按FIFO排序任务,吞吐量通常要高于ArrayBlockingQuene |
SynchronousQuene | 一个不存储元素的阻塞队列,每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态,吞吐量通常要高于LinkedBlockingQuene |
priorityBlockingQuene | priorityBlockingQuene:具有优先级的无界阻塞队列 |
threadFactory
线程工厂,主要对线程进行一些装饰,如我需要对我的线程有特殊的命名:
static class TestThreadFactory implements ThreadFactory {
private AtomicInteger threadNum = new AtomicInteger(0);
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
t.setName("test-thread-pool-" + threadNum.incrementAndGet());
return t;
}
}
handler
当线程池无法承受任务数量(当前线程数量 = maximumPoolSize),再有任务进来,将会采取拒绝策略,java提供了四种对超额任务的拒绝处理
队列 | 说明 |
---|---|
ThreadPoolExecutor.AbortPolicy | 丢弃任务并抛出RejectedExecutionException异常 |
ThreadPoolExecutor.DiscardPolicy | 也是丢弃任务,但是不抛出异常 |
ThreadPoolExecutor.DiscardOldestPolicy | 丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程) |
ThreadPoolExecutor.CallerRunsPolicy | 由调用线程处理该任务 |
一个简单的例子
public class test {
public static void main(String[] args){
TestThreadFactory testThreadFactory = new TestThreadFactory();
// 默认策略,直接抛出异常
RejectedExecutionHandler abortPolicyHandler = new ThreadPoolExecutor.AbortPolicy();
// 丢弃任务,不抛出异常
RejectedExecutionHandler discardPolicyHandler = new ThreadPoolExecutor.DiscardPolicy();
// 丢弃队列最前面的任务,重新尝试执行任务
RejectedExecutionHandler discardOldestPolicyHandler = new ThreadPoolExecutor.DiscardOldestPolicy();
// 由调用线程处处理该任务-移交给主线程执行(谨慎使用)
RejectedExecutionHandler CallerRunsPolicyHandler = new ThreadPoolExecutor.CallerRunsPolicy();
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(3,
10,
1,
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(
1),
testThreadFactory,
CallerRunsPolicyHandler);
System.out.println(threadPoolExecutor.getPoolSize());
for (int i = 0; i <= 15; i++){
threadPoolExecutor.execute(() -> {
System.out.println("hello Jules, I am [" + Thread.currentThread().getName() + "]");
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
try {
Thread.sleep(4000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(threadPoolExecutor.getPoolSize());
}
/**
* 利用TestThreadFactory修改线程名称
*/
static class TestThreadFactory implements ThreadFactory {
private AtomicInteger threadNum = new AtomicInteger(0);
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
t.setName("test-thread-pool-" + threadNum.incrementAndGet());
return t;
}
}
}
结果:
采用了CallerRunsPolicyHandler策略,所以溢出线程由调用方(main)处理。
欢迎关注公众号交流,定期分享源码心得