本文所有内容均个人从RabbitMQ官网教程中翻译,若图片文字的引用有任何侵权的地方,联系我,我会立马删除。
This article was translated from RabbitMQ Official Tutorials by myself,and if this article and the images in this article have any infringement,please contact to me, and i will delete them.
路由
(使用php-amqplib)
在上一个教程我们建立了一个简单的日志系统。我们已经能够把日志消息广播到许多接收者。
在这个教程中,我们准备为这个消息系统添加一个特性——让它能够仅订阅消息中的一个子集。例如,我们将能够把关键的错误信息记录到日志文件(保存到磁盘空间中),同时能够把所有的日志消息打印到控制台上。
绑定
上一个例子,我们已经(在路由器与Queue(队列)之间)建立了绑定。你可能还记得下面的代码:
$channel->queue_bind($queue_name,'logs');// 将队列绑定到logs路由上
一个绑定就是一个交换机(exchange)与一个Queue(队列)之间的关系。这可以简单地描述为:这个Queue(队列)对来自这个路由的消息感兴趣。
绑定可以携带一个额外的routing_key
参数。为了避免与$channel::basic_publish
的参数混淆(第三个也叫routing_key
),我们准备称他为bingding_key
。这是我们创建一个带有键的绑定的方式:
$binding_key = 'black';
$channel->queue_bind($queue_name, $exchange_name, $binding_key);
这意味着一个绑定键(binding key)取决于路由的类型。之前我们所使用的fanout
类型的路由,简单地忽略了这个值。
直接交换(Direct exchange)
我们在上一个教程中的日志系统广播所有消息到所有的Consumer(消费者)。我们打算拓展它,让它可以根据消息的严重程度过滤消息。例如我们可能希望写日志的脚本仅把收到的严重错误的日志消息写到磁盘上,从而不把磁盘空间浪费在过多的警告或者信息类型的日志消息。
我们所使用的fanout
交换机并不能提供给我们足够的灵活性——它只是盲目地广播。
我们将会使用一个direct
类型的交换机来代替。direct
类型交换机背后的路由算法(routing algorithm)是简单的——当Queue(队列)的binding key
与消息的routing key
完全匹配的时候,这条消息才会进入对应的路由。
为了说明这种情况,参考一下下面的步骤:
在这个步骤中,我们可以看到direct
类型的交换机X
和绑定在它上面的两个Queue(队列)。第一个队列通过orange
绑定键(binding key)来绑定,而第二个则拥有两个绑定,一个是通过black
绑定键,另外一个是通过green
绑定键。
在这样一个步骤中,一条带有orange
路由键(routing key)的消息发送到交换机会被路由到Queue(队列)Q1
。带有black
或者green
路由键的消息将会被路由到Q2
。所有其他的消息将会被丢弃。
多重绑定
用相同的键绑定多个Queue(队列)是完全合法的。在我们的例子中我们可以在X
与Q1
之间通过black
绑定键添加一个绑定。在这一情况,direct
路由将会像fanout
路由那样广播消息到所有匹配的Queue(队列)上。一条拥有black
路由键的消息将会被同时分发到Q1
和Q2
。
发出日志
我们把这一模型使用在日志系统上。我们将发送消息到一个direct
路由来代替fanout
路由。我们将会把日志等级作为一个routing key
。这样的话接收脚本将能够选择它想接收的严重等级。让我们先学习发出日志。
像往常一样,我们首先需要创建一个交换机。
$channel->exchange_declare('direct_logs', 'direct', false, false, false);
然后我们已经准备好发送一条消息了。
$channel->exchange_declare('direct_logs', 'direct', false, false, false);
$channel->basic_publish($msg, 'direct_logs', $severity);
为了简化,我们将会假定“severity
”可以是“info
”,“warning
”,“error
”中的一个。
订阅
接收消息与前面的教程类似,只不过我们准备为我们感兴趣的每一个日志等级(severity)创建一个新的绑定。
foreach($severities as $severity) {
$channel->queue_bind($queue_name, 'direct_logs', $severity);
}
把他们一起运行
emit_log_direct.php
类的代码如下:
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
// 定义为direct模式路由
$channel->exchange_declare('direct_logs', 'direct', false, false, false);
// 从命令行接收参数作为日志等级,默认为info
$severity = isset($argv[1]) && !empty($argv[1]) ? $argv[1] : 'info';
$data = implode(' ', array_slice($argv, 2));
if(empty($data)) $data = "Hello World!";
$msg = new AMQPMessage($data);
// 发送消息到对应的路由,并携带一个routing key
$channel->basic_publish($msg, 'direct_logs', $severity);
echo " [x] Sent ",$severity,':',$data," \n";
$channel->close();
$connection->close();
?>
receive_logs_direct.php
的代码如下:
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
// 定义为direct交换机并命名为direct_logs
$channel->exchange_declare('direct_logs', 'direct', false, false, false);
// 创建临时的非持久化的队列
list($queue_name, ,) = $channel->queue_declare("", false, false, true, false);
$severities = array_slice($argv, 1);
if(empty($severities )) {
file_put_contents('php://stderr',
"Usage: $argv[0] [info] [warning] [error]\n");
exit(1);
}
foreach($severities as $severity) {
// 绑定多个binding key
$channel->queue_bind($queue_name, 'direct_logs', $severity);
}
echo ' [*] Waiting for logs. To exit press CTRL+C', "\n";
$callback = function($msg){
echo ' [x] ',$msg->delivery_info['routing_key'], ':', $msg->body, "\n";
};
$channel->basic_consume($queue_name, '', false, true, false, false, $callback);
while(count($channel->callbacks)) {
$channel->wait();
}
$channel->close();
$connection->close();
?>
如果你打算只保存"warning
"与"error
"(不包含"info
")等级的日志消息到一个文件,只需要打开控制台并输入:
php receive_logs_direct.php warning error > logs_from_rabbit.log
如果你想在你的屏幕上查看所有的日志消息,只需要打开一个新的终端并输入:
php receive_logs_direct.php info warning error
# => [*] Waiting for logs. To exit press CTRL+C
以及,例如,你想发送一个error
等级的日志消息,只需要输入:
php emit_log_direct.php error "Run. Run. Or it will explode."
# => [x] Sent 'error':'Run. Run. Or it will explode.'
(完整的(emit_log_direct.php源码)与(receive_logs_direct.php源码)在此)