Java队列有两种,一种是阻塞队列,一种是非阻塞队列
还有一个队列和 Queue 关系很紧密,那就是Deque,这其实是 double-ended-queue 的缩写,意思是双端队列。它的特点是从头部和尾部都能添加和删除元素,而我们常见的普通队列Queue 则是只能一端进一端出,即FIFO
一 什么是BlockingQueue
1. 定义
阻塞队列BlockingQueue是支持以下两个操作的队列:
- 当队列满的时候,队列会阻塞插入元素的线程,直到队列不满
- 当队列空的时候,获取元素的线程会等待队列变为非空
阻塞队列常用于生产者和消费者的场景,生产者是往队列里添加元素的线程,消费者是从队列里拿元素的线程。阻塞队列就是生产者存放元素的容器,而消费者也只从容器里拿元素。
阻塞队列主要有下面六个实现类,分别是 ArrayBlockingQueue、LinkedBlockingQueue、SynchronousQueue、DelayQueue、PriorityBlockingQueue、LinkedTransferQueue 。这些阻塞队列有着各自的特点和适用场景。队列有数组和链表两种实现方式
2. BlockingQueue插入和删除方法
add 和 remove:
boolean add:插入成功,返回true,插入失败,抛出异常(illegalStateException)
E remove:移除成功,返回被移除的元素,移除失败,抛出异常(NoSuchElementException)
offer 和 poll:
boolean offer:插入成功,返回true,插入失败,返回false
E poll:移除成功,返回被移除的元素,移除失败,返回null
如果是offer(e,time,TimeUnit)和poll(time,TimeUnit):
boolean offer(e,time,TimeUnit):插入成功,返回true,规定时间内插入不成功,返回false
E poll(time,TimeUnit):移除成功,返回被移除的元素,规定时间内移除不成功,直接返回null
put 和 take:
void put:待插入元素线程一直阻塞,直到队列有空间
E take:移除成功,返回被移除的元素,移除失败,移除元素线程一直阻塞
测试插入删除代码
插入
public class Main1 {
public static void main(String[] args) {
ArrayBlockingQueue<Integer> arrayBlockingQueue = new ArrayBlockingQueue<>(5);
for (int i = 0; i < 6; i++) {
System.out.println(i);
arrayBlockingQueue.add(i);
}
}
删除
public class Main2 {
public static void main(String[] args) {
List<Integer> integerList = Arrays.asList(1, 2, 3, 4, 5);
ArrayBlockingQueue<Integer> arrayBlockingQueue = new ArrayBlockingQueue<>(5,false,integerList);
for (int i = 1; i <=6; i++) {
remove(arrayBlockingQueue,i);
}
}
public static void remove(ArrayBlockingQueue<Integer> arrayBlockingQueue,int i){
Integer remove = arrayBlockingQueue.remove();
System.out.println(i);
}
3. 一些结论
- 综上可知,我们应该主要关注的阻塞队列中的方法应该是阻塞的方法put()和take()
- 一个 BlockingQueue 可能是有界的,如果在插入的时候,发现队列满了,那么 put 操作将会阻塞。通常,在这里我们说的无界队列也不是说真正的无界,而是它的容量是 Integer.MAX_VALUE(21亿多)
- 阻塞队列主要是为了实现生产者-消费者模型的,它实现了 java.util.Collection 接口,但是如果我们把它当做普通的集合来用不恰当,因为这类操作通常并不高效【原因:阻塞队列是线程安全容器,因此内部操作里面采用了reentrantLock,因此相比直接用普通集合,效率肯定是慢的】
- BlockingQueue 的实现都是线程安全的,但是批量的集合操作如 addAll, containsAll, retainAll 和 removeAll 不一定是原子操作。如 addAll(c) 有可能在添加了一些元素后中途抛出异常,此时 BlockingQueue 中已经添加了部分元素,这个是允许的,取决于具体的实现
- BlockingQueue是一个比较简单的线程安全容器`
4. 关于BlockingQueue线程安全的测试
public class TestThread {
public static void main(String[] args) {
List<Integer> list = new ArrayList<>();
for (int i = 10000000; i >= 1; i--) {
list.add(0);
}
System.out.println("源集合数量:" + list.size());
//线程不安全的list
//List<Integer> newList = new ArrayList<>();
// Vector<Integer> newList = new Vector<>();
//我们要测试的阻塞队列
ArrayBlockingQueue<Integer> newList = new ArrayBlockingQueue<>(10000000);
long start = System.currentTimeMillis();
ExecutorService executor = Executors.newFixedThreadPool(100);
for (Integer integer : list) {
executor.submit(() -> {
newList.add(integer + 1);
});
}
executor.shutdown();
try {
//等待线程池中所有子线程任务完成
executor.awaitTermination(6, TimeUnit.MINUTES);
} catch (InterruptedException e) {
e.printStackTrace();
}
long end = System.currentTimeMillis();
System.out.println("时间:" + (end - start) + "ms");
System.out.println("新集合数量:" + newList.size());
}
}
注意:上面我们使用了线程池来批量往容器中添加元素,线程不安全的arrayList将结果不等于源头数量100w,而vector和我们的阻塞队列则是正常的输出结果100w
上面除了使用executor.awaitTermination来等待线程池任务执行完毕还可以使用CountDownLatch来等待所有任务结束
二 几种常见阻塞队列
1. DelayQueue
DelayQueue是一个没有边界BlockingQueue实现,加入其中的元素必需实现Delayed接口,而Delayed接口继承了Comparable接口。DelayQueue内部使用非线程安全的优先队列(PriorityQueue),并使用Leader/Followers模式,消费者线程处于等待状态时,总是等待最先到期的元素,而不是长时间的等待。消费者线程尽量把时间花在处理任务上,最小化空等的时间,以提高线程的利用效率。
什么是Leader/Followers模式
- 有若干个线程(一般组成线程池)用来处理大量的事件
- 有一个线程作为领导者,等待事件的发生;其他的线程作为追随者,仅仅是睡眠
- 假如有事件需要处理,领导者会从追随者中指定一个新的领导者,自己去处理事件
- 唤醒的追随者作为新的领导者等待事件的发生
- 处理事件的线程处理完毕以后,就会成为追随者的一员,直到被唤醒成为领导者
- 假如需要处理的事件太多,而线程数量不够(能够动态创建线程处理另当别论),则有的事件可能会得不到处理
因此线程会有三种身份中的一种:leader和follower,以及一个干活中的状态:proccesser。它的基本原则就是,永远最多只有一个leader。而所有follower都在等待成为leader。线程池启动时会自动产生一个Leader负责等待网络IO事件,当有一个事件产生时,Leader线程首先通知一个Follower线程将其提拔为新的Leader,然后自己就去干活了,去处理这个网络事件,处理完毕后加入Follower线程等待队列,等待下次成为Leader。这种方法可以增强CPU高速缓存相似性,及消除动态内存分配和线程间的数据交换
运行状态
- 初始状态
假设有3个消费者线程,队列中的元素按照过期时间排序,队列头部元素对象4在4s以后到期,消费线程1查看对象4发现还未到期,于是进入等待状态,4秒以后将会醒来,等待头部元素到期的线程称为Leader线程,这里线程1位Leader线程
- 2s以后
对象4到期,消费者线程1拿到对象4,从等待状态进入处理状态,同时向消费者2和消费者3发送signal。消费者2和消费者3会争夺leader权,这里消费者2进入等待状态等待对象3到期,而消费者线程3就行进入待命状态
此时队列中加入新元素对象6,10秒后到期,位于队列末尾
- 又2s过去
先看消费线程1,如果它已经结束了对象4的处理,则进入待命状态。如果还没有结束,则它继续处理对象4
消费线程2取到对象3以后,也进入处理状态,同时给处于待命状态的消费线程3发送信号,消费线程3进入等待状态,成为新的Leader。现在头部元素是新插入的对象7,因为它1s以后就过期,要早于其它所有元素,所以排到了队列头部
- 又1s过去
一种不好的结果:
消费线程3一定正在处理对象7。
消费线程1与消费线程2还没有处理完它们各自取得的对象,无法进入待命状态,也更加进入不了等待状态。此时对象2马上要到期,那么如果它到期时没有消费者线程空下来,则它的处理一定会延期
可以想见,如果元素进入队列的速度很快,元素之间的到期时间相对集中,而处理每个到期元素的速度又比较慢的话,则队列会越来越大,队列后边的元素延期处理的时间会越来越长
另外一种好的结果:
消费线程1与消费线程2很快的完成对取出对象的处理,及时返回重新等待队列中的到期元素。一个处于等待状态(Leader),对象2一到期就立刻处理。另一个则处于待命状态。这样,每一个对象都能在到期时被及时处理,不会发生明显的延期
所以,消费者线程的数量要够,处理任务的速度要快。否则,队列中的到期元素无法被及时取出并处理,造成任务延期、队列元素堆积等情况
案例:
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
class DelayedTask implements Delayed{
/*任务之间的启动间隔在1~2s之间*/
public static long currentTime=System.currentTimeMillis();
/*任务调度时间**/
public final long scheduleTime;
/*任务个数*/
public static final AtomicInteger nTask = new AtomicInteger(0);
/*任务名称*/
public final String taskName;
/*任务时间花费*/
public final int timeCost;
public DelayedTask(String taskName, int timeCost) {
this.taskName = taskName;
this.timeCost = timeCost;
nTask.incrementAndGet();
// 每个任务初始化的时候都会重新设置这个时间,下一个任务是在上一个任务初始化的时间的基础上加上1-2秒
currentTime+=1000+(long)(Math.random()*1000);
scheduleTime=currentTime;
}
/**
* 队列新放入元素的排序比较
*/
@Override
public int compareTo(Delayed o) {
return (int)(this.scheduleTime-((DelayedTask)o).scheduleTime);
}
/**
* 获取执行时间和当前时间的差距
*/
@Override
public long getDelay(TimeUnit unit) {
long delay=scheduleTime-System.currentTimeMillis();
return unit.convert(delay,TimeUnit.MILLISECONDS);
}
public void execTask() {
long startTime = System.currentTimeMillis();
SimpleDateFormat sdf=new SimpleDateFormat("HH:mm:ss:SSS");
String scheduleTimeStr = sdf.format(new Date(Long.parseLong(String.valueOf(scheduleTime)))); // 时间戳
String realStartTimeStr = sdf.format(new Date(Long.parseLong(String.valueOf(startTime)))); // 时间戳
System.out.println("Thread "+Thread.currentThread().getName()+" Task " + taskName + ": schedule_start_time=" + scheduleTimeStr + ",real start time="
+ realStartTimeStr + ",delay=" + (startTime - scheduleTime));
try {
Thread.sleep(timeCost);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
class DelayedTaskConsumer implements Runnable{
private final BlockingQueue<DelayedTask> delayQueue;
public DelayedTaskConsumer(BlockingQueue<DelayedTask> delayQueue) {
this.delayQueue = delayQueue;
}
@Override
public void run() {
DelayedTask task;
try {
while (true) {
task = delayQueue.take();
task.execTask();
DelayedTask.nTask.decrementAndGet();
}
} catch (InterruptedException e) {
e.printStackTrace();
System.out.println(Thread.currentThread().getName() + " finished");
}
}
}
public class DelayQueueDemo{
public static void main(String[] args) {
BlockingQueue<DelayedTask> delayQueue = new DelayQueue<>();
for(int i=0;i<10;i++){
try {
delayQueue.put(new DelayedTask("work "+i,2000));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
int nTaskConcumer=1;
ThreadGroup tGroup=new ThreadGroup("Consumers");
// 创建消费线程
for(int j=0;j<nTaskConcumer;j++){
Thread t = new Thread(tGroup,new DelayedTaskConsumer(delayQueue));
t.start();
}
while (DelayedTask.nTask.get()>0){
try {
Thread.sleep(50);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// 中断所有消费者子线程的执行
tGroup.interrupt();
System.out.println("Main thread finished");
}
首先启动一个消费者线程。因为消费者线程处理单个任务的时间为2s,而任务的调度间隔为1~2s。这种情况下,每当消费者线程处理完一个任务,回头再从队列中新取任务时,新任务肯定延期了,无法按给定的时间调度任务。而且越往后情况越严重。运行代码看一下输出:
最后一个任务的延迟时间已经超过4.5s了
再作一次测试,将消费者线程的个数调整为2,这时任务应该能按时启动,延迟很小,基本都在1ms左右
最优的消费者线程的个数与任务启动的时间间隔好像存在这样的关系:单个任务处理时间的最大值 / 相邻任务的启动时间最小间隔 = 最优线程数,如果最优线程数是小数,则取整数后加1,比如1.3的话,那么最优线程数应该是2
本例中,单个任务处理时间的最大值固定为2s
相邻任务的启动时间最小间隔为1s
则消费者线程数为2/1=2
如果消费者线程数小于此值,则来不及处理到期的任务。如果大于此值,线程太多,在调度、同步上花更多的时间,无益改善性能 from
2. ArrayBlockingQueue
ArrayBlockingQueue采用ReentrantLock实现线程安全访问依赖了condition和ReentrantLock,以下先用并发大佬的DougLea例子讲一下condition是个啥
public class CreateConsumerModel {
final Lock lock = new ReentrantLock();
final Condition notFull = lock.newCondition();
final Condition notEmpty = lock.newCondition();
final Object[] items = new Object[100];
int putIndex;
int takeindex;
int nElement;
public void put(Object o) {
lock.lock();
try {
while (nElement==items.length) { // 是while不是if,保证持续生产
try {
notFull.await(); // 等待不满,倒装句
} catch (InterruptedException e) {
e.printStackTrace();
}
}
items[putIndex]=o;
if(++putIndex==items.length) // 如果已经到了末尾,则重置插入位置,否则将putIndex放置在下一个位置
putIndex=0;
++nElement;
notEmpty.signal(); // 生产成功,通知消费者消费,通知不空
} finally {
lock.unlock();
}
}
public Object take(){
lock.lock();
try{
while(nElement==0){
try {
notEmpty.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
Object o = items[takeindex];
if(++takeindex==items.length)
takeindex=0;
--nElement;
notFull.signal();
return o;
} finally {
lock.unlock();
}
}
}
可以看到:condition 是依赖于 ReentrantLock 的,不管是调用 await 进入等待还是 signal 唤醒,都必须获取到锁才能进行操作。
每个 ReentrantLock 实例可以通过调用多次 newCondition 产生多个 ConditionObject 的实例:
//摘取自ArrayBlockingQueue
notEmpty = lock.newCondition();
notFull = lock.newCondition();
//摘取自reentrantLock,下面实例化的是AQS里面的ConditionObject
final ConditionObject newCondition() {
return new ConditionObject();
}
//AQS里面有一个CLH同步阻塞队列(Sync Queue),还有一个条件队列(Condition Queue),两个不是一个东西,讲AQS的时候一直围绕着Sync Queue来讲的
public class ConditionObject implements Condition, java.io.Serializable {
/** 条件队列的第一个节点. */
private transient Node firstWaiter;
/** 条件队列最后一个节点,Node类型和Sync Queue中Node都是同一个实现,下面的Node. */
private transient Node lastWaiter;
...
}
static final class Node {
static final Node SHARED = new Node();
static final Node EXCLUSIVE = null;
static final int CANCELLED = 1;
static final int SIGNAL = -1;
static final int CONDITION = -2;
static final int PROPAGATE = -3;
volatile int waitStatus;
volatile Node prev;
volatile Node next;
volatile Thread thread;
//表示条件队列当前节点的下一个节点
Node nextWaiter;
//prev 和 next 用于实现CLH同步队列的双向链表,这里的 nextWaiter 用于实现条件队列的单向链表
}
总结:
- 条件队列和阻塞队列的节点,都是 Node 的实例,因为条件队列的节点是需要转移到阻塞队列中去的
- ConditionObject 只有两个属性 firstWaiter 和 lastWaiter,一个 ReentrantLock 实例可以通过多次调用 newCondition() 来产生多个 Condition 实例
- 每个 condition 有一个关联的条件队列,如线程 1 调用 condition1.await() 方法即可将当前线程 1 包装成 Node 后加入到条件队列中,然后阻塞在这里,不继续往下执行,条件队列是一个单向链表
- 调用condition1.signal() 触发一次唤醒,此时唤醒的是队头,会将condition1 对应的条件队列的 firstWaiter(队头) 移到CLH同步队列(Sync Queue)的队尾,等待获取锁,获取锁后 await 方法才能返回,继续往下执行
await()
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
//将当前线程构建为一个Node,加入condition对象的条件队列中
Node node = addConditionWaiter();
//condition依赖于锁,所以能释放必然已经获取到锁,没获取锁在这里调用该方法将抛出异常,并将已经加入到条件队列中的这个节点的状态置为取消
int savedState = fullyRelease(node);
int interruptMode = 0;
//判断当前节点是否已经加入到CLH同步队列
while (!isOnSyncQueue(node)) {
//没加进去就调用park挂起线程阻塞
LockSupport.park(this);
//只有当前线程被唤醒以后,才开始执行下面的,否则一直在上面一行阻塞
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
AbstractQueuedSynchronizer.ConditionObject# addConditionWaiter
private Node addConditionWaiter() {
Node t = lastWaiter;
// If lastWaiter is cancelled, clean out.
if (t != null && t.waitStatus != Node.CONDITION) {
unlinkCancelledWaiters();
t = lastWaiter;
}
// node在初始化的时候,指定waitStatus为Node.CONDITION
Node node = new Node(Thread.currentThread(), Node.CONDITION);
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node;
return node;
}
AbstractQueuedSynchronizer.ConditionObject# unlinkCancelledWaiters
停止关联已经取消等待的节点,这里模拟等待队列中有2个节点,且第二个节点即尾结点是取消状态
private void unlinkCancelledWaiters() {
// 1. 获取队头节点
Node t = firstWaiter;
Node trail = null; // 遍历指针
while (t != null) {
// 2. 对头节点存在的话就取出条件队列对头节点后面的一个节点
Node next = t.nextWaiter;
// 3. 如果队头节点取消了等待
if (t.waitStatus != Node.CONDITION) {
// 4. 不在让对头节点的下一个指针指向以前的next
t.nextWaiter = null;
if (trail == null)
// 5. 这里剔除了队头节点
firstWaiter = next;
else
trail.nextWaiter = next;
// 当
if (next == null)
lastWaiter = trail;
}
else
trail = t;
t = next;
}
}
remove掉取消排队的节点以后,addConditionWaiter方法按照上面的逻辑完成,回到await红框下一行开始执行,
AbstractQueuedSynchronizer# fullyRelease
锁可重入,如果当前线程持有多把锁,state>=1,应当释放全部permit许可。
final int fullyRelease(Node node) {
boolean failed = true;
try {
int savedState = getState();
if (release(savedState)) {
failed = false;
return savedState;
} else {
throw new IllegalMonitorStateException();
}
} finally {
if (failed)
node.waitStatus = Node.CANCELLED;
}
}
如果一个线程在不持有 lock 的基础上,就去调用 condition1.await() 方法,它能进入条件队列,但是在上面的这个方法中,由于它不持有锁,release(savedState) 这个方法肯定要返回 false,进入到异常分支,然后进入 finally 块设置 node.waitStatus = Node.CANCELLED,这个已经入队的节点之后会被后继的节点”请出去“。
isOnSyncQueue
释放锁以后,开始执行自旋判断当前节点进入CLH同步队列
final boolean isOnSyncQueue(Node node) {
//默认CLH队列中节点的等待状态是0,在条件队列中节点的等待状态是-2(Node.CONDITION)
if (node.waitStatus == Node.CONDITION || node.prev == null)
return false;
//当前节点后继有节点的时候,说明必定已经进入到CLH同步队列
if (node.next != null) // If has successor, it must be on queue
return true;
//执行下面有3种可能
// 1.节点的等待状态不为-2
// 2.节点的前驱不为空
// 3.当前节点的后继为空
return findNodeFromTail(node);
}
上面代码就有一个问题,为何节点的前驱指针指向空可以直接判断节点不在同步队列,节点的后继不为空可以直接判断节点在同步队列,但是不能用节点的前驱prev不为空来判断节点是否处于同步队列中?
答案:因为入CLH同步队列的时候,采用尾部插入,先需要修改当前节点的前驱为以前的tail,然后通过CAS-tail操作将当前节点设置为新的尾结点,但是这个CAS-tail是有几率失败的,因此无法用当前节点的前驱节点prev不为空来判断节点是否处于同步队列中
findNodeFromTail
private boolean findNodeFromTail(Node node) {
//从同步队列的队尾往前遍历,查看是否能找到这个节点
Node t = tail;
for (;;) {
if (t == node)
return true;
//如果CLH队列尾结点不存在,必定没入队
if (t == null)
return false;
//自旋判断全部CLH同步队列节点
t = t.prev;
}
}
返回false,回到await方法,使用LockSupport.par()挂起当前线程,等待其他线程调用signal()唤醒刚刚挂起的线程
signal()
//唤醒等待最久的线程,即等待队列的头节点
public final void signal() {
//要调用本方法,必须持有独占锁
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignal(first);
}
doSignal(Node first)
//执行唤醒
private void doSignal(Node first) {
do {
//因为要将等待最久的等待队列头节点移动到CLH同步队列,所以将新的头节点指针指向要移动节点的下一个,同时判断新的头节点指针指向的节点是否存在
if ( (firstWaiter = first.nextWaiter) == null)
//不存在的话,同时清空等待队列的尾结点指针
lastWaiter = null;
//清除将要移动节点的nextWaiter指针
first.nextWaiter = null;
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
transferForSignal
final boolean transferForSignal(Node node) {
//CAS将等待队列中的节点的等待状态从-2更新到0,没更新成功是因为内存指定偏移量上的当前值不是-2,可能是取消了,因此CAS失败,直接返回false
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
//执行节点入队列(直接将节点加到CLH同步队列末尾),入队成功返回该节点的前驱节点
Node p = enq(node);
//获取该节点的前驱节点等待状态,默认是0
int ws = p.waitStatus;
//如果前驱节点的状态为取消或者CAS置换前驱节点状态为-1失败
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
//(此步骤待商榷***********)
LockSupport.unpark(node.thread);
//成功直接返回
return true;
}
双或的条件意思是左边不满足,才走右边,如果左边满足,右边直接不执行
正常情况下,ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL) 这句中,ws <= 0,而且 compareAndSetWaitStatus(p, ws, Node.SIGNAL) 会返回 true,所以一般也不会进去 if 语句块中唤醒 node 对应的线程,然后这个方法返回 true,也就意味着 signal 方法结束了,节点进入了同步队列
回到await方法,在线程park以后经过其他线程signal以后,线程从条件队列进入CLH同步队列,开始尝试获取锁,如果重新获取到锁,则继续从await方法中park的下一行执行
int interruptMode=0;
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
interruptMode为中断状态,取值默认为0,意为await期间,没有发生中断,其他情况如下:
- REINTERRUPT(1):代表await返回的时候,需要重新设置中断标志
- THROW_IE:代表await返回的时候,需要排除interruptedException异常
/** Mode meaning to reinterrupt on exit from wait */
private static final int REINTERRUPT = 1;
/** Mode meaning to throw InterruptedException on exit from wait */
private static final int THROW_IE = -1;
从LockSupport.park()这句话返回继续往下执行的几种方式:
- 常规路径:signal->转移节点到CLH同步队列-->(AQS里面的原理:被其他占有锁的线程释放锁的同时unpark唤醒这个线程)
- 线程中断:在park的时候,另外一个线程对这个线程进行了中断
- transferForSignal()方法中,转移到CLH同步队列的线程节点的前驱节点等待状态>0(取消等待了)或者通过CAS-waitStatus将前驱节点等待状态更改为signal(-1)失败
checkInterruptWhileWaiting(Node node)
检查是signal之前的中断(返回THROW_IE=-1),还是signal之后的中断)(返回REINTERRUPT=1)
/**
* Checks for interrupt, returning THROW_IE if interrupted
* before signalled, REINTERRUPT if after signalled, or
* 0 if not interrupted.
*/
private int checkInterruptWhileWaiting(Node node) {
return Thread.interrupted() ?
(transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :0;
}
//只有线程处于中断状态的时候才会执行此方法
final boolean transferAfterCancelledWait(Node node) {
//如果这里CAS-waitStatus成功,说明当前内存地址中的节点的waitStatus=-2,说明是在signal之前中断的
//因为signal之后,内存中的节点等待状态已经是0,是不可能CAS操作成功的
if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
//如果在signal之前中断,就将节点放入CLH同步队列
//(可以看出来,即使中断了,依然会转移到同步队列)
enq(node);
return true;
}
//到这一步是因为transferForSignal()已经将节点等待状态从-2更新到0了
//signal方法会将节点移动到同步队列,但是可能没完成,这边自旋等待其完成
while (!isOnSyncQueue(node))
Thread.yield();
//返回false,在signal后面中断
return false;
}
从上面的代码可以看到:即使当前线程发生了中断,在park,但是节点依然会移动到同步队列
即排在条件队列中的节点,被挂起了,然后醒了,他发现是中断让自己醒来而不是signal,但是他会自己主动去进入CLH同步队列
上面方法返回true为在signal之前中断,抛出中断异常,interruptedMode=-1,上面方法返回false为在signal之后,在入CLH队列的过程中中断(几率较小),interruptedMode=1
下面继续回到await方法
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
//判断完是否过程中发生中断以后,节点都已经进入CLH同步队列,如果没有发生中断,下面条件不满足,再走一次for,然后退出循环
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
//上面默认没有发生中断
//左边返回false表示没有中断且获得锁成功
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
//在signal之前中断,也需要将节点进行转移到阻塞队列,这部分转移的时候,是没有设置node.nextWaiter=null的
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
如果过程中有中断(interruptMode!=0),就需要调用下面方法
reportInterruptAfterWait(int interruptMode)
private void reportInterruptAfterWait(int interruptMode)
throws InterruptedException {
if (interruptMode == THROW_IE) //-1
throw new InterruptedException();
else if (interruptMode == REINTERRUPT) //1
selfInterrupt();
}
/**
* Convenience method to interrupt current thread.
*/
static void selfInterrupt() {
Thread.currentThread().interrupt();
}
流程图
-1的时候await方法抛出异常,因为它代表await()期间发生中断
1的时候重新中断当前线程,因为它代表await()期间没有被中断,而是调用signal()以后发生的中断