前言
最近在看并发编程艺术这本书,对看书的一些笔记及个人工作中的总结。
什么是阻塞队列?
A java.util.Queue that additionally supports operations that wait for the queue to become non-empty when retrieving an element, and wait for space to become available in the queue when storing an element.
简而言之就是当队列满时,插入阻塞;当队列为空时,删除(取出)阻塞。常用于生产者和消费者场景。
自己实现一个简单的阻塞队列,实现原理就是通知模式,当队列满时,添加元素阻塞,当队列空时,删除阻塞
/**
* 意思就是自定义一个队列,当队列中的元素个数等于指定的长度时,put方法要想再加入元素,必须等待take从集合取出元素之后才能放入,这边涉及到wait和notify,
* 同理,take的时候如果队列中没有元素(此时的元素集合长度为0,那么也要执行等待,等到put进元素才能取出,这边也涉及到wait和notify)
*
*/
public class MyQueue {
//1 需要一个承装元素的集合
private LinkedList<Object> list = new LinkedList<>();
//2 需要一个计数器
private AtomicInteger count = new AtomicInteger(0);
//3 需要制定上限和下限
private final int minSize = 0;
private final int maxSize ;
//4 构造方法
public MyQueue(int size){
this.maxSize = size;
}
//5 初始化一个对象 用于加锁
private final Object lock = new Object();
//put(anObject): 把anObject加到BlockingQueue里,如果BlockQueue没有空间,则调用此方法的线程被阻断,直到BlockingQueue里面有空间再继续.
public void put(Object obj){
synchronized (lock) {
while(count.get() == this.maxSize){
try {
lock.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//1 加入元素
list.add(obj);
//2 计数器累加
count.incrementAndGet();
//3 通知另外一个线程(唤醒)
lock.notify();
System.out.println("新加入的元素为:" + obj);
}
}
//take: 取走BlockingQueue里排在首位的对象,若BlockingQueue为空,阻断进入等待状态直到BlockingQueue有新的数据被加入.
public Object take(){
Object ret = null;
synchronized (lock) {
while(count.get() == this.minSize){
try {
lock.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//1 做移除元素操作
ret = list.removeFirst();
//2 计数器递减
count.decrementAndGet();
//3 唤醒另外一个线程
lock.notify();
}
return ret;
}
public int getSize(){
return this.count.get();
}
public static void main(String[] args) {
//线程run中使用变量必须使用final修饰
final MyQueue mq = new MyQueue(5);
mq.put("a");
mq.put("b");
mq.put("c");
mq.put("d");
mq.put("e");
System.out.println("当前容器的长度:" + mq.getSize());
Thread t1 = new Thread(() -> {
mq.put("f");
mq.put("g");
},"t1");
t1.start();
Thread t2 = new Thread(() -> {
Object o1 = mq.take();
System.out.println("移除的元素为:" + o1);
Object o2 = mq.take();
System.out.println("移除的元素为:" + o2);
},"t2");
try {
//使用此单元执行 Thread.sleep.这是将时间参数转换为 Thread.sleep 方法所需格式的便捷方法。
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
t2.start();
}
}
jdk并发包提供的BlockingQueue及其实现
然后看其比较主要的实现
ArrayBlockingQueue:一个由数组结构组成的有界阻塞队列。
LinkedBlockingQueue:一个由链表结构组成的有界阻塞队列。
PriorityBlockingQueue:一个支持优先级排序的无界阻塞队列。
DelayQueue:一个使用优先级队列实现的无界阻塞队列。
SynchronousQueue:一个不存储元素的阻塞队列。
LinkedTransferQueue:一个由链表结构组成的无界阻塞队列。
LinkedBlockingDeque:一个由链表结构组成的双向阻塞队列。
ArrayBlockingQueue
基于数组的阻塞队列实现,在ArrayBlockingQueue内部,维护了一个定长数组,以便缓存队列中的数据对象,其内部没有实现读写分离,长度是需要定义的,按照先进先出(FIFO)的原则对元素进行排序。是有界队列(bounded),在很多场合非常适合使用。
默认情况下不保证线程公平的访问队列,所谓公平访问队列是指阻塞的线程,可以按照阻塞的先后顺序访问队列,即先阻塞线程先访问队列。非公平性是对先等待的线程是非公平的,当队列可用时,阻塞的线程都可以争夺访问队列的资格,有可能先阻塞的线程最后才访问队列。为了保证公平性,通常会降低吞吐量。我们可以使用以下代码创建一个公平的阻塞队列。
可以使用构造参数实现公平的阻塞队列
/**
* ArrayBlockingQueue是有界的阻塞队列,如果插入队列中的元素多余自己定义的长度会抛出InterruptedException异常
*
*/
public class ArrayBlockingQueueTest {
public static void main(String[] args) throws Exception{
ArrayBlockingQueue<String> array = new ArrayBlockingQueue<>(5);
array.put("a");
array.put("b");
array.add("c");
array.add("d");
// array.add("e");
// array.add("f");
//offer方法表示将元素加到队列中的最后,第二个参数和第三个参数表示如果队列满的话,等待时间后还是满的话插入失败
System.out.println(array.offer("a", 3, TimeUnit.SECONDS));
System.out.println(array);
}
}
LinkedBlockingQueue
是一个用链表实现的有界阻塞队列。此队列的默认和最大长度为Integer.MAX_VALUE。此队列按照先进先出的原则对元素进行排序。
Linked queues typically have higher throughput than array-based queues but less predictable performance in most concurrent applications.
链表队列通常比数组队列有更好的吞吐量,但是在大多数并发应用中可预见性低。
/**
* 阻塞队列,不指定队列长度,底层是由链表实现的,LinkedBlockingDeque之所以能够高效的处理并发数据,
* 是因为其内部实现采用分离锁(读写分离两个锁),是一个无界队列
*/
public class LinkedBlockingQueueTest {
public static void main(String[] args) {
LinkedBlockingQueue<String> q = new LinkedBlockingQueue<>();
//offer将元素添加到队列的末尾,如果队列不满,添加成功返回true,队列full则返回false
q.offer("a");
q.offer("b");
q.offer("c");
q.offer("d");
q.offer("e");
q.add("f");
for(Iterator<String> iterator = q.iterator();iterator.hasNext();){
String value = iterator.next();
System.out.println(value);
}
System.out.println(".............................");
q.stream().forEach(str -> System.out.println(str));
System.out.println("..............................");
List<String> list = new ArrayList<>();
//javadoc没有这方法的说明,根据返回的结果很明显了,调用drainTo方法返回的是向list集合中添加的元素
System.out.println(q.drainTo(list, 3));
System.out.println(list.size());
for (String string : list) {
System.out.println(string);
}
}
}
PriorityBlockingQueue
PriorityBlockingQueue是一个支持优先级的无界(unbounded)阻塞队列。默认情况下元素采取自然顺序升序排列。也可以自定义类实现compareTo()方法来指定元素排序规则,或者初始化PriorityBlockingQueue时,指定构造参数Comparator来对元素进行排序。需要注意的是不能保证同优先级元素的顺序。
//take方法按照规定的优先级顺序取出数据
public class PriorityBlockingQueueTest {
public static void main(String[] args) throws Exception{
PriorityBlockingQueue<String> q = new PriorityBlockingQueue<>();
q.add("world");
q.add("welcome");
q.add("aaaaaa");
q.add("hello");
q.stream().forEach(str -> System.out.println(str));
PriorityBlockingQueue<Student> studentPriorityBlockingQueue = new PriorityBlockingQueue<>();
Student student = new Student(1,"miaozhihao");
Student student1 = new Student(2,"zob");
Student student2 = new Student(3,"aaaa");
Student student3 = new Student(2,"tom");
Student student4 = new Student(2,"bbbb");
studentPriorityBlockingQueue.add(student);
studentPriorityBlockingQueue.add(student1);
studentPriorityBlockingQueue.add(student2);
studentPriorityBlockingQueue.add(student3);
studentPriorityBlockingQueue.add(student4);
System.out.println(studentPriorityBlockingQueue.take());
System.out.println(studentPriorityBlockingQueue.take());
System.out.println(studentPriorityBlockingQueue.take());
System.out.println(studentPriorityBlockingQueue.take());
System.out.println(studentPriorityBlockingQueue.take());
}
}
class Student implements Comparable<Student>{
private int id;
private String name;
public int getId() {
return id;
}
public void setId(int id) {
this.id = id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public Student(int id,String name){
this.id = id;
this.name = name;
}
@Override
public String toString() {
return "Student{" + "id=" + id + ", name='" + name + '\'' + '}';
}
@Override
public int compareTo(Student student) {
return this.id - student.id != 0 ? (this.id - student.id) : (this.name.compareToIgnoreCase(student.name));
}
}
DelayQueue
带有延迟时间的Queue,其中的元素只有当前指定的延迟时间到了,才能够从队列中获取该元素。DelayQueue中的元素必须实现Delayed接口,DelayQueue是一个无界队列,应用场景很多
·缓存系统的设计:可以用DelayQueue保存缓存元素的有效期,使用一个线程循环查询DelayQueue,一旦能从DelayQueue中获取元素时,表示缓存有效期到了。
·定时任务调度:使用DelayQueue保存当天将会执行的任务和执行时间,一旦从DelayQueue中获取到任务就开始执行,比如TimerQueue就是使用DelayQueue实现的。
/**
* 局限,当队列中的head元素时间到了才能取下一个元素,如果第一个元素延迟时间最长,那么等待时间到了之后,元素中的元素一起取出来了
* 所以,我的demo daqiu1 设置的延迟时间是最短的
*
*/
public class DelayQueueTest {
public static void main(String[] args) throws Exception{
Daqiu daqiu1 = new Daqiu("miaozhihao",1 * 1000 +System.currentTimeMillis());
Daqiu daqiu2 = new Daqiu("zhangsan",4 * 1000 +System.currentTimeMillis());
Daqiu daqiu3 = new Daqiu("lisi",2 * 1000 +System.currentTimeMillis());
Daqiu daqiu4 = new Daqiu("wangwu",13 * 1000 +System.currentTimeMillis());
final DelayQueue<Daqiu> daqius = new DelayQueue<>();
Thread th1 = new Thread(() -> {
while(true){
try {
Daqiu daqiu = daqius.take();
System.out.println(daqiu);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
th1.start();
daqius.add(daqiu1);
daqius.add(daqiu2);
daqius.add(daqiu3);
daqius.add(daqiu4);
}
}
//加入到DelayQueue的方法必须要实现Delayed接口,而Delayed继承Comparable接口
class Daqiu implements Delayed{
//打球的人
private String name;
//截止时间
private long endTime;
//定义时间工具类
private TimeUnit timeUnit = TimeUnit.SECONDS;
public Daqiu(String name,long endTime){
this.name = name;
this.endTime = endTime;
}
@Override
public String toString() {
return "打球的人为:"+this.name;
}
//使用给定的时间单位返回与当前对象关联的剩余时间
@Override
public long getDelay(TimeUnit unit) {
return endTime - System.currentTimeMillis();
}
//compareTo是根据getDelay方法提供指定的顺序
@Override
public int compareTo(Delayed delayed) {
Daqiu daqiu = (Daqiu)delayed;
return this.getDelay(this.timeUnit) - daqiu.getDelay(this.timeUnit) > 0 ? 1:0;
}
}
SynchronousQueue
A BlockingQueue blocking queue in which each insert operation must wait for a corresponding remove operation by another thread, and vice versa. A synchronous queue does not have any internal capacity, not even a capacity of one. You cannot peek at a synchronous queue because an element is only present when you try to remove it; you cannot insert an element (using any method) unless another thread is trying to remove it; you cannot iterate as there is nothing to iterate. The head of the queue is the element that the first queued inserting thread is trying to add to the queue; if there is no such queued thread then no element is available for removal and poll() will return null. For purposes of other Collection methods (for example contains), a SynchronousQueue acts as an empty collection. This queue does not permit null elements.
总结一下:
不存储元素的阻塞队列。每一个插入(put)操作必须等待一个删除(take)操作,否则不能继续添加元素。阻塞表现在你不能peek一个synchronous队列除非存在另一个线程存在并且去删除这个队列,不能去插入一个元素除非另外一个线程去删除它。只有另外一个线程在删除元素的时候才能遍历。
它支持公平访问队列。默认情况下线程采用非公平性策略访问队列。使用以下构造方法可以创建公平性访问的SynchronousQueue,如果设置为true,则等待的线程会采用先进先出的顺序访问队列。
SynchronousQueue可以看成是一个传球手,负责把生产者线程处理的数据直接传递给消费者线程。队列本身并不存储任何元素,非常适合传递性场景。SynchronousQueue的吞吐量高于LinkedBlockingQueue和ArrayBlockingQueue。
/**
* take方法睡眠3s,那么insert操作就阻塞3s等待去take的时候才能去消费它
*/
public class SynchronousQueueTest {
public static void main(String[] args) throws Exception{
final SynchronousQueue<String> q = new SynchronousQueue<>();
Thread t1 = new Thread(() -> {
try {
String value = q.take();
Thread.sleep(3000);
System.out.println(value);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
t1.start();
Thread t2 = new Thread(() -> q.add("hello"));
t2.start();
}
}
其实看了源码也是使用LockSupport工具类的加锁方法实现阻塞的。
LinkedTransferQueue
其javadoc:
An unbounded {@link TransferQueue} based on linked nodes. This queue orders elements FIFO (first-in-first-out) with respect to any given producer. The head of the queue is that element that has been on the queue the longest time for some producer. The tail of the queue is that element that has been on the queue the shortest time for some producer.
Beware that, unlike in most collections, the size method is NOT a constant-time operation. Because of the asynchronous nature of these queues, determining the current number of elements requires a traversal of the elements, and so may report inaccurate results if this collection is modified during traversal.
Additionally, the bulk operations addAll, removeAll, retainAll, containsAll, equals, and toArray are not guaranteed to be performed atomically. For example, an iterator operating concurrently with an addAll operation might view only some of the added elements.
总结一下:
LinkedTransferQueue是一个由链表结构组成的无界阻塞TransferQueue队列。先进先出。head元素是队列中最长的生产者,tail是队列中最短时间的生产者。
不同于其他的集合,size方法不是恒定不变的,因为队列是异步的,确定当前元素的数目需要去遍历该元素,在遍历的时候修改集合会对结果产生影响。
相对于其他阻塞队列,LinkedTransferQueue多了tryTransfer和transfer方法。
如果当前有消费者正在等待接收元素(消费者使用take()方法或带时间限制的poll()方法时),transfer方法可以把生产者传入的元素立刻transfer(传输)给消费者。如果没有消费者在等待接收元素,transfer方法会将元素存放在队列的tail节点,并等到该元素被消费者消费了才返回。
tryTransfer方法是用来试探生产者传入的元素是否能直接传给消费者。如果没有消费者等待接收元素,则返回false。和transfer方法的区别是tryTransfer方法无论消费者是否接收,方法立即返回,而transfer方法是必须等到消费者消费了才返回。
对于带有时间限制的tryTransfer(E e,long timeout,TimeUnit unit)方法,试图把生产者传入的元素直接传给消费者,但是如果没有消费者消费该元素则等待指定的时间再返回,如果超时还没消费元素,则返回false,如果在超时时间内消费了元素,则返回true。
/**
* tryTransfer与transfer的区别就是一个有返回值,前者有线程等待则返回true并且将参数(元素)直接推送给消费端,没有消费者则直接返回false
* 后者不返回这个标志
*/
public class LinkedTransferQueueTest {
public static void main(String[] args) throws Exception{
final LinkedTransferQueue<String> queue = new LinkedTransferQueue<>();
//queue.transfer("bbb");
new Thread(() -> {
try {
System.out.println("--------");
String value = queue.take();
System.out.println(value);
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
Thread.sleep(1000);
//tryTransfer将元素立即传递给消费者,如果当前有消费者等待则返回true,如果没有则返回false
boolean flag = queue.tryTransfer("aaa");
System.out.println(flag);
new Thread(() -> {
System.out.println("---------");
try {
String value = queue.poll(4, TimeUnit.SECONDS);
System.out.println(value);
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
Thread.sleep(3000);
//Thread.sleep(10000);
queue.transfer("bbb");
System.out.println("-----end-------");
}
}
LinkedBlockingDeque
LinkedBlockingDeque是一个由链表结构组成的双向阻塞队列。所谓双向队列指的是可以从队列的两端插入和移出元素。双向队列因为多了一个操作队列的入口,在多线程同时入队时,也就减少了一半的竞争。相比其他的阻塞队列,LinkedBlockingDeque多了addFirst、addLast、offerFirst、offerLast、peekFirst和peekLast等方法,以First单词结尾的方法,表示插入、获取(peek)或移除双端队列的第一个元素。以Last单词结尾的方法,表示插入、获取或移除双端队列的最后一个元素。
//由链表结构组成的双向阻塞队列
public class LinkedBlockingDequeTest {
public static void main(String[] args) {
LinkedBlockingDeque<String> deque = new LinkedBlockingDeque<>();
deque.addFirst("a");
deque.addFirst("b");
deque.addFirst("c");
deque.addLast("e");
deque.addFirst("f");
System.out.println(deque);
System.out.println(deque.getLast());
System.out.println(deque.peekLast());
}
}
以上BlockingQueue的实现。