Java多线程(5)-- 协作之CountDownLatch、CyclicBarrier和Semaphore

java.util.concurrent(J.U.C)大大提高了并发性能,AQS 被认为是 J.U.C 的核心。     

在多线程程序设计中,经常会遇到一个线程等待一个或多个线程的场景,遇到这样的场景应该如何解决?

   如果是一个线程等待一个线程,则可以通过await()和notify()来实现;

   如果是一个线程等待多个线程,则就可以使用CountDownLatch和CyclicBarrier来实现比较好的控制。


CountDownLatch:一个线程(或者多个), 等待另外N个线程完成某个事情之后才能执行。 

CyclicBarrier: N个线程相互等待,任何一个线程完成之前,所有的线程都必须等待。

这样应该就清楚一点了,对于CountDownLatch来说,重点是那个“一个线程”, 是它在等待,而另外那N的线程在把“某个事情”做完之后可以继续等待,可以终止。而对于CyclicBarrier来说,重点是那N个线程,他们之间任何一个没有完成,所有的线程都必须等待。   

CountDownLatch是计数器,线程完成一个就记一个,就像报数一样,只不过是递减的。而CyclicBarrier更像一个水闸,线程执行就像水流,在水闸处都会堵住, 等到水满(线程到齐)了,才开始泄流。


1、CountDownLatch

用来控制一个线程等待多个线程。

维护了一个计数器 cnt,每次调用 countDown() 方法会让计数器的值减 1,减到 0 的时候,那些因为调用 await() 方法而在等待的线程就会被唤醒。      

示例:

public class CountDownLatchTest {

       static CountDownLatch latch = new CountDownLatch(2);

       public static void main(String[] args) throws InterruptedException {

                       newThread(new Runnable() {

                           @Override

                           public void run() {

                                              System.out.println(1);

                                              latch.countDown();

                                              System.out.println(2);

                                              latch.countDown();

                            }

                       }).start();

          //等待到上面两个操作结束,latch减少到0时才能通过

          latch.await();

          System.out.println(3);

     }

}

1

2

3


public class CountdownLatchExample {

    public static voidmain(String[] args) throws InterruptedException {

        final inttotalThread = 10;

        CountDownLatch countDownLatch = new CountDownLatch(totalThread);

        ExecutorService executorService = Executors.newCachedThreadPool();

        for (int i = 0; i< totalThread; i++) {

           executorService.execute(() -> {

               System.out.print("run..");

               countDownLatch.countDown();

            });

        }

       countDownLatch.await();

       System.out.println("end");

       executorService.shutdown();

    }

}

run..run..run..run..run..run..run..run..run..run..end


CountDownLatch的构造函数接收一个int类型的参数作为计数器,如果你想等待N个点完成,这里就传入N。

当我们调用一次CountDownLatch的countDown方法时,N就会减1,CountDownLatch的await会阻塞当前线程,直到N变成零。由于countDown方法可以用在任何地方,所以这里说的N个点,可以是N个线程,也可以是1个线程里的N个执行步骤。用在多个线程时,你只需要把这个CountDownLatch的引用传递到线程里。


2、CyclicBarrier

用来控制多个线程互相等待,只有当多个线程都到达时,这些线程才会继续执行。

和 CountdownLatch 相似,都是通过维护计数器来实现的。线程执行 await() 方法之后计数器会减 1,并进行等待,直到计数器为 0,所有调用 await() 方法而在等待的线程才能继续执行。

CyclicBarrier 和 CountdownLatch的一个区别是,CyclicBarrier 的计数器通过调用 reset() 方法可以循环使用,所以它才叫做循环屏障。

CyclicBarrier 有两个构造函数,其中 parties 指示计数器的初始值,barrierAction 在所有线程都到达屏障的时候会执行一次。

public CyclicBarrier(int parties, Runnable barrierAction) {

    if (parties <= 0)throw new IllegalArgumentException();

    this.parties = parties;

    this.count = parties;

    this.barrierCommand =barrierAction;

}


public CyclicBarrier(int parties) {

    this(parties, null);

}

示例代码:

public class CyclicBarrierTest {

           staticCyclicBarrier c = new CyclicBarrier(2);

           public static void main(String[] args) {

                       newThread(new Runnable() {

                       @Override

                       publicvoid run() {

                               try {

                                     c.await();

                               } catch (Exception e) {

                              }

                       System.out.println(1);

                   }

          }).start();


           try {

                      c.await();

            } catch (Exception e) {

              }

              System.out.println(2);

         }

}

输出

1 | 2

2 | 1

或者输出

1 | 1

2 | 2


如果把new CyclicBarrier(2)修改成new CyclicBarrier(3)则主线程和子线程会永远等待,因为没有第三个线程执行await方法,即没有第三个线程到达屏障,所以之前到达屏障的两个线程都不会继续执行。

CyclicBarrier还提供一个更高级的构造函数CyclicBarrier(int parties, Runnable barrierAction),用于在线程到达屏障时,优先执行barrierAction,方便处理更复杂的业务场景。代码如下:

public class CyclicBarrierTest2 {

           staticCyclicBarrier c = new CyclicBarrier(2, new A());


           public static void main(String[] args) {

                       newThread(new Runnable() {

                              @Override

                                public void run() {

                                              try{

                                                   c.await();

                                              }catch (Exception e) {

                                              }

                                              System.out.println(1);

                             }

                       }).start();


                       try {

                               c.await();

                       }catch (Exception e) {

                      }

                     System.out.println(2);

          }


            static class A implements Runnable {

                       @Override

                       public void run() {

                               System.out.println(3);

                       }

            }

}

输出

1 | 3

2 | 1

3 | 2


比较:

    1)CountDownLatch是把主干线程挂起,在任务线程中进行倒数计数,直到任务线程执行完才唤醒主干线程继续执行;

       CyclicBarrier是把任务线程挂起,直到所有任务线程执行到屏障处再放行继续执行;

    2)CountDownLatch达到屏障放行标准后放行的是主干线程;

       CyclicBarrier达到屏障放行标准后放行的是任务线程,并且还会额外地触发一个达到标准后执行的响应线程;


3、Semaphore

Semaphore(信号量)是用来控制同时访问特定资源的线程数量,它通过协调各个线程,以保证合理的使用公共资源。

很多年以来,我都觉得从字面上很难理解Semaphore所表达的含义,只能把它比作是控制流量的红绿灯,比如XX马路要限制流量,只允许同时有一百辆车在这条路上行使,其他的都必须在路口等待,所以前一百辆车会看到绿灯,可以开进这条马路,后面的车会看到红灯,不能驶入XX马路,但是如果前一百辆中有五辆车已经离开了XX马路,那么后面就允许有5辆车驶入马路,这个例子里说的车就是线程,驶入马路就表示线程在执行,离开马路就表示线程执行完成,看见红灯就表示线程被阻塞,不能执行。


下面说一下Semaphore类中比较重要的几个方法,首先是acquire()、release()方法:

public void acquire() throws InterruptedException {  }    //获取一个许可

public void acquire(int permits) throws InterruptedException {}    //获取permits个许可

public void release() { }         //释放一个许可

public void release(int permits) { }    //释放permits个许可

acquire()用来获取一个许可,若无许可能够获得,则会一直等待,直到获得许可。

release()用来释放许可。注意,在释放许可之前,必须先获获得许可。

这4个方法都会被阻塞,如果想立即得到执行结果,可以使用下面几个方法:

public boolean tryAcquire() { };   //尝试获取一个许可,若获取成功,则立即返回true,若获取失败,则立即返回false

public boolean tryAcquire(long timeout, TimeUnit unit) throwsInterruptedException { };  //尝试获取一个许可,若在指定的时间内获取成功,则立即返回true,否则则立即返回false

public boolean tryAcquire(int permits) { }; //尝试获取permits个许可,若获取成功,则立即返回true,若获取失败,则立即返回false

public boolean tryAcquire(int permits, long timeout, TimeUnit unit) throws InterruptedException { }; //尝试获取permits个许可,若在指定的时间内获取成功,则立即返回true,否则则立即返回false

Semaphore可以用于做流量控制,特别公用资源有限的应用场景,比如数据库连接。假如有一个需求,要读取几万个文件的数据,因为都是IO密集型任务,我们可以启动几十个线程并发的读取,但是如果读到内存后,还需要存储到数据库中,而数据库的连接数只有10个,这时我们必须控制只有十个线程同时获取数据库连接保存数据,否则会报错无法获取数据库连接。这个时候,我们就可以使用Semaphore来做流控,代码如下:

public class SemaphoreTest {

              private staticfinal int THREAD_COUNT = 30;

              private staticExecutorService threadPool = Executors.newFixedThreadPool(THREAD_COUNT);

              private staticSemaphore s = new Semaphore(10);


              public static void main(String[] args) {

                         for (int i = 0; i < THREAD_COUNT;i++) {

                                     threadPool.execute(new Runnable() {

                                                 @Override

                                                 public void run() {

                                                            try{

                                                                       s.acquire();                          

                                                                       System.out.println("save data");

                                                                       s.release();

                                                            }

                                                        catch (InterruptedException e) {

                                                         }

                                                  }

                                      });

                        }

                       threadPool.shutdown();

              }

}

在代码中,虽然有30个线程在执行,但是只允许10个并发的执行。Semaphore的构造方法Semaphore(int permits) 接受一个整型的数字,表示可用的许可证数量。Semaphore(10)表示允许10个线程获取许可证,也就是最大并发数是10。Semaphore的用法也很简单,首先线程使用Semaphore的acquire()获取一个许可证,使用完之后调用release()归还许可证。还可以用tryAcquire()方法尝试获取许可证。

public class SemaphoreExample{

    public static void main(String[] args) {

        final int clientCount = 3;

        final int totalRequestCount = 10;

        Semaphoresemaphore= newSemaphore(clientCount);

        ExecutorServiceexecutorService=Executors.newCachedThreadPool();

        for (int i = 0; i < totalRequestCount; i++) {

            executorService.execute(()->{

                try{

                    semaphore.acquire();

                    System.out.print(semaphore.availablePermits() + " ");

                }catch(InterruptedExceptione) {

                    e.printStackTrace();

                }finally{

                    semaphore.release();

                }

            });

        }

        executorService.shutdown();

    }

}

2 1 2 2 2 2 2 1 2 2


下面对上面说的三个辅助类进行一个总结:

1)CountDownLatch和CyclicBarrier都能够实现线程之间的等待,只不过它们侧重点不同:

   CountDownLatch一般用于某个线程A等待若干个其他线程执行完任务之后,它才执行;

  而CyclicBarrier一般用于一组线程互相等待至某个状态,然后这一组线程再同时执行;

  另外,CountDownLatch是不能够重用的,而CyclicBarrier是可以重用的。

2)Semaphore其实和锁有点类似,它一般用于控制对某组资源的访问权限。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,271评论 5 476
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,275评论 2 380
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,151评论 0 336
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,550评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,553评论 5 365
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,559评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,924评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,580评论 0 257
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,826评论 1 297
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,578评论 2 320
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,661评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,363评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,940评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,926评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,156评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,872评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,391评论 2 342

推荐阅读更多精彩内容