Java 中的同步器

  • CountDownLatch
  • CyclicBarrier
  • Semaphore

CountDownLatch

1. CountDownLatch 的使用

private void countDownTest() {
        // 1. 首先我们声明一个CountDownLatch实例,参数为我们需要同步的线程个数
        final CountDownLatch countDownLatch = new CountDownLatch(2);
        ExecutorService executorService = Executors.newFixedThreadPool(2);
        executorService.execute(new Runnable() {
            @Override
            public void run() {
                try {
                    Thread.sleep(1000);
                    Log.i(TAG, "run: Thread  A  run");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }finally {
                    // 2. 在操作完毕后通知
                    countDownLatch.countDown();
                }
            }
        });

        executorService.execute(new Runnable() {
            @Override
            public void run() {
                try {
                    Thread.sleep(1000);
                    Log.i(TAG, "run: Thread  B  run");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }finally {
                    // 2. 操作完毕后通知
                    countDownLatch.countDown();

                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    Log.i(TAG, "run: Thread  B  run   next");
                }
            }
        });

        try {
             // 3. 在需要同步的线程进行等待
            countDownLatch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        Log.i(TAG, "countDownTest: main---- run");

        executorService.shutdown();
    }

我们看下输出的日志情况

ph_MainActivity: run: Thread  A  run
ph_MainActivity: run: Thread  B  run
ph_MainActivity: countDownTest: main---- run
ph_MainActivity: run: Thread  B  run   next

从使用的方法及结果我们可以看到,CountDownLatch 可以实现join 的功能,但是比join更灵活,可以结合线程池使用;并且可以在线程执行的任何时刻进行同步,不是必须在任务结束时
2. CountDownLaunch 原理解析

CountDownLaunch — UML图.png

从UML图中我们得知其使用的AQS实现的。

  • 构造方法
// count 是线程在通过之前必须被调用的countDown的次数
public CountDownLatch(int count) {
        if (count < 0) throw new IllegalArgumentException("count < 0");
        this.sync = new Sync(count);
    }

Sync(int count) {
            setState(count);
        }
  • await :当线程调用await 方法后线程会被阻塞,当其他线程调用了相应次数的countdown 方法,计数器的state 的值为0 时;或者其他线程调用了本线程的intrrupt 方法后 会抛出异常放回
public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }

那不Sync 中没有实现acquireSharedInterruptibly,我们在AQS中看下

public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        if (tryAcquireShared(arg) < 0)
             // 如果获取失败则进入阻塞队列
            doAcquireSharedInterruptibly(arg);
    }

tryAcquireShared 在Sync 中有实现

// 如果当前同步器的状态 为0 的话,表示可获得锁,否则进入阻塞队列
        protected int tryAcquireShared(int acquires) {
            return (getState() == 0) ? 1 : -1;
        }
  • await(long timeout, TimeUnit unit) 与await 方法类似,只不过当超时会返回false 而结束等待
  • countDown:调用该方法后计数器值会递减,递减后如果计数器值为0则唤醒所有因调用await 方法二阻塞的线程。
public void countDown() {
        // 委托Sync 调用AQS方法
        sync.releaseShared(1);
    }
// 共享模式下的释放
    public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            // 这个方法就是线程在获得锁时,唤醒后续节点时调用的方法
            doReleaseShared();
            return true;
        }
        return false;
    }

释放锁主要是在tryReleaseShared 中做的,在Sync 中有实现

// 对 state 进行递减,直到 state 变成 0;
        // state 递减为 0 时,返回 true,其余返回 false
        protected boolean tryReleaseShared(int releases) {
            // 自旋保证 CAS 一定可以成功
            for (;;) {
                int c = getState();
                // state 已经是 0 了,直接返回 false
                if (c == 0)
                    return false;
                // 对 state 进行递减
                int nextc = c-1;
                if (compareAndSetState(c, nextc))
                    return nextc == 0;
            }
        }

我们可以看到CountDownLaunch 主要使用了AQS实现,主要通过重写 tryAcquireShared 和 tryReleaseShared 方法进行了控制。


CyclicBarrier

1. CyclicBarrier 使用

final CyclicBarrier cyclicBarrier = new CyclicBarrier(2, new Runnable() {
            @Override
            public void run() {
                Log.i(TAG, "run: cyclicBarrier over!");
            }
        });
        
        ExecutorService executorService = Executors.newFixedThreadPool(2);
        executorService.execute(new Runnable() {
            @Override
            public void run() {

                try {
                    Log.i(TAG, "run: A  =====1  ");
                    cyclicBarrier.await();
                    Log.i(TAG, "run: A  =====2  ");
                    cyclicBarrier.await();
                    Log.i(TAG, "run: A  =====3  ");
                } catch (BrokenBarrierException e) {
                    e.printStackTrace();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }


            }
        });

        executorService.execute(new Runnable() {
            @Override
            public void run() {
                try {
                    Log.i(TAG, "run: B  =====1  ");
                    cyclicBarrier.await();
                    Log.i(TAG, "run: B  =====2  ");
                    cyclicBarrier.await();
                    Log.i(TAG, "run: B  =====3  ");
                } catch (BrokenBarrierException e) {
                    e.printStackTrace();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }

            }
        });

运行结果:

ph_MainActivity: run: A  =====1  
ph_MainActivity: run: B  =====1  
ph_MainActivity: run: cyclicBarrier over!
ph_MainActivity: run: B  =====2  
ph_MainActivity: run: A  =====2  
ph_MainActivity: run: cyclicBarrier over!
ph_MainActivity: run: A  =====3  
ph_MainActivity: run: B  =====3  

CyclicBarrier 使多个线程相互等待,假如计数器为n,前n-1个线程都会因为到达屏障而被阻塞,当第n个线程调用await 后,计数器的值为0了,这时候会发通知唤醒前n-1个线程。并且CyclicBarrier 是可以复用的,可以定制突破屏障后的操作
2. CyclicBarrier 实现

CyclicBarrier -- UML图.png

CyclicBarrier是基于独占锁实现的,底层还是基于AQS。

  • parties:用于记录多少个线程调用await 才会冲破屏障的个数,即我们初始化传入的值

  • count:开始为parties的值,当调用一次await 后就-1,当为0时到达屏障调用await的线程结束等待,随后便会恢复为parties 的值用来复用。

  • 初始化方法:只是进行简单的赋值

public CyclicBarrier(int parties, Runnable barrierAction) {
        if (parties <= 0) throw new IllegalArgumentException();
        this.parties = parties;
        this.count = parties;
        this.barrierCommand = barrierAction;
    }
  • await()方法:当线程调用该方法后会进行阻塞,直到满足一下某个条件才会继续执行:parties 为0,即都到了屏障点;其他线程调用了本线程的interrupt方法
 public int await() throws InterruptedException, BrokenBarrierException {
        try {
            return dowait(false, 0L);
        } catch (TimeoutException toe) {
            throw new Error(toe); // cannot happen
        }
    }
private int dowait(boolean timed, long nanos)
        throws InterruptedException, BrokenBarrierException,
               TimeoutException {
        // 获取锁并上锁
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            final Generation g = generation;

            //如果屏障被打破则抛出BrokenBarrierException异常,在调用breakBarrier 方法时会被打破
            if (g.broken)
                throw new BrokenBarrierException();

            // 如果线程被interrupt 则打破屏障并抛出异常
            if (Thread.interrupted()) {
                breakBarrier();
                throw new InterruptedException();
            }

            // count 进行减1操作
            int index = --count;
            // 如果为0,即所有的线程都到达了屏障点
            if (index == 0) {  // tripped
                boolean ranAction = false;
                try {
                    // 如果设置的破除屏障点后需要执行的任务不为空则执行
                    final Runnable command = barrierCommand;
                    if (command != null)
                        command.run();
                    ranAction = true;
                    // 唤醒所有的线程并重置
                    nextGeneration();
                    return 0;
                } finally {
                    if (!ranAction)
                        breakBarrier();
                }
            }

            // loop until tripped, broken, interrupted, or timed out
            // 循环进行等待,直到被唤醒、打破,或者超时?TODO 为什么使用循环???
            for (;;) {
                try {
                    // 如果没有设置超时,则调用await方法直接进行等待
                    if (!timed)
                        trip.await();
                    else if (nanos > 0L)
                        nanos = trip.awaitNanos(nanos);
                } catch (InterruptedException ie) {
                    if (g == generation && ! g.broken) {
                        breakBarrier();
                        throw ie;
                    } else {
                        // We're about to finish waiting even if we had not
                        // been interrupted, so this interrupt is deemed to
                        // "belong" to subsequent execution.
                        Thread.currentThread().interrupt();
                    }
                }

                if (g.broken)
                    throw new BrokenBarrierException();
                if (g != generation)
                    return index;

                if (timed && nanos <= 0L) {
                    breakBarrier();
                    throw new TimeoutException();
                }
            }
        } finally {
            lock.unlock();
        }
    }
private void nextGeneration() {
        // 唤醒所有的线程
        trip.signalAll();
        // 重置屏障参数
        count = parties;
        generation = new Generation();
    }
  • await(timeout, unit):与await 类似,只不过当超时后会抛出TimeOutException 返回

Semaphore

1. Semaphore 使用方法

final Semaphore semaphore  = new Semaphore(0);
        ExecutorService executorService = Executors.newFixedThreadPool(2);
        executorService.execute(new Runnable() {
            @Override
            public void run() {
                Log.i(TAG, "run: A===");
                semaphore.release();
            }
        });
        executorService.execute(new Runnable() {
            @Override
            public void run() {
                Log.i(TAG, "run: B===");
                semaphore.release();

            }
        });
        semaphore.acquire(2);
        Log.i(TAG, "semaphT: 1=======end");

        executorService.execute(new Runnable() {
            @Override
            public void run() {
                Log.i(TAG, "run: C===");
                semaphore.release();

            }
        });
        executorService.execute(new Runnable() {
            @Override
            public void run() {
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                Log.i(TAG, "run: D===");
                semaphore.release();

            }
        });
        semaphore.tryAcquire(2,1000,TimeUnit.MILLISECONDS);
        Log.i(TAG, "semaphT: 2=======end");

输出结果:

ph_MainActivity: run: B===
ph_MainActivity: run: A===
ph_MainActivity: semaphT: 1=======end
ph_MainActivity: run: C===
ph_MainActivity: semaphT: 2=======end
ph_MainActivity: run: D===

Semaphore 和CyclicBarrier 类似可以重复使用


2. SemaphoreUML 图

Semaphore — UML图.png

Semaphore 的源码我们就不再分析了如果感兴趣可以去看一下

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,293评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,604评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,958评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,729评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,719评论 5 366
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,630评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,000评论 3 397
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,665评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,909评论 1 299
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,646评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,726评论 1 330
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,400评论 4 321
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,986评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,959评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,197评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 44,996评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,481评论 2 342

推荐阅读更多精彩内容