RabbitMQ在Openstack中的使用
1. AMQP协议
RabbitMQ是Advanced Message Queuing Protocol (AMQP,高级消息队列协议)开放标准的实现,它支持符合标准的客户端请求程序与符合标准的消息中间件代理进行通信。
AMQP中的核心概念:
(1) Broker :消息中间件的服务节点,对于RabbitMQ来说,一个RabbitMQ Broker可以简单地看作一个RabbitMQ服务节点,或者RabbitMQ服务实例;
(2) Virtual Host: 虚拟主机,表示一批交换器、消息队列和相关对象。虚拟主机是共享相同的身份认证和加密环境的独立服务器域。每个 vhost 本质上就是一个 mini 版的 RabbitMQ 服务器,拥有自己的队列、交换器、绑定和权限机制;
(3) Producer:生产者,消息投递方,生产者创建消息,然后发布到RabbitMQ中;
(4) Consumer :消费者,就是接收消息的一方,消费者连接到RabbitMQ服务器,并订阅到队列上;
(5) Queue :队列,RabbitMQ的内部对象,用于存储消息,RabbitMQ的生产者生产消息并最终投递到队列中,消费者可以从队列中获取消息并消费;
(6) Exchange:交换器,生产者将消息发送到Exchange,由交换器将消息路由到一个或者多个队列中;
(7) RoutingKey :路由键,生产者将消息发给交换器的时候,一般会指定一个RoutingKey,用来指定这个消息的路由规则,生产者可以在发送消息给交换器时,通过指定RoutingKey来决定消息流向哪里。
AMQP模型:
AMQP协议消息发送接收流程:
(1) 在Producer客户端建立了Channel后,就建立了到Broker上Virtual Host的连接。接下来Producer就可以向这个Virtual Host中的Exchange发送消息了;
(2) Exchange能够处理消息的前提是:它至少已经和某个queue或者另外的Exchange形成了绑定关系,并设置好了到这些queue和Excahnge的Routing(路由规则)。Excahnge中的Routing有四种模式。在Exchange收到消息后,会根据设置的Routing(路由规则),将消息发送到符合要求的queue或者Exchange中(路由规则还会和RoutingKey属性配合使用);
Excahnge中的Routing的四种模式:
1)fanout路由模式,fanout路由模式不需要routingKey。当设置为fanout模式的Exchange收到消息后,然后复制多份,分别发送到和自己绑定的各个queue中,相当于广播模式。
2)direct路由模式:
以上图的配置为例,以routingKey=”aa”发送消息到Exchange,则消息会路由到queueX和queueY,另外路由键与队列名是完全匹配的,如果一个队列绑定到交换机要求路由键为“aa”,不会转发“aa.bb”,也不会转发“aa.cc”,它是完全匹配、单播的模式;如果以routingKey=”bb”,则消息只会路由到queueY。
3)topic路由模式
routingKey中可以存在两种特殊字符“”与“#”,用于做模糊匹配,其中“”用于匹配一个单词,“#”用于匹配多个单词(可以是零个),用“.”隔开。以上图中的配置为例,routingKey=”aa.bb.cc”的消息会同时路由到queueX与queueY,routingKey=”aa.cc.dd”的消息会路由到queueX,routingKey=”cc.dd.ff”的消息会路由到queueY,routingKey=”cc.bb.ee”的消息会路由到queueY(虽然与两个routingKey都匹配,但只会投递给一次)。
4)headers,headers类型的Exchange不依赖于routing key与binding key的匹配规则来路由消息,而是根据发送的消息内容中的headers属性进行匹配。(一般不用)
(3) queue收到消息后,可能会进行如下的处理:如果当前没有Consumer的Channel连接到这个queue,那么queue将会把这条消息进行存储直到有Channel被创建;如果已经有Channel连接到这个queue,那么消息将会按顺序被发送给这个Channel。
(4) Consumer收到消息后,就可以进行消息的处理了。
2. RabbitMQ实现RPC请求
RabbitMQ的RPC的处理流程图:
RabbitMQ的RPC的处理流程步骤:
(1) 当客户端启动时,创建一个匿名的回调队列。
(2) 客户端为RPC请求设置2个属性:replyTo,设置回调队列名字;correlationId,标记request。
(3) 请求被发送到rpc_queue队列中。
(4) RPC服务器端监听rpc_queue队列中的请求,当请求到来时,服务器端会处理并且把带有结果的消息发送给客户端。接收的队列就是replyTo设定的回调队列。
(5) 客户端监听回调队列,当有消息时,检查correlationId属性,如果与request中匹配,就是结果了。
3. Oslo_message对RPC的封装
而Oslo.messaging库为OpenStack各个组件使用RPC和事件通知(Event Notification)提供了一套统一的接口,它基于支持 AMQP(Advanced Message Queuing Protocol) 协议的为同一个项目内的各个进程之间的通信提供了 API,如 nova-api 和 nova-scheduler 的通信,cinder-api 和 cinder-volume 的通信等。
下面通过实例看一下Oslo.messaging库使用RPC提供的接口。首先,了解一下Oslo.messaging的Transport和Target,Transport(传输层)主要实现RPC底层的通信以及事件循环、多线程等其他功能,通过URL来获得指向不同Transport实现的句柄。URL格式:Transport://user:pass@host1:port[,hostN:portN]/virtual_host,可通过oslo.messaging.get_transport函数来获得transport对象实例的句柄;Target封装了指定某一个消息最终目的地的所有信息,下表所示为其所具有的属性:
参数 | 说明 |
---|---|
exchange | topic所属的范围,默认使用配置文件中的control_exchange选项 |
topic | 一个topic可以用来标识服务器所暴露的一组接口(一个接口包含多个可被远程调用的方法) |
namespace | 用来标识服务器所暴露的某个特定接口(多个可被远程调用的方法) |
version | 服务器所暴露的接口支持M.N类型的版本号 |
server | 客户端可以指定此参数来要求消息的目的地是某个特定的服务器 |
在不同的应用场景下,构造Target对象需要不同的参数:创建一个RPC服务器时,需要topic和server参数,exchange参数可选; 指定一个endpoint时,namespace和version是可选的;客户端发送消息时,需要topic参数,其他可选。
class ServerControlEndpoint(object):
target = messaging.Target(namespace='controle', version='2.0')
def __init__(self, server):
self.server = server
def stop(self, ctx):
if self.server:
self.server.stop()
class TestEndpoint(object):
def test(self, ctx, arg):
return arg
transport_url = 'rabbit://user:pass@host1:port/virtual_host'
transport = messaging.get_transport(cfg.CONF, transport_url)
target = messaging.Target(topic='test', server='server1')
endpoints = [
ServerControlEndpoint(None),
TestEndpoint(),
]
server = messaging.get_rpc_server(transport, target, endpoints, executor='blocking')
从上面代码可以看出,创建rpc server对象之前,需要先创建transport和target对象,使用get_transport()函数来获得transport对象的句柄,此处构建的Target对象是用来建立RPC Server的,所以需指明topic和server参数,使用get_rpc_server()函数创建server对象,然后调用server对象的start方法开始接收远程调用,对于endpoint,一个RPC服务器可以暴露多个endpoint(本实例有两个),每个endpoint包含一组方法,这组方法是可以被客户端通过某种Transport对象远程调用的,executor是消息构造对象,有blocking的阻塞模式和eventlet采用协程两种方式,OpenStack采用eventlet。
transport_url = 'rabbit://user:pass@host1:port/virtual_host'
transport = messaging.get_transport(cfg.CONF, transport_url)
target = messaging.Target(topic='test')
client = messaging.RPCClient(transport, target)
client.call(ctxt, 'test', arg=arg)
cctxt = client.prepare(namespace='control', version ='2.0')
上述代码是通过RPC Client,远程调用RPC Sever上的方法,调用方式有cast和call两种远程调用方式。通过cast方式远程调用,请求发送后就直接返回了;通过call方式调用, 需要等响应从服务器返回。
4. RabbitMQ在Openstack应用
下面以创建虚拟机为例分析一下消息流程:
从上图能够看出,以nova-api和nova-conductor之间的通信为例,nova-conductor服务在启动时会注册一个RPC server等待处理请求,nova-api发送创建虚拟机的rpc请求时会先创建一个topic publisher用于topic发布,method为build_instance,然后publisher将消息发送给exchange,exchange再根据routingkey转发给绑定的queue中,最后由topic consumer接收并调用nova-conductor manager中的build_instance方法处理,对于nova-conductor和nova-scheduler之间的通信,多了一步把目标主机作为返回结果信息返回到reply_xx队列中,然后由nova-conductor接收以后向nova-compute发起rpc.cast的创建请求。
OpenStack各个组件内部的各个服务进程之间则是通过基于AMPQ的RPC方式进行通信,实现RPC通信需借助Rabbitmq消息队列,RPC方式又分为两种,rpc.cast和rpc.call,rpc.call为request/response方式,多用于同步场景;而使用rpc.cast方式发出请求后则无需一直等待响应,但之后需要定期查询执行结果,一般用于异步场景,OpenStack将其使用的通信方式都封装在公有库oslo_messaging中。
5. RabbitMQ在Openstack中应用常见问题之连接数
RabbitMQ每增加一个连接,erlang都会给这个连接分配三个erlang进程,每个进程都会分配一定大小内存空间,所以随着连接数的增长,内存和erlang进程数呈现有规律的增长,所以RabbitMQ连接数的无限增大会压垮mq服务,导致RabbitMQ服务崩溃。
(1) Openstack连接数增长原因
客户端与RabbitMQ建立的是长连接,而不是建立短连接,因为如果频繁的建立、销毁connection,会增加额外的时间开销,当业务量比较大时,就会对系统性能产生比较大的影响。OpenStack组件与RabbitMQ的连接使用到了第三方库oslo_message中的connection pool的概念,在不超过pool size的前提上,当有并发业务的时候,如果发现pool中已有connection正被使用,那么就会在pool中继续创建新的connection,直到创建的connection数量达到pool的最大值,之后如果再有业务需要,会等待之前创建的connection被重新放入connection pool,然后等待被继续使用。这种情况下,就会出现connection一直增长的现象。
(2) Openstack最大连接数计算方法:
OpenStack的Connection连接数包括m台计算节点+n台控制节点两部分之和:
Connection连接数1(m台计算节点) = (64+30)* m;
(其中,“m”指的是m台的计算节点,“64”指的是每个Nova-Compute进程中配置的
最大rpc_conn_pool_size连接数上限,“30”指的是每个Ovs-agent进程中配置的最
大rpc_conn_pool_size连接数上限);
Connection连接数2(n台控制节点)=
(nova-api worker进程数 + nova-conductor worker进程数 )* n * 30
+ (cinder-api worker进程数 + 1 + 1+backend进程数)* n * 30
+ (neutron worker进程数 + 1 + 1)* 3 * 30;
(其中,“n”指的是n台控制节点,“30”指的是“nova-api”、“nova-conductor”和“neutron”
三个进程中配置的最大rpc_conn_pool_size连接数上限);
综合上面两部分,OpenStack的最大Connection连接数 等于:Connection连接数1(m台计算节点)与 Connection连接数2(n台控制节点)之和。此计算值必须小于RabbitMQ能够承载的连接数最大值,才能保证RabbitMQ不被压垮,正常提供服务。对此,对RabbitMQ集群(三个节点)做压力测试能够得到RabbitMQ集群能够承载的连接数最大值。
使用的测试环境配置如下:
cpu | 32 Intel(R) Xeon(R) CPU E5-2640 v3 @ 2.60GHz |
---|---|
Memory | 250GB |
OS | centos-release-7-2.1511.el7.centos.2.10.x86_64 |
Kernel | 3.10.0-514.1.el7.x86_64 |
RabbitMQ | 3.6.5 |
Erlang | Erlang R16B03-1 |
压测工具 | PerfTest(RabbitMQ官网推荐) |
计算节点数 | 3 |
控制节点数 | 3 |
测试目的及用例:
利用压力测试工具PerfTest逐步增加集群的三个节点的连接数,观察记录各节点的内存、连接数、日志输出,测试并记录“批量创建虚拟机”和“批量删除虚拟机”两个典型的Openstack业务场景所需时间。找到当虚拟机的创建/删除操作受到影响(创建/删除十分耗时或者不能成功)或者RabbitMQ集群出现异常Error的日志时,从而得到RabbitMQ集群所承受的连接数即为集群所能承受的最大连接数。
对三个节点,通过压力测试工具对每个节点增加2000个连接,等待5分钟(等待集群运行稳定),记录每个节点连接数、内存、erlang进程数、消息积压情况。然后通过脚本批量创建60台虚拟机,记录全部创建完成所需时间、全部删除完成所需时间,记录nova-compute的状态、RabbitMQ日志有无异常输出。重复上述步骤,每次各增加2000个连接,直到发现虚拟机的创建或删除耗时超过30分钟,或者虚拟机无法成功创建或删除,则终止测试。然后观察RabbitMQ集群情况和nova-compute服务30分钟,看是否有RabbitMQ异常日志打印和nova-compute由up变为down。
测试结果:
各个节点状态结果:
节点 | 连接数 | 集群总连接数总连接数 | 内存(G) | erlang进程数 |
---|---|---|---|---|
Node1 | 2421 | 7594 | 1.7 | 29293 |
Node1 | 4422 | 13598 | 2.7 | 51298 |
Node1 | 6423 | 19601 | 3.7 | 73309 |
Node1 | 8424 | 25605 | 4.7 | 95312 |
Node1 | 10425 | 31613 | 6 | 117321 |
Node1 | 11427 | 34624 | 6.6 | 128339 |
Node1 | 12483 | 37713 | 8 | 140018 |
Node2 | 2448 | 7594 | 1.2 | 29561 |
Node2 | 4451 | 13598 | 2.1 | 51587 |
Node2 | 6452 | 19601 | 3 | 73587 |
Node2 | 8454 | 25605 | 3.9 | 95605 |
Node2 | 10460 | 31613 | 4.9 | 117670 |
Node2 | 11465 | 34624 | 6.3 | 128721 |
Node2 | 12487 | 37713 | 7.2 | 139960 |
Node3 | 2724 | 7594 | 1.8 | 32882 |
Node3 | 4725 | 13598 | 2.7 | 54889 |
Node3 | 6726 | 19601 | 3.6 | 76896 |
Node3 | 8727 | 25605 | 4.5 | 98903 |
Node3 | 10728 | 31613 | 5.4 | 120910 |
Node3 | 11731 | 34624 | 5.8 | 131950 |
Node3 | 12743 | 37713 | 6.9 | 143078 |
虚机创建和节点状态相关结果:
总连接数 | 消息积压 | 创60台虚机时间 | 删20台虚机时间 | nova-compute状态 | MQ日志 |
---|---|---|---|---|---|
7594 | 无 | 3min | 1min20s | up | 未出现异常日志 |
13598 | 无 | 2min49S | 1min10s | up | 未出现异常日志 |
19601 | 无 | 2min50s | 1min30s | up | 未出现异常日志 |
25605 | 无 | 2min56s | 1min31s | up | 未出现异常日志 |
31613 | 无 | 4min37s | 2min | up | 未出现异常日志 |
34624 | 无 | 6min30s但是有4台创建失败,原因是"Build of instance was re-scheduled: Request to http://ip:port/v2.0/ports.json timed out (HTTP 408) | 2min20s | up | 未出现异常日志 |
37713 | 14000左右 | 15分钟后。30台成功,30台失败 | 无法删除 | 一段时间后变为down | 出现大量missed heartbeats from client, timeout: 60s |
各个节点及集群统计图的趋势变化:
测试结论:
1) 在集群连接数到达2.5w以上的时候,创建虚机和删除虚机的耗时出现了明显的增长;
2) 集群达到3.5w以上时,出现的大量的消息堆积,说明连接数的增加,影响了客户端对消息的消费;
3) 集群增加到3.7w连接数的时候,客户端大量丢失心跳,尝试重新连接失败,nova-compute节点的状态全部为down,反观内存,socket,erlang等指标并不高,没有达到上限,从这里可以说明集群承载的连接数的能力是有上限的,不能够单纯的从它占用的系统资源来判定;
4)测试得出该集群连接数最好低于3.5w。
6. 小结
本篇文章对AMQP协议的基本概念以及框架流程做了详细的介绍,RabbitMQ使用此协议能够很好的实现RPC请求,Oslo_message对RPC进行的封装能够使用起来更加优雅,然后以创建虚拟机为例分析了RabbitMQ在Openstack中如何应用,针对诸多的RabbitMQ应用问题,选取了连接数问题进行了测试并得出结论,后续会针对RabbitMQ应用的其他问题做出更深入的研究。