Java并发编程--ReentrantLock可重入性探索
我们直接先看其公平锁情况下的可重入性到底是怎么回事,由于我们讨论的是公平锁的情况,而相关的代码在ReentrantLock的内部类FairSync中。
1. lock()
public void lock() {
sync.lock();
}
由于是公平锁,所以我们需要重FairSync中查看lock方法:
final void lock() {
acquire(1);
}
而这里的acquire方法继承自AbstractQueuedSynchronizer
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
首先我们先提前说一下tryAcquire返回值是一个boolean,为true说明当前线程成功获取了ReentrantLock的锁,并且ReentrantLock锁是一个独占锁,而这个if条件,如果成功获取了锁,那么acquire方法就直接返回了。
AQS已经为该方法做了方法的实现,在FairSync中我们只要实现tryAcquire方法即可:
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) { // 说明当前锁还未被持有
// hasQueuedPredecessors返回false的情况为:当前线程在等待队列的头部或者等待队列为空
// 这就说明了:只有等待队列的头结点可以获取锁
// Q:什么情况下当前线程已经在头结点了,但是还没有获取锁?
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) { // 这里说明当前线程就是独占的线程
int nextc = c + acquires; // 持有锁的线程获取锁的次数,这也表明了可重入性
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc); // 原来可重入性是这么个意思
return true; // 如果current再次使用该所对象加锁,那么会直接返回true,可重入就是这么个意思
}
return false;
}
如注释中所说,当前线程能够成功获得锁有两种情况,分别代表首次加锁和重入锁:
- 如果c为0的话,说明当前锁还没有被任何线程独占,这时候会对队列进行判断
- hasQueuedPredecessors方法返回false有两种情况:当前等待队列为空或者当前线程就是队列头部;此时才可以尝试加锁
- 一个CAS操作将ReentrantLock的state属性设置为acquire值(调用的时候传递的为1)
- setExclusiveOwnerThread方法将当前线程设置为独占锁的线程
- 如果当前线程为独占线程,则保证可重入性:
- 将state进行加一操作
从这一个成功加锁的过程我们可以产生一些大胆的推断:
- 独占该锁的线程位于等待队列的头部
- state属性表示独占的线程加锁的次数,在之后解锁的时候可能也要这么多次数的unlock才可以释放锁
- 可重入性的保证就是一句current == getExclusiveOwnerThread()
注意:有没有发现,对于第一个加锁的线程,它会加锁成功,但是这个第一次加锁的线程,没有被封装成一个Node节点放到队列中;所以说,持有锁的线程是队列的头结点这句话有问题:因为持有锁的线程根本不在队列中,何来头结点一说。在下文分析加锁失败的情况会证明这一论断。
关于hasQueuedPredecessors方法:
public final boolean hasQueuedPredecessors() {
Node t = tail; // Read fields in reverse initialization order
Node h = head;
Node s;
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}
首先对于第一次加锁的线程,此时由于h == t == null所以返回false,而对于后面的返回false的情况大都是h != t但是s != null、s.thread == Thread.currentThread()返回的false,也就是只有头结点的后继节点调用该方法时,才会返回false表示可以尝试加锁。同时这也是可重入锁中公平锁的来源,对于之前已经在队列中的节点,那么新来的节点想要加锁,该方法会返回true说明队列中在你之前还有人在等待,得前面没人等待了你才能返回false,才能尝试去加锁,保证了先来后到的公平性。
注意:对于第二个尝试加锁的线程,由于此时前面有一个人持有锁,所以它在调用lock-acquire-tryAcquire方法时由于判断state!=0且当前线程不是独占锁的线程直接判断了加锁失败,从而被添加到了队列中,这条路线中不涉及到hasQueuedPredecessors方法的调用(显然此时它是满足h==t这个判断的)
加锁失败情况
还记得acquire方法吗:
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
如果tryAcquire方法返回false的话,说明加锁失败,同时通过上面的代码我们知道,如果加锁失败的话,当前线程没有被执行各种处理,所以我们在分析acquireQueued方法的时候没有任何后顾之忧,它的代码没有收到tryAcquire的影响:
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) { // 注意,这里的意思是头结点的后继节点tryAcquire成功,也就是获取了锁
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted; // 这里说明了,如果头结点的后继节点成功获得了锁,直接返回false
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt()) // parkAndCheckInterrupt方法最后一句return Thread.currentThread().interrupted()
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
对于这个入队操作,有几点需要说明:
- 只有头结点的next节点才会主动调用tryAcquire方法取申请获取锁
- 当头结点的next节点成功的得到了锁之后,通过setHead方法会将自己设为头结点
- 移除原来的头结点之后return false
好了,接下来我们就来说一说为啥上面说持有锁的线程不在队列中,如果说上文对于首次加锁的线程没有加入队列产生怀疑的话,那么这里的setHead方法会使你幡然醒悟:
private void setHead(Node node) {
head = node;
node.thread = null;
node.prev = null;
}
看到了没:node.thread = null把头结点的线程置为空了!!!所以,对于独占到锁的线程来说,它此时已经不再队列中了!!所以说,只有头结点时持有锁的节点这句话不准确!!
那么问题来了,这个头结点时怎么初始化的呢?
头结点的初始化
答案就在调用acquireQueue方法时的addWaiter方法:
private Node addWaiter(Node mode) {
// 思路:新键当前线程的Node节点;如果tail被初始化过,则直接添加到尾部;否则执行enq操作先初始化tail再入队
// 根据注释:mode传递的值要么是Node.SHARED要么是Node.EXCLUSIVE
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
这里的代码思路很简单,就是构建Node节点,然后插入到尾部,对于这个if判断语句,为false的情况会执行enq方法,在enq方法里面会有头结点的初始化:
private Node enq(final Node node) {
// 思路:获取尾部,将参数中的node节点直接加入尾部;然后CAS更新tail引用
for (;;) {
// 获取尾部
Node t = tail;
// 如果tail为空,说明是第一次入队操作,通过CAS初始化节点
// 这里初始化只是调用了默认构造器,目的是为了第二次for循环时tail不为null
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
// 因为tail是AQS在并发环境下的共享资源,所以修改tail变量要使用CAS操作
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
有没有发现,这个头结点的thread没有传递是参数,是一个null,这也证明了我们之前说的是正确的。
所以我们就知道了,在第一次有线程加锁的时候,它会成功得到锁,那么当第二个线程需要等待这个锁的时候,调用addWaiter的时候会初始化链表的头结点。
阻塞线程的挂起
可能有的人会问,每一个阻塞的节点都有一个无限for循环自旋,那么线程没有被挂起的话岂不是很浪费cpu资源?挂起的操作就在for循环的后面
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt()) // parkAndCheckInterrupt方法最后一句return Thread.currentThread().interrupted()
interrupted = true;
其中shouldParkAfterFailedAcquire(p, node)方法用来判断是否需要挂起获取锁失败的线程:
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
// 判断是否可以挂起的思想是:如果该节点的pred节点的waitStatus已经被设置为了SIGNAL
// 那么说明该节点已经料理好后事了,可以在某个时刻被唤醒,所以可以安全的挂起
return true;
if (ws > 0) { // 说明pred已经被cancelled了,直接移除
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
// 说明pred的waitStatue是0或者PROPAGATE,此时设置为SINGAL
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
看到了吗,只有参数中的pred,也就是需要被挂起的节点node的前一个节点的waitStatus为SINGLE的时候,才会返回true。而我们调用addWaiter方法创建Node节点的时候,waitStatue都是默认值0,所以在该方法的后面else语句中有一个CAS操作将其设置为SINGLE,这样的话,在acquireQueued方法的for自旋中,需要被挂起的线程经历两个for循环就可以使得shouldParkAfterFailedAcquire方法返回true。
之后,真正为node执行挂起的操作位于parkAndCheckInterrupt方法。
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
这样的话,没有获取到锁的线程就真的被挂起了。
2. unlock()
public void unlock() {
sync.release(1);
}
release方法同acquire方法一样,都是在AQS中又方法实现的:
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
AQS的实现类只需要重写tryRelease方法即可。
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
tryRelease方法很简单,就是将ReentrantLock的state值减去一,然后如果此时state为0说明独占的线程已经完全释放了锁,此时可以解除绑定,否则返回false。
而真正实现释放锁后唤醒其他线程的方法位于release中的
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
我们上面在分析线程的挂起的时候说到了要想挂起,那么node的前一个节点的waitStatus必须为SINGLE,而SINGLE在Node这个类中的值为-1.是小于0的,所以一定会执行到unparkSuccessor方法。
private void unparkSuccessor(Node node) { // 通过名称,该方法是唤醒后继节点
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
Node s = node.next;
// 根据注释和代码:这里是处理node参数的next节点为null或者已经取消的情况
// 此时从尾部开始遍历,找没有被canclled的节点唤醒
// PS:为何不从node开始往后找,而是从尾部开始往前找?
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
// 代表参数node的next节点不为空,则唤醒其中的线程
if (s != null)
LockSupport.unpark(s.thread);
}
这里的LockSupport.unpark方法就是唤起其他线程的地方,且一次只会唤醒一个线程,大部分情况就是头结点的后继节点。