概念
ReentrandLock是基于AbstractQueuedSynchronizer实现的,所以先来简单介绍下AQS。
我们最熟悉的同步锁应该是synchronized,熟悉java的应该都用过这个,它是通过底层的 monitorenter 和 monitorexit 这两个字节码指令来实现锁的获取和释放。
这里介绍的AbstractQueuedSynchronizer同步器(以下简称AQS),是基于FIFO队列来实现的,通过state的状态, 来实现acquire和release。
state为0,表示当前没有线程执行,可以获取锁,state为1表示已有线程正在实行
代码分析
AQS是基于FIFO队列来实现的,所以队列必然由一个个节点组成,我们先从节点入手。
Node是AQS中的一个内部静态类,主要包含以下几个成员变量:
//waitStatus有以下几个状态
/**
** CANCELLED,值为1,表示当前线程被取消,被终端的Node不回去竞争锁,最终会被踢出队列
** SIGNAL,值为-1,表示当前节点的后继节点被阻塞了,需要被唤醒
** CONDITION,值为-2,表示当前节点在等待condition,因为某个条件被阻塞
** PROPAGATE,值为-3,表示锁的下一次获取可以无条件传播
**/
volatile int waitStatus;
// 前驱节点
volatile Node prev;
// 后继节点
volatile Node next;
// 入队列时的当前线程
volatile Thread thread;
// 表示下一个等待condition的Node
Node nextWaiter;
介绍完Node,再来介绍下几个AQS比较重要的成员变量:
// 头节点
private transient volatile Node head;
// 尾节点
private transient volatile Node tail;
// 同步状态
private volatile int state;
介绍完AQS几个关键的成员变量,就可以来开干源代码了,下面通过一个简单demo例子,来逐步分析ReentrantLock的实现原理
demo代码如下:
public static void main(String[] args) {
final ExecutorService exec = Executors.newFixedThreadPool(2);
final ReentrantLock lock = new ReentrantLock();
for(int i =0; i< 2; i++){
Runnable r = new Runnable() {
@Override
public void run() {
lock.lock();
try {
// do something start
TimeUnit.SECONDS.sleep(5);
// do something end
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
lock.unlock();
}
}
};
exec.submit(r);
}
exec.shutdown();
}
先来看下lock方法:
public void lock() {
sync.lock();
}
ReentrantLock中有一个抽象类Sync,它继承了AQS,所以ReentrantLock的实现大多就是基于Sync来完成实现,ReentrantLock又分为公平锁和非公平锁之分,所以,ReentrantLock内部有两个sync的实现:
static final class NonfairSync extends Sync{...}
static final class FairSync extends Sync{...}
公平锁:获取锁的顺序为调用lock的顺序
非公平锁:每个线程抢占锁的顺序是不定的
由于ReentrantLock我们用的比较多的是非公平锁,所以先来看下非公平锁lock的实现,代码如下:
final void lock() {
if (compareAndSetState(0, 1))
// 成功后将当前线程设置到AQS的一个变量中,说明这个线程拿走了锁。
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
demo中我启动了两个线程,假设线程1进来,调用compareAndSetState方法,此时没有任何线程占有锁,所以通过原子操作CAS(如果对cas不了解的可以百度下,百度一堆)将state的状态改为1,这个是肯定能成功的,之后将线程1设置到AQS的exclusiveOwnerThread个变量中,表示线程1拿走了这个锁。这个时候线程2进入了lock方法,此时通过CAS操作,发现预计值0此时已经变成了1,所以设置失败,进入else分支,调用acquire方法。
也就是说非公平锁在线程第一次竞争失败后,还是会调用acquire方法,可能进入队列中,而公平锁是直接调用acquire方法
acquire方法是调用父类AQS的acquire方法,代码如下:
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
以上代码可以看出大概的逻辑,线程1尝试去获取锁,如果失败,则加入到队列,挂起
我们来看下tryAcquire方法,非公平锁最终调用的是nonfairTryAcquire方法,代码如下:
final boolean nonfairTryAcquire(int acquires) {
// 获取当前线程
final Thread current = Thread.currentThread();
// 获取AQS中锁的标志位state 0 表示锁没有被占有 1 表示锁已经被拿走了
int c = getState();
if (c == 0) {
// cas修改state状态
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
假设线程1进来后,state=0,表示此时没有线程占用锁,修改state状态,成功获取锁,返回成功,而此时线程2也去尝试获取锁,由于state是volatile修饰的,所以线程2获取到state此时为1,进入else if的逻辑,这里要注意下,不是说没有获取锁就直接失败,我们知道ReentrandLock是一个重入锁,就是说一个线程可以多次调用ReentrandLock,每次调用state+1,因为是同一个线程(已经拿到了锁),所以这里不存在竞争,直接返回成功,如果条件都没满足则返回false。
当tryAcquire获取锁失败后,调用acquireQueued方法,调用acquireQueued我们先来看addWaiter方法:
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
mode这里表示Node的类型,在这里表示独占锁(AQS可以实现独占锁和共享锁),用当前线程构造一个Node对象,线程1进来后,此时队列里面是空的,所以尾节点tail为空,走enq方法(等会看这个方法),假设此时enq方法已经执行结束,线程2进入addWaiter方法,此时tail不为空,将线程2的Node的前驱节点指向pred,之后将线程2的node设置为tail,最终的目的就是将新来的线程,放到队列最后。等待唤醒。下面来看enq方法:
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
enq方法的目的是让所有的线程Node最终都加到队列里面,这里代码写的非常的6,先一步一步的分析,假设,第一个线程进来,tail为空,这里的判断和之前一样,因为是多线程,所以需要二次判断,为空时,先用cas操作构造一个头结点,这里会出现竞争的情况,因此enq方法使用while(true)的方式,做了个循环,如果设置成功,则将head指向tail,如果失败,则重复尝试,直到成功为止。
当head设置成功后,之后的线程进入else逻辑,将后续进来的Node加入到队尾,compareAndSetTail失败话,这里没有直接返回,而是通过for循环继续插入到队尾,最终所有没有成功获取到锁的线程,全部加入到队列中。
接着看AQS的acquireQueued方法:
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
首先判断node的前驱节点是否是head,如果是,证明当前线程就是下一个即将获取到锁的线程,所以此时先尝试获取锁一次这里调用了tryAcquire方法,如果获取到锁,则将当前head设置成当前node,并将之前的node(之前为head的node)设置为空,gc回收。
如果不是head或者获取锁失败,则调用shouldParkAfterFailedAcquire方法:
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
该方法是判断这个节点是否需要被挂起,介绍这个方法回顾下之前的,我们知道Node节点中,出了当前线程,前驱节点,后驱节点,还有个重要的变量:waitStatus, 这个变量是来表示当前node是否还需要竞争锁,某些场景下,有些线程是会放弃锁的竞争的,如Condition类的操作。
只有当node的状态为SIGNAL状态,当前节点才能被挂起。假设当前节点的waitStatus为SIGNAL,则调用parkAndCheckInterrupt方法,此时线程被真正的挂起。
至此,线程2进入FOFI队列,成功被挂起的过程就完成了。
这里有个很巧妙的设计,acquireQueued方法,在线程被挂起后,还是处在for循环中,所以当线程被唤醒时,会继续执行,此时执行tryAcquire成功后,获取锁,之后将节点删除即p.next = null; 这样获取锁的Node在队列里就没有了
接下来我们来看看unlock方法。
unlock方法其实最终调用的是AQS的release方法:
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
调用内部类Sync的tryRelease方法,尝试释放锁:
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
releases=1,后驱state的值,减1,只有当c=0的时候才能释放所有锁,之前提到过,ReentrandLock是重入锁,每次lock一下,state都会加1,所以需要对应数量的unlock才能完全释放锁,释放锁后,将当前持有锁的变量设为null,即setExclusiveOwnerThread(null);
如果解锁成功,则继续执行unparkSuccessor方法,唤醒线程:
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
这里要注意下,如果node的下个节点为空,或者waitStatus>0(没有可唤醒的线程),则从队为倒序查询waitStatus<=0的节点,最终执行的是LockSupport.unpark(s.thread);
至此释放锁的过程也完成了。