java线程:线程池
我们使用线程的时候就去创建一个线程,这样实现起来非常简便,但是就会有一个问题:
如果并发的线程数量很多,并且每个线程都是执行一个时间很短的任务就结束了,
这样频繁创建线程就会大大降低系统的效率,因为频繁创建线程和销毁线程需要时间。
那么有没有一种办法使得线程可以复用,就是执行完一个任务,并不被销毁,而是可以继续执行其他的任务?数据库连接
有数据库连接池,而线程就有线程池了。
线程池的类结构:
可以先看看ExcutorService
,ExecutorService
的实现就是一个线程池的实现
1、创建可重用的固定线程的集合线程池,以共享的无界队列方式来运行这些线程:
ExecutorService executorService = Executors.newFixedThreadPool(10);
// 创建可以容纳10个线程的线程池
2、 创建一个可根据需要创建新线程的线程池,但是在以前构造的线程可用时将重用它们:
ExecutorService threadPool = Executors.newCachedThreadPool();
// 线程池的大小会根据执行的任务数动态分配
3、创建一个使用单个线程的 Executor,以无界队列方式来运行该线程。
ExecutorService threadPool = Executors.newSingleThreadExecutor();
// 创建单个线程的线程池,如果当前线程在执行任务时突然中断,则会创建一个新的线程替代它继续执行任务
4、 创建一个可安排在给定延迟后运行命令或者定期地执行的线程池。
ScheduledExecutorService threadPool = Executors.newScheduledThreadPool(3);
// 效果类似于Timer定时器
1. FixedThreadPool
创建固定大小的线程池。每次提交一个任务就创建一个线程,直到线程达到线程池的最大大小。线程池的大小一旦达到最大值就会保持不变,
如果某个线程因为执行异常而结束,那么线程池会补充一个新线程。
public static void main(String[] args) {
ExecutorService threadPool= Executors.newFixedThreadPool(3);
for (int i = 1; i <5 ; i++) {
final int id=i;
threadPool.execute(new Runnable() {
@Override
public void run() {
for (int j = 1; j <5 ; j++) {
try {
Thread.sleep(20);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("第" + id + "次任务的第" + j + "次执行");
}
}
});
}
threadPool.shutdown();
}
//输出
第1次任务的第1次执行
第3次任务的第1次执行
第2次任务的第1次执行
第2次任务的第2次执行
第3次任务的第2次执行
第1次任务的第2次执行
第1次任务的第3次执行
第3次任务的第3次执行
第2次任务的第3次执行
第1次任务的第4次执行
第2次任务的第4次执行
第3次任务的第4次执行
第4次任务的第1次执行
第4次任务的第2次执行
第4次任务的第3次执行
第4次任务的第4次执行
上段代码中,创建了一个固定大小的线程池,容量为3,然后循环执行了4个任务,
由输出结果可以看到,前3个任务首先执行完,然后空闲下来的线程去执行第4个任务,
在FixedThreadPool中,有一个固定大小的池,如果当前需要执行的任务超过了池大小,
那么多于的任务等待状态,
直到有空闲下来的线程执行任务,而当执行的任务小于池大小,空闲的线程也不会去销毁。
2.CachedThreadPool
那么就会回收部分空闲(60秒不执行任务)的线程,当任务数增加时,此线程池又可以智能的添加新线程来处理任务。
此线程池不会对线程池大小做限制,
线程池大小完全依赖于操作系统(或者说JVM)能够创建的最大线程大小。
将上段代码newFixedThreadPool(3)
换成newCachedThreadPool()
;
public static void main(String[] args) {
ExecutorService threadPool= Executors.newCachedThreadPool();
for (int i = 1; i <5 ; i++) {
final int id=i;
threadPool.execute(new Runnable() {
@Override
public void run() {
for (int j = 1; j <5 ; j++) {
try {
Thread.sleep(20);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("第" + id + "次任务的第" + j + "次执行");
}
}
});
}
threadPool.shutdown();
}
//输出
第2次任务的第1次执行
第3次任务的第1次执行
第1次任务的第1次执行
第4次任务的第1次执行
第3次任务的第2次执行
第2次任务的第2次执行
第1次任务的第2次执行
第4次任务的第2次执行
第2次任务的第3次执行
第3次任务的第3次执行
第1次任务的第3次执行
第4次任务的第3次执行
第2次任务的第4次执行
第1次任务的第4次执行
第4次任务的第4次执行
第3次任务的第4次执行
这四个任务是交替执行的,CachedThreadPool会创建一个缓存区,将初始化线程存起来
如果有线程可用,就用之前创建好的线程,如果没有可用的,就创建新线程,终止并且从
缓存中移除已有60秒未被使用的线程
3. SingleThreadExecutor
上段代码其它地方不变,将newFixedThreadPool
方法换成newSingleThreadExecutor
方法。
//输出
第1次任务的第1次执行
第1次任务的第2次执行
第1次任务的第3次执行
第1次任务的第4次执行
第2次任务的第1次执行
第2次任务的第2次执行
第2次任务的第3次执行
第2次任务的第4次执行
第3次任务的第1次执行
第3次任务的第2次执行
第3次任务的第3次执行
第3次任务的第4次执行
第4次任务的第1次执行
第4次任务的第2次执行
第4次任务的第3次执行
第4次任务的第4次执行
可以看出newSingleThreadExecutor
是创建单个线程去执行,这个线程会保证你的任务执行完成
,如果当前线程意外终止的话,会创建一个新线程继续执行任务。
4.ScheduledThreadPool
public static void main(String[] args) {
ScheduledExecutorService scheduledThreadPool= Executors.newScheduledThreadPool(1);
scheduledThreadPool.schedule(new Runnable() {
@Override
public void run() {
System.out.println("5秒执行");
}
},5, TimeUnit.SECONDS);
scheduledThreadPool.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
System.out.println("2秒执行一次");
}
},5,2,TimeUnit.SECONDS);
}
//输出
5秒执行
2秒执行一次
2秒执行一次
2秒执行一次
2秒执行一次
2秒执行一次
......
schedule
:一次性执行任务,执行完成结束。
scheduleAtFixedRate
:这个是按照固定的时间来执行,简单来说:到点执行,开启一个新线程,
scheduleWithFixedDelay
:这个呢,
是等上一个任务结束后,在等固定的时间,然后执行。简单来说:执行完上一个任务后再执行
ThreadPoolExecutor
ThreadPoolExecutor
是ExecutorService
的一个实现类,
它使用可能的几个池线程之一执行每个提交的任务,通常使用 Executors
工厂方法配置。
构造方法
ThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
long keepAliveTime, TimeUnit unit,
BlockingQueue workQueue,
RejectedExecutionHandler handler)
ThreadPoolExecutor参数:
参数名 | 作用 |
---|---|
corePoolSize | 核心线程池大小 |
maximumPoolSize | 线程池最大大小 |
keepAliveTime | 线程池中超过corePoolSize 数目的空闲线程最大存活时间; |
TimeUnit | 保持活动时间单位 |
BlockingQueue | 工作队列 |
ThreadFactory | 线程工厂 |
RejectedExecutionHandler | 驳回回调,当提交任务数量超过maximumPoolSize+workQueue的大小之后,任务会交给RejectedExecutionHandler 完成 |
1.当任务数量<corePoolSize
的时候,即使corePoolSize
中存在空闲线程,新提交的任务也会创建
一个新的线程去执行任务。
2.当线程数达到corePoolSize
的大小,空闲时达到,新提交的任务会被放入BlockingQueue
的工作
队列中,等待线程池的任务调度执行。
3.当BlockingQueue
已经达到最大值,且maximumPoolSize>corePoolSize
时,新提交任务会创建新线程执行任务。
4.当提交的任务超过maximumPoolSize
时,新提交的任务会被提交给RejectedExecutionHandler处理
5.当线程池中超过corePoolSize
的大小,空闲时间达到keepAliveTime
,关闭线程。
6.当设置allowCoreThreadTimeOut(true)
时,线程池中corePoolSize
线程空闲时间达到keepAliveTime
也将关闭
线程管理的机制图:
再看回Executor
创建3种类型的ThreadPoolExecutor
线程池:
1. FixedThreadPool
这个线程特点就是可以创建固定大小的线程池,他的构造源码:
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads, 0L,
TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
- 0L表示当线程池中的线程数多于
corePoolSize
的数量时,多于的线程将被停止。 - 最后一个参数是
LinkedBlockingQueue
,是一个无界队列,因此线程池中的线程数量达到corePoolSize
后,新任务将在队列中等待,因此线程池的线程数量不会超过corePoolSize,同时maxiumPoolSize也就变成了一个无效的参数,并且运行中的线程池并不会拒绝任务。
执行过程如下:
1.如果当前工作中的线程数量少于corePool
的数量,就创建新的线程来执行任务。
2.当线程池的工作中的线程数量达到了corePool
,则将任务加入LinkedBlockingQueue
。
3.线程执行完1中的任务后会从队列中去任务。
2. SingleThreadExecutor
SingleThreadExecutor是使用单个worker
线程的Executor
。特点是使用单个工作线程执行任务.它的源码:
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
corePool
和maximumPoolSize
的大小都是1,其他参数的配置跟FixedThreadPool一样
执行过程如下:
1.如果当前工作中的线程数量少于corePool
的数量,就创建新的线程来执行任务。
2.当线程池的工作中的线程数量达到了corePool
,则将任务加入LinkedBlockingQueue
。
3.线程执行完1中的任务后会从队列中去任务。
3.CachedThreadPool
CachedThreadPool是一个“无限”容量的线程池,他会根据需要去创建线程,没有特定的corePool
,它的构造方法如下:
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
corePool
=0,maximumPoolSize
设置为无限大,keepAliveTime设置为60S,意味着空闲的线程最多等待时间是60S,
然后将被回收。
CachedThreadPool
使用没有容量的SynchronousQueue
作为主线程池的工作队列,
它是一个没有容量的阻塞队列,每个插入操作必须等待另一个线程的对应移除操作。只要有请求到来,就必须要找到一条工作线程处理他,
如果当前没有空闲的线程,那么就会再创建一条新的线程。所以如果主线程提交任务的速度高于线程池中处理任务的速度时,
CachedThreadPool
会不断创建新线程。极端情况下,CachedThreadPool
会因为创建过多线程而耗尽CPU资源,一般使用场景
任务处理速度 > 任务提交速度 ,这样才能保证不会不断创建新的进程,避免内存被占满。
执行过程如下:
1.首先执行SynchronousQueue.offer(Runnable task)。如果在当前的线程池中有空闲的线程正在执行SynchronousQueue.poll(),那么主线程执行的offer操作与空闲线程执行的poll操作配对成功,主线程把任务交给空闲线程执行。,execute()方法执行成功,否则执行步骤2
2.当线程池为空(初始maximumPool为空)或没有空闲线程时,配对失败,将没有线程执行SynchronousQueue.poll操作。这种情况下,线程池会创建一个新的线程执行任务。
3.在创建完新的线程以后,将会执行poll操作。当步骤2的线程执行完成后,将等待60秒,如果此时主线程提交了一个新任务,那么这个空闲线程将执行新任务,否则被回收。因此长时间不提交任务的CachedThreadPool不会占用系统资源。
SynchronousQueue是一个不存储元素阻塞队列,每次要进行offer操作时必须等待poll操作,否则不能继续添加元素。