不就是CountDownLatch

介绍CountDownLatch之前,我相信很多人在学习的时候是不清楚这个CountDownLatch的使用场景是啥。为了回答这个问题,简单说个小段子。
老李家有两个熊孩子小A和小B,小A和小B每天放学后自己回家,到家后都需要老李来开门,不要问我为啥不给小A和小B一把钥匙。由于不是一个年级的,放学的时间不同,每天都需要老李开两次门,有一天老李怒了,告诉两个熊孩子,以后到家了必须敲下门,在门口喊一声,老李听到两个孩子的敲门声再去敲门,不要问我小A和小B是亲生的不。
其实,上面这个例子就是CountDownLatch的使用场景,小A和小B到家时间不同相当于两个线程的执行时间不同,小A和小B每次回家必须喊一次相当于线程间的通信,老李只有听到两个孩子的敲门声才会去敲门相当于主线程不再阻塞,向下进行。

再举个最近项目中的使用场景。
最近在做图像识别的一个项目,需要上传图片到华为云的modelart服务来获取图片的识别信息,然后对返回信息进行处理,分析出想要的信息。

由于有些产品是需要同时上传两张图片,然后再根据返回的信息进行处理。上传一张图片等待返回信息这个过程的时间大概是3-5秒,上传两张图片,需要访问两次华为云modelart服务,如果使用串行方式的话,那么需要花费10s左右的时间,这里就想到了可以使用CountDownLatch,等待这两个上传操作的线程结束拿到返回信息后,再调用后面的接口来分析这两个图片的信息。

这里,就简单介绍完了CountDownLatch的使用场景,下面简单说下CountDownLatch的使用,直接给出CountDownLatch源码中的例子。

* class Driver2 { // ...
 *   void main() throws InterruptedException {
 *     CountDownLatch doneSignal = new CountDownLatch(N);
 *     Executor e = ...
 *
 *     for (int i = 0; i < N; ++i) // create and start threads
 *       e.execute(new WorkerRunnable(doneSignal, i));
 *
 *     doneSignal.await();           // wait for all to finish
 *   }
 * }
 *
 * class WorkerRunnable implements Runnable {
 *   private final CountDownLatch doneSignal;
 *   private final int i;
 *   WorkerRunnable(CountDownLatch doneSignal, int i) {
 *     this.doneSignal = doneSignal;
 *     this.i = i;
 *   }
 *   public void run() {
 *     try {
 *       doWork(i);
 *       doneSignal.countDown();
 *     } catch (InterruptedException ex) {} // return;
 *   }
 *
 *   void doWork() { ... }
 * }}

首先看main方法,一开始根据需要等待的线程数,初始化CountDownLatch,然后启动线程,线程结束后调用CountDownLatch的countDown方法,当调用countDownLatch的counDown次数和初始化CountDownLatch的线程数相同时,主线程中的CountDownLatch的await方法不再阻塞,往下进行。

使用很简单,主要看源码实现。
CountDownLatch的底层实现是使用AQS队列实现,对AQS的不熟悉的同学可以看下方腾飞的《java并发编程的艺术》这本书或者看下这个AQS
首先看下await方法。

    public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }

sync这个实例是什么类型的呢


public class CountDownLatch {
    /**
     * Synchronization control For CountDownLatch.
     * Uses AQS state to represent count.
     */
    private static final class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 4982264981922014374L;

        Sync(int count) {
            setState(count);
        }

        int getCount() {
            return getState();
        }

        protected int tryAcquireShared(int acquires) {
            return (getState() == 0) ? 1 : -1;
        }

        protected boolean tryReleaseShared(int releases) {
            // Decrement count; signal when transition to zero
            for (;;) {
                int c = getState();
                if (c == 0)
                    return false;
                int nextc = c-1;
                if (compareAndSetState(c, nextc))
                    return nextc == 0;
            }
        }
    }

    private final Sync sync;

}

Sync类继承了AbstractQueuedSynchronizer(AQS), 通过state值的大小来控制锁的获取。下面根据CountDownLatch的使用来分析下源码。
(1)创建CountDownLatch实例时。

    public CountDownLatch(int count) {
        if (count < 0) throw new IllegalArgumentException("count < 0");
        this.sync = new Sync(count);
    }

这里就可以很清楚的看到,这里会初始化AQS队列的state值的大小,state值其实就是需要等待线程数的大小。

(2)主线程调用CountDownLatch的await方法,阻塞主线程,等待其他线程执行结束。

    public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        if (tryAcquireShared(arg) < 0)
            doAcquireSharedInterruptibly(arg);
    }

        protected int tryAcquireShared(int acquires) {
            return (getState() == 0) ? 1 : -1;
        }

首先调用tryAcquireShared获取当前state的值,如果值为0返回1,说明其他线程执行结束,不再阻塞。如果值不为0,则返回-1,说明其他线程还未执行结束,需要调用doAcquireSharedInterruptibly方法阻塞等待。
下面看下这个方法的实现。

    private void doAcquireSharedInterruptibly(int arg)
        throws InterruptedException {
        final Node node = addWaiter(Node.SHARED);  ##队列中插入node节点,保存线程信息
        boolean failed = true;
        try {
            for (;;) {
                final Node p = node.predecessor();    ##获取node节点的前一个节点
                if (p == head) {      ## 判断p节点是否是头结点
                    int r = tryAcquireShared(arg);  ##获取state值得大小
                    if (r >= 0) {                      ## r>=0 说明state值为0
                        setHeadAndPropagate(node, r);  ##设置头结点并且触发队列中头结点的下一个节点是否是共享节点,如果是的话,下个节点对应的线程也不再阻塞,具有传播特性。
                        p.next = null; // help GC
                        failed = false;
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())  ## 阻塞调用此方法的线程
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

上面的注释已经说明上面方法中整个的处理过程,其中setHeadAndPropagate和shouldParkAfterFailedAcquire还需要详细分析一下,首先看下setHeadAndPropagate方法。

  private void setHeadAndPropagate(Node node, int propagate) {
        Node h = head; // Record old head for check below
        setHead(node);
        if (propagate > 0 || h == null || h.waitStatus < 0 ||
            (h = head) == null || h.waitStatus < 0) {
            Node s = node.next;
            if (s == null || s.isShared())
                doReleaseShared();
        }
    }

执行此方法的前提是node的前一个节点是head节点,并且state值为0。在这个方法里,首先将当前的node节点设置为head节点,然后根据propagate这个值的大小,判断是否获取node节点的下一个节点,然后根据下一个节点是否是共享式类型的节点,来释放下个节点对应的线程,使下个节点的线程也不再阻塞,propagate使线程的释放具有了传播性,从队列的头结点开始,只要头结点不再阻塞,也可以使队列中的其他共享节点也不再阻塞,具有了传播性。
然后看下shouldParkAfterFailedAcquire方法的实现。

    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL)
            return true;
        if (ws > 0) {
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }

这个方法的目的主要是获取state值不为0时,是否阻塞此线程。如果此方法返回true则会调用parkAndCheckInterrupt这个方法,在这个方法里调用LockSupport的park方法阻塞此线程。那么阻塞后,什么时候唤醒这个线程呢,想要解决这个疑问就需要看下CountDownLatch的countDown方法的处理逻辑了。
(3) 线程执行完,调用CountDownLatch的countDown方法。

    public void countDown() {
        sync.releaseShared(1);
    }

    public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }

        protected boolean tryReleaseShared(int releases) {
            // Decrement count; signal when transition to zero
            for (;;) {
                int c = getState();
                if (c == 0)
                    return false;
                int nextc = c-1;
                if (compareAndSetState(c, nextc))
                    return nextc == 0;
            }
        }

首先,在tryReleaseShared方法中将state值的大小减一,然后执行doReleaseShared方法,

    private void doReleaseShared() {
      
        for (;;) {
            Node h = head;
            if (h != null && h != tail) {
                int ws = h.waitStatus;
                if (ws == Node.SIGNAL) {
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        continue;            // loop to recheck cases
                    unparkSuccessor(h);
                }
                else if (ws == 0 &&
                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue;                // loop on failed CAS
            }
            if (h == head)                   // loop if head changed
                break;
        }
    }

在doReleaseShared方法中通过unparkSuccessor获取head节点的下一个节点的thread信息,然后执行LockSupport的unpark方法,这样的话之前await方法中阻塞的线程就不再阻塞,继续往下执行。

通过研究CountDownLatch的这三个方法,基本理解了底层实现,另外,如果能看懂这几个方法的源码,其实对AQS的源码也已经了解的差不多了,后面可以去看下Lock的源码,也是基于AQS实现的。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,098评论 5 476
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,213评论 2 380
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 149,960评论 0 336
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,519评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,512评论 5 364
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,533评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,914评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,574评论 0 256
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,804评论 1 296
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,563评论 2 319
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,644评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,350评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,933评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,908评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,146评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,847评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,361评论 2 342

推荐阅读更多精彩内容