一个同步辅助类,它允许一组线程互相等待,直到到达某个公共屏障点 (common barrier point)。在涉及一组固定大小的线程的程序中,这些线程必须不时地互相等待,此时 CyclicBarrier 很有用。因为该 barrier 在释放等待线程后可以重用,所以称它为循环 的 barrier。
CyclicBarrier 支持一个可选的 Runnable 命令,在一组线程中的最后一个线程到达之后(但在释放所有线程之前),该命令只在每个屏障点运行一次。若在继续所有参与线程之前更新共享状态,此屏障操作 很有用。
栅栏类似于闭锁,它能阻塞一组线程直到某个事件的发生。栅栏与闭锁的关键区别在于,所有的线程必须同时到达栅栏位置,才能继续执行。闭锁用于等待事件,而栅栏用于等待其他线程。
CyclicBarrier可以使一定数量的线程反复地在栅栏位置处汇集。当线程到达栅栏位置时将调用await方法,这个方法将阻塞直到所有线程都到达栅栏位置。如果所有线程都到达栅栏位置,那么栅栏将打开,此时所有的线程都将被释放,而栅栏将被重置以便下次使用。
原理如下
使用一个 ReentrantLock 来获取一个 Condition, 用于存储将需要再屏障点到达之前阻塞的线程。当公共屏障点被触发时,则唤醒队列中所有线程。
CyclicBarrier 构造函数
由代码可以知道,初始化的时候需要指定需要阻塞的线程数量,同时也可以指定一个在唤醒等待队列之前执行的线程。
public CyclicBarrier(int parties) {
this(parties, null);
}
public CyclicBarrier(int parties, Runnable barrierAction) {
if (parties <= 0)
throw new IllegalArgumentException();
this.parties = parties;
this.count = parties;
this.barrierCommand = barrierAction;
}
await方法
调用await方法的线程告诉CyclicBarrier自己已经到达同步点,然后当前线程被阻塞。直到parties个参与线程调用了await方法,CyclicBarrier同样提供带超时时间的await和不带超时时间的await方法:
public int await() throws InterruptedException, BrokenBarrierException {
try {
return dowait(false, 0L);
} catch (TimeoutException toe) {
throw new Error(toe);
}
}
await 本身没什么逻辑,仅仅是调用 dowait 方法。
private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException {
final ReentrantLock lock = this.lock;
// 抢锁
lock.lock();
try {
// 获取 Generation
final Generation g = generation;
// 如果这代损坏了,抛出异常
if (g.broken)
throw new BrokenBarrierException();
// 当前线程被中断过,
if (Thread.interrupted()) {
// 将损坏状态设置为true
// 并通知其他阻塞在此栅栏上的线程
breakBarrier();
throw new InterruptedException();
}
// 线程数减 1
int index = --count;
// index = 0 表示公共屏障点被触发
if (index == 0) {
boolean ranAction = false;
try {
// 执行栅栏任务
final Runnable command = barrierCommand;
if (command != null)
command.run();
// 开始执行
ranAction = true;
// 唤醒之前等待的线程
nextGeneration();
return 0;
} finally {
// 如果执行栅栏任务的时候失败了,就将损坏状态设置为true
if (!ranAction)
breakBarrier();
}
}
// loop until tripped, broken, interrupted, or timed out
for (;;) {
try {
// 没有时间限制,则无期限等待
if (!timed)
trip.await();
// 有时间限制,则等待一定时间
else if (nanos > 0L)
nanos = trip.awaitNanos(nanos);
} catch (InterruptedException ie) {
// 当前代没有损坏
if (g == generation && ! g.broken) {
// 让栅栏失效
breakBarrier();
throw ie;
} else {
// 上面条件不满足,说明这个线程不是这代的
// 就不会影响当前这代栅栏的执行,所以,就打个中断标记 Thread.currentThread().interrupt();
}
}
// 当有任何一个线程中断了,就会调用breakBarrier方法
// 就会唤醒其他的线程,其他线程醒来后,也要抛出异常
if (g.broken)
throw new BrokenBarrierException();
// g != generation表示正常换代了,返回当前线程所在栅栏的下标
// 如果 g == generation,说明还没有换代,那为什么会醒了?
// 因为一个线程可以使用多个栅栏,当别的栅栏唤醒了这个线程,就会走到这里,所以需要判断是否是当前代。
// 正是因为这个原因,才需要generation来保证正确。
if (g != generation)
return index;
// 如果有时间限制,且时间小于等于0,销毁栅栏并抛出异常
if (timed && nanos <= 0L) {
breakBarrier();
throw new TimeoutException();
}
}
} finally {
lock.unlock();
}
}
CyclicBarrier和CountDownLatch的区别
CountDownLatch的计数器只能使用一次,而CyclicBarrier的计数器可以使用reset()方法重置,可以使用多次,所以CyclicBarrier能够处理更为复杂的场景;
CyclicBarrier还提供了一些其他有用的方法,比如getNumberWaiting()方法可以获得CyclicBarrier阻塞的线程数量,isBroken()方法用来了解阻塞的线程是否被中断;
CountDownLatch允许一个或多个线程等待一组事件的产生,而CyclicBarrier用于等待其他线程运行到栅栏位置。
用例
public class CyclicBarrierTest {
public static void main(String[] args) {
CyclicBarrierTest test = new CyclicBarrierTest();
test.test();
}
private void test(){
CyclicBarrier barrier = new CyclicBarrier(5, () -> {
System.out.println("都闪开,我要开始装逼了!!");
try {
TimeUnit.SECONDS.sleep(5);
}catch (Exception e){
e.printStackTrace();
}
});
for (int i = 0; i < 5; i++){
Worker worker = new Worker(i, barrier);
Thread t = new Thread(worker);
t.start();
try {
System.out.println(worker.toString());
TimeUnit.SECONDS.sleep(1);
}catch (Exception e){
e.printStackTrace();
}
}
}
private class Worker implements Runnable{
private int idx;
CyclicBarrier barrier;
public Worker(int idx, CyclicBarrier barrier) {
this.idx = idx;
this.barrier = barrier;
}
@Override
public String toString() {
return "Worker [" + idx + "] 准备就绪!!!";
}
@Override
public void run() {
try {
barrier.await();
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
for (int i = 5; i > 0; i--){
try {
System.out.println(System.currentTimeMillis() + ", 线程=" + idx + ", 批次=" + i);
TimeUnit.SECONDS.sleep(1);
}catch (Exception e){
e.printStackTrace();
}
}
}
}
}