一、什么是AQS
JAVA中,在多线程环境下,一般需要锁来确保数据安全,而我们通常所说的锁,其实分为了两类,一类是sychronized,利用的是指令级别的monitor-enter 和 monitor-exit;另一类是Lock,例如ReentrantLock等,是依靠代码实现的。
而AQS,即AbstractQueuedSynchronizer,又叫队列同步器,就是用来构建锁和其他同步组件的基础框架。
AQS是一个以继承方式使用的抽象类,他维护了一个volatile的state和一个FIFO线程等待队列,类们必须定义改变state变量的protected方法,这些方法定义了state是如何被获取或释放的。
二、AQS原理简介
2.1 status状态
AQS使用int类型来保存同步状态,并且该状态变量(status)被声明为是volatile变量,并暴露出getState、setState以及compareAndSet操作来读取和更新这个同步状态,由于compareAndSet是CAS操作,且变量本身是volatile变量,所以通过暴露这三个方法就达到了同步状态的原子性管理,确保了同步状态的原子性、可见性和有序性。
2.2 阻塞 LockSupport
AQS也是调用的LockSupport.park阻塞当前线程直到有个LockSupport.unpark方法被调用。
LockSupport
在Java多线程中,当需要阻塞或者唤醒一个线程时,都会使用LockSupport工具类来完成相应的工作。LockSupport定义了一组公共静态方法,这些方法提供了最基本的线程阻塞和唤醒功能,而LockSupport也因此成为了构建同步组件的基础工具。
LockSupport定义了一组以park开头的方法用来阻塞当前线程,以及unpark(Thread)方法来唤醒一个被阻塞的线程,这些方法描述如下:
1.void park() 阻塞当前线程,如果调用unpark(Thread)方法或被中断,才能从park()返回
2.void parkNanos(long nanos) 阻塞当前线程,超时返回,阻塞时间最长不超过nanos纳秒
3.void parkUntil(long deadline) 阻塞当前线程,直到deadline时间点
4.void unpark(Thread) 唤醒处于阻塞状态的线程
5.park(Object blocker) 阻塞当前线程,参数blocker是用来标识当前线程在等待的对象,该对象主要用于问题排查和系统监控
6.parkNanos(Object blocker, long nanos) 阻塞当前线程,超时返回,阻塞时间最长不超过nanos纳秒,参数blocker是用来标识当前线程在等待的对象,该对象主要用于问题排查和系统监控
7.parkUntil(Object blocker, long deadline) 阻塞当前线程,直到deadline时间点,参数blocker是用来标识当前线程在等待的对象,该对象主要用于问题排查和系统监控
2.3 同步队列
AQS整个框架的核心就是如何管理线程阻塞队列,他通过内置的FIFO双向队列来完成线程的排队工作。该队列内部通过结点head和tail记录队首和队尾元素,而该队列的元素类型均为一个内部类Node。
Node
static final class Node {
/** Marker to indicate a node is waiting in shared mode */
static final Node SHARED = new Node();
/** Marker to indicate a node is waiting in exclusive mode */
static final Node EXCLUSIVE = null;
/** waitStatus value to indicate thread has cancelled */
static final int CANCELLED = 1;
/** waitStatus value to indicate successor's thread needs unparking */
static final int SIGNAL = -1;
/** waitStatus value to indicate thread is waiting on condition */
static final int CONDITION = -2;
/**
* waitStatus value to indicate the next acquireShared should
* unconditionally propagate
*/
static final int PROPAGATE = -3;
volatile int waitStatus;
volatile Node prev;
volatile Node next;
volatile Thread thread;
Node nextWaiter;
final boolean isShared() {
return nextWaiter == SHARED;
}
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}
Node() { // Used to establish initial head or SHARED marker
}
Node(Thread thread, Node mode) { // Used by addWaiter
this.nextWaiter = mode;
this.thread = thread;
}
Node(Thread thread, int waitStatus) { // Used by Condition
this.waitStatus = waitStatus;
this.thread = thread;
}
}
先看下waitStatus的几种状态
1.CANCELLED=1 表示线程因为中断或者等待超时,需要从等待队列中取消等待,被取消的节点会被踢出队列;
2.SIGNAL=-1 表示后续节点的线程处于等待状态,当前节点的线程如果释放了同步状态或者被取消,会通知后继节点,后继节点会获取锁并执行(当一个节点的状态为SIGNAL时就意味着在等待获取同步状态,前节点是头节点也就是获取同步状态的节点);
3.CONDITION=-2 表示节点在条件等待队列中,当其他线程调用了Condition的signal()方法之后,该节点就会从条件等待队列移动到同步队列中;
4.PROPAGTE=-3 表示下一次共享式同步状态获取将会传递给后继节点获取这个共享同步状态(读写锁中存在的状态,代表后续还有资源,可以多个线程同时拥有同步状态);
入队操作
如图所示,入队的操作主要分为两步
1.为了确保线程安全,同步器提供了一个CAS方法,它需要传递当前线程“认为”的尾节点和当前节点,将"认为"的尾结点和实际的尾结点比对成功才会将尾结点和新入队的节点进行关联
2.上述CAS操作成功后,当前节点才正式与之前的尾结点建立关联
出队操作
遵循FIFO规则,能成功获取到AQS同步状态的必定是首节点,故和入队操作不同,出队操作没必要CAS处理,只需要将首节点设置为原首节点的后续节点同时断开原节点、后续节点的引用即可。
2.4 条件队列
前文介绍node节点的waitStatus时提到,当值为-2时,表示节点在条件队列中,也就是说不同于前文的同步队列,在AQS中还存在一个条件队列,也就是一个ConditionObject的内部类,如下图
ConditionObject实现了Condition接口,Condition接口提供了类似Object管程式的方法,如await、signal和signalAll操作,这里对Condition接口的方法做下简单的介绍
//当前线程进入等待状态,直到被通知(signal)或被中断
void await() throws InterruptedException;
//当前线程进入等待状态,直到被通知(signal),对于中断不做响应
void awaitUninterruptibly();
//当前线程进入等待状态,等待指定时长(单位为毫秒),直到被通知、中断,或者超时
long awaitNanos(long nanosTimeout) throws InterruptedException;
//当前线程进入等待状态,等待指定时长,直到被通知、中断,或者超时
boolean await(long time, TimeUnit unit) throws InterruptedException;
//当前线程进入等待状态,直到被通知、中断,或者到达指定时间。如果没有到指定时间就被通知,方法返回true,否则false
boolean awaitUntil(Date deadline) throws InterruptedException;
// 唤醒一个等待线程,该线程从等待方法返回前必须获得与Condition相关联的锁
void signal();
// 唤醒所有等待线程,该线程从等待方法返回前必须获得与Condition相关联的锁
void signalAll();
ConditionObject中存在一个firstWaiter和一个lastWaiter,也就是说他有自己单独的队列,signal操作是通过将节点从条件队列转移到同步队列中来实现的,而await操作就是当前线程节点从同步队列进入条件队列进行等待。
AQS只有一个同步队列,但是可以有多个条件队列。
2.5 资源共享方式
在前文给出Node节点的源码时,可以发现在Node节点中还存在SHARED 和 EXCLUSIVE 两个静态常量。是的,AQS定义两种资源共享方式:Exclusive(独占,只有一个线程能执行,如ReentrantLock)和Share(共享,多个线程可同时执行,如Semaphore/CountDownLatch)。
不同的自定义同步器争用共享资源的方式也不同。自定义同步器在实现时只需要实现共享资源state的获取与释放方式即可。
自定义同步器实现时主要实现以下几种方法:
1.isHeldExclusively():该线程是否正在独占资源。只有用到condition才需要去实现它。
2.tryAcquire(int):独占方式。尝试获取资源,成功则返回true,失败则返回false。
3.tryRelease(int):独占方式。尝试释放资源,成功则返回true,失败则返回false。
4.tryAcquireShared(int):共享方式。尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。
5.tryReleaseShared(int):共享方式。尝试释放资源,如果释放后允许唤醒后续等待结点返回true,否则返回false。
三、AQS源码分析
以上分析了大概的原理,接下来看看主要的几个方法的源码
3.1 acquire(int)
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
acquire是一种以独占方式获取资源,如果获取到了,就会直接返回,否则会被添加到队列中进行等待,直到获取到资源为止,并且这个操作是忽略中断的。通过上面的源码可以看到,该方法调用了以下几个方法
1.tryAcquire() 用于获取资源,如果获取到了就直接返回
2.addWaiter() 添加到同步等待队列的队尾,这里传入的Node.EXCLUSIVE表示是独占模式
3.acquireQueued() 使线程在同步队列中进行等待,获取到资源后才会返回,整个等待过程中被中断过,则返回true,否则返回false。
4.selfInterrupt() 获取到资源后,如果acquireQueued() 被中断过,则if条件判断为空,会进到这里进行自我中断
详细来看看
3.1.1tryAcquire()
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
默认的实现只是单纯抛了个异常,具体的需要子类进行自定义扩展,该方法表示尝试以独占的方式直接获取资源,如果获取成功,则直接返回true,否则直接返回false
3.1.2 addWaiter()
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
首先获取到队列的尾结点。
如果尾结点不为空,则通过compareAndSetTail(pred, node)方法以CAS操作将尾结点设置为当前节点,如果设置成功,则将当前节点和前节点进行关联。
如果尾结点为空,则表示队列还未初始化,此时会调用enq()方法,这里也贴出enq方法的源码
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
可以看到这里采用CAS自旋的方式初始化队列,第一次循环时会将头结点和尾结点均设置为一个空的Node(new Node()),第二次循环时进入else分支,将当前节点设置成为尾结点。
3.1.3acquireQueued()
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
//如果是头结点且获取资源成功
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
可以看到该方法分为两部分,第一部分为如果是头结点则直接尝试获取资源,如果获取成功则返回,第二部分为如果不是头结点或者资源没有获取成功,则会判断当前节点是否应该park以及检查是否中断。
该方法实际是一个自旋,只有当拿到锁之后才会返回。接下来进一步的看看shouldParkAfterFailedAcquire和parkAndCheckInterrupt
shouldParkAfterFailedAcquire()
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
如果前节点是SIGNAL状态,表示当前节点应该处于等待状态,需要中断,如果前节点状态>0,表示前节点已被取消,则会剔除前面所有取消的节点(循环将第一个不是取消状态的节点设置为当前节点的前节点),其他状态则将当前节点设置为SIGNAL状态,表示当前节点可以获取同步状态。
parkAndCheckInterrupt()
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
让线程waiting,调用park方法后,除非被中断或者unpark才会被唤醒,然后通过Thread.interrupted()清除线程的中断标记。
所以回顾一下就会发现,整个acquire()方法的过程为
1.tryAcquire() 用于获取资源,如果获取到了就直接返回
2.addWaiter() 以Node.EXCLUSIVE独占模式添加到同步等待队列的队尾,如果队列为空则首先将队头和队尾设置为一个new Node()然后将当前节点添加进来
3.acquireQueued() 使线程在同步队列中进行等待,获取到资源后才会返回,如果当前节点是队头,则直接尝试获取资源,成功后返回,否则会检查当前节点的前节点的状态,以判断当前节点的线程是否应该被park,如果整个等待过程中被中断过,则返回true,否则返回false。
4.selfInterrupt() 获取到资源后,如果acquireQueued() 被中断过,则if条件判断为空,会进到这里进行自我中断
3.2 release()
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
release方法比较简单,直接通过tryRelease来释放资源,如果成功了则唤醒头结点的后续节点,unparkSuccessor中unpark了后继节点的线程,如下面贴上的代码所示
protected boolean tryRelease(int arg) {
throw new UnsupportedOperationException();
}
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
3.3 acquireShared()
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
private void doAcquireShared(int arg) {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // Record old head for check below
setHead(node);
/*
* Try to signal next queued node if:
* Propagation was indicated by caller,
* or was recorded (as h.waitStatus either before
* or after setHead) by a previous operation
* (note: this uses sign-check of waitStatus because
* PROPAGATE status may transition to SIGNAL.)
* and
* The next node is waiting in shared mode,
* or we don't know, because it appears null
*
* The conservatism in both of these checks may cause
* unnecessary wake-ups, but only when there are multiple
* racing acquires/releases, so most need signals now or soon
* anyway.
*/
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared())
doReleaseShared();
}
}
可以看到,acquireShared也会直接尝试获取资源,如果获取失败则进入在doAcquireShared中首先以Node.SHARED的状态添加到队尾,同时当前节点的前节点(node.predecessor()获取)是head时,会再次获取资源,和独占模式不同的是,如果获取成功,且剩余资源>0,会通过调用setHeadAndPropagate()来释放剩余资源并唤醒后续节点线程, doReleaseShared()后文会说明。
3.4 releaseShared()
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
private void doReleaseShared() {
/*
* Ensure that a release propagates, even if there are other
* in-progress acquires/releases. This proceeds in the usual
* way of trying to unparkSuccessor of head if it needs
* signal. But if it does not, status is set to PROPAGATE to
* ensure that upon release, propagation continues.
* Additionally, we must loop in case a new node is added
* while we are doing this. Also, unlike other uses of
* unparkSuccessor, we need to know if CAS to reset status
* fails, if so rechecking.
*/
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
简单来说就是释放资源唤醒后续节点,但是和独占模式不同的是,独占模式在当前资源释放成功后,如果头结点的状态不等于0就会唤醒头结点的线程,而共享模式下,就算头结点的状态已经是0了,还会用CAS操作将同步状态传递下去,除非传递失败。