JAVA并发(11)—AQS源码Condition阻塞和唤醒

1. Condition queue详解

Node节点

Node节点中,一般含有下面几个属性:

Node节点的属性.png

当前线程会被包装为Node节点后,放入到queue中。完成了线程的入队。

独占锁/共享锁,依赖prevnext指针,将节点保存在sync queue中。独占锁的nextWaiter指向null,而共享锁的nextWaiter指向new Node()于是,便区分开独占节点和共享节点。

condition queue

没创建一个Condition对象就对应一个Condition队列,每一次调用Condition对象的await方法的线程就会被包装成Node节点扔进Condition队列中。

Condition queue队列.png

condition queue是一个单向队列,在链表中使用nextWaiter属性来串联链表。在condition queue中一般使用三个属性。

* thread:被包装为Node节点的线程;
* waitStatus:节点的状态;
* nextWaiter:指向条件队列下一个节点;

而在条件队列中,waitStatus的属性我们只关注CONDITION即可,只要NodewaitStatus不是CONDITION,我们就认为线程不再等待,此时就要在condition queue出队。

sync queue和condition queue的联系

一般情况下,等待锁的sync queue和条件队列condition queue是相互独立的。当我们调用某个条件队列的signal方法时,将节点在condition queue中取出,后续会唤醒该线程,被唤醒的线程会和普通线程一样需要争夺锁,如果没有争夺到,则同样被加到sync queue中。

锁状态

condition queue队列是等待在特定条件下的队列,因为调用了await()方法时,该线程一定持有锁。所以在进入condition queue前线程必然获取锁。而在sync queue中,若是线程获取到锁,那么head Node是一个哑节点,即thread属性为null。

  • 线程调用await()方法后,线程被包装为Node节点放入到condition queue中后,线程会释放锁(包括重入的锁),然后调用park()方法挂起;

  • 线程被signal()方法唤醒后,由于队列中的节点在之前挂起的时候已经释放了锁,所以必须先去获取到锁,因此,该线程会被添加到sync queue中。

2. Condition 源码详解

当我们调用lock.newCondition()方法后,实际上创建的就是ConditionObject对象。该对象核心属性只有两个,分别代表条件队列的队头以及队尾。

public class ConditionObject implements Condition, java.io.Serializable {  
   /** First node of condition queue. */  
   private transient Node firstWaiter;  
   /** Last node of condition queue. */  
   private transient Node lastWaiter;  
} 

2.1 线程的await()—阻塞

首先,在AQS源码分析中说到:所有等待锁的线程都会包装成Node扔到一个同步队列中。即sync queue中,sync queue是一个双向链表。

public final void await() throws InterruptedException {  
    //若线程调用await方法前被中断了,则直接抛出异常
    if (Thread.interrupted())  
        throw new InterruptedException();  
    //将当前线程封装为Node并加入到condition queue中
    Node node = addConditionWaiter();  
    //释放当前线程占有的锁,并唤起线程
    int savedState = fullyRelease(node);  
    int interruptMode = 0;  
    //若线程不在sync queue中,说明刚刚被await,此时并未调用signal方法,直接将线程挂起。
    while (!isOnSyncQueue(node)) {  
        //挂起线程
        LockSupport.park(this);  
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)  
            break;  
    }  
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)  
        interruptMode = REINTERRUPT;  
    if (node.nextWaiter != null) // clean up if cancelled  
        unlinkCancelledWaiters();  
    if (interruptMode != 0)  
        reportInterruptAfterWait(interruptMode);  
}  

2.1.1. 将节点加入到Condition queue

private Node addConditionWaiter() {
    Node t = lastWaiter;
    // 如果尾结点不是CONDITION状态,那么遍历整个链表,清除所有的cancel节点。
    if (t != null && t.waitStatus != Node.CONDITION) {
        //移除链表中ws!=Node.CONDITION的节点
        unlinkCancelledWaiters();
        //将新的尾节点赋值给t
        t = lastWaiter;
    }
    //不存在线程安全问题,因为只有持有锁的线程才会调用await方法进行阻塞
    Node node = new Node(Thread.currentThread(), Node.CONDITION);
   /**
    *  Node(Thread thread, int waitStatus) { 
    *    this.waitStatus = waitStatus;
    *    this.thread = thread;
    *  }
    **/
    if (t == null) 
        firstWaiter = node;
    else 
        t.nextWaiter = node;
    //修改尾指针指向
    lastWaiter = node;
    return node;
}
  1. 节点加入condition queue后,其waitStatus值为Node.CONDITION
  2. condition queue的head节点为普通的node节点;
  3. condition queue是一个单链表,节点入队我们只需要修改nextWaiter指针的指向;

如果节点入队时发现尾结点waitStatus属性不是Node.CONDITION;那么需要调用unlinkCancelledWaiters来剔除已经取消等待的线程。

private void unlinkCancelledWaiters() {
    //当前节点
    Node t = firstWaiter;
    //当前节点的前节点
    Node trail = null;
    //首节点不为null的情况
    while (t != null) {
        //从head节点开始遍历
        Node next = t.nextWaiter;
        //如果节点不符合条件,那么移除该节点
        if (t.waitStatus != Node.CONDITION) {
            t.nextWaiter = null;
            if (trail == null) 
               //若当前节点的前节点为null,头结点指向firstWaiter.next
                firstWaiter = next;
            else 
                //若当前节点的前节点不为null,prev.next=node.next
                trail.nextWaiter = next;
            if (next == null) 
                //当前节点节点命中缓存,且当前节点.next为null,即尾节点,那么新的尾节点便是前节点
                lastWaiter = trail;
         } 
         else 
          //若未命中,当前节点的前进一位。
           trail = t;
      //无论命中与否,每一次循环,当前节点前进一位。
      t = next;
    }
}

上述源码就是一道链表算法题:移除链表元素
核心思想:
单链表删除某个元素要记录:当前节点(t),当前节点的前驱节点(trail)。
若元素是头结点,即trail==null,那么头节点(firstWaiter)指向firstWaiter.nextWaiter节点。
若元素不是头节点,即trail!=null,trail.nextWaiter=next。
无论命中或非命中,t均要往前移动一位。
若未命中条件时,trail要向前移动一位。
最后若是命中条件,且t.nextWaiter==null,即当前节点为尾节点(lastWaiter),那么主动将lastWaiter指向当前节点的前节点。

2.1.2 释放当前线程占用的锁,唤醒后续线程

 //释放当前线程占用的锁
final int fullyRelease(Node node) {  
    boolean failed = true;  
    try {  
        //获取当前status状态,即锁重入的次数。
        int savedState = getState();  
        if (release(savedState)) {  
            failed = false;  
            return savedState;  
        } else {  
            throw new IllegalMonitorStateException();  
        }  
    } finally {  
        //当前线程不是持有锁的线程,会抛出异常,且此时failed==true,最终会修改Node节点的ws属性。
        if (failed)  
            node.waitStatus = Node.CANCELLED;  
    }  
}  
//释放锁
public final boolean release(int arg) {  
    if (tryRelease(arg)) {  
        Node h = head;  
        //若sync queue头节点后有节点,那么唤醒后续节点
        if (h != null && h.waitStatus != 0)  
            unparkSuccessor(h);  
        return true;  
    }  
    return false;  
}  
protected final boolean tryRelease(int releases) {  
    //全部释放锁
    int c = getState() - releases;  
    //若发现当前线程不是持有锁的线程,那么会执行finally代码,取消当前线程
    if (Thread.currentThread() != getExclusiveOwnerThread())  
        throw new IllegalMonitorStateException();  
    boolean free = false; 
    //修改status状态以及独占锁记录的线程 
    if (c == 0) {  
        free = true;  
        setExclusiveOwnerThread(null);  
    }  
    setState(c);  
    return free;  
}  

2.1.3. 判断线程是否在sync queue中

需要注意的是,线程调用await()前,就不在sync queue中。调用await()方法,完成的操作:

  1. 将线程包装为Node节点,并保存到Condition queue中;
  2. 唤醒sync queue节点线程;

若是调用了signal()方法,线程会再次被加入到sync queue中。

所以在await()阻塞线程之前,判断是否被调用signal()方法完成唤醒,即是否再次加入到sync queue中。

//判断当前线程是否在sync queue中
final boolean isOnSyncQueue(Node node) {  
    //若是节点的状态为CONDITION,直接返回失败,即目前线程还在Condition queue中。
    if (node.waitStatus == Node.CONDITION || node.prev == null)  
        return false;  
    if (node.next != null) // If has successor, it must be on queue  
        return true;  
    return findNodeFromTail(node);  
}  

Node节点加入到sync queue中,实际上要完成3步,即也会伴随尾分叉的过程。

* 多个node节点prev指向tail Node;
* 通过CAS来选定一个node节点作为新的tail Node;
* (旧)tail节点的next指针指向(新)tail节点;
* 自旋上面3个步骤,直到所有的node入sync queue成功;

所以通过node的两个指针其实可以判断出是否已经进入到sync queue中。

还有一种情况是尾分叉,即node.prev!=nullnode.next==null。程序会在sync queue的tail节点开始往前遍历,寻找这个节点。若是第一次寻找失败也没关系,因为我们这个判断是在while (!isOnSyncQueue(node))中,最终也会判断出元素是否在sync queue中。

private boolean findNodeFromTail(Node node) {  
    Node t = tail;  
    for (;;) {  
        if (t == node)  
            return true;  
        if (t == null)  
            return false;  
        t = t.prev;  
    }  
}  

2.2 线程正常唤醒

2.1 线程的await()—阻塞中,线程最终被park()方法进行了阻塞。Lock为我们提供了两种正常唤醒的方法signalAll()方法和singal()方法。

2.2.1 singalAll()唤醒

在调用singalAll()方法的线程必须是持有锁的线程。

public final void signalAll() {  
    /**
    *  protected final boolean isHeldExclusively() {  
    *    return getExclusiveOwnerThread() == Thread.currentThread();  
    *  }
    **/
    //唤醒的线程不是持有锁的线程
    if (!isHeldExclusively())  
        throw new IllegalMonitorStateException();  
    Node first = firstWaiter;  
    if (first != null)  
        //唤醒所有的阻塞线程
        doSignalAll(first);  
}  

首先通过lastWaiter = firstWaiter = null;Condition queue中的头尾节点均置为null。然后通过一个do-while循环,将原先条件队列中的节点一个个拿出来first.nextWaiter = null;且置为null。并调用transferForSignal方法将其加入到sync queue中。

private void doSignalAll(Node first) {  
    //头节点和尾节点都赋值为null
    lastWaiter = firstWaiter = null;  
    do {  
        //将节点中下一个元素取出
        Node next = first.nextWaiter;
        //引用指向null  
        first.nextWaiter = null;  
        //唤醒节点中线程
        transferForSignal(first);
        //下一个元素赋值给first节点  
        first = next;  
    } while (first != null);  
}  

transferForSignal中,首先通过CAS将节点的状态由Node.CONDITION改为0,如果修改不成功,则说明该节点已经被CANCEL。我们直接返回,不必唤醒它。

若修改成功,则将线程加入到sync queue尾部,即调用enq(node)方法,返回的是sync queuenode前驱节点。

最后若是设置node前驱节点失败后,理解唤醒node线程。

final boolean transferForSignal(Node node) {  
    //修改节点的状态,如果修改不成功,说明该节点指向CANCAL,直接返回
    if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))  
        return false;  
    //将线程节点加入到sync queue队尾,返回的是node节点的前驱节点
    Node p = enq(node);  
    int ws = p.waitStatus;  
    //争取将node节点的前驱节点设置为-1,即可以通知node节点
    //但是node前驱节点为取消节点或设置node前驱节点的ws属性失败,那么直接唤醒node节点的线程。
    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))  
        LockSupport.unpark(node.thread);  
    return true;  
}  
//将线程节点加入到sync queue队尾
private Node enq(final Node node) {  
    for (;;) {  
        Node t = tail;  
        if (t == null) { // Must initialize  
            if (compareAndSetHead(new Node()))  
                tail = head;  
        } else {  
            node.prev = t;  
            if (compareAndSetTail(t, node)) {  
                t.next = node;  
                return t;  
            }  
        }  
    }  
}  

2.2.2 singal()唤醒线程

signal()signalAll()方法不同之点在于,signalAll()方法只会唤醒一个节点。对于AQS的实现来说,就是唤醒Condition queue中第一个没有被CANCEL的节点。

public final void signal() {  
    if (!isHeldExclusively())  
        throw new IllegalMonitorStateException();  
    Node first = firstWaiter;  
    if (first != null)  
        //唤醒节点
        doSignal(first);  
}  
private void doSignal(Node first) {  
    //成功唤醒一个资源或condition queue遍历到尾部时唤醒结束循环
    do {  
      //若是遍历到尾部,那么直接将尾指针设置为null
        if ( (firstWaiter = first.nextWaiter) == null)  
            lastWaiter = null;  
        //释放first.nextWaiter指针。便于GC
        first.nextWaiter = null;  
    } while (!transferForSignal(first) &&  
             (first = firstWaiter) != null);  
}  

我们依旧使用的transferForSignal()方法,但是使用到了他的返回值,只要节点被成功添加到sync queue中,transferForSingal就会返回true。此时while条件就不会被满足,整个方法就结束了,即调用了singal()方法后,只会唤醒一个线程。

2.3 线程被唤醒

我们新捋一捋上面的方法。

await()阻塞线程:

  1. 获取锁的线程调用了await()方法,线程被包装为一个Node,并加入到condition queue中;
  2. 线程会释放掉身上的锁(包括重入锁),此时锁不会被持有;
  3. 若线程没有位于sync queue中(即没有调用singal唤起线程),那么线程将会阻塞。

singal()唤醒线程:

  1. 唤起的线程不是持有锁的线程,直接抛出Monitor异常;
  2. Nodecondition queue取出,并放入到sync queue中;
  3. 等待线程被唤醒

若阻塞的线程被唤醒:

public final void await() throws InterruptedException {  
    if (Thread.interrupted())  
        throw new InterruptedException();  
    Node node = addConditionWaiter();  
    int savedState = fullyRelease(node);  
    int interruptMode = 0;  
    while (!isOnSyncQueue(node)) {  
        LockSupport.park(this);    //此时我们在这。
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)  
            break;  
    }  
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)  
        interruptMode = REINTERRUPT;  
    if (node.nextWaiter != null) // clean up if cancelled  
        unlinkCancelledWaiters();  
    if (interruptMode != 0)  
        reportInterruptAfterWait(interruptMode);  
}  

JAVA并发(10)—interrupt唤醒挂起线程中,我们了解到,线程的唤醒一般有两种途径:(1)一种是正常唤醒。(2)另一种是使用interrupt()中断唤醒。

被唤醒的线程实际上不知道它是怎么被唤醒的。

于是调用checkInterruptWhileWaiting()方法去检查等待的状态。

private int checkInterruptWhileWaiting(Node node) {  
    return Thread.interrupted() ?  
        (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :  
        0;  
}  

通过Thread.interrupted()方法去检查线程的阻塞状态。Thread.interrupted()表示该线程调用了thread.interrupt()方法,即发生了阻塞。那么会调用transferAfterCancelledWait(node)方法来判断中断的时机。

final boolean transferAfterCancelledWait(Node node) {  
    //若此时node节点的waitStatus为CONDITION状态,说明该节点未被唤醒
    if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {  
        //将该Node加入到sync queue中
        enq(node);  
        //直接返回true
        return true;  
    }  
    while (!isOnSyncQueue(node))  
        Thread.yield();  
    return false;  
}  
  • 若是调用了interrupt()方法唤醒的线程,上述方法会返回true;
  • 若是调用了signal()方法唤醒的线程,上述方法会返回false;
//这个是signal唤醒后,node节点被中断;
private static final int REINTERRUPT =  1;  
//阻塞的线程被中断。
private static final int THROW_IE    = -1;  

2.3.1 中断唤醒 — THROW_IE

调用该方法时,node节点已经加入到sync queue中,此时node节点也会在condition queue中。

public final void await() throws InterruptedException {  
    if (Thread.interrupted())  
        throw new InterruptedException();  
    Node node = addConditionWaiter();  
    int savedState = fullyRelease(node);  
    int interruptMode = 0;  
    while (!isOnSyncQueue(node)) {  
        LockSupport.park(this);   
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)  
            break;  
    }  
    //此时我们在这。
    //尝试获取一次锁,若节点的前驱节点为head节点,那么会被唤醒争夺锁。
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)  
        interruptMode = REINTERRUPT;  
    //若是node在condition queue中,清除condition中非CONDITION状态的节点。
    if (node.nextWaiter != null) // clean up if cancelled  
        unlinkCancelledWaiters();  
    //因为发送中断,故`interruptMode!=0`最终返回true。
    if (interruptMode != 0)  
         //中断策略,是抛异常还是自我处理
        reportInterruptAfterWait(interruptMode);  
} 

需要注意的是,线程被中断之后,并不会立即抛出异常。而是会先加入到sync queue中,尝试获取到锁。若获取到锁后,才会执行不同的中断策略

private void reportInterruptAfterWait(int interruptMode)  
    throws InterruptedException {  
    if (interruptMode == THROW_IE)  
        throw new InterruptedException();  
    else if (interruptMode == REINTERRUPT)  
        selfInterrupt();  
}  

该方法时中断策略,若线程被park()后,由interrupt()被中断唤醒,那么最后会抛出InterruptedException。若线程被park()后,由signal()被正常唤醒,那么最后只是会恢复用户行为。

2.3.2 正常唤醒 — REINTERRUPT

final boolean transferAfterCancelledWait(Node node) {  
    //调用该方法时,该节点的状态不为CONDITION。
    if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {  
        enq(node);  
        return true;  
    }  
    //若是线程此时没有在sync queue中
    while (!isOnSyncQueue(node))  
        //线程先让出CPU,等待node节点进入sync queue
        Thread.yield();  
    return false;  
}  

若是A线程调用signal唤醒node节点时,如下列代码,可以看到,是先修改status状态,然后在加入到sync queue中。A线程刚修改了waitStatus状态,还未加入到sync queue

final boolean transferForSignal(Node node) {  
    //修改节点的状态,如果修改不成功,说明该节点指向CANCAL,直接返回
    if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))  
        return false;  
    //将线程节点加入到sync queue队尾,返回的是node节点的前驱节点
    Node p = enq(node);
   // 。。。 
}

B线程调用interrupt去中断线程。于是线程需要调用Thread.yield()方法进行让步。

当然,线程在sync queue中,若是发生中断,也是同种情况。

即线程发现被中断后,不会抛出异常,而是调用selfInterrupt()方法,再次完成自我中断。

文章参考

https://segmentfault.com/a/1190000016462281

相关阅读

JAVA并发(1)—java对象布局
JAVA并发(2)—PV机制与monitor(管程)机制
JAVA并发(3)—线程运行时发生GC,会回收ThreadLocal弱引用的key吗?
JAVA并发(4)— ThreadLocal源码角度分析是否真正能造成内存溢出!
JAVA并发(5)— 多线程顺序的打印出A,B,C(线程间的协作)
JAVA并发(6)— AQS源码解析(独占锁-加锁过程)
JAVA并发(7)—AQS源码解析(独占锁-解锁过程)
JAVA并发(8)—AQS公平锁为什么会比非公平锁效率低(源码分析)
JAVA并发(9)— 共享锁的获取与释放
JAVA并发(10)—interrupt唤醒挂起线程
JAVA并发(11)—AQS源码Condition阻塞和唤醒
JAVA并发(12)— Lock实现生产者消费者

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,793评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 87,567评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,342评论 0 338
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,825评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,814评论 5 368
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,680评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,033评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,687评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 42,175评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,668评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,775评论 1 332
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,419评论 4 321
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,020评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,978评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,206评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,092评论 2 351
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,510评论 2 343