身份越来越多,自己越来越少。 — 《一念天堂》
写在前面
阻塞队列常用于生产者和消费者的场景,生产者就是往队列中放入元素,消费者就是从队列中获取元素,阻塞队列就是生产者存放元素的容器,而消费者也从该容器中拿元素。
阻塞队列有两种常见的阻塞场景,满足这两种阻塞场景的队列就是阻塞队列,分别如下:
- 当队列中没有元素的情况下,消费者端的所有线程会被自动阻塞,直到生产者往队列中放入元素,线程会被自动唤醒。
- 当队列中元素填满的情况下,生产者端的所有线程会被自动阻塞,直到消费者从队列中获取元素,线程会被自动唤醒。
Java中的阻塞队列
Java中提供了7个阻塞队列,分别如下:
- ArrayBlockingQueue:由数组结构组成的有界阻塞队列,按照先进先出的原则对元素进行排序,支持公平锁和非公平锁。
- LinkedBlockingQueue:由链表结构组成的有界阻塞队列,按照先进先出的原则对元素进行排序,默认长度为Integer.MAX_VALUE。
- PriorityBlockingQueue:支持优先级排序的无界阻塞队列,默认自然序对元素进行排序,可以自定义实现compareTo()方法指定排序规则,不保证同优先级元素的顺序。
- DelayedQueue:使用优先级队列实现的无界阻塞队列,在创建元素时,可以指定多久才能从队列中获取元素,只有延时期满后才能从队列中获取元素。
- SynchronousQueue:不存储元素的阻塞队列,每一个put操作都要等待take操作,否则不能添加元素,支持公平锁和非公平锁。
- LinkedTransferQueue:由链表结构组成的无界阻塞队列,相当于其他队列,多了transfer和tryTransfer方法。
- LinkedBlockingDeque:由链表结构组成的双向阻塞队列,队首和队尾都可以添加和移除元素,多线程并发时,可以将锁的竞争最多降到一半。
ArrayBlockingQueue和LinkedBlockingQueue一般为常用的阻塞队列。
阻塞队列的使用
接下来通过一个Demo演示阻塞队列的用法。
public class MainActivity extends AppCompatActivity {
private static final String TAG = MainActivity.class.getSimpleName();
private ArrayBlockingQueue<String> mBlockingQueue = new ArrayBlockingQueue<>(10);
private Producer mProducer;
private Consumer mConsumer;
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_main);
mProducer = new Producer();
mProducer.start();
mConsumer = new Consumer();
mConsumer.start();
}
@Override
protected void onStop() {
super.onStop();
mProducer.stopProduct();
mConsumer.stopConsume();
}
/**
* 生产者
*/
private class Producer extends Thread {
private volatile boolean isStop;
private int event;
public void stopProduct() {
isStop = true;
}
@Override
public void run() {
super.run();
while (!isStop) {
try {
// 事件 - 5 发送完成后,睡2秒
if (event == 5) {
Thread.sleep(2000);
}
// 事件 - 10 发送完成后,调用stopProduct()
if (event == 10) {
stopProduct();
}
event++;
// 队列中没有空余位置,生产者端的线程进入阻塞状态,直到消费者端的线程从队列中拿元素,唤醒生产者端的线程继续执行
mBlockingQueue.put("事件 - " + event);
Log.d(TAG, "生产者 生产事件 = " + event);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
/**
* 消费者
*/
private class Consumer extends Thread {
private volatile boolean isStop;
public void stopConsume() {
isStop = true;
}
@Override
public void run() {
super.run();
while (!isStop) {
try {
// 拿不到元素消费者端的线程进入阻塞状态,直到生产者端的线程往队列中放入元素,唤醒消费者端的线程继续执行
String event = mBlockingQueue.take();
Log.d(TAG, "消费者 消费事件 = " + event);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}
这里维护了一个ArrayBlockingQueue,并指定其大小为10,创建了一个生产者线程和一个消费者线程,生产者线程在生产5个事件后睡两秒钟,消费者线程在消费完“事件 - 5”后由于从队列中拿不到元素,就会自动阻塞,等待生产者往队列中放入元素,只要队列中有生产者放入元素,就会立即唤醒消费者线程继续获取元素,详见以下Log:
2019-09-02 21:23:58.822 18692-18764/com.chad.blockingqueue D/MainActivity: 生产者 生产事件 = 1
2019-09-02 21:23:58.822 18692-18764/com.chad.blockingqueue D/MainActivity: 生产者 生产事件 = 2
2019-09-02 21:23:58.822 18692-18765/com.chad.blockingqueue D/MainActivity: 消费者 消费事件 = 事件 - 1
2019-09-02 21:23:58.823 18692-18765/com.chad.blockingqueue D/MainActivity: 消费者 消费事件 = 事件 - 2
2019-09-02 21:23:58.823 18692-18764/com.chad.blockingqueue D/MainActivity: 生产者 生产事件 = 3
2019-09-02 21:23:58.823 18692-18765/com.chad.blockingqueue D/MainActivity: 消费者 消费事件 = 事件 - 3
2019-09-02 21:23:58.823 18692-18764/com.chad.blockingqueue D/MainActivity: 生产者 生产事件 = 4
2019-09-02 21:23:58.823 18692-18765/com.chad.blockingqueue D/MainActivity: 消费者 消费事件 = 事件 - 4
2019-09-02 21:23:58.823 18692-18764/com.chad.blockingqueue D/MainActivity: 生产者 生产事件 = 5
2019-09-02 21:23:58.823 18692-18765/com.chad.blockingqueue D/MainActivity: 消费者 消费事件 = 事件 - 5
生产者线程睡2秒,继续生产事件
2019-09-02 21:24:00.825 18692-18764/com.chad.blockingqueue D/MainActivity: 生产者 生产事件 = 6
2019-09-02 21:24:00.825 18692-18765/com.chad.blockingqueue D/MainActivity: 消费者 消费事件 = 事件 - 6
2019-09-02 21:24:00.825 18692-18764/com.chad.blockingqueue D/MainActivity: 生产者 生产事件 = 7
2019-09-02 21:24:00.825 18692-18764/com.chad.blockingqueue D/MainActivity: 生产者 生产事件 = 8
2019-09-02 21:24:00.825 18692-18765/com.chad.blockingqueue D/MainActivity: 消费者 消费事件 = 事件 - 7
2019-09-02 21:24:00.825 18692-18764/com.chad.blockingqueue D/MainActivity: 生产者 生产事件 = 9
2019-09-02 21:24:00.826 18692-18765/com.chad.blockingqueue D/MainActivity: 消费者 消费事件 = 事件 - 8
2019-09-02 21:24:00.826 18692-18764/com.chad.blockingqueue D/MainActivity: 生产者 生产事件 = 10
2019-09-02 21:24:00.826 18692-18764/com.chad.blockingqueue D/MainActivity: 生产者 生产事件 = 11
2019-09-02 21:24:00.826 18692-18765/com.chad.blockingqueue D/MainActivity: 消费者 消费事件 = 事件 - 9
2019-09-02 21:24:00.826 18692-18765/com.chad.blockingqueue D/MainActivity: 消费者 消费事件 = 事件 - 10
2019-09-02 21:24:00.826 18692-18765/com.chad.blockingqueue D/MainActivity: 消费者 消费事件 = 事件 - 11
阻塞队列的原理
下面通过分析ArrayBlockingQueue的原理加深对阻塞队列的理解。
public class ArrayBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {
/** 存放元素的数组 */
final Object[] items;
/** 队首元素下标 */
int takeIndex;
/** 队尾元素下标 */
int putIndex;
/** 当前队列中存放的元素总数 */
int count;
/** 重入锁 */
final ReentrantLock lock;
/** 等待获取元素的条件 */
private final Condition notEmpty;
/** 等待放入元素的条件 */
private final Condition notFull;
/** 构造函数,参数capacity为该队列的容量 */
public ArrayBlockingQueue(int capacity) {
this(capacity, false);
}
/** 构造函数,参数capacity为该队列的容量,参数fair为重入锁是公平锁还是非公平锁 */
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
this.items = new Object[capacity];
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}
/** 构造函数,构造函数,参数capacity为该队列的容量,参数fair为重入锁是公平锁还是非公平锁,参数c为初始队列的元素集合 */
public ArrayBlockingQueue(int capacity, boolean fair,
Collection<? extends E> c) {
this(capacity, fair);
final ReentrantLock lock = this.lock;
// 得到锁
lock.lock(); // Lock only for visibility, not mutual exclusion
// 遍历元素集合,初始化元素数组
try {
int i = 0;
try {
for (E e : c) {
checkNotNull(e);
items[i++] = e;
}
} catch (ArrayIndexOutOfBoundsException ex) {
throw new IllegalArgumentException();
}
count = i;
putIndex = (i == capacity) ? 0 : i;
} finally {
// 释放锁
lock.unlock();
}
}
/**
* 放入元素
* 如果元素数组没有空余位置,不会阻塞消费者端的线程,直接返回false
* 如果元素数组还有空余位置,调用enqueue()函数,并且返回true
*/
public boolean offer(E e) {
checkNotNull(e);
final ReentrantLock lock = this.lock;
// 得到锁
lock.lock();
try {
if (count == items.length)
return false;
else {
enqueue(e);
return true;
}
} finally {
// 释放锁
lock.unlock();
}
}
/**
* 放入元素
* 如果元素数组没有空余位置,调用notFull.awaitNanos(nanos),会使生产者端的线程进入阻塞状态等待一段时间,
* 当等待超时后,如果元素数组依然没有空余位置,直接返回false
* 如果元素数组还有空余位置,调用enqueue()函数,并且返回true
*/
public boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException {
checkNotNull(e);
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
// 得到可中断的锁
lock.lockInterruptibly();
try {
while (count == items.length) {
if (nanos <= 0)
return false;
nanos = notFull.awaitNanos(nanos);
}
enqueue(e);
return true;
} finally {
// 释放锁
lock.unlock();
}
}
/**
* 放入元素
* 如果元素数组没有空余位置,调用notFull.await()使生产者端的线程进入阻塞状态,
* 直到有消费者从队列中获取元素并且会唤醒生产者端进入阻塞状态的线程继续执行
* 如果元素数组还有空余位置,调用enqueue()函数
*/
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
// 得到可中断的锁
lock.lockInterruptibly();
try {
while (count == items.length)
// 使生产者端的线程进入阻塞状态
notFull.await();
enqueue(e);
} finally {
// 释放锁
lock.unlock();
}
}
/**
* 放入元素(核心函数)
*/
private void enqueue(E x) {
// assert lock.getHoldCount() == 1;
// assert items[putIndex] == null;
final Object[] items = this.items;
items[putIndex] = x;
if (++putIndex == items.length)
putIndex = 0;
count++;
// 唤醒消费端因队列没有元素获取而进入阻塞状态的线程继续执行
notEmpty.signal();
}
/**
* 获取元素
* 如果元素数组中没有元素,则直接返回null,否则调用dequeue()返回队首元素
*/
public E poll() {
final ReentrantLock lock = this.lock;
// 得到锁
lock.lock();
try {
return (count == 0) ? null : dequeue();
} finally {
// 释放锁
lock.unlock();
}
}
/**
* 获取元素
* 如果元素数组中没有元素,则调用notEmpty.awaitNanos(nanos)使消费者端线程进入阻塞状态,
* 直到有生产者往队列中放入元素并且会唤醒消费者端进入阻塞状态的线程继续执行
* 如果元素数组中还有元素,调用dequeue()函数返回队首元素
*/
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
// 得到可中断的锁
lock.lockInterruptibly();
try {
while (count == 0) {
if (nanos <= 0)
return null;
nanos = notEmpty.awaitNanos(nanos);
}
return dequeue();
} finally {
// 释放锁
lock.unlock();
}
}
/**
* 获取元素
* 如果元素数组没有元素,调用notEmpty.await()使消费者端的线程进入阻塞状态,
* 直到有生产者往队列中放入元素并且会唤醒消费者端进入阻塞状态的线程继续执行
* 如果元素数组还有元素,调用enqueue()函数返回队首元素
*/
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
// 得到可中断的锁
lock.lockInterruptibly();
try {
while (count == 0)
// 使消费者端的线程进入阻塞状态
notEmpty.await();
return dequeue();
} finally {
// 释放锁
lock.unlock();
}
}
/**
* 获取元素(核心函数)
*/
private E dequeue() {
// assert lock.getHoldCount() == 1;
// assert items[takeIndex] != null;
final Object[] items = this.items;
@SuppressWarnings("unchecked")
E x = (E) items[takeIndex];
items[takeIndex] = null;
if (++takeIndex == items.length)
takeIndex = 0;
count--;
if (itrs != null)
itrs.elementDequeued();
// 唤醒生产者端因队列元素填满而进入阻塞状态的线程继续执行
notFull.signal();
return x;
}
}
总结
在生产者消费模型中,生产数据和消费数据的速率不一致,如果生产数据速度快一些,消费不过来,就会导致数据丢失,这时候我们就可以使用阻塞队列来解决这个问题。
阻塞队列是一个队列,我们使用单线程生产数据,使用多线程消费数据。由于阻塞队列的特点:队列为空的时候消费者端阻塞,队列满的时候生产者端阻塞。多线程消费数据起到了加速消费的作用,使得生产的数据不会在队列里积压过多,而生产的数据也不会丢失处理。