在java.util.concurrent.locks包中有很多Lock的实现类, 常用的有ReentrantLock, ReadWriteLock(实现类ReentrantReadWriteLock). 这些锁的实现思路都大同小异, 都依赖java.util.concurrent.AbstractQueuedSynchronizer类, 下面分析最常用的ReentrantLock的实现.
ReentrantLock
ReentrantLock是一种可重入锁(若是在同一个线程上请求锁,则允许获得锁).
以下是官方说明:一个可重入的互斥锁定 Lock, 它具有与使用 synchronized 方法和语句所访问的隐式监视器锁定相同的一些基本行为和语义, 但功能更强大. ReentrantLock将由最近成功获得锁定, 并且还没有释放该锁定的线程所拥有. 当锁定没有被另一个线程所拥有时, 调用 lock 的线程将成功获取该锁定并返回. 如果当前线程已经拥有该锁定, 此方法将立即返回true. 可以使用 isHeldByCurrentThread() 和 getHoldCount() 方法来检查此情况是否发生.
本文主要分析以下几个过程:
ReentrantLock的创建(公平锁/非公平锁)
上锁: lock()
解锁: unlock()
ReentrantLock的内部主要结构如下:
ReentrantLock-->Lock
NonfairSync/FairSync-->Sync-->AbstractQueuedSynchronizer-->AbstractOwnableSynchronizer
注意: 边这四条线, 对应关系: "子类"-->"父类". 其中, NonfairSync, FairSync和Sync时ReentrantLock的三个内部类.
ReentrantLock的创建
ReentrantLock支持两种锁, 公平锁和非公平锁, 公平锁严格遵循(先进来的线程先执行), 非公平锁总体原则上鱼公平锁一样, 但在某些情况下, 有可能出现后进来的线程获得锁的情况(所谓来得早不如来得巧).
public ReentrantLock() {
sync = new NonfairSync();
}
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
默认创建非公平锁, 只有当传入参数true时, 才会创建公平锁.
上述源代码中出现了三个内部类Sync/NonfairSync/FairSync, 下面是三个类的定义.
/**
* 该锁同步控制的一个基类.下边有两个子类:非公平机制和公平机制.使用了AbstractQueuedSynchronizer类的
*/
static abstract class Sync extends AbstractQueuedSynchronizer
/**
* 非公平锁同步器
*/
final static class NonfairSync extends Sync
/**
* 公平锁同步器
*/
final static class FairSync extends Sync
lock
ReentrantLock:lock()
public void lock() {
sync.lock();
}
也就是说有内部的同步器来决定.
非公平锁的lock
NonfairSync: lock()
/**
* Performs lock. Try immediate barge, backing up to normal
* acquire on failure.
*/
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
主要流程如下:
1)首先基于CAS将state(锁数量)从0设置为1, 如果设置成功, 设置当前线程为独占锁的线程;(此处就是非公平锁的一次插队).
2)如果设置失败(即当前的锁数量可能已经为1了), 这个时候当前线程执行acquire(1)方法;
2.1)acquire(1)方法首先调用下边的tryAcquire(1)方法(获取锁数量), 如果获取成功, 则返回.(此处就是非公平锁的第二次尝试插队). 如果失败, 继续执行acquireQueued(addWaiter(Node.EXCLUSIVE), arg)方法.
2.2)在执行acquireQueued方法之前,会先执行addWaiter方法,将当前线程加入到等待获取锁的队列的尾部.
2.2.1)addWaiter(Node.EXCLUSIVE)将当前线程封装进Node节点node,然后将该节点加入等待队列(先快速入队, 如果快速入队不成功, 其使用正常入队方法无限循环一直到Node节点入队为止).
2.2.1.1)快速入队: 如果同步等待队列存在尾节点, 将使用CAS尝试将尾节点设置为node, 并将之前的尾节点插入到node之前.
2.2.1.2)正常入队: 如果同步等待队列不存在尾节点或者上述CAS尝试不成功的话, 就执行正常入队(该方法是一个无限循环的过程, 即直到入队为止). 如果尾节点为空(初始化同步等待队列), 创建一个dummy节点, 并将该节点通过CAS尝试设置到头节点上去, 设置成功的话, 将尾节点也指向该dummy节点(即头节点和尾节点都指向该dummy节点). 如果尾节点不为空, 执行与快速入队相同的逻辑, 即使用CAS尝试将尾节点设置为node, 并将之前的尾节点插入到node之前.
2.2.2)node节点入队之后, 就去执行acquireQueued(final Node node, int arg)(此处会出现阻塞).
2.2.2.1)获取node的前驱节点p, 如果p是头节点, 就继续使用tryAcquire(1)方法去尝试请求成功. 如果第一次请求就成功, interrupted=false. 如果是之后的循环中将线程挂起之后被其他线程唤醒, interrupted=true. (注意p==head&&tryAcquire(1)成功是唯一跳出循环的方法).
2.2.2.2)如果p不是头节点, 或者tryAcquire(1)请求不成功,就去执行shouldParkAfterFailedAcquire(Node pred, Node node)来检测当前节点是不是可以安全的被挂起. 是, 则调用parkAndCheckInterrupt方法挂起当前线程. 否, 则继续执行2.2.2.1(这是一个死循环).
2.2.2.3)线程调用parkAndCheckInterrupt方法后会进入阻塞状态,等待其他线程唤醒. 唤醒后会将interrupted置为true, 并且继续执行2.2.2.1.
2.2.2.4)若在这个循环中产生了异常, 我们就会执行cancelAcquire(Node node)取消node的获取锁的意图.
涉及到的方法:
AbstractQueuedSynchronizer: compareAndSetState()
protected final boolean compareAndSetState(int expect, int update) {
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
AbstractQueuedSynchronizer: acquire(int arg)
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
NonfairSync: tryAcquire(int acquires)
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
Sync: nonfairTryAcquire(int acquires)
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
AbstractQueuedSynchronizer: addWaiter(Node mode)
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
AbstractQueuedSynchronizer: acquireQueued(final Node node, int arg)
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
公平锁的lock
FairSync: lock()
final void lock() {
acquire(1);
}
AbstractQueuedSynchronizer: acquire(int arg)
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
主要流程比非公平锁简单很多:
1):如果锁数量为0, 如果当前线程是等待队列中的头节点, 基于CAS尝试将state(锁数量)从0设置为1一次, 如果设置成功, 设置当前线程为独占锁的线程;
2):如果锁数量不为0或者当前线程不是等待队列中的头节点或者上边的尝试又失败了, 查看当前线程是不是已经是独占锁的线程了, 如果是, 则将当前的锁数量+1;如果不是, 则将该线程封装在一个Node内, 并加入到等待队列中去, 等待被其前一个线程节点唤醒.
相关函数如下.
FairSync: tryAcquire()
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
AbstractQueuedSynchronizer: hasQueuedPredecessors()
public final boolean hasQueuedPredecessors() {
Node t = tail; // Read fields in reverse initialization order
Node h = head;
Node s;
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}
其他函数域非公平锁的一样.
非公平锁的解锁
ReentrantLock:unlock()
public void unlock() {
sync.release(1);
}
主要流程:
1)获取当前的锁数量, 然后用这个锁数量减去解锁的数量(这里为1), 最后得出结果c.
2)判断当前线程是不是独占锁的线程, 如果不是, 抛出异常.
3)如果c==0, 说明锁被成功释放, 将当前的独占线程置为null, 锁数量置为0, 返回true.
4)如果c!=0, 说明释放锁失败, 锁数量置为c, 返回false.
5)如果锁被释放成功的话, 唤醒距离头节点最近的一个非取消的节点.
涉及的代码.
AbstractQueuedSynchronizer: release(int arg)
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
Sync: tryRelease(int releases)
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
公平锁的解锁
与非公平锁一致