源码:https://github.com/ltoddy/rabbitmq-tutorial
Topics
(using the Pika Python client)
本章节教程重点介绍的内容
在之前的教程中,我们改进了日志记录系统。我们没有使用只有虚拟广播的fanout交换,而是使用了direct交换,并让选择性接收日志成为了可能。
尽管使用direct交换改进了我们的系统,但它仍然有局限性 - 它不能根据多个标准进行路由。
在我们的日志系统中,我们可能不仅需要根据严重性来订阅日志,还要根据发布日志的来源进行订阅。您可能从syslog unix工具知道这个概念,
该工具根据严重性(info / warning / crit...)和工具(auth / cron / kern ...)来路由日志。
这会给我们很大的灵活性 - 因为我们可能想听取来自'cron'的error日志,而且还听取来自'kern'的所有日志。
为了在我们的日志系统中实现这一点,我们需要了解更复杂的topic交换。
Topic 交换
发送到topic交换的消息必须有规范的routing_key - 它必须是由点分隔的单词列表。单词可以是任何东西,但通常它们指定了与该消息相关的一些功能。
一些有效的routing_key例子: "stock.usd.nyse","nyse.vmw","quick.orange.rabbit"。只要您愿意,路由键中可以有任意的单词,但最多255个字节。
绑定键也必须是相同的形式。topic交换背后的逻辑与direct topic交换背后的逻辑类似 - 使用特定路由键发送的消息将被传递到与匹配绑定键绑定的所有队列。
但是绑定键有两个重要的特殊情况:
- * (star) 可以代替一个字。
- # (hash) 可以替代零个或多个单词。
在这个例子中解释这个很简单:
在这个例子中,我们将发送所有描述动物的消息。消息将使用由三个字(两个点)组成的路由键发送。
路由关键字中的第一个单词将描述速度,第二个颜色和第三个物种:" <celerity> <color> <species> "。
我们创建了三个绑定:Q1绑定了绑定键" *.orange.* ",Q2绑定了" *.*.rabbit "和" lazy.#"。
这些绑定可以概括为:
- Q1对所有的橙色动物都感兴趣。
- Q2希望听到关于兔子的一切,以及关于懒惰动物的一切。
将路由键设置为"quick.orange.rabbit"的消息将传递到两个队列。消息"lazy.orange.elephant"也会去他们两个。
另一方面,"quick.orange.fox"只会进入第一个队列,而"lazy.brown.fox"只会进入第二个队列。
"lazy.pink.rabbit"只会传递到第二个队列一次,即使它匹配了两个绑定。
"quick.brown.fox"不匹配任何绑定,因此将被丢弃。
如果我们违反我们的合同并发送带有一个或四个单词的消息,如"orange"或"quick.orange.male.rabbit",
会发生什么情况?那么,这些消息将不匹配任何绑定,并会丢失。
另一方面,"lazy.orange.male.rabbit"即使有四个单词,也会匹配最后一个绑定,并将传递到第二个队列。
direct change
话题交换功能强大,可以像其他交流一样行事。
当使用" \# "(散列)绑定键绑定队列时,它将接收所有消息,
而不管路由密钥如何 - 就像在*fanout*交换中一样。
当在绑定中没有使用特殊字符"\*"(星号)和"\#"(散列)时,主题交换将像*direct*交换一样。
把它放在一起
我们将在我们的日志系统中使用topic交换。我们首先假定日志的路由键有两个单词:" <facility>.<severity> "。
代码几乎与前一个教程中的代码相同 。
emit_log_topic.py的代码:
#!/usr/bin/env python
import sys
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='topic_logs',
exchange_type='topic')
routing_key = sys.argv[1:] if len(sys.argv) > 2 else 'anonymous.info'
message = ' '.join(sys.argv[2:]) or 'Hello World'
channel.basic_publish(exchange='topic_logs',
routing_key=routing_key,
body=message)
print(" [x] Sent %r:%r" % (routing_key, message))
connection.close()
receive_logs_topic.py的代码:
#!/usr/bin/env python
import sys
import pika
# connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
connection = pika.BlockingConnection(pika.ConnectionParameters('172.17.0.2'))
channel = connection.channel()
channel.exchange_declare(exchange='topic_logs',
exchange_type='topic')
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue
binding_keys = sys.argv[1:]
if not binding_keys:
sys.stderr.write("Usage: %s [binding_key]...\n" % sys.argv[0])
sys.exit(1)
for binding_key in binding_keys:
channel.queue_bind(exchange='topic_logs',
queue=queue_name,
routing_key=binding_key)
print(' [*] Waiting for logs. To exit press CTRL+C')
def callback(ch, method, properties, body):
print(" [x] %r:%r" % (method.routing_key, body))
channel.basic_consume(callback,
queue=queue_name,
no_ack=True)
channel.start_consuming()
要接收所有日志运行:
python receive_logs_topic.py "#"
要从设施“ kern ” 接收所有日志:
python receive_logs_topic.py "kern.*"
或者,如果您只想听到关于“ critical ”日志的信息:
python receive_logs_topic.py "*.critical"
您可以创建多个绑定:
python receive_logs_topic.py "kern." ".critical"
发布带有路由键“ kern.critical ”类型的日志:
python emit_log_topic.py "kern.critical" "A critical kernel error"