RabbitMQ简介以及应用

一、简要介绍

  • 开源AMQP实现,Erlang语言编写,支持多种客户端
  • 分布式、高可用、持久化、可靠、安全
  • 支持多种协议:AMQP、STOMP、MQTT、HTTP
  • 适用于多系统之间的业务解耦的消息中间件

二、基本概念

1、exchange:交换器,负责接收消息,转发消息至绑定的队列,有四种类型:

  • direct:完全匹配的路由
  • topic:模式匹配的路由
  • fanout:广播模式
  • headers:键值对匹配路由

Exchange属性:

  • 持久化:如果启用,那么rabbit服务重启之后仍然存在
  • 自动删除:如果启用,那么交换器将会在其绑定的队列都被删除掉之后自动删除掉自身

2、Queue:队列,rabbitmq的内部对象,用于存储消息,其属性类似于Exchange,同样可以设置是否持久化、自动删除等。
消费者重Queue中获取消息并消费。多个消费者可以订阅同一个Queue,这时Queue中的消息会被平均分摊给多个消费者进行处理,而不是每个消费者都收到所有的消息并处理。

3、Binding:绑定,根据路由规则绑定交换器与队列

4、Routing:路由键,路由的关键字

三、消息的可靠性

  • Message acknowledgment:消息确认,在消息确认机制下,收到回执才会删除消息,未收到回执而断开了连接,消息会转发给其他消费者,如果忘记回执,会导致消息堆积,消费者重启后会重复消费这些消息并重复执行业务逻辑。

  • Message durability:消息持久化,设置消息持久化可以避免绝大部分消息丢失,比如rabbitmq服务重启,但是采用非持久化可以提升队列的处理效率。如果要确保消息的持久化,那么消息对应的Exchange和Queue同样要设置为持久化。

  • Prefetch count,每次发送给消费者消息的数量,默认为1

另外,如果需要可靠性业务,需要设置持久化和ack机制,如果系统高吞吐,可以设置为非持久化、noack、自动删除机制

四、简单应用

模拟这样一个业务场景,用户下单成功后,需要给用户增加积分,同时还需要给用户发送下单成功的消息,这是在电商业务中很常见的一个业务场景。

如果系统是微服务架构,可能用户下单功能在订单服务,给用户增加积分的功能在积分服务,给用户发送通知消息的功能在通知服务,各个服务之间解耦,互不影响。那么要实现上述的业务场景,消息中间件rabbitmq是一个很好的选择。
原因如下:

  • 高性能,它的实现语言是天生具备高并发高可用的erlang 语言
  • 支持消息的持久化,即使服务器挂了,也不会丢失消息
  • 消息应答(ack)机制,消费者消费完消息后发送一个消息应答,rabbitmq才会删除消息,确保消息的可靠性
  • 支持高可用集群
  • 灵活的路由

实现思路:

用户下单成功后,rabbitmq发送一条消息至EXCHANGE.ORDER_CREATE交换器,该交换器绑定了两个队列,QUEUE.ORDER_INCREASESCORE、QUEUE.ORDER_NOTIFY,消费者订阅这两个队列分别用来处理增加积分、发送用户通知。如果后续日志系统还需要记录下单的相关日志,那么我们只需要再定义一个队列并将其绑定到EXCHANGE.ORDER_CREATE即可。

下单发rabbitmq消息

package com.robot.rabbitmq;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.TimeoutException;

/**
 * @author: 会跳舞的机器人
 * @date: 2017/10/13 10:46
 * @description: 模拟用户下单之后发送rabbitmq消息
 */
public class OrderCreator {
    // 交换器名称
    private static final String EXCHANGE = "EXCHANGE.ORDER_CREATE";
    // 消息内容
    private static String msg = "create order success";

    /**
     * 模拟创建订单后发送mq消息
     */
    public void createOrder() {
        System.out.println("下单成功,开始发送rabbitmq消息");

        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.12.44");
        connectionFactory.setPort(56720);
        connectionFactory.setUsername("baibei");
        connectionFactory.setPassword("baibei");

        Connection connection;
        Channel channel;
        try {
            connection = connectionFactory.newConnection();
            channel = connection.createChannel();
            // 持久化
            boolean durable = true;
            // topic类型
            String type = "topic";
            // 声明交换器,如果交换器不存在则创建之
            channel.exchangeDeclare(EXCHANGE, type, durable);

            String messgeId = UUID.randomUUID().toString();
            // deliveryMode>=2表示设置消息持久化
            AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().deliveryMode(2).messageId(messgeId).build();
            // 发布消息
            String routingKey = "order_create";
            channel.basicPublish(EXCHANGE, routingKey, props, msg.getBytes("utf-8"));
            connection.close();
        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        }
    }
}

积分系统订阅消息

package com.robot.rabbitmq;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.ShutdownSignalException;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @author: 会跳舞的机器人
 * @date: 2017/10/13 16:02
 * @description: rabbitmq消费者,模拟下单成功后给用户增加积分
 */
public class IncreaseScoreConsumer implements Consumer {
    private Connection connection;
    private Channel channel;
    // 交换器名称
    private static final String EXCHANGE = "EXCHANGE.ORDER_CREATE";
    // 增加积分队列名称
    private static final String QUEUENAME = "QUEUE.ORDER_INCREASESCORE";


    public void consume() {
        // 初始化rabbitmq连接信息
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.12.44");
        connectionFactory.setPort(56720);
        connectionFactory.setUsername("baibei");
        connectionFactory.setPassword("baibei");
        try {
            connection = connectionFactory.newConnection();
            channel = connection.createChannel();
            // 声明交换器
            channel.exchangeDeclare(EXCHANGE, "topic", true);
            // 声明队列
            channel.queueDeclare(QUEUENAME, true, false, false, null);
            // 交换器与队列绑定并设置routingKey
            channel.queueBind(QUEUENAME, EXCHANGE, "order_create");
            // 消费消息,callback是该类,关闭自动确认消息,在完成业务逻辑后手动确认确认
            channel.basicConsume(QUEUENAME, false, this);
        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        }
    }

    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        String msg = new String(body, "UTF-8");
        System.out.println("《积分系统》收到订单消息:" + msg + ",给用户增加积分......");
        // 手动确认消息
        channel.basicAck(envelope.getDeliveryTag(), false);

        /**
         * channel.basicReject(envelope.getDeliveryTag(), false);该方法会丢弃掉队列中的这条消息
         * channel.basicReject(envelope.getDeliveryTag(), true);该方法会把消息重新放回队列
         * 一般系统会设定一个重试次数,如果超过重试次数,则会丢弃消息,反之则会把消息再放入队列
         */
    }

    public void handleConsumeOk(String consumerTag) {

    }

    public void handleCancelOk(String consumerTag) {

    }

    public void handleCancel(String consumerTag) throws IOException {

    }

    public void handleShutdownSignal(String consumerTag, ShutdownSignalException sig) {

    }

    public void handleRecoverOk(String consumerTag) {

    }

}


通知系统订阅消息

package com.robot.rabbitmq;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.ShutdownSignalException;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @author: 会跳舞的机器人
 * @date: 2017/10/13 16:20
 * @description: rabbitmq消费者,模拟下单成功后给用户发送通知
 */
public class NotifyConsumer implements Consumer {
    private Connection connection;
    private Channel channel;

    // 交换器名称
    private static final String EXCHANGE = "EXCHANGE.ORDER_CREATE";
    // 通知用户下单成功通知队列名称
    private static final String QUEUENAME = "QUEUE.ORDER_NOTIFY";


    public void consume() {
        // 初始化rabbitmq连接信息
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.12.44");
        connectionFactory.setPort(56720);
        connectionFactory.setUsername("baibei");
        connectionFactory.setPassword("baibei");
        try {
            connection = connectionFactory.newConnection();
            channel = connection.createChannel();
            // 声明交换器
            channel.exchangeDeclare(EXCHANGE, "topic", true);
            // 声明队列
            channel.queueDeclare(QUEUENAME, true, false, false, null);
            // 交换器与队列绑定并设置routingKey
            channel.queueBind(QUEUENAME, EXCHANGE, "order_create");
            // 消费消息,callback是该类,关闭自动确认消息,在完成业务逻辑后手动确认确认
            channel.basicConsume(QUEUENAME, false, this);
        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        }
    }

    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        String msg = new String(body, "UTF-8");
        System.out.println("《通知系统》收到订单消息:" + msg + ",开始给用户发送通知......");
        // 手动确认消息
        channel.basicAck(envelope.getDeliveryTag(), false);

        /**
         * channel.basicReject(envelope.getDeliveryTag(), false);该方法会丢弃掉队列中的这条消息
         * channel.basicReject(envelope.getDeliveryTag(), true);该方法会把消息重新放回队列
         * 一般系统会设定一个重试次数,如果超过重试次数,则会丢弃消息,反之则会把消息再放入队列
         */
    }

    public void handleConsumeOk(String consumerTag) {

    }

    public void handleCancelOk(String consumerTag) {

    }

    public void handleCancel(String consumerTag) throws IOException {

    }

    public void handleShutdownSignal(String consumerTag, ShutdownSignalException sig) {

    }

    public void handleRecoverOk(String consumerTag) {

    }
}

测试

package com.robot.rabbitmq;

/**
 * @author: 会跳舞的机器人
 * @date: 2017/10/13 16:27
 * @description:
 */
public class Test {
    public static void main(String[] args) {

        IncreaseScoreConsumer increaseScoreConsumer = new IncreaseScoreConsumer();
        increaseScoreConsumer.consume();

        NotifyConsumer notifyConsumer = new NotifyConsumer();
        notifyConsumer.consume();

        OrderCreator orderCreator = new OrderCreator();
        for (int i = 0; i < 3; i++) {
            orderCreator.createOrder();
        }
    }
}


输出:

下单成功,开始发送rabbitmq消息
《积分系统》收到订单消息:create order success,给用户增加积分......
《通知系统》收到订单消息:create order success,开始给用户发送通知......
下单成功,开始发送rabbitmq消息
《积分系统》收到订单消息:create order success,给用户增加积分......
《通知系统》收到订单消息:create order success,开始给用户发送通知......
下单成功,开始发送rabbitmq消息
《积分系统》收到订单消息:create order success,给用户增加积分......
《通知系统》收到订单消息:create order success,开始给用户发送通知......


最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,921评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 87,635评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,393评论 0 338
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,836评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,833评论 5 368
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,685评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,043评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,694评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 42,671评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,670评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,779评论 1 332
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,424评论 4 321
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,027评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,984评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,214评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,108评论 2 351
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,517评论 2 343

推荐阅读更多精彩内容

  • 来源 RabbitMQ是用Erlang实现的一个高并发高可靠AMQP消息队列服务器。支持消息的持久化、事务、拥塞控...
    jiangmo阅读 10,343评论 2 34
  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 134,598评论 18 139
  • 1. 历史 RabbitMQ是一个由erlang开发的AMQP(Advanced Message Queue )的...
    高广超阅读 6,092评论 3 51
  • 1 RabbitMQ安装部署 这里是ErLang环境的下载地址http://www.erlang.org/down...
    Bobby0322阅读 2,222评论 0 11
  • 什么叫消息队列 消息(Message)是指在应用间传送的数据。消息可以非常简单,比如只包含文本字符串,也可以更复杂...
    lijun_m阅读 1,335评论 0 1