在准备开始详细分析java多线程源码时,发现了这样一个问题,也就是说,在面试的时候经常要求,手写阻塞队列,或者手写一个程序,交替打印1-10000。于是,看到这里经不住动手一试。
基础知识
实际上,要实现一个BlockQueue,实际上就是自己写一个简单的生产者、消费者模型的代码。需要我们实现一个Queue,做为临界区,之后,当临界区不足的时候可以添加元素,当临界区达到阈值的时候则生产者线程等待。同理,对于消费者线程,那么当临界区有元素的时候就可以消费,没用元素的时候就要停止消费。那么我们需要用到 synchronized、wait、notify。
BlockQueue
实现的BlockQueue如下:
package com.dhb.concurrent.test;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
public class BlockQueue {
public static final int CAPACITY = 20;
private Object lock = new Object();
private static List<Integer> queue = new ArrayList<>(CAPACITY);
public static void main(String[] args) {
BlockQueue blockQueue = new BlockQueue();
new Thread(() -> {
for(;;) {
blockQueue.add(ThreadLocalRandom.current().nextInt());
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"生产者线程").start();
new Thread(() -> {
for(;;) {
blockQueue.take();
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"消费者线程").start();
new Thread(() -> {
for (;;){
System.out.println("queue 长度:["+queue.size()+"] 当前线程为:"+Thread.currentThread().getName());
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"监控线程").start();
}
public void add(int num){
synchronized (lock) {
lock.notify();
if(queue.size()< CAPACITY) {
queue.add(queue.size(),num);
System.out.println("Queue produce :["+num+"] 当前线程为:"+Thread.currentThread().getName());
}else {
try {
System.out.println("Queue 已满,生产者开始wait");
lock.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
public void take() {
synchronized (lock) {
lock.notify();
if(queue.size() > 0) {
int num = queue.remove(0);
System.out.println("Queue Cosumer :["+num+"] 当前线程为:"+Thread.currentThread().getName());
}else {
try {
System.out.println("Queue 为空 消费者开始wait");
lock.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}
此时生产者线程的sleep时间大于消费者线程,因此执行一段时间之后,队列会达到100。执行结果如下:
Queue 为空 消费者开始wait
Queue produce :[-147870520] 当前线程为:生产者线程
queue 长度:[1] 当前线程为:监控线程
Queue produce :[-1580265186] 当前线程为:生产者线程
queue 长度:[2] 当前线程为:监控线程
Queue produce :[-1164881605] 当前线程为:生产者线程
queue 长度:[3] 当前线程为:监控线程
Queue produce :[-696944987] 当前线程为:生产者线程
queue 长度:[4] 当前线程为:监控线程
Queue produce :[-1521767909] 当前线程为:生产者线程
queue 长度:[5] 当前线程为:监控线程
Queue produce :[75182609] 当前线程为:生产者线程
queue 长度:[6] 当前线程为:监控线程
Queue produce :[-619918250] 当前线程为:生产者线程
queue 长度:[7] 当前线程为:监控线程
Queue produce :[52940195] 当前线程为:生产者线程
queue 长度:[8] 当前线程为:监控线程
Queue produce :[-591206187] 当前线程为:生产者线程
queue 长度:[9] 当前线程为:监控线程
Queue produce :[653198854] 当前线程为:生产者线程
queue 长度:[10] 当前线程为:监控线程
Queue Cosumer :[-147870520] 当前线程为:消费者线程
Queue produce :[-1809273459] 当前线程为:生产者线程
queue 长度:[10] 当前线程为:监控线程
Queue produce :[747239524] 当前线程为:生产者线程
queue 长度:[11] 当前线程为:监控线程
Queue produce :[1845821452] 当前线程为:生产者线程
queue 长度:[12] 当前线程为:监控线程
Queue produce :[-840388150] 当前线程为:生产者线程
queue 长度:[13] 当前线程为:监控线程
Queue produce :[-1020004646] 当前线程为:生产者线程
queue 长度:[14] 当前线程为:监控线程
Queue produce :[430450096] 当前线程为:生产者线程
queue 长度:[15] 当前线程为:监控线程
Queue produce :[1132912512] 当前线程为:生产者线程
queue 长度:[16] 当前线程为:监控线程
Queue produce :[1353255060] 当前线程为:生产者线程
queue 长度:[17] 当前线程为:监控线程
Queue produce :[-1782663315] 当前线程为:生产者线程
queue 长度:[18] 当前线程为:监控线程
Queue produce :[-1688165675] 当前线程为:生产者线程
queue 长度:[19] 当前线程为:监控线程
Queue Cosumer :[-1580265186] 当前线程为:消费者线程
Queue produce :[-1150952827] 当前线程为:生产者线程
queue 长度:[19] 当前线程为:监控线程
Queue produce :[575115832] 当前线程为:生产者线程
queue 长度:[20] 当前线程为:监控线程
Queue 已满,生产者开始wait
queue 长度:[20] 当前线程为:监控线程
queue 长度:[20] 当前线程为:监控线程
queue 长度:[20] 当前线程为:监控线程
queue 长度:[20] 当前线程为:监控线程
queue 长度:[20] 当前线程为:监控线程
queue 长度:[20] 当前线程为:监控线程
queue 长度:[20] 当前线程为:监控线程
queue 长度:[20] 当前线程为:监控线程
Queue Cosumer :[-1164881605] 当前线程为:消费者线程
queue 长度:[19] 当前线程为:监控线程
Queue produce :[1364683693] 当前线程为:生产者线程
queue 长度:[20] 当前线程为:监控线程
Queue 已满,生产者开始wait
queue 长度:[20] 当前线程为:监控线程
queue 长度:[20] 当前线程为:监控线程
queue 长度:[20] 当前线程为:监控线程
queue 长度:[20] 当前线程为:监控线程
queue 长度:[20] 当前线程为:监控线程
queue 长度:[20] 当前线程为:监控线程
queue 长度:[20] 当前线程为:监控线程
如果我们将生产者的sleep时间改为1秒,消费者的sleep时间改为0.1秒,那么队列不会出现堆积:
Queue 为空 消费者开始wait
Queue produce :[176104373] 当前线程为:生产者线程
queue 长度:[1] 当前线程为:监控线程
queue 长度:[1] 当前线程为:监控线程
Queue Cosumer :[176104373] 当前线程为:消费者线程
Queue 为空 消费者开始wait
queue 长度:[0] 当前线程为:监控线程
queue 长度:[0] 当前线程为:监控线程
queue 长度:[0] 当前线程为:监控线程
queue 长度:[0] 当前线程为:监控线程
queue 长度:[0] 当前线程为:监控线程
queue 长度:[0] 当前线程为:监控线程
queue 长度:[0] 当前线程为:监控线程
queue 长度:[0] 当前线程为:监控线程
Queue produce :[-202551884] 当前线程为:生产者线程
queue 长度:[1] 当前线程为:监控线程
Queue Cosumer :[-202551884] 当前线程为:消费者线程
queue 长度:[0] 当前线程为:监控线程
Queue 为空 消费者开始wait
queue 长度:[0] 当前线程为:监控线程
queue 长度:[0] 当前线程为:监控线程
这样就实现了一个简单的BlockQueue模型。
交替print
代码实现如下:
package com.dhb.concurrent.test;
public class Printer implements Runnable {
private static final int MAX_NUM = 100;
private Object lock = new Object();
private int num = 1;
public static void main(String[] args) {
Printer p = new Printer();
new Thread(p, "thread1").start();
new Thread(p, "thread2").start();
}
@Override
public void run() {
while (true) {
synchronized (lock) {
lock.notify();
if (num <= MAX_NUM) {
System.out.println("printer:[" + (num++) + "] thread:" + Thread.currentThread().getName());
try {
lock.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
} else {
break;
}
}
}
}
}
上述代码输出:
printer:[1] thread:thread2
printer:[2] thread:thread1
printer:[3] thread:thread2
printer:[4] thread:thread1
printer:[5] thread:thread2
printer:[6] thread:thread1
printer:[7] thread:thread2
printer:[8] thread:thread1
printer:[9] thread:thread2
printer:[10] thread:thread1
printer:[11] thread:thread2
printer:[12] thread:thread1
printer:[13] thread:thread2
printer:[14] thread:thread1
printer:[15] thread:thread2
printer:[16] thread:thread1
printer:[17] thread:thread2
printer:[18] thread:thread1
printer:[19] thread:thread2
printer:[20] thread:thread1
printer:[21] thread:thread2
printer:[22] thread:thread1
printer:[23] thread:thread2
printer:[24] thread:thread1
printer:[25] thread:thread2
printer:[26] thread:thread1
printer:[27] thread:thread2
printer:[28] thread:thread1
printer:[29] thread:thread2
printer:[30] thread:thread1
printer:[31] thread:thread2
printer:[32] thread:thread1
printer:[33] thread:thread2
printer:[34] thread:thread1
printer:[35] thread:thread2
printer:[36] thread:thread1
printer:[37] thread:thread2
printer:[38] thread:thread1
printer:[39] thread:thread2
printer:[40] thread:thread1
printer:[41] thread:thread2
printer:[42] thread:thread1
printer:[43] thread:thread2
printer:[44] thread:thread1
printer:[45] thread:thread2
printer:[46] thread:thread1
printer:[47] thread:thread2
printer:[48] thread:thread1
printer:[49] thread:thread2
printer:[50] thread:thread1
printer:[51] thread:thread2
printer:[52] thread:thread1
printer:[53] thread:thread2
printer:[54] thread:thread1
printer:[55] thread:thread2
printer:[56] thread:thread1
printer:[57] thread:thread2
printer:[58] thread:thread1
printer:[59] thread:thread2
printer:[60] thread:thread1
printer:[61] thread:thread2
printer:[62] thread:thread1
printer:[63] thread:thread2
printer:[64] thread:thread1
printer:[65] thread:thread2
printer:[66] thread:thread1
printer:[67] thread:thread2
printer:[68] thread:thread1
printer:[69] thread:thread2
printer:[70] thread:thread1
printer:[71] thread:thread2
printer:[72] thread:thread1
printer:[73] thread:thread2
printer:[74] thread:thread1
printer:[75] thread:thread2
printer:[76] thread:thread1
printer:[77] thread:thread2
printer:[78] thread:thread1
printer:[79] thread:thread2
printer:[80] thread:thread1
printer:[81] thread:thread2
printer:[82] thread:thread1
printer:[83] thread:thread2
printer:[84] thread:thread1
printer:[85] thread:thread2
printer:[86] thread:thread1
printer:[87] thread:thread2
printer:[88] thread:thread1
printer:[89] thread:thread2
printer:[90] thread:thread1
printer:[91] thread:thread2
printer:[92] thread:thread1
printer:[93] thread:thread2
printer:[94] thread:thread1
printer:[95] thread:thread2
printer:[96] thread:thread1
printer:[97] thread:thread2
printer:[98] thread:thread1
printer:[99] thread:thread2
printer:[100] thread:thread1
我们也可以将线程改为多个,如果再添加一个thread3:
new Thread(p, "thread3").start();
输出结果:
printer:[0] thread:thread1
printer:[1] thread:thread3
printer:[2] thread:thread2
printer:[3] thread:thread3
printer:[4] thread:thread2
printer:[5] thread:thread3
printer:[6] thread:thread2
printer:[7] thread:thread3
printer:[8] thread:thread2
printer:[9] thread:thread1
printer:[10] thread:thread2
printer:[11] thread:thread3
printer:[12] thread:thread1
printer:[13] thread:thread2
printer:[14] thread:thread3
printer:[15] thread:thread1
printer:[16] thread:thread2
printer:[17] thread:thread3
printer:[18] thread:thread1
printer:[19] thread:thread2
printer:[20] thread:thread3
printer:[21] thread:thread1
printer:[22] thread:thread2
printer:[23] thread:thread3
printer:[24] thread:thread1
printer:[25] thread:thread2
printer:[26] thread:thread3
printer:[27] thread:thread1
printer:[28] thread:thread2
printer:[29] thread:thread3
printer:[30] thread:thread1
printer:[31] thread:thread2
printer:[32] thread:thread3
printer:[33] thread:thread1
printer:[34] thread:thread2
printer:[35] thread:thread3
printer:[36] thread:thread1
printer:[37] thread:thread2
printer:[38] thread:thread3
printer:[39] thread:thread1
printer:[40] thread:thread2
printer:[41] thread:thread3
printer:[42] thread:thread1
printer:[43] thread:thread2
printer:[44] thread:thread3
printer:[45] thread:thread1
printer:[46] thread:thread2
printer:[47] thread:thread3
printer:[48] thread:thread1
printer:[49] thread:thread2
printer:[50] thread:thread3
printer:[51] thread:thread1
printer:[52] thread:thread2
printer:[53] thread:thread3
printer:[54] thread:thread1
printer:[55] thread:thread2
printer:[56] thread:thread3
printer:[57] thread:thread1
printer:[58] thread:thread2
printer:[59] thread:thread3
printer:[60] thread:thread1
printer:[61] thread:thread2
printer:[62] thread:thread3
printer:[63] thread:thread1
printer:[64] thread:thread2
printer:[65] thread:thread3
printer:[66] thread:thread1
printer:[67] thread:thread2
printer:[68] thread:thread3
printer:[69] thread:thread1
printer:[70] thread:thread2
printer:[71] thread:thread3
printer:[72] thread:thread1
printer:[73] thread:thread2
printer:[74] thread:thread3
printer:[75] thread:thread1
printer:[76] thread:thread2
printer:[77] thread:thread3
printer:[78] thread:thread1
printer:[79] thread:thread2
printer:[80] thread:thread3
printer:[81] thread:thread1
printer:[82] thread:thread2
printer:[83] thread:thread3
printer:[84] thread:thread1
printer:[85] thread:thread2
printer:[86] thread:thread3
printer:[87] thread:thread1
printer:[88] thread:thread2
printer:[89] thread:thread3
printer:[90] thread:thread1
printer:[91] thread:thread2
printer:[92] thread:thread3
printer:[93] thread:thread1
printer:[94] thread:thread2
printer:[95] thread:thread3
printer:[96] thread:thread1
printer:[97] thread:thread2
printer:[98] thread:thread3
printer:[99] thread:thread1
printer:[100] thread:thread2
实际上可以看到,当有3个线程的时候,线程打印就不是平均了。这也验证了wait实际上是随机唤醒等待队列中的线程,而不是按顺序。
总结
以上就是通过synchronized和wait、notify实现简单的手写代码的过程。也有人说为什么不用notifyAll。实际上关于notify和notifyAll的区别也是面试过程中的重点。由于涉及到线程同步的模型,本文暂不展开,后续详细说明。
由此看来,实际上这两个问题并不难,需要我们对java常用的并发过程比较熟悉即可。
需要注意的是,在代码中,一定要在synchronized之后就调用notify,将其他wait的线程唤醒之后,再进行下一步操作。否则可能会导致某些线程在执行结束之后由于没用被唤醒而不会退出。
synchronized (lock) {
lock.notify();
//...处理逻辑
try {
lock.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}