如下代码,当我们在使用 ReentrantLock 进行加锁和解锁时,底层到底是如何帮助我们进行控制的啦?
static Lock lock = new ReentrantLock();
public static void main(String[] args) {
// 使用两个线程模拟多线程执行并发
new Thread(() -> doBusiness(), "Thread-1").start();
new Thread(() -> doBusiness(), "Thread-2").start();
}
private static void doBusiness() {
try {
lock.lock();
System.out.println("需要加锁的业务处理代码,防止并发异常");
} finally {
lock.unlock();
}
}
带着这样的疑问,我们先后跟进 lock()和unlock() 源码一探究竟
说明:
1、在进行查看 ReentrantLock 进行 lock() 加锁和 unlock() 解锁源码时,需要知道 LockSupport 类、了解自旋锁以及链表相关知识。
2、在分析过程中,假设第一个线程获取到锁的时候执行代码需要很长时间才释放锁,及在第二个第三个线程来获取锁的时候,第一个线程并没有执行完成,没有释放锁资源。
3、在分析过程中,我们假设第一个线程就是最先进来获取锁的线程,那么第二个第三个线程也是依次进入的,不会存在第三个线程先于第二个线程(即第三个线程如果先于第二个线程发生,那么第三个线程就是我们下面描述的第二个线程)
一、 lock() 方法
1、查看lock()方法源码
public void lock() {
sync.lock();
}
从上面可以看出 ReentrantLock 的 lock() 方法调用的是 sync 这个对象的 lock() 方法,而 Sync 就是一个实现了抽象类AQS(AbstractQueuedSynchronizer) 抽象队列同步器的一个子类 ,继续跟进代码(说明:ReentrantLock 分为公平锁和非公平锁,如果无参构造器创建锁默认是非公平锁,我们按照非公平锁的代码来讲解)
1.1 关于Sync子类的源码
abstract static class Sync extends AbstractQueuedSynchronizer {
// 此处省略具体实现AbstractQueuedSynchronizer 类的多个方法
}
这里需要说明的是 AbstractQueuedSynchronizer 抽象队列同步器底层是一个通过Node实现的双向链表,该抽象同步器有三个属性 head 头节点 , tail 尾节点 和 state 状态值。
属性1:head——注释英文翻译:等待队列的头部,懒加载,用于初始化,当调用 setHead() 方法的时候会对 head 进行修改。注:如果 head 节点存在,则 head 节点的 waitStatus 状态值用于保证其不变成 CANCELLED(取消,值为1) 状态
/**
* Head of the wait queue, lazily initialized. Except for
* initialization, it is modified only via method setHead. Note:
* If head exists, its waitStatus is guaranteed not to be
* CANCELLED.
*/
private transient volatile Node head;
属性2: tail——tail节点是等待队列的尾部,懒加载,在调用 enq() 方法添加一个新的 node 到等待队列的时候会修改 tail 节点。
/**
* Tail of the wait queue, lazily initialized. Modified only via
* method enq to add new wait node.
*/
private transient volatile Node tail;
属性3:state——用于同步的状态码。 如果 state 该值为0,则表示没有其他线程获取到锁,如果该值大于1则表示已经被某线程获取到了锁,该值可以是2、3、4,用该值来处理重入锁(递归锁)的逻辑。
/**
* The synchronization state.
*/
private volatile int state;
1.2 上面 Sync 类使用 Node来作为双向队列的具体保存值和状态的载体,Node 的具体结构如下
static final class Node {
/** Marker to indicate a node is waiting in shared mode */
static final Node SHARED = new Node(); // 共享锁模式(主要用于读写锁中的读锁)
/** Marker to indicate a node is waiting in exclusive mode */
static final Node EXCLUSIVE = null; // 排他锁模式(也叫互斥锁)
/** waitStatus value to indicate thread has cancelled */
static final int CANCELLED = 1; // 线程等待取消,不再参与锁竞争
/** waitStatus value to indicate successor's thread needs unparking */
static final int SIGNAL = -1; // 表明线程需要被唤醒,可以竞争锁
/** waitStatus value to indicate thread is waiting on condition */
static final int CONDITION = -2; // 表明线程在某种情况下等待
/** waitStatus value to indicate the next acquireShared should unconditionally propagate */
static final int PROPAGATE = -3;
volatile int waitStatus; // 默认等待状态为0
volatile Node prev;
volatile Node next;
/** The thread that enqueued this node. Initialized on construction and nulled out after use.*/
volatile Thread thread; // 当前线程和节点进行绑定,通过构造器初始化Thread,在使用的时候将当前线程替换原有的null值
// 省略部分代码
}
说明: Sync 通过Node节点构建队列,Node节点使用prev和next节点来行程双向队列,使用prev来关联上一个节点,使用next来关联下一个节点,每一个node节点和一个thread线程进行绑定,用来表示当前线程在阻塞队列中的具体位置和状态 waitStatus 。
2、上面的 sync.lock() 继续跟进源码(非公平锁):
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
说明:上面代码说明,如果 compareAndSetState(0, 1) 为 true ,则执行 setExclusiveOwnerThread(Thread.currentThread()) ,否则执行 acquire(1);
2.1 compareAndSetState(0, 1) 底层使用unsafe类完成CAS操作 ,意思就是判断当前state状态是否为0,如果为零则将该值修改为1,并返回true;state不为0,则无法将该值修改为1,返回false。
protected final boolean compareAndSetState(int expect, int update) {
// See below for intrinsics setup to support this
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
2.2 假如 第1个线程 进来的时候 compareAndSetState(0, 1) 肯定执行成功,state 状态会从0变成1,同时返回true,执行 setExclusiveOwnerThread(Thread.currentThread()) 方法:
protected final void setExclusiveOwnerThread(Thread thread) {
exclusiveOwnerThread = thread;
}
setExclusiveOwnerThread(Thread.currentThread()) 表示将当前 Sync 对象和当前线程绑定,意思是表明:当前对内同步器执行的线程为 thread,该 thread 获取了锁正在执行。
2.3 假如 进来的线程为第2个 ,并且第一个线程还在执行没有释放锁,那么第2个线程就会执行 acquire(1) 方法:
public final void acquire(int arg) {
if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
进入到该方法中发现,需要通过 !tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg) 两个方法判断是否需要执行 selfInterrupt();
(1)先执行 tryAcquire(arg) 这个方法进行判断
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
/**
* Performs non-fair tryLock. tryAcquire is implemented in
* subclasses, but both need nonfair try for trylock method.
*/
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
// 获取state状态,因为第一个线程进来的时候只要还没有执行完就已经将state设置为1了(即:2.1步)
int c = getState();
// 再次判断之前获取锁的线程是否已经释放锁了
if (c == 0) {
// 如果之前的线程已经释放锁,那么当前线程进来就将状态改为1,并且设置当前占用锁的线程为自身
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
// 判断当前占用锁的线程是不是就是我自身,如果是我自身,这将State在原值的基础上进行加1,来处理重入锁逻辑
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
// 判断重入锁次数是不是超过限制,超过限制则直接报错
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
从上面的方法看, 如果第二个线程进来,且第一个线程还未释放锁的情况下,该方法 tryAcquire(arg) 直接放回false,那么 !tryAcquire(arg) 就为true,需要判断第二个方法 acquireQueued( addWaiter(Node.EXCLUSIVE) , arg),第二个方法先执行addWaiter(Node.EXCLUSIVE),及添加等待线程进入队列
(2)添加等待线程到同步阻塞队列中
private Node addWaiter(Node mode) {
// 将当前线程和node节点进行绑定,设置模式为排他锁模式
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;// 第二个线程也就是第一次进来该方法的线程,tail肯定是null
if (pred != null) { // 如果tail尾节点不为空,表示第3、4、5次进来的线程
node.prev = pred; // 那么就将当前进来的线程节点的 prev 节点指向之前的尾节点
if (compareAndSetTail(pred, node)) { // 通过比较并交换,如果当前尾节点在设置过程中没有被其他线程抢先操作,那么就将当前节点设置为tail尾节点
pred.next = node; // 将以前尾节点的下一个节点指向当前节点(新的尾节点)
return node;
}
}
enq(node); // 如果为第二个线程进来,就是上面的 pred != null 成立没有执行,直接执行enq()方法
return node;
}
private Node enq(final Node node) {
for (;;) { // 一直循环检查,相当于自旋锁
Node t = tail;
if (t == null) { // Must initialize
// 第二个线程的第一次进来肯定先循环进入该方法,这时设置头结点,该头结点一般被称为哨兵节点,并且头和尾都指向该节点
if (compareAndSetHead(new Node()))
tail = head;
} else {
// 1、第二个线程在第二次循环时将进入else 方法中,将该节点挂在哨兵节点(头结点)后,并且尾节点指向该节点,并且将该节点返回(该节点有prev信息)
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
如上在执行 enq(final Node node) 结束,并且返回添加了第二个线程node节点的时候, addWaiter(Node mode) 方法会继续向上返回
或者 : 如果是添加第3、4个线程直接走 addWaiter(Node mode) 方法中的 if 流程直接添加返回 都将,到了 2.3 步,执行 acquireQueued(final Node node, int arg) ,再次贴源码
public final void acquire(int arg) {
if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
(3)即下一步就会执行 acquireQueued(addWaiter(Node.EXCLUSIVE), arg) 方法:
注: 上面的流程是将后面的线程加入到了同步阻塞队列中,下面的方法第一个 if (p == head && tryAcquire(arg))则是看同步阻塞队列的第一条阻塞线程是否可以获取到锁,如果能够获取到锁就修改相应链表结构,第二个if ( shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt() 即将发生线程阻塞
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
// 自旋锁,如果为第二个线程,那么 p 就是 head 哨兵节点
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
// 上面的 if 表明如果当前线程为同步阻塞队列中的第一个线程,那么就再次试图获取锁 tryAcquire(),如果获取成功,则修改同步阻塞队列
setHead(node); // 将head头结点(哨兵节点)设置为已经获取锁的线程node,并将该node的Theread 设置为空
p.next = null; // help GC 取消和之前哨兵节点的关联,便于垃圾回收器对之前数据的回收
failed = false;
return interrupted;
}
// 如果第二个线程没有获取到锁(同步阻塞队列中的第一个线程),那么就需要执行下面两个方法,注标蓝的方法会让当前未获取到锁的线程阻塞
if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
private void setHead(Node node) {
// 将哨兵节点往后移,并且将 thread 设置为空,取消和以前哨兵节点的关联,并于垃圾回收器回收
head = node;
node.thread = null;
node.prev = null;
}
shouldParkAfterFailedAcquire(p, node)这个方法将哨兵队列的状态设置为待唤醒状态
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
// pred为哨兵节点,ws为哨兵节点的状态
if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL); // 将头结点(哨兵节点)设置成待唤醒状态,第一次进来的时候
}
return false;
}
parkAndCheckInterrupt()这个方法会让当前线程阻塞
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this); // LockSupport.park()会导致当前线程阻塞,直到某个线程调用unpark()方法
return Thread.interrupted();
}
那么在lock()方法执行时,只要第一个线程没有unlock()释放锁,其他所有线程都会加入同步阻塞队列中,该队列中记录了阻塞线程的顺序,在加入同步阻塞队列前有多次机会可以抢先执行(非公平锁),如果没有被执行到,那么加入同步阻塞队列后,就只有头部节点(哨兵节点)后的阻塞线程有机会获取到锁进行逻辑处理。再次查看该方法:
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
// if 表明只有头部节点(哨兵节点)后的节点在放入同步阻塞队列前可以获取锁
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) // 所有线程都被阻塞在这个方法处
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
二、unlock()方法
1、unlock源码
public void unlock() {
sync.release(1);
}
同样是调用的同步阻塞队列的方法 sync.release(1),跟进查看源码:
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
2、查看tryRelease()方法:
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
// 如果不是自身锁对象调用unlock()方法的话,就报异常
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
// 如果标志位已经为0,表示重入锁已经全部释放,这将当前获取锁的线程设置为null,以便其他线程进行加锁
setExclusiveOwnerThread(null);
}
// 更新重入锁解锁到达的次数,如果C不为0,表示还有重入锁unlock()没有调用完
setState(c);
return free;
}
3、如果tryRelease()方法成功执行,表示之前获取锁的线程已经执行完所有需要同步的代码(重入锁也完全退出),那么就需要唤醒同步阻塞队列中的第一个等待的线程(也是等待最久的线程),执行unparkSuccessor(h)方法:
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
// 先获取头结点(哨兵节点)的waitStatus状态,如果小于0,则可以获取锁,并将waitStatus的状态设置为0
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
Node s = node.next;
if (s == null || s.waitStatus > 0) {
// 如果哨兵节点的下一个节点为null,或者状态为1表示已经取消,则依次循环寻找后面节点,直至找到一个waitStatus<0的节点,并将该节点设置为需要获取锁的节点
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
// 将该node节点的线程解锁,允许它去获取锁,然后执行业务逻辑
LockSupport.unpark(s.thread);
}
三、unlock()方法调用后,会到lock()方法阻塞的地方,完成唤醒工作
1、在上面方法 unparkSuccessor(Node node) 中执行完 LockSupport.unpark(s.thread) 后在同步阻塞队列后的第一个 node 关联的线程将被唤醒,即unlock()方法代码执行完,将会到lock() 源码解析的 2.3 步里,第三次贴该处源码:
public final void acquire(int arg) {
if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
2、在上面放大的红色方法中,之前上面lock()源码讲了当中所有线程都被阻塞了,如下面源码红色标记的地方:
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
3、所有未获取到锁的线程都在 parkAndCheckInterrupt() 方法处阻塞着,所以我们即将唤醒的哨兵节点后的第一个阻塞线程也是在该处阻塞着,在执行完 unlock() 源码步骤第3步 unparkSuccessor(Node node) 中的方法,则将 返回到 之前阻塞线程的这个方法 parkAndCheckInterrupt()的这行代码 LockSupport.park( this ) 的下一步执行 Thread.interrupted(),唤醒当前线程。最后一步一步跟源码往上返回 true ,就又到了这个方法acquire(int arg),第四次贴源码:
public final void acquire(int arg) {
if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
// 上面红色的两个方法都为 true,执行自我唤醒
selfInterrupt();
}
4、最终到了唤醒方法 selfInterrupt()
static void selfInterrupt() {
// 最终底层会调用 c++的唤醒方法
Thread.currentThread().interrupt();
}
总结:
在第一个 A 线程 lock() 获取到锁后,第一个线程会在底层的同步阻塞队列中设置锁状态 state 为1(如果重入锁多次获取 state 每次加1),并设置拥有当前锁的线程为自身A线程,其他线程 B/C/D 来获取锁的时候就会比较锁状态是否为0,如果不为0,表示已经被获取了锁,再次比较获取锁的线程是否为自身,如果为自身则对 state 加1(满足重入锁的规则),否则这将当前未获取到锁的线程放入同步阻塞队列中,在放入的过程中,需要设置 head 哨兵节点和 tail 尾节点,以及相应的 waitStatus 状态,并且在放入过程中需要设置当前节点以及先关节点的 prev 和 next 节点,从而达到双向队列的效果,存放到阻塞队列后,线程会被阻塞到这样一个方法中 parkAndCheckInterrupt() ,等待被唤醒。
在第一个 A 线程执行完毕,调用 unlock() 解锁后,unlock() 方法会从同步阻塞队列的哨兵节点后的第一个节点获取等待解锁的线程B,并将其解锁,然后就会到B阻塞的方法 parkAndCheckInterrupt() 来继续执行,调用 selfInterrupt()方法,最终调用底层的 c语言的唤醒方法,使得 B 线程完成 lock() 方法获取到锁,然后执行业务逻辑。其他线程以此类推,依次发生阻塞和唤醒。