本文所有内容均个人从RabbitMQ官网教程中翻译,若图片文字的引用有任何侵权的地方,联系我,我会立马删除。
This article was translated from RabbitMQ Official Tutorials by myself,and if this article and the images in this article have any infringement,please contact to me, and i will delete them.
发布/订阅
(使用php-amqplib)
在上一个教程,我们创建了一个工作队列。Work Queue(工作队列)是在每一个任务都分发给一个确切的处理程序的假设上建立的。在这一部分,我们将会做一些完全不同的事情——我们将会发送一条消息去多个Consumers(消费者)上。这种模式被称为“发布/订阅”。
为了说明这种模式,我们将会建立一个简单的日志系统。它将会包含两个程序——第一个会发出日志消息,而另一个则会接收并打印这些消息。
在我们的日志系统里,每一个正在运行的接收程序副本都会获得(所有)消息。那样我们就可以运行一个接收程序把日志引导到磁盘;同时我们可以运行另外一个接收程序把日志打印到屏幕上来看。
基本上,被发布的消息将会在所有接收程序间进行广播。
交换
在本教程的上一部分,我们通过一个Queue(队列)发送和接收消息。现在是时候介绍一下 RabbitMQ 的全消息模型(Full Messaging Model)了。
让我们快速地复习前面的教程涵盖的内容:
1.一个Producer(生产者)就是一个发送消息的程序。
2.一个Queue(队列)就是一个保存消息的缓冲(buffer)。
3.一个Consumer(消费者)就是一个接收消息的用户程序。
RabbitMQ的消息模型的主要思想就是:Producer(生产者)从不会直接将消息发送到Queue(队列)中。实际上,很多时候Producer(生产者)甚至完全不知道一条消息将会被发送到Queue(队列)。
相反地,Producer(生产者)只能发送消息到一个交换机(exchange)。一个交换机就是一个非常简单的东西。一方面(或者是叫一端更形象?)从Producer(生产者)处接收消息,另一方面(端?)它把这些消息推进Queue(队列)中。交换机必须准确地知道对它接收的每一条消息做什么。应该将这条消息追加到一个特别的Queue(队列)吗?应该将这条消息追加到多个Queue(队列)吗?或者应该把这条消息丢弃掉吗?做这些事情的规则是通过定义交换类型来确定的。
这里只有少数有效的交换类型:direct
,topic
,headers
和fanout
。我们将会集中(介绍)最后一个fanout
。让我们创建一个这种类型的交换机并称他为logs
。
$channel->exchange_declare('logs', 'fanout', false, false, false);
fanout
交换机是非常简单的。从它的名称就能猜到(反正我是不知道怎么翻译好-_-),它只是将它获取到的所有消息广播到它所知道的所有Queue(队列)中。这与我们的日志系统需求十分吻合。
列出交换机
你可以运行十分有用的
rabbitmqctl
来列出服务器上的所有交换机:sudo rabbitmqctl list_exchanges
在列出的队列中会由一些
amq.*
交换机以及默认的(未被命名)的交换机。这是默认创建的,但此时你似乎并不需要使用他们。默认的交换机
在教程的上一部分我们对交换机一无所知,却仍然可以发送消息到Queues(队列)中。这很可能是因为我们使用了一个通过空字符串("")识别的默认的交换机。
重新回顾我们之前发送一个消息的时候:
$channel->basic_publish($msg, '', 'hello');
在此处,我们使用了默认的,或者说是无名的交换机:消息会被路由到
routing_key
指定的Queue(队列),如果这个队列存在的化。routing_key
就是basic_publish
的第三个参数。
现在,让我们发送消息到一个被命名的交换机上:
$channel->exchange_declare('logs', 'fanout', false, false, false);
$channel->basic_publish($msg, 'logs');
临时队列
你可能记得之前我们使用的Queue(队列)都是拥有了一个指定的名称(还记得hello
和task_queue
吗?)。在我们需要将工作程序指向对应的Queue(队列)时可以命名一个Queue(队列)对于我们来说是十分重要的。当你想要在Producer(生产者)与Consumer(消费者)之间共享Queue(队列)时候为Queue(队列)命名是十分重要的。
但是这对我们的日志记录器来说这并不重要。我们打算监听所有的日志消息吗,而不是仅仅是其中一部分。同样我们只对当前流动的消息感兴趣,而不是旧的消息。为了解决这一情况,我们需要两样东西。
首先,无论何时,我们连接到RabbitMQ时候都需要一个新的,并且是空的Queue(队列)。为了达到这一目的,我们可以使用随机的名称来创建一个Queue(队列),或者,更好的选择是——让RabbitMQ服务器为我们选择一个随机的Queue(队列)。
其次,一旦我们的Consumer(消费者)断开了链接,对应的Queue(队列)应该被自动删除。
在php-amqplib客户端中,当我们传给Queue(队列)名称参数一个空字符串时候,我们能创建一个非持久化队列(non-durable Queue),并反回了一个(自动)生成的队列名称:
list($queue_name,,) = $channel->queue_declare('');
当这个方法反回,$queue_name
变量包含了一个由RabbitMQ生成的随机名称。例如,它看起来会像是这样子:amq.gen-JzTY20BRgKO-HjmUJj0wLg
。
当这个连接宣布关闭了,对应的Queue(队列)将会被删除,因为它被声明为独有的(exclusive)。
绑定
我们已经创建了一个fanout
交换机以及一个Queue(队列)。现在我们需要告诉交换机去发送消息到我们的Queue(队列)。交换机与队列之间的关系成为绑定(binding)。
$channel->queue_bind($queue_name, 'logs');
从现在起,logs
交换机将会追加消息去我们的队列。
列出绑定
如你所想,通过以下方式你可以把正在使用的绑定(bindings)。
rabbitmqctl list_bindings
将他们放在一起
发出日志的这个Producer(生产者)程序与我们之前教程的不会相差太多。最重要的改变就是我们现在希望发送消息到我们的logs
交换机而不是无名的那个。下面就是emit_log.php
脚本的代码:
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
// 声明交换机
$channel->exchange_declare('logs', 'fanout', false, false, false);
$data = implode(' ', array_slice($argv, 1));
if(empty($data)) $data = "info: Hello World!";
$msg = new AMQPMessage($data);
// 发送消息到交换机上(而不是指定队列)
$channel->basic_publish($msg, 'logs');
echo " [x] Sent ", $data, "\n";
$channel->close();
$connection->close();
?>
这如你所看到的,在建立连接之后我们声明了一个交换机。这一步十分重要因为发送消息到不存在的交换机是被禁止的,
如果已经没有Queue(队列)绑定到交换机,交换机的消息将会被丢失,但对于我们来说可以接受;如果已经没有Consumer(消费者)监听该交换机的消息了,我们可以安全地删除这些消息。
reveive_logs.php
的代码如下:
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
// 声明对应的交换机
$channel->exchange_declare('logs', 'fanout', false, false, false);
// 创建一个非持久化的队列并获取自动生成的队列名称
list($queue_name, ,) = $channel->queue_declare("", false, false, true, false);
// 绑定队列到交换机
$channel->queue_bind($queue_name, 'logs');
echo ' [*] Waiting for logs. To exit press CTRL+C', "\n";
$callback = function($msg){
echo ' [x] ', $msg->body, "\n";
};
$channel->basic_consume($queue_name, '', false, true, false, false, $callback);
while(count($channel->callbacks)) {
$channel->wait();
}
$channel->close();
$connection->close();
?>
如果你想保存这些日志到文件,只需要打开一个控制台,并输入:
php reveive_logs.php > logs_from_rabbit.log
如果你想在屏幕上看到这些日志,新建一个新的终端并运行:
php reveive_logs.php
当然,你还需要发送日志:
php emit_log.php
使用rabbitmqctl list_bindings
你可以验证这些代码已经如我们所想地创建了绑定与Queue(队列)。如果是运行着两个receive_logs.php
程序,你将会看到类似下面的情况:
sudo rabbitmqctl list_bindings
# => Listing bindings ...
# => logs exchange amq.gen-JzTY20BRgKO-HjmUJj0wLg queue []
# => logs exchange amq.gen-vso0PVvyiRIL2WoV3i48Yg queue []
# => ...done.
这结果的解析很直接了当:数据从logs
交换机发送到了两个以服务器分配的名称命名的Queue(队列)上。这正式我们所期望的。