1. 队列同步器介绍
队列同步器AbstractQueuedSynchronizer,是用来构建锁或者其他同步组件的基础框架,它使用了一个int成员变量表示同步状态,通过内置的FIFO队列来完成资源获取线程的排队工作。
同步器的主要使用方式是继承,一般作为同步器组件的静态内部类,在同步器中仅定义了与状态相关的方法,且状态既可以独占获取也可以共享获取,这样就可以实现不同的同步组件(ReetrantLock、CountDownLatch等)。同步组件利用同步器进行锁的实现,它简化了锁的实现方式,屏蔽了同步状态管理、线程的排队、等待与唤醒等顶层操作。
同步器的设计是基于模板方法模式的,使用者需要继承同步器并重写指定的方法,随后将同步器组合在自定义同步组件的实现中,并调用同步器提供的模板方法,而这些模板方法将会调用使用者重写的方法。
同步器提供了3个方法来修改和访问同步状态:
1、getState():获取当前同步状态
2、setState(int newState):设置当前同步状态
3、compareAndSetState(int expect, int update):使用CAS设置当前状态,该方法能保证操作的原子性。
- 同步器可以重写的方法
方法名称 | 描述 |
---|---|
tryAcquire(int arg) | 独占获取同步状态,实现该方法需要查询当前状态,并判断同步状态是否符合预期状态,然后再进行CAS设置同步状态。 |
treRelease(int arg) | 独占式释放同步状态,等待获取同步状态的线程将有机会获取同步状态 |
tryAcquireShared(int arg) | 共享式获取同步状态,返回大于等于0的值,表示获取成功,反之失败 |
tryReleaseShared(int arg) | 共享式释放同步状态 |
isHeldExclusively() | 当前同步器是否在独占模式下被线程占用,一般该方法表示是否被当前线程所独占 |
- 同步器的模板方法基本上分为3类:
- 独占式获取与释放同步状态
- 共享式获取与释放同步状态
- 查询同步队列中的等待线程情况
2. 队列同步器示例
- 只有掌握了同步器的工作原理才能更加深入的理解并发包中其他的并发组件,下面通过一个独占锁的示例来深入了解一下同步器的工作原理
- 独占锁就是在同一时刻只能有一个线程获取到锁,而其他获取锁的线程只能处于同步队列中等待,只有获取锁的线程释放了锁,后继的线程才能获取锁。
public class Mutex implements Lock, java.io.Serializable {
// 内部类,自定义同步器
private static class Sync extends AbstractQueuedSynchronizer {
// 是否处于占用状态
protected boolean isHeldExclusively() {
return getState() == 1;
}
// 当状态为0的时候获取锁
public boolean tryAcquire(int acquires) {
assert acquires == 1; // Otherwise unused
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
// 释放锁,将状态设置为0
protected boolean tryRelease(int releases) {
assert releases == 1; // Otherwise unused
if (getState() == 0) throw new IllegalMonitorStateException();
setExclusiveOwnerThread(null);
setState(0);
return true;
}
// 返回一个Condition,每个condition都包含了一个condition队列
Condition newCondition() { return new ConditionObject(); }
}
// 仅需要将操作代理到Sync上即可
private final Sync sync = new Sync();
public void lock() { sync.acquire(1); }
public boolean tryLock() { return sync.tryAcquire(1); }
public void unlock() { sync.release(1); }
public Condition newCondition() { return sync.newCondition(); }
public boolean isLocked() { return sync.isHeldExclusively(); }
public boolean hasQueuedThreads() { return sync.hasQueuedThreads(); }
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}
public boolean tryLock(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireNanos(1, unit.toNanos(timeout));
}
}
- 可以看到Mutex将Lock接口均代理给了同步器的实现。
- 使用方将Mutex构造出来之后,调用lock获取锁,调用unlock进行解锁。
- 用户使用Mutex时并不会直接和内部同步器的实现打交道,而是通过调用Mutex提供的方法。
3. 队列同步器实现分析(独占方式)
1) 同步队列
- 同步器依赖内部的同步队列(一个FIFO双向队列)来完成同步状态的管理,当前线程获取同步状态失败时,同步器会将当前线程以及等待状态等信息构造成为一个节点(Node)并将其加入同步队列,同时会阻塞当前线程,当同步状态释放时,会把首节点中的线程唤醒,使其再次尝试获取同步状态。
- 同步队列中的节点(Node)用来保存获取同步状态失败的线程引用、等待状态以及前驱和后继节点。
- 同步器包含了两个节点类型的引用,一个指向头节点,而另一个指向尾节点。
同步队列中节点源码如下:
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer
implements java.io.Serializable {
......
private transient volatile Node head;//头节点
private transient volatile Node tail;//尾节点
private volatile int state;//*同步状态*
......
static final class Node {
volatile int waitStatus;//等待状态
volatile Node prev;//前驱
volatile Node next;//后继
volatile Thread thread;//线程引用
......
}
......
}
同步队列基本结构图如下:
同步队列涉及两个动作:
- 节点加入到同步队列中
- 线程加入队列的过程必须保证线程安全,同步器提供了一个基于CAS的设置尾节点的方法:compareAndSetTail(Node expect,Node update),保证线程安全。
- 为什么必须保证线程安全:同时有多条线程没有获取同步状态要加入同步队列,这时如果不是线程安全的,请问谁先谁后呢?所以在此处的这个操作必须是线程安全的
添加过程如下图所示
- 设置首节点
- 同步队列遵循FIFO,首节点是获取同步状态成功的节点,首节点的线程在释放同步状态时,将会唤醒后续节点,而后续节点将会在获取同步状态成功时将自己设置为首节点。
- 设置首节点是由获取同步状态成功的线程来完成的,由于只有一个线程能够成功的获取到同步状态,因此设置头节点的方法并不需要使用CAS来保证,它只需要将首节点设置成为原首节点后继节点,并断开首节点的next引用即可。
首节点设置过程如下图所示:
2) 独占式同步状态获取与释放
- 通过同步器的acquire(int arg)方法可以获取同步状态,该方法对中断不敏感,也就是说由于线程获取同步状态失败后进入同步队列中,后继对线程进行中断操作时,线程不会从同步队列移除。acquire方法:
public final void acquire(int arg) {
if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
上述代码中完成了同步状态的获取、节点构造、加入同步队列以及同步队列中自旋等待的相关工作。
主要逻辑:首先调用自定义同步器实现的tryAcquire(int arg)方法,该方法保证线程安全的获取同步状态,如果同步状态获取失败,则构造同步节点(独占式Node.EXCLUSIVE,同一时刻只能有一个线程成功获取同步状态)并通过addWaiter(Node node)方法将该节点加入到同步队列的尾部,最后调用acquireQueued(Node node,int arg)方法,使得该节点以“死循环”的方式获取同步状态。如果获取不到阻塞节点中的线程,而被阻塞线程的唤醒主要依靠前驱节点的出队或阻塞线程被中断来实现。
节点的构造以及加入同步队列依靠于addWaiter和enq方法:
//添加到同步队列中
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
/**
* Inserts node into queue, initializing if necessary. See picture above.
* @param node the node to insert
* @return node's predecessor
节点插入到尾节点:将并发添加节点的请求通过CAS变得”串行化“了。
*/
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
在 acquireQueued 方法中,当前线程在”死循环“中尝试获取同步状态,而只有前驱节点是头结点才能够尝试获取同步状态。原因如下:
- 头节点是成功获取到同步状态的节点,而头结点的线程线程释放了同步状态后,将会唤醒其后继节点,后继节点的线程在被唤醒后需要检查自己的前驱节点是否是头结点。
- 维护同步队列的FIFO原则。该方法中,节点自旋获取同步状态。
/**
* Acquires in exclusive uninterruptible mode for thread already in
* queue. Used by condition wait methods as well as acquire.
*
* @param node the node
* @param arg the acquire argument
* @return {@code true} if interrupted while waiting
*/
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
-
节点自选同步状态如下图
-
独占同步状态获取流程,也就是acquire方法调用流程,如下图
- 同步状态释放:当前线程获取同步状态并执行了相应逻辑之后,就需要释放同步状态,使得后续节点能够继续获取同步状态。通过调用同步器的release(int arg)方法可以释放同步状态,该方法在释放了同步状态之后,会"唤醒"其后继节点(进而使后继节点重新尝试获取同步状态)。
public final boolean release(int arg) {
if (tryRelease(arg)) {//释放同步状态
Node h = head;
if (h != null && h.waitStatus != 0)//独占模式下这里表示SIGNAL
unparkSuccessor(h);//唤醒后继节点
return true;
}
return false;
}
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;//获取当前节点等待状态
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);//更新等待状态
/* Thread to unpark is held in successor, which is normally just the next node.
But if cancelled or apparently null,
* traverse backwards from tail to find the actual non-cancelled successor.*/
Node s = node.next;
if (s == null || s.waitStatus > 0) {//找到第一个没有被取消的后继节点(等待状态为SIGNAL)
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);//唤醒后继线程
}