简介
ArrayBlockingQueue是一个有边界的阻塞队列,它的内部实现是一个数组。有边界的意思是它的容量是有限的,我们必须在其初始化的时候指定它的容量大小,容量大小一旦指定就不可改变。
ArrayBlockingQueue是以先进先出的方式存储数据,最新插入的对象是尾部,最新移出的对象是头部。
探索ArrayBlockingQueue
1. 主要属性
//存储队列元素的数组,是个循环数组
final Object[] items;
//拿数据索引
int takeIndex;
//放数据索引
int putIndex;
//元素个数
int count;
//重入锁
final ReentrantLock lock;
//条件对象
private final Condition notEmpty;
private final Condition notFull;
从属性可以看到,ArrayBlockingQueue的内部采用数组进行存储,采用重入锁(ReentrantLock lock)保证线程安全,利用条件对象Condition实现可阻塞式的出入队列。
2. 初始化
public ArrayBlockingQueue(int capacity) {
this(capacity, false);
}
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
this.items = new Object[capacity];//初始化数组长度
lock = new ReentrantLock(fair); //创建重入锁
notEmpty = lock.newCondition(); //由lock创建条件对象
notFull = lock.newCondition();
}
构造函数还是很容易理解的,初始化数组,创建重入锁。其中fair是“可重入的独占锁(ReentrantLock)”的类型。fair为true,表示是公平锁;fair为false,表示是非公平锁。Condition是为了更加精细的对锁进行控制,它依赖于Lock,通过某个条件对多线程进行控制。
3.插入
ArrayBlockingQueue的插入方法有add、offer、put.
add()方法,可以从源码看到实际调用的是offer方法。
public boolean add(E e) {
return super.add(e); //调用父类的add()
}
//父类的方法
public boolean add(E e) {
if (offer(e))
return true;
else
throw new IllegalStateException("Queue full");
}
再看offer方法:作用是将元素e插入到阻塞队列的尾部,如果队满,返回false,即插入失败。否则,插入元素,这里调用的是私有方法insert();
public boolean offer(E e) {
// 创建插入的元素是否为null,是的话抛出NullPointerException异常
checkNotNull(e);
// 加锁,保证线程安全
final ReentrantLock lock = this.lock;
lock.lock();
try {
//队列满,返回false
if (count == items.length)
return false;
else {
//插入方法
insert(e);
return true;
}
} finally {
lock.unlock();
}
}
接着看insert()源码,在插入元素后唤醒notEmpty的等待线程
private void insert(E x) {
//将x放入数组中
items[putIndex] = x;
//设置下一个放入索引
putIndex = inc(putIndex);
//元素数量+1
++count;
//唤醒notEmpty的等待线程
notEmpty.signal();
}
inc()方法:若i+1的值等于“队列的长度”,即添加元素之后,队列满;则设置“下一个被添加元素的索引”为0。
final int inc(int i) {
return (++i == items.length) ? 0 : i;
}
offer()还有另一个重载方法.这个重载方法多了两个参数,实现的是超时退出;如果队列为满,会使得当前线程进入等待状态等待一定的时长,等待期间如果队列不为满了就会被唤醒,然后元素添加成功,如果超过了参数设置的时限队列仍为满,元素就会添加失败,返回false。
public boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException {
checkNotNull(e);
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly(); //允许线程中断
try {
while (count == items.length) {
if (nanos <= 0)
return false;
nanos = notFull.awaitNanos(nanos);//等待并且设置最大等待时间
}
insert(e);
return true;
} finally {
lock.unlock();
}
}
put()方法:与可延迟的offer()相似,唯一不同的是没有设置超时等待时间,即当队列为空时,线程一直等待下去。
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == items.length)
notFull.await();
insert(e);
} finally {
lock.unlock();
}
}
4.取出
ArrayBlockingQueue有三个取出元素的方法,分别为poll(),take().
poll()方法:可以看到poll()方法实际调用extract()方法。
public E poll() {
// 加锁,保证线程安全
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 队列为空返回null,否则调用extract方法
return (count == 0) ? null : extract();
} finally {
lock.unlock();
}
}
再看extract()方法:方法也很简单,获取取位置上的元素后将其位置的设置为空,并唤醒notFull上的等待线程
private E extract() {
final Object[] items = this.items;
//获取取索引上的元素,强制将元素转换为“泛型E”
E x = this.<E>cast(items[takeIndex]);
//将取索引上的位置设置为null,即删除
items[takeIndex] = null;
//设置下一个取的位置
takeIndex = inc(takeIndex);
//队列长度减一
--count;
// 唤醒notFull上的等待线程。
notFull.signal();
return x;
}
同时,poll()也有一个重载方法poll(long timeout, TimeUnit unit) ,该方法指定等待时间,如果队列无元素,会使得当前线程进入等待状态等待一定的时长,等待期间如果队列数量不为空了就会被唤醒,获取元素成功,如果超过了参数设置的时限队列仍为空,返回false。
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();//设置线程可中断
try {
while (count == 0) {
if (nanos <= 0)
return null;
nanos = notEmpty.awaitNanos(nanos);//指定时间等待
}
return extract();
} finally {
lock.unlock();
}
}
take()方法与等待poll()相似,唯一不同就是没有设置超时时间,即如果队列为空,会无限等待,除非线程被中断。
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0)
notEmpty.await();
return extract();
} finally {
lock.unlock();
}
}
5.其他方法
ArrayBlockingQueue还提供其他方法,例如size()获取当前队列长度,该方法与ConcurrentHashMap的size()不同,ArrayBlockingQueue的size()方法不需要进行多次对比判断是否改变,所以性能比ConCurrentHashMap的size()效率高.
public int size() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return count;
} finally {
lock.unlock();
}
}
clear()方法清空队列,remove(object)移除指定元素等,这里不再进行延伸。