Java 多线程(五)AQS的使用

ReentrantLockSemaphore这两个接口之间存在许多共同点。这两个类都可以用做一个“阀门”,即每次只允许一定数量的线程通过,并当线程到达阀门时,可以通过(在调用lockacquire时成功返回),也可以等待(在调用lockacquire时阻塞),还可以取消(在调用tryLocktryAcquire时返回“假”,表示在指定的时间内锁是不可用的或者无法获得许可)。而且,这两个接口都支持可中断的、不可中断的以及限时的获取操作,并且也都支持等待线程执行公平或非公平的队列操作。

事实上,它们在实现时都使用了一个共同的基类,即AbstractQueuedSynchronizer(AQS),这个类也是其他许多同步类的基类。AQS是一个用于构建锁和同步器的框架,许多同步器都可以通过AQS很容易并且高效地构造出来。不仅ReentrantLockSemaphore是基于AQS构建的,还包括CountDownLatchReentrantReadWriteLockSynchronousQueueFutureTask

AQS 中的获取与释放

在基于 AQS 构建的同步器类中,最基本的操作包括各种形式的锁获取和释放操作。并且获取操作是一种依赖状态的操作,并且通常会阻塞。
如下伪代码给出了 AQS 获取与释放的简单逻辑。 (Douge Lea 老爷子源码写的太精妙,得慢慢品)

    boolean acquire() throws InterruptedException{
        while (当前状态不允许获取操作){
            if (需要阻塞获取请求){
                如果当前线程不再队列中,则将其插入队列
                阻塞当前线程
            }else {
                返回失败
            }
        }
        更新同步器的状态
        如果线程位于队列中,则将其移除队列
        返回成功
    }
    void release(){
        更新同步器的状态
        if(新的状态允许某个阻塞的线程获取成功){
            解除队列中一个或多个线程的阻塞状态
        }
    }

一个获取操作包括两部分:

  • 首先,同步器判断当前状态是否允许获得操作,如果是,则允许线程执行,否则获取操作将阻塞或失败。这种判断是根据同步器的语义决定的。例如:对于锁来说,如果它没有被某个线程持有,那么就能成功的获取;而对于闭锁来说,如果它处于结束状态,那么也能被成功的获取。
  • 其次,就是更新同步器的状态,获取同步器的某个线程可能会对其他线程能否获取该同步器照成影响。例如,当获取一个锁后,锁的状态将『未备持有』变成『已被持有』,而从Semaphore中获取许可后,将把许可证的数量减1。然而,当一个线程获取闭锁时,并不会影响其他线程能否获取它。

根据同步器性质的不同,实现的方法各有差异:

  • 独占操作(例如:ReentrantLock):如果某个同步器支持独占的获取操作,那么需要实现 AQStryAcquiretryReleasetryHeldExeclusively等方法。
  • 非独占操作(例如:Semphore,CountDownLatch):对于支持共享获取的同步器,则应该实现tryAcquireSharedtryReleaseShared等方法

AQS 中的的acquireacquireSharedreleasereleaseShared等方法都将调用这些方法在子类中带有前缀try的版本来判断某个操作是否能够执行。

一个简单的闭锁

OneShotLatch包含两个公有方法:awaitsignal,分别对应获取和释放操作。起初,闭锁是关闭的,任何调用 await 的线程都将阻塞并直到闭锁打开。当通过调用 signal 打开闭锁时,所有等待中的线程豆浆被释放,并且随后到达闭锁的线程也允许被执行。

import java.util.concurrent.locks.AbstractQueuedSynchronizer;

public class OneShotLatch {
    private final Sync sync = new Sync();

    public void signal() {
        sync.releaseShared(0);
    }

    public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(0);
    }

    private static class Sync extends AbstractQueuedSynchronizer {
        @Override
        protected int tryAcquireShared(int arg) {
            int state = getState();
            //如果闭锁是开的(state==1),那么这个操作讲成功,否则失败
            System.out.println("state = " + state);
            return getState() == 1 ? 1 : -1;
        }

        @Override
        protected boolean tryReleaseShared(int arg) {
            setState(1);//打开闭锁
            return true;//其他线程可以获取该闭锁
        }
    }

    public static void main(String[] args) {
        OneShotLatch osl = new OneShotLatch();
        new Thread(() -> {
            System.out.println("we are in main 01 thread, and start osl await");
            try {
                osl.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("main 01 thread osl await finished");
        }).start();
        new Thread(() -> {
            System.out.println("we are in main 02 thread, and start osl await");
            try {
                osl.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("main 02 thread osl await finished");
        }).start();
        new Thread(() -> {

            System.out.println("we are in main 03 thread, and first sleep 5s");
            try {
                Thread.sleep(5000);
                System.out.println("we are in main 03 thread, and start osl await");
                osl.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("main 03 thread osl await finished");
        }).start();
        new Thread(() -> {
            System.out.println("we are in work thread,and we start waiting");
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("work finish,now we signal main thread");
            osl.signal();
        }).start();
    }
}

java.util.concurrent中的 AQS

并发包中有许多的可阻塞类,例如ReentrantLockSemaphoreCountDownLatchReentrantReadWriteLockSynchronousQueueFutureTask等,都是基于 AQS 构建的。

ReentrantLock

ReentrantLock只支持独占方式的获取操作,因此它实现了 tryAcquiretryReleaseisHeldExclusively方法。

  • ReentrantLock将同步状态state用于保存锁获取操作的次数。
  • 维护了一个 owner 变量来保存当前线程,但是在1.6上进行了重构增加了AbstractOwnableSynchronizerexclusiveOwnerThread来保存当前线程。只有在当前线程刚刚获取到锁,或者正要释放锁的时候,才会修改这个变量。
    • tryRelease 中检查 owner 域,从而确保当前线程在执行 unlock 操作前已经获取了锁
    • tryAcquire 中将使用 owner 域判断获取操作是重入还是竞争的

非公平锁版本(默认)

    static final class NonfairSync extends Sync {
        private static final long serialVersionUID = 7316153563782823691L;

        /**
         * Performs lock.  Try immediate barge, backing up to normal
         * acquire on failure.
         */
        final void lock() {
            if (compareAndSetState(0, 1))
                setExclusiveOwnerThread(Thread.currentThread());
            else
                acquire(1);
        }

        protected final boolean tryAcquire(int acquires) {
            return nonfairTryAcquire(acquires);
        }
    }
        final boolean nonfairTryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                if (compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }

当一个线程尝试获取锁时,tryAcquire 将首先检查锁的状态:

  • 未被持有:通过 compareAndSetState(0, acquires)原子性的操作尝试更新锁的状态以表示已经被持有。
  • 已经持有:判断当前现场是否为锁的拥有者,是:计数递增(所以 ReentrantLock 是可重入锁);不是:获取操作失败。

SemaphoreCountDownLatch

SemaphoreCountDownLatch是属于支持共享获取的同步器,因此它们实现了 tryAcquireSharedtryReleaseShared 方法

Semaphorestate 用于保存当前可用许可数量。

Semaphore的非公平锁实现为例:

    static final class NonfairSync extends Sync {
        private static final long serialVersionUID = -2694183684443567898L;

        NonfairSync(int permits) {
            super(permits);
        }

        protected int tryAcquireShared(int acquires) {
            return nonfairTryAcquireShared(acquires);
        }
    }
        final int nonfairTryAcquireShared(int acquires) {
            for (;;) {
                int available = getState();
                int remaining = available - acquires;
                //这个代码很精髓啊
                //remaining < 0 :如果没有足够的许可,退出循环
                //compareAndSetState设置成功,退出循环;设置失败,重新尝试
                if (remaining < 0 ||
                    compareAndSetState(available, remaining))
                    return remaining;
            }
        }
        protected final boolean tryReleaseShared(int releases) {
            for (;;) {
                int current = getState();
                int next = current + releases;
                if (next < current) // overflow
                    throw new Error("Maximum permit count exceeded");
                if (compareAndSetState(current, next))
                    return true;
            }
        }

tryAcquireShared首先会计算剩余许可的数量。

  • 如果数量不足,那么会返回一个值表示获取操作失败。
  • 如果还有剩余的许可数量,会通过compareAndSetState以原子的方式来降低许可的计数。如果这个操作成功(意味着从上次读取后就没有被修改过),那么就返回一个值表示操作获取成功。

CountDownLatch使用 AQS 的方式与Semaphore很相似:同步状态 state用来保存当前的计数值
await() 调用关系:

graph LR
await-->acquireSharedInterruptibly 
acquireSharedInterruptibly --> tryReleaseShared

await 调用 acquire,当计数器为0时,acquire 将立即返回,否则将执行doAcquireSharedInterruptibly进入阻塞。

    private static final class Sync extends AbstractQueuedSynchronizer {
        Sync(int count) {
            setState(count);
        }
        int getCount() {
            return getState();
        }
        protected int tryAcquireShared(int acquires) {
            return (getState() == 0) ? 1 : -1;
        }
        protected boolean tryReleaseShared(int releases) {
            // Decrement count; signal when transition to zero
            for (;;) {
                int c = getState();
                if (c == 0)
                    return false;
                int nextc = c-1;
                if (compareAndSetState(c, nextc))
                    return nextc == 0;
            }
        }
    }
    public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        if (tryAcquireShared(arg) < 0)
            doAcquireSharedInterruptibly(arg);
    }
    public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }

countDown()调用关系:

graph LR
countDown-->releaseShared
releaseShared-->tryReleaseShared

countDown() 调用 tryReleaseShared来完成计数递减,当计数值为0时,执行doReleaseShared解除所有等待线程的阻塞。

    public void countDown() {
        sync.releaseShared(1);
    }
    public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }

FutureTask

老爷子新版本并没有直接使用 AQS,通过sun.misc.Unsafe UNSAFE 来操作实现。

ReentrantReadWriteLock

ReadWriteLock接口表示存在两个锁:读取锁和写入锁,但在基于 AQS 实现的 ReentrantReadWriteLock 中,单个AQS子类将同时管理读取加锁和写入加锁。

ReentrantReadWriteLock使用了两个16位的状态分别表示写入锁和读取锁的计数。在读取锁上的操作使用共享的获取、释放方式;在写入锁上的操作使用独占的获取、释放方式。
AQS 在内部维护一个等待线程队列,其中记录了某个线程是独占(Node.EXCLUSIVE)还是共享(Node.SHARE)访问。

小结

AQS 源码真心复杂,本篇只是粗浅的记录下并发包内的 AQS 的使用情况,下一篇争取啃下 AQS 的实现原理

Doug Lea老爷子一个人撸起了 java 并发的大旗,真滴猛。

以下是是网上比较好的 AQS 源码解析,记录一下

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,293评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,604评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,958评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,729评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,719评论 5 366
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,630评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,000评论 3 397
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,665评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,909评论 1 299
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,646评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,726评论 1 330
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,400评论 4 321
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,986评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,959评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,197评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 44,996评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,481评论 2 342