一. AMQP简要说明
1.1 exchange, binding , queue
AMQP(Advanced Message Queuing Protocol)是一种消息协议. 简单来说, 一个消息可以被发送到exchange
里, exchange
相当于邮局或信箱(mailbox); 然后根据一定的规则把消息拷贝分发到相应的queue
里面, 这里的规则叫做binding
. 然后, broker
把每一条消息分发给对应的客户端, 或者等客户端需要的时候自己来fetch. exchange ,binding, queue 都叫做 AMQP 0-9-1 的实体
1.2 AMQP 0-9-1
是可编程协议
AMQP 0-9-1
协议是可编程的, 因为相关的 exhange, binding, queue 是程序自身定义的, 而不是 broker 管理员预先设置的. 这让应用开发者更为自由, 却也要求开发者们了解潜在的配置冲突.
应用可以声明 AMQP 0-9-1 实体
, 也可以在不再使用这些实体时删除他们
二. 几种类型的Exchange
2.1 Exchange的4个类型?
Exchange 在 AMQP 0-9-1 的实体
里属于发送消息的, 它先接受一条消息, 然后将消息路由到0个或多个队列中. 路由算法依赖于2个部件:
-
Exchange
的类型 - 路由规则, 在组件中叫做
binding
AMQP 0-9-1 的 broker 提供了四种类型的 exchange :
Exchange类型 | 每种 exchange 在 borker 中预先定义的名字 |
---|---|
Direct exchange | (Empty string) and amq.direct |
Fanout exchange | amq.fanout |
Topic exchange | amq.topic |
Headers exchange | amq.match (and amq.headers in RabbitMQ) |
下图为 borker 中预先定义的 exchange :
除了类型, exchange 还有一些列重要的属性
- 名字
- Durability 可持久化: (broker重启后, exchange是否仍然存在)
- Auto-delete 自动删除: (当最后一个队列与 exchange 解绑后, 该 exchange 是否会自动删除)
- 参数 (可选的, 被插件和 broker 需要的个性化参数)
Exchanges 可以是持久化的( durable )或是暂时( transient )的, 持久化的 exchanges 在 broker 重启后仍然存在, 而暂时 exchange 在broker重启上线后需要重新声明
2.2 Default Exchange
Default Exchange 是一种特殊的直接 Exchange, 由 broker 预先定义好的, 名字为"空字符串". 每个队列, 被创建后都会被自动绑定到默认 exchange 上, 使用一个和队列名同名的 routing key.
比如, 一个名为 search-indexing-online 的队列, AMQP 0-9-1 broker 会自动将其绑定到默认 exchange, 其使用名字 search-indexing-online 作为 routing key. 所有发送到"空字符串" exchange, 且使用 search-indexing-online 作为 routing key 的消息都会被路由到 search-indexing-online 队列. 换句话说, 默认 exchange 就像将消息直接发送到队列一样
2.3 Direct Exchange
Direct Exchange 是基于 routing key 发送消息的 exchange. 直接 Exchange 是实现单播消息的最理想方案 (尽管它也能用于多播). 它的工作逻辑是:
- 队列使用 routing key K 绑定到 exchange
- 当一个 routing key 为 R 的消息到达
直接 exchange
时, exchange 会把消息路由到同样 routing key 为 R 的队列
直接 exhcange
特别适合在多个 works 之间发送任务, 这里的 work 通常是同一个应用的不同实例. 任务将通过 轮训 的方式发送给这些work. 也就是说, 在 AMQP 0-9-1 中, 负载均衡是在 consumer 上的负载均衡, 而不是队列上的负载均衡.
2.4 Fanout Exchange
Fanout Exchange 扇形交换机会把消息路由到所有与之绑定的队列上, 队列的 routing key 将被忽略. 假设有 N 个队列绑定到了一个 fanout exchange 上, 则消息将会被拷贝发送至所有的 N 个队列. Fanout Exchange 是理想的消息"广播路由"-broadcast
Fanout Exchange 使用场景大多类似:
- Massively multi-player online (MMO) games can use it for leaderboard updates or other global events
- Sport news sites can use fanout exchanges for distributing score updates to mobile clients in near real-time
- Distributed systems can broadcast various state and configuration updates
-
Group chats can distribute messages between participants using a fanout exchange (although AMQP does not have a built-in concept of presence, so XMPP may be a better choice)
2.5 Topic Exchange
Topic Exchange 可以将消息路由到一个或多个与之绑定的队列中, 匹配规则为使用消息的 routing key 和模式匹配串, 主题交换机
通常用于实现各种 pub-sub 模式, 用于消息的多播 - multicast
Topic Exchange 有非常广泛的使用场景:
- Distributing data relevant to specific geographic location, for example, points of sale
- Background task processing done by multiple workers, each capable of handling specific set of tasks
- Stocks price updates (and updates on other kinds of financial data)
- News updates that involve categorization or tagging (for example, only for a particular sport or team)
- Orchestration of services of different kinds in the cloud
- Distributed architecture/OS-specific software builds or packaging where each builder can handle only one architecture or OS
2.6 Headers Exchange
Headers Exchange 不适用 routing key 作为路由依据, 而是依靠消息头( message header )中的多个属性作为路由依据; 即使给 Headers Exchange 指定了 routing key, 该 exchange 也会自动忽略这个属性. 只有消息头中的内容和绑定在 exchange 上的队列声明的消息头相等时, 消息才会被送达.
Headers Exchange 可以使用多个消息头作为匹配依据, exchange 的绑定参数中, 有一个叫" x-match ": 当设置为 "any" 时, 只匹配成功一个消息头即可; 当设置为 "all" 时, 需要匹配到所有的消息头
Headers Exchange 也可以看做是一种特殊的 "Direct Exchange", 只是它不使用 routing key 作为判等依据, 而是使用消息头作为判等依据.
[注]: 以x-
开头的消息头, 不会作为匹配项
三. Queue
AMQP 0-9-1 中的队列和其它消息系统类似, 队列除了和exchange共享一些属性外, 还有一些额外的属性:
- Name
- Durable (broker重启后, queue是否幸存)
- Exclusive (只被1个连接使用时, 当该连接关闭后queue被删除)
- Auto-delete (当最后一个consumer退订后删除队列)
- Arguments (消息的TTL, 队列的长度限制等)
队列在使用前应先被 declar, declar 动作发生时, 如果发现不存在该队列, 则会自动创建该队列; 如果发现已存在该队列, 但属性配置不同, 则会抛出channel级的异常, 返回406码( PRECONDITION_FAILED )
3.1 队列名
队列的名字可以由应用(application)来取,也可以让消息代理(broker)直接生成一个。队列的名字可以是最多255字节的一个utf-8字符串。若希望broker 生成队列名,需要给队列的name参数赋值一个空字符串:在同一个通道(channel)的后续的方法(method)中,可以使用空字符串来表示之前生成的队列名称。之所以之后的方法可以获取正确的队列名是因为在 declare 的响应中, 生成的队列名包含在了 response 中
以"amq."开始的队列名称被预留做 broker 内部使用。如果试图在队列声明时打破这一规则的话,一个通道级的403 (ACCESS_REFUSED)错误会被抛出。
3.2 队列持久化
声明为持久化的队列, 其元数据会存储在硬盘上. 持久化的队列并不会使得路由到它的消息也具有持久性。倘若消息代理挂掉了,重新启动,那么在重启的过程中持久化队列会被重新声明,无论怎样,只有经过持久化的消息才能被重新恢复。
更多内容参看https://www.rabbitmq.com/queues.html#durability
四. Binding
绑定(Binding)是交换机(exchange)将消息(message)路由给队列(queue)所需遵循的规则。如果要指示交换机“E”将消息路由给队列“Q”,那么“Q”就需要与“E”进行绑定。绑定操作需要定义一个可选的路由键(routing key)属性给某些类型的交换机。路由键的意义在于从发送给交换机的众多消息中选择出某些消息,将其路由给绑定的队列。
打个比方:
- 队列(queue)是我们想要去的位于纽约的目的地
- 交换机(exchange)是JFK机场
- 绑定(binding)就是JFK机场到目的地的路线。能够到达目的地的路线可以是一条或者多条
拥有了交换机这个中间层,很多由发布者直接到队列难以实现的路由方案能够得以实现,并且避免了应用开发者的许多重复劳动。
如果AMQP的消息无法路由到队列(例如,发送到的交换机没有绑定队列),消息会被销毁或者返还给发布者。如何处理取决于发布者设置的消息属性。
五. 消费者
应用程序, 必须明确表示想要消费哪个队列的消息. 一旦应用做出表示, 就说应用注册了一个 consumer , 或说应用订阅了某个队列.
一个队列可能有多于一个的消费者, 也可能只有一个排它的消费者(在消费消息时排斥所有其他消费者)
每个消费者都有一个唯一标识 consumer tag
, consumer tag 是一个字符串, 可以用来取消订阅
https://www.rabbitmq.com/tutorials/amqp-concepts.html#programmable-protocol
https://www.cnblogs.com/demodashi/p/10521706.html