日期:2020/7/5
最近看并发源码,把自个理解整理记录,说实话......一天不看就得重头看,脑子可能比较笨,要反复琢磨理解,进度比较慢,还没整理研究完.......最终目的能自己手写一个线程池
线程池
一.线程池使用场景
1.单个任务处理时间比较短
2.需要处理的任务量很大
二.线程池优势
1.重用存在的线程,减少线程创建,消亡的开销,提高性能
2.提高响应速度,当任务到达时,任务可以不要等到线程创建就立即执行
3.提高线程的可管理性,线程是稀缺资源,如果无线创建,会消耗系统资源,降低系统稳定性,线程池可以进行统一分配,调优和调度
三.线程池分析
1.线程池的创建
//利用Executors工具类去创建线程池
ExecutorService executor = Executors.newFixedThreadPool(5);
public static ExecutorService newFixedThreadPool(int nThreads) {
//参数:线程池数量,最大线程池数量,线程最大空闲存活时间,空闲时间计量单位,无空闲线程则存放到阻塞队列中
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue());
}
2.线程池的工作流程
线程池的大小 maximumPoolSize = 核心线程5+非核心线程5 = 10
当有任务进入的时候,会优先将任务分配到核心线程,其他剩余的任务放入到阻塞队列中去等待,如果此队列是一个有界队列,则直到放满为止,如果还有任务则创建非核心线程,如还有剩余任务没有分分配,则执行RejectedExecutionHandler拒绝策略
3.线程池的重点属性
public class ThreadPoolExecutor extends AbstractExecutorService {
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); private static final int COUNT_BITS = Integer.SIZE 3; private static final int CAPACITY = (1 << COUNT_BITS) 1;
}
属性解析: ctl 是对线程池的运行状态和线程池中有效线程的数量进行控制的一个字段, 它包含两 部分的信息: 线程池的运行状态 (runState) 和线程池内有效线程的数量 (workerCount),这 里可以看到,使用了Integer类型来保存,高3位保存runState,低29位保存 workerCount。COUNT_BITS 就是29,CAPACITY就是1左移29位减1(29个1),这个常 量表示workerCount的上限值,大约是5亿。
ctl相关方法 :
private static int runStateOf(int c) { return c & ~CAPACITY; } private static int workerCountOf(int c) { return c & CAPACITY; } private static int ctlOf(int rs, int wc) { return rs | wc; }
runStateOf:获取运行状态; workerCountOf:获取活动线程数; ctlOf:获取运行状态和活动线程数的
4.线程池存在的5种状态
RUNNING = 1 << COUNT_BITS; //高3位为111
SHUTDOWN = 0 << COUNT_BITS; //高3位为000
STOP = 1 << COUNT_BITS; //高3位为001
TIDYING = 2 << COUNT_BITS; //高3位为010
TERMINATED = 3 << COUNT_BITS; //高3位为011
1.RUNNING:
(1)状态说明:线程池处于RUNNING状态时,能够接收新任务,以及对已经添加的任务进行处理
(2)状态切换:线程池的初始化状态是RUNNING,话句话说,线程池一但被创建,就处于RUNNING状态,并且线程池种的任务书为0
2.SHUTDOWN:
(1)状态说明:线程池处在SHUTDOWN状态时,不接受新任务,但能处理已经添加的任务
(2)状态切换:调用线程池的shutdown()接口时,线程池由RUNNING -> SHUTDOWN
3.STOP:
(1):状态说明:线程池处在STOP状态时,不接受新任务,不处理已添加的任务,并且会中断正在处理的任务
(2)状态切换:调用线程池的shutdownNow()接口时,线程池由(RUNNING or SHUTDOWN)-->STOP
4.TIDYING:
(1) 状态说明:当所有的任务已终止,ctl记录的”任务数量”为0,线程池会变为TIDYING
状态。当线程池变为TIDYING状态时,会执行钩子函数terminated()。terminated()在
ThreadPoolExecutor类中是空的,若用户想在线程池变为TIDYING时,进行相应的处理;
可以通过重载terminated()函数来实现。
(2) 状态切换:当线程池在SHUTDOWN状态下,阻塞队列为空并且线程池中执行的任务也
为空时,就会由 SHUTDOWN -> TIDYING。 当线程池在STOP状态下,线程池中执行的
任务为空时,就会由STOP -> TIDYING。
5.TERMINARTED:
(1) 状态说明:线程池彻底终止,就变成TERMINATED状态。
(2) 状态切换:线程池处在TIDYING状态时,执行完terminated()之后,就会由 TIDYING -
> TERMINATED。
ThreadPoolExecutor
1.线程池的创建:
ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
参数解释:
corePoolSize :
线程池中的核心线程数,当提交一个任务时,线程池创建一个新线程执行任务,直到当
前线程数等于corePoolSize;如果当前线程数为corePoolSize,继续提交的任务被保存到
阻塞队列中,等待被执行;
maximumPoolSize:
线程池中允许的最大线程数。如果当前阻塞队列满了,且继续提交任务,则创建新的线
程执行任务,前提是当前线程数小于maximumPoolSize;
keepAliveTime:
线程池维护线程所允许的空闲时间,当线程池中的线程数量大于corePoolSize的时候,如果这时没有新的任务提交,核心线程外的线程不会立刻销毁,而是会等待,知道等待的时间超过了keepAliveTime
unit :
keepAliveTime的单位
workQueue:
用来保存等待被执行的任务的阻塞队列,且任务必须实现Runable接口,在JDK中提供
了如下阻塞队列:
1、ArrayBlockingQueue:基于数组结构的有界阻塞队列,按FIFO排序任务;
2、LinkedBlockingQuene:基于链表结构的阻塞队列,按FIFO排序任务,吞
吐量通常要高于ArrayBlockingQuene;
3、SynchronousQuene:一个不存储元素的阻塞队列,每个插入操作必须等到
另一个线程调用移除操作,否则插入操作一直处于阻塞状态,吞吐量通常要高于
LinkedBlockingQuene;
4、priorityBlockingQuene:具有优先级的无界阻塞队列;
threadFactory:
它是ThreadFactory类型的变量,用来创建新线程。默认使用Executors.defaultThreadFactory() 来创建线程。使用默认的ThreadFactory来创建线程
时,会使新创建的线程具有相同的NORM_PRIORITY优先级并且是非守护线程,同时也设
置了线程的名称。
handler:
线程池的饱和策略,当阻塞队列满了,且没有空闲的工作线程,如果继续提交任务,必须提供一中策略处理该任务,一般提供了4中策略
1、AbortPolicy:直接抛出异常,默认策略;
2、CallerRunsPolicy:用调用者所在的线程来执行任务;
3、DiscardOldestPolicy:丢弃阻塞队列中靠最前的任务,并执行当前任务;
4、DiscardPolicy:直接丢弃任务;
当然也可以根据应用场景实现RejectedExecutionHandler接口,自定义饱和策略,如
记录日志或持久化存储不能处理的任务。
1.任务提交(execute方法源码分析)
public void execute() //提交任务无返回值
public void execute(Runnable command) {
if (command ==null)
throw new NullPointerException();
int c =ctl.get();
/*
*/
if (workerCountOf(c)<corePoolSize){
if (addWorker(command, true))
return;
c =ctl.get();
}
//
if (isRunning(c) &&workQueue.offer(command)) {
int recheck =ctl.get();
if (!isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) ==0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
简单来说,在执行execute()方法时如果状态一直是RUNNING,执行过程如下:
1. 如果workerCount < corePoolSize,则创建并启动一个线程来执行新提交的任
务;
2. 如果workerCount >= corePoolSize,且线程池内的阻塞队列未满,则将任务添
加到该阻塞队列中;
3. 如 果 workerCount >= corePoolSize && workerCount <
maximumPoolSize,且线程池内的阻塞队列已满,则创建并启动一个线程来执行新
提交的任务;
4. 如果workerCount >= maximumPoolSize,并且线程池内的阻塞队列已满, 则根
据拒绝策略来处理该任务, 默认的处理方式是直接抛异常。
这里要注意一下addWorker(null, false);,也就是创建一个线程,但并没有传入任务,因为
任务已经被添加到workQueue中了,所以worker在执行的时候,会直接从workQueue中
获取任务。所以,在workerCountOf(recheck) == 0时执行addWorker(null, false);也是
为了保证线程池在RUNNING状态下必须要有一个线程来执行任务。
2.addWorker方法
addWorker方法的主要工作是在线程池中创建一个新的线程并执行,代码如下:
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c =ctl.get();
int rs =runStateOf(c);
if (rs >=SHUTDOWN &&
! (rs ==SHUTDOWN &&
firstTask ==null &&
!workQueue.isEmpty()))
return false;
for (;;) {//这个自旋是来判断我们这个线程池是否可以创建我们的任务
int wc =workerCountOf(c);
if (wc >=CAPACITY ||
wc >= (core ?corePoolSize :maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))break retry;
c =ctl.get();
if (runStateOf(c) != rs)
continue retry;
}
}
boolean workerStarted =false;
boolean workerAdded =false;
Worker w =null;
try {
w =new Worker(firstTask);
final Thread t = w.thread;
if (t !=null) {
final ReentrantLock mainLock =this.mainLock;
mainLock.lock();
try {
int rs =runStateOf(ctl.get());
if (rs<SHUTDOWN||
(rs ==SHUTDOWN && firstTask ==null)) {
if (t.isAlive())
throw new IllegalThreadStateException();
workers.add(w);
int s =workers.size();
if (s >largestPoolSize)
largestPoolSize = s;
workerAdded =true;
}
}finally {
mainLock.unlock();
}
if (workerAdded) {
t.start();
workerStarted =true;
}
}
}finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
3.Worker类
线程池中的每一个线程被封装成一个Worker对象, ThreadPool维护的其实就是一组Worker对象
Worker继承了AQS,实现了Runnable接口,其中的firstTask和thread属性,firstTask用他来保存传入的任务,thread是在调用构造方法时通过ThreadFactory来创建的线程,用来处理任务的线程
在调用构造方法时,需要把任务传入,这里通过getThreadFactory().newThread(this);来新建一个线程,newThread方法传入的参数是this,因为Worker本身继承了Runable接口,也就是一个线程,所以Worker对象在启动的时候回调用Worker类中的run方法
4.RunWorker方法
在Worker类中的run方法调用了runWorker方法来执行任务,runWorker方法的代码如下:
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask =null;
w.unlock();
boolean completedAbruptly =true;
try {
while (task !=null || (task = getTask()) !=null) {
w.lock();
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown =null;
try {
task.run();
}catch (RuntimeException x) {
thrown = x; throw x;
}catch (Error x) {
thrown = x; throw x;
}catch (Throwable x) {
thrown = x; throw new Error(x);
}finally {
afterExecute(task, thrown);
}
}finally {
task =null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly =false;
}finally {
processWorkerExit(w, completedAbruptly);
}
}
:
1. while循环不断地通过getTask()方法获取任务;
2. getTask()方法从阻塞队列中取任务;
3. 如果线程池正在停止,那么要保证当前线程是中断状态,否则要保证当前线程不是
中断状态;
4. 调用task.run()执行任务;
5. 如果task为null则跳出循环,执行processWorkerExit()方法;
6. runWorker方法执行完毕,也代表着Worker中的run方法执行完毕,销毁线程。
这里的beforeExecute方法和afterExecute方法在ThreadPoolExecutor类中是空的,留给
子类来实现。
completedAbruptly 变 量 来 表 示 在 执 行 任 务 过 程 中 是 否 出 现 了 异 常 , 在
processWorkerExit方法中会对该变量的值进行判断。
5.getTask方法
getTask方法用来从阻塞队列中获取任务
private RunnablegetTask() {
boolean timedOut =false;
for (;;) {
int c =ctl.get();
int rs =runStateOf(c);
if (rs >=SHUTDOWN && (rs >=STOP ||workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc =workerCountOf(c);
boolean timed =allowCoreThreadTimeOut || wc >corePoolSize;
if ((wc >maximumPoolSize || (timed && timedOut))
&& (wc >1 ||workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r !=null)
return r;
//这里设置timedOut=true,会在第二次for循环中返回null,workerCount数量减1
timedOut =true;
}catch (InterruptedException retry) {
timedOut =false;
}
}
}
6.processWorkerExit方法
private void processWorkerExit(Worker w, boolean completedAbruptly) {
if (completedAbruptly)
decrementWorkerCount();
final ReentrantLock mainLock =this.mainLock;
mainLock.lock();
try {
completedTaskCount += w.completedTasks;
workers.remove(w);
}finally {
mainLock.unlock();
}
tryTerminate();
int c =ctl.get();
if (runStateLessThan(c, STOP)) {
if (!completedAbruptly) {
int min =allowCoreThreadTimeOut ?0 :corePoolSize;
if (min ==0 && !workQueue.isEmpty())
min =1;
if (workerCountOf(c) >= min)
return;
}
addWorker(null, false);
}
}
至此,processWorkerExit执行完之后,工作线程被销毁,以上就是整个工作线程的生
命周期,从execute方法开始,Worker使用ThreadFactory创建新的工作线程,
runWorker通过getTask获取任务,然后执行任务,如果getTask返回null,进入
processWorkerExit方法,整个线程结束,如图所示: