阻塞队列
概念:当阻塞队列为空时,获取(take)操作是阻塞的;当阻塞队列为满时,添加(put)操作是阻塞的。
好处:在concurrent包发布之前,多线程环境下需要手动控制线程的阻塞和唤醒。而阻塞队列不用手动控制什么时候该被阻塞,什么时候该被唤醒,简化了操作。
体系:Collection
→Queue
→BlockingQueue
→七个阻塞队列实现类。
类名 | 作用 |
---|---|
ArrayBlockingQueue | 由数组构成的有界阻塞队列 |
LinkedBlockingQueue | 由链表构成的有界阻塞队列 |
PriorityBlockingQueue | 支持优先级排序的无界阻塞队列 |
DelayQueue | 支持优先级的延迟无界阻塞队列 |
SynchronousQueue | size为0的LinkedTransferQueue |
LinkedTransferQueue | 由链表构成的无界阻塞队列 |
LinkedBlockingDeque | 由链表构成的双向阻塞队列 |
粗体标记的三个用得比较多,许多消息中间件底层就是用它们实现的。
需要注意的是LinkedBlockingQueue
虽然是有界的,但有个巨坑,其默认大小是Integer.MAX_VALUE
,高达21亿,一般情况下内存早爆了(在线程池的ThreadPoolExecutor
有体现)。
API:抛出异常是指当队列满时,再次插入会抛出异常;返回布尔是指当队列满时,再次插入会返回false;阻塞是指当队列满时,再次插入会被阻塞,直到队列取出一个元素,才能插入。超时是指当一个时限过后,才会插入或者取出。
方法类型 | 抛出异常 | 返回布尔 | 阻塞 | 超时 |
---|---|---|---|---|
插入 | add(E e) | offer(E e) | put(E e) | offer(E e,Time,TimeUnit) |
取出 | remove() | poll() | take() | poll(Time,TimeUnit) |
队首 | element() | peek() | 无 | 无 |
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
public class BlockingQueueDemo {
public static void main(String[] args) throws InterruptedException {
BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<String>(3);
addAndRemove(blockingQueue);
/*====================output========================
add*4 && remove*3:
true
true
true
Exception in thread "main" java.lang.IllegalStateException: Queue full
at java.util.AbstractQueue.add(AbstractQueue.java:98)
at java.util.concurrent.ArrayBlockingQueue.add(ArrayBlockingQueue.java:312)
at BlockingQueueDemo.addAndRemove(BlockingQueueDemo.java:60)
at BlockingQueueDemo.main(BlockingQueueDemo.java:8)
add*3 && remove*4:
true
true
true
a
a
b
c
Exception in thread "main" java.util.NoSuchElementException
*/
offerAndPoll(blockingQueue);
/*====================output========================
true
true
true
false
a
a
b
c
null
*/
putAndTake(blockingQueue);
/*====================output========================
put和take都会阻塞
*/
outOfTime(blockingQueue);
/*====================output========================
true
true
true
//睡眠2s后添加失败,打印false
false
*/
}
private static void outOfTime(BlockingQueue<String> blockingQueue) throws InterruptedException {
System.out.println(blockingQueue.offer("a",2L, TimeUnit.SECONDS));
System.out.println(blockingQueue.offer("a",2L, TimeUnit.SECONDS));
System.out.println(blockingQueue.offer("a",2L, TimeUnit.SECONDS));
System.out.println(blockingQueue.offer("a",2L, TimeUnit.SECONDS));
}
private static void putAndTake(BlockingQueue<String> blockingQueue) throws InterruptedException {
blockingQueue.put("a");
blockingQueue.put("b");
blockingQueue.put("c");
blockingQueue.put("d");
System.out.println(blockingQueue.take());
System.out.println(blockingQueue.take());
System.out.println(blockingQueue.take());
System.out.println(blockingQueue.take());
}
private static void offerAndPoll(BlockingQueue<String> blockingQueue) {
System.out.println(blockingQueue.offer("a"));
System.out.println(blockingQueue.offer("b"));
System.out.println(blockingQueue.offer("c"));
System.out.println(blockingQueue.offer("e"));
System.out.println(blockingQueue.peek());
System.out.println(blockingQueue.poll());
System.out.println(blockingQueue.poll());
System.out.println(blockingQueue.poll());
System.out.println(blockingQueue.poll());
}
private static void addAndRemove(BlockingQueue<String> blockingQueue) {
System.out.println(blockingQueue.add("a"));
System.out.println(blockingQueue.add("b"));
System.out.println(blockingQueue.add("c"));
System.out.println(blockingQueue.add("c"));
System.out.println(blockingQueue.element());
System.out.println(blockingQueue.remove());
System.out.println(blockingQueue.remove());
System.out.println(blockingQueue.remove());
System.out.println(blockingQueue.remove());
}
}
SynchronousQueue
SynchronousQueue没有容量,每一个put操作必须等待一个take操作,反之亦然。注意不要把put和take操作写在主线程中,阻塞了主线程就运行不了了。
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;
public class SynchronousQueueDemo {
public static void main(String[] args) {
BlockingQueue<String> blockingQueue=new SynchronousQueue<String>();
new Thread(()->{
try {
System.out.println(Thread.currentThread().getName()+"\t put 1");
blockingQueue.put("1");
System.out.println(Thread.currentThread().getName()+"\t put 2");
blockingQueue.put("2");
System.out.println(Thread.currentThread().getName()+"\t put 3");
blockingQueue.put("3");
} catch (InterruptedException e) {
e.printStackTrace();
}
},"AAA").start();
new Thread(()->{
try {
try{ TimeUnit.SECONDS.sleep(5); }catch (InterruptedException e){ e.printStackTrace(); }
System.out.println(Thread.currentThread().getName()+"\t take "+blockingQueue.take());
try{ TimeUnit.SECONDS.sleep(5); }catch (InterruptedException e){ e.printStackTrace(); }
System.out.println(Thread.currentThread().getName()+"\t take "+blockingQueue.take());
try{ TimeUnit.SECONDS.sleep(5); }catch (InterruptedException e){ e.printStackTrace(); }
System.out.println(Thread.currentThread().getName()+"\t take"+blockingQueue.take());
} catch (Exception e) {
e.printStackTrace();
}
},"BBB").start();
}
}
/*====================output========================
AAA put 1
//5s后
BBB take 1
AAA put 2
//5s后
BBB take 2
AAA put 3
//5s后
BBB take3
*/
Callable接口
与Runnable的区别:
- Callable带返回值。
- 会抛出异常。
- 覆写
call()
方法,而不是run()
方法。
Thread的构造方法里面不能接受Callable接口,Java的解决方法是通过FutureTask类做一个适配器(中间人),FutureTask继承自Runnable接口,其构造方法可以传入Callable类型。
Callable接口的使用:
public class CallableDemo {
//实现Callable接口
class MyThread implements Callable<Integer> {
@Override
public Integer call() throws Exception {
System.out.println("callable come in ...");
return 1024;
}
}
public static void main(String[] args) throws ExecutionException, InterruptedException {
//创建FutureTask类,接受MyThread,MyThread实现的是Callable接口
FutureTask<Integer> futureTask = new FutureTask<>(new MyThread());
//将FutureTask对象放到Thread类的构造器里面。因为FutureTask类实现了Runnable接口
new Thread(futureTask, "AA").start();
int result01 = 100;
//用FutureTask的get方法得到返回值。
int result02 = futureTask.get();
//ForkAndJoin
System.out.println("result=" + (result01 + result02));
}
}
注意:建议把futureTask.get()放最后,因为其要求获得Callable线程计算的结果,如果计算没有完成就会强求,会导致阻塞,直到计算完成。并且,多个线程来计算futureTask,其也是只需要计算一次,结果是可以复用的。
阻塞队列的应用——生产者消费者
传统模式
传统模式使用Lock
来进行操作,需要手动加锁、解锁。详见ProdConsTradiDemo。
public void increment() throws InterruptedException {
lock.lock();
try {
//1 判断 如果number=1,那么就等待,停止生产
while (number != 0) {
//等待,不能生产
condition.await();
}
//2 干活 否则,进行生产
number++;
System.out.println(Thread.currentThread().getName() + "\t" + number);
//3 通知唤醒 然后唤醒消费线程
condition.signalAll();
} catch (Exception e) {
e.printStackTrace();
} finally {
//最后解锁
lock.unlock();
}
}
阻塞队列模式
使用阻塞队列就不需要手动加锁了,详见ProdConsBlockQueueDemo。
public void myProd() throws Exception {
String data = null;
boolean retValue;
while (FLAG) {
data = atomicInteger.incrementAndGet() + "";//++i
retValue = blockingQueue.offer(data, 2L, TimeUnit.SECONDS);
if (retValue) {
System.out.println(Thread.currentThread().getName() + "\t" + "插入队列" + data + "成功");
} else {
ystem.out.println(Thread.currentThread().getName() + "\t" + "插入队列" + data + "失败");
}
TimeUnit.SECONDS.sleep(1);
}
System.out.println(Thread.currentThread().getName() + "\tFLAG==false,停止生产");
}
阻塞队列的应用——线程池
线程池基本概念
概念:线程池主要是控制运行线程的数量,将待处理任务放到等待队列,然后创建线程执行这些任务。如果超过了最大线程数,则等待。
优点:
- 线程复用:不用一直new新线程,重复利用已经创建的线程来降低线程的创建和销毁开销,节省系统资源。
- 提高响应速度:当任务达到时,不用创建新的线程,直接利用线程池的线程。
- 管理线程:可以控制最大并发数,控制线程的创建等。
重点:四种使用java多线程的方法
- 继承Thread类
- 实现Runnable接口(无返回值,不抛出异常)
- 实现Callable接口(有返回值,会抛异常)
- 使用线程池
体系:Executor
→ExecutorService
→AbstractExecutorService
→ThreadPoolExecutor
。ThreadPoolExecutor
是线程池创建的核心类。类似Arrays
、Collections
工具类,Executor
也有自己的工具类Executors
。
线程池三种常用创建方式
newFixedThreadPool:使用LinkedBlockingQueue
实现,定长线程池。
//以下为JDK源码
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
newSingleThreadExecutor:使用LinkedBlockingQueue
实现,一池只有一个线程。
//以下为JDK源码
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
newCachedThreadPool:使用SynchronousQueue
实现,变长线程池。
//以下为JDK源码
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
线程池创建的七个参数(ThreadPoolExecutor)
参数 | 意义 |
---|---|
corePoolSize | 线程池常驻核心线程数 |
maximumPoolSize | 能够容纳的最大线程数 |
keepAliveTime | 空闲线程存活时间 |
unit | 存活时间单位 |
workQueue | 存放提交但未执行任务的队列 |
threadFactory | 创建线程的工厂类 |
handler | 等待队列满后的拒绝策略 |
理解:线程池的创建参数,就像一个银行。
corePoolSize
就像银行的“当值窗口“,比如今天有2位柜员在受理客户请求(任务)。如果超过2个客户,那么新的客户就会在等候区(等待队列workQueue
)等待。当等候区也满了,这个时候就要开启“加班窗口”,让其它3位柜员来加班,此时达到最大窗口maximumPoolSize
,为5个。如果开启了所有窗口,等候区依然满员,此时就应该启动”拒绝策略“handler
,告诉不断涌入的客户,叫他们不要进入,已经爆满了。由于不再涌入新客户,办完事的客户增多,窗口开始空闲,这个时候就通过keepAlivetTime
将多余的3个”加班窗口“取消,恢复到2个”当值窗口“。
线程池底层原理
原理图:上面银行的例子,实际上就是线程池的工作原理。
流程图:
线程池的主要处理流程:
- 在创建了线程池后,等待提交过来的任务请求。
- 当调用 execute()方法添加一个请求任务时,线程池会做如下判断:
2.1 如果正在运行的线程数量小于 core Poolsize,那么马上创建线程运行这个任务;
2.2 如果正在运行的线程数量大于或等于 core Poolsize,那么将这个任务放入队列
2.3 如果这时候队列满了且正在运行的线程数量还小于 maximum Poolsize,那么还是要创建非核心线程立刻运行这个仼务
2.4 如果队列满了且正在运行的线程数量大于或等于 maximum Poolsize,那么线程池会启动饱和拒绝策略来执行。- 当一个线程完成任务时,它会从队列中取下一个任务来执行。
- 当一个线程无事可做超过一定的时间( keepAlive Time)时,线程池会判断,如果当前运行的线程数大于 core Poolsize,那么这个线程就被停掉。所以线程池的所有任务完成后它最终会收缩到 core Poolsize的大小。
线程池的拒绝策略
当等待队列满时,且达到最大线程数,再有新任务到来,就需要启动拒绝策略。JDK提供了四种拒绝策略(ThreadPoolExecutor第七个参数),分别是。
-
AbortPolicy:默认的策略,直接抛出
RejectedExecutionException
异常,阻止系统正常运行。 - CallerRunsPolicy:既不会抛出异常,也不会终止任务,而是将任务返回给调用者。
- DiscardOldestPolicy:抛弃队列中等待最久的任务,然后把当前任务加入队列中尝试再次提交任务。
- DiscardPolicy:直接丢弃任务,不做任何处理。
面试大坑:
Executors 中 JDK提供了三种线程池的类型,在工作中,你会用那个?
答:一个都不用。原因如下文所示。
实际生产使用哪一个线程池?
单一、可变、定长都不用!原因就是newFixedThreadPool
和SingleThreadExecutor
底层都是用LinkedBlockingQueue
实现的,这个队列最大长度为Integer.MAX_VALUE
,显然会导致OOM。所以实际生产一般自己通过ThreadPoolExecutor
的7个参数,自定义线程池。
ExecutorService threadPool=new ThreadPoolExecutor(2,5,
1L,TimeUnit.SECONDS,
new LinkedBlockingQueue<>(3),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.AbortPolicy());
下面的demo是自定义线程池,最大线程数为5,阻塞队列大小为3。所以最多能接受8个线程。当使用AbortPolicy拒绝策略时,模拟9个线程办理业务,超出线程池服务数量的最大值,直接抛出异常。
import java.util.concurrent.*;
public class MyThreadPoolDemo {
public static void main(String[] args) {
//根据ThreadPoolExecutor的七个参数自定义的线程池
ExecutorService threadPool = new ThreadPoolExecutor(
2, //线程核心数2个
5, //线程最大数5个
1L, //1s后,如无等待,从MaxSize降为CoreSize
TimeUnit.SECONDS, //keepAliveTime:单位s
new LinkedBlockingQueue<>(3), //阻塞等待队列:等待最大数设为3,默认为Integer.MAX_VALUE
Executors.defaultThreadFactory(), //默认线程工厂
new ThreadPoolExecutor.AbortPolicy()); //默认拒绝方法
try {
for (int i = 0; i <9 ; i++) {
threadPool.execute(()->{
System.out.println(Thread.currentThread().getName() + "\t 办理业务");
});
}
}catch (Exception e){
e.printStackTrace();
}finally {
//关闭线程池
threadPool.shutdown();
}
}
}
/*====================output========================
pool-1-thread-1 办理业务
pool-1-thread-1 办理业务
pool-1-thread-1 办理业务
pool-1-thread-5 办理业务
pool-1-thread-4 办理业务
pool-1-thread-3 办理业务
pool-1-thread-2 办理业务
pool-1-thread-1 办理业务
java.util.concurrent.RejectedExecutionException: Task MyThreadPoolDemo$$Lambda$1/990368553@7cca494b rejected from java.util.concurrent.ThreadPoolExecutor@7ba4f24f[Running, pool size = 5, active threads = 2, queued tasks = 0, completed tasks = 6]
at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2047)
at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:823)
at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1369)
at MyThreadPoolDemo.main(MyThreadPoolDemo.java:19)
*/
上面的demo其他代码不动,把拒绝策略改为CallerRunsPolicy时,打印的结果为:
/*====================output========================
pool-1-thread-2 办理业务
pool-1-thread-5 办理业务
pool-1-thread-3 办理业务
main 办理业务
pool-1-thread-4 办理业务
pool-1-thread-1 办理业务
pool-1-thread-3 办理业务
pool-1-thread-5 办理业务
pool-1-thread-2 办理业务
*/
CallerRunsPolicy 既不会抛出异常,也不会终止任务,而是将任务返回给调用者。即是main线程调用了线程池,线程池完成不了任务,所以把任务返回给主线程进行完成。
而DiscardOldestPolicy和DiscardPolicy拒绝策略都会丢弃任务,DiscardOldestPolicy丢弃的是阻塞队列中等待最久的任务,DiscardPolicy丢弃的是处理不了的那个任务。从下面的打印结果,两者并看不出区别。
pool-1-thread-1 办理业务
/*====================output========================
pool-1-thread-5 办理业务
pool-1-thread-5 办理业务
pool-1-thread-3 办理业务
pool-1-thread-4 办理业务
pool-1-thread-2 办理业务
pool-1-thread-5 办理业务
pool-1-thread-1 办理业务
*/
自定义线程池参数选择
最大线程数的确定:
- 对于CPU密集型任务,最大线程数是CPU线程数+1
- 对于IO密集型任务,尽量多配点,因为大部分线程都阻塞
第一种方式: 可以是CPU线程数*2
第二种方式:CPU线程数 / (1-阻塞系数),阻塞系数在0.8~0.9之间获得CPU核数的方式为:
Runtime.getRuntime().availableProcessors()
线程池的详细使用见这篇文章
死锁定义,编码和定位
死锁定义:是指两个或两个以上的进程在执行过程中,因争夺资源而造成的一种互相等待的现象。
产生死锁的原因:
- 系统资源不足
- 进程推进顺序不当
- 资源分配不当
若无外力干涉那它们都将无法推进下去,如果系统资源充足,进程的资源请求都能够得到满足,死锁岀现的可能性就很低,否则就会因争夺有限的资源而陷入死锁。
使用代码实现死锁:
import java.util.concurrent.TimeUnit;
public class T implements Runnable{
private String lockA;
private String lockB;
//构造函数
T(String lockA, String lockB){
this.lockA = lockA;
this.lockB = lockB;
}
@Override
public void run() {
//锁定对象lockA
synchronized (lockA){
System.out.println(Thread.currentThread().getName() + "\t 自己持有" + lockA + "尝试获得" + lockB);
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
//锁定lockA之后再尝试锁定对象lockB
synchronized (lockB){
System.out.println(Thread.currentThread().getName() + "\t 自己持有" + lockB + "尝试获得" + lockA);
}
}
}
public static void main(String[] args) {
String lockA = "lockA";
String lockB = "lockB";
//交换锁定
new Thread(new T(lockA,lockB),"AAA").start();
new Thread(new T(lockB,lockA),"BBB").start();
}
}
/*====================output========================
AAA 自己持有lockA尝试获得lockB
BBB 自己持有lockB尝试获得lockA
卡住ing。。。。。。。。。。。。。。。。。
*/
查看死锁:
主要是两个命令配合起来使用,定位死锁。
jps指令(java ps):jps -l
可以查看运行的Java进程。
jstack指令:jstack pid
可以查看某个Java进程的堆栈信息,同时分析出死锁。
参考:https://github.com/MaJesTySA/JVM-JUC-Core/blob/master/docs/JUC.md#jmm