线程池简单实现
package Thread;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashSet;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
public class ThreadPoolDemo {
//1.阻塞队列
private BlockQueue<Runnable> taskQueue;
//2.核心线程数
private int coreSize;
//3.获取任务的超时时间
private long timeout;
//4.时间转换
private TimeUnit timeUnit;
//5、线程集合
HashSet<Worker> workers = new HashSet<>();
//6.拒绝策略
private RejectPolicyDemo<Runnable> rejectPolicy;
public ThreadPoolDemo(int coreSize, long timeout, TimeUnit timeUnit , int queueCap , RejectPolicyDemo<Runnable> rejectPolicy) {
this.coreSize = coreSize;
this.timeout = timeout;
this.timeUnit = timeUnit;
this.taskQueue = new BlockQueue<Runnable>(queueCap);
this.rejectPolicy = rejectPolicy;
}
class Worker extends Thread{
private Runnable task;
public Worker(Runnable task) {
this.task = task;
}
@Override
public void run() {
//1.当task 不为空,执行任务
//2.当task 执行完毕,再接着从任务队列获取任务继续执行
// while(task != null || (task = taskQueue.take()) != null){ //该策略会死等 ,就算线程池为空,也会一直等待
while(task != null || (task = taskQueue.poll(timeout,timeUnit)) != null){
try{
task.run();
}catch (Exception e){
e.printStackTrace();
}finally {
task = null; //没有任务了 将task置为null
}
}
synchronized (workers){
workers.remove(this);
}
}
}
//执行任务
public void excute(Runnable task){
//如果任务数没有超过 coresize 时 ,直接交给worker对象执行
//如果超过了 coresize时 ,将任务加入到阻塞队列中
synchronized (workers){
if (workers.size() < coreSize) {
Worker worker = new Worker(task);
workers.add(worker);
worker.start();
}
else {
//可选择阻塞队列满足后 选择拒绝策略
// taskQueue.put(task);
//1.死等
//2 带超时等待
//3 让调用者放弃任务
//4 让调用者抛出异常
//5 让调用者自己执行任务
taskQueue.tryPut(rejectPolicy , task);
}
}
}
public static void main(String[] args) {
ThreadPoolDemo threadPool = new ThreadPoolDemo(2 , 1000 , TimeUnit.MILLISECONDS , 100 ,
(queue, task)->{
//1.死等
// queue.put(task);
//2 带超时等待
// queue.offer(task , 500 ,TimeUnit.MILLISECONDS );
//3 让调用者放弃任务
// System.out.println("放弃任务");
//4 让调用者抛出异常
// throw new RuntimeException("运行失败"+task); 抛出异常后 后续的任务不会再执行
//5 让调用者自己执行任务
task.run();
});
}
}
@FunctionalInterface
interface RejectPolicyDemo<T>{
void reject(BlockQueue<T> queue , T task);
}
class BlockQueue<T> {
public BlockQueue() {
}
public BlockQueue(int capcity) {
this.capcity = capcity;
}
// 1.阻塞队列
private Deque<T> queue = new ArrayDeque<>();
//2.锁
private ReentrantLock lock = new ReentrantLock();
//3. 生产者变量
private Condition fullWaitSet = lock.newCondition();
//4.消费者变量
private Condition emptyWaitSet = lock.newCondition();
//5.容量
private int capcity;
//带超时的阻塞获取
public T poll(long timeout , TimeUnit unit){
lock.lock();
try {
//将timeout 统一转换成 纳秒
long nanos = unit.toNanos(timeout);
while (queue.isEmpty()) {
try {
if(nanos <= 0){
//超时
return null;
}
//防止虚假唤醒,返回的是剩余时间
nanos = emptyWaitSet.awaitNanos(nanos);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
T t = queue.removeFirst();
fullWaitSet.signal();
return t;
}
finally {
lock.unlock();
}
}
//阻塞获取
public T take(){
lock.lock();
try {
while (queue.isEmpty()) {
try {
emptyWaitSet.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
T t = queue.removeFirst();
fullWaitSet.signal();
return t;
}
finally {
lock.unlock();
}
}
//带超时时间的阻塞添加
public boolean offer(T task , long timeout , TimeUnit unit){
lock.lock();
try {
long nanos = unit.toNanos(timeout);
while(capcity == queue.size()){
try {
if(nanos <= 0){
return false;
}
fullWaitSet.awaitNanos(nanos);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
queue.addLast(task);
emptyWaitSet.signal();
return true;
} finally {
lock.unlock();
}
}
//阻塞添加
public void put(T element){
lock.lock();
try {
while(capcity == queue.size()){
try {
fullWaitSet.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
queue.addLast(element);
emptyWaitSet.signal();
} finally {
lock.unlock();
}
}
public int size(){
lock.unlock();
try {
return queue.size();
} finally {
lock.unlock();
}
}
public void tryPut(RejectPolicyDemo<T> rejectPolicy, T task) {
lock.lock();
try {
//判断队列是否已满
if(queue.size() == capcity){
rejectPolicy.reject(this , task);
}else { //有空闲
queue.addLast(task);
emptyWaitSet.signal();
}
}finally {
lock.unlock();
}
}
}
源码
ThreadPoolExecutor 使用 int 的高 3 位来表示线程池状态,低 29 位表示线程数量,ThreadPoolExecutor 类中的线程状态变量如下:
// Integer.SIZE 值为 32
private static final int COUNT_BITS = Integer.SIZE - 3;
// runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
状态名 | 高三位 | 接受新任务 | 处理阻塞队列任务 | 说明 |
---|---|---|---|---|
RUNNING | 111 | Y | Y | 接收新任务,同时处理任务队列中的任务 |
SHUTDOWN | 000 | N | Y | 不接受新任务,但是处理任务队列中的任务 |
STOP | 001 | N | N | 中断正在执行的任务,同时抛弃阻塞队列中的任务 |
TIDYING | 010 | - | - | 任务执行完毕,活动线程为0时,即将进入终结阶段 |
TERMINATED | 011 | - | - | 终结状态 |
从数字上面比较,TERMINATED > TIDYING > STOP > SHUTDOWN > RUNNING
线程池状态和线程池中线程的数量由一个原子整型ctl来共同表示
- 使用一个数来表示两个值的主要原因时:可以通过一次cas同时更改两个属性的值
构造方法
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
构造参数解释:
- corePoolSize:核心线程数
- maximumPoolSize:最大线程数
- maximumPoolSize - corePoolSize = 救急线程数
- keepAliveTime:救急线程空闲时的最大生存时间
- unit:时间单位
- workQueue:阻塞队列(存放任务)
- 有界阻塞队列 ArrayBlockingQueue
- 无界阻塞队列 LinkedBlockingQueue
- 最多只有一个同步元素的队列 SynchronousQueue
- 优先队列 PriorityBlockingQueue
- threadFactory:线程工厂(给线程取名字)
- handler:拒绝策略
救急线程在核心线程和阻塞队列都放不下了才会使用
工作方式:
- 线程池中刚开始没有线程,当一个任务提交给线程池后,线程池会创建一个新线程来执行任务。
- 当线程数达到 corePoolSize 并没有线程空闲,这时再加入任务,新加的任务会被加入 workQueue 队列排 队,直到有空闲的线程。
- 如果队列选择了有界队列,那么任务超过了队列大小时,会创建 maximumPoolSize - corePoolSize 数目的线 程来救急。
- 如果线程到达 maximumPoolSize 仍然有新任务这时会执行拒绝策略。拒绝策略 jdk 提供了 下面的前 4 种实现,其它著名框架也提供了实现
- ThreadPoolExecutor.AbortPolicy 让调用者抛出RejectedExecutionException 异常,这是默认策略
- ThreadPoolExecutor.CallerRunsPolicy 让调用者运行任务
- ThreadPoolExecutor.DiscardPolicy 放弃本次任务
- ThreadPoolExecutor.DiscardOldestPolicy 放弃队列中最早的任务,本任务取而代之
- Dubbo 的实现,在抛出 RejectedExecutionException 异常之前会记录日志,并 dump 线程栈信息,方 便定位问题
- Netty 的实现,是创建一个新线程来执行任务
- ActiveMQ 的实现,带超时等待(60s)尝试放入队列,类似我们之前自定义的拒绝策略
- PinPoint 的实现,它使用了一个拒绝策略链,会逐一尝试策略链中每种拒绝策略
-
当高峰过去后,超过 corePoolSize 的救急线程如果一段时间没有任务做,需要结束节省资源,这个时间由 keepAliveTime 和 unit 来控制。
总览
ThreadPoolExecutor
ThreadPoolExecutor 是 JDK 中的线程池实现,这个类实现了一个线程池需要的各个方法,它实现了任务提交、线程管理、监控等等方法。
我们经常会使用Executors
这个工具类来快速构造一个线程池,对于初学者而言,这种工具类是很有用的,开发者不需要关注太多的细节,只要知道自己需要一个线程池,仅仅提供必需的参数就可以了,其他参数都采用作者提供的默认值。
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
这两个方法都会进行使用ThreadPoolExecutor
来创建一个ThreadPoolExecutor实例(具体可见前面构造方法)
Doug Lea 采用一个 32 位的整数来存放线程池的状态和当前池中的线程数,其中高 3 位用于存放线程池状态,低 29 位表示线程数(即使只有 29 位,也已经不小了,大概 5 亿多,现在还没有哪个机器能起这么多线程的吧)。
核心方法 execute()
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
// 如果当前线程数少于核心线程数,那么直接添加一个 worker 来执行任务,
// 创建一个新的线程,并把当前任务 command 作为这个线程的第一个任务(firstTask)
if (workerCountOf(c) < corePoolSize) {
// 添加任务成功,那么就结束了。提交任务嘛,线程池已经接受了这个任务,这个方法也就可以返回了
// 至于执行的结果,到时候会包装到 FutureTask 中。
// 返回 false 代表线程池不允许提交任务
if (addWorker(command, true))
return;
// 前面说的那个表示 “线程池状态” 和 “线程数” 的整数
c = ctl.get();
}
// 到这里说明,要么当前线程数大于等于核心线程数,要么刚刚 addWorker 失败了
// 如果线程池处于 RUNNING 状态,把这个任务添加到任务队列 workQueue 中
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
// 如果线程池已不处于 RUNNING 状态,那么移除已经入队的这个任务,并且执行拒绝策略
if (! isRunning(recheck) && remove(command))
reject(command);
// 如果线程池还是 RUNNING 的,并且线程数为 0,那么开启新的线程
// 到这里,我们知道了,这块代码的真正意图是:担心任务提交到队列中了,但是线程都关闭了
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 如果 workQueue 队列满了,那么进入到这个分支
// 以 maximumPoolSize 为界创建新的 worker,
// 如果失败,说明当前线程数已经达到 maximumPoolSize,执行拒绝策略
else if (!addWorker(command, false))
reject(command);
}
四个拒绝策略的具体实现
// 只要线程池没有被关闭,那么由提交任务的线程自己来执行这个任务。
public static class CallerRunsPolicy implements RejectedExecutionHandler {
public CallerRunsPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
r.run();
}
}
}
// 不管怎样,直接抛出 RejectedExecutionException 异常
// 这个是默认的策略,如果我们构造线程池的时候不传相应的 handler 的话,那就会指定使用这个
public static class AbortPolicy implements RejectedExecutionHandler {
public AbortPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException("Task " + r.toString() +
" rejected from " +
e.toString());
}
}
// 不做任何处理,直接忽略掉这个任务
public static class DiscardPolicy implements RejectedExecutionHandler {
public DiscardPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
}
}
// 这个相对霸道一点,如果线程池没有被关闭的话,
// 把队列队头的任务(也就是等待了最长时间的)直接扔掉,然后提交这个任务到等待队列中
public static class DiscardOldestPolicy implements RejectedExecutionHandler {
public DiscardOldestPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
e.getQueue().poll();
e.execute(r);
}
}
}
Executor生成不一样的连接池
- 生成一个固定大小的线程池:
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
最大线程数设置为与核心线程数相等,此时 keepAliveTime 设置为 0(因为这里它是没用的,即使不为 0,线程池默认也不会回收 corePoolSize 内的线程),任务队列采用 LinkedBlockingQueue,无界队列。
过程分析:刚开始,每提交一个任务都创建一个 worker,当 worker 的数量达到 nThreads 后,不再创建新的线程,而是把任务提交到 LinkedBlockingQueue 中,而且之后线程数始终为 nThreads。
- 生成只有一个线程的固定线程池,这个更简单,和上面的一样,只要设置线程数为 1 就可以了:
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
- 生成一个需要的时候就创建新的线程,同时可以复用之前创建的线程(如果这个线程当前没有任务)的线程池:
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
核心线程数为 0,最大线程数为 Integer.MAX_VALUE,keepAliveTime 为 60 秒,任务队列采用 SynchronousQueue。于 corePoolSize 是 0,那么提交任务的时候,直接将任务提交到队列中。
总结
- java线程池七大属性
corePoolSize,
maximumPoolSize,
workQueue,
keepAliveTime,
unit,
threadFactory,
rejectedExecutionHandler
- 线程池中的线程创建时机
- 如果当前线程数少于 corePoolSize,那么提交任务的时候创建一个新的线程,并由这个线程执行这个任务;
- 如果当前线程数已经达到 corePoolSize,那么将提交的任务添加到队列中,等待线程池中的线程去队列中取任务;
- 如果队列已满,那么创建新的线程来执行任务,需要保证池中的线程数不会超过 maximumPoolSize,如果此时线程数超过了 maximumPoolSize,那么执行拒绝策略。
- 任务执行过程中发生异处理
如果某个任务执行出现异常,那么执行任务的线程会被关闭,而不是继续接收其他任务。然后会启动一个新的线程来代替它。
- 执行拒绝策略的时机
- workers 的数量达到了 corePoolSize(任务此时需要进入任务队列),任务入队成功,与此同时线程池被关闭了,而且关闭线程池并没有将这个任务出队,那么执行拒绝策略。这里说的是非常边界的问题,入队和关闭线程池并发执行,读者仔细看看 execute 方法是怎么进到第一个 reject(command) 里面的。
- workers 的数量大于等于 corePoolSize,将任务加入到任务队列,可是队列满了,任务入队失败,那么准备开启新的线程,可是线程数已经达到 maximumPoolSize,那么执行拒绝策略。