AbstractQueuedSynchronizer源码
AbstractQueuedSynchronizer(AQS)内部结构
AQS是ReentrantLock、ReentrantReadWriteLock、CountDownLatch等经常使用锁的父类。AQS既然是父类,一些接口是需要实现类自己去实现的例如tryAcquire,tryRelase;AQS提供共享锁和排它锁(独占锁)。
AQS内部有个Node,申请锁的线程会封装为一个Node,申请一个锁的所有线程会组成Node链表。
链表是双链表结构
非租塞,在并发下插入和移除操作不会阻塞(自旋,CAS)
FIFO
AQS内还维护了state(内部同步状态)、线程的阻塞和解除操作。
所有对state操作是原子性的
AQS内部使用LockSuppert.park、unPark操作。内部会在每个Thread设置一个本地状态,让线程轮训本地状态,而不是像普通的锁,大家都是竞争同一个锁,造成更大的浪费。
Node的双链表结构图
Node的属性:
1.prev 指向节点上一个节点,如果没有则为null
2.next 指向节点下一个节点,为了查找最后一个节点方便,避免单链表情况下必须从head开始找,如果没有则为null
3.waitStatus 线程状态,CLH结构中使用前一节点一个属性标识当前节点状态,方便实现取消和超时功能。
CANCELLED =1,被取消或超时
SIGNAL =-1 ,只是当前节点的后续节点被阻塞了,需要unparking
CONDITION = -2,当前节点在conditon队列中
PROPAGATE = -3,共享锁使用,线性广播唤醒线程。
默认 0,初始状态
4.nextWaiter:标识condtion队列的后续节点,此时prev,next不用,而且节点waitStauts是CONDITION.
5.thread 对应的线程
源代码排它锁(ReentrantLock)
ReentrantLock使用方式 1):
ReentrantLock.lock
ReentrantLock.unlock
ReentrantLock lock过程
ReentrantLock内部有公平锁FairSync和非公平锁NonfairSync,先看公平锁
ReentrantLock:
public void lock() {
sync.lock();
}
//AbstractQueuedSynchronizer:
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) //如果没有获取到锁 就创建Node节点 封装当前线程,成功后并中断当前线程
selfInterrupt();
}
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL) // 需要unpark
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0); //大于1的都是取消的,要找到小与0的
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL); // 需要前驱节点(waitStatus)unpark,也就说告诉前节点,你的next节点需要被通知运行
}
return false;
}
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor(); //取出前节点
if (p == head && tryAcquire(arg)) {//如果前节点是head,则设置改节点为头结点
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted; //并不中断
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt()) //这里会有LockSupport操作,
//LockSupport.park unPark,这里会有一个许可。park在获取不到许可就会组阻塞,
interrupted = true;
}
} finally {
if (failed) //失败就会取消线程
cancelAcquire(node); //取消后还要调整队列
}
}
/**
* Creates and enqueues node for current thread and given mode.
*
* @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
* @return the new node
*/
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode); //新建节点 并入队尾 参数是模式
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) { //如果有并发,这里会失败,进入enq
pred.next = node;
return node;
}
}
enq(node);
return node;
}
/**
* Inserts node into queue, initializing if necessary. See picture above.
* @param node the node to insert
* @return node's predecessor
*/
private Node enq(final Node node) {
for (;;) { //CAS 操作,自旋模式 直到设置成功哦
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
//FairSync:
final void lock() {
acquire(1); //基类AQS方法
}
/**
* Fair version of tryAcquire. Don't grant access unless
* recursive call or no waiters or is first.
*/
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) { //这里是判断是否是空链表或者当前线程是链表头部
if (!hasQueuedPredecessors() && //判断是否有pre节点 不包括head
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);//设置为当前线程
return true;
}
}
else if (current == getExclusiveOwnerThread()) { //重入
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false; //没有获取到返回false
}
ReentrantLock unlock过程
//ReentrantLock:
public void unlock() {
sync.release(1);
}
//AQS:
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0) //如果不是head不为空,切不为0,则要唤醒后续节点
unparkSuccessor(h);
return true;
}
return false;
}
//sync:
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
Node s = node.next;
if (s == null || s.waitStatus > 0) { //找到小于0的可执行的
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread); //释放
}
protected final boolean tryRelease(int releases) {
int c = getState() - releases; //同步参数 -1
if (Thread.currentThread() != getExclusiveOwnerThread()) //不等于当前线程则会有错误
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) { //如果没有(并发),则设置为null,
free = true;
setExclusiveOwnerThread(null);
}
setState(c);//应该和重入哦 重入锁解锁的时候不会唤醒下一个线程
return free;
}
非公平锁:
非公平锁很简单,在开始是直接去争夺修改status(0->1),没有使用链表,谁修改status成功谁拿到锁
final void lock() {
if (compareAndSetState(0, 1)) //
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) { //
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
ReentrantLock使用方式 2):
Condtion condtion = ReentrantLock.newConditon();
condtion.await 和Object类的wait方法等效
condtion.signal 和Object类的notify方法等
condtion.signalAll Object类的notifyAll方法等效
ReentrantLock condition实现类 ConditionObject
Node 链表只使用了nextWaiter,组成一个队列,调用await会加入node链表tail,signal是通知head节点运行;signalALl是遍历链表,都去竞争啊
//共享锁 wait