下面是ArrayBlockingQueue的部分实现源码:
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
//创建数组
this.items = new Object[capacity];
//创建锁和阻塞条件
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}
//添加元素的方法
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == items.length)
notFull.await();
//如果队列不满就入队
enqueue(e);
} finally {
lock.unlock();
}
}
//入队的方法
private void enqueue(E x) {
final Object[] items = this.items;
items[putIndex] = x;
if (++putIndex == items.length)
putIndex = 0;
count++;
notEmpty.signal();
}
//移除元素的方法
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0)
notEmpty.await();
return dequeue();
} finally {
lock.unlock();
}
}
//出队的方法
private E dequeue() {
final Object[] items = this.items;
@SuppressWarnings("unchecked")
E x = (E) items[takeIndex];
items[takeIndex] = null;
if (++takeIndex == items.length)
takeIndex = 0;
count--;
if (itrs != null)
itrs.elementDequeued();
notFull.signal();
return x;
可以看到
//创建锁和阻塞条件
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition();
notFull = lock.newCondition();
我们这里主要使用了Lock机制,通过Condition的'await()' 和'signal()'方法来实现线程的唤醒和阻塞。当我们调用队列的take()方法进行出队操作时,首先判断队列中的个数是否为0,如果等于0,则调用notEmpty.await()方法阻塞当前出队操作的线程,所以出队操作就会在没有任务时一直阻塞,知道我们在put()方法中放入新的任务到队列中,通过notEmpty.signal()方法进行出队阻塞的唤醒,这就是阻塞队列的实现原理,当然我们也可以使用Object 的wait和notify实现同步的,当没有任务时object.wait(),新的任务入队是object.notify()唤醒操作。