ExecutorService的submit和execute
ExecuteService代表的是Executors创建的线程池
submit提交的是Callable方法,返回Future,说明submit是有返回值的
execute执行的是Runnable方法,没有返回值
所以submit和execute的区别是提交的方法和是否有返回值
ExecutorService的shutdown,shutdownNow,awaitTermination
flume中的关闭源码
public void stop() {
LOGGER.info("Configuration provider stopping");
executorService.shutdown();
try {
if (!executorService.awaitTermination(500, TimeUnit.MILLISECONDS)) {
LOGGER.debug("File watcher has not terminated. Forcing shutdown of executor.");
executorService.shutdownNow();
while (!executorService.awaitTermination(500, TimeUnit.MILLISECONDS)) {
LOGGER.debug("Waiting for file watcher to terminate");
}
}
} catch (InterruptedException e) {
LOGGER.debug("Interrupted while waiting for file watcher to terminate");
Thread.currentThread().interrupt();
}
lifecycleState = LifecycleState.STOP;
LOGGER.debug("Configuration provider stopped");
}
shutdown方法:平滑的关闭ExecutorService,当此方法被调用时,ExecutorService停止接收新的任务并且等待已经提交的任务(包含提交正在执行和提交未执行)执行完成。当所有提交任务执行完毕,线程池即被关闭。
awaitTermination方法:接收人timeout和TimeUnit两个参数,用于设定超时时间及单位。当等待超过设定时间时,会监测ExecutorService是否已经关闭,若关闭则返回true,否则返回false。一般情况下会和shutdown方法组合使用。
- 场景
应用场景为线程池的有效执行时间为20S,20S之后不管子任务有没有执行完毕,都要关闭线程池。代码如下
ExecutorService es = Executors.newFixedThreadPool(10);
es.execute(new Thread());//执行子线程任务
try {
es.shutdown();
if(!es.awaitTermination(20,TimeUnit.SECONDS)){//20S
System.out.println(" 到达指定时间,还有线程没执行完,不再等待,关闭线程池!");
es.shutdownNow();
}
} catch (Throwable e) { // TODO Auto-generated catch block
es.shutdownNow();
e.printStackTrace();
}
awaitTermination方法调用会被阻塞,直到所有任务执行完毕并且shutdown请求被调用,或者参数中定义的timeout时间到达或者当前线程被打断,这几种情况任意一个发生了就会导致该方法的执行。
当我们调用pool.awaitTermination时,首先该方法会被阻塞,这时会执行子线程中的任务,子线程执行完毕后该方法仍然会被阻塞,因为shutdown()方法还未被调用,而代码中将shutdown的请求放在了awaitTermination之后,这样就导致了只有awaitTermination方法执行完毕后才会执行shutdown请求,这样就造成了死锁。
shutdown的请求一定要放在awaitTermination之前
ExecuteService执行任务的异常处理
https://www.cnblogs.com/langtianya/p/4520373.html
下面这段代码执行的结果是什么?
executorService.submit(() -> {
System.out.println(1 / 0);
});
我被它坑过无数回了:它什么也不会输出。没有任何的java.lang.ArithmeticException: / by zero的征兆,啥也没有。线程池会把这个异常吞掉,就像什么也没发生过一样。如果是你自己创建的java.lang.Thread还好,这样 UncaughtExceptionHandler 还能起作用。不过如果是线程池的话你就得小心了。如果你提交的是Runnable对象的话(就像上面那个一样,没有返回值),你得将整个方法体用try- catch包起来,至少打印一下异常。如果你提交的是Callable<Integer>的话,得确保你在用get()方法取值的时候重新抛 出异常:
final Future<Integer> division = executorService.submit(() -> 1 / 0);
//below will throw ExecutionException caused by ArithmeticException
division.get();
监控队列长度,确保队列有界
不当的线程池大小会使得处理速度变慢,稳定性下降,并且导致内存泄露。如果配置的线程过少,则队列会持续变大,消耗过多内存。而过多的线程又会 由于频繁的上下文切换导致整个系统的速度变缓——殊途而同归。队列的长度至关重要,它必须得是有界的,这样如果线程池不堪重负了它可以暂时拒绝掉新的请 求:
final BlockingQueue<Runnable> queue = new ArrayBlockingQueue<>(100);
executorService = new ThreadPoolExecutor(n, n,
0L, TimeUnit.MILLISECONDS,
queue);
上面的代码等价于Executors.newFixedThreadPool(n),然而不同的是默认的实现是一个无界的 LinkedBlockingQueue。这里我们用的是一个固定100大小的ArrayBlockingQueue。也就是说如果已经有100个任务在 队列中了(还有N个在执行中),新的任务就会被拒绝掉,并抛出RejectedExecutionException异常。由于这里的队列是在外部声明 的,我们还可以时不时地调用下它的size()方法来将队列大小记录在到日志/JMX/或者你所使用的监控系统中。
Executors.newCacheThreadPool线程和new ThreadPoolExecutor的区别和用法
Executors.去创建线程
五种线程池的适应场景
- newCachedThreadPool:用来创建一个可以无限扩大的线程池,适用于服务器负载较轻,执行很多短期异步任务。
- newFixedThreadPool:创建一个固定大小的线程池,因为采用无界的阻塞队列,所以实际线程数量永远不会变化,适用于可以预测线程数量的业务中,或者服务器负载较重,对当前线程数量进行限制。
- newSingleThreadExecutor:创建一个单线程的线程池,适用于需要保证顺序执行各个任务,并且在任意时间点,不会有多个线程是活动的场景。
- newScheduledThreadPool:可以延时启动,定时启动的线程池,适用于需要多个后台线程执行周期任务的场景。
- newWorkStealingPool:创建一个拥有多个任务队列的线程池,可以减少连接数,创建当前可用cpu数量的线程来并行执行,适用于大耗时的操作,可以并行来执行
为什么Executors创建线程不安全
Java中的BlockingQueue主要有两种实现,分别是ArrayBlockingQueue 和 LinkedBlockingQueue。
ArrayBlockingQueue是一个用数组实现的有界阻塞队列,必须设置容量。
LinkedBlockingQueue是一个用链表实现的有界阻塞队列,容量可以选择进行设置,不设置的话,将是一个无边界的阻塞队列,最大长度为Integer.MAX_VALUE。
这里的问题就出在:不设置的话,将是一个无边界的阻塞队列,最大长度为Integer.MAX_VALUE。也就是说,如果我们不设置LinkedBlockingQueue的容量的话,其默认容量将会是Integer.MAX_VALUE。
而newFixedThreadPool中创建LinkedBlockingQueue时,并未指定容量。此时,LinkedBlockingQueue就是一个无边界队列,对于一个无边界队列来说,是可以不断的向队列中加入任务的,这种情况下就有可能因为任务过多而导致内存溢出问题。
上面提到的问题主要体现在newFixedThreadPool和newSingleThreadExecutor两个工厂方法上,并不是说newCachedThreadPool和newScheduledThreadPool这两个方法就安全了,这两种方式创建的最大线程数可能是Integer.MAX_VALUE,而创建这么多线程,必然就有可能导致OOM。
自定义创建线程池
private static ExecutorService executor = new ThreadPoolExecutor(10, 10,
60L, TimeUnit.SECONDS,
new ArrayBlockingQueue(10));
使用guava的ThreadFactoryBuilder
public class ExecutorsDemo {
private static ThreadFactory namedThreadFactory = new ThreadFactoryBuilder()
.setNameFormat("demo-pool-%d").build();
private static ExecutorService pool = new ThreadPoolExecutor(5, 200,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(1024), namedThreadFactory, new ThreadPoolExecutor.AbortPolicy());
public static void main(String[] args) {
for (int i = 0; i < Integer.MAX_VALUE; i++) {
pool.execute(new SubThread());
}
}
}
positionWriter = Executors.newSingleThreadScheduledExecutor(
new ThreadFactoryBuilder().setNameFormat("positionWriter").build());
other
。。。