为什么要了解AMQP?
最近在学习RabbitMQ消息中间件,而RabbitMQ是AMQP的标准实现。要了解RabbitMQ,必须先了解AMQP协议。
什么是AMQP(Advanced Message Queuing Protocol)?
AMQP是一个提供统一消息服务的应用层标准协议,基于此协议的客户端和消息中间件之间可传递消息,并不受客户端/消息中间件产品/不同开发语言等条件限制。
AMQP分层
AMQP定义了合适的服务器端域模型,规范服务器的行为(AMQP的服务器端称broker),
Model层
model层定义了基本的域模型所产生的行为,这种行为在AMQP中用command
表示。
Session层
session层定义了客户端与broker之间的通信,为command
的可靠传输提供保障。
Transport层
Transport层专注与数据传送,并与Session保持交互,接受上层的数据,组装成二进制的流,传送给receiver后再解析数据,交付给Session层。
Session层需要Transport层完成网络异常的回报,顺序传送command
等工作。
AMQP的域模型
AMQP的主要功能是什么
消息中间件的主要功能是消息的路由(Routing)
和缓存(Buffering)
AMQP提供了这两个功能的模型:Exchange和Message Queue
Exchange的作用是什么
Exchange接收Producer发送的Message,根据不同的路由算法,将Message发送给Message Queue.
Message Queue作用是什么
- Message Queue在Message没有被Consumer消费时,缓存这些Message,具体的缓存策略由实现者决定。
- 当Message Queue与Message consumer之间的连接畅通时,Message queue需要将Message转发给consumer。
什么是Message
Message 是模型的基本单位,由Producer产生,经过Broker被Consumer所消费。
Message的结构是什么样的
两部分组成:Header和Body
Header中有什么
由Profucer添加的各种Exchange和queue相关的属性集合。
Body中有什么
真正需要传输的数据,对Broker不可见,不可被修改。
Exchange如何将Message发送到Message Queue中?
Binding用来将Exchange和Message Queue关联起来。
Exchange和Message Queue如何关联绑定?
example:
Exchange :E
Queue: Q
如何将Q与E进行绑定,使Exchange将Message发送到Queue?
创建Binding,Binding定义了一个routing key(路由键),表示带有该routing key的Message,到Exchange E时,会被发送到绑定的Queue Q中.
一个Exchange可能会与多个queue进行关联,Exchange中就会有一个路由表,由Binding定义的Routing key组成,表示每个与Exchange绑定的Queue的路由。
client在发送Message时,在Message的header中添加routing key属性。
Exchange获取到Message后,会检测Message的header属性。若header中的routing key与binding的routing key一致,则Exchange 将Message发送到Binding绑定的Queue。
这就是单播路由的一种方式。
路由规则只由Binding决定吗?
上面说过单播路由的实现是使用Binding.
但实际上Exchange有多种路由形式。不同的路由算法有不同的Exchange Type.
Direct Exchange就是上面所说的单播路由,需要和Binding规则一起组成完整的路由规则。
所以路由算法由Exchange Type和Binding的规则所决定。
AMQP如何实现广播路由机制的呢?
广播路由的Exchange Type是Funout Exchange.
将Message路由到所有的与Exchange的queue中,无视绑定的routing key.
如果N个Queue绑定到某个Exchange,当有Message发送给Funout Exchange时,Exchange会将Message的拷贝分别发送给所有的N个队列。
这就实现了广播路由机制,Funout Exchange也被用来实现广播路由。
Exchange Type一旦创建就不可改变
以上就是AMQP的路由规则,再来思考下Client是如何与AMQP的Broker进行交互?
Clinet如何与AMQP连接?
AMQP连接通常是长连接,是使用TCP提供可靠投递的应用层协议。
关于TCP等网络协议需要学习
AMQP使用认证机制并且提供TLS(SSL)保护。
当一个应用不再需要连接到AMQP代理的时候,需要优雅的释放掉AMQP连接,而不是直接将TCP连接关闭。
关于认证机制和TLS同样需要学习
有些应用需要与AMQP的broker建立多个连接,同时开启多个TCP是不是不太合适?
是的,同时开启多个TCP不仅耗费太多的网络资源,而且使防火墙的配置更加困难。
所以AMQP提供了Channel(消息通道)
什么是Channel
一个AMQP连接可以并发的建立多个Channel,每个Channel可以看成共享一个TCP连接的多个轻量化连接。
每个Channel的Session与其他Channel的Session是完全隔离的,所以每个AMQP方法都需要携带一个ChannelId,这样Client就可以指定方法是为那个Channel准备的。
什么是AMQP的方法?Clinet与AMQP的Session是咋样的?
AMQP中有很多方法,方法即操作。
Client与AMQP的Session交互就是AMQP的一组操作实现的。
操作:
-
- Exchange的声明和删除
- 1.1 Exchange声明
- Exchange.declare: 请求-requests(由Client发送)
- Exchange.declare-ok:
响应-reponses(由broker发送,回应之前提到的request操作)
- Queue的声明
Queue的声明与Exchange声明类似。
- Queue的声明
Example
exchange.declare
P/C ->
exchange.declare(name="my.exchange" type="direct" durable=true)
-> AMQP broker
说明:exchange.declare方法携带了好几个参数,这些参数可以允许Client指定Exchange的名称,类型,是否持久化等信息
exchange.declare-ok
操作成功之后,broker使用exchange.declare-ok方法进行回应:
P/C <-
exchange.declare-ok
<- AMQP broker
说明:exchange.declare-ok方法除了ChannelId之后没有携带其他的任何参数。
一个单独的Broker如何实现多个隔离的环境(用户、用户组、Exchange、queue)?
AMQP提供了一个虚拟主机的概念(Virtual hosts),提供了完全隔离的环境,当连接被建立的时候,AMQP的客户端来指定使用哪个Virtual Host.