This article explains how the await/signal methods of a Condition created from a ReentrantLock are implemented.
Usage
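public void foo() throws InterruptedException {
    // API shape only: in real code the lock and condition are shared fields,
    // and signal() is called by a different thread than the one in await().
    ReentrantLock reentrantLock = new ReentrantLock();
    Condition condition = reentrantLock.newCondition();
    reentrantLock.lock();
    try {
        condition.await();
        //....
        condition.signal();
    } finally {
        // Always release in finally so an exception cannot leak the lock.
        reentrantLock.unlock();
    }
}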
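After acquiring the lock, the current thread calls await(): it is added to the condition queue, fully releases the lock, and then blocks in park.
signal() moves the longest-waiting node from the condition queue to the lock's sync queue. Once that thread is unparked, it competes for the lock again before await() returns.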
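The usual pattern has one thread waiting and another signalling, with the wait wrapped in a loop over a predicate. The following is a minimal sketch of that pattern; the class name AwaitSignalDemo and its fields are made up for illustration and are not part of the JDK.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo class: one consumer waits, one producer signals.
class AwaitSignalDemo {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition ready = lock.newCondition();
    private boolean dataReady = false; // guarded by lock
    private int data;                  // guarded by lock

    // Waiter: blocks until the producer has published a value.
    int take() throws InterruptedException {
        lock.lock();
        try {
            while (!dataReady) { // re-check the predicate after every wakeup
                ready.await();   // releases the lock while waiting, reacquires before returning
            }
            return data;
        } finally {
            lock.unlock();
        }
    }

    // Signaller: publishes a value and wakes one waiter.
    void put(int value) {
        lock.lock();
        try {
            data = value;
            dataReady = true;
            ready.signal();
        } finally {
            lock.unlock();
        }
    }

    // Runs a producer thread against the calling thread as consumer.
    static int run() {
        AwaitSignalDemo demo = new AwaitSignalDemo();
        Thread producer = new Thread(() -> demo.put(42));
        producer.start();
        try {
            int value = demo.take();
            producer.join();
            return value;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

The predicate loop makes the result independent of which thread gets the lock first: if the producer runs first, the consumer never calls await() at all.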
Source code analysis
await() #1
/**
* Implements interruptible condition wait.
* <ol>
* <li> If current thread is interrupted, throw InterruptedException.
* <li> Save lock state returned by {@link #getState}.
* <li> Invoke {@link #release} with saved state as argument,
* throwing IllegalMonitorStateException if it fails.
* <li> Block until signalled or interrupted.
* <li> Reacquire by invoking specialized version of
* {@link #acquire} with saved state as argument.
* <li> If interrupted while blocked in step 4, throw InterruptedException.
* </ol>
*/
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
// Part 1
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
// Part 2
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
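await() is fairly long and can be split into two parts. Part 1 covers how the current thread gets parked; part 2 covers what happens after the thread is unparked.
Part 1: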
addConditionWaiter()
/** First node of condition queue. */
private transient Node firstWaiter;
/** Last node of condition queue. */
private transient Node lastWaiter;
/**
* Adds a new waiter to wait queue.
* @return its new wait node
*/
private Node addConditionWaiter() {
Node t = lastWaiter;
// If lastWaiter is cancelled, clean out.
if (t != null && t.waitStatus != Node.CONDITION) {
unlinkCancelledWaiters();
t = lastWaiter;
}
Node node = new Node(Thread.currentThread(), Node.CONDITION);
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node;
return node;
}
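Its main job is to wrap the current thread in a Node in CONDITION mode and append it to the condition queue.
Initially the queue is empty and both firstWaiter and lastWaiter are null. Once the first node is created, firstWaiter and lastWaiter both point to it. For each later node, the old tail's nextWaiter is pointed at the new node and lastWaiter is moved to the new node. The result is a singly linked list with head and tail pointers.
Now look at the if check at the top of the method. Every node enters the condition queue with waitStatus CONDITION (-2). If lastWaiter has any other status, its thread has already left the wait, typically because it was cancelled, and such nodes are unlinked from the list. The waitStatus constants are: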
/** waitStatus value to indicate thread has cancelled */
static final int CANCELLED = 1;
/** waitStatus value to indicate successor's thread needs unparking */
static final int SIGNAL = -1;
/** waitStatus value to indicate thread is waiting on condition */
static final int CONDITION = -2;
/**
* waitStatus value to indicate the next acquireShared should
* unconditionally propagate
*/
static final int PROPAGATE = -3;
unlinkCancelledWaiters()
/**
* Unlinks cancelled waiter nodes from condition queue.
* Called only while holding lock. This is called when
* cancellation occurred during condition wait, and upon
* insertion of a new waiter when lastWaiter is seen to have
* been cancelled. This method is needed to avoid garbage
* retention in the absence of signals. So even though it may
* require a full traversal, it comes into play only when
* timeouts or cancellations occur in the absence of
* signals. It traverses all nodes rather than stopping at a
* particular target to unlink all pointers to garbage nodes
* without requiring many re-traversals during cancellation
* storms.
*/
private void unlinkCancelledWaiters() {
Node t = firstWaiter;
Node trail = null;
while (t != null) {
Node next = t.nextWaiter;
if (t.waitStatus != Node.CONDITION) {
t.nextWaiter = null;
if (trail == null)
firstWaiter = next;
else
trail.nextWaiter = next;
if (next == null)
lastWaiter = trail;
}
else
trail = t;
t = next;
}
}
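In short, it does one thing: remove every node whose status is not CONDITION from the list. Back in addConditionWaiter(), lastWaiter is then re-read into t; at that point it is either null or a node still in CONDITION state.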
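The interaction between addConditionWaiter() and unlinkCancelledWaiters() can be traced with a toy model. This is a simplified, single-threaded sketch: ToyConditionQueue, its add() method and the string node names are illustrative assumptions, while the linking logic mirrors the source shown above.

```java
// Hypothetical single-threaded model of the condition queue (not JDK code).
class ToyConditionQueue {
    static final int CONDITION = -2;
    static final int CANCELLED = 1;

    static final class Node {
        final String name;
        int waitStatus = CONDITION;
        Node nextWaiter;
        Node(String name) { this.name = name; }
    }

    Node firstWaiter;
    Node lastWaiter;

    // Same shape as addConditionWaiter(): clean up only if the tail is not CONDITION.
    Node add(String name) {
        Node t = lastWaiter;
        if (t != null && t.waitStatus != CONDITION) {
            unlinkCancelledWaiters();
            t = lastWaiter;
        }
        Node node = new Node(name);
        if (t == null)
            firstWaiter = node;
        else
            t.nextWaiter = node;
        lastWaiter = node;
        return node;
    }

    // Same traversal as the source: trail is the last node known to be CONDITION.
    void unlinkCancelledWaiters() {
        Node t = firstWaiter;
        Node trail = null;
        while (t != null) {
            Node next = t.nextWaiter;
            if (t.waitStatus != CONDITION) {
                t.nextWaiter = null;
                if (trail == null)
                    firstWaiter = next;
                else
                    trail.nextWaiter = next;
                if (next == null)
                    lastWaiter = trail;
            } else {
                trail = t;
            }
            t = next;
        }
    }

    // Renders the list from firstWaiter, e.g. "A->B".
    String dump() {
        StringBuilder sb = new StringBuilder();
        for (Node n = firstWaiter; n != null; n = n.nextWaiter) {
            if (sb.length() > 0) sb.append("->");
            sb.append(n.name);
        }
        return sb.toString();
    }

    static String demo() {
        ToyConditionQueue q = new ToyConditionQueue();
        Node a = q.add("A");
        q.add("B");
        Node c = q.add("C");
        a.waitStatus = CANCELLED; // cancelled, but not the tail: no cleanup yet
        c.waitStatus = CANCELLED; // cancelled tail: the next add() triggers a full cleanup
        q.add("D");
        return q.dump();
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints "B->D"
    }
}
```

Note that the cleanup is triggered only by the cancelled tail C, yet it also removes A, because the traversal always walks the whole list.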
fullyRelease()
/**
* Invokes release with current state value; returns saved state.
* Cancels node and throws exception on failure.
* @param node the condition node for this wait
* @return previous sync state
*/
final int fullyRelease(Node node) {
boolean failed = true;
try {
int savedState = getState();
if (release(savedState)) {
failed = false;
return savedState;
} else {
throw new IllegalMonitorStateException();
}
} finally {
if (failed)
node.waitStatus = Node.CANCELLED;
}
}
/**
* Releases in exclusive mode. Implemented by unblocking one or
* more threads if {@link #tryRelease} returns true.
* This method can be used to implement method {@link Lock#unlock}.
*
* @param arg the release argument. This value is conveyed to
* {@link #tryRelease} but is otherwise uninterpreted and
* can represent anything you like.
* @return the value returned from {@link #tryRelease}
*/
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
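savedState is the reentrant hold count, which may be 1 or more. fullyRelease() releases all of it at once, setting state to 0 and exclusiveOwnerThread to null, and release() then calls unparkSuccessor to wake the next waiting node in the sync queue. The details are covered in the ReentrantLock source analysis.
In short, the current thread gives up the lock so that the next node in the sync queue can compete for it.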
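The effect of fullyRelease() and the saved state can be observed from the outside with ReentrantLock.getHoldCount(). The sketch below is an illustration under assumptions: FullyReleaseDemo and its result layout are made up, and it relies only on public ReentrantLock and Condition methods.

```java
import java.util.Arrays;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo: await() drops every reentrant hold and restores them afterwards.
class FullyReleaseDemo {
    // Returns {waiter hold count before await, signaller hold count, waiter hold count after await}.
    static int[] run() {
        ReentrantLock lock = new ReentrantLock();
        Condition cond = lock.newCondition();
        boolean[] signalled = {false}; // guarded by lock
        int[] result = new int[3];

        Thread signaller = new Thread(() -> {
            lock.lock(); // can only succeed once the waiter has released both holds
            try {
                result[1] = lock.getHoldCount();
                signalled[0] = true;
                cond.signal();
            } finally {
                lock.unlock();
            }
        });

        try {
            lock.lock();
            lock.lock(); // reentrant acquire: hold count is now 2
            try {
                result[0] = lock.getHoldCount();
                signaller.start();
                while (!signalled[0]) {
                    cond.await(); // releases both holds, reacquires both before returning
                }
                result[2] = lock.getHoldCount();
            } finally {
                lock.unlock();
                lock.unlock();
            }
            signaller.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(run())); // prints [2, 1, 2]
    }
}
```

The signaller being able to acquire the lock at all shows that a single await() released both holds, and the final count of 2 shows that savedState was passed back into the reacquire.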
isOnSyncQueue()
/**
* Returns true if a node, always one that was initially placed on
* a condition queue, is now waiting to reacquire on sync queue.
* @param node the node
* @return true if is reacquiring
*/
final boolean isOnSyncQueue(Node node) {
if (node.waitStatus == Node.CONDITION || node.prev == null)
return false;
if (node.next != null) // If has successor, it must be on queue
return true;
/*
* node.prev can be non-null, but not yet on queue because
* the CAS to place it on queue can fail. So we have to
* traverse from tail to make sure it actually made it. It
* will always be near the tail in calls to this method, and
* unless the CAS failed (which is unlikely), it will be
* there, so we hardly ever traverse much.
*/
return findNodeFromTail(node);
}
/**
* Returns true if node is on sync queue by searching backwards from tail.
* Called only when needed by isOnSyncQueue.
* @return true if present
*/
private boolean findNodeFromTail(Node node) {
Node t = tail;
for (;;) {
if (t == node)
return true;
if (t == null)
return false;
t = t.prev;
}
}
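isOnSyncQueue() checks whether the node has already been moved to the sync queue. It looks at waitStatus, prev and next, and falls back to a backward search from tail.
The long comment about the backward search says that a non-null node.prev alone does not prove the node is on the sync queue. The search has to start from tail, and since the node will almost always be near the tail, the traversal is cheap.
The reason is visible in enq(), shown later: node.prev is assigned before the CAS on tail. Because prev is volatile the assignment is visible immediately, but compareAndSetTail can still fail, in which case the node is not yet enqueued.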
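The window in which prev is set but the node is not yet on the queue can be replayed step by step. The sketch below simulates the race in a single thread; PrevBeforeCasDemo, tryEnqueue and the use of AtomicReference for tail are illustrative assumptions, not the AQS implementation.

```java
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical model of one enq() attempt, used to replay a lost CAS deterministically.
class PrevBeforeCasDemo {
    static final class Node {
        final String name;
        volatile Node prev;
        volatile Node next;
        Node(String name) { this.name = name; }
    }

    // The dummy head doubles as the initial tail.
    final AtomicReference<Node> tail = new AtomicReference<>(new Node("head"));

    // Backward search from tail, like findNodeFromTail().
    boolean findNodeFromTail(Node node) {
        for (Node t = tail.get(); t != null; t = t.prev) {
            if (t == node) return true;
        }
        return false;
    }

    // One pass of the enq() loop body against a previously observed tail.
    boolean tryEnqueue(Node node, Node observedTail) {
        node.prev = observedTail;                     // step 1: visible immediately
        if (tail.compareAndSet(observedTail, node)) { // step 2: may lose the race
            observedTail.next = node;                 // step 3: forward link
            return true;
        }
        return false;
    }

    // Returns {first CAS ok, prev set after failure, found after failure, retry ok, found after retry}.
    static boolean[] demo() {
        PrevBeforeCasDemo q = new PrevBeforeCasDemo();
        Node stale = q.tail.get();                 // "thread A" reads tail
        q.tryEnqueue(new Node("other"), stale);    // "thread B" enqueues first
        Node mine = new Node("mine");
        boolean firstOk = q.tryEnqueue(mine, stale); // A's CAS fails on the stale tail
        boolean prevSet = mine.prev != null;
        boolean foundAfterFailure = q.findNodeFromTail(mine);
        boolean retryOk = q.tryEnqueue(mine, q.tail.get());
        boolean foundAfterRetry = q.findNodeFromTail(mine);
        return new boolean[] {firstOk, prevSet, foundAfterFailure, retryOk, foundAfterRetry};
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(demo())); // prints [false, true, false, true, true]
    }
}
```

After the failed attempt the node has a non-null prev but cannot be reached from tail, which is exactly the state that forces isOnSyncQueue() to search backward instead of trusting prev.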
While the node is not on the sync queue, the thread parks.
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
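That completes part 1: the thread that held the lock has been added to the condition queue, has released the lock, and is parked.
Part 2:
Since the thread is now parked, first look at how it gets unparked. await() is normally paired with signal() (or signalAll()), so start with signal().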
signal()
/**
* Moves the longest-waiting thread, if one exists, from the
* wait queue for this condition to the wait queue for the
* owning lock.
*
* @throws IllegalMonitorStateException if {@link #isHeldExclusively}
* returns {@code false}
*/
public final void signal() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignal(first);
}
If the condition queue has a node, signalling starts from the first waiter.
doSignal()
/**
* Removes and transfers nodes until hit non-cancelled one or
* null. Split out from signal in part to encourage compilers
* to inline the case of no waiters.
* @param first (non-null) the first node on condition queue
*/
private void doSignal(Node first) {
do {
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
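If first.nextWaiter is null, this was the only node in the condition queue, so firstWaiter, lastWaiter and first.nextWaiter all end up null.
If more nodes follow, firstWaiter is advanced to first.nextWaiter and first.nextWaiter is cleared, detaching first from the condition queue. The loop moves on to the next node only if transferForSignal fails because the node was cancelled.
The key method is transferForSignal.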
transferForSignal()
/**
* Transfers a node from a condition queue onto sync queue.
* Returns true if successful.
* @param node the node
* @return true if successfully transferred (else the node was
* cancelled before signal)
*/
final boolean transferForSignal(Node node) {
/*
* If cannot change waitStatus, the node has been cancelled.
*/
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
/*
* Splice onto queue and try to set waitStatus of predecessor to
* indicate that thread is (probably) waiting. If cancelled or
* attempt to set waitStatus fails, wake up to resync (in which
* case the waitStatus can be transiently and harmlessly wrong).
*/
Node p = enq(node);
int ws = p.waitStatus;
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}
/**
* Inserts node into queue, initializing if necessary. See picture above.
* @param node the node to insert
* @return node's predecessor
*/
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
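As the name suggests, this method moves a node from the condition queue to the sync queue.
If the CAS from CONDITION to 0 fails, the node was cancelled and the method returns false. Otherwise the node's waitStatus is now 0 and execution continues.
enq puts the node on the sync queue and returns its predecessor.
If the predecessor is cancelled (ws > 0), or the CAS that sets it to SIGNAL fails (for example because it was cancelled in the meantime), the node's thread is unparked right away so that it can resynchronize by itself.
At this point the signalling thread has finished signal(). If unpark was called, the waiting thread is awake as well and both run concurrently, although the woken thread still has to reacquire the lock, which the signalling thread holds until it unlocks.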
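Because signal() only transfers the node, the waiter cannot return from await() until the signalling thread releases the lock. A small sketch of that ordering follows; SignalKeepsLockDemo and the event strings are assumptions made for illustration.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo: records the order of events around signal() and unlock().
class SignalKeepsLockDemo {
    static List<String> run() {
        ReentrantLock lock = new ReentrantLock();
        Condition cond = lock.newCondition();
        List<String> events = new CopyOnWriteArrayList<>();
        CountDownLatch waiterHoldsLock = new CountDownLatch(1);
        boolean[] signalled = {false}; // guarded by lock

        Thread waiter = new Thread(() -> {
            lock.lock();
            try {
                waiterHoldsLock.countDown();
                while (!signalled[0]) {
                    cond.awaitUninterruptibly();
                }
                events.add("waiter resumed"); // runs only after the lock is reacquired
            } finally {
                lock.unlock();
            }
        });
        waiter.start();

        try {
            waiterHoldsLock.await();
            lock.lock(); // blocks until the waiter's await releases the lock
            try {
                signalled[0] = true;
                cond.signal();
                events.add("signal returned");
                events.add("signaller about to unlock");
            } finally {
                lock.unlock();
            }
            waiter.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return events;
    }

    public static void main(String[] args) {
        // prints [signal returned, signaller about to unlock, waiter resumed]
        System.out.println(run());
    }
}
```

The waiter's entry is always last, since it is added while holding the lock and the signaller adds both of its entries before unlocking.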
doSignalAll()
/**
* Removes and transfers all nodes.
* @param first (non-null) the first node on condition queue
*/
private void doSignalAll(Node first) {
lastWaiter = firstWaiter = null;
do {
Node next = first.nextWaiter;
first.nextWaiter = null;
transferForSignal(first);
first = next;
} while (first != null);
}
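The "All" means every node in the condition queue is transferred to the sync queue.
That completes signal() and signalAll(): nodes are transferred from the condition queue to the sync queue and, where necessary, unparked.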
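The transfer between the two queues can be watched through ReentrantLock's monitoring methods getWaitQueueLength(Condition) and getQueueLength(). The following is a sketch only: SignalAllTransferDemo is a made-up class, and the JDK documents both lengths as estimates intended for monitoring, so the exact numbers hold here only because no other thread touches the lock.

```java
import java.util.Arrays;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo: three waiters move from the condition queue to the sync queue.
class SignalAllTransferDemo {
    // Returns {condition queue before, sync queue before, condition queue after, sync queue after}.
    static int[] run() {
        final int waiters = 3;
        ReentrantLock lock = new ReentrantLock();
        Condition cond = lock.newCondition();
        boolean[] go = {false}; // guarded by lock
        Thread[] threads = new Thread[waiters];
        for (int i = 0; i < waiters; i++) {
            threads[i] = new Thread(() -> {
                lock.lock();
                try {
                    while (!go[0]) {
                        cond.awaitUninterruptibly();
                    }
                } finally {
                    lock.unlock();
                }
            });
            threads[i].start();
        }

        int[] result = new int[4];
        try {
            // Poll until all waiters sit in the condition queue; exit the loop holding the lock.
            lock.lock();
            while (lock.getWaitQueueLength(cond) < waiters) {
                lock.unlock();
                Thread.sleep(10);
                lock.lock();
            }
            try {
                result[0] = lock.getWaitQueueLength(cond);
                result[1] = lock.getQueueLength();
                go[0] = true;
                cond.signalAll(); // transfers every node while this thread still owns the lock
                result[2] = lock.getWaitQueueLength(cond);
                result[3] = lock.getQueueLength();
            } finally {
                lock.unlock();
            }
            for (Thread t : threads) {
                t.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(run()));
    }
}
```

With the signalling thread still holding the lock, the condition queue is expected to drop from 3 to 0 while the sync queue grows from 0 to 3.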
await() #2
However it is woken, the thread parked inside await() eventually resumes and continues.
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
// Part 1
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
// Part 2
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
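First check whether the thread was interrupted while waiting and, if so, when.
An interrupt before the signal yields THROW_IE. An interrupt after the node was signalled, when it is already on or being moved to the sync queue, yields REINTERRUPT.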
/**
* Checks for interrupt, returning THROW_IE if interrupted
* before signalled, REINTERRUPT if after signalled, or
* 0 if not interrupted.
*/
private int checkInterruptWhileWaiting(Node node) {
return Thread.interrupted() ?
(transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
0;
}
/**
* Transfers node, if necessary, to sync queue after a cancelled wait.
* Returns true if thread was cancelled before being signalled.
*
* @param node the node
* @return true if cancelled before the node was signalled
*/
final boolean transferAfterCancelledWait(Node node) {
if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
// Interrupted while the node is still in the condition queue: enqueue it on the sync queue.
enq(node);
return true;
}
/*
* If we lost out to a signal(), then we can't proceed
* until it finishes its enq(). Cancelling during an
* incomplete transfer is both rare and transient, so just
* spin.
*/
while (!isOnSyncQueue(node))
Thread.yield();
return false;
}
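The THROW_IE path can be observed from user code: an interrupt that arrives before any signal makes await() throw InterruptedException, but only after the lock has been reacquired. The sketch below assumes a made-up class InterruptBeforeSignalDemo and uses ReentrantLock.hasWaiters(Condition) to wait until the thread is in the condition queue.

```java
import java.util.Arrays;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo: interrupt a waiter that nobody ever signals.
class InterruptBeforeSignalDemo {
    // Returns {InterruptedException seen, lock held in catch block, interrupt flag still set}.
    static boolean[] run() {
        ReentrantLock lock = new ReentrantLock();
        Condition cond = lock.newCondition();
        boolean[] result = new boolean[3];

        Thread waiter = new Thread(() -> {
            lock.lock();
            try {
                for (;;) {
                    cond.await(); // never signalled; loop guards against spurious wakeups
                }
            } catch (InterruptedException e) {
                result[0] = true;
                result[1] = lock.isHeldByCurrentThread();
                result[2] = Thread.currentThread().isInterrupted();
            } finally {
                lock.unlock();
            }
        });
        waiter.start();

        try {
            // Poll until the waiter has joined the condition queue.
            for (;;) {
                lock.lock();
                try {
                    if (lock.hasWaiters(cond)) break;
                } finally {
                    lock.unlock();
                }
                Thread.sleep(10);
            }
            waiter.interrupt();
            waiter.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(run())); // prints [true, true, false]
    }
}
```

The lock is held inside the catch block, which is why the unlock() in finally is still correct on the exception path. The interrupt flag is clear because the interrupt was consumed and reported as the exception.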
acquireQueued()
/**
* Acquires in exclusive uninterruptible mode for thread already in
* queue. Used by condition wait methods as well as acquire.
*
* @param node the node
* @param arg the acquire argument
* @return {@code true} if interrupted while waiting
*/
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
After the interrupt mode is recorded, the thread tries to acquire the lock. If it cannot, it parks again, this time in the sync queue.
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
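Once the lock is acquired, cancelled nodes are cleaned out of the condition queue. Depending on the interrupt mode, await() then throws InterruptedException (THROW_IE) or re-asserts the thread's interrupt status (REINTERRUPT).
That completes await().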
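Summary
The await and signal variants of a Condition are tied to the lock/unlock state. Many of the internal operations assume the lock is held, which is why await and signal must be called between lock() and unlock().
The four methods (lock, unlock, await, signal) need to be read together.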
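The requirement to hold the lock is enforced at runtime. The following minimal sketch, with a made-up class name NoLockDemo, calls signal() and an await variant without owning the lock and records whether IllegalMonitorStateException is thrown.

```java
import java.util.Arrays;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo: Condition methods called without holding the lock.
class NoLockDemo {
    // Returns {signal threw IllegalMonitorStateException, await threw IllegalMonitorStateException}.
    static boolean[] run() {
        ReentrantLock lock = new ReentrantLock();
        Condition cond = lock.newCondition();
        boolean signalThrew = false;
        boolean awaitThrew = false;
        try {
            cond.signal(); // rejected by the isHeldExclusively() check
        } catch (IllegalMonitorStateException e) {
            signalThrew = true;
        }
        try {
            cond.awaitUninterruptibly(); // fails while trying to release a lock it does not own
        } catch (IllegalMonitorStateException e) {
            awaitThrew = true;
        }
        return new boolean[] {signalThrew, awaitThrew};
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(run())); // prints [true, true]
    }
}
```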
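Key points
ReentrantLock, through AbstractQueuedSynchronizer, works with two queues (sync and condition), two node modes (EXCLUSIVE, SHARED) and five wait states (SIGNAL, CONDITION, CANCELLED, PROPAGATE, 0). ReentrantLock itself only uses EXCLUSIVE mode.
The sync queue is a doubly linked list with head and tail pointers. The relevant fields are: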
private transient volatile Node head;
private transient volatile Node tail;
volatile Node prev;
volatile Node next;
The condition queue is a singly linked list with head and tail pointers. The relevant fields are:
private transient Node firstWaiter;
private transient Node lastWaiter;
Node nextWaiter;
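lock() essentially adds the node of a thread that failed to acquire the lock to the sync queue.
unlock() essentially wakes the next waiting node in the sync queue, so queued nodes proceed one after another.
await() adds the node to the condition queue.
signal() moves the first node of the condition queue (signalAll moves every node) from the condition queue to the sync queue.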