前言
最近在学习java多线程的相关知识,把相关的源码大概过了一下,看完了虽然好像觉得没什么卵用,但是面试问得多,加上最近打算开始写写博客,所以还是打算记录一下,同时也总结一下学过的这些东西。能力不足水平有限,以下内容不一定完全正确。
概述
AbstractQueuedSynchronizer是java.util.concurrent包下的大部分类实现的基础,如ReentrantLock,ReadWriteLock,CountDownLatch,Semaphore等都是基于这个类实现的。下面主要讲解一下ReentrantLock和AbstractQueuedSynchronizer的源码。
在这之前首先还需要了解一下CAS(CompareAndSwap),这是一个底层实现的原子操作,主要有三个值,内存值A、预期值B和修改值C,在执行CAS操作的时候,会先判断内存值A和预期值B是否相等,如果相等则修改内存值A为修改值C,否则什么都不做。
AbstractQueuedSynchronizer相关的数据结构
head,tail
AbstractQueuedSynchronizer维护一个线程等待对列(chl对列,实际就是一个FIFO的对列),head是该对列的头结点,tail是该对列的尾节点。注意,通常我们说的等待对列或者阻塞对列一般不包括上图的头节点在内,一般来说头节点要么是空节点,要么是当前已经申请到锁的线程的节点。
state
state是AbstractQueuedSynchronizer最重要的一个属性,代表当前的同步的状态,state=0的时候锁是处于没被占用的状态,state>0表明已经有线程占有锁。
Node
AbstractQueuedSynchronizer的对列是一个链表结构的对列,对列中的每一个节点被封装成一个Node,Node部分源码如:
//节点处于共享模式下
static final Node SHARED = new Node();
//节点处于独占模式下
static final Node EXCLUSIVE = null;
//当前节点的线程因超时或者中断被取消(进入该状态后不会再改变)
static final int CANCELLED = 1;
//当前节点的后继节点需要被唤醒。当前节点取消或者被中断时需要唤醒后继节点
//一个节点进入等待对列后需要将前驱节点状态设置为SIGNAL
static final int SIGNAL = -1;
//节点的线程处于休眠状态,等待Condition唤醒
static final int CONDITION = -2;
//共享模式下使用的一种状态(代码没细看,不知道干嘛用的)
static final int PROPAGATE = -3;
//线程当前的状态
volatile int waitStatus;
//前驱节点
volatile Node prev;
//后继节点
volatile Node next;
//当前节点对应的线程
volatile Thread thread;
Node nextWaiter;//存储Condition的等待队列的后继节点
ReentrantLock 可重入锁
ReentrantLock的加锁释放锁的操作都是内部类Sync完成的,FairSync是公平锁的实现,NonfairSync是非公平锁的实现。
通常情况下,ReentrantLock在业务逻辑中都是这样的:
//创建一个ReentrantLock
ReentrantLock lock = new ReentrantLock();
//业务逻辑中
try{
lock.lock();
//业务逻辑访问临界资源
}finally{
lock.unlock();
}
//ReentrantLock默认使用非公平锁。
public ReentrantLock() {
sync = new NonfairSync();
}
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync;
}
ReentrantLock允许我们对公平性进行设置。公平锁会保证按照请求锁的顺序为线程分配锁,不会产生饥饿现象。而非公平锁则不然,可能会导致某些线程一直处在阻塞状态。
下面以非公平锁为例讲述一下lock和unlock的细节
//NonfairSync的lock实现
final void lock() {
//cas操作:检查一下当前state状态,如果state=0,那么说明当前没有别的线
//程占有锁,可以直接将设置state=1,并设置当前锁已经被线程占用。
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
//如果上面cas操作的时候state!=0,那么开始执行下面的acquire操作
acquire(1);
}
//AbstractQueuedSynchronizer中的acquire方法
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
AbstractQueuedSynchronizer中的acquire方法包含了加锁的整个流程,下面结合源码分别讲解一下这个流程。
首先会执行tryAcquire(1)。
//NonfairSync中的tryAcquire方法
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
//Sync中的nonfairTryAcquire方法
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();//获取当前锁的状态
//再次尝试直接获取锁,如果state=0即锁没被占用,那么直接将其分配给当前线程
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
//如果当前线程已经获得锁,那么state+1。因为ReentrantLock是可重入的
//即当前线程如果已经获得锁,那么下次可以直接再次获得锁。
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);//state同时也是当前线程加锁的次数
return true;
}
return false;
}
到这里,AbstractQueuedSynchronizer中的acquire方法中的tryAcquire(arg)这段代码已经执行完毕。如果当前锁没被占用(state=0)或者当前线程已经拥有这把锁,那么返回的是true,否则,返回false。
如果返回false,将会继续执行addWaiter(Node.EXCLUSIVE),将当前线程加入到等待对列中。
//AbstractQueuedSynchronizer中的acquire方法
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
//这个方法主要就是将当前的线程封装成Node添加到等待队列尾部
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
//尝试直接将当前线程对应的node节点添加在对尾
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
//如果上述步骤失败,进入enq(node);
enq(node);
return node;
}
//enq会不断的循环直到把当前线程添加到等待队列的尾部为止
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) {
//初始化等待队列
//注意这里,初始化时头节点是空节点
if (compareAndSetHead(new Node())){
tail = head;
} else {
//将当前线程对应的节点添加到等待队列尾部,一直尝试,知道成功为止。
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
执行完addWaiter,等待队列有两种情况,一,队列只有两个节点,head对应一个空的头节点,tail对应当前线程的节点。二,队列有多个节点,tail对应刚加入的当前线程的节点。
此时,已经将当前线程添加到等待队列中,并且addWaiter会方法返回当前线程对应的节点,接下来执行acquireQueued方法。
//AbstractQueuedSynchronizer中的acquire方法
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
//获取当前线程对应的节点的前驱节点
final Node p = node.predecessor();
//如果是头节点,再次执行tryAcquire方法尝试获取锁,如果成功则将
//当前节点设为头节点。之前的enq()方法里提到头节点可能是空节
//点,所以如果当前节点的前驱节点是头节点的话是可以去尝试一下请求锁的。
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
//如果当前节点的前驱节点不是头节点,那么执行下面的shouldParkAfterFailedAcquire
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
//如果前驱节点已经是SIGNAL状态,那么当前节点可以放心的被挂起,
//之后再前驱节点释放锁的时候,当前节点会被唤醒
if (ws == Node.SIGNAL)
return true;
//前驱节点处于取消状态,不再争夺锁,那么一直往前找直到找到一个非
//取消状态的节点,然后将当前节点设为它的后继节点。
if (ws > 0) {
do {
node.prev = pred = pred.prev;//pred = pred.prev;node.prev = pred;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
//前驱节点不处于取消请求的状态,源码注释说ws在这里的取值
//有0和-3(0是初始状态,-3是PROPAGATE(共享锁里面才涉及到的状
//态)),事实上在ReentrantLock里面它只能是0,则将前驱节点设为
//SIGNAL状态
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
private final boolean parkAndCheckInterrupt() {
//挂起当前线程
LockSupport.park(this);
//唤醒后返回中断状态
return Thread.interrupted();
}
acquireQueued中的for(;;){}这个循环按照我的理解都至少执行两个到三次。
对于一个新的节点,它的前驱节点的waitStatus<=0(处于非CANCELLED状态),他的waitStatus初始化应该是0,那么会第一次执行shouldParkAfterFailedAcquire的时候必定是返回false的,同时设置它的前驱节点的waitStatus为SIGNAL,第二次循环时才会直接返回true,从而得以执行parkAndCheckInterrupt()方法。
如果它的前驱节点waitStatus>0,那么第一次循环执行shouldParkAfterFailedAcquire首先还要往前寻找一个处于非CANCELLED状态的前驱节点,将其的后继节点设为当前节点,第二次执行的时候设置它的前驱节点的waitStatus为SIGNAL,第三次循环时才会直接返回true,从而得以执行parkAndCheckInterrupt()方法。
下面简要讲解一下释放锁的操作的源代码。
public final boolean release(int arg) {
//首先执行tryRelease()
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
//释放成功,将exclusiveOwnerThread设置为null,说明已经没有线程占用锁了
//因为是ReentrantLock是可重入的,所以必须state=0了才能说明已经完全释放锁了
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
//从后往前遍历,找到最前的waitStatus<=0的节点,然后把它唤醒
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
到这里,非公平锁的加锁和释放锁的过程已经按照我个人的理解讲了一遍。下面再提一下公平锁和非公平锁的一些区别,它们的之间的主要是在tryAcquire中请求公平锁时会先调用hasQueuedPredecessors查看当前等待队列中是否有线程在等待锁,如果有当前线程则不会获得锁。
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}
//查看当前等待队列中是否有线程在等待,如果有返回true,如果没有返回false
public final boolean hasQueuedPredecessors() {
Node t = tail;
Node h = head;
Node s;
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}
嗯,写完了,第一篇技术文章,写得不好,下次改进。