前记:
上篇文章写完之后,觉得代码贴的太多了,不过源代码
解析这种的,就是看源代码才有意思。主要是还是引导多思考,以
后写读源代码文章的博客的步骤,1.通过这段代码实现的功能
会先构思一下实现方案和代码架构,和一些其他的思考。2逐行
分析每个代码行的意思和意图。3.总结,对比其他类似的实现
或者其他的一些感悟。
正文:
ArrayBlockingQueue 是数组结构的堵塞队列的一种实现,那么肯定要实现的BlockingQueue接口。
解释一下接口含义
boolean add(E e); 队列添加元素,返回成功标识,队列满了抛出队列满的异常,无堵塞。
boolean offer(E e);队列添加元素,返回成功标识,无堵塞。
void put(E e);队列添加元素,无返回值,队列满了会堵塞。
boolean offer(E e, long timeout, TimeUnit unit);队列添加元素,队列满了堵塞,设置有超时时间。
E poll();队列拉取元素,无堵塞,没有值返回null。
E take();队列拉取元素,队列空了会堵塞,等待能拉取到值为止
E poll(long timeout, TimeUnit unit);队列拉取元素,队列空了等待,设置有等待超时时间
E peek() ; 只读队首元素的值,没有返回空
int remainingCapacity(); 计算剩余容量
boolean remove(Object o); 移除元素
int drainTo(Collection<? super E> c, int maxElements); 移除元素放到入参的集合当中
public Iterator<E> iterator() jdk 1.8以后ArrayBlockingQueue还增加了迭代器功能,这个模块下面会重点介绍,很有意思。
堵塞队列提供的功能:
在多线程环境下提供一个类似于生产者和消费者这样的一个模型
提供一个FIFO的顺序读取和插入
那就引起我的思考:
怎么实现的堵塞机制和堵塞的超时机制?
作为一个集合类,数组结构的怎么在多线程环境下实现安全扩容?
1.8jdk版本为什么会增加迭代器功能?
1.元素
/** 堵塞队列中存放的对象 /
final Object[] items;
/* 消费者获取对象的下一个对象下标,具体的操作有poll take peek remove /
int takeIndex;
/* 生产者放入对象的下一个对象的下标,具体的操作有 put offer add /
int putIndex;
/* 队列中元素的数量 /
int count;
/* 这个锁就是实现生产者,消费者模型的锁模型,并且所有和并发相关的堵塞控制都是通过这个锁来实现的/
final ReentrantLock lock;
/* 这个是有ReentrantLock 中的Condition一个标识队列中有元素非空标志,用于通知消费者队列中有数据了,快来取数据 /
private final Condition notEmpty;
/* 这个也是ReentrantLock 中的Condition的一个标识,标识队列中的元素不满用于通知生产者队列中空地,快来塞数据/
private final Condition notFull;
/*
- 这是一个迭代器集合,是之前没有的特性,
- 细节:transient 标示变量是序列化忽略这个变量。那么为啥要这么做呢?迭代器都是new 出来的,即使保存再集合里面,别人也拿不到这个的引用。用不了,纯浪费空间。
/
transient Itrs itrs = null;
/ 这个函数的意思是传入i返回i-1,因为是数组,空间已经固定了。 - 可以将线性结构理解成环形结构,最前面的那个数再减就到了最后面了 特别有意思的一点是1.6之前有inc这个函数和这个功能刚 好相反,不是特别明白为啥干掉了。也许是觉得不安全,破坏结构?
/
final int dec(int i) {
return ((i == 0) ? items.length : i) - 1;
}
/* - 迭代器的集合,链表形式
*/
class Itrs {
/**
* 将里面的元素设置成弱引用,目标就是当成缓存使用的
* Node里面存放的其实迭代器
*/
private class Node extends WeakReference<Itr> {
Node next;
Node(Itr iterator, Node next) {
super(iterator);
this.next = next;
}
}
/** 记录循环的次数,当take下标到0的时候为一个循环 cycle+1 */
int cycles = 0;
/** Node的前节点 */
private Node head;
/** 用于删除无用的迭代器 */
private Node sweeper = null;
/***
* 这个标识删除探针
*/
private static final int SHORT_SWEEP_PROBES = 4;
private static final int LONG_SWEEP_PROBES = 16;
/**初始化函数注册迭代器到迭代器集合里面
Itrs(Itr initial) {
register(initial);
}
/**
* 清理itrs 整理旧的过期的迭代器 所谓过期的迭代器,是被标识为none 或者是Detached就是被取走的
- 这个整理动作也是很有意思,普通是循环SHORT_SWEEP_PROBES次数,一旦发现有,那就会多循 环LONG_SWEEP_PROBES次数,尽力去寻找
/
void doSomeSweeping(boolean tryHarder) {
int probes = tryHarder ? LONG_SWEEP_PROBES : SHORT_SWEEP_PROBES;
Node o, p;
final Node sweeper = this.sweeper;
boolean passedGo; // to limit search to one full sweep
if (sweeper == null) {
o = null;
p = head;
passedGo = true;
} else {
o = sweeper;
p = o.next;
passedGo = false;
}
for (; probes > 0; probes--) {
if (p == null) {
if (passedGo)
break;
o = null;
p = head;
passedGo = true;
}
final Itr it = p.get();
final Node next = p.next;
//这个条件就是发现需要被清理的迭代器
if (it == null || it.isDetached()) {
//这个就是更努力的去清理
probes = LONG_SWEEP_PROBES;
//下面是清理动作,然后指针后移
p.clear();
p.next = null;
if (o == null) {
head = next;
if (next == null) {
//这就是迭代器已经遍历完了,然后函数返回了
itrs = null;
return;
} }
else o.next = next;
} else o = p;
p = next;
}
//sweeper 是开始清理的节点位置
this.sweeper = (p == null) ? null : o;
}
/*- 注册逻辑的实现,在链表的最前面加元素
*/
void register(Itr itr) {
head = new Node(itr, head);
}
- 注册逻辑的实现,在链表的最前面加元素
/***
- 迭代器
/
private class Itr implements Iterator<E> {
/* 光标,是迭代器下一次迭代时的坐标,迭代器没有需要遍历的对象了,这个值会为负值/
private int cursor;
/* 下一个元素内容,调用Iterator.next方法拿到的值 /
private E nextItem;
/* 下一个元素的下标,none 是-1 被移除了是-2对应下面的static int /
private int nextIndex;
/* 上一个元素的内容 /
private E lastItem;
/* 上一个元素的的下标,none 是-1 被移除的是-2 同样对应下面的static int /
private int lastRet;
/* 记录之前的开始遍历的下标,当这个迭代器判定为失效了这个值就是DETACHED /
private int prevTakeIndex;
/ 记录之前循环次数的值,和Cycles进行比对,就知道有没有再循环过 /
private int prevCycles;
private static final int NONE = -1;
/元素被调用remove方法移走,的状态/
private static final int REMOVED = -2;
/** 分离分开的Special value for prevTakeIndex indicating "detached mode" /
private static final int DETACHED = -3;
/迭代器的初始化函数从takeIndex位置开始遍历/
Itr() {
// assert lock.getHoldCount() == 0;
lastRet = NONE;
final ReentrantLock lock = ArrayBlockingQueue.this.lock;
lock.lock();
try {
//队列里面没有值
if (count == 0) {
// assert itrs == null;
cursor = NONE;
nextIndex = NONE;
prevTakeIndex = DETACHED;
} else {
final int takeIndex = ArrayBlockingQueue.this.takeIndex;
prevTakeIndex = takeIndex;
nextItem = itemAt(nextIndex = takeIndex);
//队列首元素后一个
cursor = incCursor(takeIndex);
if (itrs == null) {
itrs = new Itrs(this);
} else {
//注册到itrs,所有迭代器的集合,顺序注册的
itrs.register(this);
// 清理无用的迭代器
itrs.doSomeSweeping(false);
}
prevCycles = itrs.cycles;
}
} finally {
lock.unlock();
}
}
方法:(只介绍复杂性,有代表性的)
/**
堵塞提交,超时返回false
/
public boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException {
checkNotNull(e);
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
//获取锁
lock.lockInterruptibly();
try {
while (count == items.length) {
if (nanos <= 0)
return false;
//这里是使用同步队列器的超时机制,在nanos的时间范围内,方法会在这里堵塞,超过这个时间段nanos的值会被赋值为负数,方法继续,然后在下一个循环返回false。这个标志是未满标志,队列里面未满就可以放进元素嘛。然后判断成功就是一个入队列操作
nanos = notFull.awaitNanos(nanos);
}
enqueue(e);
return true;
} finally {
lock.unlock();
}
}
/
- 入队列操作,因为putIndex已经是当前该放入元素的下标了,放入元素之后,
- 需要将putIndex+1,并且元素数量加1。然后直接调用非空标志通知等待中的消费者
- 质疑:如果我没有等待中的消费者,那也要通知,那不是浪费么?
- 解释:下端代码是signal的实现
public final void signal() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignal(first)
}
signal方法已经在里面已经对队列的首元素判断空,不通知了,
这个引起我的一个思考,确实在函数里面就应该对这些条件做判断要比外面判断更好一些,一个是更健壮,一个是更友好,但是这个最小作用模块还是功能模块,别一个调用链做了多次的这种条件的判断,这就让阅读者难受了。
/
private void enqueue(E x) {
final Object[] items = this.items;
items[putIndex] = x;
if (++putIndex == items.length)
putIndex = 0;
count++;
notEmpty.signal();
}
/** - poll的操作和offer基本一样,就是做的是出队列的操作。还有就是一个drainTo方法也很类似,有一个细节有意思就是drainTo是
一个批量操作,但是通知却是一个一个通知的。没有调用singalAll()。因为堵塞队列强调一个顺序。一进一出原则。还有就是在外面判断了有无等待者。因为这*样却是省不必要的循环了。
*/
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0) {
if (nanos <= 0)
return null;
nanos = notEmpty.awaitNanos(nanos);
}
return dequeue();
} finally {
lock.unlock();
}
}
/**
- 出队列操作,跟入队列操作正好是相反的,多了一个清理操作
/
private E dequeue() {
final Object[] items = this.items;
@SuppressWarnings("unchecked")
E x = (E) items[takeIndex];
items[takeIndex] = null;
if (++takeIndex == items.length)
takeIndex = 0;
count--;
if (itrs != null)
//@key jdk1.8的新特性迭代器特性,这里是因为元素的出队列所以清理和这个元素相关联的迭代器
itrs.elementDequeued();
//对于生产者的通知
notFull.signal();
return x;
}
/*
- 根据下标移除元素,那么会分成两种情况一个是移除的是队首元素,一个是移除的是非队首元素,移除队首元素,就相当于出队列操作,移除非队首元素那么中间就有空位了,后面元素需要依次补上,然后如果是队尾元素,那么putIndex也就是插入操作的下标也就需要跟着移动。这里面同样有无用迭代器的清理和notFull标志的通知。elementDequeued 和removedAt 这两个函数差不多主要做的就是清理。但是不一样的是第一种情况当成出队列来处理了。而第二种就相当于这个元素就没有进过队列来处理,轻轻地来,轻轻地走不带走一片云彩
/
void removeAt(final int removeIndex) {
final Object[] items = this.items;
//当移除的元素正好是队列首元素,就是take元素,正常的类似出队列的操作,
if (removeIndex == takeIndex) {
// removing front item; just advance
items[takeIndex] = null;
if (++takeIndex == items.length)
takeIndex = 0;
count--;
if (itrs != null)
itrs.elementDequeued();
//
} else {
//因为是队列中间的值被移除了,所有后面的元素都要挨个迁移
final int putIndex = this.putIndex;
for (int i = removeIndex;;) {
int next = i + 1;
if (next == items.length)
next = 0;
if (next != putIndex) {
items[i] = items[next];
i = next;
} else {
items[i] = null;
this.putIndex = I;
break;
}
}
count—;
if (itrs != null)
itrs.removedAt(removeIndex);
}
notFull.signal();
}
/*- 当元素出队列的时候调用的方法这个出队列方法
/
void elementDequeued() {
// 在队列为空的时候调用清空所有的迭代器;
if (count == 0)
queueIsEmpty();
// 当拿元素进行循环的时候,清理所有过期的迭代器
else if (takeIndex == 0)
takeIndexWrapped();
}
}
/*
- 当元素出队列的时候调用的方法这个出队列方法
- 因为takeIndex等于0了,意味着开始下一个循环了.
- 然后通知所有的迭代器,删除无用的迭代器。
/
void takeIndexWrapped() {
//循环了一次cycle加1
cycles++;
for (Node o = null, p = head; p != null;) {
final Itr it = p.get();
final Node next = p.next;
//需要清理的条件,和清理代码
if (it == null || it.takeIndexWrapped()) {
p.clear();
p.next = null;
if (o == null)
head = next;
else
o.next = next;
} else {
o = p;
}
p = next;
}
//没有迭代器了,就关掉迭代器的集合
if (head == null) // no more iterators to track
itrs = null;
}
/*这个takeIndexWrapped 是内部类Itr 的方法跟上面不是一个类的方法
*这里就是判断这个迭代器所持有的元素还在队列里面么,那么有两个条件,1.isDetached() - 2.就是看这个的循环次数,比建立这个迭代器的时候的循环次数,如果大于1,说明发生过两次以上的循环
- 拿里面的元素都换了个遍,拿肯定是不对了,拿这个迭代器就被关闭了。
- @return true if this iterator should be unlinked from itrs
/
boolean takeIndexWrapped() {
// assert lock.getHoldCount() == 1;
if (isDetached())
return true;
if (itrs.cycles - prevCycles > 1) {
// All the elements that existed at the time of the last
// operation are gone, so abandon further iteration.
shutdown();
return true;
}
return false;
}
//将所有的标志位都标记成remove ,null
void shutdown() {
cursor = NONE;
if (nextIndex >= 0)
nextIndex = REMOVED;
if (lastRet >= 0) {
lastRet = REMOVED;
lastItem = null;
}
prevTakeIndex = DETACHED;
}
/** - 迭代器的基本方法之一,获取下一个元素,会发生缓存器失效的情况,如果是缓存器失效了,能重组就重组,即从takeIndex开始遍历,如果不行就标记失效, *返回none
- @return
*/
public E next() {
// assert lock.getHoldCount() == 0;
final E x = nextItem;
if (x == null)
throw new NoSuchElementException();
final ReentrantLock lock = ArrayBlockingQueue.this.lock;
lock.lock();
try {
//当判定该迭代器失效了,会重组迭代器,以takeIndex为起点开始遍历,或者标记失效
if (!isDetached())
incorporateDequeues();
lastRet = nextIndex;
final int cursor = this.cursor;
//cursor这个值会在incorporateDequeues方法中修改,
if (cursor >= 0) {
nextItem = itemAt(nextIndex = cursor);
this.cursor = incCursor(cursor);
} else {
nextIndex = NONE;
nextItem = null;
}
} finally {
lock.unlock();
}
return x;
}
/**
- 发现元素发生移动,通过判定cycle等信息,然后cursor取值游标就重新从takeIndex开始
- 下面如果发现所有记录标志的值发生变化,就直接清理本迭代器了。
- */
private void incorporateDequeues() {
final int cycles = itrs.cycles;
final int takeIndex = ArrayBlockingQueue.this.takeIndex;
final int prevCycles = this.prevCycles;
final int prevTakeIndex = this.prevTakeIndex;
if (cycles != prevCycles || takeIndex != prevTakeIndex) {
final int len = items.length;
// 从本迭代器建立开始,到目前堵塞队列出队列的个数,也就是takeIndex的偏移量
long dequeues = (cycles - prevCycles) * len
+ (takeIndex - prevTakeIndex);
// 判断所记录的last,next cursor 还是不是原值如果不是,这个迭代器就判定detach
if (invalidated(lastRet, prevTakeIndex, dequeues, len))
lastRet = REMOVED;
if (invalidated(nextIndex, prevTakeIndex, dequeues, len))
nextIndex = REMOVED;
if (invalidated(cursor, prevTakeIndex, dequeues, len))
cursor = takeIndex;
if (cursor < 0 && nextIndex < 0 && lastRet < 0)
detach();
else {
//重新记录cycle值
this.prevCycles = cycles;
this.prevTakeIndex = takeIndex;
}
}
}
回顾一下;
我介绍了ArrayblockingQueue其实是包含了两个部分一个是标准阻塞队列接口的实现。另一个是jdk1.8增加的迭代器。上一个满大街博客都能找的到,我就把接口描述了一下,然后介绍了两个还算是复杂一点的接口。和整个一个工作原理,没有太多使用case。主要是就是生产者和消费者模型。一个锁应用,和其他的JUC框架不一样。它什么操作都加锁,并发变串行。所以它就没有用到原子类修饰的共享变量。
关于迭代器部分好像是只有我这里有写。如果有百度上有看到相关ArrayBlockingQueue迭代器文章的请留言。毕竟我一家之言,还是有可能会有理解上的偏差。我们总结一下这个迭代器。首先跟别的设计一样,谁用谁new。这个不一样的是会增加一个注册到堵塞队列对象里面itrs上面。然后呢用了一个软引用,那么就GC可以回收避免内存溢出。然后会有对无用的迭代器的清理,类似于threadLocal那样。那么什么是无用的迭代器呢。标识无用就一个条件,我的迭代器标识的结点被覆盖了,因为它空间就这么大,举个例子一个大小5的堵塞队列。然后我建了一个迭代器,那么这个迭代器的下标就是0.然后迭代器我没有马上用,然后进出队列10次,那么之前节点的值已经被替换了。队列里面还有值,但是迭代器的值已经在take方法中被干掉了,已经失效了。判断条件就是cycle的循环次数。有兴趣可以好好了解一下,这应该是我看过的最复杂的迭代器了。
留一些问题:
1.这个迭代器为什么会比arrayList复杂这么多?
2.其实作为堵塞队列来说无非就是数据交换,拿有什么场景是需要迭代器的?而且本身就全都锁控制,效率就不高。还加入这么复杂的迭代模块。会更慢一些的?
这篇文章会看起来比较碎。尽力了。。没有整块的时间去写。而且没想这个迭代器这么复杂。花费我很多时间去研究(没错,这就是我脱稿的原因)
还有就是风格和上一篇不一样了。我希望可以让看这篇文章的人不光是可以学习到之前不知道的知识。也可以触发大家更多的去主动的思考,去思考模块的设计,功能的实现。而不是被动接受这篇文章所传递出来的内容。
还有就是看这种源码。一定要先框架,功能。摸透再去看细节。如果你对这个代码块所要完成的功能不够了解。拿看起来费劲。框架,功能这些都摸透了。再钻到细节上面去。我们可能用到的框架很多,拿要读的源代码那就太多了。其实阅读源代码我觉得是培养一个阅读代码的能力。一个是学习处理这种场景的解决方案,一个是学习编程风格,编码模式。还有就是可能会培养对编程、对探究的兴趣。毕竟工作不能只是为了赚钱。