为什么使用线程池
在没有线程池的场景,我们使用多线程时:
- 线程池的创建和销毁需要消耗额外的资源。
线程的创建需要开辟虚拟机栈,本地方法栈、程序计数器等线程私有的内存空间。
在线程的销毁时需要回收这些系统资源。频繁的创建和销毁线程会浪费大量的系统资源,增加并发编程的风险 - 系统超过负载后没有拒绝策略,进一步加重系统负载,可能引起系统运行卡顿甚至崩溃
引入线程池的好处:
- 线程资源复用;
- 提供线程管理功能,控制线程并发数,合理使用系统资源
运行线程数量达到核心线程数量后,会进入缓存队列;
队列满后会启动额外线程(数量不大于最大线程数)加快执行;
线程数量达到最大和队列已满的情况,会执行拒绝策略。 - 线程环境隔离。不同环境和优先级,可以分别使用线程池来隔离线程环境,保证优先级高的服务正常运行。
使用线程池的风险
用线程池有同步错误和死锁、资源不足和线程泄漏等风险。
- 死锁
任何多线程应用程序都有死锁风险。
虽然任何多线程程序中都有死锁的风险,但线程池却引入了另一种死锁可能,在那种情况下,所有池线程都在执行已阻塞的等待队列中另一任务的执行结果的任务,但这一任务却因为没有未被占用的线程而不能运行。当线程池被用来实现涉及许多交互对象的模拟,被模拟的对象可以相互发送查询,这些查询接下来作为排队的任务执行,查询对象又同步等待着响应时,会发生这种情况。 - 资源不足
线程池在恰当地调整了线程池大小时,通常执行得很好。线程消耗包括内存和其它系统资源在内的大量资源。
如果线程池太大,那么被那些线程消耗的资源可能严重地影响系统性能。在线程之间进行切换将会浪费时间,而且使用超出比您实际需要的线程可能会引起资源匮乏问题,因为池线程正在消耗一些资源,而这些资源可能会被其它任务更有效地利用。除了线程自身所使用的资源以外,服务请求时所做的工作可能需要其它资源,例如 JDBC 连接、套接字或文件。这些也都是有限资源,有太多的并发请求也可能引起失效,例如不能分配 JDBC 连接。 - 线程泄漏
各种类型的线程池中一个严重的风险是线程泄漏,当从池中除去一个线程以执行一项任务,而在任务完成后该线程却没有返回池时,会发生这种情况。发生线程泄漏的一种情形出现在任务抛出一个 RuntimeException 或一个 Error 时。如果池类没有捕捉到它们,那么线程只会退出而线程池的大小将会永久减少一个。当这种情况发生的次数足够多时,线程池最终就为空,而且系统将停止,因为没有可用的线程来处理任务。
有些任务可能会永远等待某些资源或来自用户的输入,而这些资源又不能保证变得可用,用户可能也已经回家了,诸如此类的任务会永久停止,而这些停止的任务也会引起和线程泄漏同样的问题。如果某个线程被这样一个任务永久地消耗着,那么它实际上就被从池除去了。对于这样的任务,应该要么只给予它们自己的线程,要么只让它们等待有限的时间。 - 请求过载
请求过多可能压垮服务器。在这种情形下,我们可能不想将每个到来的请求都排队到我们的工作队列,因为排在队列中等待执行的任务可能会消耗太多的系统资源并引起资源缺乏。在这种情形下决定如何做取决于您自己;在某些情况下,您可以简单地抛弃请求,依靠更高级别的协议稍后重试请求,您也可以用一个指出服务器暂时很忙的响应来拒绝请求
有效使用线程池的准则
- 不要对那些同步等待其它任务结果的任务排队。这可能会导致上面所描述的那种形式的死锁,在那种死锁中,所有线程都被一些任务所占用,这些任务依次等待排队任务的结果,而这些任务又无法执行。
- 在为时间可能很长的操作使用合用的线程时要小心。如果程序必须等待诸如 I/O 完成这样的某个资源,那么请指定最长的等待时间,以及随后是失效还是将任务重新排队以便稍后执行。这样做保证了:通过将某个线程释放给某个可能成功完成的任务,从而将最终取得某些进展。
- 理解任务。要有效地调整线程池大小,您需要理解正在排队的任务以及它们正在做什么。它们是 CPU 限制的(CPU-bound)吗?它们是 I/O 限制的(I/O-bound)吗?您的答案将影响您如何调整应用程序。如果您有不同的任务类,这些类有着截然不同的特征,那么为不同任务类设置多个工作队列可能会有意义,这样可以相应地调整每个池。
线程池的大小设置
调整线程池的大小基本上就是避免两类错误:线程太少或线程太多。在运行于具有 N 个处理器机器上的计算限制的应用程序中,在线程数目接近 N 时添加额外的线程可能会改善总处理能力,而在线程数目超过 N 时添加额外的线程将不起作用。事实上,太多的线程甚至会降低性能,因为它会导致额外的环境切换开销。
- 线程池的最佳大小取决于可用处理器的数目以及工作队列中的任务的性质。若在一个具有 N 个处理器的系统上只有一个工作队列,其中全部是计算性质的任务,在线程池具有 N 或 N+1 个线程时一般会获得最大的 CPU 利用率。
- 对于那些可能需要等待 I/O 完成的任务(例如,从套接字读取 HTTP 请求的任务),需要让池的大小超过可用处理器的数目,因为并不是所有线程都一直在工作。通过使用概要分析,您可以估计某个典型请求的等待时间(WT)与服务时间(ST)之间的比例。如果我们将这一比例称之为 WT/ST,那么对于一个具有 N 个处理器的系统,需要设置大约 N*(1+WT/ST) 个线程来保持处理器得到充分利用。
处理器利用率不是调整线程池大小过程中的唯一考虑事项。随着线程池的增长,您可能会碰到调度程序、可用内存方面的限制,或者其它系统资源方面的限制,例如套接字、打开的文件句柄或数据库连接等的数目。
常用的几种线程池
- newCachedThreadPool
创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。
特点:
工作线程的创建数量几乎没有限制(数目限制为Interger. MAX_VALUE), 这样可灵活的往线程池中添加线程。
如果长时间没有往线程池中提交任务,即如果工作线程空闲了指定的时间(默认为1分钟),则该工作线程将自动终止。终止后,如果你又提交了新的任务,则线程池重新创建一个工作线程。
在使用CachedThreadPool时,一定要注意控制任务的数量,否则,由于大量线程同时运行,很有会造成系统瘫痪。
- 执行任务时有空闲线程,直接使用,没有则创建
- 任务执行完成后,等待1分钟后关闭回收
- newFixedThreadPool
创建一个指定工作线程数量的线程池,无限制(Integer.MAX_VALUE)长度任务队列链表,队列满后后进任务直接丢弃。
每当提交一个任务就创建一个工作线程,如果工作线程数量达到线程池初始的最大数,则将提交的任务存入到池队列中。
FixedThreadPool是一个典型且优秀的线程池,它具有线程池提高程序效率和节省创建线程时所耗的开销的优点。但是,在线程池空闲时,即线程池中没有可运行任务时,它不会释放工作线程,还会占用一定的系统资源。
- 多线程时,线程最大保持在设定线程量
- 新任务到来时,如果线程数量未达到线程池初始的最大数,则会创建一个线程
- 任务执行完毕后,线程继续保持,不会关闭
newSingleThreadExecutor
创建一个单线程化的Executor,即只创建唯一的工作者线程来执行任务,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。如果这个线程异常结束,会有另一个取代它,保证顺序执行。单工作线程最大的特点是可保证顺序地执行各个任务,并且在任意给定的时间不会有多个线程是活动的。newScheduleThreadPool
创建一个定长的线程池,无限制长度任务队列链表。支持定时的以及周期性的任务执行,支持定时及周期性任务执行。
- 直接执行:
public void testExecute() throws InterruptedException {
scheduledThreadPoolExecutor.execute(new TestTask(scheduledThreadPoolExecutor, 1000));
}
- 延时执行
//延迟5秒后执行
public void delayExecuteTask() throws InterruptedException {
System.out.println("task start");
int delaySecond = 5;
scheduledThreadPoolExecutor.schedule(new TestTask(scheduledThreadPoolExecutor, 0), delaySecond, TimeUnit.SECONDS);
for (int i = delaySecond; i > 0; i--) {
System.out.println("last time:" + i);
Thread.sleep(1000);
}
}
- 循环周期执行
//延时5秒后,每3秒执行一次
public void scheduleWithFixedDelay() throws InterruptedException {
scheduledThreadPoolExecutor.scheduleWithFixedDelay(new TestTask(scheduledThreadPoolExecutor, 0), 5000, 3000, TimeUnit.MILLISECONDS);
for (int i = 0; i < 10000; i++) {
System.out.println("time:" + i);
Thread.sleep(1000);
}
}
线程池的拒绝策略
- CallerRunsPolicy
线程调用运行该任务的 execute 本身。此策略提供简单的反馈控制机制,能够减缓新任务的提交速度。
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { r.run(); }}
这个策略显然不想放弃执行任务。但是由于池中已经没有任何资源了,那么就直接使用调用该execute的线程本身来执行。 - AbortPolicy
处理程序遭到拒绝将抛出运行时 RejectedExecutionException
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {throw new RejectedExecutionException();}
这种策略直接抛出异常,丢弃任务。(jdk默认策略,队列满并线程满时直接拒绝添加新任务,并抛出异常,所以说有时候放弃也是一种勇气,为了保证后续任务的正常进行,丢弃一些也是可以接收的,记得做好记录) - DiscardPolicy
不能执行的任务将被删除
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {} - DiscardOldestPolicy
如果执行程序尚未关闭,则位于工作队列头部的任务将被删除,然后重试执行程序(如果再次失败,则重复此过程)
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) {e.getQueue().poll();e.execute(r); }}
该策略就稍微复杂一些,在pool没有关闭的前提下首先丢掉缓存在队列中的最早的任务,然后重新尝试运行该任务。这个策略需要适当小心。
线程池的原理
线程池的核心类:ThreadPoolExecutor,他实现了Executor接口,如下图所示:
1、提交新的task任务时,线程池判断核心线程池是否已满,如果未满则创建新的worker线程。否则进入2。
2、判断任务队列是否已经满,如果还没满将任务放入队列。否则进入3。
3、判断线程池的线程是否有空闲,如果没有,则重新创建worker线程。如果线程池已满,则执行拒绝策略。
4、如果worker线程处于空闲状态,且线程数大于核心线程数,超时后则执行线程超时销毁
详见下图:
线程池部分核心代码
- 执行任务主代码
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
//1、线程数小于核心线程数,添加worker任务并执行
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
//2、线程数>=核心线程数时,添加任务到队列
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
//3、添加队列失败后,如果线程未达到最多线程数,则新增worker任务
else if (!addWorker(command, false))
//新增失败,执行拒绝策略
reject(command);
}
- 添加worker任务
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
//判断线程是否达到上限
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
//CAS添加线程数
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
//新增worker任务
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
- 线程复用的代码
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
//如果task==null(非首次任务代码),则尝试从阻塞队列拿任务
//没有拿到任务,则销毁线程
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
- 从队列获取任务代码块
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
// Are workers subject to culling?
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
//超时、线程数大于核心线程数,且队列为空时,销毁线程
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
//获取任务,超过keepAliveTime未获取到,则继续往下执行
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}