CountDownLatch的使用与解析

引言

CountDownLatch是jdk1.5开始concurrent包里提供的,并发编程工具类。

这个类能够使一个线程等待其他线程完成各自的工作后再执行,可用于多线程的并发执行。

例如,应用程序的主线程希望在多个网络请求线程并发执行完后,刷新页面,避免串行请求导致网络请求耗时长。

CountDownLatch的使用

CountDownLatch的主要使用步骤是

1、初始化,指定线程个数,CountDownLatch latch = new CountDownLatch(3);

参数4代表线程的总数

2、每个线程执行后执行latch.countDown();,代表一个线程执行完成,待完成的线程数减1。

3、在线程添加latch.await();,阻塞该线程,等待其他子线程完成。

Demo如下

package com.example.zzh.myapplication;

import java.util.concurrent.CountDownLatch;

public class CountDownLatchDemo {

    public static void main(String[] args) throws InterruptedException {
        // Let us create task that is going to
        // wait for four threads before it starts
        CountDownLatch latch = new CountDownLatch(3);

        long start = System.currentTimeMillis();

        // Let us create four worker
        // threads and start them.
        WorkerThread first = new WorkerThread(1000, latch, "worker-1");
        WorkerThread second = new WorkerThread(2000, latch, "worker-2");
        WorkerThread third = new WorkerThread(3000, latch, "worker-3");

        first.start();
        second.start();
        third.start();

        // The main task waits for four threads
        latch.await();

        // Main thread has started
        System.out.println(Thread.currentThread().getName() + " has finished. Spend Time = " + (System.currentTimeMillis() - start));
    }

    // A class to represent threads for which
    // the main thread waits.
    static class WorkerThread extends Thread {

        private int delay;
        private CountDownLatch latch;

        public WorkerThread(int delay, CountDownLatch latch, String name) {
            super(name);
            this.delay = delay;
            this.latch = latch;
        }

        @Override
        public void run() {
            try {
                Thread.sleep(delay);
                latch.countDown();
                System.out.println(Thread.currentThread().getName() + " finished");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

}

运行结果

worker-1 finished
worker-2 finished
worker-3 finished
main has finished. Spend Time = 3006

CountDownLatch的解析

1、什么是AQS(AbstractQueuedSynchronizer)

深入CountDownLatch源码,需要了解AQS(AbstractQueuedSynchronizer),因为CountDownLatch的底层原理是通过AQS(AbstractQueuedSynchronizer)里面的共享锁来实现的。

推荐阅读:【死磕Java并发】—–J.U.C之AQS(一篇就够了)

以下是上述文章的引用:

AQS:AbstractQueuedSynchronizer,即队列同步器。它是构建锁或者其他同步组件的基础框架,JUC并发包的作者(Doug Lea)期望它能够成为实现大部分同步需求的基础。它是JUC并发包中的核心基础组件。

AQS解决了实现同步器时涉及当的大量细节问题,例如获取同步状态、FIFO同步队列。基于AQS来构建同步器可以带来很多好处。它不仅能够极大地减少实现工作,而且也不必处理在多个位置上发生的竞争问题。

AQS使用一个int类型的成员变量state来表示同步状态,当state>0时表示已经获取了锁,当state = 0时表示释放了锁。它提供了三个方法(getState()、setState(int newState)、compareAndSetState(int expect,int update))来对同步状态state进行操作,当然AQS可以确保对state的操作是安全的。

AQS通过内置的FIFO同步队列来完成资源获取线程的排队工作,如果当前线程获取同步状态失败(锁)时,AQS则会将当前线程以及等待状态等信息构造成一个节点(Node)并将其加入同步队列,同时会阻塞当前线程,当同步状态释放时,则会把节点中的线程唤醒,使其再次尝试获取同步状态。

AQS的使用方式是继承,子类通过继承同步器并实现它的抽象方法来管理同步状态。AQS提供了独占锁和共享锁必须实现的方法。

共享锁则是一种乐观锁,它放宽了加锁策略,允许多个执行读操作的线程同时访问共享资源。对应的是独占锁,是一种悲观锁,它避免了读/读冲突,如果某个只读线程获取锁,则其他读线程都只能等待,这样就限制了不必要的并发性,因为读操作并不会影响数据的一致性。

在AQS中,共享锁获取锁,节点模式则为Node.SHARED。独占锁获取锁时,设置节点模式为Node.EXCLUSIVE

CountDownLatch使用的是共享锁,继承AQS的方法有:

  • tryAcquireShared(int arg):共享式获取同步状态,返回值大于等于0则表示获取成功,否则获取失败;

  • tryReleaseShared(int arg):共享式释放同步状态。

上面Demo的队列同步器模型如下(参考这里

image.png

2、初始化源码解析

/**
 * Synchronization control For CountDownLatch.
 * Uses AQS state to represent count.
 */
private static final class Sync extends AbstractQueuedSynchronizer {
    private static final long serialVersionUID = 4982264981922014374L;

    Sync(int count) {
        setState(count);
    }

    int getCount() {
        return getState();
    }

    protected int tryAcquireShared(int acquires) {
        return (getState() == 0) ? 1 : -1;
    }

    protected boolean tryReleaseShared(int releases) {
        // Decrement count; signal when transition to zero
        for (;;) {
            int c = getState(); // 获取主存中的state值
            if (c == 0)
                return false; //state已经为0 直接退出
            int nextc = c-1; // 减一 准备cas更新该值
            if (compareAndSetState(c, nextc)) //cas更新status值为nextc
                return nextc == 0; //更新成功 判断是否为0 退出;更新失败则继续for循环,直到线程并发更新成功
        }
    }
}

private final Sync sync;

public CountDownLatch(int count) {
    if (count < 0) throw new IllegalArgumentException("count < 0");
    this.sync = new Sync(count);
}

初始化做的工作是创建同步器实例,这个同步器就是上文提到的继承AQS的类,并实现共享锁方法。

3、latch.countDown()解析

public void countDown() {
    sync.releaseShared(1);
}
    
//AbstractQueuedSynchronizer.java
public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}

其中tryReleaseShared是上文实现的方法,主要的工作是CAS更新state值减一,并判断是否为0,如果为0返回true,说明所有线程都执行完成,可以做唤醒的工作doReleaseShared

//AbstractQueuedSynchronizer.java
private void doReleaseShared() {
    for (;;) {
        Node h = head;
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            if (ws == Node.SIGNAL) {
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;            // loop to recheck cases
                unparkSuccessor(h);
            }
            else if (ws == 0 &&
                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;                // loop on failed CAS
        }
        if (h == head)                   // loop if head changed
            break;
    }
}

上面的逻辑是:

如果当前节点是SIGNAL意味着,它正在等待一个信号,或者说它在等待被唤醒,因此做两件事,一是重置waitStatus标志位,二是重置成功后,唤醒下一个节点。

如果本身头节点的waitStatus是出于重置状态(waitStatus==0)的,将其设置为“传播”状态。意味着需要将状态向后一个节点传播。

这个死循环,退出的路只有一条,那就是h==head,即该线程是头节点,且状态为共享状态。

4、latch.await()解析

await是阻塞当前线程(中断被抛中断异常),等待被唤醒,源码如下

public void await() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}

//AbstractQueuedSynchronizer.java
public final void acquireSharedInterruptibly(int arg)
        throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    if (tryAcquireShared(arg) < 0)
        doAcquireSharedInterruptibly(arg);
}

上面的逻辑是:

如果线程被中断,则抛出异常。然后判断tryAcquireShared方法的返回值是否小于0,这个方法是第2步初始化实现的,当(getState() == 0)时则返回1,否则返回-1,即当state还没有减少到0时,则执行doAcquireSharedInterruptibly(arg)

//AbstractQueuedSynchronizer.java
private void doAcquireSharedInterruptibly(int arg)
    throws InterruptedException {
    final Node node = addWaiter(Node.SHARED);// 往同步队列中添加节点
    boolean failed = true;
    try {
        for (;;) { // 一个死循环 跳出循环只有下面两个途径
            final Node p = node.predecessor(); // 当前线程的前一个节点
            if (p == head) {
                int r = tryAcquireShared(arg); //当getState()==0时则返回1,否则返回-1
                if (r >= 0) {
                    setHeadAndPropagate(node, r);// 处理后续节点
                    p.next = null; // help GC
                    failed = false;
                    return;//当getState为0,并且为头节点,则跳出循环
                }
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                throw new InterruptedException();// 响应打断 跳出循环
        }
    } finally {
        if (failed)
            cancelAcquire(node); //如果是打断退出的,则移除同步队列节点
    }
}

在同步队列中挂起的线程,它们自旋的形式查看自己是否满足条件醒来(state==0,且为头节点),如果成立(即被唤醒),将调用setHeadAndPropagate这个方法

private void setHeadAndPropagate(Node node, int propagate) {
     Node h = head; // Record old head for check below
     setHead(node);
     if (propagate > 0 || h == null || h.waitStatus < 0) {
         Node s = node.next;
        if (s == null || s.isShared())
            doReleaseShared();
    }
}

这个方法是将当前节点的下一个节点设置为头节点,且它也调用了doReleaseShared这个方法,在第3步解析latch.countDown中提到,这个方法就是将头节点设置为共享状态的,由此,共享状态传播下去。

扩展内容

1、CountDownLatch的优缺点

优点:

对使用者而言,你只需要传入一个int型变量控制任务数量即可,至于同步队列的出队入队维护,state变量值的维护对使用者都是透明的,使用方便。

缺点:

CountDownLatch设置了state后就不能更改,也不能循环使用。

2、CountDownLatch的超时处理

如果线程等待超过一定时间,可以取消阻塞被唤醒,那么可以通过设置await的参数

//等待超过2s,自动被唤醒
latch.await(2000, TimeUnit.MILLISECONDS);

参考

Java CountDownLatch解析(上)

Java CountDownLatch解析(下)

【死磕Java并发】—–J.U.C之AQS(一篇就够了)

Java并发-独占锁与共享锁

java共享锁实现原理及CountDownLatch解析

CountDownLatch in Java

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 205,386评论 6 479
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 87,939评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,851评论 0 341
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,953评论 1 278
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,971评论 5 369
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,784评论 1 283
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,126评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,765评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 43,148评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,744评论 2 323
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,858评论 1 333
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,479评论 4 322
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,080评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,053评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,278评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,245评论 2 352
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,590评论 2 343

推荐阅读更多精彩内容

  • 本文是我自己在秋招复习时的读书笔记,整理的知识点,也是为了防止忘记,尊重劳动成果,转载注明出处哦!如果你也喜欢,那...
    波波波先森阅读 11,235评论 4 56
  • CountDownLatch 介绍 CountDownLatch是一个同步协助类,允许一个或多个线程等待,直到其他...
    tomas家的小拨浪鼓阅读 3,175评论 0 9
  • 理解多线程的并发锁,可结合多进程的分布式锁(如Zookeeper的互斥锁、读写锁的实现原理),本质是相通的 介绍 ...
    jiangmo阅读 705评论 0 1
  • 双十一最想打折的是什么? 自己的手。
    ifuntouch阅读 119评论 0 0
  • 将那一点童话的玻璃心 小心封存 不让变了模样的自己 去亵渎 于是 美好从不可奢求 变成了 人们心底的柔软
    久又九阅读 373评论 0 2