引言
可能我们平常很少听说它,就算是在并发里也很少用到它,但是我们所用的Condition(条件)、ReentrantLock(可重入锁)、ReentrantReadWriteLock(读写锁)就是根据它所构建的,它叫做队列同步器,是构建其他锁和同步组件的基础框架。
同步器的主要使用方法是继承,子类通过继承同步器并实现它的抽象方法来管理同步状态,在抽象方法的实现过程中免不了对同步状态进行更改,就需要使用同步器提供的3个最主要的方法
protected final int getState() ; //获取同步状态
protected final void setState(int newState) ;//设置同步状态
protected final boolean compareAndSetState(int expect, int update) ;//原子性设置同步状态(使用CAS)
AQS本身是没有实现任何同步接口的,它仅仅是定义了若干同步状态的获取和释放来供自定义同步组件使用,它可以支持独占式的获取同步状态(只能有一个线程使用,其他都得等待),也支持共享式的获取同步状态(如读写锁中的读锁)
同步器的设计是采用的模板方法设计模式,我们只需要重写指定的方法,随后将同步器组合在自定义同步组件中,并调用模板方法提供的模板方法即可。
源码分析
AQS里面可以重写的主要方法如下(需要我们自己实现逻辑):
protected boolean tryAcquire(int arg) ; //独占式的获取锁
protected boolean tryRelease(int arg) ;//独占式的释放锁
protected int tryAcquireShared(int arg) ;//共享式的获取锁
protected boolean tryReleaseShared(int arg) ; //共享式的释放锁
protected boolean isHeldExclusively() ; //判断当前线程是否是在独占模式下被线程占用,一般表示是否被当前线程所占用
AQS提供的主要模板方法如下(我们不能重写):
public final void acquire(int arg) ; //独占式获取锁
public final void acquireInterruptibly(int arg); //与acquire(int arg)一样,但响应中断
public final boolean tryAcquireNanos(int arg, long nanosTimeout);//在acquireInterruptibly(int arg基础上加入了超时机制,在规定时间内没有获取到锁返回false
获取到了返回true
public final boolean release(int arg) ;//独占式的释放同步状态
public final void acquireShared(int arg) ; //共享式的获取锁
public final void acquireSharedInterruptibly(int arg); //在acquireShared(int arg)加入了对中断的响应
public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout) //在acquireSharedInterruptibly(int arg)基础上加入超时机制
public final boolean releaseShared(int arg) ; //共享式的释放锁
这上面的所有方法都加上了final,表明了它们不可继承,这就是模板方法设计模式
下面我们来看一下它的源码
- 首先是独占式同步锁的获取
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
这是独占式获取锁的入口
具体流程如下:
1、用tryacquire(arg)去获取同步状态(需要我们自己实现)
2、获取不到把线程封装Node节点(addwaiter),在加入等待队列中
- tryAcquire(int arg)源码
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
源码就是简单地抛出异常,所以需要我们自己去实现
- addWaiter 源码
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
//尝试加入到队尾,因为这可能有多个线程竞争,采用compareAndSetTail(CAS操作)
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
//如果表头为空又或者节点没加入到队尾
enq(node);
return node;
}
先构造成Node节点
- Node是其内部类,主要构造如下
static final class Node {
volatile int waitStatus;
volatile Node prev;
volatile Node next;
volatile Thread thread;
Node nextWaiter;
}
prev:前驱节点;
next:后继节点;
thread:进入队列的当前线程
-
nextWaiter:存储condition队列中的后继节点。
- waitStatus:节点状态,主要有这几种状态:
1、 CANCELLED:当前线程被取消;
2、SIGNAL:当前节点的后继节点需要运行;
3、 CONDITION:当前节点在等待condition
4、PROPAGATE:当前场景下后续的acquireShared可以执行;
- waitStatus:节点状态,主要有这几种状态:
队列的基本结构如下:
- enq源码
/**
* Inserts node into queue, initializing if necessary. See picture above.
* @param node the node to insert
* @return node's predecessor
*/
private Node enq(final Node node) {
//死循环(自旋)地插入到队尾
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
//通过compareAndSetTail保证节点能被线程安全添加
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
设置尾节点的过程
加入到同步队列中之后,就执行 acquireQueued
- acquireQueued源码
/**
* Acquires in exclusive uninterruptible mode for thread already in
* queue. Used by condition wait methods as well as acquire.
*
* @param node the node
* @param arg the acquire argument
* @return {@code true} if interrupted while waiting
*/
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
节点进入同步队列后,就开始自旋,每个节点都在观察自己是否满足条件,当“条件”满足,就可以获取同步状态,然后从自旋地过程中退出
这里有两点要说明:
1、头节点是获取到同步状态的点,而头节点的线程释放同步状态后,会唤醒其后续节点,所以每个节点都在判断自己的前驱节点是否是头节点,是之后看能不能获取到同步状态,只有两者都满足时,才能退出
2、维护同步队列的是fifo原则(先进先出),整个过程如图所示:
总结一下独占式获取锁的流程:
当前线程获取到同步状态后,就需要释放同步状态,使后续节点能够获取,主要是通过release(int arg) 实现的
- release(int arg) 源码
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
tryrelease:判断能否释放同步状态
unparkSuccessor(h):唤醒后续节点
上面讲了独占式获取获取锁,下面讲讲共享式获取
共享式和独占式最大的区别是同一时刻能否有多个线程同时获取到同步状态,如图所示
共享式获取的入口是acquireShared 方法
- acquireShared 源码
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
tryAcquireShared方法是需要我们自己实现的,返回的是int值,当返回值大于0时,表示能获取到同步状态
- doAcquireShared源码
private void doAcquireShared(int arg) {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
先把线程加入到同步队列中,然后死循环,判断前驱节点是否是头节点,然后获取同步状态,当两者都满足是就退出循环
与独占式获取同步状态一样,共享式获取也是需要释放同步状态的,AQS提供releaseShared(int arg)方法可以释放同步状态。
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
1、调用tryReleaseShared方法释放状态;
2、 调用doReleaseShared方法唤醒后继节点;