我比较常用的是定时任务线程池:
<pre>
ScheduledExecutorService ses = Executors.newScheduledThreadPool(1);
ses.scheduleAtFixedRate(XXX, XXX, XXX, XXX)
</pre>
ScheduledThreadPoolExecutor类继承自ThreadPoolExecutor以及实现了ScheduledExecutorService接口的schedule类接口,由此我觉得我有必要深究ThreadPoolExecutor。
<h4>1、ThreadPoolExecutor的构造参数:</h4>
<pre>
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
</pre>
这7个参数如下:
1、corePoolSize核心线程数大小,当线程数<corePoolSize ,会创建线程执行runnable </br>
2、workQueue 保存任务的阻塞队列,有Array类型、Linked类型的队列 FIFO,如果此时线程池中的数量等于 corePoolSize,但是缓冲队列 workQueue未满,那么任务被放入缓冲队列。
3、maximumPoolSize 最大线程数, 如果此时线程池中的数量大于corePoolSize,缓冲队列workQueue满,并且线程池中的数量小于maximumPoolSize,建新线程来处理被添加的任务。
4、keepAliveTime 保持存活时间,当线程数大于corePoolSize,空闲线程能保持的最大时间。
如果某线程空闲时间超过keepAliveTime,线程将被终止。这样,线程池可以动态的调整池中的线程数。
5、unit 时间单位,枚举类型有天、时分秒等
6、threadFactory 创建线程的工厂,默认是DefaultThreadFactory其创建的线程非守护、无优先级的,当然自己也可以实现ThreadFactory,如:
<pre>
private static class TestThreadFactory implements ThreadFactory {
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r, THREAD_NAME); //自定义线程名称
t.setDaemon(true); //守护线程
return t;
}
}
</pre>
7、handler 拒绝策略:如果此时线程池中的数量大于corePoolSize,缓冲队列workQueue满,并且线程池中的数量等于maximumPoolSize,那么通过 handler所指定的遗弃策略来处理此任务。也就是:处理任务的优先级为:核心线程corePoolSize、任务队列workQueue、最大线程maximumPoolSize,如果三者都满了,使用handler处理被拒绝的任务。
<pre>
</pre>
AbortPolicy:抛RejectedExecutionException异常
DiscardPolicy:抛弃不做任何处理,其实就是rejectedExecution方法里面空实现
DiscardOldestPolicy:将位于workQueue 工作队列头部的任务将被删除。但这样有个不好的地方,当某个任务还没执行完成时有可能就被取消掉了,这样完全无法预期线程的执行到了何种地步因为随时会被结束
CallerRunsPolicy:翻看其rejectedExecution接口实现 if (!e.isShutdown()) { r.run(); } 其实就是类似于在<b>client的主线程中执行了一下线程对象的run方法</b>,注意不是执行start方法来调度新的线程,举个例子
<pre>
public class Test {
public static class TestHH implements Runnable {
public void run() {
System.out.println(Thread.currentThread().getName());
}
}
public static void main(String[] args) throws Exception {
ThreadPoolExecutor tpe = new ThreadPoolExecutor(1, 1, 0, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(1),new ThreadPoolExecutor.CallerRunsPolicy());
for(int i = 0;i < 10;i++){
Runnable testRun = new TestHH();
tpe.execute(testRun);
}
}
}
输出:
main
main
main
main
main
main
main
main
pool-1-thread-1
pool-1-thread-1
</pre>
这边会发现是main这个client线程去执行run方法,这样其实也有个不好的地方,client线程会阻塞起来去执行被抛弃的线程的run方法。当然自己也可以自定义实现拒绝接口,如打印一些日志信息等
<b>最后还是上个图描述ThreadPoolExecutor构造流程更清晰一些:</b>
<pre>
</pre>
<h4>2、源代码分析:</h4>
<b>线程池状态分析</b>
RUNNING:接受新的任务和队列任务
SHUTDOWN:不接受新的任务但是正常处理队列任务
STOP:不接受新的任务不处理队列任务且中断正在处理的任务
TIDYING:所有任务被中止,正在工作的线程数为0
TERMINATED:线程池被彻底终止
线程池状态变化过程:
<pre>
//ctl一个变量同时存储runState和workerCount,其中runState占用高3位,workCount占用低29位,其实ReentrantLock的读写锁也是用16位的高位和16位的低位来表示
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
//workerCount使用的位数:32-3=29位
private static final int COUNT_BITS = Integer.SIZE - 3;
//workerCount最大值:536870911,即0b00011111_11111111_11111111_11111111
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// runState存储在高位,占用3位
//0b11100000_00000000_00000000_00000000
private static final int RUNNING = -1 << COUNT_BITS;
//0b00000000_00000000_00000000_00000000
private static final int SHUTDOWN = 0 << COUNT_BITS;
//0b00100000_00000000_00000000_00000000
private static final int STOP = 1 << COUNT_BITS;
//0b01000000_00000000_00000000_00000000
private static final int TIDYING = 2 << COUNT_BITS;
//0b01100000_00000000_00000000_00000000
private static final int TERMINATED = 3 << COUNT_BITS;
// 获取runState,即保�留ctl的高3位,后29位置0
private static int runStateOf(int c) {
return c & ~CAPACITY;
}
//获取workerCount,即保留ctl的低29位,高3位置0
private static int workerCountOf(int c) {
return c & CAPACITY;
}
//设置ctl,或操作
private static int ctlOf(int rs, int wc) {
return rs | wc;
}
</pre>
<b>execute的实现:</b>
<pre>
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) { //工作线程数小于核心线程数时
if (addWorker(command, true)) //将线程放进线程池中,如果线程池是SHUTDOWN或者STOP或者异常则返回false
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))//double check,如果队列中已满则删除该线程任务且按reject策略处理
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false)) //如果队列已满直接尝试创建线程处理,如果工作线程数未超过最大线程数则成功,否则按reject策略处理
reject(command);
}
</pre>
<b>addWorker实现:</b>
<pre>
private boolean addWorker(Runnable firstTask, boolean core) {
retry: //java中标记的跳转来实现goto功能
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN && //线程状态中只有running的状态值小于SHUTDOWN
! (rs == SHUTDOWN &&
firstTask == null && //线程为空值
! workQueue.isEmpty())) //队列是空的
return false; //直接返回false
for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false; // 工作线程数大于上限则失败
if (compareAndIncrementWorkerCount(c)) //原子更新ctl值其实状态没变,工作线程数加1
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
final ReentrantLock mainLock = this.mainLock;
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
mainLock.lock(); //通过ReentrantLock来实现同步
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int c = ctl.get(); //recheck
int rs = runStateOf(c); //recheck
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) { // running状态或者(SHUTDOWN且firstTask为空)
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w); //添加保存工作线程
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s; //更新largestPoolSize值,getLargestPoolSize会引用
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start();
workerStarted = true; //正常启动
}
}
} finally {
if (! workerStarted) //没有正常启动的话
addWorkerFailed(w); //里面workers会remove(w)且原子性修改ctl - 1的工作线程数量
}
return workerStarted;
}
</pre>
<b>shutdown and shutdownNow:</b>
shutDown()
当线程池调用该方法时,线程池的状态则立刻变成SHUTDOWN状态。此时则不能再往线程池中添加任何任务,线程池不会立刻退出,直到添加到线程池中的任务都已经处理完成,才会退出。
shutdownNow()
根据JDK文档描述,执行该方法,线程池的状态立刻变成STOP状态,并试图停止所有正在执行的线程,不再处理还在池队列中等待的任务,同时它会返回那些未执行(还在队列中等待执行)的任务。 试图终止线程的方法是通过调用Thread.interrupt()方法来实现的,但是大家知道,这种方法的作用有限,如果线程中没有使用sleep 、wait、Condition、定时锁等, interrupt()方法是无法中断当前的线程的。
所以,ShutdownNow()并不代表线程池就一定立即就能退出,它可能必须要等待所有正在执行的任务都执行完成了才能退出。
两者区别:
a、是一个要将线程池推到SHUTDOWN状态,一个将推到STOP状态
b、并且对运行的线程处理方式不同,shutdown()只中断空闲线程,而shutdownNow()会尝试中断所有活动线程
c、还有就是对队列中的任务处理,shutdown()队列中已有任务会继续执行,而shutdownNow()会直接取出不被执行
d、相同的是都在最后尝试将线程池推到TERMINATED状态。
<pre>
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess(); //check 是否有workers中的线程shutdown权限
advanceRunState(SHUTDOWN); //将线程池状态变成SHUTDOWN,同时不受理新的工作线程
interruptIdleWorkers();//中断状态不为isInterrupted()且空闲的线程
onShutdown(); // hook for ScheduledThreadPoolExecutor,只有ScheduledThreadPoolExecutor的实现中cancel delayed tasks
} finally {
mainLock.unlock();
}
tryTerminate();
}
</pre>
<pre>
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
advanceRunState(STOP);
interruptWorkers(); //中断所有正在工作的线程
tasks = drainQueue(); //取出所有workQueue中等待的任务
} finally {
mainLock.unlock();
}
tryTerminate(); //最后检查线程池工作线程数是否为0,并尝试切换到TERMINATED状态。
return tasks;
}
drainQueue()中drainTo方法我从没用过:
private List<Runnable> drainQueue() {
BlockingQueue<Runnable> q = workQueue;
List<Runnable> taskList = new ArrayList<Runnable>();
q.drainTo(taskList);
if (!q.isEmpty()) {
for (Runnable r : q.toArray(new Runnable[0])) {
if (q.remove(r))
taskList.add(r);
}
}
return taskList;
}
翻了一下源代码:其意思为加了个ReentrantLock锁线程安全的把workQueue中的线程copy到taskList
</pre>
<h4>3、预留扩展</h4>
<pre>
预留扩展:
beforeExecute() 在每个任务执行前做的处理
protected void beforeExecute(Thread t, Runnable r) { }
afterExecute() 在每个任务执行后做的处理
protected void afterExecute(Runnable r, Throwable t) { }
terminated() 在ThreadPoolExecutor到达TERMINATED状态前所做的处理
protected void terminated() { }
finalize() 有默认实现,直接调用shutdown(),以保证线程池对象回收
protected void finalize() {
shutdown();
}
onShutdown() 在shutdown()方法执行到最后时调用,在java.util.concurrent.ScheduledThreadPoolExecutor类实现中用到了这个扩展点,做一些任务队列的清理操作。
void onShutdown() {
}
</pre>