RPC消息到manager方法的分发过程是由oslo.messaging实现的。
OpenStack的oslo.messaging wiki十分详细地解释了这个流程,没有比它更正式、更官方的说明了:
https://wiki.openstack.org/wiki/Oslo/Messaging#Server_Side_API
OpenStack中其他组件的RPC server实现方式基本相同。
cinder-volume的初始化代码在cinder/service.py的Service.start中:
def start(self):
    # Excerpt of the service start-up code; each "..." marks code that was
    # elided from this walkthrough.
    ...
    # RPC target built from this service's topic and host.
    target = messaging.Target(topic=self.topic, server=self.host)
    # The manager object is registered as the sole RPC endpoint.
    endpoints = [self.manager]
    self.rpcserver = rpc.get_server(target, endpoints)
    self.rpcserver.start()
    ...
get_server调用到oslo_messaging/rpc/server.py中的get_rpc_server:
def get_rpc_server(transport, target, endpoints,
                   executor='blocking', serializer=None):
    """Construct an RPC server.

    The executor parameter controls how incoming messages will be received and
    dispatched. By default, the most simple executor is used - the blocking
    executor.

    If the eventlet executor is used, the threading and time library need to be
    monkeypatched.

    :param transport: the messaging transport
    :type transport: Transport
    :param target: the exchange, topic and server to listen on
    :type target: Target
    :param endpoints: a list of endpoint objects
    :type endpoints: list
    :param executor: name of a message executor - for example
                     'eventlet', 'blocking'
    :type executor: str
    :param serializer: an optional entity serializer
    :type serializer: Serializer
    :returns: a MessageHandlingServer built from the transport, an
              RPCDispatcher for (target, endpoints, serializer) and the
              executor name
    """
    # The dispatcher owns the target/endpoints/serializer; the server only
    # receives the transport, the dispatcher and the executor name.
    dispatcher = rpc_dispatcher.RPCDispatcher(target, endpoints, serializer)
    return msg_server.MessageHandlingServer(transport, dispatcher, executor)
MessageHandlingServer类如下:
class MessageHandlingServer(object):
    """Server for handling messages.

    Connect a transport to a dispatcher that knows how to process the
    message using an executor that knows how the app wants to create
    new tasks.
    """

    def __init__(self, transport, dispatcher, executor='blocking'):
        """Construct a message handling server.

        The dispatcher parameter is a callable which is invoked with context
        and message dictionaries each time a message is received.

        The executor parameter controls how incoming messages will be received
        and dispatched. By default, the most simple executor is used - the
        blocking executor.

        :param transport: the messaging transport
        :type transport: Transport
        :param dispatcher: a callable which is invoked for each method
        :type dispatcher: callable
        :param executor: name of message executor - for example
                         'eventlet', 'blocking'
        :type executor: str
        :raises: ExecutorLoadFailure if the named executor cannot be loaded
        """
        self.conf = transport.conf

        self.transport = transport
        self.dispatcher = dispatcher
        self.executor = executor

        # Look up the executor implementation by name in the
        # 'oslo.messaging.executors' namespace; a RuntimeError from the
        # lookup is re-raised as ExecutorLoadFailure.
        try:
            mgr = driver.DriverManager('oslo.messaging.executors',
                                       self.executor)
        except RuntimeError as ex:
            raise ExecutorLoadFailure(self.executor, ex)
        else:
            self._executor_cls = mgr.driver
            # The executor instance is created lazily by start().
            self._executor = None

        super(MessageHandlingServer, self).__init__()

    def start(self):
        """Start handling incoming messages.

        This method causes the server to begin polling the transport for
        incoming messages and passing them to the dispatcher. Message
        processing will continue until the stop() method is called.

        The executor controls how the server integrates with the applications
        I/O handling strategy - it may choose to poll for messages in a new
        process, thread or co-operatively scheduled coroutine or simply by
        registering a callback with an event loop. Similarly, the executor may
        choose to dispatch messages in a new thread, coroutine or simply the
        current thread.
        """
        # An executor instance already exists, so the server was started
        # before: do nothing.
        if self._executor is not None:
            return
        try:
            listener = self.dispatcher._listen(self.transport)
        except driver_base.TransportDriverError as ex:
            # NOTE(review): self.target is not assigned anywhere in this
            # class as shown here; unless it is set elsewhere, this line
            # raises AttributeError instead of ServerListenError - confirm
            # against upstream.
            raise ServerListenError(self.target, ex)

        # Instantiate the executor class resolved in __init__ with the
        # listener and dispatcher, then start it.
        self._executor = self._executor_cls(self.conf, listener,
                                            self.dispatcher)
        self._executor.start()
cinder-volume使用eventlet类型的executor,所以消息分发在oslo.messaging/oslo_messaging/_executors/impl_eventlet.py中实现,入口函数为start()。
每收到一个消息,cinder-volume就会孵化一个协程去执行dispatcher方法,代码如下:
def _dispatch(self, incoming):
    """Hand one incoming message to the dispatcher via the green pool."""
    # self.dispatcher(incoming) produces the context object passed as
    # ctxt; spawn_with is defined elsewhere - presumably it enters that
    # context and runs the resulting callback in a greenthread taken from
    # self._greenpool (TODO confirm against spawn_with).
    spawn_with(ctxt=self.dispatcher(incoming), pool=self._greenpool)
def start(self):
    """Spawn the greenthread that polls the listener and dispatches messages.

    Does nothing if a polling thread has already been created.
    """
    if self._thread is not None:
        return

    # NOTE(review): forever_retry_uncaught_exceptions presumably re-invokes
    # the loop when an unexpected exception escapes it - confirm against
    # the excutils implementation.
    @excutils.forever_retry_uncaught_exceptions
    def _executor_thread():
        try:
            # Poll until self._running is cleared; poll() may return None,
            # in which case there is nothing to dispatch.
            while self._running:
                incoming = self.listener.poll()
                if incoming is not None:
                    self._dispatch(incoming)
        except greenlet.GreenletExit:
            # Leave the loop quietly on GreenletExit.
            return

    # Set the flag before spawning so the loop condition holds on entry.
    self._running = True
    self._thread = eventlet.spawn(_executor_thread)
dispatcher对应oslo.messaging.rpc.RPCDispatcher,其中的\_\_call\_\_方法先向rabbitmq回复ack消息,随后由\_dispatch\_and\_reply/\_dispatch解析消息参数,获得指定的方法名,最后调用该方法:
@contextlib.contextmanager
def __call__(self, incoming, executor_callback=None):
    """Acknowledge the message, then yield a callback that handles it.

    :param incoming: the received message; acknowledged on entry
    :param executor_callback: passed through to _dispatch_and_reply
    """
    # The acknowledgement is sent before the message is dispatched.
    incoming.acknowledge()
    # The yielded callable performs the dispatch and the reply when the
    # caller invokes it.
    yield lambda: self._dispatch_and_reply(incoming, executor_callback)
def _dispatch_and_reply(self, incoming, executor_callback):
    """Dispatch the message and reply with the result or the failure.

    :param incoming: the received message; provides ctxt, message and
                     reply()
    :param executor_callback: passed through to _dispatch
    """
    try:
        incoming.reply(self._dispatch(incoming.ctxt,
                                      incoming.message,
                                      executor_callback))
    except ExpectedException as e:
        # Expected failures are logged at debug level only and the reply
        # is sent with log_failure=False.
        LOG.debug(u'Expected exception during message handling (%s)',
                  e.exc_info[1])
        incoming.reply(failure=e.exc_info, log_failure=False)
    except Exception as e:
        # sys.exc_info() is deleted by LOG.exception().
        exc_info = sys.exc_info()
        LOG.error(_('Exception during message handling: %s'), e,
                  exc_info=exc_info)
        incoming.reply(failure=exc_info)
        # NOTE(dhellmann): Remove circular object reference
        # between the current stack frame and the traceback in
        # exc_info.
        del exc_info
def _dispatch(self, ctxt, message, executor_callback=None):
    """Dispatch an RPC message to the appropriate endpoint method.

    :param ctxt: the request context
    :type ctxt: dict
    :param message: the message payload
    :type message: dict
    :param executor_callback: passed through to _do_dispatch
    :raises: NoSuchMethod, UnsupportedVersion
    """
    method = message.get('method')
    args = message.get('args', {})
    namespace = message.get('namespace')
    version = message.get('version', '1.0')

    found_compatible = False
    for endpoint in self.endpoints:
        # Endpoints without their own target fall back to the default one.
        target = getattr(endpoint, 'target', None)
        if not target:
            target = self._default_target

        # Skip endpoints whose namespace or version does not match.
        if not (self._is_namespace(target, namespace) and
                self._is_compatible(target, version)):
            continue

        if hasattr(endpoint, method):
            # The local context is set only for the duration of the call
            # and always cleared afterwards.
            localcontext.set_local_context(ctxt)
            try:
                return self._do_dispatch(endpoint, method, ctxt, args,
                                         executor_callback)
            finally:
                localcontext.clear_local_context()

        # This endpoint matched namespace and version but does not have
        # the requested method.
        found_compatible = True

    # No endpoint handled the call: report a missing method if at least
    # one endpoint was compatible, otherwise an unsupported version.
    if found_compatible:
        raise NoSuchMethod(method)
    else:
        raise UnsupportedVersion(version, method=method)