- Java-AQS同步器 源码解读<一>独占锁加锁
- Java-AQS同步器 源码解读<二>独占锁解锁
- Java-AQS同步器 源码解读<三>共享锁
- Java-AQS同步器 源码解读<四>-条件队列上
- Java-AQS同步器 源码解读<五>-条件队列下
共享锁
前面2篇文章描述了AQS中独占锁的加锁解锁,那今篇文章我们聊下AQS 中分享锁的加锁解锁
既然说道共享锁和独占锁,那2者最本质的区别是什么呢,大家应该记得AQS中有一个同步器状态State 字段,其实说说白了共享模式和独占模式,就是同步器的状态是否允许被多个线程所获取,比如我们之前说的ReentrantLock就是独占锁的模式,因为同步器状态只能被一个线程所获取,那这篇我将使用Semaphore来做分析共享锁。
共享锁加锁
Semaphore初始化
public Semaphore(int permits) {
sync = new NonfairSync(permits);
}
上面是信号量的默认构造函数 默认实现的是非公平锁
/**
* NonFair version
*/
static final class NonfairSync extends Sync {
private static final long serialVersionUID = -2694183684443567898L;
NonfairSync(int permits) {
super(permits);
}
protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires);
}
}
我们看到我们传递的信号量permits 最终还是调用了Sync的构造函数
abstract static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 1192457210091910933L;
Sync(int permits) {
setState(permits);
}
}
setState 其实就是调用AQS中的方法 就是给State 赋值
Semaphore获取 acquire() 方法
选取一个默认的获取方法如下:
public void acquire() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
可以看到 我们这边默认获取的信号量是1,当然acquire也有带参数的构造方法
很明显 我们看到acquire默认的构造函数调的是Sync中acquireSharedInterruptibly方法 之前我也说过Sync是继承了AQS的 我们IDE跟进这个方法 就进入了AQS类中
进入AQS中acquireSharedInterruptibly方法
/*这个方法就是去获取同步锁,除非线程发送了中断*/
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())//这边就是坚持线程是否发生中断,如果中断则抛出异常
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
重写tryAcquireShared的实现
看了上面的代码,我们有看到一个熟悉的方法tryAcquireShared和之前独占锁tryAcquire很像,这个方法也是需要子类去重写的,熟悉的套路,熟悉的味道,哈哈!那我们就去找下tryAcquireShared这个方法
找下找 很快我找到了 在NonfairSync里面找到了 代码上面也有,NonfairSync非公平锁的tryAcquireShared方法原来是调用的父类的方法也就是Sync的,那我就去Sync类中看一看,果然找到了
final int nonfairTryAcquireShared(int acquires) {
for (;;) {//又是一个自旋的操作,AQS 中有大量的这样的写法
int available = getState();
int remaining = available - acquires;
//整个自旋唯一的出口,就是当前的线程获占用完同步器的状态值后小于0或者CAS修改State值失败就返回了
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
看到这个返回我很懵逼呀,怎么就返回了呢,那我们结合刚才上面的acquireSharedInterruptibly来看 原来他判断如果返回值小于0 就会执行下面的方法doAcquireSharedInterruptibly方法,翻译成白话文 就是 就是 同步器里面5个苹果,可能前面几个线程都把苹果拿了 我再来拿的时候发现小于0了,拿怎么办呢 ,只能去排队等待咯,和独占锁流程差不多,区别就是这个后面,好的 我们慢慢再开看,
doAcquireSharedInterruptibly 获取失败后排队
/**
* Acquires in shared interruptible mode.
* @param arg the acquire argument
*/
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
//以共享模式加入到阻塞队列中 这里addWaiter和独占锁加锁使用的是同一个方法 不清楚的 可以看之前的文章
final Node node = addWaiter(Node.SHARED);// 返回成功加入队尾的节点
boolean failed = true;//标识是否获取资源失败
try {
for (;;) {//自旋
final Node p = node.predecessor();// 获取当前节点的前置节点
if (p == head) {// 如果前置节点是head 那就去尝试获取资源,因为可能head已经释放了资源
int r = tryAcquireShared(arg);
if (r >= 0) {// 如果获取成功且大于等于0,意味这资源还有剩余,可唤醒其余线程获取
setHeadAndPropagate(node, r);// 这边方法就是和独占锁处理不一样地放 我们可以重点去看下 其余的流程是一样的
p.next = null; // help GC
failed = false;
return;
}
}
/*下面的方法和独占锁的是一样的 在第一篇文章中已经解读过,小伙伴们如果不清楚 可以去看下
有区别的地方就是对中断的处理这边是直接抛出中断异常,独占锁处理是返回标记是否中断 让上一层处理中断
*/
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
setHeadAndPropagate方法
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; //赋值当前的head节点 因为下一步会对head 重写赋值
setHead(node);//设置当前node 节点 为head 这个和独占锁的是一样的
/*
* propagate > 0 的意思 就是同步器里面State是有剩余的 可以唤醒其他线程
* 后面的判断意思是 当前的head节点或者之前的head节点等于null 或者状态小于0 那也必须能唤醒后面线程去获取资源 head等于null 说明可能被GC回收了
* 这边的head的waitStatus 我自己模拟了下各自情况 只可能是-1或者-3
*/
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;//当前node 的下一个节点
if (s == null || s.isShared())//如果snull
doReleaseShared();
}
}
doReleaseShared方法
private void doReleaseShared() {
/*
* fails, if so rechecking.
*/
for (;;) {
Node h = head;
if (h != null && h != tail) {// 这个head!=tail 说明阻塞队列中至少2个节点 不然也没必要去传播唤醒 如果就自己一个节点 就算资源条件满足 还换个谁呢?
int ws = h.waitStatus;// head 节点状态SIGNAL
if (ws == Node.SIGNAL) {// 如果head状态是
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);//是和独占锁释放用的同样的方法 唤醒的是下一个节点 上一篇有分析到
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; //这边设置为-3 是为了唤醒的传播 也就是满足上一个方法有判断waitStatus 小于0
}
if (h == head)
break;
}
}
这个条件是自旋唯一的出口就是head 没有发送变化 说明没有后面的线程获取资源 那就退出自旋,如果head发生了变化 说明传播的有效果了 后面线程获取到到了资源
还有要注意的地方 是doReleaseShared这个方法有2个地方调用 一个是就是这边共享锁加锁 还有一个就是共享锁解锁的地方
共享锁解锁
我们看下共享锁的解锁 其实看完了上面的内容 这个就简单了很多
Semaphore获取 releaseShared() 方法
代码在Semaphore类中
public void release() {
sync.releaseShared(1);
}
protected final boolean tryReleaseShared(int releases) {
for (;;) {
int current = getState();//获取同步器的状态
int next = current + releases;//累加释放的资源值
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
if (compareAndSetState(current, next))//CAS 更新同步器状态值 成功就退出自旋
return true;
}
}
代码跳转到了AQS类中
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {//tryReleaseShared 还是和之前的套路一样 子类去重写的 也很简单 代码也贴在了上面
doReleaseShared();//是不是很熟悉
return true;
}
return false;
}
看到doReleaseShared 我们应该很熟悉了 刚才加锁的时候 也用到了这个方法 具体就不多说了
总结共享锁和独占锁 区别之处
看了共享锁的加锁 我们在回顾下独占锁加锁 这边的处理,不然想到 这边的区别就是head获取资源后 独占锁直接设置自己为head 然后返回 而共享锁这边head 获取资源后 如果资源状态还有剩余 就会唤醒其余线程去获取,这就是2者的区别
同样的解锁的过程也是几乎一样 底层唤醒线程的unparkSuccessor方法都是公用的,解锁的过程也是多一个唤醒传播的过程
好的AQS的同步队列 的共享模式和独占模式 用了前面的3篇文章 和大家分享完了
后面 会分析下AQS中的条件队列 具体怎么运行的~
不要吝啬你的点赞 ,点赞 给我东西 ,继续写作