AQS 全称 AbstractQueuedSynchronizer,靠着开局一个int state和一个双端FIFO的Node队列,实现抽象的队列式的同步器。AQS定义了一套多线程访问共享资源的同步器框架,许多同步类实现都依赖于它,如常用的ReentrantLock/Semaphore/CountDownLatch。
Node类如下,
static final class Node {
static final Node SHARED = new Node();
static final Node EXCLUSIVE = null;
static final int CANCELLED = 1;
static final int SIGNAL = -1;
static final int CONDITION = -2;
static final int PROPAGATE = -3;
volatile int waitStatus;
//队列前后
volatile Node prev;
volatile Node next;
//节点对应的线程
volatile Thread thread;
Node nextWaiter;
final boolean isShared() {
return nextWaiter == SHARED;
}
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}
Node() { // Used to establish initial head or SHARED marker
}
Node(Thread thread, Node mode) { // Used by addWaiter
this.nextWaiter = mode;
this.thread = thread;
}
Node(Thread thread, int waitStatus) { // Used by Condition
this.waitStatus = waitStatus;
this.thread = thread;
}
}
对外需要重写的方法,如果没有重写,使用的话,会抛异常
//获得锁
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
//释放锁
protected boolean tryRelease(int arg) {
throw new UnsupportedOperationException();
}
protected int tryAcquireShared(int arg) {
throw new UnsupportedOperationException();
}
protected boolean tryReleaseShared(int arg) {
throw new UnsupportedOperationException();
}
//当前线程是否持有锁
protected boolean isHeldExclusively() {
throw new UnsupportedOperationException();
}
公开使用的方法如下,这里列出我们重点说明的
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
从上述protected和public的方法,可以看到,acquire方法需要调用用户重写的tryAcquire方法,根据tryAcquire返回值,决定下一步操作,release方法同理。
我们结合Reentrantlock的实现来讲解,
Reentrantlock | Sync
代码如下,删除不重要的
abstract static class Sync extends AbstractQueuedSynchronizer {
abstract void lock(); //定义统一的抽象lock方法,等待实现
//定义非公平锁的获取
final boolean nonfairTryAcquire(int acquires) {
//当前线程
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) { //没有任何锁,非0表示锁已经被占
if (compareAndSetState(0, acquires)) { //CAS竞争获得锁
setExclusiveOwnerThread(current); //竞争后设置锁的持有线程
return true;//返回
}
}
else if (current == getExclusiveOwnerThread()) { //锁的可重入设置
int nextc = c + acquires;
if (nextc < 0) //
throw new Error("Maximum lock count exceeded");
setState(nextc); //重入次数累加
return true;
}
return false;
}
//锁释放
protected final boolean tryRelease(int releases) {
int c = getState() - releases; //重入次数减去
if (Thread.currentThread() != getExclusiveOwnerThread()) //当前锁持有线程判断
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) { //如果当前重入次数为0,锁释放,返回true,表示已经释放锁。
free = true;
setExclusiveOwnerThread(null); //清空持有线程
}
setState(c);
return free; //否则,返回false,没有释放锁
}
protected final boolean isHeldExclusively() {
return getExclusiveOwnerThread() == Thread.currentThread();
}
}
从代码上可以看出来,sync玩的是aqs的state而已,并没有涉及到fifo队列,而且,上述代码和我们自定义的可重入锁大同小异。
Reentrantlock | NonfairSync
代码如下,
static final class NonfairSync extends Sync {
//实现Sync的lock方法
final void lock() {
if (compareAndSetState(0, 1)) //直接cas一下,如果获得到锁,说明当前没有锁,不需阻塞
setExclusiveOwnerThread(Thread.currentThread()); //设置持有线程
else
acquire(1); //调用aqs的acquire,进而调用下面的tryAcquire
}
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires); //调用nonfairTryAcquire
}
}
Sync生成对象sync
Reentrantlock使用sync的acquire和realse方法(来自aqs),实现获取锁和释放锁,
public void lock() {
sync.lock(); //后者是非公平的Sync,或者是公平的Sync
}
public void unlock() {
sync.release(1);
}
所以,重点指向aqs的acquire和release
方法。再贴一下code。
acquire
public final void acquire(int arg) {
if (!tryAcquire(arg) && //如果tryAcquire放回true,表示获得到锁,退出if,否则表示没有得到
//锁,进入阻塞队列,执行if条件 &&后面的语句
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
addWaiter方法
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode); //包装Node
Node pred = tail;
if (pred != null) { //插入队尾
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node); //如果执行到此,说明上面快速插入队列尾部失败,毕竟是cas方式,这里enq通过
//无限循环实现cas方式插入队列尾部
return node;
}
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
acquireQueued方法,
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) { //循环,死磕
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) { //直到这个时候退出,
//即,节点node的前一个节点是head且如果成功获得锁,如果没有获得锁,下次循环继续,继续阻塞在队里中。
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted; // head后移后,当前节点从队列移除,返回
}
//如果没有退出for循环,干什么?死磕当前节点的前一个节点,
//当前一个节点都在等待信号signal的时候,当前节点如何操作?继续等待!!!
if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
//查看当前节点的前节点是否是signal,顺便清空下无效的节点
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL) //只要前一个节点是signal,则返回
return true;
if (ws > 0) { //清理状态无用的节点
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
//继续等待,返回是否中断
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
release
代码如下,
public final boolean release(int arg) {
if (tryRelease(arg)) {
//如果重写的tryRelease方法返回true,表明释放锁了
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
//唤醒successor,即,唤醒head的successor后继者,
return true;
}
//如果在这里,说明没有释放锁,返回true
return false;
}
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);//设置当前Node状态为0
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
// 如果当前Node的下一个节点符合状态就直接进行唤醒,
// 否则从队尾开始进行倒序查找,找到最优先的线程进行唤醒
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
//寻找下一个等待线程节点来唤醒等待线程并通过LockSupport.unpark()唤醒线程
if (s != null)
LockSupport.unpark(s.thread);
}
Reentrantlock | FairSync
公平的锁,保证了阻塞的线程唤醒后,能够获得到锁,而不是被新的线程抢占。和非公平的锁唯一区别在ReentrantLock中的部分,会根据情况判断是否阻塞当前的线程
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
//hasQueuedPredecessors是关键,他的作用在于,对于新来的线程,
//不让其抢占锁,而是直接进入到阻塞队列中等待
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
// 当前线程不为空,并且阻塞的线程不是当前线程(非重入)
public final boolean hasQueuedPredecessors() {
Node t = tail;
Node h = head;
Node s;
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread()); //
}