ArrayBlockingQueue属性与构造方法
public class ArrayBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {
final Object[] items;
int takeIndex;
int putIndex;
int count;
final ReentrantLock lock;
private final Condition notEmpty;
private final Condition notFull;
...
}
ArrayBlockingQueue内部是由Object[]数组实现的。
takeIndex为队列取出位置指针,putIndex为队列插入位置指针。
同步操作依赖于ReentrantLock实现,notEmpty为非空条件,notFull为非满条件。
public ArrayBlockingQueue(int capacity) {
this(capacity, false);
}
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
this.items = new Object[capacity];
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}
可以通过构造方法的参数capacity指定数组长度,参数fair指定ReentrantLock的公平还是非公平模式,即是否允许后来的操作插队与刚唤醒的线程竞争锁。
public ArrayBlockingQueue(int capacity, boolean fair,
Collection<? extends E> c) {
this(capacity, fair);
final ReentrantLock lock = this.lock;
lock.lock(); // Lock only for visibility, not mutual exclusion
try {
int i = 0;
try {
for (E e : c) {
checkNotNull(e);
items[i++] = e;
}
} catch (ArrayIndexOutOfBoundsException ex) {
throw new IllegalArgumentException();
}
count = i;
putIndex = (i == capacity) ? 0 : i;
} finally {
lock.unlock();
}
}
第三个参数 Collection<? extends E> c可以将现有集合插入ArrayBlockingQueue,如果集合的长度大于参数capacity,会抛出ArrayIndexOutOfBoundsException。
阻塞插入 put
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock; //锁
lock.lockInterruptibly();
try {
while (count == items.length)
notFull.await();
enqueue(e);
} finally {
lock.unlock();
}
}
与LinkedBlockingQueue不同,ArrayBlockingQueue的插入与取出都是用同一个锁,所以无法在插入的同时取出元素,吞吐量弱于LinkedBlockingQueue。
当队列长度达到上限,触发notFull条件让当前线程挂起等待。当notFull条件满足,执行enqueue方法插入元素:
private void enqueue(E x) {
final Object[] items = this.items;
items[putIndex] = x;
if (++putIndex == items.length)
putIndex = 0;
count++;
notEmpty.signal();
}
putIndex即元素插入队列的位置,当元素插入后 putIndex自增如果等于队列长度上限,表示putIndex指针已越界,将putIndex置于0,这样就能保证重复利用数组,而不是通过创建新数组来扩容。
然后将count自增1,表示队列元素增加一个,最后通过notEmpty条件唤醒挂起等待消费元素的线程。
阻塞取出 take
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0)
notEmpty.await();
return dequeue();
} finally {
lock.unlock();
}
}
取出操作使用与插入操作相同的ReentrantLock保证同步块的安全,当队列长度为0,触发notEmpty条件让线程挂起等待。当队列非空,执行dequeue方法取出元素:
private E dequeue() {
final Object[] items = this.items;
@SuppressWarnings("unchecked")
E x = (E) items[takeIndex];
items[takeIndex] = null;
if (++takeIndex == items.length)
takeIndex = 0;
count--;
if (itrs != null)
itrs.elementDequeued();
notFull.signal();
return x;
}
取出元素的原理同插入是相同的,首先从takeIndex标记的位置取出元素,接着将takeIndex自增,如果takeIndex此时等于队列长度上限,就将其置0从头开始。
接着将count自减1,表示队列的元素减少一个。
itrs 变量表示队列的iterator,如果队列元素减少一个通过elementDequeued方法同步更新iterator遍历器保证线程安全。
最后通过notFull条件唤醒一个等待非满条件的线程执行插入。
非阻塞插入
public boolean offer(E e) {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lock();
try {
if (count == items.length)
return false;
else {
enqueue(e);
return true;
}
} finally {
lock.unlock();
}
}
与阻塞插入的区别是,当队列元素数量已满,直接返回false。
非阻塞取出
public E poll() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return (count == 0) ? null : dequeue();
} finally {
lock.unlock();
}
}
与阻塞取出的区别是,当队列没有元素,直接返回null。
不超时阻塞插入
public boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException {
checkNotNull(e);
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == items.length) {
if (nanos <= 0)
return false;
nanos = notFull.awaitNanos(nanos);
}
enqueue(e);
return true;
} finally {
lock.unlock();
}
}
与阻塞插入的区别是,等待notFull条件并非永久的,当过了timeout长度的时间后如果队列还是满的,直接返回false。
不超时阻塞取出
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0) {
if (nanos <= 0)
return null;
nanos = notEmpty.awaitNanos(nanos);
}
return dequeue();
} finally {
lock.unlock();
}
}
与阻塞取出的区别是,等待notEmpty条件并非永久的,当过了timeout长度的时间后如果队列还是空的,直接返回null。