RabbitMQ--初步了解

RabbitMQ是什么?

rabbitMQ是一款基于AMQP协议的消息中间件,它能够在应用之间提供可靠的消息传输,即接收和发送消息。

RabbitMQ工作原理


这个系统架构图版权属于sunjun041640。
下面来介绍RabbitMQ里的一些基本定义,主要如下:
RabbitMQ Server:提供消息一条从Producer到Consumer的处理。
Exchange:一边从发布者方接收消息,一边把消息推送到队列。
producer只能将消息发送给exchange。而exchange负责将消息发送到queues。Procuder Publish的Message进入了exchange,exchange会根据routingKey处理接收到的消息,判断消息是应该推送到指定的队列还是是多个队列,或者是直接忽略消息。这些规则是通过交换机类型(exchange type)来定义的主要的type有direct,topic,headers,fanout。具体针对不同的场景使用不同的type。
queue也是通过这个routing keys来做的绑定。交换机将会对绑定键(binding key)和路由键(routing key)进行精确匹配,从而确定消息该分发到哪个队列。
Queue:消息队列。接收来自exchange的消息,然后再由consumer取出。exchange和queue可以一对一,也可以一对多,它们的关系通过routingKey来绑定。
Producer:Client A & B,生产者,消息的来源,消息必须发送给exchange。而不是直接给queue
Consumer:Client 1,2,3消费者,直接从queue中获取消息进行消费,而不是从exchange中获取消息进行消费。
还有一些在上图中没有显示,但在应用程序中会用到的定义。
Connection: 就是一个TCP的连接。Producer和Consumer都是通过TCP连接到RabbitMQ Server的。以后我们可以看到,程序的起始处就是建立这个TCP连接。
Channels: **虚拟连接。它建立在上述的TCP连接中。数据流动都是在Channel中进行的。也就是说,一般情况是程序起始建立TCP连接,第二步就是建立这个Channel。
Bindings:绑定(binding)是指交换机(exchange)和队列(queue)进行关联。可以简单理解为:这个队列(queue)对这个交换机(exchange)的消息感兴趣。

Hello,World

下面将从最简单的应用开始简单了解一下RabbitMQ的使用。
生产者

import com.rabbitmq.client.*;
import java.io.BufferedReader;
import java.io.InputStreamReader;
public class Send {
    private final static String QUEUE_NAME = "hello";
    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        //声明一个TCP连接
        Connection connection = factory.newConnection();
        //声明一个channel
        Channel channel = connection.createChannel();
        BufferedReader input = new BufferedReader(new InputStreamReader(System.in));
        boolean flag = true;
        while(flag){
            System.out.print("往交换机中推送消息:");
            String message = input.readLine();
            if (message == "quit"){
                flag = false;
                continue;
            }
            //往exchange中推送消息
            channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));
            System.out.println(" [x] Sent '" + message + "'");
        }
        channel.close();
        connection.close();
    }

}

消费者

import com.rabbitmq.client.*;
import java.io.IOException;
public class Recv {
    private final static String QUEUE_NAME = "hello";
    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        System.out.println(" [*] Waiting for messages.");
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
                    throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println(" [x] Received '" + message + "'");
            }
        };
        channel.basicConsume(QUEUE_NAME, true, consumer);
    }
}

首先列举一下几个重要声明的参数

  1. 队列声明queueDeclare
queueDeclare(String queue, boolean durable, boolean exclusive, boolean
 autoDelete, Map<String, Object> arguments)
  1. 生产者发送消息basicPublish
void basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)
  1. 消费者消费basicConsume
    消费者到队列中获取消息进行消费。autoAck=true表示消费者收到这个消息就会向队列发送确认ack,表示自己收到了,队列接到反馈后,就会把这条消息从队列中删除,callback表示消费者收到消息后,对消息进行的处理。
basicConsume(String queue, boolean autoAck, Consumer callback)

上述两段代码,有一下几点要说明。
1、从Send.java 代码中可以看出并没有声明架构图中的exchange,而是方法basicPublish中使用了空字符串 (""),这代码使用了默认/匿名交换机。
2.默认交换机有一个特殊的地方,那就是生产者和消费者会公用一个队列。 在Send.java的basicPublish方法中,routingKey参数的位置我们给了一个队列名,默认交换机会把推给他的消息直接再推到相应名称的队列中。在消费者中,声明了队列后,也不需要和交换机进行绑定。消费者会直接到相应队列获取消息。所以我们的设计看起来如下所示(注意只是看起来):

运行生产者和消费者,运行结果如下:


轮询分发机制

在上面的运行结果中,只有一个消费者在消费队列中的消息,如果有多个消费者同时从这个队列中获取消息进行处理,那么结果会是怎样呢?
同时运行两个消费者,运行结果如下:
生产者



第一个消费者



第二个消费者

从运行结果中可以看出,RabbitMQ 会顺序循环的分发每个Message。当每个收到ack后,会将该Message删除,然后将下一个Message分发到下一个Consumer。这种分发方式叫做round-robin。

消息确认

这里就会有一个问题,在上述消费者代码中,我们设置autoAck=true,即消费者收到这条消息就通知队列将其删除。然后才会去处理这条消息。
如果consumer处理消息的时候突然挂了,消息还没有被完成,那么这条消息就会消失。为了演示信息丢失,将消费者代码改成如下

import com.rabbitmq.client.*;

import java.io.IOException;

/**
 * Created by sunyan on 17/8/1.
 */
public class Recv {
    private final static String QUEUE_NAME = "hello";


    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        System.out.println(" [*] Waiting for messages.");
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
                    throws IOException {
                String message = new String(body, "UTF-8");
                try {
                    Thread.sleep(10000);
                    System.out.println(" [x] Received '" + message + "'");
                } catch (InterruptedException _ignored) {
                    Thread.currentThread().interrupt();
                }finally {
                    System.out.println(" [x] Done");
                    channel.basicAck(envelope.getDeliveryTag(), false);
                }
            }
        };
        channel.basicConsume(QUEUE_NAME, false, consumer);
    }
}

在消息输出前,sleep5秒,下面,运行两个消费者,并在发送第一条消息后,将第一个消费者关闭,


从运行结果可以看出,一共丢失了两条消息。为了不让消息丢失,RabbitMQ
提供了消息确认机制,consumer在接收到,执行完消息后会发送一个ack给RabbitMQ告诉它可以从queue中移除消息了。如果没收到ack。Rabbitmq会重新发送此条消息,如果有其他的consumer在线,将会接收并消费这条消息。消息确认机制是默认打开的。如果想关闭只需要按如下设置 即可。

channel.basicConsume(QUEUE_NAME, false, consumer);


从上图可以看出,第一个消费者退出后,原本轮询分发给他的数据又给了第二个消费者,并没有信息丢失。
如果处理完成后忘记发送ack,那么即使已经被处理的信息,RabbitMQ不会将它删除,且在消费者退出后,将消息继续发送给其他消费者进行重复处理。
把上段代码中的相应内容注释掉。

channel.basicAck(envelope.getDeliveryTag(), false);


从上面三张图可以看出,已经处理过的消息h1,h2又被处理了一遍。忘记basicAck是一个常见的错误,但后果是严重的。因为RabbitMQ无法删除任何消息,将会消耗越来越多的内存。

公平派遣

看到这里,你可能已经觉得轮询分发有点问题了,如果一个消费者处理一个问题要10s,另一个处理只需瞬间,那么一个消费者将不断忙碌,另一个消费者几乎不会做任何工作。但RabbitMQ不知道什么,还会平均分配消息。这是因为当消息进入队列时,RabbitMQ只会分派消息。它不看消费者的未确认消息的数量。它只是盲目地向第n个消费者发送每个第n个消息。我们可以使用basicQos方法与 prefetchCount = 1设置。这告诉RabbitMQ在消费者返回一个处理消息的ack之前不要再给他发送新消息。相反,它将发送到下一个还不忙的消费者。
一个消费者设置

 Thread.sleep(10000);

另一个消费者不设置,让其立即处理消息。运行结果如下


从图中可以看出,原本应该有Recv处理的消息“hello3”,由Recv1处理了。

消息持久化

消息确认中讲了如何确保即使消费者死亡,消息也不会丢失。但是如果RabbitMQ服务器停止,消息仍然会丢失。
当RabbitMQ退出或崩溃时,它会忘记队列和消息,需要两件事来确保消息不会丢失:我们需要将队列和消息标记为耐用。
首先,我们需要确保RabbitMQ不会失去我们的队列。为了这样做,我们需要将其声明为持久的:

channel.queueDeclare(QUEUE_NAME,durable,false,false,null);

虽然这个命令本身是正确的,但是在我们目前的设置中是不行的。这是因为我们已经定义了一个 不耐用的名为hello的队列。RabbitMQ不允许您重新定义具有不同参数的现有队列,并会向尝试执行此操作的任何程序返回错误。但是有一个快速的解决方法 - 用不同的名称声明一个队列。

channel.queueDeclare("hello_world",durable,false,false,null);

消息持久化
需要通过将MessageProperties(实现BasicProperties)设置为值PERSISTENT_TEXT_PLAIN来标记我们的消息。
总结一下,为了数据不丢失,我们采用了:

  1. 在数据处理结束后发送ack,这样RabbitMQ Server会认为Message Deliver 成功。
  2. 持久化queue,可以防止RabbitMQ Server 重启或者crash引起的数据丢失。
  3. 持久化Message,理由同上。

总结

在本篇中,主要介绍了一下默认交换机,在默认交换机下,一条消息只能分发给一个消费者。那如果有很多不同的消费者都对这条消息的话,默认交换机就无法实现了?那么具体该怎么实现,将在RabbitMQ--交换机中进行说明。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 206,214评论 6 481
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 88,307评论 2 382
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 152,543评论 0 341
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 55,221评论 1 279
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 64,224评论 5 371
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,007评论 1 284
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,313评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,956评论 0 259
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 43,441评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,925评论 2 323
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,018评论 1 333
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,685评论 4 322
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,234评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,240评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,464评论 1 261
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,467评论 2 352
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,762评论 2 345

推荐阅读更多精彩内容

  • 来源 RabbitMQ是用Erlang实现的一个高并发高可靠AMQP消息队列服务器。支持消息的持久化、事务、拥塞控...
    jiangmo阅读 10,344评论 2 34
  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 134,599评论 18 139
  • RabbitMQ 原理介绍及安装部署 标签:RabbitMQ 安装 简介 RabbitMQ 是一个用 Erlang...
    神仙CGod阅读 8,551评论 0 60
  • 什么叫消息队列 消息(Message)是指在应用间传送的数据。消息可以非常简单,比如只包含文本字符串,也可以更复杂...
    lijun_m阅读 1,335评论 0 1
  • RabbitMQ笔记 本文参考资料:http://blog.csdn.net/chwshuang/article/...
    wangxiaoda阅读 2,812评论 0 11