以我第一次读源码的顺序。
创建锁:
private static final long serialVersionUID = 7373984872572414699L;
private final Sync sync;
public ReentrantLock() {
sync = new NonfairSync();
}
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
//主要逻辑都是基于这个大名鼎鼎的AQS
abstract static class Sync extends AbstractQueuedSynchronizer {}
加锁
public void lock() {
sync.lock();
}
非公平锁:
final void lock() {
// 直接通过cas获取锁,state == 0锁可用
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
//如果没有拿到则
else
acquire(1);
}
protected final boolean compareAndSetState(int expect, int update) {
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
protected final void setExclusiveOwnerThread(Thread thread) {
exclusiveOwnerThread = thread;
}
public final void acquire(int arg) {
// addWaiter就是把当前线程封装成node,添加到等待队列的尾端
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
//这里没太想明白,什么状态才会需要自我中断
selfInterrupt();
}
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
// state == 0,没有人占用锁
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
// 持有锁的线程可重入
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
// 如果符合park的条件,其实就是前序node状态为-1,则进入parkAndCheckInterrupt()函数,其中调用LockSupport.park()实现线程的阻塞;否则就尝试赋值前序状态至-1,然后再次尝试获取锁,如果没拿到则进入阻塞。
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
return true;
if (ws > 0) {
// ws > 0 -> ws = CANCEL,把cancel节点从队列中移除
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
// 如果前序的状态为0或-2,-3,则尝试将前序变为-1
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
//这里没太想明白,当前线程什么时候会设置interrupt呢?
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
获取锁的逻辑都是在AQS里,这里就需要看一下AQS的结构:维护了一个双向链表,用于存放等待锁的线程node,包括head,tail。
sync的状态status,0表示空闲,>0有占用。
还有这几个offset变量,存储的是通过unsafe.objectFieldOffset方法获取到的各变量的内存偏移地址。
AQS继承AOS,AOS有个重要的变量exclusiveOwnerThread,存储的就是当前持有锁的线程。
waitStatus包括如下几种状态:
int CANCELLED = 1 //节点从同步队列中取消
int SIGNAL = -1 //后继线程处于等待状态,如果当前节点释放同步状态会通知后继线程,使其能够运行
int CONDITION = -2//当前节点进入等待队列中
int PROPAGATE = -3//表示下一次共享式同步状态获取将会无条件传播下去
int INITIAL = 0;//初始状态
公平锁:
final void lock() {
acquire(1);
}
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
public final boolean hasQueuedPredecessors() {
// The correctness of this depends on head being initialized
// before tail and on head.next being accurate if the current
// thread is first in queue.
Node t = tail; // Read fields in reverse initialization order
Node h = head;
Node s;
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}
公平锁相对于非公平锁,就多了一个逻辑hasQueuedPredecessors()。
对于非公平锁,当某个线程尝试获取锁的时候,如果正好碰到前一个线程释放了锁,通过cas拿到锁,那就可以了。而对于公平锁,是按照AQS队列的先后顺序来获取锁的。
释放锁
公平和非公平锁的释放逻辑是一致的
public void unlock() {
sync.release(1);
}
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
// 如果释放成功后state==0,则return true
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
//倒叙寻找离head最近的可唤醒线程
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}