Java并发JUC——并发流程控制

什么是并发流程控制

  • 控制并发流程的工具类,作用就是帮助我们程序员更容易的让线程之间进行合作
  • 让线程之间相互配合,来满足业务需求
  • 比如,让线程A等待线程B执行完毕后在执行等合作策略

常见的控制并发流程工具类

CountDownLatch

CountDownLatch是一个同步工具类,用来协调多个线程之间的同步,或者说起到线程之间的通信(而不是用作互斥的作用)。

CountDownLatch能够使一个线程在等待另外一些线程完成各自工作之后,再继续执行。使用一个计数器进行实现。计数器初始值为线程的数量。当每一个线程完成自己任务后,计数器的值就会减一。当计数器的值为0时,表示所有的线程都已经完成一些任务,然后在CountDownLatch上等待的线程就可以恢复执行接下来的任务。

CountDownLatch的主要方法

  • public CountDownLatch(int count):仅有一个构造函数,参数count为需要倒数的数值
  • await():调用await()方法的线程会被挂起,它会等待直到count值为0才继续执行
  • public void countDown():将count值减1,直到为0时,等待的线程会被唤醒

图解await和countDown方法

CountDownLatch的用法

  • CountDownLatch典型用法:1、某一线程在开始运行前等待n个线程执行完毕。将CountDownLatch的计数器初始化为new CountDownLatch(n),每当一个任务线程执行完毕,就将计数器减1 countdownLatch.countDown(),当计数器的值变为0时,在CountDownLatch上await()的线程就会被唤醒。一个典型应用场景就是启动一个服务时,主线程需要等待多个组件加载完毕,之后再继续执行。
/**
 * @Description: 工厂中,质检,5个工人检查,所有人都认为通过,才通过
 */
public class CountDownLatchDemo1 {

    public static void main(String[] args) throws InterruptedException {
        CountDownLatch countDownLatch = new CountDownLatch(5);

        ExecutorService executorService = Executors.newFixedThreadPool(5);
        for (int i = 0; i < 5; i++) {
            final int no = i+1;
            Runnable runnable = new Runnable() {
                @Override
                public void run() {
                    try {
                        Thread.sleep(new Random().nextInt(10000));
                        System.out.println("NO." + no + "完成了检查");
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } finally {
                        countDownLatch.countDown();
                    }
                }
            };
            executorService.submit(runnable);
        }

        System.out.println("等待5个人检查完......");

        countDownLatch.await();
        System.out.println("所有人都完成了工作,等待进入下一环节");
    }
}
  • CountDownLatch典型用法:2、实现多个线程开始执行任务的最大并行性。注意是并行性,不是并发,强调的是多个线程在某一时刻同时开始执行。类似于赛跑,将多个线程放到起点,等待发令枪响,然后同时开跑。做法是初始化一个共享的CountDownLatch(1),将其计算器初始化为1,多个线程在开始执行任务前首先countdownlatch.await(),当主线程调用countDown()时,计数器变为0,多个线程同时被唤醒。
/**
 * @Description: 模拟100米跑步,5名选手都准备好了,只等裁判员一声令下,所有人同时开跑
 */
public class CountDownLatchDemo2 {

    public static void main(String[] args) throws InterruptedException {
        CountDownLatch countDownLatch = new CountDownLatch(1);

        ExecutorService executorService = Executors.newFixedThreadPool(5);
        for (int i = 0; i < 5; i++) {
            final int no = i+1;
            Runnable runnable = new Runnable() {
                @Override
                public void run() {
                    System.out.println("No." + no + ",准备完毕,等待发令枪");
                    try {
                        countDownLatch.await();
                        System.out.println("No." + no + ",开始跑步");
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            };
            executorService.submit(runnable);
        }

        //裁判员检查发令枪......
        Thread.sleep(5000);
        System.out.println("发令枪响,比赛开始......");
        countDownLatch.countDown();
    }
}

CountDownLatch两种用法结合使用

/**
 * @Description: 模拟100米跑步,5名选手都准备好了,只等裁判员一声令下,所有人同时开跑
 *                  当所有人都到终点后,比赛结束
 */
public class CountDownLatchDemo3 {

    public static void main(String[] args) throws InterruptedException {
        CountDownLatch begin = new CountDownLatch(1);
        CountDownLatch end = new CountDownLatch(5);

        ExecutorService executorService = Executors.newFixedThreadPool(5);
        for (int i = 0; i < 5; i++) {
            final int no = i+1;
            Runnable runnable = new Runnable() {
                @Override
                public void run() {
                    System.out.println("No." + no + ",准备完毕,等待发令枪");
                    try {
                        begin.await();
                        System.out.println("No." + no + ",开始跑步");
                        Thread.sleep(new Random().nextInt(10000));
                        System.out.println("No." + no + ",跑到终点了");
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }finally {
                        end.countDown();
                    }
                }
            };
            executorService.submit(runnable);
        }

        //裁判员检查发令枪......
        Thread.sleep(5000);
        System.out.println("发令枪响,比赛开始......");
        begin.countDown();

        //等待5个线程都执行完毕之后
        end.await();
        System.out.println("所有人到达终点,比赛结束");
    }
}

CountDownLatch注意点

  • 扩展用法:多个线程等待多个线程完成执行后,再同时执行
  • CountDownLatch是不能够重用的,如果需要重新计数,可以考虑使用CyclicBarrier或者创建新的CountDownLatch实例

Semaphore

Semaphore是一种在多线程环境下使用的设施,该设施负责协调各个线程,以保证它们能够正确、合理的使用公共资源的设施,也是操作系统中用于控制进程同步互斥的量。Semaphore是一种计数信号量,用于管理一组资源,内部是基于AQS的共享模式。它相当于给线程规定一个量从而控制允许活动的线程数。

Semaphore(信号量)是用来控制同时访问特定资源的线程数量,它通过协调各个线程,以保证合理的使用公共资源。很多年以来,我都觉得从字面上很难理解Semaphore所表达的含义,只能把它比作是控制流量的红绿灯,比如XX马路要限制流量,只允许同时有一百辆车在这条路上行使,其他的都必须在路口等待,所以前一百辆车会看到绿灯,可以开进这条马路,后面的车会看到红灯,不能驶入XX马路,但是如果前一百辆中有五辆车已经离开了XX马路,那么后面就允许有5辆车驶入马路,这个例子里说的车就是线程,驶入马路就表示线程在执行,离开马路就表示线程执行完成,看见红灯就表示线程被阻塞,不能执行。

Semaphore 是 synchronized 的加强版,作用是控制线程的并发数量。就这一点而言,单纯的synchronized 关键字是实现不了的。

Semaphore主要方法:

  • Semaphore(int permits):构造方法,创建具有给定许可数的计数信号量并设置为非公平信号量。
  • Semaphore(int permits,boolean fair):构造方法,当fair等于true时,创建具有给定许可数的计数信号量并设置为公平信号量,那么Semaphore会把之前等待的线程放到FIFO的队列里,以便于当有了新的许可证时,可以分发给之前等了最长时间的线程。
  • void acquire():从此信号量获取一个许可前线程将一直阻塞。相当于一辆车占了一个车位。
  • void acquire(int n):从此信号量获取给定数目许可,在提供这些许可前一直将线程阻塞。比如n=2,就相当于一辆车占了两个车位。
  • void acquireUninterruptibly():作用是使等待进入acquire()方法的线程,不允许被中断。acquireUninterruptibly()还有重载的方法acquireUninterruptibly(int permits),此方法的作用是在等待permits的情况下不允许被中断,如果成功获得锁,则取得指定permits的个数。
  • boolean tryAcquire():从信号量尝试获取一个许可,如果无可用许可,直接返回false,不会阻塞
  • boolean tryAcquire(int permits): 尝试获取指定数目的许可,如果无可用许可直接返回false
  • boolean tryAcquire(int permits, long timeout, TimeUnit unit): 在指定的时间内尝试从信号量中获取许可,如果在指定的时间内获取成功,返回true,否则返回false
  • void release():释放一个许可,别忘了在finally中使用,注意:多次调用该方法,会使信号量的许可数增加,达到动态扩展的效果,如:初始permits为1, 调用了两次release,最大许可会改变为2
  • void release(int n):释放n个许可。
  • int availablePermits():当前可用的许可数。
/**
 * @Description: 演示Semaphore用法
 */
public class SemaphoreDemo {

    public static Semaphore semaphore = new Semaphore(3,true);

    public static void main(String[] args) {
        ExecutorService executorService = Executors.newFixedThreadPool(50);

        for (int i = 0; i < 100; i++) {
            executorService.execute(new Task());
        }
        executorService.shutdown();
    }

    static class Task implements Runnable{

        @Override
        public void run() {
            try {
                semaphore.acquire();
                System.out.println(Thread.currentThread().getName()+"拿到了许可证");
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }finally {
                System.out.println(Thread.currentThread().getName()+"释放了许可证");
                semaphore.release();
            }
        }
    }
}

Semaphore特殊用法

  • 一次性获取或释放多个许可证
    • 比如TaskA会调用很消耗资源的method1(),而TaskB调用的是不太消耗资源的method2(),假设我们一共有5个许可证,那么我们可以要求TaskA获取5个许可证才能执行,而TaskB只需要获取到一个许可证就能执行,这样就避免了A和B同时运行的情况,我们可以根据自己的需求合理分配资源

注意点

  • 获取和释放的许可证数量必须一致,否则,比如每次都获取2个但是只是放一个甚至不释放,那么随着时间的推移,到最后许可证不够用,会导致程序卡死。(虽然信号量类并不对释放和获取的数量做规定,但是这是我们的编程规范,否则容易出错)
  • 注意在初始化Semaphore的时候设置公平性,一般设置为true更为合理
  • 并不是必须由获取许可证的线程来释放许可证,事实上,获取和释放许可证对线程并无要求,也许是A获取了,然后由B释放,只要逻辑合理即可
  • 信号量的作用,除了控制临界区最多同时有N个线程访问外,另一个作用是可以实现“条件等待”,例如线程1需要在线程2完成准备工作后才能开始工作,那么线程1acquire(),而线程2完成任务后release(),这样的话相当于轻量级的CountDownLatch

Condition接口(又称条件对象)

在使用Lock之前,我们使用的最多的同步方式应该是synchronized关键字来实现同步方式了。配合Object的wait()、notify()系列方法可以实现等待/通知模式。Condition接口也提供了类似Object的监视器方法,与Lock配合可以实现等待/通知模式,但是这两者在使用方式以及功能特性上还是有差别的。Object和Condition接口的一些对比。摘自《Java并发编程的艺术》


首先我们需要明白condition对象是依赖于lock对象的,意思就是说condition对象需要通过lock对象进行创建出来(调用Lock对象的newCondition()方法)。condition的使用方式非常的简单。但是需要注意在调用方法前获取锁。

condition是对线程进行控制管理的接口,具体实现是AQS的一个内部类ConditionObject,主要功能是控制线程的启/停(这么说并不严格,还要有锁的竞争排队)。

Condition作用

  • 当线程1需要等待某个条件的时候,它就去执行condition.await()方法,一旦执行了await()方法,线程就进入了阻塞状态
  • 然后通常会有另外一个线程,假设是线程2去执行对应的条件,直到这个条件达成的时候,线程2就会执行condition.signal()方法,这时JVM就会从被阻塞的线程里找到那些等待该condition的线程,那么当线程1收到可执行信号的时候,它的线程状态就会变成Runnable可执行状态


signalAll()和signal()区别

  • signalAll()会唤起所有的等待线程
  • 但signal()是公平的,只会唤起那个等待时间最长的线程

Condition基本用法

/**
 * @Description: 演示Condition的基本用法
 */
public class ConditionDemo1 {

    public ReentrantLock lock = new ReentrantLock();

    public Condition condition = lock.newCondition();

    public void method1(){
        lock.lock();
        try{
            System.out.println("条件不满足,开始await");
            condition.await();
            System.out.println("条件满足,开始执行后续任务");
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }

    public void method2(){
        lock.lock();
        try{
            System.out.println("准备工作完成,开始唤醒其他线程");
            condition.signal();
        }finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) {
        ConditionDemo1 conditionDemo1 = new ConditionDemo1();

        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    Thread.sleep(1000);
                    conditionDemo1.method2();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }).start();

        conditionDemo1.method1();
    }
}

Condition实现生产者和消费者模式

/**
 * @Description: 演示Condition实现生产者和消费者模式
 */
public class ConditionDemo2 {

    private int queueSize = 10;
    private PriorityQueue<Integer> queue = new PriorityQueue<>(queueSize);

    private ReentrantLock lock = new ReentrantLock();
    private Condition notFull = lock.newCondition();
    private Condition notEmpty = lock.newCondition();

    public static void main(String[] args) {
        ConditionDemo2 conditionDemo2 = new ConditionDemo2();
        Producer producer = conditionDemo2.new Producer();
        Consumer consumer = conditionDemo2.new Consumer();

        new Thread(producer).start();
        new Thread(consumer).start();
    }

    class Consumer implements Runnable{
        @Override
        public void run() {
            consume();
        }

        public void consume(){
            while(true){
                lock.lock();
                try{
                    while (queue.size() == 0){
                        System.out.println("队列空,等待数据");
                        notEmpty.await();
                    }
                    Integer poll = queue.poll();//走过await()证明队列不为空,取出数据
                    System.out.println("消费者消费数据:"+poll+",队列剩余数据数量:"+queue.size());

                    notFull.signalAll();//获取数据之后,队列肯定有空闲,那么唤醒生产者进行生产
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    lock.unlock();
                }
            }
        }
    }

    class Producer implements Runnable{
        @Override
        public void run() {
            produce();
        }

        public void produce(){
            while(true){
                lock.lock();
                try{
                    while (queue.size() == queueSize){
                        System.out.println("队列已满,等待空余");
                        notFull.await();
                    }
                    queue.offer(1);//走过await()证明队列有空闲,开始往队列里生产数据
                    System.out.println("生产者向队列生产一个数据,队列剩余空间:"+(queueSize-queue.size()));

                    notEmpty.signalAll();//向队列生产数据之后,队列不为空,那么唤醒消费者进行消费
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    lock.unlock();
                }
            }
        }
    }
}

Condition注意点

  • 实际上,如果说Lock用来替代synchronized,那么Condition就是用来代替相对应的Object.wait/notify的,所以在用法和性质上,几乎一样
  • await方法会自动释放持有的Lock锁,和Object.wait一样,不需要自己手动先释放锁,而sleep()并不释放锁。
  • 调用await的时候,必须持有锁,否则会抛异常,和Object.wait一样

CyclicBarrier循环栅栏

  • CyclicBarrier是java.util.concurrent包下面的一个工具类,字面意思是可循环使用(Cyclic)的屏障(Barrier),通过它可以实现让一组线程到达一个屏障(也可以叫同步点)时被阻塞,直到最后一个线程到达屏障时,所有被屏障拦截的线程才会继续执行。

  • CyclicBarrier循环栅栏和CountDownLatch很类似,都能阻塞一组线程

  • 栅栏类似于闭锁,它能阻塞一组线程直到某个事件的发生。栅栏与闭锁的关键区别在于,所有的线程必须同时到达栅栏位置,才能继续执行。闭锁用于等待事件,而栅栏用于等待其他线程。

  • CyclicBarrier可以使一定数量的线程反复地在栅栏位置处汇集。当线程到达栅栏位置时将调用await方法,这个方法将阻塞直到所有线程都到达栅栏位置。如果所有线程都到达栅栏位置,那么栅栏将打开,此时所有的线程都将被释放,而栅栏将被重置以便下次使用。

CyclicBarrier支持一个可选的Runnable命令,每个屏障点运行一次,在派对中的最后一个线程到达之后,但在任何线程释放之前。 在任何一方继续进行之前,此屏障操作对更新共享状态很有用。

实现原理:在CyclicBarrier的内部定义了一个Lock对象,每当一个线程调用await方法时,将拦截的线程数减1,然后判断剩余拦截数是否为初始值parties,如果不是,进入Lock对象的条件队列等待。如果是,执行barrierAction对象的Runnable方法,然后将锁的条件队列中的所有线程放入锁等待队列中,这些线程会依次的获取锁、释放锁。

CyclicBarrier构造方法

  • CyclicBarrier(int parties)
    创建一个新的 CyclicBarrier ,当给定数量的线程(线程)等待它时,它将跳闸,并且当屏障跳闸时不执行预定义的动作。
  • CyclicBarrier(int parties, Runnable barrierAction)
    创建一个新的 CyclicBarrier ,当给定数量的线程(线程)等待时,它将跳闸,当屏障跳闸时执行给定的屏障动作,由最后一个进入屏障的线程执行。

CyclicBarrier方法

  • int await() 等待所有 parties已经在这个障碍上调用了 await 。
  • int await(long timeout, TimeUnit unit) 等待所有 parties已经在此屏障上调用 await ,或指定的等待时间过去。
  • int getNumberWaiting() 返回目前正在等待障碍的各方的数量。
  • int getParties() 返回旅行这个障碍所需的parties数量。
  • boolean isBroken() 查询这个障碍是否处于破碎状态。
  • void reset() 将屏障重置为初始状态。

CyclicBarrier的用法

/**
 * @Description: 演示CyclicBarrier的使用
 */
public class CyclicBarrierDemo {

    public static void main(String[] args) {
        CyclicBarrier cyclicBarrier = new CyclicBarrier(5, new Runnable() {
            @Override
            public void run() {
                System.out.println("所有人都到场了,大家统一出发");
            }
        });
        for (int i = 0; i < 10; i++) {
            new Thread(new Task(i,cyclicBarrier)).start();
        }
    }

    static class Task implements Runnable{

        private int id;

        private CyclicBarrier cyclicBarrier;

        public Task(int id, CyclicBarrier cyclicBarrier) {
            this.id = id;
            this.cyclicBarrier = cyclicBarrier;
        }

        @Override
        public void run() {
            System.out.println(Thread.currentThread().getName()+",id:"+id+"现前往集合地点");
            try {
                Thread.sleep(new Random().nextInt(10000));
                System.out.println(Thread.currentThread().getName()+"到了集合地点,开始等待其他人到达");
                cyclicBarrier.await();
                System.out.println(Thread.currentThread().getName()+"出发了");
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (BrokenBarrierException e) {
                e.printStackTrace();
            }
        }
    }
}

CyclicBarrier和CountDownLatch的区别

  • 作用不同:CyclicBarrier要等固定数量的线程都到达了栅栏位置才能继续执行,而CountDownLatch只需要等待数字到0,也就是说,CountDownLatch用于事件,CyclicBarrier是用于线程的
  • 可重用性不同:CountDownLatch在倒数到0并触发门闩打开后,就不能再次使用了,除非新建实例;而CyclicBarrier可以重复使用
  • CountDownLatch是减计数方式,而CyclicBarrier是加计数方式。
  • CountDownLatch计数为0无法重置,而CyclicBarrier计数达到初始值,则可以重置。

参考:
https://www.cnblogs.com/Lee_xy_z/p/10470181.html

https://www.cnblogs.com/wxgblogs/p/5422508.html

https://www.cnblogs.com/gemine/p/9039012.html

https://www.cnblogs.com/brokencolor/p/9752941.html

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,445评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,889评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,047评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,760评论 1 276
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,745评论 5 367
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,638评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,011评论 3 398
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,669评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,923评论 1 299
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,655评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,740评论 1 330
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,406评论 4 320
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,995评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,961评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,197评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,023评论 2 350
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,483评论 2 342