1.Java中的ThreadPoolExecutor类
1)Java中的ThreadPoolExecutor类是线程池最核心的类:
ThreadPoolExecutor有四个构造方法:
2) 下面解释下一下构造器中各个参数的含义:
corePoolSize:核心池的大小,这个参数跟后面讲述的线程池的实现原理有非常大的关系。在创建了线程池后,默认情况下,线程池中并没有任何线程,而是等待有任务到来才创建线程去执行任务,除非调用了prestartAllCoreThreads()或者prestartCoreThread()方法,从这2个方法的名字就可以看出,是预创建线程的意思,即在没有任务到来之前就创建corePoolSize个线程或者一个线程。默认情况下,在创建了线程池后,线程池中的线程数为0,当有任务来之后,就会创建一个线程去执行任务,当线程池中的线程数目达到corePoolSize后,就会把到达的任务放到缓存队列当中;
maximumPoolSize:线程池最大线程数,这个参数也是一个非常重要的参数,它表示在线程池中最多能创建多少个线程;
keepAliveTime:表示线程没有任务执行时最多保持多久时间会终止。默认情况下,只有当线程池中的线程数大于corePoolSize时,keepAliveTime才会起作用,直到线程池中的线程数不大于corePoolSize,即当线程池中的线程数大于corePoolSize时,如果一个线程空闲的时间达到keepAliveTime,则会终止,直到线程池中的线程数不超过corePoolSize。但是如果调用了allowCoreThreadTimeOut(boolean)方法,在线程池中的线程数不大于corePoolSize时,keepAliveTime参数也会起作用,直到线程池中的线程数为0;
unit:参数keepAliveTime的时间单位,有7种取值,在TimeUnit类中有7种静态属性:
workQueue:一个阻塞队列,用来存储等待执行的任务,这个参数的选择也很重要,会对线程池的运行过程产生重大影响,一般来说,这里的阻塞队列有以下几种选择:
ArrayBlockingQueue;
LinkedBlockingQueue;
SynchronousQueue;
ArrayBlockingQueue和PriorityBlockingQueue使用较少,一般使用LinkedBlockingQueue和Synchronous。线程池的排队策略与BlockingQueue有关。
threadFactory:线程工厂,主要用来创建线程;
handler:表示当拒绝处理任务时的策略,有以下四种取值:
ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常。
ThreadPoolExecutor.DiscardPolicy:也是丢弃任务,但是不抛出异常。
ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)
ThreadPoolExecutor.CallerRunsPolicy:由调用线程处理该任务
ThreadPoolExecutor继承了AbstractExecutorService,我们来看一下AbstractExecutorService的实现:
我们接着看ExecutorService接口的实现:
ExecutorService提供了终止线程池的管理方法,并且还提供了一些返回一个Future对象的方法,通过Future对象,我们就可以跟踪到异步任务的进程了。
一个ExecutorService是可以被关闭的,如果ExecutorService被关闭了,它将会拒绝接收新的任务。有两个不同的方法可以关闭ExecutorService
shutdown() 允许先前提交的任务在终止之前执行。
shutdownNow() 会阻止开启新的任务并且尝试停止当前正在执行的任务。
当一个线程池被终止时,没有正在执行的任务,也没有等待执行的任务,也没有新的任务可以提交,一个没有被使用的池程池应该关闭以允许回收它的资源。
submit()方法扩展了Executor#execute(Runnable)方法,创建被返回一个Future对象,这个对象可以用于取消任务的执行或者等待任务完成并取出返回值,至于如何取消任务,或者取值,大家可以参考一些对Future接口的使用案例,这里就不扩展了。
ThreadPoolExecutor线程池
根据treadpollexecutor构造方法种的中的参数进行 进行创建
intmaximumPoolSize,
longkeepAliveTime,
TimeUnit unit,
BlockingQueue workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler
这些参数进行创建其中keepAliveTime的时间单位由unit参数控制,必须>0,然后maximumPoolSize>corePoolSize>0,任务队列,线程工厂,拒绝策略均不能为null。如果在使用了其它的构造函数,可以会使用默认的的线程工厂和默认的拒绝策略
其中根据阿里巴巴java开发手册规范不允许使用executor创建 因为会使系统产生oom危险。具体原因 请自行查找。
execute()方法中:
* 1:如果当前运行的线程数小于 corePoolSize,则马上尝试使用command对象创建一个新线程。
* 调用addWorker()方法进行原子性检查runState和workerCount,然后通过返回false来防止在不应该
* 添加线程时添加了线程产生的错误警告。
*
* 2:如果一个任务能成功添加到任务队列,在我们添加一个新的线程时仍然需要进行双重检查
* (因为自 上一次检查后,可能线程池中的其它线程全部都被回收了) 或者在进入此方法后,
* 线程池已经 shutdown了。所以我们必须重新检查状态,如果有必要,就在线程池shutdown时采取
* 回滚入队操作移除任务,如果线程池的工作线程数为0,就启动新的线程。
*
* 3:如果任务不能入队,那么需要尝试添加一个新的线程,但如果这个操作失败了,那么我们知道线程
* 池可能已经shutdown了或者已经饱和了,从而拒绝任务.
intc = ctl.get();if(workerCountOf(c) < corePoolSize) {//如果工作线程数<核心线程数
if(addWorker(command,true))//添加一个工作线程来运行任务,如果成功了,则直接返回//if 还有要有大括号比较好,至少让阅读的人看起来更清楚,这里我要小小地批评一下小Leareturn;//如果添加线程失败了,就再次获取线程池控制状态
c = ctl.get(); }
然后判断线程状态是否在运行状态,如果在在运行状态 直接添加到任务队列中。
int recheck = ctl.get();
再次获取线程池状态
if (! isRunning(recheck) && remove(command)) reject(command);
如果线程状态已经不是运行状态了 在对列中移除任务 执行拒绝策略
else if (workerCountOf(recheck) == 0) addWorker(null, false);
检查工作线程数 如果等于0 添加队列中
else if (!addWorker(command, false)) reject(command);
再次尝试获取添加工作线程中 如果失败直接执行拒绝策略
addWorker() 分析:
int c = ctl.get();
int rs = runStateOf(c);
获取线程控制状态
获取线程池运行状态
if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty()))
return false;
判断 如果运行状态是否在运行状态 和 任务是否为空 缓存队列为空直接返回 false
int wc = workerCountOf(c);
获取工作线程数
if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false;
根据线程数 比较否大于 核心线程数 和最大线程池数
if (compareAndIncrementWorkerCount(c)) break retry;
线程控制状态 原子操作 +1 却退出循环
c = ctl.get(); if (runStateOf(c) != rs) continue retry;
再次获取线程控制状态 如果线程的运行状态不相等 直接退出循环
Worker w = null;try { w = new Worker(firstTask);final Thread t = w.thread;
构建worker对象,获取worker对应得线程
if (t != null) { final ReentrantLock mainLock = this.mainLock; mainLock.lock();
如果线程不是空,获取线程池得锁
int rs = runStateOf(ctl.get());//拿着锁重新检查池程池的状
if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { //如果线程已经运行了或者还没有死掉,抛出一个IllegalThreadStateException异常
if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException();//把worker加入到工作线程Set里面
workers.add(w); int s = workers.size(); if (s > largestPoolSize)//如果工作线程池的大小大于
largestPoolSize largestPoolSize = s;//让largestPoolSize记录工作线程池的最大的大小
workerAdded = true;//工作线程被添加的标记置为true
}
} finally {
mainLock.unlock();//释放锁 }
if (workerAdded) {//如果工作线程已经被添加到工作线程池了
t.start();//开始执行任务
workerStarted = true;//把工作线程开始的标记置为true
}}
inally { if (! workerStarted)//如果没有添加,那么移除任务,并减少工作线程的数量(-1)
addWorkerFailed(w); }
return workerStarted;
真正执行任务的runWorker()方法
final void runWorker(Worker w) { Thread wt = Thread.currentThread();//获取当前线程(和worker绑定的线程)
Runnable task = w.firstTask;//用task保存在worker中的任务
w.firstTask = null;//把worker中的任务置为null
w.unlock(); //释放锁
boolean completedAbruptly = true; try { //这个while循环,保证了如果任务队列中还有任务就继续拿出来执行,注意这里的短路情况
while (task != null || (task = getTask()) != null) {//如果任务不为空,或者任务队列中还有任务 w.lock();//获取锁 // If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||//如果线程池的状态>STOP,直接中断 (Thread.interrupted() &&//调用者线程被中断
runStateAtLeast(ctl.get(), STOP))) &&//再次检查线程池的状态如果>STOP
!wt.isInterrupted())//当前线程还没有被中断 wt.interrupt();//中断当前线程
try { beforeExecute(wt, task);//在任务执行前的钩子方法
Throwable thrown = null;//用于记录运行任务时可能出现的异常 try { //开始正式运行任务
task.run(); }
catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); }
finally { //任务执行完成后的钩子方法 afterExecute(task, thrown); } }
finally { task = null;//把任务置为null w.completedTasks++;//把任务完成的数量+1
w.unlock();//释放锁 } }
completedAbruptly = false; }
finally { //当所有任务完成之后的一个钩子方法
processWorkerExit(w, completedAbruptly);
} }