中间件 | RabbitMQ

本文章是在网易云课堂的课程学习中编写,部分图片从网易云课堂ppt引用

一、RabbitMQ简介

是一个开源的AMQP实现

二、RabbitMQ安装运行

1、安装依赖环境

  1. http://www.rabbitmq.com/which-erlang.html 页面查看安装rabbitmq需要安装erlang对应的版本

  2. https://github.com/rabbitmq/erlang-rpm/releases 页面找到需要下载的erlang版本,erlang-*.centos.x86_64.rpm就是centos版本的。

  3. 复制下载地址后,使用wget命令下载

    wget -P /home/download https://github.com/rabbitmq/erlang-rpm/releases/download/v21.2.3/erlang-21.2.3-1.el7.centos.x86_64.rpm
    
  4. 安装 Erlang

    sudo rpm -Uvh /home/download/erlang-21.2.3-1.el7.centos.x86_64.rpm
    
  5. 安装 socat

    sudo yum install -y socat
    

2、安装RabbitMQ

  1. 官方下载页面找到CentOS7版本的下载链接,下载rpm安装包

    wget -P /home/download https://github.com/rabbitmq/rabbitmq-server/releases/download/v3.7.9/rabbitmq-server-3.7.9-1.el7.noarch.rpm
    

    提示:可以在https://github.com/rabbitmq/rabbitmq-server/tags下载历史版本

  2. 安装RabbitMQ

    sudo rpm -Uvh /home/download/rabbitmq-server-3.7.9-1.el7.noarch.rpm
    

3、启动和关闭

  • 启动服务
    sudo systemctl start rabbitmq-server
    

若启动报错,可查看日志信息

  • 查看状态

    sudo systemctl status rabbitmq-server
    
  • 停止服务

    sudo systemctl stop rabbitmq-server
    
  • 设置开机启动

    sudo systemctl enable rabbitmq-server
    

4、RabbitMQ基本配置

RabbitMQ有一套默认的配置,一般能满足日常开发需求。若需要修改,需要自己创建一个配置文件

touch /etc/rabbitmq/rabbitmq.conf

官网配置项说明:
https://www.rabbitmq.com/configure.html

5、RabbitMQ管理界面

RabbitMQ安装包中带有管理插件,但要手动激活

  1. 开启插件

    rabbitmq-plugins enable rabbitmq_management
    

    说明:rabbitmq有一个默认的guest用户,但只能通过localhost访问,所以需要添加一个能够远程访问的用户。

  2. 添加用户

    rabbitmqctl add_user admin admin
    
  3. 为用户分配操作权限

    rabbitmqctl set_user_tags admin administrator
    

在一个RabbitMQ中,可以划分多个虚拟主机

  1. 为用户分配资源权限
    rabbitmqctl set_permissions -p / admin ".*" ".*" ".*"
    

配置完毕后,可以在浏览器打开 15672 端口的控制台页面,账号密码是 admin/admin

6、RabbitMQ端口

RabbitMQ会绑定一些端口,安装完后并启动服务后,还不能进行外部通信,需要将这些端口添加至防火墙。

  1. 添加端口

    sudo firewall-cmd --zone=public --add-port=4369/tcp --permanent
    sudo firewall-cmd --zone=public --add-port=5672/tcp --permanent
    sudo firewall-cmd --zone=public --add-port=25672/tcp --permanent
    sudo firewall-cmd --zone=public --add-port=15672/tcp --permanent
    
  2. 重启防火墙

    sudo firewall-cmd --reload
    

下面对一些端口做一下介绍:

  • 4369
    是Erlang的端口/结点名称映射程序,用来跟踪节点名称监听地址,在集群中起到一个类似DNS的作用。

  • 5672,5671
    一般使用的是5672端口,没有使用SSL。使用SSL的话,是用5671端口。

  • 25672
    可理解为是用于管理的端口,用于RabbitMQ节点间和CLI工具通信,配合4369使用。

  • 15672
    HTTP管理端口,通过这个端口打开web可视化的管理页面,用于管理RabbitMQ,需要启用management插件。

  • 61613, 61614
    插件相关的端口,当STOMP插件启用的时候打开,作为STOMP客户端端口(根据是否使用TLS选择)

  • 1883, 8883
    插件相关的端口,当MQTT插件启用的时候打开,作为MQTT客户端端口(根据是否使用TLS选择) 。默认使用的是1883

  • 15674
    基于WebSocket的STOMP客户端端口(当插件Web STOMP启用的时候打开)

  • 15675
    基于WebSocket的MQTT客户端端口(当插件Web MQTT启用的时候打开)

7、RabbitMQ角色

  • none
    不能访问management插件

  • management
    查看自己的virtual hosts中的queues、exchanges、bindings等资源

  • policymaker
    比management角色多了些功能,专门用来管理相关的策略。比如查看、创建和删除自己的virtual hosts所属的policies和parameters

  • monitoring
    比management角色多了些功能,主要用来监控。可查看所有virtual hosts,其他用户的connections、channels,节点级别的数据(比如clustering、memory情况)等

  • administrator
    权限最大的角色

三、RabbitMQ的简单使用

1、maven依赖

  • 在Java中使用RabbitMQ
<dependencies>
       <dependency>
           <groupId>com.rabbitmq</groupId>
           <artifactId>amqp-client</artifactId>
           <version>5.5.1</version>
       </dependency>
   </dependencies>
  • 在Spring中使用RabbitMQ
<dependency>
           <groupId>org.springframework.boot</groupId>
           <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

2、队列生产者

public class Producer {

   public static void main(String[] args) {
       // 1、创建连接工厂
       ConnectionFactory factory = new ConnectionFactory();
       // 2、设置连接属性
       factory.setHost("192.168.100.242");
       factory.setPort(5672);
       factory.setUsername("admin");
       factory.setPassword("admin");

       Connection connection = null;
       Channel channel = null;

       try {
           // 3、从连接工厂获取连接
           connection = factory.newConnection("生产者");

           // 4、从链接中创建通道。一个连接可以创建多个channel
           channel = connection.createChannel();

           /**
            * 5、声明(创建)队列
            * 如果队列不存在,才会创建
            * RabbitMQ 不允许声明两个队列名相同,属性不同的队列,否则会报错
            *
            * queueDeclare参数说明:
            * @param queue 队列名称
            * @param durable 队列是否持久化
            * @param exclusive 是否排他,即是否为私有的,如果为true,会对当前队列加锁,其它通道不能访问,并且在连接关闭时会自动删除,不受持久化和自动删除的属性控制
            * @param autoDelete 是否自动删除,当最后一个消费者断开连接之后是否自动删除
            * @param arguments 队列参数,设置队列的有效期、消息最大长度、队列中所有消息的生命周期等等
            */
           channel.queueDeclare("queue1", false, false, false, null);

           // 消息内容
           String message = "Hello World!";
           // 6、发送消息
           channel.basicPublish("", "queue1", null, message.getBytes());
           System.out.println("消息已发送!");

       } catch (IOException e) {
           e.printStackTrace();
       } catch (TimeoutException e) {
           e.printStackTrace();
       } finally {
           // 7、关闭通道
           if (channel != null && channel.isOpen()) {
               try {
                   channel.close();
               } catch (IOException e) {
                   e.printStackTrace();
               } catch (TimeoutException e) {
                   e.printStackTrace();
               }
           }

           // 8、关闭连接
           if (connection != null && connection.isOpen()) {
               try {
                   connection.close();
               } catch (IOException e) {
                   e.printStackTrace();
               }
           }
       }
   }
}

3、队列消费者

public class Consumer {

   public static void main(String[] args) {
       // 1、创建连接工厂
       ConnectionFactory factory = new ConnectionFactory();
       // 2、设置连接属性
       factory.setHost("192.168.100.242");
       factory.setUsername("admin");
       factory.setPassword("admin");

       Connection connection = null;
       Channel channel = null;

       try {
           // 3、从连接工厂获取连接
           connection = factory.newConnection("消费者");

           // 4、从链接中创建通道
           channel = connection.createChannel();

           /**
            * 5、声明(创建)队列
            * 如果队列不存在,才会创建
            * RabbitMQ 不允许声明两个队列名相同,属性不同的队列,否则会报错
            *
            * queueDeclare参数说明:
            * @param queue 队列名称
            * @param durable 队列是否持久化
            * @param exclusive 是否排他,即是否为私有的,如果为true,会对当前队列加锁,其它通道不能访问,
            *                  并且在连接关闭时会自动删除,不受持久化和自动删除的属性控制。
            *                  一般在队列和交换器绑定时使用
            * @param autoDelete 是否自动删除,当最后一个消费者断开连接之后是否自动删除
            * @param arguments 队列参数,设置队列的有效期、消息最大长度、队列中所有消息的生命周期等等
            */
           channel.queueDeclare("queue1", false, false, false, null);

           // 6、定义收到消息后的回调
           DeliverCallback callback = new DeliverCallback() {
               public void handle(String consumerTag, Delivery message) throws IOException {
                   System.out.println("收到消息:" + new String(message.getBody(), "UTF-8"));
               }
           };
           // 7、监听队列
           channel.basicConsume("queue1", true, callback, new CancelCallback() {
               public void handle(String consumerTag) throws IOException {
               }
           });

           System.out.println("开始接收消息");
           System.in.read();

       } catch (IOException e) {
           e.printStackTrace();
       } catch (TimeoutException e) {
           e.printStackTrace();
       } finally {
           // 8、关闭通道
           if (channel != null && channel.isOpen()) {
               try {
                   channel.close();
               } catch (IOException e) {
                   e.printStackTrace();
               } catch (TimeoutException e) {
                   e.printStackTrace();
               }
           }

           // 9、关闭连接
           if (connection != null && connection.isOpen()) {
               try {
                   connection.close();
               } catch (IOException e) {
                   e.printStackTrace();
               }
           }
       }
   }
}

四、AMQP协议

AMQP ( Advanced Message Queuing Protocol) 高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。可基于实现AMQP,来实现消息中间件。

1、AMQP结构

AMQP结构.png

2、流转过程

AMQP生产者流转过程

connection - channel - publish - close

AMQP生产者流转过程.png

AMQP消费者流转过程

AMQP消费者流转过程.png

五、RabbitMQ核心概念

核心概念

  • Producer:生产者,创建消息发布到RabbitMQ中
  • Consumer:消费者,获取消息体
  • Broker:消息中间件的服务节点
  • 虚拟主机:每个broker可定义多个虚拟主机,像mysql-server可以定义多个db。RabbitMQ默认的vhost(虚拟主机)是 /
  • connection:连接,一个connection可以创建任意个channel
  • channel:建立在connection上的通道
  • queue:队列,用于存储消息
  • RoutingKey:路由键,需要与交换类型和绑定键(BindingKey)结合使用。生产者发送消息给交换器时,会指定一个RoutingKey,指定路由规则
  • Binding:绑定,将交换器与队列关联起来
  • exchange:交换器,将生产者发来的消息路由到一个或多个队列。若路由不到,则根据生产者的属性配置,返回给生产者或直接丢弃。
    exchange有四种模式,fanout、direct、topic、headers模式,若不指定交换机,则使用默认交换机,根据消息中指定的 queue 的名称,匹配到对应的queue。
    fanout模式:绑定了的所有queue都会收到消息;
    direct模式:将消息路由到BindingKey和Routing Key完全匹配的队列;
    topic模式:与direct类似,但可通过通配符进行模糊匹配。* 代表一个单词,# 代表多个单词
    headers模式:根据消息中的 header 属性匹配。
image.png

整体运转流程:


image.png

使用 exchange
1、生产者将消息发送到topic类型的交换器上,和routing的用法类似,都是通过routingKey路由,但topic类型交换器的routingKey支持通配符

           // 路由关系如下:com.# --> queue-1     *.order.* ---> queue-2
           // 消息内容
           String message = "Hello";
           // 发送消息到topic_test交换器上
           channel.basicPublish("topic-exchange", "com.order.create", null, message.getBytes());
           System.out.println("消息 " + message + " 已发送!");

2、消费者通过一个临时队列和交换器绑定,接收发送到交换器上的消息

final String queueName = Thread.currentThread().getName();

           try {
               // 3、从连接工厂获取连接
               connection = factory.newConnection("消费者");

               // 4、从链接中创建通道
               channel = connection.createChannel();
               // 定义消息接收回调对象
               DeliverCallback callback = new DeliverCallback() {
                   public void handle(String consumerTag, Delivery message) throws IOException {
                       System.out.println(queueName + " 收到消息:" + new String(message.getBody(), "UTF-8"));
                   }
               };
               // 监听队列
               channel.basicConsume(queueName, true, callback, new CancelCallback() {
                   public void handle(String consumerTag) throws IOException {
                   }
               });

               System.out.println(queueName + " 开始接收消息");
               System.in.read();

3、使用 fanout 型交换器实现 发布订阅模式

启动Consumer 类会开启两个消费者,Producer 类运行后,两个消费者都能接收到消息

【注意】利用临时队列,可随时添加一个queue,且不会互相影响。比如可以启多个消费者服务,则exchange可以绑定多个临时队列,从而收到发往exchange的消息。即发布-订阅思想

1)消费者:通过一个临时队列和交换器绑定,接收发送到交换器上的消息

public class Consumer {

   private static Runnable receive = new Runnable() {
       public void run() {
           // 1、创建连接工厂
           ConnectionFactory factory = new ConnectionFactory();
           // 2、设置连接属性
           factory.setHost("192.168.100.242");
           factory.setUsername("admin");
           factory.setPassword("admin");

           Connection connection = null;
           Channel channel = null;
           final String clientName = Thread.currentThread().getName();

           try {
               // 3、从连接工厂获取连接
               connection = factory.newConnection("消费者");

               // 4、从链接中创建通道
               channel = connection.createChannel();

               // 代码定义交换器,不管是生产者或消费者都可以定义
               channel.exchangeDeclare("ps_test", "fanout");
               //  还可以定义一个临时队列,连接关闭后会自动删除,此队列是一个排他队列
               String queueName = channel.queueDeclare().getQueue();
               // 将队列和交换器绑定
               channel.queueBind(queueName, "ps_test", "");

               // 定义消息接收回调对象
               DeliverCallback callback = new DeliverCallback() {
                   public void handle(String consumerTag, Delivery message) throws IOException {
                       System.out.println(clientName + " 收到消息:" + new String(message.getBody(), "UTF-8"));
                   }
               };
               // 监听队列
               channel.basicConsume(queueName, true, callback, new CancelCallback() {
                   public void handle(String consumerTag) throws IOException {
                   }
               });

               System.out.println(clientName + " 开始接收消息");
               System.in.read();

           } catch (IOException e) {
               e.printStackTrace();
           } catch (TimeoutException e) {
               e.printStackTrace();
           } finally {
               // 8、关闭通道
               if (channel != null && channel.isOpen()) {
                   try {
                       channel.close();
                   } catch (IOException e) {
                       e.printStackTrace();
                   } catch (TimeoutException e) {
                       e.printStackTrace();
                   }
               }

               // 9、关闭连接
               if (connection != null && connection.isOpen()) {
                   try {
                       connection.close();
                   } catch (IOException e) {
                       e.printStackTrace();
                   }
               }
           }
       }
   };

   public static void main(String[] args) {
       new Thread(receive, "c1").start();
   }

}

2)生产者:将消息发送到fanout类型的交换器上

public class Producer {

   public static void main(String[] args) {
       // 1、创建连接工厂
       ConnectionFactory factory = new ConnectionFactory();
       // 2、设置连接属性
       factory.setHost("192.168.100.242");
       factory.setUsername("admin");
       factory.setPassword("admin");

       Connection connection = null;
       Channel channel = null;

       try {
           // 3、从连接工厂获取连接
           connection = factory.newConnection("生产者");

           // 4、从链接中创建通道
           channel = connection.createChannel();

           // 定义fanout类型的交换器
           channel.exchangeDeclare("ps_test", "fanout");

           // 消息内容
           String message = "Hello Publish";
           // 发送消息到ps_test交换器上
           channel.basicPublish("ps_test", "", null, message.getBytes());
           System.out.println("消息 " + message + " 已发送!");

       } catch (IOException e) {
           e.printStackTrace();
       } catch (TimeoutException e) {
           e.printStackTrace();
       } finally {
           // 7、关闭通道
           if (channel != null && channel.isOpen()) {
               try {
                   channel.close();
               } catch (IOException e) {
                   e.printStackTrace();
               } catch (TimeoutException e) {
                   e.printStackTrace();
               }
           }

           // 8、关闭连接
           if (connection != null && connection.isOpen()) {
               try {
                   connection.close();
               } catch (IOException e) {
                   e.printStackTrace();
               }
           }
       }
   }
}

4、实际使用中,若往队列queue1中发送多条消息,queue1中堆积了大量消息,要如何加入消息的处理?
——可以创建消费者集群

basicQos参数

  • 控制推送消息的大小,提前预处理机制,有助于提高数据处理效率。
  • 在并发低于 1w 情况下,一般使用默认值即可,也可根据业务情况调整。若设置的太小,操作太频繁,不好。若设置的太大,某个消费者堆积处理太多消息,其他消费者分不到消息处理。
// 同一时刻,服务器会发送10条消息给消费者
channel.basicQos(10);
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,921评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 87,635评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,393评论 0 338
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,836评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,833评论 5 368
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,685评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,043评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,694评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 42,671评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,670评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,779评论 1 332
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,424评论 4 321
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,027评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,984评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,214评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,108评论 2 351
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,517评论 2 343

推荐阅读更多精彩内容