RabbitMQ 官方demo实践笔记

最近在上云计算,上课讲到RabbitMQ,先试下水(重庆大学2017 刘 李 杨 谢)

Demo 0.环境配置:

  • 服务端(MQ服务)

下载安装erlang 22.3 ,下载安装 RabbitMQ 3.8,无需进一步操作

  • 客户端(代码)

本小组使用Java语言,基于Maven配置:

pom.xml

    <dependency>
      <groupId>com.rabbitmq</groupId>
      <artifactId>amqp-client</artifactId>
      <version>5.8.0</version>
    </dependency>

此时Maven自动导入包,可以在项目中使用MQ了

Demo1.实现Hello World

  • 发送方,消息生产者:

核心代码

    //step 1
    ConnectionFactory factory = new ConnectionFactory();  
    //step 2
    factory.setHost("localhost");   
    //step 3
    try (Connection connection = factory.newConnection();
        Channel channel = connection.createChannel()) {   
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            String message = "Hello World!";
            //step  4
            channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
            System.out.println(" [x] Sent '" + message + "'");
    }

核心逻辑

  1. 工厂模式产生一个channel作为通讯的工具
  2. 为其绑定主机号,localhost表示本地
  3. 制造一个channel,并在其中声明一个名叫 QUEUE_NAME 字符串值得队列
  4. 发布这个消息,这个消息位于消息队列中了
  • 接收方,消息的消费者:

核心代码

    //step 1
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();
    //step 2
    channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    //step 3
    DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println(" [x] Received '" + message + "'");
     };
    //step 4
    channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });

核心逻辑

  1. 同发送方,创建工厂和channel;
  2. channel声明队列,这个和发送方代码调用相同,但此时是找到已经创建的MQ而发送方是创建一个MQ
  3. 创建一个接收消息后的逻辑的回调,但是不在这里执行;
  4. 通过basicConsume循环接收消息,执行回调逻辑处理消息

Demo 2. 公平分摊,ACK,持久化

基于demo1的代码,产生如下问题:

  • 消息存在内存中,如果停止服务,重启丢失怎么办?

  • 如果有多个消费者,那么如何决定将消息发给谁?

  • 如何确保消费者处理完消息了呢? && 如何确保公平分摊?

demo2实现的工作队列将用于解决这些问题,具体如下

Q1:持久化问题:

一般情况下,发布消息:

channel.queueDeclare(QUEUE_NAME, false, false, false, null);
channel.basicPublish("",QUEUE_NAME, 
                     null,
                     message.getBytes("UTF-8"));

如果需要将消息持久化:

channel.queueDeclare(TASK_QUEUE_NAME,
                     true, false, false, null);//第二个参数为是否持久化
channel.basicPublish("", QUEUE_NAME,
                MessageProperties.PERSISTENT_TEXT_PLAIN,
                message.getBytes("UTF-8"));
Q2.多消费者问题

如果有多个消费者:


image.png

默认情况下,会按一定的顺序轮流分配给不同的消费者;也就是说,多个消费者轮流消费队列里的消息

Q3. 确保处理完成&&公平分配

那么,每个消费者会设置一个channel.basicQos(n); 代表消费者能最大n个队列待处理;如果超过这个值,那么消息队列将不会往其中分配;

而MQ是不会主动监视消费者还有多少个消息没处理,通常,都是由消费者处理完了主动通知MQ:

channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);

消费者的这段代码告诉队列已经成功处理了消息,如果队列持久化了消息,这个时候就可以放心销毁了;否则消息会一直存在,那么也会造成开销甚至存储溢出;

Demo 3.exchange "fanout" 实现发布订阅

当发送方发送一条消息,订阅了该类型消息的所有消费者都会收到。

场景:

​ 用户上传了自己的头像,这个时候图片需要清除缓存,同时用户应该得到积分奖励,你可以把这两个队列绑定到图片上传的交换器上,这样当有第三个、第四个上传完图片需要处理的需求的时候,原来的代码可以不变,只需要添加一个订阅消息即可,这样发送方和消费者的代码完全解耦,并可以轻而易举的添加新功能了。

​ 原理在于,发送方不再直接通过消息队列发送消息,而是通过exchange,而消费者会通过在所在的exchange绑定自己的消息队列;当发送方在exchange发布的时候,exchange会把消息发送给所有的绑定在其上的消息队列发送消息。(下图X为exchange)


image.png
  • 发送方:

核心代码

        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            channel.exchangeDeclare(EXCHANGE_NAME, "fanout");

            String message = argv.length < 1 ? "info: Hello World!" :
                    String.join(" ", argv);

            channel.basicPublish(EXCHANGE_NAME, "", null, 
                    message.getBytes("UTF-8"));
            System.out.println(" [x] Sent '" + message + "'");

核心逻辑

相比之前的demo,发送方不再通过channel.queueDeclare()声明一个queue,取而代之的是声明一个exchange,fannout是广播的模式,还有其他类型:direct topic等:

channel.exchangeDeclare(EXCHANGE_NAME, "fanout");  //"fanout"模式代表广播发送
  • 接收方:

核心代码

        Channel channel = connection.createChannel();

        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
        String queueName = channel.queueDeclare().getQueue();
        channel.queueBind(queueName, EXCHANGE_NAME, "");
        //...
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println(" [x] Consumer Received '" + message + "'");
        };
        channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
    

核心逻辑

同样,接收方也通过exchangedeclare连接到exchange;这个时候通过channel会为其分配一个随机命名的MQ,并通过queueBind()绑定到exchange,该MQ为了在最后一行basicConsume()中获取的这个MQ执行回调函数,通过getQueue() 获取到该随机队列名,随后正常消费队列消息

Demo 4. exchange "direct" 选择性发布订阅

对于demo3, exchange的fanout将发送方每一个消息都发送给所有的消费者,这是一种无差别的分配,但有的时候是需要合理过滤的:

场景:
​ 一个日志产生器会产生日志,而多个消费者会消费日志,但是有的消费者只会对【warn】级别的感兴趣,而有的只负责处理【info】【error】级别的日志;这个情况下,如果使用fanout,消费者就会收到自己不感兴趣的消息,exchange对这些消费者不感兴趣的MQ根本没必要发送消息。


image.png
  • 发送方:发送 info warn error三种类型的日志

核心代码:

            //前面照常创建channel、 factory 
            channel.exchangeDeclare(EXCHANGE_NAME, "direct");

            Scanner sc=new Scanner(System.in);
            while(sc.hasNext()){
                String severity = sc.nextLine();   //日志级别
                String message = severity+" message";

                channel.basicPublish(EXCHANGE_NAME,
                        severity,                  //发往对此级别感兴趣的管道
                        null, message.getBytes("UTF-8"));
            }
        }

核心逻辑:

  1. 声明exchange的时候指定类型"direct"
  2. 输入感兴趣的级别 warn error info等
  3. 发往对应感兴趣的MQ


    image.png
  • 接收方:选择性接收自己感兴趣的log类型

核心代码

        channel.exchangeDeclare(EXCHANGE_NAME, "direct");
        String queueName = channel.queueDeclare().getQueue();

        String [] interestLog=new String[]{"info"};//对info类型感兴趣
        for (String severity : interestLog) {
            channel.queueBind(queueName, EXCHANGE_NAME, severity);
        }

核心逻辑

  1. 同样声明,找到同名exchange,类型也是“direct”

  2. 根据感兴趣类型,绑定对应的队列(多个感兴趣事件就绑定多个MQ)

  3. 设置future回调函数处理感兴趣的事件:

对info感兴趣的消费者,只能收到Info:

image.png

对 error,warn感兴趣的消费者,只能收到error 和 warn

image.png

Demo. 5 exchange "Topic"更灵活的匹配发布订阅

​ 尽管使用direct类型的exchange对fanout进行了改进,但它仍然存在局限性:它不能基于多个条件进行路由。

场景:

​ 日志系统并不只会产生[warn info error]等级信息,还会涉及作者[user admin guest],以及别的信息,这多个属性往往是不相干的,如果用direct,会有很大的限制

Topic 类型的exchange使用 xx.xx.xx.xx的点段式路由key对管道进行划分,发布的消息的key为该格式的,而消费者则只需要care其中某几个自己感兴趣的字段,别的通过*****进行忽略某一段,或者使用#进行模糊匹配:

例如发布了一个“cn.mq.rabbit.error”的消息;

  • 能匹配上的路由键:cn.mq.rabbit.* ; cn.mq.rabbit.#;#.error ; cn.mq.#;#

  • 不能匹配上的路由键:cn.mq..error;*

原理图如下,可以看到一个消费者可以匹配多个topic:


image.png

代码上:

  • 发送方

和DEMO4 基本结构相同,只是对exchange声明为“topic”类型:

channel.exchangeDeclare(EXCHANGE_NAME, "topic");

此外就是发布的时候routingKey变成了点段式字符串:

channel.basicPublish(EXCHANGE_NAME,
            routingKey, //如“a.b.c”    “error.admin.2020”
            null, Rmessage.getBytes("UTF-8"));              

接收方发布消息:


image.png
  • 接收方

接收方同样相比demo4只有两处改动:

topic类型的exchange声明

channel.exchangeDeclare(EXCHANGE_NAME, "topic");

匹配的bindKey变成了点段式字符串

        String [] keys=new String[]{"*.admin.*"  ,  "#.guest.#"};
        for (String bindingKey : keys) {
            channel.queueBind(queueName, EXCHANGE_NAME, bindingKey);
        }

对于上述发送方的输入: 对第二个字段为“admin”和“guset”感兴趣的接收方收到:

image.png

对第一个字段为“Error”感兴趣的接收方收到:

image.png

Topic实现了相比direct更灵活的匹配选择性订阅;

Demo .6 RPC远程调用

RPC远程调用是MQ的一个用法示例,其基本原理还是在于字符串消息的传递,在官方示例中,请求服务方只负责通过RequestQueue简单发送一个数字字符串;

而RPC服务方收到RequestQueue字符串之后进行斐波拉契数列的调用,随后通过ReplyQueue返回结果;

image.png
  • 如何精准返回?

我们知道,通常服务端只有几个实例,负责响应所有的调用请求,客户端数量远远大于服务端;所以,通常都是多个客户端用一个requestQueue,而自己和服务端之间独享一个replyQueue;

所以客户端在发送请求的时候,会通过设置property中的属性来告诉服务方将返回结果送往自己所在的replyQueue:

客户端

        String replyQueueName =
            channel.queueDeclare().getQueue();//和服务方建立一个随机名字的MQ
        AMQP.BasicProperties props = new AMQP.BasicProperties
                .Builder()
                .correlationId(corrId)
                .replyTo(replyQueueName)//告诉服务方返回结果的队列
                .build();

同时我们也看到通过在prop中设置了一个随机的id,可以在接收的时候验证,确保返回来的结果就是当初发出去的:

        String ctag = channel.basicConsume(replyQueueName
                , true, (consumerTag, delivery) -> {
            //equals函数确保发送的和接收的结果的id相同
            if (delivery.getProperties().getCorrelationId().equals(corrId)) {
                //如果reply有返回值了,那么就往response阻塞队列放置
                response.offer(new String(delivery.getBody(), "UTF-8"));
            }java
        }, consumerTag -> {
        });
        String result = response.take();//阻塞获取RPC结果

通过将response初始化为阻塞队列,当结果没有返回的时候,就会阻塞。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,590评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 86,808评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,151评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,779评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,773评论 5 367
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,656评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,022评论 3 398
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,678评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 41,038评论 1 299
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,659评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,756评论 1 330
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,411评论 4 321
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,005评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,973评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,203评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,053评论 2 350
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,495评论 2 343