前言
在上一章节,我们了解到了多线程的一些基础知识点——多线程的实现、CAS 是什么以及 AQS 的工作流程,接下来,在这章节中,我会通过有关于应用性的多线程面试题的内容,来给大家介绍相关的多线程知识点。
下面我们通过 2 道常问的面试题进行相关知识点的学习:
1. BlockingQueue(阻塞队列)原理是什么?手写阻塞队列的 take 和 put 方法
2. 如何定义线程池初始化参数?使用无界队列的线程池会导致内存飙升吗?
这里面谈到的阻塞队列和 AQS 的阻塞队列不是同一个东西,该文章谈到的阻塞队列全都是关乎于BlockingQueue接口
1. BlockingQueue 原理是什么?手写阻塞队列的 take 和 put 方法
阻塞队列,从字面意义上看,就是一个带阻塞功能的队列,那么它会在什么时候出现阻塞呢?
- 如果队列为空,那么想要队列出队即执行 take() 方法的线程会被阻塞。
- 如果队列已满,那么想要队列入队即执行 put() 方法的线程会被阻塞。
在上一个章节我们了解到了 AQS 以及它的一个实现类 ReentrantLock,而在 Java 的阻塞队列当中,则是借助了 ReentrantLock 类实现了阻塞的功能,除此之外,为了实现锁的细腻度,还额外使用了另一个接口—— Condition。
在介绍阻塞队列之前,我们先来了解下 AQS 中的 Conditon 接口。传统多线程方式当中,我们通常会使用 synchronized 关键字配合 Object 类的 wait()/notifyAll() 方法实现多线程的协同工作,相同的,为了配合 AQS ,AQS 自身通过内部类实现 Condition 接口来完成多线程的协同。
上文提到了一个锁的细腻度,可能有的人无法理解这个锁的细腻度是什么,那么接下来我便简单举一个例子。
假如我们使用 synchronized 关键字配合 Object 类的 wait()/notifyAll() 来实现一个生产者和消费者模式的时候,由于你无法指定唤醒哪一类线程,因此只能使用 notifyAll() 方法将生产者和消费者等待的线程都唤醒,这样的坏处便是无需被唤醒的线程被唤醒了,导致了性能的浪费。而所谓的细腻度,便是针对性的唤醒线程,每次唤醒都不会导致性能的浪费,例如我生产者只能唤醒消费者,而消费者也只能唤醒生产者,这样便不会出现无关线程被唤醒又重新进入阻塞的问题。
Condition 类就是生成了一个与阻塞队列隔离的条件队列,每个 Condition 对象都相当于是一个队列,里面由多个 Node 元素以单向链表的形式组合而成,每个队列元素出队后都会加入到阻塞队列的队尾,我们通过一张图来简单了解下 Condition:
可以看出,条件队列也是有序的,即在条件队列中阻塞的线程如果没有被中断,都会根据队列的顺序进行有序的唤醒出队,即每次唤醒的都是在队列中等待最久的线程。
有个注意点,Condition 对象的 await()/signal() 方法必须要线程获取到锁只能才会正常执行,否则会抛出异常,即在没有获取锁的环境调用这些方法,便会抛出 IllegalMonitorStateException 异常,这个和 Object 对象的 wait()/notify() 方法一样。
接下来我简单描述下 Condtion 条件队列的 入队 await() / 出队 signal() 执行流程。
await() 方法:
- 将当前线程包装为一个 waitStatus 字段为 CONDITION(-2) 的 node 对象并加入到条件队列尾部。
- 记录当前锁的 state 字段值,并将当前锁的状态值改为 0 (重入锁也一样改为0)。
- 通过 LockSupport.park() 方法进入阻塞。
signal() 方法
- 断开与条件队列的关系,即将当前 node 对象的 nextWaiter 指针字段设置为 null。
- 通过 CAS 将当前 node 对象的 waitStatus 字段设置为 0,并将设置好的节点通过自旋 CAS 加入到阻塞队列的队尾
- 将当前 node 对象的前驱节点的 waitStatus 字段设置为 SIGNAL(-1),代表当前节点需要这个节点进行唤醒;如果这个前驱节点处于取消状态,那么直接唤醒当前 node 对象的线程,让其去竞争锁,相当于一个新的线程进入 AQS 一样,接下来的步骤读者可以参考上一篇 AQS 的运行流程。
了解完 AQS(ReentrantLock)/Conditon 的内容,接下来再看阻塞队列就很简单了,其实阻塞队列的原理就是:Lock 配合多个 Condtion 进行阻塞控制,配合多个 Conodtion 只是为了针对性唤醒来减少性能的损耗。开发者不需要担心什么时候需要阻塞队列,只需要往队列存入/取出数据即可,因为一切有关阻塞的功能都在内部进行实现。
接下来,我通过一个简单的例子来实现阻塞队列的 take()/put(E e) 方法。
public class BlockQueue<E>{
//一个 Lock 对应多个 Condition
Lock lock = new ReentrantLock();
Condition producer = lock.newCondition();
Condition consumer = lock.newCondition();
//有界队列,即队列有容量限制
int size;
//这里通过一个List存储队列元素
List<E> queue;
public BlockQueue(int size){
this.size = size;
queue = new ArrayList<>(size);
}
public void put(E e) throws InterruptedException {
try {
lock.lock();
//如果容量已满,对生产者线程进行阻塞
while(size == queue.size()){
producer.await();
}
queue.add(e);
//有新的商品生产了,可以唤醒被阻塞的消费者进行消费
consumer.signal();
}finally {
lock.unlock();
}
}
public E take() throws InterruptedException {
try{
lock.lock();
//没有商品便阻塞消费者
while (queue.size() == 0){
consumer.await();
}
E e = queue.get(0);
queue.remove(0);
//商品被消费了一个,可以唤醒生产者继续生产
producer.signal();
return e;
}finally {
lock.unlock();
}
}
public static void main(String[] args) {
BlockQueue<Object> queue = new BlockQueue<>(2);
Thread producer = new Thread(()->{
while (true){
try {
queue.put(new Object());
System.out.println(Thread.currentThread().getName() + " 生产了一个商品");
} catch (InterruptedException e) {}
}
});
producer.setName("producer");
Thread consumer = new Thread(()->{
while (true){
try {
Object take = queue.take();
System.out.println(Thread.currentThread().getName() + " 消费了一个商品");
//每1秒才会有一个商品被消费
try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); }
} catch (InterruptedException e) {}
}
});
consumer.setName("consumer");
producer.start();
consumer.start();
}
}
从上面代码可以看出,阻塞队列的核心就在如何正确的使用 Lock 和 Condition,当然,Java 提供的阻塞队列的功能还不仅仅如此,但由于篇幅原因,这里不在阐述过多内容,有兴趣的读者可以自行查阅相关资料。
2. 如何定义线程池初始化参数?使用无界队列的线程池会导致内存飙升吗?
在了解如何定义初始化参数前,我们通过一个图来了解 jdk5 提供的线程池的构造参数:
从上图可以看出,线程池的构造参数分为三大类,分别是线程相关、时间相关、任务相关,下面我们主要根据线程相关以及任务相关进行分析:
- 线程相关我们只需要根据业务的类型进行最大线程数的设值
- 任务相关首先是禁止使用无界阻塞队列,其次便是根据业务实现自定义的拒绝策略,例如任务允许丢失,则可以通过日志简单记录该任务然后丢弃,如果任务不允许丢失,可以考虑新建线程去处理(如果服务器性能够强大),或者记录详细的任务信息,等线程池空闲的时候在进行处理。
了解完构造参数的定义规则,接下来我们来看看为什么线程池禁止使用无界队列。
回到问题,该问题的答案便是存在使用无界队列会导致内存飙升的场景,至于为什么,这个便涉及到线程池的执行过程,因此下面我们通过了解线程池的执行过程来详细解答这个问题。
- 首先将任务给常驻线程处理
- 如果常驻线程来不及处理,则将任务放入阻塞队列
- 如果阻塞队列满了并且线程数量少于maximumPoolSize,则将线程数扩展至maximumPoolSize
- 如果仍然处理不完,阻塞队列满了,则进行拒绝策略。
- 如果任务被快速处理完,除常驻线程外的其他线程,在经过keepAliveTime(unit)时间后自动销毁
从线程池执行流程的第三步可以看出,创建非常驻线程的前提是阻塞队列已满,但是对于无界队列来说(上限为Integer.MAX_VALUE),几乎不可能出现阻塞队列满的情况,如果常驻线程无法快速执行任务,任务便会一直往队列中添加积累,最终导致内存飙升的问题。
总结
在多线程方面,首先我们得先了解多线程的实现,实现完多线程后便考虑如何管理多线程的竞争,多线程的竞争可以用到基于 CAS 和 AQS 实现的锁和阻塞队列,最后通过线程池来进行任务分配和线程的回收利用,如果读者在看这两章节内容时理不清楚顺序的话,可以基于这段总结在回看一次。
接下来,我们回到上面的两个问题
1. BlockingQueue(阻塞队列)原理是什么?手写阻塞队列的 take 和 put 方法
2. 如何定义线程池初始化参数?使用无界队列的线程池会导致内存飙升吗?
如果你们可以很流畅的回答这些问题,那么恭喜你,该章节的内容已经全部掌握,如果不行,希望可以回到对应问题讲解的地方,或者对某个不了解的点进行额外的知识搜索,尽量用自己组织的语言回答这些问题。