一、背景###
由于我公司产品需求想实现行情实时刷新功能,大家都知道这时候NSTimer
不能再满足产品需求,所以我们的服务端采用了RabbitMQ
来实现行情的实时刷新。但是我在集成IOS
端,发现网上几乎没有关于IOS
端集成RabbitMQ
示范文档,所以我想写一篇分享给大家如何在IOS
端集成RabbitMQ
,希望能给大家一点帮助。
二、集成
集成IOS
端RabbitMQ
框架,我们采用RabbitMQ
官方提供的RMQClient框架。集成前的具体准备我不作赘述,官方文档很详细,RMQClient
加入工程之后,请参考RabbitMQ详细集成指南。这篇指南分为"Hello World!"、 Work queues、Publish/Subscribe、Routing、Topics,详细的介绍了RabbitMQ概念
、应用
、各种模式
、集成
等。
我们公司采用的是RabbitMQ
的Publish/Subscribe
消息模式,具体代码如下:
//MARK: - RMQClient
#import "ViewController.h"
//#import <RMQClient/RMQClient.h>
@import RMQClient;
@interface ViewController ()
@property (nonatomic,strong) RMQConnection *conn;
@end
@implementation ViewController
- (void)viewDidLoad {
[super viewDidLoad];
// Do any additional setup after loading the view, typically from a nib.
[[NSNotificationCenter defaultCenter] addObserver:self selector:@selector(activeNotification:) name:UIApplicationDidBecomeActiveNotification object:nil];
[[NSNotificationCenter defaultCenter] addObserver:self selector:@selector(backgroundNotification:) name:UIApplicationDidEnterBackgroundNotification object:nil];
}
#pragma mark - 系统的通知监听
- (void)activeNotification:(NSNotification *)notification{
if (_conn == nil) {
[self receiveLogs];
}
}
- (void)backgroundNotification:(NSNotification *)notification{
if (_conn) {
[self.conn close];
self.conn = nil;
}
}
- (void)viewWillAppear:(BOOL)animated{
[super viewWillAppear:animated];
[self receiveRabbitServicerMessage];
}
- (void)viewWillDisappear:(BOOL)animated{
[super viewWillDisappear:animated];
if (_conn) {
[self.conn close];
self.conn = nil;
}
}
- (void)receiveRabbitServicerMessage {
NSString *url5 = @"amqp://servicerName:servicerPassword@host:port/Vhost";
if (_conn == nil) {//[RMQConnectionDelegateLogger new]
_conn = [[RMQConnection alloc] initWithUri:url5 delegate:[RMQConnectionDelegateLogger new]];
}
[_conn start];
id<RMQChannel> ch = [_conn createChannel];
RMQExchange *x = [ch fanout:@"exchangeName" options:RMQExchangeDeclareDurable];
RMQQueue *q = [ch queue:@"" options:RMQQueueDeclareExclusive |RMQQueueDeclareAutoDelete];
[q bind:x];
NSLog(@"Waiting for logs.");
[q subscribe:RMQBasicConsumeNoOptions handler:^(RMQMessage * _Nonnull message) {
NSLog(@"Received %@", [[NSString alloc] initWithData:message.body encoding:NSUTF8StringEncoding]);
}];
}
三、代码解释
(1)RMQConnection的声明
// RMQConnection 的声明采用如下方式:
_conn = [[RMQConnection alloc] initWithUri:url5 delegate:[RMQConnectionDelegateLogger new]];
// 源代码释义:
/*!
* @brief Parses URI to obtain credentials and TLS enablement (which implies verifyPeer).
* @param uri The URI contains all connection information, including credentials.<br/>
For example, "amqps://user:pass@hostname:1234/myvhost".<br/>
Note: to use the default "/" vhost, omit the trailing slash (or else you must encode it as %2F).
* @param delegate Any object that conforms to the RMQConnectionDelegate protocol.
Use this to handle connection- and channel-level errors.
RMQConnectionDelegateLogger is useful for development purposes.
*/
- (nonnull instancetype)initWithUri:(nonnull NSString *)uri
delegate:(nullable id<RMQConnectionDelegate>)delegate;
参数uri的格式:"amqps://user:pass@hostname:1234/myvhost"
user:用户名;
pass:用户密码;
hostName:域名;
1234:端口码;
myvhost:vhost;
[参数更多参考格式](http://bit.ly/ks8MXK).
参数delegate:
// 如果使用RMQConnectionDelegate
// RMQConnection声明如下:
_conn = [[RMQConnection alloc] initWithUri:url5 delegate:self];
// 遵守代理:
#import "ViewController.h"
//#import <RMQClient/RMQClient.h>
@import RMQClient;
@interface ViewController ()<RMQConnectionDelegate>
@property (nonatomic,strong) RMQConnection *conn;
@end
// 实现代理:监控RabbitMQ日志
/// @brief Called when a socket cannot be opened, or when AMQP handshaking times out for some reason.
- (void) connection:(RMQConnection *)connection
failedToConnectWithError:(NSError *)error{
if (error) {
NSLog(@"%@",error);
NSLog(@"连接超时");
[self.conn close];
self.conn = nil;
}else{
}
}
/// @brief Called when a connection disconnects for any reason
- (void) connection:(RMQConnection *)connection
disconnectedWithError:(NSError *)error{
if (error) {
NSLog(@"%@",error);
[self.conn close];
self.conn = nil;
}else{
NSLog(@"连接成功");
}
}
/// @brief Called before the configured <a href="http://www.rabbitmq.com/api-guide.html#recovery">automatic connection recovery</a> sleep.
- (void)willStartRecoveryWithConnection:(RMQConnection *)connection{
NSLog(@"222222");
}
/// @brief Called after the configured <a href="http://www.rabbitmq.com/api-guide.html#recovery">automatic connection recovery</a> sleep.
- (void)startingRecoveryWithConnection:(RMQConnection *)connection{
NSLog(@"333333");
}
/*!
* @brief Called when <a href="http://www.rabbitmq.com/api-guide.html#recovery">automatic connection recovery</a> has succeeded.
* @param connection the connection instance that was recovered.
*/
- (void)recoveredConnection:(RMQConnection *)connection{
NSLog(@"333333");
}
/// @brief Called with any channel-level AMQP exception.
- (void)channel:(id<RMQChannel>)channel
error:(NSError *)error{
if (error) {
NSLog(@"%@",error);
[self.conn close];
self.conn = nil;
}
}
如果使用:[RMQConnectionDelegateLogger new]当作代理,不需要实现代理的各种方法,系统自动打印各种状态的Log。我们只需要在监控端监控log日志就可以了。
_conn = [[RMQConnection alloc] initWithUri:url5 delegate:[RMQConnectionDelegateLogger new]];
(2)开始连接
[_conn start];// 开始连接, 主要是与AMQP服务器连接和握手。
/// @brief Connect and handshake with the remote AMQP server.
- (void)start;
(3)创建Channel(通道)
// Connection利用创建Channel,主要内容是开始一些操作和发送RMQConnectionDelegate给Channel。
id<RMQChannel> ch = [_conn createChannel];
/*!
* @brief Create a new channel, using an internally allocated channel number.
* @return An RMQAllocatedChannel or RMQUnallocatedChannel. The latter sends errors to the RMQConnectionDelegate.
*/
- (nonnull id<RMQChannel>)createChannel;
(3)创建Exchange(交换机)
// 这里我们采用的fanout方式。
// 注意交换机的名称(exchangeName)和操作类型(options)一定要和你们后台的格式保持一致。我这里采用的是永久类型的RMQExchangeDeclareDurable。
RMQExchange *x = [ch fanout:@"exchangeName" options:RMQExchangeDeclareDurable];
// options的类型
typedef NS_OPTIONS(NSUInteger, RMQExchangeDeclareOptions) {
RMQExchangeDeclareNoOptions = 0, // 没有操作类型
/// @brief If set, the server will reply with Declare-Ok if the exchange already exists with the same name, and raise an error if not. The client can use this to check whether an exchange exists without modifying the server state. When set, all other method fields except name and no-wait are ignored. A declare with both passive and no-wait has no effect. Arguments are compared for semantic equivalence.
RMQExchangeDeclarePassive = 1 << 0,
/// @brief If set when creating a new exchange, the exchange will be marked as durable. Durable exchanges remain active when a server restarts. Non-durable exchanges (transient exchanges) are purged if/when a server restarts.
RMQExchangeDeclareDurable = 1 << 1, // 持久类型
/// @brief If set, the exchange is deleted when all queues have finished using it.
RMQExchangeDeclareAutoDelete = 1 << 2, // 自动删除类型
/// @brief If set, the exchange may not be used directly by publishers, but only when bound to other exchanges. Internal exchanges are used to construct wiring that is not visible to applications.
RMQExchangeDeclareInternal = 1 << 3,
/// @brief
RMQExchangeDeclareNoWait = 1 << 4,
};
(4)创建Queue(队列)
// 注意队列的名称(queueName)不用填写。
// 因为服务端队列的声明是随机的不需要指定名称,由服务器默认生成名称。
// 操作类型(options)这个要按照你们后端要求去填写,我们使用的是RMQQueueDeclareExclusive和RMQQueueDeclareExclusive。
RMQQueue *q = [ch queue:@"" options: RMQQueueDeclareExclusive | RMQQueueDeclareAutoDelete];
(5)Bind(绑定)
// Bind是Queue和Exchange的绑定,实质是将队列名称与交换机名称进行绑定。
[q bind:x];
(6)Subscribe(订阅)
// Subscribe方法内部创建了Consumer消费者,利用消费者去订阅服务端发来消息。在handler的我们可以接收到从服务器端发来的消息。
[q subscribe:RMQBasicConsumeNoOptions handler:^(RMQMessage * _Nonnull message) {
NSLog(@"Received %@", [[NSString alloc] initWithData:message.body encoding:NSUTF8StringEncoding]);
}];
四、集成错误
Error Domain=com.rabbitmq.rabbitmq-objc-client Code=406 "PRECONDITION_FAILED - cannot redeclare exchange 'xxxx' in vhost 'xxxx' with different type, durable, internal or autodelete value" UserInfo={NSLocalizedDescription=PRECONDITION_FAILED - cannot redeclare exchange 'xxxx' in vhost 'xxxx' with different type, durable, internal or autodelete value
错误原因:交换机的类型与后端交换机的类型(options)不一致。
Error Domain=com.rabbitmq.rabbitmq-objc-client Code=5 "Cannot use channel after it has been closed." UserInfo={NSLocalizedDescription=Cannot use channel after it has been closed.}
错误原因:由于我把交换机的类型与后端交换机的类型(options)不一致,所以服务器端拒绝连接,断开了channel。
五、集成注意
- 一定要在viewDidLoad方法中添加如下代码:
[[NSNotificationCenter defaultCenter] addObserver:self selector:@selector(activeNotification:) name:UIApplicationDidBecomeActiveNotification object:nil];
[[NSNotificationCenter defaultCenter] addObserver:self selector:@selector(backgroundNotification:) name:UIApplicationDidEnterBackgroundNotification object:nil];
#pragma mark - 系统的通知监听
- (void)activeNotification:(NSNotification *)notification{
// 当程序从后台切换到前台后,程序处于活跃状态,创建连接,开始接收消息。
if (_conn == nil) {
[self receiveLogs];
}
}
- (void)backgroundNotification:(NSNotification *)notification{
// 当程序从前台切换到后台,程序处于后台模式,关闭连接,停止接收消息。
// 这样做的目的是为了当程序处理非活跃状态,断开连接,有利于释放程序段和服务端资源,减少资源浪费,减少服务器压力,提高程序性能。
if (_conn) {
[self.conn close];
self.conn = nil;
}
}
#pragma mark - Memory Manage
- (void)dealloc {
if (_conn) {
[self.conn close];
self.conn = nil;
}
[[NSNotificationCenter defaultCenter] removeObserver:self];
}
六、总结。
- 集成前,要去了解
RabbitMQ
的概念、原理、应用等。 - 集成中,要参考官方文档和你与后台约定的集成的规则。
- 集成后,有时间和精力的话,可以尝试去阅读
RMQClient
框架的源代码。