Java线程同步的几种方式

本文主要是介绍java中线程同步的几种常用方式。

CountDownLatch

从字面上理解,CountDownLatch是一个同步等待的锁,根据官方的注释可以看出这其实是一个同步执行工具类。

先看一下官方注释的前两段

/**
 * A synchronization aid that allows one or more threads to wait until
 * a set of operations being performed in other threads completes.
 *
 * <p>A {@code CountDownLatch} is initialized with a given <em>count</em>.
 * The {@link #await await} methods block until the current count reaches
 * zero due to invocations of the {@link #countDown} method, after which
 * all waiting threads are released and any subsequent invocations of
 * {@link #await await} return immediately.  This is a one-shot phenomenon
 * -- the count cannot be reset.  If you need a version that resets the
 * count, consider using a {@link CyclicBarrier}.
 

翻译一下就是:

 /**
  * CountDownLatch是一个,允许一个或多个线程,
  * 等待其他线程中执行的一组操作完成的,同步辅助工具。
  * 
  * CountDownLatch用给定的计数进行初始化。 
  * 当线程点用await方法后被阻塞,直到当前计数由于其他线程调用countDown()方法而达到零,
  * 此后所有等待线程被放,并且任何后续调用await立即返回。
  * 这是一次性的操作,计数无法重置。
  * 如果您需要重置计数的版本,请考虑使用CyclicBarrier。
  * /

解释的很清楚,不在赘述,接着看一下官方提供的伪代码案例

官方案例一

class Driver { // ...
   void main() throws InterruptedException {
     CountDownLatch startSignal = new CountDownLatch(1);
     CountDownLatch doneSignal = new CountDownLatch(N);

     for (int i = 0; i < N; ++i) // create and start threads
       new Thread(new Worker(startSignal, doneSignal)).start();

     doSomethingElse();            // don't let run yet
     startSignal.countDown();      // let all threads proceed
     doSomethingElse();
     doneSignal.await();           // wait for all to finish
   }
 }

 class Worker implements Runnable {
   private final CountDownLatch startSignal;
   private final CountDownLatch doneSignal;
   Worker(CountDownLatch startSignal, CountDownLatch doneSignal) {
     this.startSignal = startSignal;
     this.doneSignal = doneSignal;
   }
   public void run() {
     try {
       startSignal.await();
       doWork();
       doneSignal.countDown();
     } catch (InterruptedException ex) {} // return;
   }

   void doWork() { ... }
 }

这个案例很直白的说明了,CountDownLatch可以让多个线程同时初始化完成后等待,直到主线程要求他们开始执行为止,并且当主线程调用await()之后阻塞直到所有的线程调用countDown()将计数减为0,主线程再次唤醒执行后序操作。

当然这样还有一些其他的注意点,譬如子线程被中断或者子线程的耗时操作很长导致主线程一直阻塞等问题。

官方案例二

class Driver2 { // ...                                      
  void main() throws InterruptedException {                 
    CountDownLatch doneSignal = new CountDownLatch(N);      
    Executor e = ...                                        
                                                            
    for (int i = 0; i < N; ++i) // create and start threads 
      e.execute(new WorkerRunnable(doneSignal, i));         
                                                            
    doneSignal.await();           // wait for all to finish 
  }                                                         
}                                                           
                                                            
class WorkerRunnable implements Runnable {                  
  private final CountDownLatch doneSignal;                  
  private final int i;                                      
  WorkerRunnable(CountDownLatch doneSignal, int i) {        
    this.doneSignal = doneSignal;                           
    this.i = i;                                             
  }                                                         
  public void run() {                                       
    try {                                                   
      doWork(i);                                            
      doneSignal.countDown();                               
    } catch (InterruptedException ex) {} // return;         
  }                                                         
                                                            
  void doWork() { ... }                                     
  }
}                                                   

这个案例是说,当一个问题需要被分成n份进行处理时,将他们用线程池来执行,并让主线程等待。当然官方注释里还说了,如果需要反复用这种形式来执行一些问题时可以考虑使用CyclicBarrier来代替CountDownLatch,因为CountDownLatch是一次性的计数器无法重置。

CyclicBarrier

字面意思:可循环使用的栅栏。主要的作用也是让指定个数的线程到达目标位置后进入等到状态,等所有的线程都到到目标位置后同时开始执行。

构造方法有2个

  1. CyclicBarrier(int parties),其中parties指等待的线程数目,当await线程数达到parties时,线程同时开始执行。
  2. CyclicBarrier(int parties, Runnable barrierAction),第二个参数指所有线程达到后执行的操作。

通过第二个构造方法也可以实现CountDownLatch功能,当然这不是CyclicBarrier的目的

再来看一下到达目标位置时的等待方法,有2个重载方法

  1. await(),这个没什么可说的,到达指定位置后等待
  2. await(long timeout, TimeUnit unit),这个指到到指定位置后等待一段时间,如果超时则继续执行后序操作。

现在来看2个例子说明一下使用CyclicBarrier可能出现的问题

CyclicBarrier例一

public class CyclicBarrierTest {

    public static void main(String[] args) {

        try {
            final int Num = 5;
            CyclicBarrier cyclicBarrier = new CyclicBarrier(Num);

            for (int i = 0; i < Num - 1; i++) {
                new Thread(new RunnableOne(cyclicBarrier)).start();
            }

            Thread thread = new Thread(new RunnableTwo(cyclicBarrier));
            thread.start();

            Thread.sleep(2000);

            thread.interrupt();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }


    private static class RunnableOne implements Runnable {

        CyclicBarrier mCyclicBarrier;

        RunnableOne(CyclicBarrier cyclicBarrier) {
            mCyclicBarrier = cyclicBarrier;
        }

        @Override
        public void run() {
            try {
                System.out.println("wait in barrier");
                mCyclicBarrier.await();
                System.out.println("finish");
            } catch (InterruptedException | BrokenBarrierException e) {
                e.printStackTrace();
            }
        }
    }

    private static class RunnableTwo implements Runnable {

        CyclicBarrier mCyclicBarrier;

        RunnableTwo(CyclicBarrier cyclicBarrier) {
            mCyclicBarrier = cyclicBarrier;
        }

        @Override
        public void run() {
            try {
                System.out.println("wait in barrier");
                Thread.sleep(5000);
                mCyclicBarrier.await();
                System.out.println("finish");
            } catch (InterruptedException | BrokenBarrierException e) {
                e.printStackTrace();
            }
        }
    }
}

打印结果如下:

wait in barrier
wait in barrier
wait in barrier
wait in barrier
wait in barrier
java.lang.InterruptedException: sleep interrupted
    at java.lang.Thread.sleep(Native Method)
    at com.jxx.myjavatest.CyclicBarrierTest$RunnableTwo.run(CyclicBarrierTest.java:65)
    at java.lang.Thread.run(Thread.java:748)

这个例子的意图也很简单,启动4个RunnableOne,随后启动1个RunnableTwo,在所有线程都await()之前其中一个线程被中断了,因为没有都await()成功,其他4个线程就一直阻塞。
这就提醒我们,要在抛出异常后及时处理,至少也要让其他线程能正常执行下去。

CyclicBarrier例二

public class CyclicBarrierTest {

    public static void main(String[] args) {

        final int Num = 5;
        CyclicBarrier cyclicBarrier = new CyclicBarrier(Num);

        for (int i = 0; i < Num - 1; i++) {
            new Thread(new RunnableOne(cyclicBarrier)).start();
        }

        Thread thread = new Thread(new RunnableTwo(cyclicBarrier));
        thread.start();
    }


    private static class RunnableOne implements Runnable {

        CyclicBarrier mCyclicBarrier;

        RunnableOne(CyclicBarrier cyclicBarrier) {
            mCyclicBarrier = cyclicBarrier;
        }

        @Override
        public void run() {
            try {
                System.out.println("wait in barrier");
                Thread.sleep(5000);
                mCyclicBarrier.await();
                System.out.println("finish");
            } catch (InterruptedException | BrokenBarrierException e) {
                e.printStackTrace();
            }
        }
    }

    private static class RunnableTwo implements Runnable {

        CyclicBarrier mCyclicBarrier;

        RunnableTwo(CyclicBarrier cyclicBarrier) {
            mCyclicBarrier = cyclicBarrier;
        }

        @Override
        public void run() {
            try {
                System.out.println("wait in barrier");

                mCyclicBarrier.await(2000, TimeUnit.MILLISECONDS);

                System.out.println("finish");
            } catch (InterruptedException | BrokenBarrierException | TimeoutException e) {
                e.printStackTrace();
            }
        }
    }
}

打印如下:

wait in barrier
wait in barrier
wait in barrier
wait in barrier
wait in barrier
java.util.concurrent.TimeoutException
    at java.util.concurrent.CyclicBarrier.dowait(CyclicBarrier.java:257)
    at java.util.concurrent.CyclicBarrier.await(CyclicBarrier.java:435)
    at com.jxx.myjavatest.CyclicBarrierTest$RunnableTwo.run(CyclicBarrierTest.java:61)
    at java.lang.Thread.run(Thread.java:748)
java.util.concurrent.BrokenBarrierException
    at java.util.concurrent.CyclicBarrier.dowait(CyclicBarrier.java:207)
    at java.util.concurrent.CyclicBarrier.await(CyclicBarrier.java:362)
    at com.jxx.myjavatest.CyclicBarrierTest$RunnableOne.run(CyclicBarrierTest.java:40)
    at java.lang.Thread.run(Thread.java:748)
java.util.concurrent.BrokenBarrierException
    at java.util.concurrent.CyclicBarrier.dowait(CyclicBarrier.java:207)
    at java.util.concurrent.CyclicBarrier.await(CyclicBarrier.java:362)
    at com.jxx.myjavatest.CyclicBarrierTest$RunnableOne.run(CyclicBarrierTest.java:40)
    at java.lang.Thread.run(Thread.java:748)
java.util.concurrent.BrokenBarrierException
    at java.util.concurrent.CyclicBarrier.dowait(CyclicBarrier.java:207)
    at java.util.concurrent.CyclicBarrier.await(CyclicBarrier.java:362)
    at com.jxx.myjavatest.CyclicBarrierTest$RunnableOne.run(CyclicBarrierTest.java:40)
    at java.lang.Thread.run(Thread.java:748)
java.util.concurrent.BrokenBarrierException
    at java.util.concurrent.CyclicBarrier.dowait(CyclicBarrier.java:207)
    at java.util.concurrent.CyclicBarrier.await(CyclicBarrier.java:362)
    at com.jxx.myjavatest.CyclicBarrierTest$RunnableOne.run(CyclicBarrierTest.java:40)
    at java.lang.Thread.run(Thread.java:748)

这里模拟了一个await()超时的异常,可以看到在抛出异常后需要我们自己处理后期的事物。同时某一个线程抛出超时异常后,其他线程再次到达会抛出BrokenBarrierException异常,防止继续等待。

Semaphore

其实Semaphore不该放到这里讲,因为Semaphore类似于Lock的存在,是对资源或者线程的一种控制,但是这篇博文主要讲了线程的等待唤起,信号量放这里讲问题也不大。

官方的说法是信号量通常用来限制线程的数量,而不是控制访问一些(物理或逻辑)资源。用法也非常简单,使用前先acquire()获取许可,在获取许可过程中,是线程是被阻塞的,使用完毕release()许可即可。这点类似于Lock,不同的是Semaphore的acquire()可以被允许多次。

Semaphore有两个构造方法,可以指定Semaphore获取是公平的还是非公平的,默认是非公平

看这里,举个栗子:

public class SemaphoreTest {

    public static void main(String[] args) {
        CountDownLatch startLatch = new CountDownLatch(1);
        Semaphore semaphore = new Semaphore(3);

        for (int i = 0; i < 10; i++) {
            new Thread(new MyRunnable(startLatch, semaphore)).start();
        }

        startLatch.countDown();
    }

    private static class MyRunnable implements Runnable {

        final CountDownLatch mCountDownLatch;
        final Semaphore mSemaphore;

        MyRunnable(CountDownLatch countDownLatch, Semaphore semaphore) {
            mCountDownLatch = countDownLatch;
            mSemaphore = semaphore;
        }

        @Override
        public void run() {
            try {
                mCountDownLatch.await();
                mSemaphore.acquire();
                System.out.println(Thread.currentThread().getName() + " acquire success");
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                mSemaphore.release();
            }
        }
    }
}

打印如下:

Thread-0 acquire success
Thread-1 acquire success
Thread-9 acquire success
Thread-3 acquire success
Thread-2 acquire success
Thread-4 acquire success
Thread-6 acquire success
Thread-7 acquire success
Thread-5 acquire success
Thread-8 acquire success

可以看出这是默认的非公平锁的情况,再来看一下公平锁的情况

public class SemaphoreTest {

    public static void main(String[] args) {
        Semaphore semaphore = new Semaphore(3, true);

        for (int i = 0; i < 10; i++) {
            new Thread(new MyRunnable(semaphore)).start();
        }
    }

    private static class MyRunnable implements Runnable {

        final Semaphore mSemaphore;

        MyRunnable(Semaphore semaphore) {
            mSemaphore = semaphore;
        }

        @Override
        public void run() {
            try {
                mSemaphore.acquire();
                System.out.println(Thread.currentThread().getName() + " acquire success");
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                mSemaphore.release();
            }
        }
    }
}

打印如下

Thread-0 acquire success
Thread-1 acquire success
Thread-2 acquire success
Thread-3 acquire success
Thread-4 acquire success
Thread-5 acquire success
Thread-6 acquire success
Thread-7 acquire success
Thread-8 acquire success
Thread-9 acquire success

当然这里肯定有读者想了,直接将Semaphore置为true公平锁的情况就好了,何必去掉CountDownLatch呢。

这里需要注意下,虽然你Semaphore是公平,但是CountDownLatch到点之后唤起线程的顺序是随机的,并不一定就是线程入队的顺序唤起。

线程的join()

jion方法的作用是让主线程阻塞等待子线程完成,当然有几个前提条件,下面细说。

join方法有三个重载的版本

  1. final void join(); //一直等待到join的线程执行完毕
  2. final synchronized void join(long millis); //等待指定时间后继续执行
  3. final synchronized void join(long millis, int nanos); 同上,时间处理了一下

第一个和第三个最后其实调用的都是第二个重载方法,我们来看一下源码

    public final synchronized void join(long millis)
    throws InterruptedException {
        long base = System.currentTimeMillis();
        long now = 0;

        if (millis < 0) {
            throw new IllegalArgumentException("timeout value is negative");
        }

        if (millis == 0) {
            while (isAlive()) {
                wait(0);
            }
        } else {
            while (isAlive()) {
                long delay = millis - now;
                if (delay <= 0) {
                    break;
                }
                wait(delay);
                now = System.currentTimeMillis() - base;
            }
        }
    }

直接看最后的while循环,可以看到,调用这个方法,其实是调用Object提供的wait(long timeout)让主线程阻塞而已。有几个注意点

  1. 子线程如果已经销毁,则直接跳过等待
  2. join(long millis) 是一个同步方位,意味着要想调用此方法需要先获取到子线程的实例对象锁

来看一个例子,验证一下第二点:

public class JoinTest {

    public static void main(String[] args) {

        final Thread thread = new Thread(new Runnable() {
            @Override
            public void run() {

                try {
                    Thread.sleep(6000);
                    System.out.println("4---" + System.currentTimeMillis());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });

        thread.start();

        try {
            System.out.println("1---" + System.currentTimeMillis());
            new Thread(new MyRunnable(thread)).start();
            System.out.println("2---" + System.currentTimeMillis());
            thread.join(2000);
            System.out.println("3---" + System.currentTimeMillis());
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        System.out.println("finish " + System.currentTimeMillis());
    }

    private static class MyRunnable implements Runnable {
        final Object mObject;

        MyRunnable(Object object) {
            mObject = object;
        }


        @Override
        public void run() {

            synchronized (mObject) {
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }

        }
    }
}

打印如下:

1---1525529803445
2---1525529803446
3---1525529807449
finish 1525529807449
4---1525529809445

可以很清晰的看到,打印完1之后立即打印了2,但是2和3之间打相差了4秒,原因就在join之前需要先获取thread的锁对象,但是需要MyRunnable释放锁之后才能执行。

总结

好了,又到总结的时间了。

  1. CountDownLatch相对于CyclicBarrier侧重点是,等待其他线程操作完成后主线程在继续后续的操作
  2. CyclicBarrier相对于CountDownLatch侧重点是,所有的线程操作完成后等待一起继续后续操作。
  3. CountDownLatch不能重置状态,CyclicBarrier可以重置后多次利用
  4. CountDownLatch和CyclicBarrier抛出异常后都需要妥善处理
  5. Semaphore于Lock类似,主要用于线程的访问控制,构造时可以指定是否是公平竞争
  6. thread.join()主要是让主线程等待子线程执行完毕,有个注意点就是join()执行之前需要获取到子线程的实例对象锁。
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 200,045评论 5 468
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 84,114评论 2 377
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 147,120评论 0 332
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 53,902评论 1 272
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 62,828评论 5 360
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,132评论 1 277
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,590评论 3 390
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,258评论 0 254
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,408评论 1 294
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,335评论 2 317
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,385评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,068评论 3 315
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,660评论 3 303
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,747评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,967评论 1 255
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,406评论 2 346
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 41,970评论 2 341

推荐阅读更多精彩内容

  • 一、多线程 说明下线程的状态 java中的线程一共有 5 种状态。 NEW:这种情况指的是,通过 New 关键字创...
    Java旅行者阅读 4,650评论 0 44
  • 一 因为马頔的《南山南》,才开始关注民谣,因为大冰和他的书籍对民谣有了更深的了解,因为校园广播的及时更歌,民谣成为...
    陈小蟹阅读 1,928评论 20 43
  • 做To B产品时,会接到很多利益相关方的需求,如果没有深入调研用户,做出的产品往往差强人意。那么在做产品时,...
    余田阅读 293评论 2 2
  • 清晨,起了个大早。妻子说,今天去游大皇宫,由于排队购买门票的游客较多,所以必须赶早。 上了网约车,可惜天不遂人愿,...
  • 1、早晨在地铁上忽然看到地铁门玻璃映出来的自己,玻璃中是一个专业气质的经理人:干练的短发、相宜的淡妆、文艺的框架眼...
    小颐妈阅读 115评论 0 0