08.并发编程之Queue介绍

一、技术选型

MQ适用于消息堆积,消费端处理不过来;两点之间运行生命周期不同情况;
JMS支持5种消息类型,为什么不能用队列来替代呢?
原因:解耦、针对消息本身做队列操作、消息持久化
PS:如果消费很快,前端积压严重的话,不建议使用mq,可采用直连的TCP通信,如mina、netty,节省中间mq转发。

二、常用语法

add 增加一个元索,如果队列已满,则抛出一IIIegaISlabEepeplian异常
remove 移除并返回队列头部的元素,如果队列为空,则抛出一个NoSuchElementException异常
element 返回队列头部的元素,如果队列为空,则抛出一个NoSuchElementException异常
offer 添加一个元素并返回true,如果队列已满,则返回false
poll 移除并返问队列头部的元素,如果队列为空,则返回null
peek 返回队列头部的元素,如果队列为空,则返回null
put 添加一个元素,如果队列满,则阻塞
take 移除并返回队列头部的元素,如果队列为空,则阻塞
removeelementofferpollpeek 其实是属于Queue接口。
阻塞队列的操作可以根据它们的响应方式分为以下三类:
(1) addremoveelement操作在你试图为一个已满的队列增加元素或从空队列取得元素时 抛出异常。
(2) 在多线程程序中,队列在任何时间都可能变成满的或空的,所以你可能想使用offerpollpeek方法。这些方法在无法完成任务时只是给出一个出错示而不会抛出异常。
注意:pollpeek方法出错进返回null。因此,向队列中插入null值是不合法的。
(3) put:把Object加到BlockingQueue里,如果BlockingQueue没有空间,则调用此方法的线程被阻断,直接有空间再继续。take:取走BlockingQueue里排在首位的对象,若BlockingQueue为空,阻断进入等待状态直到BlockingQueue有新的数据被加入。

三、并发Queue

在并发队列上JDK提供了两套实现,一个是以ConcurrentLinkedQueue为代表的高性能队列,一个是以BlockingQueue接口为代表的阻塞队列,都继承自Queue。

1、ConcurrentLinkedQueue

是一个适用高并发场景下的队列,通过无锁的方式,实现了高并发状态下的高性能,性能好于BlockingQueue,是一个基于链接节点的无界线程安全队列。
该队列的元素先进先出,头是先进的,尾是最近加入的,队列不允许为Null。
重要方法:
add()offer()都是加入元素的方法,无任何区别。
poll()peek()都是取头元素节点,前者会删除元素,后者不会。

2、ArrayBlockingQueue

基于数组的阻塞队列实现,在ArrayBlockingQueue内部,维护一个定长数组,以便缓存队列中的数据对象,内部没有实现读写分离,意味着生产者和消费者不能完全并行,长度是需要定义的,可以指定先进先出或者先进后出,也叫有界队列。

/**有界队列**/
package demo3;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;

public class UserQueue {
    public static void main(String[] args) throws InterruptedException {

        ArrayBlockingQueue<String> array = new ArrayBlockingQueue<String>(5);  // 必须传一个长度

        array.put("a");
        array.put("b");
        array.add("c");
        array.add("d");
        array.add("e");
        array.add("f");
        //System.out.println(array.offer("a", 3, TimeUnit.SECONDS));// 阻塞式
    }
}

3、LinkedBlockingQueue

基于链表的阻塞队列,同ArrayBlockingQueue类似,其内部也维持着一个数据缓冲队列(该队列是一个链表构成),LinkedBlockingQueue之所以能够高效地处理并发数据,是因为内部实现采用了读写分离锁,从而实现生产者和消费者操作的完全并行,是一个无界队列。

/**无界队列**/
        LinkedBlockingQueue<String> link = new LinkedBlockingQueue<String>();   // 可以不传,也可以传,传则定死;不传则无界

        link.put("a");
        link.put("b");
        link.add("c");
        link.add("d");
        link.add("e");
        link.add("f");
        for (Iterator iterator = link.iterator(); iterator.hasNext();) {
            String string = (String) iterator.next();
            System.out.println(string);
        }
        //但如果初始化长度的话,也会有容量限制

4、SynchronousQueue

一种没有缓冲的队列,生产者生产的数据直接会被消费者获取并消费。

/**无缓存队列**/
        SynchronousQueue<String> queue = new SynchronousQueue<String>();
        queue.add("a");
/**由于没有任何容量,直接抛异常**/
        Exception in thread "main" java.lang.IllegalStateException: Queue full
        at java.util.AbstractQueue.add(AbstractQueue.java:98)
        at demo3.UserQueue.main(UserQueue.java:31)
/**可以调用add方法,但并不代表队列中加元素了,先take再add,直接扔给前面阻塞在take的线程**/
final SynchronousQueue<String> q = new SynchronousQueue<String>();
        Thread t1 = new Thread(new Runnable() {

            @Override
            public void run() {
                try {
                    System.out.println(q.take());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }

            }

        });
        t1.start();
        Thread t2 = new Thread(new Runnable() {

            @Override
            public void run() {
                q.add("adsfa");

            }

        });
        t2.start();

5、PriorityBlockingQueue

基于优先级的阻塞队列(优先级的判断通过构造函数传入的Comparator对象来决定,也就是说传入队列的对象必须实现Comparable接口),不遵循先进先出,在实现PriorityBlockingQueue时,内部控制线程同步的锁采用的是公平锁,它是一个无界队列。
不是加一个元素时就进行排序,而是调用take/poll方法时才进行将优先级最高的拿出来。

package demo3;

public class Task implements Comparable<Task> {
    private int     id;
    /**
     * @return the id
     */
    public int getId() {
        return id;
    }

    /**
     * @param id the id to set
     */
    public void setId(int id) {
        this.id = id;
    }

    /**
     * @return the name
     */
    public String getName() {
        return name;
    }

    /**
     * @param name the name to set
     */
    public void setName(String name) {
        this.name = name;
    }

    private String  name;

    @Override
    public int compareTo(Task task) {
        return this.id > task.id ? 1 : (this.id < task.id ? -1 : 0);
    }

    /* (non-Javadoc)
     * @see java.lang.Object#toString()
     */
    @Override
    public String toString() {
        return "Task [id=" + id + ", name=" + name + "]";
    }
    
}
package demo3;

import java.util.Iterator;
import java.util.concurrent.PriorityBlockingQueue;

public class UsePriorityBlockingQueue {
    public static void main(String[] args) throws InterruptedException {
        PriorityBlockingQueue<Task> q = new PriorityBlockingQueue<Task>();
        
        Task t1 = new Task();
        t1.setId(3);
        t1.setName("任务1");
        Task t2 = new Task();
        t2.setId(6);
        t2.setName("任务2");
        Task t3 = new Task();
        t3.setId(1);
        t3.setName("任务3");
        
        q.add(t1);
        q.add(t2);
        q.add(t3);
        System.out.println(q);
        for (Iterator iterator = q.iterator();iterator.hasNext();){
            Task task = (Task)iterator.next();
            System.out.println(task.getName());
        }
        
        System.out.println(q.poll());
        System.out.println(q.poll());
        System.out.println(q.poll());
    }
}

6、 DelayQueue

带有延迟时间的Queue,其中的元素只有当其指定的延迟时间到了,才能够从队列中获取到该元素。DelayQueue中的元素必须实现Delay接口,DelayQueue是一个没有大小限制的队列,应用场景很多,比如对缓存超时的数据进行移除、任务超时处理、空闲连接的关闭等。
未超时肯定会阻塞。

四、场景

应用非常繁忙,并发量非常大,应用承载量1000个任务,单核,一个线程,类似于马路上的早晚高峰,采用ArrayBlockingQueue有界队列,做容量内存限制,超过的话,给予相应的拒绝措施。
平稳期,采用无界队列,车流量不是很大,可以放心使用LinkedBlockingQueue,能保证数据的实时性。
夜半时分,不需要存储到队列中了(浪费空间),数据量非常少,采用虚拟的队列(无容量)SynchronousQueue队列,直接提交给线程,效率更高。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 205,132评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 87,802评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,566评论 0 338
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,858评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,867评论 5 368
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,695评论 1 282
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,064评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,705评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 42,915评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,677评论 2 323
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,796评论 1 333
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,432评论 4 322
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,041评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,992评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,223评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,185评论 2 352
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,535评论 2 343

推荐阅读更多精彩内容