多线程的学习(五)关于CountDownLatch 的源码学习与场景假设
什么是CountDownLatch
CountDownLatch 来自与java.util.concurrent
加入时间:jdk1.5
定义方式 public class CountDownLatch
那么它是用来做什么的呢,来看一下源代码
A synchronization aid that allows one or more threads to wait until
a set of operations being performed in other threads completes.
-同步协助类,允许一个或者多个线程等待,直到其他的线程执行完成
A {@code CountDownLatch} is initialized with a given count.
The {@link #await await} methods block until the current count reaches zero due to invocations of the {@link #countDown} method, after which all waiting threads are released and any subsequent invocations of {@link #await await} return immediately. This is a one-shot phenomenon
-- the count cannot be reset. If you need a version that resets the count, consider using a {@link CyclicBarrier}.
- CountDownLatch 使用一个给定的 count 计数值初始化,
- await方法会阻塞,直到当前计数值被countDown方法调用达到0
- 在count为0后,所有等待的线程会被释放,并且之后对await方法的调用都会立刻返回。
- 这是一个一次性现象,如果需要能够重置count的版本,应该考虑使用CyclicBarrier(未来学习)
A {@code CountDownLatch} is a versatile synchronization tool and can be used for a number of purposes.
A {@code CountDownLatch} initialized with a count of one serves as a simple on/off latch, or gate: all threads invoking {@link #await await}
wait at the gate until it is opened by a thread invoking {@link
countDown}.
A {@code CountDownLatch} initialized to N can be used to make one thread wait until N threads have completed some action, or some action has been completed N times.
- CountDownLatch是一个通用的同步工具并且他使用于多种目的
- 一个使用count值1 进行初始化的CountDownLatch可以简单的做一个开关门:所有检查执行await方法等待在蒙口,直到某个线程执行完了countDown,打开门、
- 一个使用N值初始化的CountDownLatch能被使用于一个线程的等待,直到N个线程完成某些动作,或者某些动作完成了N次
A useful property of a {@code CountDownLatch} is that it doesn't require that threads calling {@code countDown} wait for the count to reach zero before proceeding,
it simply prevents anythread from proceeding past an {@link #await await} untilall threads could pass.
- CountDownLatch中一个有用的性质是 ,它不要求在调用的countDown方法等待的线程,在继续之前等待count到达0
- 它只是简单的阻挡所有线程通过(await),直到所有线程都可以通过的时候
- 简单的理解就是:可以 在任意时刻调用await,如果count为0,await会立刻返回,否则非0线程会等待到count为0才往下执行
Sample usage: Here is a pair of classes in which a group of worker threads use two countdown latches:
The first is a start signal that prevents any worker from proceeding
until the driver is ready for them to proceed;
The second is a completion signal that allows the driver to wait
- 具体的用法例子:下面有一对类,工作的时候会使用两倒计时锁
- 首先是一个启动信号,它会阻止任何工作者继续,直到司机准备好,允许他们继续前进
- 然后是一个完成信号,它允许司机进行等待。
使用一个CountDownLatch来
class Driver{
void main() throws InterruptedException{
CountDownLatch.startSignal = new CountDownLatch(1);
CountDownLatch.doneSignal = new CountDownLatch(N);
for(int i = 0; i<N;i++){
//创建和允许线程
new Thread((new Worker(startSignal, doneSignal)).start();
//不让它运行
doSomethingElse();
//让所有线程运行
startSignal.countDown();
doSomethingElse();
//让线程等待直到完成
doneSignal.await();
}
}
}
class Worker implements Runnable {
//常量final CountDownLatch类
private final CountDownLatch startSignal;
private final CountDownLatch doneSignal;
//构造方法
Worker(CountDownLatch startSignal, CountDownLatch doneSignal) {
this.startSignal = startSignal;
this.doneSignal = doneSignal;
}
//重写run方法
public void run() {
try {
//调用开始型号等待
startSignal.await();
//进行一些工作
doWork();
//唤醒
doneSignal.countDown();
} catch (InterruptedException ex) {
} ;
}
void doWork() { ... }
}
}
Another typical usage would be to divide a problem into N parts,
describe each part with a Runnable that executes that portion and
counts down on the latch, and queue all the Runnables to an Executor.
When all sub-parts are complete, the coordinating thread will be able to pass through await. (When threads must repeatedly count down in this way, instead use a {@link CyclicBarrier}.)
- 这是另外一个的用法例子:把一个问题分为N部分
- 使用Runnable描述每个部分,并且使用倒计时计时。
- 将所有的可执行任务排队给Executor(管理线程的执行器)
- 当所有子部件完成时,协调线程能够通过await的阻塞
- (当线程必须以这种方式倒计时的时候,请使用CyclicBarrier)
代码例子2:
class Driver2 { // ...
void main() throws InterruptedException {
CountDownLatch doneSignal = new CountDownLatch(N);
Executor e = ...
//创建和开始线程
for (int i = 0; i < N; ++i)
e.execute(new WorkerRunnable(doneSignal, i));
//等待
doneSignal.await();
}
}
class WorkerRunnable implements Runnable {
private final CountDownLatch doneSignal;
private final int i;
WorkerRunnable(CountDownLatch doneSignal, int i) {
this.doneSignal = doneSignal;
this.i = i;
}
public void run() {
try {
doWork(i);
doneSignal.countDown();
} catch (InterruptedException ex) {} // return;
}
void doWork() { ... }
}}
Memory consistency effects: Until the count reaches zero, actions in a thread prior to calling {@code countDown()}
actions following a successful return from a corresponding{@code await()} in another thread.
- 内存一致性的影响:
- 直到count到达0,
- 一个线程调用countDown()方法之前的动作 happen-before
- 从另一个线程相应的await()方法返回之后的动作
- 例子:threadB.await()、threadA.countDown(),那么threadA执行countDown()之前的动作,对于threadB的await()方法之后的动作都可见(当count为0时,threadB会从await()方法的阻塞中结束而继续往下执行)。
原理
在对源码的查阅中,基本上了解了对CountDownLatch的使用。这里在对使用进行一部分深入
源代码中:
静态内部类Sync 继承 AbstractQueuedSynchronizer
静态Sync对象 sync
构造方法CountDownLatch(int count)
方法: await() , await(long timeout, TimeUnit unit) ,countDown(),getCount(),toString();
构造方法:
public void countDown() {
sync.releaseShared(1);
}
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}
可以发现构造方法都是围绕对象Sync展开的,那么这边深入了解一下Sync。首先是它的父类
AbstractQueuedSynchronizer
这边使用了网上查阅的资料,日后深入学习。
CountDownLatch 使用的是父类的state来实现同步控制,采用的是共享锁模式
this.sync = new Sync(count);
查阅Sync的构造函数
Sync(int count) {
setState(count);
}
private volatile int state;
设置状态变量为count,state使用了volatile修饰,保证可见性
//无参数的构造方法
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
观察到sync调用了父类的acquireSharedInterruptibly方法
//父类中的acquireSharedInterruptibly
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
除去异常相关就是先判断tryAcquireShared(1)
然后看是否执行doAcquireSharedInterruptibly(1);
//子类自己重写了tryAcquireShared()
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
//父类中的doAcquireSharedInterruptibly
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
//用于将当前线程相关的节点将入链表尾部
final Node node = addWaiter(Node.SHARED);
try {
for (;;) {
//循环
//调用节点类 获得它的前面一个节点
final Node p = node.predecessor();
//如果节点是刚开始
if (p == head) {
//因为前面有个判断,所以一般应该这里r=-1但是在后续循环中其他线程会修改
int r = tryAcquireShared(arg);
if (r >= 0) {
//进来的话r>0,那么就是state=0 需要退出了
setHeadAndPropagate(node, r);//重点方法
p.next = null; // help GC
return;
}
}
//获取的节点的状态 true if thread should block.
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} catch (Throwable t) {
//取消该节点。
cancelAcquire(node);
throw t;
}
}
tryAcquireShared(int acquires),判断状态参数state是否为0,如果是是0复制为1否则为-1
state就是我们使用的count,在一开始说了 它由volatile修饰 保证可见性。那么这里的意思就是:如果count计数值不为0,那么继续进行,否则就不进行下面的操作了。
doAcquireSharedInterruptibly(int arg)中,使用了一个节点类node,通过predecessor判断后返回,节点类也使用了volatile修饰
这些先对addWaiter方法进行一个了解,线程如何变成节点添加进去的
注意方法setHeadAndPropagate(node, r);
private Node addWaiter(Node mode) {
//new一个节点
Node node = new Node(mode);
for (;;) {
//获取volatile tail节点
Node oldTail = tail;
if (oldTail != null) {
//设置值
node.setPrevRelaxed(oldTail);
//设置成功才会 跳出死循环
if (compareAndSetTail(oldTail, node)) {
oldTail.next = node;
return node;
}
} else {
//修正tail的值 然后重新进去循环
initializeSyncQueue();
}
}
}
private final void initializeSyncQueue() {
Node h;
if (HEAD.compareAndSet(this, null, (h = new Node())))
tail = h;
}
addWaiter大概的流程就完了。
接下来回到if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();中
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
//前一个节点的状态
int ws = pred.waitStatus;
//判断前一个节点能否运行
if (ws == Node.SIGNAL)
return true;
//节点中断了
if (ws > 0) {
do {
//链表的设置
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
pred.compareAndSetWaitStatus(ws, Node.SIGNAL);
}
return false;
//即只有前一个节点状态为Node.SIGNAL才返回真
}
对于节点中的状态标志 主要有:
// 线程已经被取消
static final int CANCELLED = 1;
//线程需要去被唤醒
static final int SIGNAL = -1;
// 线程正在唤醒等待条件
static final int CONDITION = -2;
//线程的共享锁应该被无条件传播
static final int PROPAGATE = -3;大于0时,表明线程已经被取消,不应该被唤醒。初始化的时候链表的头节点的状态值为0
shouldParkAfterFailedAcquire位于死循环內部,函数为true的时候就调用后续的方法阻塞。也就是说,阻塞的地方是在这里。
public void countDown() {
sync.releaseShared(1);
}
//arg=1
public final boolean releaseShared(int arg) {
//tryReleaseShared是另外一个函数
if (tryReleaseShared(arg)) {
//唤醒条件
doReleaseShared();
return true;
}
return false;
}
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
//死循环,state减1后为0时才会返回为真 执行后面的唤醒条件
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c - 1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
即当state为0,执行后续的唤醒条件
private void doReleaseShared() {
//死循环
for (;;) {
//获取头节点
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
//头节点的状态为SIGNAL待唤醒
if (ws == Node.SIGNAL) {
if (!h.compareAndSetWaitStatus(Node.SIGNAL, 0))
continue; // loop to recheck cases
//唤醒
unparkSuccessor(h);
}
else if (ws == 0 &&
!h.compareAndSetWaitStatus(0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head)
//因为头节点在state=0的时候还会进行一次重新设置
break;
}
}
//唤醒时用的类
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)
node.compareAndSetWaitStatus(ws, 0);
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node p = tail; p != node && p != null; p = p.prev)
if (p.waitStatus <= 0)
s = p;
}
if (s != null)
LockSupport.unpark(s.thread);//唤醒线程
}
- 取该节点的后一个节点进行唤醒,后节点被取消(null)则从后一个往前进行查找。
- 回到线程 阻塞的地方shouldParkAfterFailedAcquire
- 线程先在这里被阻塞,在唤醒后继续执行,此时的state为0,返回值为1.接下来会进入一开始的方法中,因为state为0了,这时候就会调用setHeadAndPropagate(node, r);
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // Record old head for check below
setHead(node);//重新设置头节点
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared())
doReleaseShared();
}
}
这里再次调用了doReleaseShared();也就是第一次释放锁调用的,继续唤醒后续线程
总结一下机制
使用节点类存放线程,然后根据shouldParkAfterFailedAcquire判断线程的状态,当节点处于需要被唤醒就会调用后续方法使节点进入阻塞状态
当countdown使state为0后会先唤醒第一个被阻塞的线程
然后被阻塞的线程又会进入一开始的循环,进入到setHeadAndPropagate(node, r)方法后重新设置节点然后使用这个方法doReleaseShared()重新进行唤醒
运用场景设想
上面对原理大概的梳理了一边,接下来就是尝试使用一次。
模拟验证游戏的启动环境:
模拟一个应用程序的启动。开始时启动了N个线程,这些线程将检查外部系统并通知闭锁,并且启动类一直在闭锁上等待着。一旦验证和检查了所有外部服务,那么启动类恢复执行。
class one {
public static void main(String[] args) throws InterruptedException {
CountDownLatch c = new CountDownLatch(10);
//使用循环创建线程检查
for(int i=0;i<10;i++){
Thread t = new Thread(()->{
System.out.println("正在检查游戏环境");
c.countDown();
});
t.start();
}
c.await();
System.out.println("环境正常 正在启动");
}
}