AMQP 0-9-1 模型
AMQP 0-9-1 及其模型概述
什么是AMQP 0-9-1
AMQP 0-9-1(Advanced Message Queuing Protocol)是一个消息协议,使得符合标准的客户端应用程序能够与符合标准的消息中间件一同工作
Broker及其作用
消息的broker从发布者那里接收到消息(也叫做生产者),然后将这些消息路由到消费者那由消费者处理
因为这是一个网络协议,因此生产者,消费者,和broker都部署在不同的机器上
AMQP 0-9-1 模型简介
AMQP 0-9-1模型大致如下:消息先被发送到exchanges(类似于邮箱),exchanges通过一些规则,称之为baddings,将这些消息复制到队列s中,最后broker将队列中消息分发到订阅了这些队列的消费者中,或者由消费者来从队列中按需拉取消息
当发布消息时,发布者或许会指定一些消息属性(消息元数据),这些属性信息或许会被broker使用,除此之外broker看不到其他的信息了,其他的信息仅仅会被接收到消息的应用程序使用
网络是不可靠的,因此应用程序在处理消息时不可避免的会出现失败的问题,因此AMQP 0-9-1模型有消息确认的概念,当消息被分发到消费者后,消费者会自动的给broker一个回应,这个回应也可以由开发人员选择什么时候发送。当使用消息确认时,broker只有在收到该消息(或消息组)的通知时才会从队列中完全删除该消息
在某些情况下,如消息不能被路由时,消息可能会被返回给发布者,也可能会被丢弃或者,如果broker实现了一个扩展,将这些消息放到一个专门处理的队列中。发布者通过使用某些参数发布消息来选择如何处理这样的情况
队列,exchange,binding都对应到AMQP的实体
AMQP 0-9-1 是一个可编程协议
AMQP 0-9-1 是一个可编程协议,这意味着AMQP 0-9-1 的实体和路由机制主要是由应用程序定义的,而不是broker的管理员。因此,为协议操作所需要做的准备就是,声明队列和exchange,定义他们之间的bingding,订阅队列等
这给了应用程序开发者很大的自由,但也需要他们注意潜在的定义冲突。实际情况下,定义冲突是很少见的,它经常在配置错误时才会出现
应用程序声明它们需要的AMQP 0-9-1实体,定义必要的路由机制,或许也会选择在它们不再被使用时删除实体
Exchange及其类型
exchange是AMQP 0-9-1的实体,消息被发布者发出后,首先到达exchange,然后由exchange将其路由到队列中。路由的算法取决于exchange的类型及其使用的规则(称之为bindings),AMQP 0-9-1的broker提供了以下的exchange类型
exchange 类型 | 默认的预定义名称 |
---|---|
Direct exchange | 空字符串或者amq.direct |
Fanout exchange | amq.fanout |
Topic exchange | amq.topic |
Headers exchange | amq.match(在RabbitMQ中是amq.headers) |
除了类型以外,exchange还有许多的数星星,其中最重要有以下几个属性
- Name
- Durability(在broker重启后exchange是否还存在)
- Auto-delete(当没有队列绑定到exchange时,是否把exchange删除)
- auguments(可选的,被插件和特定于broker的特性使用)
Exchange可以是durable或transient,在broker重启后,durable exchange能够存活,而transient exchange则不能(即当broker重启后需要重新声明exchange),并不是所有的场景和用例都需要durable exchange
默认的exchange
默认的exchange是一个没有名字的(即空字符串),由broker预定义的direct exchange,它有一个特殊的属性,这使得它在简单的应用程序中非常有用:创建的每个队列都会自动用与队列名称相同的路由键绑定到它
例如,当你声明了一个名为"search-indexing-online"的队列的时候,AMQP 0-9-1的broker将会将其绑定到默认的exchange上,并使用队列名"search-indexing-online"作为路由键(有时也被称为绑定键)。因此一个带有路由键为”search-indexing-online"的消息被发送到默认的exchange时,会被转发到队列search-indexing-online。换句话说,换句话说,默认exchange直接将消息传递到队列看起来是可能的,尽管从技术上讲这并不是正在发生的事情
direct exchange
direct exchange基于消息的路由键来将消息分发到队列,direct exchange适用于单播的情况,虽然也能用于多播。下面是它的工作流程:
- 队列通过一个路由键K将其绑定到exchange
- 当带有路由键R的新消息到来时,exchange判断是否K=R来决定是否将其转发到队列
direct exchange通常用于以轮询的方式在多个worker(同一应用程序的实例)之间分配任务,但是需要注意的是,在AMQP 0-9-1中,消息的负载均衡是对消费者之间的,而不是队列之间
direct exchange的描述图如下
fanout exchange
fanout exchange 会将消息路由到绑定在它之上的所有的队列中,无视路由键的存在。如果fanout exchange上绑定了N个队列,那么当新消息到来时,就会将消息路由到这所有的N个队列中。fanout exchange适用于广播的情况
因为fanout exchange会将消息转发到绑定在其上的所有队列,所以fanout exchange适用于以下情况:
- MMO游戏用来作为排行榜或其他的全局事件
- 运动新闻站点用来实时分发比分更新
- 分布式系统用来广播一些状态信息或者配置更新信息
- 用于群聊中的消息分发(尽管AMQP没有presence的概念,所以XMPP可能是一个更好的选择)
fanout exchange的描述图如下
topic exchange
topic exchange 通过路由键和队列绑定到exchange上时用的模式来决定将消息路由到哪个队列中。topic exchange通常用于实现各种发布/订阅模式及其变体,topic exchange 经常用于多播的需求
topic exchange可以用于很多地方,每当一个问题涉及多个消费者/应用程序,且这些消费者/应用程序有选择地选择他们想要接收的消息类型时,就应该考虑topic exchange的情况
比如以下场景:
分发与特定地理位置相关的数据,例如销售点
由多个worker完成的后台任务处理,每个worker都能够处理特定的任务集
股票价格更新(以及其他类型财务数据的更新)
涉及分类或标记的新闻更新(例如,仅针对特定运动或团队)
云中不同类型服务的编排
分布式架构/特定于操作系统的软件构建或打包,其中每个构建器只能处理一种架构或操作系统
headers exchange
headers exchange是为在多个属性上进行路由而设计的,这些属性更容易表示为消息报头而不是路由键。headers exchange会忽略路由键的属性,而是使用其他属性来进行路由。如果消息头的值等于绑定时指定的值,则认为消息匹配
可以使用多个头部属性将队列绑定到headers exchange以进行匹配。在这种情况下,代理需要来自应用程序开发人员的另一条信息,即怎样才算匹配成功,是所有的头部属性都匹配还是因为只要一个属性匹配就行了。这就是“x-match”绑定参数的作用。当"x-match"参数设置为"any"时,只需一个匹配的头值就足够了。或者,将“x-match”设置为“all”要求所有值必须匹配
headers exchange可以认为是direct exchange的一种升级版,不需要只依赖路由键,且路由信息也不需要是字符串
注意,以字符串x-开头的头将不会用于计算匹配
队列
在AMQP 0-9-1模型中的队列和其他消息协议中的队列差不多,它们存储了需要被应用程序消费的信息,队列和exchange共享一些属性,但是也有一些额外的属性:
- Name
- Durable(在broker重启后是否存活)
- Exclusive(仅供一个连接使用,当该连接关闭时队列将被删除)
- Auto-delete(当订阅该队列的消费者都取消队列后,是否删除该队列)
- Arguments(可选的,是由插件和broker特性所使用的,如消息TTL,队列长度等等)
在使用队列前必须先声明队列,如果声明队列时队列不存在,则会创建队列。如果队列已经存在了且和声明队列拥有一样的属性,则该声明没有任何作用。如果存在的队列和声明的队列不是完全一样的属性,则会抛出一个错误码为406的channel级别的异常,PRECONDITION_FAILED
队列名字
应用程序可以指定队列名,或者让broker来生成队列名,队列名由UTF-8字符组成,且不超过255个字节。AMQP 0-9-1 broker可以代表应用程序生成一个唯一的队列名称,如果需要让broker来生成队列名,只需要在让队列名参数为空字符串即可,生成的队列名将会和队列声明响应一同返回给客户端
以"amq."开头的队列名是保留给broker内部使用的,如果尝试声明这样的队列名将会造成一个channel级别的异常,响应码为403,ACCESS_REFUSED
队列的durability
在AMQP 0-9-1中,队列可以被声明为durable和transient,durable的队列的元数据会被存储到磁盘上,而transient队列的元数据只会保存在内存中
在持久性很重要的环境和用例中,应用程序必须使用durable队列,并确保发布将已发布的消息标记为持久消息
Bindings
bindings是exchange用来将消息路由到队列的规则,要指示exchange E将消息路由到队列Q,Q必须绑定到E。bingding可能有一个可选的路由键属性,供某些交换器类型使用。路由键的目的是选择发布到exchange的某些消息,以便路由到绑定队列。换句话说,路由键的作用类似于过滤器
有了这个间接层,就可以使得不可能或很难实现直接发布到队列的路由场景也能实现,还可以消除应用程序开发人员必须做的一定数量的重复工作
如果消息不能被路由到任何一个队列,那这个消息要么被丢弃,要么返回给发布者,这取决于发布者发布该消息时设置的属性
消费者
在AMQP 0-9-1模型中,有两种方式让应用程序可以消费发布者的消息:
- 订阅队列,以便将消息传递给应用程序("push API"):推荐
- 拉取消息("pull API"):这种方式是非常低效的,应该尽可能避免
在push API中,应用程序需要订阅具体的队列来消费消息,当它们订阅后,我们说它们注册了一个消费者,或者简单地说,订阅了一个队列。每个队列都有可能有多个消费者,或者注册专属消费者(在消费队列中的消息时排除所有其他消费者)
每个消费者都有一个标识符,叫做消费者标签(consumer tag),它可以用来取消订阅消息,这个标签是一个字符串
消息确认
消费者应用程序接收并处理消息时候,可能偶尔无法处理个别消息,或有时应用程序崩溃了,这有可能是网络波动导致得问题。那么,broker应该何时把消息从队列中移除呢?AMQP 0-9-1规定由消费者来控制这一点。这有两种确认的方式:
- 在broker发送消息给应用程序之后(使用basic.deliver或者basic.get-ok方法)
- 在应用程序返回消息确认之后(使用basic.ack方法)
前一种方式叫做自动确认模型,后一种方式成为显式确认模型。在显式模型中,由应用程序选择何时回送ACK消息给broker,一般是在收到消息之后,或者在消息处理之前,持久化到存储之后,或者完全处理消息之后(例如,在成功抓取到Web页面,处理并存储到持久层之后)
如果消费者在回送ACK消息之前就挂掉了,那么broker将会重新分发该消息给其他的消费者,如果这时没有可用的消费者,那么broker将会等到至少一个消费者可用之后,尝试重新发送
拒绝消息
当消费者应用程序收到消息进行处理时有可能会失败,此时应用程序可以通过拒绝消息向broker表明消息处理失败了(或者不能在当时完成处理)。拒绝消息时,应用程序可以让broker丢弃该消息,或者让它重新入队。当队列上只有一个消费者时,请确保不要因为一次又一次地拒绝和重新入队而造成无限的消息传递循环
否定确认
通过basic.reject方法来拒绝消息,该方法有一个限制:它没有办法像使用确认一样拒绝多个消息。然而,如果使用RabbitMQ,有一个解决方法,RabbitMQ提供了一个AMQP 0-9-1的扩展程序来进行否定确认
预先抓取消息
在多个消费者共享一个队列时,在发送下一个确认之前,能够指定一次可以发送消费者多少条消息,这能被用来作为一个简单的负载均衡技术,或者在消息倾向于批量发布时用于提高吞吐量,比如如果生产者应用程序固定每分钟发送消息时
注意RabbitMQ仅仅支持channel级别的预先抓取数量
消息的属性及载荷
在AMQP 0-9-1中,消息有多个属性,一些属性是AMQP 0-9-1规定的公有属性,应用程序开发人员不必考虑确切的属性名称,比如:
- Content type
- Conten encoding
- Routing key
- Delivery mode(是否持久化)
- Message priority
- Message publishing timestamp
- Expiration period
- Publisher application id
有些属性是提供给broker使用的,但是大多数属性都可以通过接收它们的应用程序进行解释。一些属性是可选的属性,它们被称为headers,类似于HTTP的header属性。消息属性在它们发布时被设置
消息也有载荷信息,即消息承载的数据,这些载荷对AMQP的broker来说是不可见的,仅仅作为字节数组来对待。消息也可以只包含属性而没有载荷。常见的方式是将结构化数据通过序列化(如JSON,thrift,protocol buffers,MessagePack)成字节数据,再放到消息载荷中。协议对等体通常使用“content-type”和“content-encoding”字段来通信该信息,但这只是根据约定
消息也可以做持久化发布,这使得broker将它们持久化到磁盘上。这确保在服务器重启时,消息不会丢失。仅仅将消息发布到durable exchange或将消息路由到的队列是持久的,这并不能使消息持久化,消息的持久化完全取决于消息本身的模式。选择消息持久化,必然会导致一些性能上的损失
消息确认
由于网络不可靠且应用程序有可能故障,因此通常需要方式来确认消息是否完成。 有时只需要确认消费者已收到消息,有时消息确认意味着消息已由消费者验证并处理,例如,验证为具有强制性的数据并持久保存到数据存储或索引
这种情况非常常见,因此 AMQP 0-9-1具有称为消息确认(有时称为 acks)的内置功能,消费者使用它来确认消息传递和处理。 如果应用程序崩溃(当连接关闭时AMQP broker会注意到这一点),如果 AMQP broker未收到消息确认,则会将该消息将重新入队(可能会立即传递给另一个消费者,如果有的话)
在协议中内置消息确认机制有助于开发人员构建更强大的软件
AMQP 0-9-1方法
AMQP 0-9-1是由很多方法构成的,方法是操作(如HTTP方法),与面向对象编程语言中的方法没有任何共同之处。在AMQP 0-9-1中,协议方法被组织成classes,classed是AMQP方法的逻辑分组
exchange分组中的方法都是关于exchange的操作,它包含以下操作:
- exchange.declare
- exchange.declare-ok
- exchange.delete
- exchange.delete-ok
注意RabbitMQ站点的参考资料还包括了针对exchange类的RabbitMQ特定扩展,这里不做讨论
上述这些操作是“请求”(由客户端发送)和“响应”(由代理响应前面提到的“请求”)
以下是一个通过exchange.declare方法声明一个新的exchange的过程:
如上图说是,exchange.declare请求也承载了几个参数,它们使得客户能够指定exchange的名字,类型,持久化标志等等
如果操作成功,broker会通过exchange.declare-ok方法进行响应
exchange.declare-ok除了通道号之外,并不承载任何参数(通道的概念会在后面讲述)
事件的顺序和AMQP 0-9-1的queue分组中的queue.declare和queue.declare-ok非常相似
并不是所有的AMQP 0-9-1方法都有对应的另一个方法。一些(basic.publish是最广泛使用的一个)没有相应的“响应”方法和其他一些(例如,basic.get)有多个可能的“响应”的方法
连接
AMQP 0-9-1连接是一个长期存活的,AMQP 0-9-1是一个应用程序级别的协议,使用TCP来做可靠的分发。连接需要认证且使用TLS进行保护,当一个应用程序不再需要连接到服务器时,应该优雅地关闭它到AMQP 0-9-1的连接,而不是粗鲁的直接关闭底下的TCP连接
通道
有的应用程序需要与broker建立多个连接,然而,同时保持许多TCP连接是不可取的,因为这样做会消耗系统资源,并使防火墙的配置更加困难。AMQP 0-9-1的连接通过通道来做多路复用,这样就可以在多个与broker的连接上服用同一个TCP连接
每个被客户端执行的协议操作都会出现在一个通道上,一个通道上的信息交互与其他通道是完全隔离的,因此每个协议方法也会承载通道ID信息,这个ID是一个整数,broker和客户端使用都使用通道ID用来确定协议方法使用哪个通道
通道只存在于连接的上下文中,而不是单独存在的。当一个连接关闭时,它上面的所有通道也都关闭了
当应用程序使用多线程或者多进程来处理消息时,通常会为每个线程(进程)打开一个新的通道,而不是在线程之间共享通道
虚拟主机
为了使单个broker能够托管多个隔离的“环境”(用户组、exchange、队列等),AMQP 0-9-1提供了虚拟主机(vhost)的概念。vhost类似于许多流行的Web服务器使用的虚拟主机,提供AMQP实体所在的完全隔离的环境。协议客户端在连接协商期间指定他们想要使用的vhost
AMQP是可扩展的
AMQP 0-9-1提供了以下的扩展点:
- 自定义exchange类型,允许开发人员实现一些exchange类型不能覆盖的路由方案,例如,基于地理数据的路由
- exchange和queue可以包含额外的可被broker使用的数据,例如,pre-queue和消息TTL等
- 特定于broker的扩展
- 引入新的message类
- broker可以通过其他插件进行扩展,例如RabbitMQ管理前端和HTTP API都是通过插件实现的
这些特点使AMQP 0-9-1模型更加灵活,适用于非常广泛的问题
AMQP 0-9-1的客户端生态系统
AMQP 0-9-1有很多的编程语言和平台,它们中的一些密切遵循AMQP术语,只提供AMQP方法的实现。其他一些具有额外的功能、方便的方法和抽象。有些客户端是异步的(非阻塞的),有些是同步的(阻塞的),有些同时支持这两种模型。一些客户端支持特定于供应商的扩展,如rabbitmq
因为AMQP的主要目标之一是互通性,所以对开发人员来说,理解协议操作而不局限于特定客户端库的术语是一个好主意。通过这种方式与使用不同库的开发人员进行通信将会非常容易