Semaphore介绍
Semaphore,信号量,用来控制访问特定数量的共享资源,当线程线程想访问共享资源时,必须选获取到一个信号量,可以认为是一个访问许可证,才能访问,如果没有获取到信号量,就需要等待,等待前面有人释放信号量,才能获取到信号量。
Semaphore内部维护了一个计数器,代表能够有多少线程可同时访问共享资源,每次获取信号量时,计数器会减1,释放信号量时计数器会加1,当计数器减到0的时候,想要获取信号量的线程会被阻塞,等待前面已经获取到信号量的线程释放,释放之后,计数器增加1,阻塞线程被唤醒,并尝试再次获取信号量。
Semaphore可以在初始化的时候设置信号量的个数,以及是否许可公平策略。
这是Semaphore的一个简单Demo
public class SemaphoreTest {
private static Semaphore semaphore = new Semaphore(3);
private static ExecutorService executor = Executors.newFixedThreadPool(7);
private static void useSemaphore() {
try {
semaphore.acquire();
System.out.println(new Date() + ":获取到信号量");
Thread.sleep(new Random().nextInt(10000));
semaphore.release();
System.out.println(new Date() + ":释放信号量");
} catch (Exception e) {
}
}
public static class Task implements Runnable {
@Override
public void run() {
useSemaphore();
}
}
public static void main(String[] args) {
for (int i = 0; i < 7; i++) {
executor.execute(new Task());
}
}
}
Sat Oct 20 13:16:34 CST 2018:获取到信号量
Sat Oct 20 13:16:34 CST 2018:获取到信号量
Sat Oct 20 13:16:34 CST 2018:获取到信号量
Sat Oct 20 13:16:38 CST 2018:释放信号量
Sat Oct 20 13:16:38 CST 2018:获取到信号量
Sat Oct 20 13:16:40 CST 2018:释放信号量
Sat Oct 20 13:16:40 CST 2018:获取到信号量
Sat Oct 20 13:16:41 CST 2018:释放信号量
Sat Oct 20 13:16:41 CST 2018:获取到信号量
Sat Oct 20 13:16:44 CST 2018:释放信号量
Sat Oct 20 13:16:44 CST 2018:获取到信号量
Sat Oct 20 13:16:46 CST 2018:释放信号量
Sat Oct 20 13:16:47 CST 2018:释放信号量
Sat Oct 20 13:16:47 CST 2018:释放信号量
从输出可以看出来,最多只有3个线程能够获取到信号量,其他线程都要阻塞,只有当前面的线程适当信号量之后,阻塞的线程才能够获取信号量,继续执行代码。
Semaphore源码分析
非公平信号量的获取
基于jdk1.8分析Semaphore实现,首先看获取信号量的实现,函数调用顺序如下:
/ /默认为非公平策略
public Semaphore(int permits) {
sync = new NonfairSync(permits);
}
static final class NonfairSync extends Sync {
private static final long serialVersionUID = -2694183684443567898L;
NonfairSync(int permits) {
super(permits);
}
protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires);
}
}
Sync(int permits) {
setState(permits);
}
//最后其实就是设置了state为信号量的个数
protected final void setState(int newState) {
state = newState;
}
semaphore.acquire(),获取1个信号,调用过程如下。
public void acquire() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
final int nonfairTryAcquireShared(int acquires) {
for (;;) {
//现有有的信号量
int available = getState();
//这次获取之后还剩下的信号量
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
remaining < 0表示当前没有信号量可获取,会进入doAcquireSharedInterruptibly,等待信号量,并被挂起,直到被唤醒。
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
//把当前线程封装成一个节点,加入到等待信号量链表的末尾
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
//获取当前节点的前一个节点
//如果上一节点是CLH队列的表头,则"尝试获取共享锁"
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
//获取成功,需要将当前节点设置为AQS队列中的第一个节点
//这是AQS的规则,队列的头节点表示正在获取锁的节点
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
// shouldParkAfterFailedAcquire检查是否需要将当前线程挂起
//parkAndCheckInterrupt挂起当前线程,并返回线程是否被中断
//如果当前线程被中断,抛出中断异常,退出循环
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
从循环中可以看出,当线程被唤醒,并不一定能获得信号量,而是继续通过tryAcquireShared方法去竞争获取,如果这时候正好有新的线程去获取信号量,有可能这个没有任何等待,刚刚来的新线程获取到信号量,这就是不公平策略的体现
//在等待链表中,加入当前线程
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
//如果当前链表有尾节点,即当前链表不为空时
//把当前节点放入到尾节点后面,更新尾节点为当前节点
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
private Node enq(final Node node) {
for (;;) {
Node t = tail;
//当前等待链表还没有节点时,进行初始化
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
//把当前节点放入到尾节点后面,更新尾节点为当前节点
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
waitStatus是节点Node定义的,她是标识线程的等待状态,他主要有如下四个值:
1、CANCELLED = 1,在同步队列中等待的线程等待超时或者被中断,需要从同步队列中取消等待,节点进入该状态后将不会变化;
2、SIGNAL = -1:当前线程的后继线程需要被unpark(唤醒),后续节点的线程处于等待状态,而当前节点的线程如果释放了同步状态或者被取消,将会通知后续节点,使后续节点的线程得以运行;
3、 CONDITION = -2 :线程(处在Condition休眠状态)在等待Condition唤醒,节点在条件队列中,节点线程等待在Condition上,当其他线程对Condition调用了signal()方法后,该节点将会从条件队列中转移到同步队列中,加入到对同步状态的获取中;
4、 PROPAGATE = –3,其它线程获取到“共享锁”,表示下一次共享式同步状态获取将会无条件地传播下去
有了这四个状态,我们再来分析上面代码,当ws == SIGNAL时表明当前节点需要unpark(唤醒),直接返回true,当ws > 0 (CANCELLED),表明当前节点已经被取消了,则通过回溯的方法(do{}while())向前找到一个非CANCELLED的节点并返回false。其他情况则设置该节点为SIGNAL状态。我们再回到if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()),p是当前节点的前继节点,当该前继节点状态为SIGNAL时返回true,表示当前线程需要阻塞,则调用parkAndCheckInterrupt()阻塞当前线程。
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
return true;
if (ws > 0) {
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
//前驱节点是初始或者共享状态就设置为-1 使后续节点阻塞
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
//调用LockSupport阻塞线程
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // Record old head for check below
setHead(node);
//如果还有剩余量,继续唤醒下一个线程
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared())
doReleaseShared();
}
}
private void setHead(Node node) {
head = node;
node.thread = null;
node.prev = null;
}
首先,使用了CAS更换了头节点,然后,将当前节点的下一个节点取出来,如果同样是“shared”类型的,再做一个"releaseShared"操作。
为什么要这么做呢?这就是共享功能和独占功能最不一样的地方,对于独占功能来说,有且只有一个线程,能够获取锁,但是对于共享功能来说,共享的状态是可以被共享的,也就是意味着其他AQS队列中的其他节点也应能第一时间知道状态的变化,实现节点自身获取共享锁成功后,唤醒下一个共享类型节点的操作,实现共享状态的向后传递。doReleaseShared方法,其实唤醒队列中等待的线程,下面会讲解的
非公平信号量的释放
下面来看一下释放信号量,其实就是释放信号量,以及唤醒等待线程两个主要流程
public void release() {
sync.releaseShared(1);
}
public final boolean releaseShared(int arg) {
//释放信号量
if (tryReleaseShared(arg)) {
//唤醒等待线程
doReleaseShared();
return true;
}
return false;
}
// 核心代码
protected final boolean tryReleaseShared(int releases) {
for (;;) {
//当前信号量
int current = getState();
//释放后的信号量
int next = current + releases;
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
if (compareAndSetState(current, next))
return true;
}
}
semaphore.release(),可以看到释放信号量,通过compareAndSetState方法,将信号量的数量增加
修改信号量之后,会调用doReleaseShared方法,唤醒队列中等待的线程
private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
// 如果头节点对应的线程是SIGNAL状态
//意味着“头节点的下一个节点所对应的线程”需要被unpark唤醒
if (ws == Node.SIGNAL) {
// 设置“头节点对应的线程状态”为空状态。失败的话,则继续循环
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
//唤醒头结点的下一个结点,头结点为new Node()或者当前获取信号量的线程
unparkSuccessor(h);
}
//如果本身头节点的waitStatus是处于重置状态(waitStatus==0)的,将其设置为“传播”状态
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
doReleaseShared方法,获取头结点,如果头结点状态为Node.SIGNAL,意味着,它正在等待一个信号,或者说,它在等待被唤醒,因此做两件事,第一是重置waitStatus标志位,更新头结点状态为0,第二是重置成功后,唤醒头结点的下一个结点的线程
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
公平信号量的获取
非公平信号量
static final class NonfairSync extends Sync {
private static final long serialVersionUID = -2694183684443567898L;
NonfairSync(int permits) {
super(permits);
}
protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires);
}
}
final int nonfairTryAcquireShared(int acquires) {
for (;;) {
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
公平信号量
static final class FairSync extends Sync {
private static final long serialVersionUID = 2014338818796000944L;
FairSync(int permits) {
super(permits);
}
protected int tryAcquireShared(int acquires) {
for (;;) {
if (hasQueuedPredecessors())
return -1;
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
}
public final boolean hasQueuedPredecessors() {
// The correctness of this depends on head being initialized
// before tail and on head.next being accurate if the current
// thread is first in queue.
Node t = tail; // Read fields in reverse initialization order
Node h = head;
Node s;
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}
从代码就可以看出来,非公平和公平信号量,主要是tryAcquireShared方法不一样,公平信号量在每次线程尝试获取的时候,都会判断head.next节点(第一个阻塞等待的节点)是不是为当前线程的节点,如果不是就返回-1,插入到链表的尾节点,等待被唤醒,是的话才会去竞争获取信号量,这样就能保证获取信号量的顺序和加入到阻塞链表的顺序保持一致
semaphore.acquire()在获取不到信号量的时候会挂起当前线程,而tryAcquire,不会挂起线程,会直接返回是否获取到了信号量,
public boolean tryAcquire() {
return sync.nonfairTryAcquireShared(1) >= 0;
}
public boolean tryAcquire(int permits) {
if (permits < 0) throw new IllegalArgumentException();
return sync.nonfairTryAcquireShared(permits) >= 0;
}
final int nonfairTryAcquireShared(int acquires) {
for (;;) {
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
通过tryAcquire(long timeout, TimeUnit unit)可以设置获取信号量的等待时间,主要是通过doAcquireSharedNanos方法来实现的,我们来看一下具体是怎么实现的
public boolean tryAcquire(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}
public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
return tryAcquireShared(arg) >= 0 ||
doAcquireSharedNanos(arg, nanosTimeout);
}
private boolean doAcquireSharedNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (nanosTimeout <= 0L)
return false;
final long deadline = System.nanoTime() + nanosTimeout;
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return true;
}
}
nanosTimeout = deadline - System.nanoTime();
if (nanosTimeout <= 0L)
return false;
//当能够挂起当前线程时,将当前线程挂起nanosTimeout时间
if (shouldParkAfterFailedAcquire(p, node) &&
nanosTimeout > spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanosTimeout);
if (Thread.interrupted())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
private void cancelAcquire(Node node) {
// Ignore if node doesn't exist
if (node == null)
return;
//解除node与线程的关联关系
node.thread = null;
// 跳过node前面为cancelled状态的节点,找到一个有效的前继节点
Node pred = node.prev;
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;
Node predNext = pred.next;
// Can use unconditional write instead of CAS here.
// After this atomic step, other Nodes can skip past us.
// Before, we are free of interference from other threads.
node.waitStatus = Node.CANCELLED;
// If we are the tail, remove ourselves.
if (node == tail && compareAndSetTail(node, pred)) {
compareAndSetNext(pred, predNext, null);
} else {
int ws;
if (pred != head &&
((ws = pred.waitStatus) == Node.SIGNAL ||
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
pred.thread != null) {
Node next = node.next;
if (next != null && next.waitStatus <= 0)
compareAndSetNext(pred, predNext, next);
} else {
//node为head的后继节点
unparkSuccessor(node);
}
node.next = node; // help GC
}
}
大部分逻辑和doAcquireSharedInterruptibly类似,首先第一个不同在于进入方法时,计算出了一个“deadline”,每次循环的时候用当前时间和“deadline”比较,大于“dealine”说明超时时间已到,直接返回方法。
注意nanosTimeout > spinForTimeoutThreshold,从变量的字面意思可知,这是拿超时时间和超时自旋的最小作比较,在这里Doug Lea把超时自旋的阈值设置成了1000ns,即只有超时时间大于1000ns才会去挂起线程,否则,再次循环,以实现“自旋”操作
如果超时没有获得信号量会返回false,同时执行finally中的函数,将node状态设置为CANCELLED,删除当前节点
AQS介绍
信号量中的FairSync和NotFairSync都是继承自Sync,而Sync又是继承自AQS,使用到了很多AQS的东西,AQS(AbstractQueuedSynchronizer):为java中管理锁的抽象类。该类为实现依赖于先进先出 (FIFO) 等待队列的阻塞锁和相关同步器(信号量、事件,等等)提供一个框架。该类提供了一个非常重要的机制,在JDK API中是这样描述的:为实现依赖于先进先出 (FIFO) 等待队列的阻塞锁和相关同步器(信号量、事件,等等)提供一个框架。此类的设计目标是成为依靠单个原子 int 值来表示状态的大多数同步器的一个有用基础。子类必须定义更改此状态的受保护方法,并定义哪种状态对于此对象意味着被获取或被释放。假定这些条件之后,此类中的其他方法就可以实现所有排队和阻塞机制。子类可以维护其他状态字段,但只是为了获得同步而只追踪使用 getState()、setState(int) 和 compareAndSetState(int, int) 方法来操作以原子方式更新的 int 值。 这么长的话用一句话概括就是:维护锁的当前状态和线程等待列表。
CLH:AQS中“等待锁”的线程队列。我们知道在多线程环境中我们为了保护资源的安全性常使用锁将其保护起来,同一时刻只能有一个线程能够访问,其余线程则需要等待,CLH就是管理这些等待锁的队列。
CAS(compare and swap):比较并交换函数,它是原子操作函数,也就是说所有通过CAS操作的数据都是以原子方式进行的。