AbstractQueuedSynchronizer(AQS)
AbstractQueuedSynchronizer 继承自AbstractOwnableSynchronizer,AbstractOwnableSynchronizer设置了一个属性exclusiveOwnerThread, 表示当前的owner线程.
AQS是一个抽象类, 里面主要包含几个属性.
/**
* 队列头
*/
private transient volatile Node head;
/**
* 队列尾
*/
private transient volatile Node tail;
/**
* 同步状态. 0无线程占用 >0 有线程占用
*/
private volatile int state;
Node是AbstractQueuedSynchronizer的一个内部类, 本质上是实现的是一个双向列表.Node主要包含几个属性
static final class Node {
//...
static final Node SHARED = new Node();
static final Node EXCLUSIVE = null;
/** 等待状态已被撤销 */
static final int CANCELLED = 1;
/** 等待状态被唤醒 */
static final int SIGNAL = -1;
/** 等待状态等待唤醒信号 */
static final int CONDITION = -2;
/**
*
*/
static final int PROPAGATE = -3;
/** 等待状态 */
volatile int waitStatus;
/** 上一个node */
volatile Node prev;
/** 下一个node */
volatile Node next;
// 用于实现条件队列的单向链表
Node nextWaiter;
//...
}
在AbstractQueuedSynchronizer中定义的获取锁的方法, 改方法会在子类中被调用, 但是具体实现由子类实现.这个方法很重要, 子类常常调用该方法
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
ReentrantLock
ReentrantLock是基于AbstractQueuedSynchronizer(AQS)实现的. ReentrantLock是一种可重入锁.
ReentrantLock有一个内部类Sync, 继承自AbstractQueuedSynchronizer. ReentrantLock有两种实现FairSync(公平锁)和NonfairSync(非公平锁, 默认是非公平锁). 两种方式都继承自Sync. 在Sync中定义了获取锁的抽象方法abstract void lock(), 两个实现类都实现了改抽象方法. 下面我们直接从源码层面解析两个锁的实现方式.
NonfairSync 非公平锁
加锁
非公平锁是ReentrantLock的默认实现.非公平锁加锁流程:
- 尝试修改state=1, 如果成功将当前owner线程修改为当前线程. 获取锁成功
- 如果失败,将当前线程加入等待队列中, 加入过程中需要跳过等待状态为撤销的节点, 以保证前置节点的状态是可被唤醒的
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
/**
* 父类(AQS)中的acquire方法
*/
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
/**
* 非公平锁的tryAcquire的实现
*/
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
/**
* tryAcquire失败后加入等待队列
*/
private Node addWaiter(Node mode) {
// 构建新的node
Node node = new Node(Thread.currentThread(), mode);
Node pred = tail;
// 如果队尾不为空
if (pred != null) {
// node的prev指针指向队尾
node.prev = pred;
// 设置队尾的尾部为新node, 避免多线程同时加入队列的情况冲突
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
// 如果队尾为空或是加入队列冲突失败, 自旋加入队列
enq(node);
return node;
}
/**
* 自旋加入队列
*/
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) {
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
/**
* @param node 新加入队列的node
*/
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
// p = node的前置节点
final Node p = node.predecessor();
// 如果 p == head 表示node为队列第一个排队线程, 并再次发起一次tryAcquire, 如果设置state成功, 则退出
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
/**
*如果p!=head或tryAcquire失败, 则找到node所有前置节点中最近的一个waitStatus==SIGNAL的节点A, *并将node的前置节点调整为A, 这样node唤起的前置节点的状态就是等待唤起状态而不是取消状态, 方便后面唤起
* parkAndCheckInterrupt 将当前线程挂起, 即node.waitStatus=SIGNAL
*/
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
// 如果以上操作都失败了, 则将node.waitStatus=CANCELLED, 状态改为撤销
if (failed)
cancelAcquire(node);
}
}
我们在实际的开发过程中还经常用到tryLock()
.这个方法底层调用的就是非公平锁的nonfairTryAcquire()方法.
此外还有tryLock(timeout, timeunit)
方法, 其调用的是AQS中的tryAcquireNanos方法
public final boolean tryAcquireNanos(int arg, long nanosTimeout) throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
// 尝试获取锁, 成功则返回
// 获取锁失败,
return tryAcquire(arg) ||
doAcquireNanos(arg, nanosTimeout);
}
/**
* 加入等待队列中. 加入队列后不停轮询直到该线程成为队首, 并尝试获取锁. 如果获取锁失败或是到了deadline, 则将线程挂起
*/
private boolean doAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (nanosTimeout <= 0L)
return false;
final long deadline = System.nanoTime() + nanosTimeout;
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return true;
}
nanosTimeout = deadline - System.nanoTime();
if (nanosTimeout <= 0L)
return false;
if (shouldParkAfterFailedAcquire(p, node) &&
nanosTimeout > spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanosTimeout);
if (Thread.interrupted())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
解锁
public void unlock() {
sync.release(1);
}
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
// 释放锁
protected final boolean tryRelease(int releases) {
// state递减
int c = getState() - releases;
// 当前owner和当前线程校验
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
// 如果state==0, 表示没有线程占用, 则owner设置为null
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
FairSync 公平锁
公平锁的加锁流程:
- 判断state是否等于0
- state==0, 判断当前队列是否有线程在排队, 如果没有则尝试修改compareAndSetState(0,1),并将当前owner线程修改为当前线程. 获取锁成功
- state==1, 判断owner线程是不是就是自己, 如果是自己递增state.获取锁成功
- 如果获取锁失败, 将当前线程加入等待队列中, 加入过程中需要跳过等待状态为撤销的节点, 以保证前置节点的状态是可被唤醒的
final void lock() {
acquire(1);
}
// 重写tryAcquire方法
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) { // 如果当前state==0, 表明没有线程占用
/**
* 公平锁遵循先到先得的原则, 即便当前state==0也需要查看在等待队列中是否有等待的线程
* 如果等待队列中没有线程, 尝试通过CAS方式设置state=1,
* 如果设置成功则将当前AQS中的线程owner设置为当前线程
**/
if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
// 如果当前线程被占用, 且owner为当前线程, 递增state
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
// 获取锁失败
return false;
}
小结
ReetrantLock是基于AQS的一种锁实现, 利用CompareAndSetState来对stete进行CAS操作实现获取锁的控制. 并利用AbstractOwnableSynchronizer.exclusiveOwnerThread每次获取锁以后设置exclusiveOwnerThread=currentThread, 释放锁时将exclusiveOwnerThread=null, 比较当前线程和owner是否一致来实现可重入锁.
Condition
Condition常用于生产者-消费者的场景中.Condition依赖Lock产生.
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
class BoundedBuffer {
final Lock lock = new ReentrantLock();
// condition 依赖于 lock 来产生
final Condition notFull = lock.newCondition();
final Condition notEmpty = lock.newCondition();
final Object[] items = new Object[100];
int putptr, takeptr, count;
// 生产
public void put(Object x) throws InterruptedException {
lock.lock();
try {
while (count == items.length)
notFull.await(); // 队列已满,等待,直到 not full 才能继续生产
items[putptr] = x;
if (++putptr == items.length) putptr = 0;
++count;
notEmpty.signal(); // 生产成功,队列已经 not empty 了,发个通知出去
} finally {
lock.unlock();
}
}
// 消费
public Object take() throws InterruptedException {
lock.lock();
try {
while (count == 0)
notEmpty.await(); // 队列为空,等待,直到队列 not empty,才能继续消费
Object x = items[takeptr];
if (++takeptr == items.length) takeptr = 0;
--count;
notFull.signal(); // 被我消费掉一个,队列 not full 了,发个通知出去
return x;
} finally {
lock.unlock();
}
}
}
Condition也是基于AQS的实现, 一个lock对象可以创建多个Condition对象, 那么Condition的await是如何实现挂起和signal又是如何实现唤醒的呢?我们先来简单介绍一下Condition的处理流程.
- Condition依附于一个条件队列, 和Lock的阻塞队列一样, 都是基于AQS的Node队列. 不同的是阻塞队列是一个双向队列而条件队列是一个单向队列.
- ConditionObject是AQS的一个内部类, ConditionObject有两个属性firstWaiter和lastWaiter
- 当调用condition.await()时, 将当前线程包装成Node, 加入到条件队列中, 之后释放当前线程持有的独占锁并将当前线程挂起
- 当调用condition.sigal()时, 将条件队列的firstWaiter(线程)�移到阻塞队列的队尾, 并唤起该线程, 释放锁. 线程被唤起后重新获取锁, 获取锁后await方法才能返回, 继续执行
下面我们来看看源码
public class ConditionObject implements Condition, java.io.Serializable {
/** 条件队列队首 */
private transient Node firstWaiter;
/** 条件队列队尾 */
private transient Node lastWaiter;
}
// 执行await, 加入条件队列, 释放锁, 挂起线程
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
// 加入到条件队列中
Node node = addConditionWaiter();
// 释放当前线程持有的锁
int savedState = fullyRelease(node);
int interruptMode = 0;
// 判断当前线程是否在阻塞队列中, 如果不在, 则将当前线程挂起
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
// 该线程被其他线程的signal操作唤醒后, 检查线程在挂起期间是否发生了中断, 如果有中断是signal调用之前中断的还是调用之后中断的.如果没中断则返回0
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
// 被唤起后, 进入阻塞队列获取锁, acquireQueued在前面的篇幅中有介绍该方法是如何获取锁的
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
// 唤起线程, 由其他线程执行唤起操作
public final void signal() {
// 调用 signal 方法的线程必须持有当前的独占锁
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
// 获取条件队列的第一个线程
Node first = firstWaiter;
if (first != null)
// 执行唤醒操作
doSignal(first);
}
private void doSignal(Node first) {
do {
// 将 firstWaiter 指向 first 节点后面的第一个,因为 first 节点马上要离开了
// 如果将 first 移除后,后面没有节点在等待了,那么需要将 lastWaiter 置为 null
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
// 因为 first 马上要被移到阻塞队列了,和条件队列的链接关系在这里断掉
first.nextWaiter = null;
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
// 这里 while 循环,如果 first 转移不成功,那么选择 first 后面的第一个节点进行转移,依此类推
}
// 从条件队列转移到阻塞队列
final boolean transferForSignal(Node node) {
// 执行cas操作, 如果状态不是待唤醒状态, 说明线程状态失效了. 否则将状态置为0
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
// 自旋进入阻塞队列的队尾
// 注意,这里的返回值 p 是 node 在阻塞队列的前驱节点
Node p = enq(node);
int ws = p.waitStatus;
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
// 如果前驱节点取消或者 CAS 失败,会进到这里唤醒线程
LockSupport.unpark(node.thread);
return true;
}
CountDownLatch
CountDownLatch使用场景很多, 比方说我们需要并行的进行一组(N)操作, 如向不同的服务器拉取数据, 我们期望并发的完成这样的操作并在所有的操作完成以后继续其他任务. 这个时候CountDownLatch就可以被使用起来了.
class Test {
void main() {
CountDownLatch doneSignal = new CountDownLatch(N);
Executor e = Executors.newFixedThreadPool(8);
// 创建N个任务, 并行执行
for (int i = 0; i < N; ++i) // create and start threads
e.execute(new WorkerRunnable(doneSignal, i));
// 等待所有的任务完成,这个方法才会返回
doneSignal.await(); // wait for all to finish
}
}
class WorkerRunnable implements Runnable {
private final CountDownLatch doneSignal;
private final int i;
WorkerRunnable(CountDownLatch doneSignal, int i) {
this.doneSignal = doneSignal;
this.i = i;
}
public void run() {
try {
doWork(i);
// 这个线程的任务完成了,调用 countDown 方法
doneSignal.countDown();
} catch (InterruptedException ex) {
} // return;
}
void doWork() { ...}
}
CountDownLatch也是基于AQS的实现.不同的是CountDownLatch是基于一种共享模式的实现.