RabbitMQ的使用

1. Helloworld-基本消息模型

  1. 导入的jar包
    <dependency>
        <groupId>com.rabbitmq</groupId>
        <artifactId>amqp-client</artifactId>
        <!--和springboot2.0.5对应-->
        <version>5.4.1</version>
    </dependency>

  1. 连接工具类
public class ConnectionUtil {
    /**
     * 建立与RabbitMQ的连接
     * @return
     * @throws Exception
     */
    public static Connection getConnection() throws Exception {
        //定义连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        //设置服务地址
        factory.setHost("127.0.0.1");
        //端口
        factory.setPort(5672);
        //设置账号信息,用户名、密码、vhost
        factory.setVirtualHost("/");
        factory.setUsername("guest");
        factory.setPassword("guest");
        // 通过工程获取连接
        Connection connection = factory.newConnection();
        return connection;
    }
}

1582541127455.png

发送方

流程:

①:获得连接对象

②:创建与交换机的连接通道

③:创建消息队列

④:发送消息

public class Send {
    public static final String QUEUE_NAME_HELLO = "queue_name_hello";
    public static void main(String[] args) {
        Connection connection = null;
        try {
            //1. 创建连接
            connection = ConnectionUtil.getConnection();
            //2. 创建与Exchange的通道,每个连接可以创建多个通道,每个通道代表一个会话任务
            Channel channel = connection.createChannel();
            /**
             * 声明队列,如果Rabbit中没有此队列将自动创建
             * param1:队列名称
             * param2:是否持久化
             * param3:队列是否独占此连接
             * param4:队列不再使用时是否自动删除此队列
             * param5:队列参数
             */
            channel.queueDeclare(QUEUE_NAME_HELLO,true,false,false,null);
            String message = "hello rabbitmq";
            /**
             * 消息发布方法
             * param1:Exchange的名称,如果没有指定,则使用Default Exchange
             * param2:routingKey,消息的路由Key,是用于Exchange(交换机)将消息转发到指定的消息队列
             * param3:消息包含的属性
             * param4:消息体
             */
            /**
             * 这里没有指定交换机,消息将发送给默认交换机,每个队列也会绑定那个默认的交换机,但是不能显
             示绑定或解除绑定
             * 默认的交换机,routingKey等于队列名称
             */
            channel.basicPublish("", QUEUE_NAME_HELLO,
                    null,message.getBytes());
            System.out.println("message send successful!");
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if(connection!=null){
                try {
                    connection.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

消费者:

流程:

①:获得连接对象

②:创建与交换机的连接通道

③:声明监听队列

④:从队列中获取消息,接收消息的回调

public class Consumer01 {
    public static void main(String[] args) {
        Connection connection = null;
        try {
            //1. 获得连接
            connection = ConnectionUtil.getConnection();
            //2. 创建与Exchange的通道,每个连接可以创建多个通道,每个通道代表一个会话任务
            Channel channel = connection.createChannel();
            //3. 声明队列

            /**
             * 消费者接收消息调用此方法
             * @param consumerTag 消费者的标签,在channel.basicConsume()去指定
             * @param envelope 消息包的内容,可从中获取消息id,消息routingkey,交换机,消息和重传标志
            (收到消息失败后是否需要重新发送)
             * @param properties
             * @param body
             */

            Consumer consumer = new DefaultConsumer(channel){
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    System.out.println("消费者标识:"+consumerTag);
                    System.out.println("交换机名称:"+envelope.getExchange());
                    System.out.println("路由key:"+envelope.getRoutingKey());
                    System.out.println("消息id:"+envelope.getDeliveryTag());
                    System.out.println("接收消息:"+new String(body));
                    System.out.println("消息接收成功!");
                }
            };
            /**
             * 监听队列String queue, boolean autoAck,Consumer callback
             * 参数明细
             * 1、队列名称
             * 2、是否自动回复,设置为true为表示消息接收到自动向mq回复接收到了,mq接收到回复会删除消息,设置
             为false则需要手动回复
             * 3、消费消息的方法,消费者接收到消息后调用此方法
             */
            channel.basicConsume(Send.QUEUE_NAME_HELLO,true , consumer);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if(connection!=null){
                try {
                    connection.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }

}

启动Send发送消息,可以看到当前的队列中有一条消息处于就绪状态!

1582540913388.png

启动Consumer01接收消息,消息被成功接收!

消费者标识:amq.ctag-ALoQPcEfIDlldhAZqAU4CQ
交换机名称:
路由key:queue_name_hello
消息id:1
接收消息:hello rabbitmq
消息接收成功!
1582540844236.png

2. 消息确认机制(ACK)

在上述代码中,在basicConsume()里的autoAck设置的值为true,他表示自动签收,即当消费者接收到了消息之后,队列中的消息就会立即删除,如果为false就表示为手动签收!

RabbitMQ有一个ACK机制:即当消费者接收到了消息后,会向 RabbitMQ发送一个回执ACK,告知消息已经被接收!RabbitMQ会将队列中的消息删除掉!ACK机制分为两种

自动签收:消息一旦被接收,自动发送回执ACK

手动签收:消息被接收后,需要手动发送ACK

自动签收存在的问题:

模拟一个异常环境,当消费者接收到消息,进入回调函数中,人为抛出一个异常,查看消息接收状态!

            Consumer consumer = new DefaultConsumer(channel){
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    int i = 1/0; //模拟异常环境
                    System.out.println("消费者标识:"+consumerTag);
                    System.out.println("交换机名称:"+envelope.getExchange());
                    System.out.println("路由key:"+envelope.getRoutingKey());
                    System.out.println("消息id:"+envelope.getDeliveryTag());
                    System.out.println("接收消息:"+new String(body));
                    System.out.println("消息接收成功!");
                }
            };
        
            channel.basicConsume(Send.QUEUE_NAME_HELLO,true , consumer);
    }

}

此时再次发送消息:


1582543055232.png

[图片上传失败...(image-b23445-1585216601052)]

再次接收:控制台没有打印任何信息,消息是没有被成功接收到的

1582543124202.png

[图片上传失败...(image-e39c33-1585216601052)]

但是队列中的消息已经被删除掉了,说明,在自动签收的情况下,如果遇到异常,数据会丢失!因此如果数据很重要建议使用手动模式!

Consumer consumer = new DefaultConsumer(channel){
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    //int i = 1/0;
                    System.out.println("消费者标识:"+consumerTag);
                    System.out.println("交换机名称:"+envelope.getExchange());
                    System.out.println("路由key:"+envelope.getRoutingKey());
                    System.out.println("消息id:"+envelope.getDeliveryTag());
                    System.out.println("接收消息:"+new String(body));
                    System.out.println("消息接收成功!");
                    channel.basicAck(envelope.getDeliveryTag(), false);//手动提交ACK
                }
            };
channel.basicConsume(Send.QUEUE_NAME_HELLO,false , consumer);

3. Work queues模型

1582543583679.png

[图片上传失败...(image-56d745-1585216601052)]

work queues与入门程序相比,多了一个消费端,两个消费端共同消费同一个队列中的消息!

结果:

  • 同一个消息只能给一个一个消费者

  • RabbitMQ采用轮询的方式将消息发送给消费者

  • 消费者只有在处理完一条消息后,才能接收下一条消息

能者多劳

模拟一个消费者处理消息效率低下的情况:

                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    try {
                        Thread.sleep(1000000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println("消费者标识:"+consumerTag);
                    System.out.println("交换机名称:"+envelope.getExchange());
                    System.out.println("路由key:"+envelope.getRoutingKey());
                    System.out.println("消息id:"+envelope.getDeliveryTag());
                    System.out.println("接收消息:"+new String(body));
                    System.out.println("消息接收成功!");
                    channel.basicAck(envelope.getDeliveryTag(), false);//手动提交ACK
                }
            };
            channel.basicConsume(Send.QUEUE_NAME_WORK_QUEUE,false , consumer);

此时RabbitMQ采用轮询的方式再发送消息,处于休眠的消费者和另一个消费者会轮流接收到消息,但是处于休眠的消费者处理耗时较长,而另一个消费者处理效率更高,会一直处于空闲状态!

此时可以使用basicQos方法和prefetchCount = 1设置。 这告诉RabbitMQ一次不要向工作人员发送多于一条消息。 或者换句话说,不要向工作人员发送新消息,直到它处理并确认了前一个消息。 相反,它会将其分派给不是仍然忙碌的下一个工作人员。

Consumer consumer = new DefaultConsumer(channel){
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    try {
                        Thread.sleep(1000000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println("消费者标识:"+consumerTag);
                    System.out.println("交换机名称:"+envelope.getExchange());
                    System.out.println("路由key:"+envelope.getRoutingKey());
                    System.out.println("消息id:"+envelope.getDeliveryTag());
                    System.out.println("接收消息:"+new String(body));
                    System.out.println("消息接收成功!");
                    channel.basicQos(1); //设置最大处理消息数为1
                    channel.basicAck(envelope.getDeliveryTag(), false);//手动提交ACK
                }
            };

这样设置后,处于休眠的消费者一直在处理分配给它的一条消息,而另一个消费者会接收到消息,处理完后又会接收到下一条消息,不再处于空闲状态,从而实现能者多劳!

4. 订阅模型分类

1582547449987.png

[图片上传失败...(image-b0a2d-1585216601052)]

特点:

①:一个生产者,多个消费者

②:每个消费者侦听自己的队列

③:每个队列绑定在交换机上

④:生产者负责把消息发送给交换机

⑤:生产者把消息发送到交换机,通过队列到达消费者,实现一条消息,被多个消费者消费!

⑤:根据交换机的不同类型处理消息

Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息会丢失!

4.1 Exchange类型-Fanout:广播

在广播的工作模式下:

①:一个生产者和多个消费者

②:生产者与交换机绑定,多个消费者有自己的队列,队列与交换机绑定!

③:生产者负责把消息发送给交换机,交换机负责把消息发送给具体的队列,生产者无法决定!

④:交换机把消息发送给每一个队列

⑤:每一个消费者都能接收到消息!实现一条消息被多个消费者消费

流程:

声明Exchange,不再声明Queue

发送消息到Exchange,不再发送到Queue

生产者:

public class Send {
    public static final String EXCHANGE_NAME_FANOUT= "exchange_name_fanout";
    public static void main(String[] args) throws Exception{
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
        String message = "hello rabbitmq";
        //声明交换机名字和类型
        channel.exchangeDeclare(EXCHANGE_NAME_FANOUT, BuiltinExchangeType.FANOUT);
        //发布消息 routingKey为"",发送到每一个与之绑定的队列!
        channel.basicPublish(EXCHANGE_NAME_FANOUT, "", null, message.getBytes());
        System.out.println("send message successful!");
    }
}

消费者01:

public class Consumer01 {
    public static final String QUEUE_NAME_01 = "quene_name_01";
    public static void main(String[] args) throws Exception{
        Connection connection = ConnectionUtil.getConnection();
        final Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME_01, false, false, false, null);
        //绑定到交换机
        channel.queueBind(QUEUE_NAME_01, Send.EXCHANGE_NAME_FANOUT, "");
        Consumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消费者标识:"+consumerTag);
                System.out.println("交换机名称:"+envelope.getExchange());
                System.out.println("路由key:"+envelope.getRoutingKey());
                System.out.println("消息id:"+envelope.getDeliveryTag());
                System.out.println("接收消息:"+new String(body));
                System.out.println("消息接收成功!");
                channel.basicQos(1); //设置最大处理消息数为1
                channel.basicAck(envelope.getDeliveryTag(), false);//手动提交ACK
            }
        };
        channel.basicConsume(QUEUE_NAME_01,false,consumer );
    }
}

消费者02:

public class Consumer02 {
    public static final String QUEUE_NAME_02 = "quene_name_02";
    public static void main(String[] args) throws Exception{
        Connection connection = ConnectionUtil.getConnection();
        final Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME_02, false, false, false, null);
        //绑定到交换机
        channel.queueBind(QUEUE_NAME_02, Send.EXCHANGE_NAME_FANOUT, "");
        Consumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消费者标识:"+consumerTag);
                System.out.println("交换机名称:"+envelope.getExchange());
                System.out.println("路由key:"+envelope.getRoutingKey());
                System.out.println("消息id:"+envelope.getDeliveryTag());
                System.out.println("接收消息:"+new String(body));
                System.out.println("消息接收成功!");
                channel.basicQos(1); //设置最大处理消息数为1
                channel.basicAck(envelope.getDeliveryTag(), false);//手动提交ACK
            }
        };
        channel.basicConsume(QUEUE_NAME_02,false,consumer );
    }
}

生产者发送消息到交换机,交换机根据routingKey匹配队列,这里没有指定routingKey,交换机会将消息发送到每一个队列,消费者01和消费者02都能收到消息!

使用场景:

群发信息

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 196,200评论 5 462
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 82,526评论 2 373
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 143,321评论 0 325
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 52,601评论 1 267
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 61,446评论 5 358
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 46,345评论 1 273
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 36,753评论 3 387
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 35,405评论 0 255
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 39,712评论 1 294
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 34,743评论 2 314
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 36,529评论 1 326
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,369评论 3 315
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 37,770评论 3 300
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,026评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,301评论 1 251
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 41,732评论 2 342
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 40,927评论 2 336

推荐阅读更多精彩内容