今天来说一说DelayQueue,DelayQueue并发队列是一个无界阻塞延迟队列,队列中的每个元素都有一个过期时间,当从队列获取元素时,只有过期元素才会出队列,不允许存放null元素。队列头元素是最快要过期的元素。
DelayQueue内部有一个PriorityQueue优先队列,存入到该队列的元素都实现Delayed接口,由于每个原始都有一个过期时间,所以要实现获取当前元素还剩多少时间就过期的接口,由于元素是存放在PriorityQueue优先队列中的,所以需要实现元素之间相互比较的接口。
DelayQueue内部实现了一个Leader-Follower的模式,用于尽量减少不必要的线程等待。当一个线程调用队列的take方法变为leader线程后,它会调用条件变量available.awaitNanos(delay)等待delay时间,但其他线程(follower线程)则会调用available.await()进行无限等待。leader线程延迟时间过期后,会退出take方法,并通过调用available.signal()方法唤醒一个follower线程,被唤醒的follower线程被选举为leader线程。当向队列插入一个过期时间比头元素过期时间还短的元素时,leader会被重置为null。
DelayQueue内部主要的成员变量:
private final transient ReentrantLock lock = new ReentrantLock();
private final PriorityQueue<E> q = new PriorityQueue<E>();
private Thread leader = null;
private final Condition available = lock.newCondition();
下面对主要函数原理进行讲解。
offer(E e)
插入元素到队列,当元素e为null时会抛出NullPointException异常,否则由于是无界队列,所以一直返回true。插入的元素需要实现Delayed接口。
public boolean offer(E e) {
//(1)尝试获取独占锁。
final ReentrantLock lock = this.lock;
lock.lock();
try {
//(2)将元素放入PriorityQueue优先队列中,如果e为null则会抛出NullPointException异常。
q.offer(e);
//(3)如果当前插入的元素成为队列中的第一个节点(即插入元素的过期时间比队列中的所有元素的过期时间都小)则将leader设置为null,并调用available.signal()方法激活available条件队列中的一个线程。
if (q.peek() == e) {
leader = null;
available.signal();
}
return true;
} finally {
//(4)释放锁。
lock.unlock();
}
}
take()
从队列中获取并移除延迟时间过期的元素,当队列中没有元素或没有延迟时间过去的元素则会进行阻塞。
public E take() throws InterruptedException {
//(1)尝试获取独占锁 ,调用的是lockInterruptibly 方法,所以会被中断。
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
//(2)如果队列中没有元素则调用available.await()进行阻塞。
E first = q.peek();
if (first == null)
available.await();
else {
//(3)如果获取队列的第一个元素的延迟时间已到期,则返回该元素并从队列中移除。
long delay = first.getDelay(NANOSECONDS);
if (delay <= 0)
return q.poll();
//(4)这里有个小细节,当要进行阻塞等待的时候将first 变量置为null不引用队列中的元素。
first = null;
//(5)如果当前线程不是leader线程,则调用available.await()方法持续阻塞等待,直到被唤醒。
if (leader != null)
available.await();
else {
//(6)如果当前线程是leader线程,而调用available.awaitNanos(delay)方法,阻塞等待指定的过期时间。
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
available.awaitNanos(delay);
} finally {
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
if (leader == null && q.peek() != null)
//(7)如果代码执行到这,说明当前线程从队列中移除过期元素了,但是队列中还存在元素,则需要调用available.signal() 方法,唤醒available条件队列中阻塞的线程去获取队列中的元素。
available.signal();
//(8)释放锁。
lock.unlock();
}
}
poll()
尝试获取并移除优先级队列中的头元素,如果优先级队列为空或者头元素的延迟时间还没有过期则返回null,否则返回头元素并从优先级队列中移除。
public E poll() {
//(1)尝试获取独占锁。
final ReentrantLock lock = this.lock;
lock.lock();
try {
//(2)获取优先级队列中的第一个元素。
E first = q.peek();
//(3)如果第一个元素为null(即优先级队列中无元素)或第一个元素的延迟时间没有过期则返回null,否则返回头元素并将头元素从优先级队列中移除。
if (first == null || first.getDelay(NANOSECONDS) > 0)
return null;
else
return q.poll();
} finally {
//(4)释放锁。
lock.unlock();
}
}
size()
获取队列中的元素个数,由于在获取元素个数前需要先获取锁,所有size()方法返回的元素个数是精准的。
public int size() {
//(1)尝试获取独占锁。
final ReentrantLock lock = this.lock;
lock.lock();
try {
//(2)获取优先级队列中的元素个数。
return q.size();
} finally {
//(3)释放锁。
lock.unlock();
}
}
DelayQueue阻塞无界延迟队列,内部使用PriorityQueue优先级队列进行存储元素,根据元素的延迟时间进行排序,使用ReentrantLock独占锁实现线程同步。
今天的分享就到这,有看不明白的地方一定是我写的不够清楚,所有欢迎提任何问题以及改善方法。