1. Condition queue详解
Node节点
在Node
节点中,一般含有下面几个属性:
当前线程会被包装为Node节点后,放入到queue中。完成了线程的入队。
独占锁/共享锁,依赖prev
和next
指针,将节点保存在sync queue
中。独占锁的nextWaiter
指向null,而共享锁的nextWaiter
指向new Node()
于是,便区分开独占节点和共享节点。
condition queue
没创建一个Condition对象就对应一个Condition队列,每一次调用Condition对象的await方法的线程就会被包装成Node节点扔进Condition队列中。
condition queue
是一个单向队列,在链表中使用nextWaiter
属性来串联链表。在condition queue
中一般使用三个属性。
* thread:被包装为Node节点的线程;
* waitStatus:节点的状态;
* nextWaiter:指向条件队列下一个节点;
而在条件队列中,waitStatus
的属性我们只关注CONDITION
即可,只要Node
的waitStatus
不是CONDITION
,我们就认为线程不再等待,此时就要在condition queue
出队。
sync queue和condition queue的联系
一般情况下,等待锁的sync queue
和条件队列condition queue
是相互独立的。当我们调用某个条件队列的signal
方法时,将节点在condition queue
中取出,后续会唤醒该线程,被唤醒的线程会和普通线程一样需要争夺锁,如果没有争夺到,则同样被加到sync queue
中。
锁状态
condition queue
队列是等待在特定条件下的队列,因为调用了await()
方法时,该线程一定持有锁。所以在进入condition queue
前线程必然获取锁。而在sync queue
中,若是线程获取到锁,那么head Node
是一个哑节点,即thread属性为null。
线程调用
await()
方法后,线程被包装为Node节点放入到condition queue
中后,线程会释放锁(包括重入的锁),然后调用park()
方法挂起;线程被
signal()
方法唤醒后,由于队列中的节点在之前挂起的时候已经释放了锁,所以必须先去获取到锁,因此,该线程会被添加到sync queue
中。
2. Condition 源码详解
当我们调用lock.newCondition()
方法后,实际上创建的就是ConditionObject
对象。该对象核心属性只有两个,分别代表条件队列的队头以及队尾。
public class ConditionObject implements Condition, java.io.Serializable {
/** First node of condition queue. */
private transient Node firstWaiter;
/** Last node of condition queue. */
private transient Node lastWaiter;
}
2.1 线程的await()—阻塞
首先,在AQS源码分析中说到:所有等待锁的线程都会包装成Node扔到一个同步队列中。即sync queue
中,sync queue
是一个双向链表。
public final void await() throws InterruptedException {
//若线程调用await方法前被中断了,则直接抛出异常
if (Thread.interrupted())
throw new InterruptedException();
//将当前线程封装为Node并加入到condition queue中
Node node = addConditionWaiter();
//释放当前线程占有的锁,并唤起线程
int savedState = fullyRelease(node);
int interruptMode = 0;
//若线程不在sync queue中,说明刚刚被await,此时并未调用signal方法,直接将线程挂起。
while (!isOnSyncQueue(node)) {
//挂起线程
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
2.1.1. 将节点加入到Condition queue
private Node addConditionWaiter() {
Node t = lastWaiter;
// 如果尾结点不是CONDITION状态,那么遍历整个链表,清除所有的cancel节点。
if (t != null && t.waitStatus != Node.CONDITION) {
//移除链表中ws!=Node.CONDITION的节点
unlinkCancelledWaiters();
//将新的尾节点赋值给t
t = lastWaiter;
}
//不存在线程安全问题,因为只有持有锁的线程才会调用await方法进行阻塞
Node node = new Node(Thread.currentThread(), Node.CONDITION);
/**
* Node(Thread thread, int waitStatus) {
* this.waitStatus = waitStatus;
* this.thread = thread;
* }
**/
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
//修改尾指针指向
lastWaiter = node;
return node;
}
- 节点加入
condition queue
后,其waitStatus值为Node.CONDITION
; -
condition queue
的head节点为普通的node节点; -
condition queue
是一个单链表,节点入队我们只需要修改nextWaiter
指针的指向;
如果节点入队时发现尾结点waitStatus
属性不是Node.CONDITION
;那么需要调用unlinkCancelledWaiters
来剔除已经取消等待的线程。
private void unlinkCancelledWaiters() {
//当前节点
Node t = firstWaiter;
//当前节点的前节点
Node trail = null;
//首节点不为null的情况
while (t != null) {
//从head节点开始遍历
Node next = t.nextWaiter;
//如果节点不符合条件,那么移除该节点
if (t.waitStatus != Node.CONDITION) {
t.nextWaiter = null;
if (trail == null)
//若当前节点的前节点为null,头结点指向firstWaiter.next
firstWaiter = next;
else
//若当前节点的前节点不为null,prev.next=node.next
trail.nextWaiter = next;
if (next == null)
//当前节点节点命中缓存,且当前节点.next为null,即尾节点,那么新的尾节点便是前节点
lastWaiter = trail;
}
else
//若未命中,当前节点的前进一位。
trail = t;
//无论命中与否,每一次循环,当前节点前进一位。
t = next;
}
}
上述源码就是一道链表算法题:移除链表元素
核心思想:
单链表删除某个元素要记录:当前节点(t),当前节点的前驱节点(trail)。
若元素是头结点,即trail==null,那么头节点(firstWaiter)指向firstWaiter.nextWaiter节点。
若元素不是头节点,即trail!=null,trail.nextWaiter=next。
无论命中或非命中,t均要往前移动一位。
若未命中条件时,trail要向前移动一位。
最后若是命中条件,且t.nextWaiter==null,即当前节点为尾节点(lastWaiter),那么主动将lastWaiter指向当前节点的前节点。
2.1.2 释放当前线程占用的锁,唤醒后续线程
//释放当前线程占用的锁
final int fullyRelease(Node node) {
boolean failed = true;
try {
//获取当前status状态,即锁重入的次数。
int savedState = getState();
if (release(savedState)) {
failed = false;
return savedState;
} else {
throw new IllegalMonitorStateException();
}
} finally {
//当前线程不是持有锁的线程,会抛出异常,且此时failed==true,最终会修改Node节点的ws属性。
if (failed)
node.waitStatus = Node.CANCELLED;
}
}
//释放锁
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
//若sync queue头节点后有节点,那么唤醒后续节点
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
protected final boolean tryRelease(int releases) {
//全部释放锁
int c = getState() - releases;
//若发现当前线程不是持有锁的线程,那么会执行finally代码,取消当前线程
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
//修改status状态以及独占锁记录的线程
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
2.1.3. 判断线程是否在sync queue中
需要注意的是,线程调用await()
前,就不在sync queue
中。调用await()
方法,完成的操作:
- 将线程包装为Node节点,并保存到
Condition queue
中; - 唤醒sync queue节点线程;
若是调用了signal()
方法,线程会再次被加入到sync queue中。
所以在await()
阻塞线程之前,判断是否被调用signal()
方法完成唤醒,即是否再次加入到sync queue
中。
//判断当前线程是否在sync queue中
final boolean isOnSyncQueue(Node node) {
//若是节点的状态为CONDITION,直接返回失败,即目前线程还在Condition queue中。
if (node.waitStatus == Node.CONDITION || node.prev == null)
return false;
if (node.next != null) // If has successor, it must be on queue
return true;
return findNodeFromTail(node);
}
Node节点加入到sync queue
中,实际上要完成3步,即也会伴随尾分叉的过程。
* 多个node节点prev指向tail Node;
* 通过CAS来选定一个node节点作为新的tail Node;
* (旧)tail节点的next指针指向(新)tail节点;
* 自旋上面3个步骤,直到所有的node入sync queue成功;
所以通过node的两个指针其实可以判断出是否已经进入到sync queue
中。
还有一种情况是尾分叉
,即node.prev!=null
且node.next==null
。程序会在sync queue
的tail节点开始往前遍历,寻找这个节点。若是第一次寻找失败也没关系,因为我们这个判断是在while (!isOnSyncQueue(node))
中,最终也会判断出元素是否在sync queue中。
private boolean findNodeFromTail(Node node) {
Node t = tail;
for (;;) {
if (t == node)
return true;
if (t == null)
return false;
t = t.prev;
}
}
2.2 线程正常唤醒
在2.1 线程的await()—阻塞
中,线程最终被park()
方法进行了阻塞。Lock为我们提供了两种正常唤醒的方法signalAll()
方法和singal()
方法。
2.2.1 singalAll()唤醒
在调用singalAll()
方法的线程必须是持有锁的线程。
public final void signalAll() {
/**
* protected final boolean isHeldExclusively() {
* return getExclusiveOwnerThread() == Thread.currentThread();
* }
**/
//唤醒的线程不是持有锁的线程
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
//唤醒所有的阻塞线程
doSignalAll(first);
}
首先通过lastWaiter = firstWaiter = null;
将Condition queue
中的头尾节点均置为null。然后通过一个do-while循环,将原先条件队列中的节点一个个拿出来first.nextWaiter = null;
且置为null。并调用transferForSignal
方法将其加入到sync queue
中。
private void doSignalAll(Node first) {
//头节点和尾节点都赋值为null
lastWaiter = firstWaiter = null;
do {
//将节点中下一个元素取出
Node next = first.nextWaiter;
//引用指向null
first.nextWaiter = null;
//唤醒节点中线程
transferForSignal(first);
//下一个元素赋值给first节点
first = next;
} while (first != null);
}
在transferForSignal
中,首先通过CAS将节点的状态由Node.CONDITION
改为0,如果修改不成功,则说明该节点已经被CANCEL
。我们直接返回,不必唤醒它。
若修改成功,则将线程加入到sync queue
尾部,即调用enq(node)
方法,返回的是sync queue
中node
前驱节点。
最后若是设置node前驱节点失败后,理解唤醒node线程。
final boolean transferForSignal(Node node) {
//修改节点的状态,如果修改不成功,说明该节点指向CANCAL,直接返回
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
//将线程节点加入到sync queue队尾,返回的是node节点的前驱节点
Node p = enq(node);
int ws = p.waitStatus;
//争取将node节点的前驱节点设置为-1,即可以通知node节点
//但是node前驱节点为取消节点或设置node前驱节点的ws属性失败,那么直接唤醒node节点的线程。
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}
//将线程节点加入到sync queue队尾
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
2.2.2 singal()唤醒线程
signal()
与signalAll()
方法不同之点在于,signalAll()
方法只会唤醒一个节点。对于AQS的实现来说,就是唤醒Condition queue中第一个没有被CANCEL
的节点。
public final void signal() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
//唤醒节点
doSignal(first);
}
private void doSignal(Node first) {
//成功唤醒一个资源或condition queue遍历到尾部时唤醒结束循环
do {
//若是遍历到尾部,那么直接将尾指针设置为null
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
//释放first.nextWaiter指针。便于GC
first.nextWaiter = null;
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
我们依旧使用的transferForSignal()
方法,但是使用到了他的返回值,只要节点被成功添加到sync queue
中,transferForSingal
就会返回true。此时while条件就不会被满足,整个方法就结束了,即调用了singal()
方法后,只会唤醒一个线程。
2.3 线程被唤醒
我们新捋一捋上面的方法。
await()阻塞线程:
- 获取锁的线程调用了
await()
方法,线程被包装为一个Node
,并加入到condition queue
中; - 线程会释放掉身上的锁(包括重入锁),此时锁不会被持有;
- 若线程没有位于
sync queue
中(即没有调用singal唤起线程),那么线程将会阻塞。
singal()唤醒线程:
- 唤起的线程不是持有锁的线程,直接抛出
Monitor
异常; - 将
Node
在condition queue
取出,并放入到sync queue
中; - 等待线程被唤醒
若阻塞的线程被唤醒:
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
LockSupport.park(this); //此时我们在这。
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
在JAVA并发(10)—interrupt唤醒挂起线程中,我们了解到,线程的唤醒一般有两种途径:(1)一种是正常唤醒。(2)另一种是使用interrupt()
中断唤醒。
被唤醒的线程实际上不知道它是怎么被唤醒的。
于是调用checkInterruptWhileWaiting()
方法去检查等待的状态。
private int checkInterruptWhileWaiting(Node node) {
return Thread.interrupted() ?
(transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
0;
}
通过Thread.interrupted()
方法去检查线程的阻塞状态。Thread.interrupted()
表示该线程调用了thread.interrupt()
方法,即发生了阻塞。那么会调用transferAfterCancelledWait(node)
方法来判断中断的时机。
final boolean transferAfterCancelledWait(Node node) {
//若此时node节点的waitStatus为CONDITION状态,说明该节点未被唤醒
if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
//将该Node加入到sync queue中
enq(node);
//直接返回true
return true;
}
while (!isOnSyncQueue(node))
Thread.yield();
return false;
}
- 若是调用了
interrupt()
方法唤醒的线程,上述方法会返回true; - 若是调用了
signal()
方法唤醒的线程,上述方法会返回false;
//这个是signal唤醒后,node节点被中断;
private static final int REINTERRUPT = 1;
//阻塞的线程被中断。
private static final int THROW_IE = -1;
2.3.1 中断唤醒 — THROW_IE
调用该方法时,node
节点已经加入到sync queue
中,此时node
节点也会在condition queue
中。
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
//此时我们在这。
//尝试获取一次锁,若节点的前驱节点为head节点,那么会被唤醒争夺锁。
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
//若是node在condition queue中,清除condition中非CONDITION状态的节点。
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
//因为发送中断,故`interruptMode!=0`最终返回true。
if (interruptMode != 0)
//中断策略,是抛异常还是自我处理
reportInterruptAfterWait(interruptMode);
}
需要注意的是,线程被中断
之后,并不会立即抛出异常。而是会先加入到sync queue
中,尝试获取到锁。若获取到锁后,才会执行不同的中断策略
。
private void reportInterruptAfterWait(int interruptMode)
throws InterruptedException {
if (interruptMode == THROW_IE)
throw new InterruptedException();
else if (interruptMode == REINTERRUPT)
selfInterrupt();
}
该方法时中断策略,若线程被park()
后,由interrupt()
被中断唤醒,那么最后会抛出InterruptedException
。若线程被park()
后,由signal()
被正常唤醒,那么最后只是会恢复用户行为。
2.3.2 正常唤醒 — REINTERRUPT
final boolean transferAfterCancelledWait(Node node) {
//调用该方法时,该节点的状态不为CONDITION。
if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
enq(node);
return true;
}
//若是线程此时没有在sync queue中
while (!isOnSyncQueue(node))
//线程先让出CPU,等待node节点进入sync queue
Thread.yield();
return false;
}
若是A线程调用signal
唤醒node节点时,如下列代码,可以看到,是先修改status状态,然后在加入到sync queue
中。A线程刚修改了waitStatus状态,还未加入到sync queue
中
final boolean transferForSignal(Node node) {
//修改节点的状态,如果修改不成功,说明该节点指向CANCAL,直接返回
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
//将线程节点加入到sync queue队尾,返回的是node节点的前驱节点
Node p = enq(node);
// 。。。
}
B线程调用interrupt
去中断线程。于是线程需要调用Thread.yield()
方法进行让步。
当然,线程在sync queue
中,若是发生中断,也是同种情况。
即线程发现被中断后,不会抛出异常,而是调用selfInterrupt()
方法,再次完成自我中断。
文章参考
https://segmentfault.com/a/1190000016462281
相关阅读
JAVA并发(1)—java对象布局
JAVA并发(2)—PV机制与monitor(管程)机制
JAVA并发(3)—线程运行时发生GC,会回收ThreadLocal弱引用的key吗?
JAVA并发(4)— ThreadLocal源码角度分析是否真正能造成内存溢出!
JAVA并发(5)— 多线程顺序的打印出A,B,C(线程间的协作)
JAVA并发(6)— AQS源码解析(独占锁-加锁过程)
JAVA并发(7)—AQS源码解析(独占锁-解锁过程)
JAVA并发(8)—AQS公平锁为什么会比非公平锁效率低(源码分析)
JAVA并发(9)— 共享锁的获取与释放
JAVA并发(10)—interrupt唤醒挂起线程
JAVA并发(11)—AQS源码Condition阻塞和唤醒
JAVA并发(12)— Lock实现生产者消费者