构建服务器应用程序的有效方法 — 线程池
为什么使用线程池?
每次通过 new Thread 创建线程并不是一种好的方式,每次 new Thread 新建和销毁对象性能较差,线程缺乏统一管理,可能无限制新建线程,相互之间竞争、占用过多资源导致死锁,并且缺乏定时执行、定期执行、线程中断等功能。
Java 提供了 4 种线程池,能够有效地管理、调度线程,避免过多的资源损耗。它的优点如下:
1. 重用存在的线程,减少对象创建、销毁的开销;
2. 可有效控制最大并发线程数,提高系统资源的使用率,同时避免过多资源竞争,避免堵塞;
3. 提供定时执行、定期执行、单线程、并发数控制等功能。
线程池原理简单地解释就是:会创建多个线程并且进行管理,提交给线程的任务会被线程池指派给其中的线程进行执行,通过线程池的统一调度、管理使得多线程的使用更简单、高效。
线程池都实现了 ExecutorService 接口,该接口定义了线程池需要实现的接口,如 submit、execute、shutdown 等。
public interface ExecutorService extends Executor {
void shutdown();
List<Runnable> shutdownNow();
boolean isTerminated();
boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException;
<T> Future<T> submit(Callable<T> task);
<T> Future<T> submit(Runnable task, T result);
Future<?> submit(Runnable task);
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException;
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException;
<T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException;
<T> T invokeAny(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
public interface Executor {
void execute(Runnable command);
}
两种常用的线程池实现:
ThreadPoolExecutor,也就是我们用最多的线程池实现;
ScheduledThreadPoolExecutor,则用于周期性地执行任务。
我们一般都不会通过 new 的形式来创建线程池,因为创建参数过程相对复杂,所以,JDK 提供了一个 Executors 工厂类来简化这个过程。下面分别介绍 ThreadPoolExecutor 和 ScheduledThreadPoolExecutor 的使用。
1. 启动指定数量的线程 — ThreadPoolExecutor
ThreadPoolExecutor 是线程池的实现之一,它的功能是启动指定数量的线程以及将任务添加到一个队列中,并且将任务分发给空闲的线程。
ExecutorService 的生命周期包括 3 种状态:运行、关闭、终止。创建后便进入运行状态,当调用 shutdown() 方法时便进入关闭状态,此时 ExecutorService 不再接受新的任务,但它还在执行已经提交了的任务。当所有已经提交了的任务执行完后,就变成终止状态。
ThreadPoolExecutor 构造函数如下:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
参数名 | 作用 |
---|---|
corePoolSize | 线程池中所保存的核心线程数。 |
maximumPoolSize | 线程池允许创建的最大线程数。 |
keepAliveTime | 当前线程池线程总数大于核心线程数时,终止多余的空闲线程的时间。 |
unit | keepAliveTime 参数的时间单位,可选值有毫秒、秒、分等。 |
workQueue | 任务队列,如果当前线程池达到核心线程数 corePoolSize,且当前所有线程都处于活动状态时,则将新加入的任务放到此队列中。 |
threadFactory | 线程工厂,让用户可以定制线程的创建过程,通常不需设置。 |
Handler | 拒绝策略,当线程池与 workQueue 队列都满了的情况下,对新加任务采取的处理策略。 |
线程池启动后默认是空的,只有任务来临才会创建线程以处理请求。
prestartAllCoreThreads()
方法可以让线程池启动后立即启动所有核心线程以等待任务。任务数量小于 corePoolSize,则立即创建新线程来处理任务;
任务数量大于 corePoolSize,但小于 maximumPoolSize,则将任务放进 workQueue,当阻塞队列满时才创建新线程;
如果 corePoolSize 与 maximumPoolSize 相同,则创建了固定大小的线程池。
workQueue 有下列几个常用实现:
ArrayBlockingQueue:基于数组结构的有界队列,此队列按 FIFO 原则对任务进行排序。如果队列满了还有任务进来,则调用拒绝策略。
LinkedBlockingQueue:基于链表结构的无界队列,此队列按 FIFO 原则对任务进行排序。因为它是无界的,所以不会满,采用此队列后线程池将忽略拒绝策略(handler)参数,同时忽略最大线程数 maximumPoolSize 等参数。
SynchronousQueue:直接将任务提交给线程而不是将它加入到队列,实际上此队列是空的。如果新任务来了线程池没有任何可用线程处理的话,则调用拒绝策略。其实要是把 maximumPoolSize 设置成无界(integer.MAX_VALUE),加上 SynchronousQueue 队列,就等同于 Executors.newCachedThreadPool()。
PriorityBlockingQueue:具有优先级的队列的有界队列,可以自定义优先级,默认是按自然排序,可能很多场合并不适合。
当线程池与 workQueue 队列都满了的情况下,对新加任务采取的处理策略有如下四个默认实现:
AbortPolicy:拒绝任务,抛出 RejectedExecutionException 异常。线程池的默认策略。
CallerRunsPolicy:用于被拒绝任务的处理程序,它直接在 execute 方法的调用线程中运行被拒绝的任务;如果执行程序已关闭,则会丢弃该任务。
DiscardOldestPolicy:如果线程池尚未关闭,则位于工作队列头部的任务将被删除,然后重试执行程序(如果再次失败,则重复此过程)。这样的结果是最后加入的任务反而有可能被执行,先前加入的都被抛弃了。
DiscardPolicy:加不进的任务都被抛弃了,同时没有异常抛出。
1.1 newFixedThreadPool(int size)
对与 Android 平台来说,由于资源有限,最常使用的就是通过 Executors.newFixedThreadPool(int size) 函数来启动固定数量的线程池:
public class ExecutorDemo {
// 任务数量
private static final int MAX = 10;
public static void fixedThreadPool(int size) throws ExecutionException, InterruptedException {
// 创建固定数量的线程池
ExecutorService executorService = Executors.newFixedThreadPool(size);
for (int i = 1; i <= MAX; i++) {
// 提交任务
Future<Integer> task = executorService.submit(new Callable<Integer>() {
@Override
public Integer call() throws Exception {
System.out.println("执行线程:" + Thread.currentThread().getName());
return fibc(40);
}
});
System.out.println("第 " + i + " 次计算,结果:" + task.get());
}
}
private static int fibc(int n) {
if (n == 0) {
return 0;
}
if (n == 1) {
return 1;
}
return fibc(n - 1) + fibc(n - 2);
}
}
newFixedThreadPool(int nThreads) 的实现:
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
在该函数中,会调用 ThreadPoolExecutor 的构造函数,设置它的 corePoolSize 和 maximumPoolSize 值都是 nThreads,并且设置 keepAliveTime 参数为 0 毫秒,最后设置无界任务队列。这样该线程池就含有固定个数的线程,并且能容纳无限个任务。
输出结果如下:
执行线程:pool-3-thread-1
第 1 次计算,结果:102334155
执行线程:pool-3-thread-2
第 2 次计算,结果:102334155
执行线程:pool-3-thread-3
第 3 次计算,结果:102334155
执行线程:pool-3-thread-1
第 4 次计算,结果:102334155
执行线程:pool-3-thread-2
第 5 次计算,结果:102334155
执行线程:pool-3-thread-3
第 6 次计算,结果:102334155
执行线程:pool-3-thread-1
第 7 次计算,结果:102334155
执行线程:pool-3-thread-2
第 8 次计算,结果:102334155
执行线程:pool-3-thread-3
第 9 次计算,结果:102334155
执行线程:pool-3-thread-1
第 10 次计算,结果:102334155
1.2 newCachedThreadPool()
线程越多,并发量越大,然而占用的内存也就越大,指定过大的线程数量并不可取。因此,我们可能需要一种场景,如果来了一个新的任务,并且没有空闲线程可用,此时必须马上创建一个线程来立即执行任务。这时就可以通过 Executors 的 newCachedThreadPool 函数来实现。
// 创建线程池
public static void cachedThreadPool() throws ExecutionException, InterruptedException {
ExecutorService executorService = Executors.newCachedThreadPool();
for (int i = 1; i <= MAX; i++) {
// 提交任务
Future<Integer> task = executorService.submit(new Callable<Integer>() {
@Override
public Integer call() throws Exception {
System.out.println("执行线程:" + Thread.currentThread().getName());
return fibc(40);
}
});
System.out.println("第 " + i + " 次计算,结果:" + task.get());
}
}
newCachedThreadPool() 函数实现:
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
可以看到,newCachedThreadPool 函数不需传入线程的数量。它会调用 ThreadPoolExecutor 的构造函数,设置它的
maximumPoolSize 值为无界值(Integer.MAX_VALUE),并且设置 keepAliveTime 参数为 60 秒,最后设置 SynchronousQueue
任务队列。这样就可以适应任意数量的并发任务。线程池为每个任务都创建了 1 个线程,当然这是在没有线程空闲的情况下才会创建新的线程。若一个线程中的任务已经做完了,那么这个线程可以为未被执行的任务提供执行。
输出结果如下:
执行线程:pool-3-thread-1
第 1 次计算,结果:102334155
执行线程:pool-3-thread-2
第 2 次计算,结果:102334155
执行线程:pool-3-thread-2
第 3 次计算,结果:102334155
执行线程:pool-3-thread-2
第 4 次计算,结果:102334155
执行线程:pool-3-thread-2
第 5 次计算,结果:102334155
执行线程:pool-3-thread-2
第 6 次计算,结果:102334155
执行线程:pool-3-thread-2
第 7 次计算,结果:102334155
执行线程:pool-3-thread-2
第 8 次计算,结果:102334155
执行线程:pool-3-thread-2
第 9 次计算,结果:102334155
执行线程:pool-3-thread-2
第 10 次计算,结果:102334155
2. 定时执行一些任务 — ScheduledThreadPoolExecutor
当我们需要定时执行一些任务,可以通过 ScheduledThreadPoolExecutor 来实现。通过 Executors 的 newScheduledThreadPool 函数可以很方便地创建定时执行任务的线程池。
下面是一个例子:
public class ScheduledThreadPoolDemo {
public static void scheduledThreadPool() {
// 创建定时执行的线程池
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(3);
// 参数 2 为第一次执行任务延迟的时间,
// 意思就是第一次调度开始时间点=当前时间 + initialDelay
// 参数 3 为执行周期
scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
System.out.println("Thread: " + Thread.currentThread().getName() + ",定时计算1:");
System.out.println("结果:" + fibc(30));
}
}, 1, 2, TimeUnit.SECONDS);
scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
System.out.println("Thread: " + Thread.currentThread().getName() + ",定时计算2:");
System.out.println("结果:" + fibc(40));
}
}, 1, 2, TimeUnit.SECONDS);
}
private static int fibc(int n) {
if (n == 0) {
return 0;
}
if (n == 1) {
return 1;
}
return fibc(n - 1) + fibc(n - 2);
}
}
public class Main {
public static void main(String[] args) {
// write your code here
ScheduledThreadPoolDemo.scheduledThreadPool();
}
该线程池有 3 个线程,我们指定了 2 个定时任务,因此,该线程池有两个线程来定时完成任务。
scheduleAtFixedRate() 函数的实现:
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
long initialDelay,
long period,
TimeUnit unit);
scheduleAtFixedRate() 函数就是设置定时任务的方法,参数 1 是要执行的任务,参数 2 是第一次运行任务时延迟时间(第一次调度开始时间点 = 当前时间 + initialDelay ),参数 3 是定时任务的周期(两次任务调度的间隔时间),参数 4 是时间单元,这里设置为秒。
部分输出结果:
Thread: pool-2-thread-1,定时计算1:
Thread: pool-2-thread-2,定时计算2:
结果:832040
结果:102334155
Thread: pool-2-thread-1,定时计算1:
Thread: pool-2-thread-3,定时计算2:
结果:832040
结果:102334155