说到多线程,概念性东西就不一一赘述了,首先回顾下线程的创建。
Java线程创建的四种方式
1.继承Thread类,重写run方法
static class ThreadDemo extends Thread{
@Override
public void run() {
//super.run();
//业务代码......
}
}
public static void main(String[] args) {
ThreadDemo thread = new ThreadDemo();
thread.setDaemon(true);
thread.setName("thread_demo");
thread.start();
}
2.实现Runnable接口,重写run方法,实现Runnable接口的实现类的实例对象作为Thread构造函数的target
static class RunnableDemo implements Runnable{
@Override
public void run() {
//业务代码......
}
}
public static void main(String[] args) {
Thread thread = new Thread(new RunnableDemo());
thread.start();
}
3.通过Callable和FutureTask创建线程
public static void main(String[] args) throws ExecutionException, InterruptedException {
CallableDemo callable = new CallableDemo();
FutureTask<Object> futureTask = new FutureTask<>(callable);
new Thread(futureTask)..start();
Object o = futureTask.get();
}
static class CallableDemo implements Callable<Object>{
@Override
public Object call() {
//业务代码......
return null;
}
}
可以看出Callable与Runable的区别在于Callable带有返回值且可以检测线程是否完成
4.通过线程池创建线程
static class ThreadDemo extends Thread{
@Override
public void run() {
//super.run();
//业务代码......
}
}
static class RunnableDemo implements Runnable{
@Override
public void run() {
//业务代码......
}
}
static class CallableDemo implements Callable<Object>{
@Override
public Object call() {
//业务代码......
return null;
}
}
public static void main(String[] args) throws ExecutionException, InterruptedException {
ExecutorService executorService = Executors.newFixedThreadPool(5);
executorService.execute(new ThreadDemo());
executorService.execute(new RunnableDemo());
FutureTask<Object> futureTask = new FutureTask<>(new CallableDemo());
Future<?> submit = executorService.submit(futureTask);
submit.get();
}
说到线程池,Executor提供了四种线程池
1. newCachedThreadPool创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。
2. newFixedThreadPool 创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。
3. newScheduledThreadPool 创建一个定长线程池,支持定时及周期性任务执行。
4. newSingleThreadExecutor 创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。
但安装编码规约插件的同学会发现用Executor创建线程池会爆红提示,当然也给出了解释:
找到源码点进去一探究竟
newFixedThreadPool除了设置了核心线程数和最大线程数,其他用的都是默认值。
那来了解下ThreadPoolExecutor的核心参数
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
RejectedExecutionHandler handler) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), handler);
}
corePoolSize:核心线程数
核心线程会一直存活,及时没有任务需要执行
当线程数小于核心线程数时,即使有线程空闲,线程池也会优先创建新线程处理
设置allowCoreThreadTimeout=true(默认false)时,核心线程会超时关闭queueCapacity:任务队列容量(阻塞队列)
当核心线程数达到最大时,新任务会放在队列中排队等待执行maxPoolSize:最大线程数
当线程数>=corePoolSize,且任务队列已满时。线程池会创建新线程来处理任务
当线程数=maxPoolSize,且任务队列已满时,线程池会拒绝处理任务而抛出异常keepAliveTime:线程空闲时间
当线程空闲时间达到keepAliveTime时,线程会退出,直到线程数量=corePoolSize
如果allowCoreThreadTimeout=true,则会直到线程数量=0allowCoreThreadTimeout:允许核心线程超时
rejectedExecutionHandler:任务拒绝处理器
当线程数已经达到maxPoolSize,切队列已满,会拒绝新任务
当线程池被调用shutdown()后,会等待线程池里的任务执行完毕,再shutdown。如果在调用shutdown()和线程池真正shutdown之间提交任务,会拒绝新任务
实现RejectedExecutionHandler接口,可自定义处理器
参数设置不当是会出现oom的哦,所以要注意核心参数的默认值
corePoolSize=1
queueCapacity=Integer.MAX_VALUE
maxPoolSize=Integer.MAX_VALUE
keepAliveTime=60s
allowCoreThreadTimeout=false
rejectedExecutionHandler=AbortPolicy()
参数设置了,在饱和的情况下ThreadPoolExecutor的处理顺序是什么样子的呢?
- 当线程数小于核心线程数时,创建线程。
- 当线程数大于等于核心线程数,且任务队列未满时,将任务放入任务队列。
- 当线程数大于等于核心线程数,且任务队列已满
- 若线程数小于最大线程数,创建线程
- 若线程数等于最大线程数,抛出异常,拒绝任务
最后分享个自己在项目中常用的线程池创建工具类
@Slf4j
public class LocalThreadPool {
public final static String poolName = "thread_pool";
private volatile static LocalThreadPool singletonPool;
private ThreadPoolExecutor executor;
private ThreadPoolExecutor callable;
public static LocalThreadPool getInstance(){
if(singletonPool == null){
synchronized (LocalThreadPool.class){
if(singletonPool == null){
singletonPool = new LocalThreadPool();
}
}
}
return singletonPool;
}
private LocalThreadPool(){
//runnable
final AtomicInteger runnableId = new AtomicInteger(0);
ThreadFactory runableFactory = new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r,"thread_pool_executor_"+runnableId);
thread.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
@Override
public void uncaughtException(Thread t, Throwable e) {
log.error("{}:{}",t.getName(),e);
}
});
return thread;
}
};
//callable
final AtomicInteger callableId = new AtomicInteger(0);
ThreadFactory callableFactory = new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r,"thread_pool_callable"+callableId);
thread.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
@Override
public void uncaughtException(Thread t, Throwable e) {
log.error("{}:{}",t.getName(),e);
}
});
return thread;
}
};
executor = new ThreadPoolExecutor(10,20,60,TimeUnit.SECONDS,
new LinkedBlockingQueue<>(20),runableFactory,new RejectedExecutionHandler(){
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
if(!executor.isShutdown()){
r.run();
log.info("caller run runnable");
}
}
});
callable = new ThreadPoolExecutor(10, 20, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>(20),
callableFactory, new RejectedExecutionHandler() {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
if(!executor.isShutdown()){
r.run();
log.info("caller run callable");
}
}
});
}
public void execute(Runnable r){
executor.execute(r);
}
public <T> Future<T> submit(Callable<T> c){
return callable.submit(c);
}
}
用起来非常之方便
public static void main(String[] args) {
Future<Object> submit = LocalThreadPool.getInstance().submit(new Callable<Object>() {
@Override
public Object call() {
return null;
}
});
LocalThreadPool.getInstance().execute(new Runnable() {
@Override
public void run() {
//业务代码......
}
});
}