发布/订阅模式
在之前的教程中,我们创建了一个工作队列。使用工作队列可以使每条消息都能准确的传递给一个消费者。在这一部分,我们要做一些完全不同的事情——我们会把消息传递给多个消费者,这就是发布订阅模式。
Exchanges
在之前的部分,我们通过队列来进行收发消息,现在我们要介绍rabbit的完整消息模型。
快速回顾一下上节教程的内容:
一个生产者发送消息
一个队列缓存消息
一个消费者接收消息
现在的这个消息模型的核心思想就是生产者不会直接向队列发送消息,事实上,生产者甚至不会知道消息是否被传递给了队列。
生产者只能把消息发送给一个exchange。exchange是一个非常简单的东西。一方面,他接收从生产者过来的消息,另一方面,他把这些消息推送到队列里。exchange必须非常明确接收到的消息要怎么处理,是否应该把消息推送给某个特别的队列,还是推送给很多队列,或者这个消息是否应该被丢弃,这些规则在exchange type里定义了。
exchange 的类型有这几种:direct, topic, headers, fanout
我们重点讲一下最后一种,广播模式。我们创建一个叫logs的exchange。
这个广播的exchange非常简单,像你猜的那样,他只是把他所接收到的消息广播给所有他知道的队列。
匿名的exchange
之前我们对exchange一无所知,但是我们仍然可以把消息发送给队列,这是因为我们使用了默认的exchange,通过一个空支付穿的形式定义的参数。就是下面这张图的第一个参数。
现在我们可以用我们刚创建的叫logs的exchange来代替这个匿名的。
临时队列
你应该记得之前我们使用的队列都有一个特别的名字。给队列定义一个名字对我们来说十分重要,我们需要给消费者们指明一样的队列。包括生产者消费者之间的互相工作。
但是现在我们的程序好像不是这样。我们想要接收所有的日志消息,不是这些消息的一个子集。我们也只对当前的消息流感兴趣。
有两点要做的,第一点,当我们连接上rabbit之后,我们需要刷新,清空队列,用一个随机的名字或者rabbit服务器生成的随机名字来创建一个队列。第二,当我们把某个消费者断开连接之后,我们需要自动删除相应的队列。
String queueName = channel.queueDeclare().getQueue();
这样就创建了一个不持久、连接独有的、自动删除的队列。
随机名字就像amq.gen-JzTY20BRgKO-HjmUJj0wLg.
绑定
我们已经创建了一个exchange了,接下来我们要告诉这个exchange去把消息发送给我们的队列。exchange和队列之间的关系就叫做绑定。
总结
生产者和之前的程序没什么不同,最重要的变化是我们现在发送消息是发送到我们叫logs的exchange上而不是之前的匿名exchange。我们需要提供一个routingKey(路由key)。
如果这个exchange没有绑定任何队列的话,消息都会丢失。不过我们现在还没有消费者在监听,所以我们可以安全的丢弃消息。
编译运行略
翻译自