在并发工具类中,我们简单了解了CyclicBarrier,接下来走读下代码看看其是如何实现的,代码基于JDK1.7。
代码走读
从上图可以看出CyclicBarrier是基于可重入锁的条件队列实现的,parties代表同一代的线程数,count代表同一代等待的线程数,调用await()方法一次,count减1,直到为0。接下来看看dowait()方法(await()方法其实调用dowait()方法)。这个是CyclicBarrier最核心的方法。
栅栏破坏,抛出异常。
线程中断,主要看breakBarrier()方法,这个方法也很重要。
同一代等待的线程数减1.
当同一代等待的线程数为0时,首先执行barrierCommand的run()方法,注意:等待的线程没有执行。
当同一代等待的线程数为0时,调用nextGeneration()方法。
唤醒等待的线程,即CyclicBarrier的线程中断后,等待的线程还是要执行的。
唤醒等待的线程,即首先执行barrierCommand的run()方法,然后等待的线程执行;同时new Generation(),即产生新的一代,这里可以看出复用。
线程添加到循环队列。
出现异常后执行唤醒等待线程继续执行,即:超时后barrierCommand的run()方法不执行,等待线程仍然执行。
总结:CyclicBarrier基于可重入锁的条件队列实现的,可以复用的栅栏。
例子
public class CyclicBarrierDemo {
private static int SIZE = 5;
private static CyclicBarrier cb;
public static void main(String[] args) {
cb = new CyclicBarrier(SIZE, new Runnable() {
public void run() {
System.out.println("CyclicBarrier's parties is: " + cb.getParties());
}
});
// 新建10个任务
for (int i = 0; i < 10; i++)
new InnerThread().start();
}
static class InnerThread extends Thread {
public void run() {
try {
System.out.println(Thread.currentThread().getName() + " wait for CyclicBarrier.");
// 将cb的参与者数量加1
cb.await();
// cb的参与者数量等于5时,才继续往后执行
System.out.println(Thread.currentThread().getName() + " continued.");
} catch (BrokenBarrierException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
//执行结果
Thread-0 wait for CyclicBarrier.
Thread-1 wait for CyclicBarrier.
Thread-2 wait for CyclicBarrier.
Thread-3 wait for CyclicBarrier.
Thread-4 wait for CyclicBarrier.
CyclicBarrier's parties is: 5
Thread-5 wait for CyclicBarrier.
Thread-4 continued.
Thread-0 continued.
Thread-3 continued.
Thread-2 continued.
Thread-1 continued.
Thread-6 wait for CyclicBarrier.
Thread-7 wait for CyclicBarrier.
Thread-8 wait for CyclicBarrier.
Thread-9 wait for CyclicBarrier.
CyclicBarrier's parties is: 5
Thread-9 continued.
Thread-5 continued.
Thread-7 continued.
Thread-8 continued.
Thread-6 continued.