1.为什么要使用线程池
直接创建线程,线程本身需要占用内存空间,大量的线程会抢占宝贵的内存资源,如果处理不当,会导致内存溢出,而且,线程回收会给GC带来很大的压力,延迟GC的停顿时间。
2.常用线程池
<1>newFixedThreadPool
返回一个固定线程数量的线程池
<2>newSingleThreadExecutor()
返回一个只有一个线程的线程池
<3>newCachedThreadPool()
返回一个根据实际情况调整数量的线程池,线程池数量不确定,有空闲线程可以复用则复用,没有就一直会创建新的
<4>newSingleThreadScheduledExecutor()
返回一个计划任务线程池,但是线程数量为1
<5>newScheduledThreadPool()
返回一个计划任务线程池
2.1计划任务
计划任务不一定会立即安排执行任务,会在指定时间内进行调度。
3个重要的方法:
<1>schedule
会在给定的时间内,对任务进行一次调度
<2>scheduleAtFirstRate
会对任务进行周期性调度,周期为上一个任务开始执行的时间+周期时间=下一个任务的开始时间
<3>scheduleWithFixedDelay()
会对任务进行周期性调度,周期为上一个任务结束时间+周期时间=下一个任务的开始时间
任务堆叠处理:
ScheduleExecutorService不会让任务出现堆叠的情况,比如任务周期为2s,但是任务执行花了8秒,上一个任务没有执行完成,下一个任务不会开始执行,造成任务堆叠;但是这里上一任务执行完成后,下一个任务会马上执行。
注意:调度程度并不保证任务会无限期的持续调用,如果任务本身出现异常,那么后续的所有执行都会被中断,因此需要对异常处理。
3.核心线程池的内部实现
线程池核心类
ThreadPoolExecutor(int corePoolSize,
int maxinumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory
RejectedExecutionHandler handler);
核心参数:
workQueue:
<1>直接提交的队列:SynchronousQueue
它没有容量,每一个插入操作都要等待一个相应的删除操作,每一个删除操作都要等待对应的插入操作。
如果没有空闲线程,则尝试创建新的线程,如果线程数量达到最大值,则使用拒绝策略;
<2>有界的任务队列:ArrayBlockingQueue
如果线程数量小于corePoolSize时,会优先创建新线程,若大于corePoolSize,则会将新任务加入到等待队列;等待队列已满,则在总线程数不大于maxinumPoolSize的前提下,创建新的线程执行任务,若大于,则执行拒绝策略;
<3>无界任务队列 LinkedBlockQueue
除非系统资源耗尽,否则无界的任务队列不存在任务入队列失败的情况;新任务到来时,如果线程数小于corePoolSize, 则创建新的线程执行任务,但当线程池中线程达到poolSize后,就不会增加;
<4>优先任务队列: PriorityBlockingQueue
可以控制任务执行的先后顺序,它是一个特殊的无界队列
拒绝策略
AbortPolicy策略:直接抛出异常,阻止系统正常运行;
CallerRunsPolicy策略:只要线程池未关闭,该策略直接在调用者线程中,运行当前被丢弃的任务,显然这样做不会真的丢弃任务,但是,任务提交线程池的性能极有可能会急剧下降;
DiscardOledestPolicy策略:丢弃最老的一个请求,也就是即将被执行的下一个,并尝试再次提交当前任务;
DiscardPolicy策略:丢弃无法处理的任务,不处理;
自定义拒绝策略:实现RejectExecutionHandler接口
自定义线程池
ThreadFactory是一个接口,只有一个方法
Thread newThread(Runnable r);
4.扩展线程池
ThreadPoolExecutor提供beforeExecutor、afterExecutor和terminated() 三个方法对线程池进行控制。
ThreadPoolExecutor.Worker是ThreadPoolExecutor的内部类,实现了Runnable接口, 执行Worker.runTask()方法内部,执行了
beforeExecutor和afterExecute方法
terminated记录了整个线程池的退出
5.优化线程数量
经验公式
Ncpu -> CPU数量
Ucpu -> 目标cpu的使用率
W/C -> 等待时间与计算时间的比率
最优线程池大小=Ncpu * Ucpu * (1 + W/C)
6.在线程池中寻找堆栈
自定义ThreadPoolExecutor 包装submit方法
public static class TraceThreadPoolExecutor extends ThreadPoolExecutor {
public TraceThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
}
@Override
public Future<?> submit(Runnable task) {
return super.submit(wrap(task, clientTrace()));
}
private Throwable clientTrace() {
return new Throwable("trace");
}
private Runnable wrap(final Runnable task, final Throwable clientTrace) {
return new Runnable() {
@Override
public void run() {
try {
task.run();
} catch (Throwable e) {
e.printStackTrace();
throw e;
}
}
};
}
}
public static void main(String[] args) {
TraceThreadPoolExecutor executor = new TraceThreadPoolExecutor(2,4, 200, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
for (int i = 0; i < 5; i ++) {
executor.submit(new MyTask(i));
}
executor.shutdown();
}
public static class MyTask implements Runnable {
private int size;
public MyTask(int size) {
this.size = size;
}
@Override
public void run() {
int result = 100/size;
System.out.println(result);
}
}
7.fork/join框架
含义:
fork() 创建子进程,使得系统进程可以多一个执行分支;
join() 等待,等待这个执行分支执行完毕,才能得到结果
ForkJoinPool中如果A线程执行完毕,线程A会帮助线程B,从线程B的任务队列中拿一个任务过来处理,尽可能达到平衡;
注意:线程执行自己的任务从任务队列顶部拿任务,而帮助别人执行时,从队列底部拿任务,可以有效避免数据竞争;
ForkJoinPoll的重要方法:
public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task);
向ForkJoinPool线程池提交一个ForkJoinTask任务,所谓的ForkJoinTask就是支持fork分解和join等待的任务。
ForkJoinTask有2个重要的子类:RecursiveAction和RecursiveTask. 他们分别代表没有返回值的任务和可以带返回值的任务。