1、前言
AQS(AbstractQueuedSynchronizer)是java.util.concurrent的基础。也是Doug Lea大神为广大java开发作出的卓越贡献。J.U.C中的工具类如Semaphore、CountDownLatch、ReentrantLock、ReentrantReadWriteLock等都极大程度依赖了AQS.
我们就简单的看一下ReentrantLock的具体实现。
public void lock() { sync.lock();}
public void unlock() { sync.release(1);}
public Condition newCondition() { return sync.newCondition();}
public boolean isLocked() { return sync.isLocked();}
public int getHoldCount() { return sync.getHoldCount();}
对的,你没有看错,这些工具类都是这样凭借一个Sync的内部类做出的实现。而这样一个内部类继承了AQS,由此可见AQS对于J.U.S来说是基石般的存在,本文也将通过ReentrantLock带领大家深入的了解AQS。
2、AQS的简介
AQS的设计思路和原理等我高大上知识这里就不涉及了。想要了解的可以阅读Doug Lea大师的对这一部分的解读。大师原著
当然,笔者也大力推荐读者能把每个类前面的注释都能读一读,在研究类的源码之前能够对类有一个系统的视图。
如果大体的看一下AQS的话就能发现这个类有三个非常重要的属性.
private transient volatile Node head;
private transient volatile Node tail;
private volatile int state;
看到这里我们大胆的做一个猜想(其实我都已经知道了)
- AQS维护了一个队列,并记录队列的头节点和尾节点
- 队列中的节点应该是因请求资源而阻塞的线程
- AQS同样维护了一个状态,这个状态应该是判断线程能否获取到锁的依据,如果不能,就加入到队列
接下来我们来看看这个Node具体是如何实现的.
volatile Node prev;//此节点的前一个节点。
volatile Node next;//此节点的后一个节点
volatile Thread thread;节点绑定的线程。
volatile int waitStatus;//节点的等待状态,一个节点可能位于以下几种状态
//该状态表示节点超时或被中断就会被踢出队列
static final int CANCELLED = 1;
//表示节点的继任节点需要成为BLOCKED状态(例如通过LockSupport.park()操作),因此一个节点一旦被释放(解锁)或者取消就需要唤醒他的继任节点.
static final int SIGNAL = -1;
//表明节点对应的线程因为不满足一个条件(Condition)而被阻塞。
static final int CONDITION = -2;
可以看出,队列是一个双向队列,并且队列中的节点有几种可以选择的状态值
再看的话就能看到AQS中定义的几个重要的放方法
public final void acquire(int arg);//请求获取独占式资源(锁)
public final boolean release(int arg);//请求释放独占式资源(锁)
public final void acquireShared(int arg);//请求获取共享式资源
public final boolean releaseShared(int arg);//请求释放共享式资源
//独占方式。尝试获取资源,成功则返回true,失败则返回false
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
//独占方式。尝试释放资源,成功则返回true,失败则返回false。
protected boolean tryRelease(int arg) {
throw new UnsupportedOperationException();
}
//共享方式。尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源
protected int tryAcquireShared(int arg) {
throw new UnsupportedOperationException();
}
protected int tryReleaseShared(int arg) {
throw new UnsupportOperationException();
}
可以看到AQS用acquire()和release()方法提供对资源的获取和释放
但是try**()结构的方法都是只抛出了异常,很显然这类方法是需要子类去实现的.
这也因为AQS定义了两种资源共享方式:Exclusive(独占,只有一个线程能执行,如ReentrantLock)和Share(共享,多个线程可以同时执行,如Semaphone/CountDownLatch), AQS负责获取资源(修改state的状态),而自定义同步器负责就要实现上述方法告诉AQS获取资源的规则.
两个重要的状态
1、AQS的state
- state可以理解有多少线程获取了资源,即有多少线程获取了锁,初始时state=0表示没有线程获取锁.
- 独占锁时,这个值通常为1或者0
- 如果独占锁可重入时,即一个线程可以多次获取这个锁时,每获取一次,state就加1
- 一旦有线程想要获得锁,就可以通过对state进行CAS增量操作,即原子性的增加state的值
- 其他线程发现state不为0,这时线程已经不能获得锁(独占锁),就会进入AQS的队列中等待.
- 释放锁是仍然是通过CAS来减小state的值,如果减小到0就表示锁完全释放(独占锁)
2、Node 中的waitStatus
- Node的正常状态是0
- 对于处在队列中的节点来说,前一个节点有唤醒后一个节点的任务
- 所以对与当前节点的前一个节点来说
- 如果waitStatus > 0, 则节点处于cancel状态,应踢出队列
- 如果waitStatus = 0, 则将waitStatus改为-1(signal)
- 因此队列中节点的状态应该为-1,-1,-1,0
源码解读
这一块开始解读源码的实现部分, 仍然只关心上面提到的几种方法.
1、acquire(int)
此方法是AQS实现独占式资源获取的顶层方法,这个方法和reentrantLock.lock()等有着相同的语义.下面我们开始看源码
public final void acquire(int arg) {
if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
这个函数共调用了4个方法, 其中tryAcquire(arg)是在子类Sync中实现, 其余在均是AQS中提供.
而这个方法的流程比较简单
- tryAcquire()尝试获取资源,如果成功, 则方法结束, 否则
- addWaiter()方法以独占方式将线程加入队列的尾部
- acquireQueued()方法是线程在等待队列中等待获取资源
- selfInterrupt(), 如果线程在等待过程中被中断过,在这里相应中断. 应该知道的是,线程在等待过程中是不响应中断的, 只有获取资源后才能自我中断.
看不懂不要紧,你已经知道了acquire()的大致过程,下面我们一一解读这其中的4个方法
流程1、tryAcquire()
此方法尝试去获取独占资源。如果获取成功,则直接返回true,否则直接返回false。tryAcquire()方法前面已经说过,这个方法是在子类中是实现的. 而在ReentrantLock中,这个方法也正是tryLock()的语义.如下是ReentrantLock对tryAcquire()实现的源码:
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {//等于0表示当前锁未被其他线程获取到
if (!hasQueuedPredecessors() //检查队列中是否有线程在当前线程的前面
&& compareAndSetState(0, acquires)) {//CAS操作state,锁获取成功
setExclusiveOwnerThread(current); //设置当前线程为占有锁的线程
return true;
}
} else if (current == getExclusiveOwnerThread()) {//非0,锁已经被获取,并且是当前线程获取.支持可重入锁
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc); //更改状态位,
return true;
}
return false;//未能获取锁
}
public final boolean hasQueuedPredecessors() {
Node t = tail;
Node h = head;
Node s;
return h != t && ((s = h.next) == null || s.thread != Thread.currentThread());
}
- 如果head=tail, 则队列未被初始化, 返回false, 否则
- 如果队列中没有线程正在等待, 返回true, 否则
- 如果当前线程是队列中的第一个元素, 返回true,否则返回false
流程2、addWaiter(int)
再看acquire()的第二个流程,获取锁失败, 则将线程加入队列尾部, 返回新加入的节点
private Node addWaiter(Node mode) {
//以给定的模式构建节点,节点有共享和独占两种模式
Node node = new Node(Thread.currentThread(), mode);
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {//CAS更新尾部节点
pred.next = node; //双向队列
return node;
}
}
enq(node); //如果队列没有初始化,程序就会到这一步.
return node;
}
private Node enq(final Node node) {
for (;;) {//经典的CAS配合使用方式, 一直循环知道CAS更新成功.
Node t = tail;
if (t == null) {//队列为空, 没有初始化,必须初始化
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) { //设置尾节点,此时的head是头节点,不存放数据
t.next = node;
return t;
}
}
}
}
流程3、acquireQueued()
addWaiter()完成后返回新加入队列的节点, 紧接着进入下一个流程acquireQueued(), 在这个方法中, 会实现线程节点的阻塞和唤醒. 所有节点在这个方法的处理下,等待资源
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true; //是否拿到资源
try {
boolean interrupted = false; //等待过程中是否被中断过
for (;;) { //又是一个自旋配合CAS设置变量
final Node p = node.predecessor(); //当前节点的前驱节点
if (p == head && tryAcquire(arg)) {
//如果前驱节点是头节点, 则当前节点已经具有资格尝试获取资源
//(可能是头节点任务完成之后唤醒的后继节点, 当然也可能是被interrup了)
setHead(node); //获取资源后,设置当前节点为头节点
p.next = null; // help GC
failed = false;
return interrupted;
}
//如果不能获取资源, 就进入waiting状态
if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus; //获取前一个节点的状态,还记得waitStatus是上面意思呒?
if (ws == Node.SIGNAL)
/*
*如果前驱节点完成任务后能够唤醒自己,那么当前节点就可以放心的睡觉了.
*记住,唤醒当前节点的任务是前驱节点完成
*/
return true;
if (ws > 0) { //ws大于0表示节点已经被取消,应该踢出队列.
do {
//节点的前驱引用指向更前面的没有被取消的节点. 所以被取消的节点没有引用之后会被GC
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
//找到了合适的前驱节点后,将其状态设置为SIGNAL
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
接下来是 parkAndCheckInterrupt() 方法, 真正让节点进入waiting状态的方法,实在这个方法中调用的..
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this); //JNI调用, 使线程进入waiting状态
return Thread.interrupted(); //检查是否被中断
}
流程4、selfInterrupt()
上面也说了, acquire()方法不是立即响应中断的. 而是在获取资源后进行自我中断处理
小结
到此,acquire()的过程已经分析完毕, 我们就知道reentrantLock.lock()的全部过程.总的来说, 就是尝试获取资源, 如果获取不到就进入等待队列变成等待状态.具体细节前面已经详细叙述过.
release(int)
讲了如何获取到资源,接下来就应该如何释放资源.这个方法会在独占的模式下释放指定的资源(减小state).这个语义也是reentrantLock.unlock();
public final boolean release(int arg) {
if (tryRelease(arg)) { //尝试释放资源
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h); //唤醒队列的下一个节点
return true;
}
return false;
}
1 tryRelease()
释放指定量的资源,这个方法是在子类中实现的.我们以reentrantLock.unlock()为例解读资源释放的过程
protected final boolean tryRelease(int releases) {
int c = getState() - releases; //state减去指定的量,
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) { //独占锁模式下,state为0时表示没有线程获取锁,这时才算是当前线程完全释放锁
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
2 unparkSuccessor(Node node)
用这个方法唤醒后继节点
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
Node s = node.next;
if (s == null || s.waitStatus > 0) { //waitStatus表示节点已经被取消,应该踢出队列
s = null;
//从后想前找到最靠前的合法节点
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
这个方法也比较简单,就是用一个JNI方法unpark()唤醒队列中下一个需要处理的节点,.
非公平锁
上述介绍完ReentrantLock中的公平锁,首先回顾一下公平锁的整个流程
1、ReentrantLock.lock(),这个请求回交由内部类Sync处理。因为有公平和非公平的区分,所以Sync转而把任务交给子类NonfairSync或者FairSync处理。
2、在具体来看看FairSycn中的lock具体怎么实现
final void lock() { acquire(1);}
acquire()的具体实现上面已经介绍,接下来登场的就是非公平锁的实现。
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
在非公平锁中,线程请求资源是会先查看锁是否被占用,如果锁空闲就直接占用锁,否则究进行acquire()。而这个函数是有AQS具体实现,所以,如果当前锁被占用,非公平锁就享受和公平锁一样的待遇,就是老老实实的进入等待队列,等前任节点唤醒自己。
总结
到这里我们已经讲述了独占锁的获取和释放. 当然没有涉及的还很多.如共享模式我们还没有涉及.以及响应中断的acquireInterruptibly()也没有涉及.
至于响应中断,实现起来与上面介绍的并无太大区别.共享式锁获取与释放待后续篇章再来研究.
水平有限,水平有限,有任何不当之处,还请指正.本文也还在打磨之中