RabbitMQ--交换机

在上一篇文章[[RabbitMQ--初步了解]中,介绍了默认交换机,每个消息只分发给一个消费者。在本篇中,我们将把一个消息分发给多个消费者。这种模式被称为“发布/订阅”。

交换机类型

交换机类型:匿名/默认交换器,头交换机,扇型交换机(fanout),直连交换机(direct), 主题交换机(topic)。

扇型交换机 fanout

扇形交换机把消息发送给它所知道的所有队列。

import com.rabbitmq.client.*;
import java.io.BufferedReader;
import java.io.InputStreamReader;
public class SendFanout {
    private static final String EXCHANGE_NAME = "logs";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
//        声明TCP连接
        Connection connection = factory.newConnection();
//        声明channel
        Channel channel = connection.createChannel();
//       声明扇形交换机
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
        BufferedReader input = new BufferedReader(new InputStreamReader(System.in));
        boolean flag = true;
        while(flag){
            System.out.print("往交换机中推送消息:");
            String message = input.readLine();
            if (message == "bye"){
                flag = false;
                continue;
            }
//            把消息发送给family交换机,在发送的时候我们不需要提供 routing_key参数,但是fanout默认将消息发给所有队列,即使提供routing_key参数,也会被忽略
            channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8"));
            System.out.println(" [x] Sent '" + message + "'");
        }
        channel.close();
        connection.close();
    }
}

把消息发送给fanout交换机时,不需要制定routing_key参数,因为fanout默认将消息发给所有队列,即使提供routing_key参数,也会被fanout忽略。

import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.Date;
public class RecvFanout {
    private static final String EXCHANGE_NAME = "logs";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
//        声明TCP
        Connection connection = factory.newConnection();
//        声明channel
        Channel channel = connection.createChannel();
//      声明扇形交换机
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
//        声明一个队列
        String queueName = channel.queueDeclare().getQueue();
//       绑定(binding 交换器和队列之间的联系。
        channel.queueBind(queueName, EXCHANGE_NAME, "");
        System.out.println(" Consume Waiting for messages for messages. To exit press CTRL+C");
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println(" Consume Received '" + message + "'");
            }
        };
//        消费者消费消息
        channel.basicConsume(queueName, true, consumer);
    }
}

上述两段代码,有一下几点要说明。


1、交换机的声明方式

Exchange.DeclareOk exchangeDeclare(String exchange, BuiltinExchangeType type) 

2、队列声明方式

1. queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,Map<String, Object> arguments)
 2. queueDeclare()

在这里使用了第二种方式,即让服务器为我们选择一个随机的队列名。
3、不论是生产者的basicPublish方法

void basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)

还是消费者的queueBind

Queue.BindOk queueBind(String queue, String exchange, String routingKey)

均有String exchange, String routingKey这两个参数
生产者的exchange表示他会将消息推到这个exchange中,queue的exchange表示它会从这个exchange中取消息供消费者消费。生产者的routingKey我认为可以表述为消息的一个标签,生产者推到 exchange每个消息都可以认为有两部分组成,一个是消息内容本身,一个是标签,这个标签可以认为是routingKey,他描述了消息是一条怎样的消息。例如,下面两条消息,routingKey是hello,则可以认为发送的两条消息内容都是用来打招呼的。

channel.basicPublish(EXCHANGE_NAME, "hello", null, “你好,小明”.getBytes("UTF-8"));
channel.basicPublish(EXCHANGE_NAME, "hello", null, “你好,小红”.getBytes("UTF-8"));

queue的routingKey表示这个消息队列愿意接收什么样的消息,他会判断exchange里消息的routingKey跟自己的routingKey是否匹配或者相等,依此来决定是否接收这个消息。
在这两段代码中,并没有提供routingKey参数,是因为fanout默认将消息发给所有队列,即使提供routingKey参数,也会被忽略。
4、消费者会到队列中取消息进行消费,autoAck=true表示消费者收到这个消息就会向队列发送确认ack,表示自己收到了,队列接到反馈后,就会将这条消息从队列中删除,callback表示消费者收到消息后,对消息进行的处理。

String basicConsume(String queue, boolean autoAck, Consumer callback)

在启动两个消费者(队列名随机,说明有两个队列),再启动生产者。
其中为了更好的演示消息发送与接收,我将消息内容改为从命令行接收。代码如下。

import com.rabbitmq.client.*;
import java.io.BufferedReader;
import java.io.InputStreamReader;
public class SendFanout {
    private static final String EXCHANGE_NAME = "logs";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
//        声明TCP连接
        Connection connection = factory.newConnection();
//        声明channel
        Channel channel = connection.createChannel();
//       声明扇形交换机
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
        BufferedReader input = new BufferedReader(new InputStreamReader(System.in));
        boolean flag = true;
        while(flag){
            System.out.print("往交换机中推送消息:");
            String message = input.readLine();
            if (message == "bye"){
                flag = false;
                continue;
            }
//            把消息发送给family交换机,在发送的时候我们不需要提供 routing_key参数,但是fanout默认将消息发给所有队列,即使提供routing_key参数,也会被忽略
            channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8"));
            System.out.println(" [x] Sent '" + message + "'");
        }
        channel.close();
        connection.close();
    }
}

生产者发送消息,消费者接收消息,效果如下图:


直连交换机(direct)

扇型交换机,能做的仅仅是广播。若不想队列盲目接收所有消息,可以使用直连交换机(direct exchange)来代替。直连交换机将会对exchange中消息的routingKey和和queue的routingKey进行精确匹配,从而确定消息该分发到哪个队列。此处的精确匹配是指相等。
下图能够很好的描述这个场景:


package Direct;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.BufferedReader;
import java.io.InputStreamReader;

/**
 * Created by sunyan on 17/8/3.
 */
public class SendDirect {
    private static final String EXCHANGE_NAME = "color";

    public static void main(String[] argv) throws Exception {

        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
        BufferedReader input = new BufferedReader(new InputStreamReader(System.in));
        boolean flag = true;
        String routingKey = null;
        String message = null;
        while (flag) {
            System.out.print("往交换机中推送消息:");
            String str = input.readLine();
            if (str == "bye") {
                flag = false;
                continue;
            }
            String[] m = str.split(",");
            routingKey = m[0];
            message = m[1];
            channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes("UTF-8"));
            System.out.println(" [x] Sent '" + routingKey + "':'" + message + "'");
        }
        channel.close();
        connection.close();
    }
}

注意,routingkey和message,后面都会通过键盘输入给出。

package Direct;
import com.rabbitmq.client.*;
import java.io.IOException;

public class RecvDirect1 {
    private static final String EXCHANGE_NAME = "color";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
        String queueName = channel.queueDeclare().getQueue();

        channel.queueBind(queueName, EXCHANGE_NAME, "black");
        channel.queueBind(queueName, EXCHANGE_NAME, "green");


        System.out.println("info/debug Waiting for messages. To exit press CTRL+C");

        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException{
                String message = new String(body, "UTF-8");
                System.out.println("black/green Received '" + envelope.getRoutingKey() + "':'" + message + "'");
            }
        };
        channel.basicConsume(queueName, true, consumer);
    }
}

第一个消费者队列给定了两个routingKey,black和green。

package Direct;

import com.rabbitmq.client.*;

import java.io.IOException;

/**
 * Created by sunyan on 17/8/3.
 */
public class RecvDirect2 {
    private static final String EXCHANGE_NAME = "color";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
        String queueName = channel.queueDeclare().getQueue();

        channel.queueBind(queueName, EXCHANGE_NAME, "error");

        System.out.println("orange logs Waiting for messages. To exit press CTRL+C");

        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException{
                String message = new String(body, "UTF-8");
                System.out.println("error logs Received '" + envelope.getRoutingKey() + "':'" + message + "'");
            }
        };
        channel.basicConsume(queueName, true, consumer);
    }
}

第一个消费者队列给定了一个routingKey,orange。
运行消费者和生产者,结果如下。


从运行结果可以看出,routingKey为green和black的消息被第一个消费者获取,routingKey为orange的message被第二个消费者获取,我还发送了一条routingKey为red的message,但因为没有与之匹配的queue的routingKey,所有这条消息并没有被这两个消费者中任何一个收到。
主题交换机(topic)
尽管直连交换机已经可以让队列有选择的获取信息,但这依然不够,它没办法基于多个标准执行路由操作。所以需要主题交换机。
发送到主题交换机(topic exchange)消息的路由键(routing_key),其形式不是任意的,必须是一个由.分隔开的词语列表。这些单词随便是什么都可以,但是最好是跟携带它们的消息有关系的词汇。词语的个数可以随意,但是不要超过 255 字节。
queue和exchange绑定的routingKey也必须拥有同样的格式。主题交换机背后的逻辑跟直连交换机很相似 ,一个携带着特定路由键的消息会被主题交换机投递给绑定键与之想匹配的队列。
routingKey中有两个特殊符号,
*(星号) 用来表示一个单词.
#(井号) 用来表示任意数量(零个或多个)单词。
当一个队列的绑定键为 "#"(井号) 的时候,这个队列将会无视消息的路由键,接收所有的消息。
当 * (星号) 和 # (井号) 这两个特殊字符都未在绑定键中出现的时候,此时主题交换机就拥有的直连交换机的行为。

package topic;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.BufferedReader;
import java.io.InputStreamReader;

/**
 * Created by sunyan on 17/8/3.
 */
public class SendTopic {
    private static final String EXCHANGE_NAME = "topic";

    public static void main(String[] argv) {
        Connection connection = null;
        Channel channel = null;
        try {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");

            connection = factory.newConnection();
            channel = connection.createChannel();

            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);


            BufferedReader input = new BufferedReader(new InputStreamReader(System.in));
            boolean flag = true;
            String routingKey = null;
            String message = null;
            while (flag) {
                System.out.print("往交换机中推送消息:");
                String str = input.readLine();
                if (str == "bye") {
                    flag = false;
                    continue;
                }
                String[] m = str.split(",");
                routingKey = m[0];
                message = m[1];
                channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes("UTF-8"));
                System.out.println(" [x] Sent '" + routingKey+ "':'" + message + "'");

            }
            System.out.println(" [x] Sent '" + routingKey + "':'" + message + "'");

        }
        catch  (Exception e) {
            e.printStackTrace();
        }
        finally {
            if (connection != null) {
                try {
                    connection.close();
                }
                catch (Exception ignore) {}
            }
        }
    }
}
package topic;

import com.rabbitmq.client.*;

import java.io.IOException;

/**
 * Created by sunyan on 17/8/3.
 */
public class ReceiveTopic1 {
    private static final String EXCHANGE_NAME = "topic";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
        String queueName = channel.queueDeclare().getQueue();

        String routingKey = "*.orange.*";

        channel.queueBind(queueName, EXCHANGE_NAME, routingKey);


        System.out.println(" [*.orange.*] Waiting for messages. To exit press CTRL+C");

        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println(" [*.orange.*] Received '" + envelope.getRoutingKey() + "':'" + message + "'");
            }
        };
        channel.basicConsume(queueName, true, consumer);
    }
}
import com.rabbitmq.client.*;
import java.io.IOException;
public class ReceiveTopic2 {
    private static final String EXCHANGE_NAME = "topic";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
        String queueName = channel.queueDeclare().getQueue();

        String routingKey1= "*.*.rabbit";
        String routingKey2= "lazy.#";

        channel.queueBind(queueName, EXCHANGE_NAME, routingKey1);
        channel.queueBind(queueName, EXCHANGE_NAME, routingKey2);


        System.out.println(" [*.*.rabbit AND lazy.#] Waiting for messages. To exit press CTRL+C");

        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println(" [*.*.rabbit AND lazy.#] Received '" + envelope.getRoutingKey() + "':'" + message + "'");
            }
        };
        channel.basicConsume(queueName, true, consumer);
    }
}

上述场景,将用下图说明:


这个例子里,我们发送的所有消息都是用来描述小动物的。发送的消息所携带的路由键是由三个单词所组成的,这三个单词被两个.分割开。路由键里的第一个单词描述的是动物的手脚的利索程度,第二个单词是动物的颜色,第三个是动物的种类。所以它看起来是这样的: ..。
我们创建了三个绑定:Q1 的绑定键为 .orange.,Q2 的绑定键为* .*.rabbit 和 lazy.# 。
这三个绑定键被可以总结为:
Q1 对所有的桔黄色动物都感兴趣。
Q2 则是对所有的兔子和所有懒惰的动物感兴趣。

运行消费者和生产者,结果如图所示。

一个携带有 quick.orange.rabbit 的消息将会被分别投递给这两个队列。携带着 lazy.orange.elephant 的消息同样也会给两个队列都投递过去。另一方面携带有 quick.orange.fox 的消息会投递给第一个队列,携带有 lazy.brown.fox 的消息会投递给第二个队列。携带有 lazy.pink.rabbit 的消息只会被投递给第二个队列一次,即使它同时匹配第二个队列的两个绑定。携带着 quick.brown.fox 的消息不会投递给任何一个队列。
发送了一个携带有一个单词或者四个单词("orange" or "quick.orange.male.rabbit")的消息时,发送的消息不会投递给任何一个队列,而且会丢失掉。
但是另一方面,即使 "lazy.orange.male.rabbit" 有四个单词,他还是会匹配最后一个绑定,并且被投递到第二个队列中。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 206,214评论 6 481
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 88,307评论 2 382
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 152,543评论 0 341
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 55,221评论 1 279
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 64,224评论 5 371
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,007评论 1 284
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,313评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,956评论 0 259
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 43,441评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,925评论 2 323
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,018评论 1 333
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,685评论 4 322
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,234评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,240评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,464评论 1 261
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,467评论 2 352
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,762评论 2 345

推荐阅读更多精彩内容