AQS:JAVA经典之锁实现算法(二)-Condition

零:序言

使用过ReentrantLock的盆友应该也知道Condition的存在。先讲解下它存在的意义:就是仿照实现Object类的wait signal signallAll等函数功能的。

这里引申一个面试常问到的问题:wait会释放锁,sleep不会。

  • Condition的通常使用场景是这样的:
    生产者消费者模型,假设生产者只有在生产队列为空时才进行生产,则代码类似如下:
Condition emptyCondition = ReentrantLock.newCondition();
Runnable consumer = new Runnable() {
  public void run() {
    if(queue.isEmpty()) {
      emptyCondition.signal();  // emptyObj.notify();
    } else {
      consumer.consume();
    }
  }
}
Runnable provider = new Runnable() {
  public void run() {
    emptyCondition.wait();  // emptyObj.wait();
    providerInstance.produce();
  }
}

所以我们可以知道Condition设计的意义了。下面我们来讲解下其实现原理。

一:实现概况

还记得在AQS:JAVA经典之锁实现算法(一)提到的锁实现的Sync Queue吗?
Condition的实现是类似的原理:
每个AQS里有x(视你newCondition几次)个Condition Queue,它的结点类也是AQS内部类NodeNode里有一个nextWaiter,指向下一个在同一Condition Queue里的Node
结构如下图:

Condition Queue.png

  • 首先明确下是,condition.wait一定是在成功lock的线程里调用才有效,不然不符合逻辑,同时也会抛出IlleagleMornitorException
  • 获取锁的线程处于Sync Queue的队首,当调用condition.wait时,该线程会释放锁(即将AQSstate置为0),同时唤醒后继结点,后继结点在acquire的循环里会成功获取锁,然后将自己所在结点置为队首,然后开始自己线程自己的业务代码。
    这个过程看下图:
    wait状态图_1
wait状态图_2
  • 当waiter_1收到相应conditionsignal后,在Condition Queue中的Node会从Condition Queue中出队,进入Sync Queue队列,开始它的锁竞争的过程。
    过程看下图:
signal状态图_1
signal状态图_2

所以,这里可以看出来,即使是被signal了,被signal的线程也不是直接就开始跑,而是再次进入Sync Queue开始竞争锁而已。这里的这个逻辑,跟Object.wait Object.signal也是完全一样的。

二:代码实现原理

我们先看一段运用到condition的代码案例:
假设生成者在生产队列queue为空时emptyCondition.signal才进行生产操作

ReentrantLock locker = new ReentrantLock();
Condition emptyCondition = locker.newCondition();

Runnable consumer = new Runnable() {
  public void run() {
    locker.lock();
    if (queue.isEmpty()) {
      emptyCondition.signal();
    } else {
      ...
    }
    locker.unlock();
  }
};

Runnable producer = new Runnable() {
  public void run() {
    locker.lock();
    emptyCondition.wait();
    // 开始生产
    ...
    locker.unlock();
  }
}

我们从消费者一步一步走,拟定如下这样一套线程切换逻辑:

  • producer#lock
  • consumer#lock
  • producer#await
  • consumer#signal
  • consumer#unlock
  • producer#unlock

(先从Sync Queue Condition Queue图解讲一遍,然后对应图解,对着代码撸一遍)


  • producer#lock

生产者直接获取锁成功,入队Sync Queue,位队首

producer#lock后queue状态

consumer#lock

消费者竞争锁失败,进入Sync Queue等待获取锁

consumer#lock后queue状态

  • producer#await

生产者进入等待,释放锁,出Sync Queue,进入Condition Queue,等待emptyCondition来唤醒。

producer#wait后Queue状态

  • consumer#signal

消费者唤起生产者,生产者consumernodeCondition Queue转移到Sync Queue开始竞争锁。

consumer#signal后Queue状态

  • consumer.unlock

consumer释放锁后,consumernodeSync Queue出队,释放state,唤醒后继结点provider#nodeprovider抢占到锁。

consumer#unlock后Queue状态

  • provider#unlock

这里就没有啥好说的了。

当然,我为了讲解过程,像在锁被第一次成功获取的时候,逻辑上虽然并不是直接进入Sync Queue我也给讲解成直接进入Sync Queue了,这是为了缩减边边角角的小逻辑,讲清楚主线逻辑。大家看明白主逻辑,然后再自己去撸一遍,就融会贯通了。

三:代码撸一把

  • provider.lock

        final void lock() {
            // 这就直接获取锁成功了,没有else的逻辑了
            if (compareAndSetState(0, 1))
                // 这个方法是AQS类用来设置拥有锁的线程实例
                setExclusiveOwnerThread(Thread.currentThread());
            else
                acquire(1);
        }
  • consumer#lock

        final void lock() {
            if (compareAndSetState(0, 1))
                setExclusiveOwnerThread(Thread.currentThread());
            // consumer.lock就要走这里了,因为上面的compareAndSetState
            // 返回false
            else
                acquire(1);
        }
    protected final boolean compareAndSetState(int expect, int update) {
        // 楼下这个是CAS原理进行值修改,CAS就对比乐观锁来,
        // 这里想要修改this这个对象的state字段,如果state是expect
        // 则修改至update,返回true;否则false。我们知道provider.lock
        // 已经将state 改为非0值了,所以这里肯定失败啦
        return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
    }
  • provider#await

先简单看下Condition类对象结构

    public class ConditionObject implements Condition, java.io.Serializable {
        private static final long serialVersionUID = 1173984872572414699L;
        /** First node of condition queue. */
        private transient Node firstWaiter;
        /** Last node of condition queue. */
        private transient Node lastWaiter;
}
...

一个Condition对象就是一条链队,头尾结点在Condition的内部字段指定firstWaiter lastWaiter

await方法

        public final void await() throws InterruptedException {
            // 因为await是响应中断的等待,这里就是检验下,
            // 通常而言,凡是throws InterruptedException的,
            // 开头基本都是这句
            if (Thread.interrupted())
                throw new InterruptedException();
            // 这里是向condition queue中插入一个node,并返回之,
            // 插入了这个node,就代表当前线程在condition queue
            // 中开始等待了
            Node node = addConditionWaiter();
            // 这个是AQS释放锁方法,加个fully,就是用来将多次
            // 获取锁一次性都释放掉,然后将锁获取次数返回,
            // 留着后面signal后成功获取锁的时候,还要加锁同样的
            // 次数。
            // !!!同时注意,这里唤醒了后继结点!后集结点就继续开始
            // 竞争锁,就是在acquire那个自旋方法里,记得吗
            // 不记得去看看文章(一)
            int savedState = fullyRelease(node);
            // 记录当前线程中断的标记
            int interruptMode = 0;
            // 判断当前的node是否已经转移到sync queue里了。
            // 转移了,说明这个node已经开始竞争锁了,不用再等待
            // 唤醒了,没转,继续自旋
            while (!isOnSyncQueue(node)) {
                // 这里把当前线程给挂起了
                LockSupport.park(this);
                // 这里的方法checkxxx就是用来检查waiting自旋期间,线程有没有
                // interrupt掉。因为await方法是响应线程中断的。
                // 若interrupt了,则在checkxxx方法里,会将node转移到
                // sync Queue中,去竞争,不要担心,因为同时
                // 会设置interruptMode,在最后会根据其值抛Interrupted
                // 异常。。
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
                // 那什么时候就结束上面的自旋呢?一个是当前的线程被
                // signal了,那node就被transfer到sync queue了,while
                // 就不满足了。再一个就是线程中断了,在while循环体里给break掉了
            }
            // 跳出来后,紧接着去竞争锁,知道成功为止。&& 后面这个THROW_IE,标识
            // 要抛出异常,不是的话,就是REINTERRPUT,代表保证线程的中断标记不被
            // 重置即可。
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                interruptMode = REINTERRUPT;
            // 这儿是在condition queue里有多个waiter的时候才起作用,主要用来将
            // CANCEL的结点从链队中剔除掉
            // 具体大家自己看吧。现在忽略这
            if (node.nextWaiter != null) // clean up if cancelled
                unlinkCancelledWaiters();
            // 这儿就是处理interruptMode中断标记字段的逻辑
            // 在reportxxx中,interruptMode为THROW_IE,则抛出
            // 异常,不是,则保证线程的中断field不被重置为“未中断”即可
            if (interruptMode != 0)
                reportInterruptAfterWait(interruptMode);
        }
  • consumer#signal

consumer在调用emptyCondition.signal的时候,会影响到emptyConditioncondition queue中的等待线程,这里
具体指上面的provider#await方法。

        public final void signal() {
            // 先判断下,lock锁是不是在调用signal方法的当前线程手里
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            // 取到condition queue里的第一个waiter node,这里也就是
            // consumer,因为它第一个await进入condition queue了
            Node first = firstWaiter;
            // 这里去进行了具体的signal操作,具体会做先把waiter node的waitStatus
            // 从CONDITION状态改为入Sync Queue的正常状态值0
            // 然后修改Sync Queue 的Head Tail等,让其入队成功
            // 最后再从其前驱结点的状态值上确保当前结点能够被唤起即可。
            // 这里是因为这个waitStatus值对后继结点的行为是有影响的,像SIGNAL指
            // 的是在结点释放后,要去唤醒后继结点
            // 
            if (first != null)
                doSignal(first);
        }
  • consumer#unlock

unlock具体调用的 AQSrelease()方法

    public void unlock() {
        sync.release(1);
    }

    // AQS.release
    public final boolean release(int arg) {
        // tryRelease,这里由NonFairSync实现,具体就是通过
        // CAS去修改state值,并判断是否成功释放锁
        if (tryRelease(arg)) {
            // 成功释放了,则在waitStatus 不是初始状态时,去唤醒后继,
            // 这个 != 0 来做判断的原因,就要综合所有情况,
            // 像FailSync NonFairSync \ Exclusive \ Share
            // 等所有情况来看这里的waitSTatus都会处于什么状态。
            // 全撸一遍的话,会发现这里的 != 0能够涵盖以上所有情况。
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }
  • provider#unlock

这里就同理上面了。

总结

总体来看两个 queue的转换还是挺清楚的。只要记住,不管什么情况(中断与否),都是要从condition queue转移到sync queue的。具体大家还是要自己去想一种线程切换场景,去走走看。
行文匆匆, 欢迎指正。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 206,839评论 6 482
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 88,543评论 2 382
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 153,116评论 0 344
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 55,371评论 1 279
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 64,384评论 5 374
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,111评论 1 285
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,416评论 3 400
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,053评论 0 259
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 43,558评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,007评论 2 325
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,117评论 1 334
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,756评论 4 324
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,324评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,315评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,539评论 1 262
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,578评论 2 355
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,877评论 2 345

推荐阅读更多精彩内容