上文说到 ReentrantLock 用到代理模式,其中最核心的加锁 / 解锁操作都是调用 Sync 对象完成。而从源代码可以看出,Sync 对象代码量也不多,核心代码被封装在父类 AbstractQueuedSynchronizer(简称 AQS)中,今天就来简单的探究探究这个 AQS。
AQS 框架
实际编程时,大多数时候我们不会直接使用 AQS,ReentrantLock / CountDownLatch 等标准同步器能够满足绝大多数情况的需求。但能如果能了解 AQS 的实现方式,对于理解这些同步器类的工作原理有很大帮助。
设计模式
从方法调用顺序看,AQS 采用了标准的模版方法模式,对外放出以下公开方法:
//独占模式接口
public final void acquire(int arg);
public final void acquireInterruptibly(int arg)
throws InterruptedException;
public final boolean tryAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException;
public final boolean release(int arg);
//共享模式接口
public final void acquireShared(int arg);
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException;
public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout)
throws InterruptedException;
public final boolean releaseShared(int arg);
AQS 同时支持独占和共享模式,子类同步器不能重载这些对外接口,但是必须重载独占模式接口或者共享模式接口:
//独占模式
protected boolean tryAcquire(int arg);
protected boolean tryRelease(int arg);
//共享模式
protected int tryAcquireShared(int arg);
protected boolean tryReleaseShared(int arg);
//是否是独占模式
protected boolean isHeldExclusively();
子类的 tryAcquire/tryAcquireShared 会在 AQS 对外 acquire 系列公开接口中调用,返回 true / 非0 表明当前线程获取执行“许可”;如果是 false / 0 表明当前线程未获得“执行”许可,需要被阻塞挂起,放入阻塞队列。
类似的,子类的 tryRelease / tryReleaseShared 会在 AQS 对外 release 系列公开接口中调用,返回 true 表明释放成功,AQS 框架会在阻塞队列中运行其他线程。使用模式如下:
Acquire:
while (!tryAcquire(arg)) {
enqueue thread if it is not already queued;
possibly block current thread;
}
Release:
if (tryRelease(arg))
unblock the first queued thread;
有趣的地方
AQS 在定义 tryAcquire 这些需要子类实现的接口时,并不是用 abstract 关键字,而是抛出异常。究其原因,可能是如果使用 abstract 关键字,那子类必须实现所有独占模式和共享模式的接口。而实际中根据子类同步器的同步语义,一般只会使用其中一种模式,所以实现所有两种接口后会造成代码冗余,以及语义的晦涩难懂。
结构功能
得益于模版方法模式,子类同步器只需要关注让让线程是否获得执行“许可”,而线程的阻塞和入队都是用 AQS 控制。所以 AQS 最主要的工作是以下3件事:
- 提供共享变量 state ,以及 CAS 操作供子类使用。state 语义由子类定义,子类使用 state 判断线程是否准入;
- 阻塞线程。子类同步器禁止线程准入后,AQS 调用 LockSupport.park 将当前线程阻塞挂起(线程进入 TIMED_WAITING 状态)
- 管理阻塞队列。改进型 CLH lock queue,实现阻塞线程的 FIFO,当准入许可释放的时候,调度下一个线程执行。
阻塞队列
AQS 的阻塞线程队列使用了改进型的 CLH lock queue(具体可以查看内容来源),保证阻塞线程 FIFO。简而言之:
CLH lock queue其实就是一个FIFO的队列,队列中的每个结点(线程)只要等待其前继节点释放锁,或者说是根据前继节点的状态决定其是否可运行。
Node
说 AQS 的阻塞队列是改进型的 CLH lock queue,是因为队列中的 Node 不仅有前继节点引用,也包含后续节点引用,同时还有 status 描述多种状态,以便支持取消等待阻塞。以下是 Node 数据结构:
字段 | 解释描述 |
---|---|
prev | 指向前继节点 |
next | 指向后续节点 |
thread | 被阻塞线程 |
nextWaiter | 使用在 condition 等待队列中,或者共享节点 |
status | 节点状态 |
1,CANCELLED,表示当前节点的线程因为超时或中断被取消了 | |
0,除了以上四种状态的第五种状态,一般是节点初始状态 | |
-1,SIGNAL,表示当前节点的后续节点中的线程通过park被阻塞了,当前节点在释放或取消时要通过unpark解除它的阻塞 | |
-2,CONDITION,表示当前节点在condition队列中 | |
-3,PROPAGATE,共享模式的头结点可能处于此状态,表示无条件往下传播,引入此状态是为了优化锁竞争,使队列中线程有序地一个一个唤醒 |
队列
ASQ 对象中有有两个成员变量,head 指向阻塞队列队首,tail 指向阻塞队列队尾。队列结构如下图所示:
waitStatus
waitStatus 表示的是后续节点状态,这是因为 AQS 中使用 CLH 队列实现线程的结构管理,而 CLH 结构正是用前一节点某一属性表示当前节点的状态,这样更容易实现取消和超时功能。
head 和 next
队列 head 指向队首,head指向的首节点 node 代表当前获取执行许可的线程,只有 next 引用。当前线程释放许可的时候,head 节点可以根据 next 快速找到下一个准入的线程。这是其对 CLH lock queue 的优化。如果后继节点为空或者 CANCELED, 则从队列 tail 处开始,向前找寻离它最近的非 CANCELED 节点,该节点的线程则是下一个获取准入的线程。
Node 排队取消
当某线程发生等待超时或者被 interrupt 时,对应 node 的状态被设置为 CANCELLED 。
该节点的 next 引用指向本身,方便以后 gc 的时候,对象被快速回收。
该节点的前继节点的 next 指向该节点的后续节点,当然前继节点的状态不是 CANCELLED。
原子操作
head 和 tail 成员变量都采用 volatile 修饰符,保证线程可以及时访问到最新数据。
入队,出队,修改 node 的 prev 和 next 指针,都采用了 CAS 操作,配合 while 或者 for 自旋,保证操作的原子性。
编程场景
一般来说我们不需要直接使用 AQS,如果需要创建自定义的同步器,可以参考 ReentrantLock,需要:
- 创建对应 Sync 继承 AQS,根据同步语义,按需重载 tryAcquire / tryRelease 系列接口,可以调用 AQS 的 getState,setState,compareAndSetState 接口;
- 创建同步器类,并包含 Sync,使用代理模式,调用 AQS 的 public 接口实现同步接口;
一般不会让同步器类直接继承 AQS,这样可以防止同步器类暴露太多非必须的 AQS 接口。
比如我们可以如下实现一个 Mutex:
class Mutex implements Lock, java.io.Serializable {
// Our internal helper class
private static class Sync extends AbstractQueuedSynchronizer {
// Reports whether in locked state
protected boolean isHeldExclusively() {
return getState() == 1;
}
// Acquires the lock if state is zero
public boolean tryAcquire(int acquires) {
assert acquires == 1; // Otherwise unused
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
// Releases the lock by setting state to zero
protected boolean tryRelease(int releases) {
assert releases == 1; // Otherwise unused
if (getState() == 0) throw new IllegalMonitorStateException();
setExclusiveOwnerThread(null);
setState(0);
return true;
}
// Provides a Condition
Condition newCondition() { return new ConditionObject(); }
// Deserializes properly
private void readObject(ObjectInputStream s)
throws IOException, ClassNotFoundException {
s.defaultReadObject();
setState(0); // reset to unlocked state
}
}
// The sync object does all the hard work. We just forward to it.
private final Sync sync = new Sync();
public void lock() { sync.acquire(1); }
public boolean tryLock() { return sync.tryAcquire(1); }
public void unlock() { sync.release(1); }
public Condition newCondition() { return sync.newCondition(); }
public boolean isLocked() { return sync.isHeldExclusively(); }
public boolean hasQueuedThreads() { return sync.hasQueuedThreads(); }
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}
public boolean tryLock(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireNanos(1, unit.toNanos(timeout));
}
}
内容来源
Java 并发编程实战
http://www.cnblogs.com/zhanjindong/p/java-concurrent-package-aqs-overview.html
http://www.cnblogs.com/zhanjindong/p/java-concurrent-package-aqs-clh-and-spin-lock.html
http://blog.csdn.net/wangyangzhizhou/article/details/40958637?utm_source=tuicool&utm_medium=referral
http://blog.csdn.net/wangyangzhizhou/article/details/42177703
http://blog.csdn.net/wangyangzhizhou/article/details/42065151
http://blog.csdn.net/wangyangzhizhou/article/details/42197927