叼逼的,我还以为用了AQS,原来没有用到。。。。叼叼叼!!!!
1.类声明
// 无任何继承
public class CyclicBarrier {}
2. 类属性
/** The lock for guarding barrier entry */
private final ReentrantLock lock = new ReentrantLock();
/** Condition to wait on until tripped */
private final Condition trip = lock.newCondition();
/** The number of parties */
private final int parties; // 初始化时传入的总数
/* The command to run when tripped */
private final Runnable barrierCommand;
/** The current generation */
private Generation generation = new Generation(); // 当前代栅栏,为什么不直接用broken标记,就是因为要区分当前是哪一代。
/**
* Number of parties still waiting. Counts down from parties to 0
* on each generation. It is reset to parties on each new
* generation or when broken.
*/
private int count; // 当前还!!!未!!!到达栅栏的线程数量
3.broken标记类
private static class Generation {
boolean broken = false;
}
为什么要用一个类标识,不直接用boolean,因为CyclicBarrier是可以循环复用的,可以reset一次次的继续使用下去,需要标记broken究竟是broken的当期这一代,还是之前的一代。
4.核心await方法
private int dowait(boolean timed, long nanos)
throws InterruptedException, BrokenBarrierException,
TimeoutException {
// 先锁住
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 拿到当前代信息,局部保存住,后续判断用!!!
final Generation g = generation;
// 如果当前代已经被打破了,直接抛异常
if (g.broken)
throw new BrokenBarrierException();
// 获取到中断信号,打破栅栏,并抛出中断异常
if (Thread.interrupted()) {
breakBarrier();
throw new InterruptedException();
}
// 到达栅栏位置,还未到达栅栏的线程数量减一
int index = --count;
// 到达后发现已经没有未到达栅栏的线程了。。。。
if (index == 0) { // tripped
// 之所以有这个哨兵,是因为command.run可能会抛出异常
boolean ranAction = false;
try {
final Runnable command = barrierCommand;
// 执行这个初始化的线程(可见是由最后一个到达栅栏的线程执行的)
if (command != null)
command.run();
// command执行成功后ranAction标记为true
ranAction = true;
// 可见一代执行完后,栅栏自动恢复到下一代初始状态
nextGeneration();
// 不用等待了,直接返回
return 0;
} finally {
// 只要command执行成功,这里就不会执行,command执行失败会打破栅栏
// 而且不会自动切换到下一代周期,如果要重新使用,需要调用reset方法。
if (!ranAction)
breakBarrier();
}
}
// loop until tripped, broken, interrupted, or timed out
// 到这里说明还有其他线程未到达栅栏,需要等待
for (;;) {
try {
if (!timed)
trip.await(); // 没有超时则直接条件队列等待,此时释放lock了已经
else if (nanos > 0L)
nanos = trip.awaitNanos(nanos); // 注意nanos已经被重新赋值
} catch (InterruptedException ie) {
// 现在还处于当前代,而且还没被打破,收到中断信号当然直接打破栅栏并抛异常了
if (g == generation && ! g.broken) {
breakBarrier();
throw ie;
} else {
// 此时说明要么换代了,要么所在这一代已经被打破了,对应到下面的两个if
// if (g.broken) if (g != generation),下面的逻辑会直接停止当前的等待的,所以这里只需要重新标记中断状态即可
// We're about to finish waiting even if we had not
// been interrupted, so this interrupt is deemed to
// "belong" to subsequent execution.
Thread.currentThread().interrupt();
}
}
// 中途被唤醒获取到锁后发现自己那一代已经被打破了
if (g.broken)
throw new BrokenBarrierException();
// 中途被唤醒获取到锁后发现已经改朝换代了,直接结束等待。
if (g != generation)
return index;
// 中途被唤醒获取到锁后发现已经等待超时了
if (timed && nanos <= 0L) {
breakBarrier();
throw new TimeoutException();
}
}
} finally {
lock.unlock();
}
}
4.打破栅栏和进入下一代
/**
* Updates state on barrier trip and wakes up everyone.
* Called only while holding lock.
*/
private void nextGeneration() {
// signal completion of last generation
trip.signalAll();
// set up next generation
count = parties;
generation = new Generation(); // 改朝换代了,呵呵哒
}
/**
* Sets current barrier generation as broken and wakes up everyone.
* Called only while holding lock.
*/
private void breakBarrier() {
generation.broken = true; // 当前代被打破了,呵呵哒
count = parties;
trip.signalAll();
}