线程互斥同步除了使用最基本的 synchronized 关键字外(关于 synchronized 关键字的实现原理,请看之前写的线程安全之 synchronized 关键字), Java 5 之后还提供了 API 可以实现同样的功能,java.util.concurrent(简称 J.U.C)下的重入锁 ReentrantLock 不仅实现可重入的互斥锁,还有几个高级功能:等待可中断、可实现公平锁、锁可绑定多个条件、可限定最大等待时间。下面从基本使用到内部实现,层层分析 ReentrantLock 原理。
1. ReentrantLock 的用法
ReentrantLock 文档中写明了在 lock()
方法后,用 try 把同步代码块包起来,然后在 finally 中调用 unlock()
。这样做的目的是保证解锁操作一定会被调用,防止死锁。
class X {
private final ReentrantLock lock = new ReentrantLock();
// ...
public void m() {
// block until condition holds
lock.lock();
try {
// ... method body
} finally {
lock.unlock();
}
}
}
ReentrantLock 还可以绑定多个条件,下面使用 Condition 文档中的例子来说明:
class BoundedBuffer {
final Lock lock = new ReentrantLock();
// notFull 是 buffer 没有到最大值的条件
final Condition notFull = lock.newCondition();
// notEmpty 是 buffer 不为空的条件
final Condition notEmpty = lock.newCondition();
// buffer 最大值为 100
final Object[] items = new Object[100];
int putptr, takeptr, count;
public void put(Object x) throws InterruptedException {
lock.lock();
try {
while (count == items.length)
// buffer 满了就挂起,直到收到 notFull 的信号
notFull.await();
items[putptr] = x;
if (++ putptr == items.length) putptr = 0;
++ count;
// buffer 新增 item,发送 notEmpty 信号
notEmpty.signal();
} finally {
lock.unlock();
}
}
public Object take() throws InterruptedException {
lock.lock();
try {
while (count == 0)
// buffer 为空就挂起,直到收到 notEmpty 的信号
notEmpty.await();
Object x = items[takeptr];
if (++ takeptr == items.length) takeptr = 0;
-- count;
// buffer 取走 item,发送 notFull 信号
notFull.signal();
return x;
} finally {
lock.unlock();
}
}
}
2. ReentrantLock 的 API
ReentrantLock 实现了 Lock 和 Serializable 接口,下面是它的一些关键 API。
ReentrantLock() -- 默认使用非公平锁
ReentrantLock(boolean fair) -- 是否使用公平锁
void lock() -- 获取锁,如果锁被其他线程持有,则阻塞该线程
void lockInterruptibly() -- 获取锁,如果锁被其他线程持有,则阻塞该线程,直到获取锁或被其他线程中断;如果获取锁之前或者在获取过程的过程中线程中断,则抛出中断异常
boolean tryLock() -- 如果直接获取锁成功则返回 true;如果锁被其他线程持有,返回 false
boolean tryLock(long timeout, TimeUnit unit) -- 在等待时间内获取到锁并且线程没有被中断,返回 true;否则返回 false
void unlock() -- 释放锁,如果该线程没有持有锁,则抛出异常
Condition newCondition() -- 返回一个与锁关联的 Condition 实例
boolean isHeldByCurrentThread() -- 当前线程是否持有锁
boolean isLocked() -- 锁是否被任意线程持有
3. ReentrantLock 的内部实现
先总体描述下 ReentrantLock 的大致实现,有一个成员属性 sync
,所有的方法都是调用该属性的方法。Sync
继承 AbstractQueuedSynchronizer
(简称 AQS),AQS 封装了锁和线程等待队列的基本实现。Sync
有两个子类 NonfairSync
和 FairSync
,分别对应非公平锁和公平锁。AQS 内部使用volatile int state
表示同步状态,在 ReentrantLock 中 state
表示占有线程对锁的持有数量,为 0 表示锁未被持有,为 1 表示锁被某个线程持有,> 1 表示锁被某个线程持有多次(即重入)。
3.1 默认非公平锁的 lock()
非公平锁的 lock() 的方法路线如下:
lock() -> NonfairSync.lock() -> AQS.compareAndSetState(0, 1)
-> AQS.acquire(1) -> NonfairSync.tryAcquire(1) -> Sync.nonfairTryAcquire(1)
-> AQS.acquireQueued(addWaiter(Node.EXCLUSIVE), 1)
下面一步步分析源码:
final void NonfairSync.lock() {
// 锁未被持有,则获取锁,并将当前线程设置为锁的独占线程
// 这里可能为其他线程刚刚释放锁,还有其他线程在等待,但这时直接获取,所以是不公平的
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
// 若锁被持有,则调用 AQS.acquire(1) 方法
else
acquire(1);
}
protected final boolean AQS.compareAndSetState(int expect, int update) {
// 利用 sun.misc.Unsafe 的 CAS 原子操作
// 如果 state 的当前值为 expect,则修改为 update,返回 true
// 如果 state 的当前值不为 expect,返回 false
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
public final void AQS.acquire(int arg) {
// NonfairSync.tryAcquire(1) 方法只是调用了 Sync.nonfairTryAcquire(1)
// 先尝试获取锁
if (!tryAcquire(arg) &&
// 获取失败则把线程添加到等待队列中,并阻塞该线程直到获取成功
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
final boolean Sync.nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
// 如果锁未被持有,则直接获取
// 这里可能为其他线程刚刚释放锁,还有其他线程在等待,但这时直接获取,所以是不公平的
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
// 锁被当前线程持有,属于重入,state ++
int nextc = c + acquires;
// 如果 state > 2 ^ 31 - 1, 则抛出异常,这也是最大重入次数
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
所以非公平锁的 lock() 的大致逻辑为:如果锁未被持有,不管等待队列中的线程直接获取;如果锁被自己(当前线程)持有,则把 state
加 1;否则将当前线程加入到等待队列中,并阻塞该线程直到获取成功。
关于AQS.acquireQueued()
的内部实现在下一篇文章中专门分析 AQS 的内部原理,阻塞线程是调用LockSupport.park()
方法实现的。
LockSupport.park() 与线程中断的关系
使用 Object.wait() 阻塞线程后,中断阻塞线程会唤醒它并且清除中断状态然后抛出 InterruptedException。而 LockSupport.park() 阻塞线程后,线程中断只会唤醒被阻塞的线程,没有其他行为,和 unpark() 行为一致,所以需要判断 Thread.interrupted()
来确定是否由中断唤醒的。
3.2 公平锁的 lock()
公平锁的 lock() 方法路线如下:
lock() -> FairSync.lock() -> AQS.acquire(1) -> FairSync.tryAcquire(1)
-> AQS.acquireQueued(addWaiter(Node.EXCLUSIVE), 1)
公平锁与非公平锁的主要区别在于 FairSync.tryAcquire(1) 这一步:
protected final boolean FairSync.tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
// 如果锁未被持有,并且当前线程在等待队列的头部或者等待队列为空,则获取锁
// 保证了没有线程等待时间超过当前线程,所以是公平的
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
// 锁被当前线程持有,属于重入,state ++
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
所以公平锁的 lock() 的大致逻辑为:如果锁未被持有,并且当前线程在等待队列的头部或者等待队列为空,则获取锁;如果锁被自己(当前线程)持有,则把 state
加 1;否则将当前线程加入到等待队列中,并阻塞该线程。
3.3 可中断的 lockInterruptibly()
lockInterruptibly() 方法的文档介绍是获取锁除非线程中断,首先看它的方法路线:
lockInterruptibly() -> AQS.acquireInterruptibly(1) -> throw new InterruptedException()
-> NonfairSync.tryAcquire(1) or FairSync.tryAcquire(1)
-> AQS.doAcquireInterruptibly(1)
下面看 acquireInterruptibly() 的源码:
public final void AQS.acquireInterruptibly(int arg)
throws InterruptedException {
// 如果当前线程是中断的,抛出 InterruptedException
if (Thread.interrupted())
throw new InterruptedException();
// 尝试获取锁,即如果锁未被持有或者已被当前线程持有,直接获取
if (!tryAcquire(arg))
// 获取锁失败,把线程添加到等待队列,阻塞线程,直到获取成功或者线程中断,线程中断也会抛出 InterruptedException
doAcquireInterruptibly(arg);
}
从上面实现,可以发现 lockInterruptibly() 与 lock() 的主要区别有两点:(1)如果此时线程是中断的,那么直接抛出 InterruptedException 异常;(2)如果线程被阻塞,在等待过程中线程中断,抛出 InterruptedException 并取消获取,从等待队列中删除。该方法可以用线程中断防止长时间阻塞,也可以以此退出死锁。
3.4 非公平的 tryLock()
不管是公平锁或者非公平锁,tryLock() 方法都是使用非公平策略来尝试获取锁,看它的路线图:
tryLock() -> Sync.nonfairTryAcquire(1)
Sync.nonfairTryAcquire(1) 方法在默认非公平锁的 lock() 中分析过了,如果锁未被其他线程持有(两种情况:1. 未被持有 2. 被自己持有),则获取锁并返回 true,否则返回 false。tryLock() 方法只是尝试获取锁,获取失败就会返回不会阻塞线程,而使用 synchronized 关键字则会阻塞直到获取锁。
3.5 在限定时间内的 tryLock(long timeout, TimeUnit unit)
先从方法实现看看与 tryLock() 的区别:
tryLock(long timeout, TimeUnit unit) -> AQS.tryAcquireNanos(1, unit.toNanos(timeout)) -> throw new InterruptedException()
-> NonfairSync.tryAcquire(1) or FairSync.tryAcquire(1)
-> AQS.doAcquireNanos(1, nanosTimeout)
AQS 的 tryAcquireNanos 方法源码如下:
public final boolean AQS.tryAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
// 如果当前线程是中断的,抛出 InterruptedException
if (Thread.interrupted())
throw new InterruptedException();
// 尝试获取锁,即如果锁未被持有或者已被当前线程持有,直接获取
return tryAcquire(arg) ||
// 获取失败,把线程添加到等待队列,阻塞线程,直到限定时间、线程中断或者在此之前获取成功,线程中断也会抛出 InterruptedException
doAcquireNanos(arg, nanosTimeout);
}
可以看出 AQS.tryAcquireNanos(arg, nanosTimeout) 方法与 AQS.acquireInterruptibly(arg) 类似,都支持线程中断,还加上了一个限定时间。如果限定时间为 0,那么就相当于调用 tryAcquire(1) 方法。上面的 tryLock() 方法在公平锁中还是使用非公平策略,但是 tryLock(0, TimeUnit.SECONDS) 在公平锁中可以实现公平的 tryLock() 方法。
3.6 unlock()
unlock() 释放持有的锁,从获取锁的过程可以猜测到其中肯定会将 state 减 1,但是具体的方法路线是如何呢?
unlock() -> AQS.release(1) -> Sync.tryRelease(1)
-> AQS.unparkSuccessor(head)
下面具体源码:
public final boolean AQS.release(int arg) {
// 尝试释放锁
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
// 释放成功,并等待队列的第一个节点不为空,使用 LockSupport.unpark() 唤醒第一个节点的线程
unparkSuccessor(h);
return true;
}
return false;
}
protected final boolean Sync.tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
// 当前线程没有持有锁,抛出异常
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
// state 为 0,锁才是自由的,否则只是退出一次重入,锁的被持有线程不变
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
// 返回释放后锁是否自由,即未被持有
return free;
}
所以 unlock() 方法实际是将 state 减 1,之后如果锁是自由的,则会唤起等待队列的头节点中的线程。不过在两者中间,如果有其他线程获取锁的话,公平锁会判断是否有线程等待,而非公平锁则直接获取该锁。
3.7 isHeldByCurrentThread() 与 isLocked()
这两个方法就比较简单了,直接看对应的源码:
public boolean isHeldByCurrentThread() {
return sync.isHeldExclusively();
}
protected final boolean Sync.isHeldExclusively() {
// 判断锁的被持有线程是否为当前线程
return getExclusiveOwnerThread() == Thread.currentThread();
}
public boolean isLocked() {
return sync.isLocked();
}
final boolean Sync.isLocked() {
return getState() != 0;
}
newCondition() 方法会在下面单独描述,而其他方法不是很重要,这里就不再分析了。
4. Condition
Condition 的作用与 Object 的 wait、notify、notifyAll 类似,用以线程间协作。调用 Condition.await() 或 Object.wait() 将阻塞线程等待其他线程的通知,调用 Conditon.signal()、Condition.signalAll()、Object.nofity()、Object.notifyAll() 将唤起 wait 的线程。
下面有几个相关疑问,可以仔细琢磨下。
为什么 Condition 与锁相关,Object 的 wait、notify、notifyAll 与对象相关
先思考为什么 wait、notify、notifyAll 是 Object 的方法,如果它们不和对象相关联,wait() 阻塞线程后,notify() 唤起线程时不知道究竟唤醒哪些 wait 的线程,所以与某一对象对应可以帮助 notify() 时唤醒的也是与该对象相关的等待线程。
为什么 await、signal 方法需要先获取锁,wait、notify 方法需要先获取对象锁
这样做的好处是保证 wait 和 notify 的过程是互斥的,而它们又要与某一个东西相关联,所以直接的方法与对象锁相关联,实际不是与对象相关。所以 Condition 和 lock 相关联。
4.1 Condition 的 API
await() -- 释放相关的锁,然后阻塞当前线程直到被 singal 通知或者线程中断
awaitUninterruptibly() -- 释放相关的锁,阻塞当前线程直到被 singal 通知
awaitNanos(long nanosTimeout)、await(long time, TimeUnit unit)、awaitUntil(Date deadline) -- 释放相关的锁,阻塞当前线程直到被 singal 通知、线程中断或限定时间到
signal() -- 唤醒一个等待的线程,被唤醒的线程返回 await() 方法前需要重新获取锁
singalAll() -- 唤醒所有等待的线程,所有被唤醒的线程返回 await() 方法前需要重新获取锁
4.2 ReentrantLock 的 Condition 的内部实现
下面看 await()、signal() 两个方法的实现细节,ReentrantLock 返回的 Condition 是 AQS.ConditionObject 实例。
public final void AQS.ConditionObject.await() throws InterruptedException {
// 如果线程中断,直接抛出 InterruptedException
if (Thread.interrupted())
throw new InterruptedException();
// 先把当前线程添加到 condition 的等待队列中
Node node = addConditionWaiter();
// 释放线程当前持有的锁
int savedState = fullyRelease(node);
int interruptMode = 0;
// 判断线程是否被通知想重新获取锁
while (!isOnSyncQueue(node)) {
// 阻塞线程
LockSupport.park(this);
// 阻塞线程被唤醒后,如果此时线程中断,则跳出循环
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
// 所以阻塞线程被 signal 唤醒后,或者线程中断后可以跳出循环
// 重新获取锁,获取失败则阻塞加入阻塞队列直到获取成功
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
// 如果获取的过程中线程中断,设置 interruptMode 为 REINTERRUPT
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
// 清楚等待队列中的取消的节点
unlinkCancelledWaiters();
if (interruptMode != 0)
// 如果 interruptMode 为 REINTERRUPT, 再次中断线程
// 如果 interruptMode 为 THROW_IE,抛出 InterruptedException
reportInterruptAfterWait(interruptMode);
}
所以 awit() 的大致逻辑为:释放锁,并且阻塞自己并添加到 condition 的等待队列,被 signal 通知或线程中断后唤醒线程,重新获取锁。
下面再看 signal() 方法的实现:
public final void AQS.ConditionObject.signal() {
// 锁不是互斥独占锁时,抛出 IllegalMonitorStateException 异常
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
// 如果等待队列不为空,
doSignal(first);
}
private void AQS.ConditionObject.doSignal(Node first) {
do {
// 把 first 节点从队列中移除
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
// 循环找到第一个未取消的节点,把该节点从 condition 队列添加到 sync 等待队列(lock 队列)
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
可以看到 signal() 并没有唤起 wait 的线程,只是把等待时间最长的未取消线程添加到 sync 等待队列,等待获取锁。
而 signalAll() 方法的区别时将 condition 等待队列中所有节点移到 sync 等待队列。
现在再来分析下,一开始提供的 Condition 的 BoundedBuffer 示例,假设现在 BoundedBuffer 中 items 为空:
public Object take() throws InterruptedException {
lock.lock();
// 此时,线程 A 获取到 lock
try {
while (count == 0)
// 因为 buffer 为空,释放获取的 lock,阻塞线程,添加到 notEmpty 等待队列
notEmpty.await();
Object x = items[takeptr];
if (++ takeptr == items.length) takeptr = 0;
-- count;
// buffer 取走 item,发送 notFull 信号
notFull.signal();
return x;
} finally {
lock.unlock();
}
}
public void put(Object x) throws InterruptedException {
lock.lock();
// 然后,线程 B 获取到 lock
try {
while (count == items.length)
// buffer 满了就挂起,直到收到 notFull 的信号
notFull.await();
items[putptr] = x;
if (++ putptr == items.length) putptr = 0;
++ count;
// 把 notEmpty 等待队列中的线程 A 移到 lock 的等待队列
notEmpty.signal();
} finally {
// 线程 B 释放锁,唤醒 lock 等待队列中的线程 A,线程 A 获取到 lock 然后从 await() 方法返回
lock.unlock();
}
}
5. 总结
ReentrantLock 是 API 的重入锁,相对 synchronized 关键字来说,额外支持公平锁(synchronized 是非公平的)、获取锁可中断、可以限定获取的最大时间、可以关联多个 Condition。内部主要实现细节是基于 AQS 的,等待队列是用链表结构存储的,阻塞队列使用 LockSupport.park() 实现。
什么时候用 ReentrantLock?
JDK 1.6 之后,synchronized 的性能优化得和 ReentrantLock 差不多,所以在 synchronized 可以满足条件的情况话,优先使用 synchronized。