RabbitMQ快速上手

一、RibbitMQ的基础介绍

1. 为什么要使用MQ
2. 与其他MQ的区别

  • ActiveMQ:使用Java开发,遵循JMS规范,使用方便,支持多种协议。但是有丢失消息的风险并且速度较慢
  • RabbitMQ:使用Erlang开发(用于解决高并发的问题),可以解决并发问题。但是只支持AMQP协议且不能动态扩展

二、RabbitMQ的安装

1. 安装Erlang环境(这一步参照博客 <a>https://www.jianshu.com/p/27197d58e94c</a>)

  • 安装阿里的yum源(我在安装的时候下载速度很慢,所以这边使用阿里的yum源来安装erlang)

    wget -O /etc/yum.repos.d/CentOS-Base.repo http://mirrors.aliyun.com/repo/Centos-6.repo
    yum-y install make gcc gcc-c++kernel-devel m4 ncurses-devel openssl-devel java-devel  unixODBC-devel
    
  • 安装erlang的yum源

    wget http://packages.erlang-solutions.com/erlang-solutions-1.0-1.noarch.rpm
    rpm -Uvh erlang-solutions-1.0-1.noarch.rpm
    rpm --import http://packages.erlang-solutions.com/rpm/erlang_solutions.asc
    
  • 然后就可以直接安装erlang了

    yum -y install erlang
    
  • 通过下面的命令就可以查看是否安装完成了

    erl
    

2. 安装RibbitMQ

  • 下载Rabbit的yum源(下载下来的时候名字很乱,改个名字)

    wget https://bintray.com/rabbitmq/rpm/download_file?file_path=rabbitmq-server%2Fv3.7.x%2Fel%2F7%2Fnoarch%2Frabbitmq-server-3.7.14-1.el7.noarch.rpm
    mv download_file\?file_path\=rabbitmq-server%2Fv3.7.x%2Fel%2F7%2Fnoarch%2Frabbitmq-server-3.7.14-1.el7.noarch.rpm rabbitmq-server-3.7.14-el7.noarch.rpm
    
  • 安装

    yum -y install rabbitmq-server-3.7.14-el7.noarch.rpm
    
    • 如果报有依赖需要解决,就直接使用yum下载这个依赖就好了
  • 启动服务

    rabbitmq-server start
    
  • 后台启动

    rabbitmq-server -datached
    
  • 启动后使用amqp协议,默认在5672端口

三、RabbitMQ初步使用

1. 搭建管理平台

  • 初步搭建没有任何插件,我们使用下面的命令下载并启用RabbitMQ的管理地址

    rabbitmq-plugins enable rabbitmq_management
    
  • 现在就可以访问该节点的15672端口使用guest/guest来登陆管理界面

    • 如果不是在localhost下访问,我们还需要修改/usr/lib/rabbitmq/lib/rabbitmq_server-3.7.14/ebin/rabbit.app文件,将{loopback_users, [<<”guest”>>]}
      改为{loopback_users, []}

      {default_user, <<"guest">>},
      {default_pass, <<"guest">>},
      {default_user_tags, [administrator]},
      {default_vhost, <<"/">>},
      {default_permissions, [<<".*">>, <<".*">>, <<".*">>]},
      {loopback_users, []},
      {password_hashing_module, rabbit_password_hashing_sha256},
      {server_properties, []},
      

      再重启就OK了

    • Virtural Host 用于区分不同业务,每个VH都是独立的,互不影响的。不同的团队用不同的VH,相互隔离

2. 点对点简单队列

点对点简单队列:一个生产者投递消息给队列,只允许一个消费者进行消费,(如果存在消费者集群,则会均摊消费,使用取模算法)每个消息只会消费一次

生产者生产的消息直接投递给队列服务器,然后队列服务器直接推送或消费者自行拉取消息

  • ACK应答模式

    • 自动应答:当消费者收到消息后,不论是否处理,消费者都会自动应答消费。
    • 手动应答:消费者在代码里显式的回复ACK
  • 导入依赖

    <dependency>
        <groupId>com.rabbitmq</groupId>
        <artifactId>amqp-client</artifactId>
        <version>4.0.2</version>
    </dependency>
    
  • 封装一个链接工具

    public class MQConnectionUtils {
        /**
         * 创建新的链接
         * @return
         */
        public static Connection connect() throws IOException, TimeoutException {
            //创建链接工厂
            ConnectionFactory connectionFactory = new ConnectionFactory();
    
            //设置链接参数
            connectionFactory.setHost("192.168.3.203");
            connectionFactory.setUsername("guest");
            connectionFactory.setPassword("guest");
            connectionFactory.setPort(5672);
            connectionFactory.setVirtualHost("/");
    
            return connectionFactory.newConnection();
        }
    }
    
  • 生产者

    public class Producer {
        /**队列名称*/
        private static final String QUEUE_NAME = "libi_QUEUE";
    
        public static void main(String[] args) throws IOException, TimeoutException {
            //建立链接
            Connection connection = MQConnectionUtils.connect();
            //创建通道
            Channel channel = connection.createChannel();
            //创建一个队列
            channel.queueDeclare(QUEUE_NAME,false,false,false,null);
            //创建消息
            String message = "Libi_Message";
            //发送消息
            channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
            //关闭通道和链接
            channel.close();
            connection.close();
            System.out.println("消息投递成功!");
        }
    }
    
  • 消费者

    public class Consumer {
        /**队列名称*/
        private static final String QUEUE_NAME = "libi_QUEUE";
    
        public static void main(String[] args) throws IOException, TimeoutException {
            //建立链接
            Connection connection = MQConnectionUtils.connect();
            //创建通道
            Channel channel = connection.createChannel();
            //消费者关联一个队列
            channel.queueDeclare(QUEUE_NAME,false,false,false,null);
            DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
                //使用匿名内部类重写获取消息的方法
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    super.handleDelivery(consumerTag, envelope, properties, body);
                    String msg = new String(body, "UTF-8");
                    System.out.println("活动生产者消息:"+msg);
                }
            };
            //设置应答模式,true表示自动应答,false表示手动应答
            channel.basicConsume(QUEUE_NAME,true,defaultConsumer);
            //关闭通道和链接
            channel.close();
            connection.close();
            System.out.println("消息消费成功!");
        }
    }
    

3. 公平队列(需要自己在代码里实现)

  • 均摊消费的缺点:当消费者处理消息的能力不一致时,如果还是均摊处理信息,则会造成资源浪费(对消费慢点节点不公平),需要实现"能者多劳"

  • 公平队列的实现思路(BaseQos方法):当有n个消费者在上一条消息还没有处理完成时(还没有发送ACK),消息队列就不会发送下一条消息给它,给另外一个消费者。在生产者通过如下代码开启Qos

    channel.basicQos(n);
    
  • 如果这时消费者在代码里忘记应答了,那么就会陷入阻塞

4.发布订阅模式

生产者投递消息给交换机,交换机根据路由策略转发到不同的队列服务器中,队列服务器再给消费者进行消费

  • 交换机策略

    • Direct:直接交换机,一种带路由功能的交换机,一个队列会和一个交换机绑定,除此之外再绑定一个routing_key,当消息被发送的时候,需要指定一个binding_key,这个消息被送达交换机的时候,就会被这个交换机送到指定的队列里面去。同样的一个binding_key也是支持应用到多个队列中的。

      就是说直接交换机可以更具生产者的routing_key和消费者的binding_key进行匹配,只有一样才会转发这个消息

    • Fanout:扇形交换机,它所能做的事情非常简单———广播消息。扇形交换机会把能接收到的消息全部发送给绑定在自己身上的队列。因为广播不需要“思考”,所以扇形交换机处理消息的速度也是所有的交换机类型里面最快的。

    • Topic:主题交换机,发送到主题交换机上的消息需要携带指定规则的routing_key,主题交换机会根据这个规则将数据发送到对应的(多个)队列上。

      主题交换机的routing_key需要有一定的规则,交换机和队列的binding_key需要采用*.#.*.....的格式,每个部分用.分开,其中:

      • *表示一个单词
      • #表示任意数量(零个或多个)单词。

      假设有一条消息的routing_keyfast.rabbit.white,那么带有这样binding_key的几个队列都会接收这条消息:

  • Handler:首都交换机,首部交换机是忽略routing_key的一种路由方式。路由器和交换机路由的规则是通过Headers信息来交换的,这个有点像HTTPHeaders。将一个交换机声明成首部交换机,绑定一个队列的时候,定义一个Hash的数据结构,消息发送的时候,会携带一组hash数据结构的信息,当Hash的内容匹配上的时候,消息就会被写入队列。

  • 生产者(在发送消息的时候传入exchange的参数)

    /**
     * 使用Fanout类型的交换机,交换器转给发全部的队列
     */
    public class Producer {
        //交换机名称
        static final String EXCHANGE_NAME = "fanout_destination";
    
        public static void main(String[] args) throws IOException, TimeoutException {
            Connection connection = MQConnectionUtils.connect();
            Channel channel = connection.createChannel();
    
            //绑定交换机
            channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
    
            String msg = "my_fanout_meg";
            //发送消息(路由策略为空串)
            channel.basicPublish(EXCHANGE_NAME, "", null, msg.getBytes());
    
            channel.close();
            connection.close();
        }
    }
    
  • 消费者

    public class EmailConsumer {
        private static String QUEUE_NAME = "Email_Queue";
    
        public static void main(String[] args) throws IOException, TimeoutException {
            Connection connection = MQConnectionUtils.connect();
            Channel channel = connection.createChannel();
    
            //消费者声明队列
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            //消费者绑定交换机
            channel.queueBind(QUEUE_NAME, Producer.EXCHANGE_NAME,"");
            //监听消息
            DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    String msg = new String(body, "UTF-8");
                    System.out.println("邮件消费者:" + msg);
                }
            };
    
            channel.basicConsume(QUEUE_NAME, defaultConsumer);
        }
    }
    

    消费者的chanel和connection没有关闭,可以多启动几个,就会发现所有的消费者都可以收到生产者传入的信息

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,445评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,889评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,047评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,760评论 1 276
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,745评论 5 367
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,638评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,011评论 3 398
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,669评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,923评论 1 299
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,655评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,740评论 1 330
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,406评论 4 320
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,995评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,961评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,197评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,023评论 2 350
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,483评论 2 342

推荐阅读更多精彩内容