例子
- 比如需要导出不同地方的文件,可以用线程池开启多个线程同时执行
public void testThreadPool() throws InterruptedException {
ExecutorService pool = new ThreadPoolExecutor(8, 8, 300, TimeUnit.SECONDS, new ArrayBlockingQueue<>(30000), new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "测试线程");
}
}, new ThreadPoolExecutor.AbortPolicy());
List<Future<String>> futureList = new ArrayList<>();
for (int i = 0; i < 4; i++) {
Runnable run = new Runnable() {
@Override
public void run() {
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
System.out.println("线程被打断运行" + Thread.currentThread().getName());
}
System.out.println("线程运行" + Thread.currentThread().getName());
}
};
Future<String> future = pool.submit(run, new String());
futureList.add(future);
}
pool.shutdown();
try {
pool.awaitTermination(1, TimeUnit.MILLISECONDS)
} catch (InterruptedException e) {
e.printStackTrace();
}
//处理返回值
for(Future<String> future : futureList){
if(future.isDone()){
System.out.println("已完成");
}else {
System.out.println("未完成");
}
}
}
线程池核心参数含义以及四种常用线程池
核心参数含义
- corePoolSize 核心线程数 默认情况下会一直存活
- maximumPoolSize 线程池所能接受最大线程数
- keepAliveTime 非核心线程闲置时长,超过则会被回收
- unit TimeUnit 时间单位 秒 分 时
- workQueue 任务队列 BlockQueue,有ArrayBlockingQueue(一个基于数组结构的有界阻塞队列),LinkedBlockingQueue(一个基于链表结构的阻塞队列,此队列按FIFO (先进先出) 排序元素,吞吐量通常要高于ArrayBlockingQueue。静态工厂方法Executors.newFixedThreadPool()使用了这个队列),PriorityBlockingQueue(一个具有优先级的无限阻塞队列),DelayQueue(定时任务用,一个不存储元素的阻塞队列。每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态), SynchronousQueue(可缓存化化线程池)
- threadFactory 可定义线程名等
- handler 拒绝策略默认为AbortPolicy抛异常,CallerRunsPolicy减缓提交速度,iscardPolicy 不能执行的任务将被删除,DiscardOldestPolicy 中,如果执行程序尚未关闭,则位于工作队列头部的任务将被删除,然后重试执行程
各个参数在源码中应用,整个流程也是线程池执行原理
- submit方法除了new FuntureTask之外还会直接调用executor方法,现在看下executor方法,这其中主要涉及corePoolSize, maximumPoolSize,拒绝策略, 队列,keepAliveTime
- 当一个线程完成任务时,它会从队列中取下一个任务来执行
- 如果队列大小设置不合适, corePoolSize跟maximumPoolSize不一致这时候,当队列满时新加任务会比队列里面任务更早执行。
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* 有四个步骤
* 1. 如果正在运行的线程数量小于 corePoolSize,那么马上创
* 建线程运行这个任务
* 2. 如果正在运行的线程数量大于或等于 corePoolSize,那么
* 将这个任务放入队列
* 3. 如果这时候队列满了,而且正在运行的线程数量小于
* maximumPoolSize,那么还是要创建线程运行这个任务
* 4. 如果队列满了,而且正在运行的线程数量大于或等于
* maximumPoolSize,这时候会调用拒绝策略,默认抛异常
*/
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
//此处会检查当前线程是否已经超过最大检查,是的话返回错误
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
//启动Work线程
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
public void run() {
runWorker(this);
}
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
//线程创建并运行之后不断的从队列里面获取任务
while (task != null || (task = getTask()) != null) {
w.lock();
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
// 当当前线程超过核心线程或者允许摧毁核心线程 为true
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
//当当前线程超过核心线程或者允许摧毁核心线程 从队列poll任务,
//当超时还没获取到返回null(有可能已经任务了),
//线程结束,调用结束线程方法
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
四种常用线程池
- 定长线程池(FixedThreadPool)比较常用
- 定时线程池(ScheduledThreadPool) 一般用定时任务
- 可缓存线程池 CachedThreadPool
- 单线程化线程池 SingleThreadExecutor
四种常用线程池比较
- 定长线程池 只有核心线程数,任务队列无大小限制,可控制最大并发数
- 定时线程池 周期性执行任务
- 可缓存线程池 只有非核心线程数,超时能回收非核心线程对应keepAlive,适合数量大但是执行周期短的任务
- 单线程化线程 只有一个核心线程,保证所有任务按照指定顺序在一个线程中执行,不需要处理线程同步的问题
高并发知识体系
线程池原理解析涉及知识点
- BlockQueue 并发常用队列,线程安全
- AQS 抽象的队列式的同步器,许多同步类实现都依赖于它,如常用的ReentrantLock/Semaphore/CountDownLatch。
- 如何设置线程池参数大小(corePoolSize 2n(n为cpu核数)), cpu之类的知识
- 在《linux多线程服务器端编程》中有一个思路,CPU计算和IO的阻抗匹配原则,如果线程池中的线程在执行任务时,密集计算所占的时间比重为P(0<P<=1),而系统一共有C个CPU,为了让CPU跑满而又不过载,线程池的大小经验公式 T = C / P。在此,T只是一个参考,考虑到P的估计并不是很准确,T的最佳估值可以上下浮动50%.
- 如果是CPU密集型的任务,我们应该设置数目较小的线程数,比如CPU数目加1。
- 线程等待时间所占比例越高,需要越多线程。线程CPU时间所占比例越高,需要越少线程。
- 优雅的关闭线程(可配合使用,强制关闭也不合适)
- shutdownNow:线程池拒接收新提交的任务,同时立马中断线程池里面的线程,如果线程正在执行io 或者阻塞则会被关闭。runWork方法里面有work.run如果执行io操作则会被中断,还有getTask被标记STOP,所以不管队列是否为空都会返回null
- shutdown方法的解释是:线程池拒接收新提交的任务,同时中断空闲线程。有一个w.tryLock()加锁判断,只有加锁成功的线程才会被调用interrupt方法。所以只有空闲 线程才会被中断,只有队列为空,getTask方法才会返回null,导致线程退出。
- awaitTermination超时关闭线程池时返回true,可配合shutdownNow使用
- 参考自 如何优雅的关闭Java线程池
参考文献