并发编程之CountDownLatch原理与应用

点赞再看,养成习惯,搜一搜【一角钱技术】关注更多原创技术文章。本文 GitHub org_hejianhui/JavaStudy 已收录,有我的系列文章。

前言

控制并发流程的工具类,作用就是帮助我们程序员更容易的让线程之间合作,让线程之间相互配合来满足业务逻辑。比如让线程A等待线程B执行完毕后再执行等合作策略。

控制并发流程的工具类主要有:

作用 说明
Semaphore 信号量,可以通过控制“许可证”的数量,来保证线程之间的配合 线程只有拿到“许可证”后才能继续运行,相比于其它的同步器,更灵活
CyclicBarrier 线程会等待,直到足够多线程达到了事先规定的数目。一旦达到触发条件,就可以进行下一步的动作 适用于线程之间相互等待处理结果的就绪场景
Phaser 和CyclicBarrier类似,但是计数可变 Java7加入的
CountDownLatch 和CyclicBarrier类似,数量递减到0时,触发动作 不可重复使用
Exchanger 让两个线程在合适时交换对象 适用场景:当两个线程工作在同一个类的不同实例上时,用于交换数据
Condition 可以控制线程的“等待”和“唤醒” 是Object.wait() 的升级版

简介

背景

  • CountDownLatch是在Java1.5被引入,跟它一起被引入的工具类还有CyclicBarrier、Semaphore、ConcurrenthashMap和BlockingQueue。
  • 在java.util.cucurrent包下。

概念

  • CountDownLatch是一个同步计数器,他允许一个或者多个线程在另外一组线程执行完成之前一直等待,基于AQS共享模式实现的。
  • 是通过一个计数器来实现的,计数器的初始值是线程的数量。每当一个线程执行完毕后,计数器的值就-1,当计数器的值为0时,表示所有线程都执行完毕,然后在闭锁上等待的线程就可以恢复工作来。

关于 AQS,可以查看《并发编程之抽象队列同步器AQS应用ReentrantLock》

应用场景

Zookeeper分布式锁,Jmeter模拟高并发等

场景1 让多个线程等待:模拟并发,让并发线程一起执行

为了模拟高并发,让一组线程在指定时刻(秒杀时间)执行抢购,这些线程在准备就绪后,进行等待(CountDownLatch.await()),直到秒杀时刻的到来,然后一拥而上。这也是本地测试接口并发的一个简易实现。

在这个场景中,CountDownLatch充当的是一个发令枪的角色;就像田径赛跑时,运动员会在起跑线做准备动作,等到发令枪一声响,运动员就会奋力奔跑。和上面的秒杀场景类似。

代码实现如下

package com.niuh.tools;

import java.util.concurrent.CountDownLatch;

/**
 * <p>
 * CountDownLatch示例
 * 场景1 让多个线程等待:模拟并发,让并发线程一起执行
 * </p>
 */
public class CountDownLatchRunner1 {

    public static void main(String[] args) throws InterruptedException {
        CountDownLatch countDownLatch = new CountDownLatch(1);
        for (int i = 0; i < 5; i++) {
            new Thread(() -> {
                try {
                    //准备完毕……运动员都阻塞在这,等待号令
                    countDownLatch.await();
                    String parter = "【" + Thread.currentThread().getName() + "】";
                    System.out.println(parter + "开始执行……");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }).start();
        }

        Thread.sleep(2000);// 裁判准备发令
        countDownLatch.countDown();// 发令枪:执行发令
    }
}

运行结果

【Thread-2】开始执行……
【Thread-4】开始执行……
【Thread-3】开始执行……
【Thread-0】开始执行……
【Thread-1】开始执行……

我们通过CountDownLatch.await(),让多个参与者线程启动后阻塞等待,然后在主线程 调用CountDownLatch.countdown(1) 将计数减为0,让所有线程一起往下执行;以此实现了多个线程在同一时刻并发执行,来模拟并发请求的目的。

场景2 让单个线程等待:多个线程(任务)完成后,进行汇总合并

很多时候,我们的并发任务,存在前后依赖关系;比如数据详情页需要同时调用多个接口获取数据,并发请求获取到数据后、需要进行结果合并;或者多个数据操作完成后,需要数据check;这其实都是:在多个线程(任务)完成后,进行汇总合并的场景。

代码实现如下

package com.niuh.tools;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadLocalRandom;

/**
 * <p>
 * CountDownLatch示例
 * 场景2 让单个线程等待:多个线程(任务)完成后,进行汇总合并
 * </p>
 */
public class CountDownLatchRunner2 {

    public static void main(String[] args) throws InterruptedException {
        CountDownLatch countDownLatch = new CountDownLatch(5);
        for (int i = 0; i < 5; i++) {
            final int index = i;
            new Thread(() -> {
                try {
                    Thread.sleep(1000 + ThreadLocalRandom.current().nextInt(1000));
                    System.out.println("finish" + index + Thread.currentThread().getName());
                    countDownLatch.countDown();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }).start();
        }

        countDownLatch.await();// 主线程在阻塞,当计数器==0,就唤醒主线程往下执行。
        System.out.println("主线程:在所有任务运行完成后,进行结果汇总");
    }
}

运行结果

finish3Thread-3
finish0Thread-0
finish1Thread-1
finish4Thread-4
finish2Thread-2
主线程:在所有任务运行完成后,进行结果汇总

在每个线程(任务) 完成的最后一行加上CountDownLatch.countDown(),让计数器-1;当所有线程完成-1,计数器减到0后,主线程往下执行汇总任务。

源码分析

本文基于JDK1.8

CountDownLatch 类图



从图中可以看出CountDownLatch是基于Sync类实现的,而Sync继承AQS,使用的是AQS共享模式。

其内部主要变量和方法如下:


在我们方法中调用 awit()countDown() 的时候,发生了几个关键的调用关系,如下图所示:

其与AQS交互原理如下:


构造函数

CountDownLatch类中只提供了一个构造器,参数count为计数器的大小

public CountDownLatch(int count) {
  if (count < 0) throw new IllegalArgumentException("count < 0");
     this.sync = new Sync(count);
}

这里需要注意,设置state的数量只有在初始化CountDownLatch的时候,如果该state被减成了0,就无法继续使用这个CountDownLatch了,需要重新new一个,这就是这个类不可重用的原因,有另一个类也实现了类似的功能,但是可以重用,就是CyclicBarrier。

内部同步器

private static final class Sync extends AbstractQueuedSynchronizer {
    private static final long serialVersionUID = 4982264981922014374L;
   //初始化,设置资源个数
    Sync(int count) {
        setState(count);
    }
    //获取共享资源个数
    int getCount() {
        return getState();
    } 
    //尝试获取共享锁,只有当共享资源个数为0的时候,才会返回1,否则为-1
    protected int tryAcquireShared(int acquires) {
        return (getState() == 0) ? 1 : -1;
    }
    //释放共享资源,通过CAS每次对state减1
    protected boolean tryReleaseShared(int releases) {
        // Decrement count; signal when transition to zero
        for (;;) {
            int c = getState();
            if (c == 0)
                return false;
                int nextc = c-1;
                if (compareAndSetState(c, nextc))
                    return nextc == 0;
        }
    }
}

主要方法

类中有三个方法是最重要的

// 调用await()方法的线程会被挂起,它会等待直到count值为0才继续执行
public void await() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}

//和await()方法类似,只不过等待一定的时间后count值还没变为0的化就会继续执行
public boolean await(long timeout, TimeUnit unit)
    throws InterruptedException {
    return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}
//将count值减1
public void countDown() {
    sync.releaseShared(1);
}

await()方法

// 调用await()方法的线程会被挂起,它会等待直到count值为0才继续执行
public void await() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}

进入AbstractQueuedSynchronizer #acquireSharedInterruptibly()方法.

public final void acquireSharedInterruptibly(int arg)
        throws InterruptedException {
    //等待过程不可中断
    if (Thread.interrupted())
        throw new InterruptedException();
    //这里的tryAcquireShared在AbstractQueuedSynchronizer中没有实现,在上面介绍的Sync中实现的
    if (tryAcquireShared(arg) < 0)
        doAcquireSharedInterruptibly(arg);
}

在上面介绍Sync类的时候#tryAcquireShared(),当AQS的state = 0的时候才会返回1,否则一直返回-1,如果返回-1,要执行#doAcquireSharedInterruptibly(),进入该方法

private void doAcquireSharedInterruptibly(int arg)
    throws InterruptedException {
    //这里就把主线程加入队列,队列中有两个节点,第一个是虚拟节点,第二个就是主线程节点
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        for (;;) {
            //总共只有两个节点,主线程前一个就是首节点
            final Node p = node.predecessor();
            if (p == head) {
                //这里又执行到CountDownLatch中Sync类中实现的方法,判断state是否为0
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
            }
            //如果state不为0,这里会把主线程挂起阻塞
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

这里使用AQS很神奇,在阻塞队列中就只加入了一个主线程,但是呢,只要其他线程没有执行完,那state就不为0,那主线程就在这里阻塞着,那问题了,谁来唤醒这个主线程呢?就是 countDown() 这个方法。

await(long timeout, TimeUnit unit)方法

该方法就是指定等待时间,如果在规定的等待时间中没有完成,就直接返回false,在主线程中可以根据这个状态进行后续的处理。

//和await()方法类似,只不过等待一定的时间后count值还没变为0的化就会继续执行
public boolean await(long timeout, TimeUnit unit)
    throws InterruptedException {
    return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}

countDown() 方法

//将count值减1
public void countDown() {
    sync.releaseShared(1);
}

进入AbstractQueuedSynchronizer #releaseShared方法

public final boolean releaseShared(int arg) {
    //该方法同样在AbstractQueuedSynchronizer中没有实现,在CountDownLatch中实现
    if (tryReleaseShared(arg)) {
        //唤醒主线程
        doReleaseShared();
        return true;
    }
    return false;
}

在分析Sync类的时候,介绍了tryReleaseShared(),该方法会把AQS的state减1,如果减1操作成功,执行唤醒主线程操作,进入AbstractQueuedSynchronizer#tryReleaseShared()方法

private void doReleaseShared() {
    for (;;) {
        Node h = head;
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            //首节点状态为SIGNAL = -1
            if (ws == Node.SIGNAL) {
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;            // loop to recheck cases
           
                //唤醒主线程,也就是队列中的第二个节点,如果线程没有执行完成,主线程被唤醒之后,发现state依然不为零,会再次阻塞
                unparkSuccessor(h);
            }
            else if (ws == 0 &&
                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;                // loop on failed CAS
        }
    if (h == head)                   // loop if head changed
            break;
    }
}

总结

CountDownLatch 和 Semaphore 一样都是共享模式下资源问题,这些源码实现AQS的模版方法,然后使用CAS+循环重试实现自己的功能。在RT多个资源调用,或者执行某种操作依赖其他操作完成下可以发挥这个计数器的作用。

CountDownLatch就只在队列中放入一个主线程,然后不停的唤醒,唤醒之后发现state还是不为0,就继续等待。每个子线程执行完都会对state进行减1操作,当所有子线程都执行完了,那state也就为0,这时候主线程被唤醒之后才可以继续执行。而这也正是CountDownLatch不可重用的原因,如果想要重用,需要重新new一个,因为只有在new的时候才可以设置资源的数量。

CountDownLatch与Thread.join

CountDownLatch的作用就是允许一个或多个线程等待其他线程完成操作,看起来有点类似join() 方法,但其提供了比 join() 更加灵活的API。

CountDownLatch可以手动控制在n个线程里调用n次countDown()方法使计数器进行减一操作,也可以在一个线程里调用n次执行减一操作。

而 join() 的实现原理是不停检查join线程是否存活,如果 join 线程存活则让当前线程永远等待。所以两者之间相对来说还是CountDownLatch使用起来较为灵活。

CountDownLatch与CyclicBarrier

CountDownLatch和CyclicBarrier都能够实现线程之间的等待,只不过它们侧重点不同:

  • CountDownLatch一般用于一个或多个线程,等待其他线程执行完任务后,再才执行;
  • CyclicBarrier一般用于一组线程互相等待至某个状态,然后这一组线程再同时执行;
  • CountDownLatch 是一次性的,CyclicBarrier 是可循环利用的;
  • CountDownLathch是一个计数器,线程完成一个记录一个,计数器递减,只能用一次。如下图:
  • CyclicBarrier的计数器更像一个阀门,需要所有线程都到达,然后继续执行,计数器递减,提供reset功能,可以多次使用。如下图:

PS:以上代码提交在 Githubhttps://github.com/Niuh-Study/niuh-juc-final.git

文章持续更新,可以搜一搜「 一角钱技术 」第一时间阅读, 本文 GitHub org_hejianhui/JavaStudy 已经收录,欢迎 Star。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,189评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,577评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,857评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,703评论 1 276
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,705评论 5 366
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,620评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,995评论 3 396
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,656评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,898评论 1 298
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,639评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,720评论 1 330
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,395评论 4 319
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,982评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,953评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,195评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 44,907评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,472评论 2 342

推荐阅读更多精彩内容