RabbitMQ——第二篇:IOS版本RabbitMQ集成

一、背景###

由于我公司产品需求想实现行情实时刷新功能,大家都知道这时候NSTimer不能再满足产品需求,所以我们的服务端采用了RabbitMQ来实现行情的实时刷新。但是我在集成IOS端,发现网上几乎没有关于IOS端集成RabbitMQ示范文档,所以我想写一篇分享给大家如何在IOS端集成RabbitMQ,希望能给大家一点帮助。

二、集成

集成IOSRabbitMQ框架,我们采用RabbitMQ官方提供的RMQClient框架。集成前的具体准备我不作赘述,官方文档很详细,RMQClient加入工程之后,请参考RabbitMQ详细集成指南。这篇指南分为"Hello World!" Work queuesPublish/SubscribeRoutingTopics,详细的介绍了RabbitMQ概念应用各种模式集成等。

我们公司采用的是RabbitMQPublish/Subscribe消息模式,具体代码如下:

//MARK: - RMQClient
#import "ViewController.h"
//#import <RMQClient/RMQClient.h>
@import RMQClient;
@interface ViewController ()
@property (nonatomic,strong) RMQConnection *conn;
@end
@implementation ViewController
- (void)viewDidLoad {
    [super viewDidLoad];
    // Do any additional setup after loading the view, typically from a nib.

    [[NSNotificationCenter defaultCenter] addObserver:self selector:@selector(activeNotification:) name:UIApplicationDidBecomeActiveNotification object:nil];
    
    [[NSNotificationCenter defaultCenter] addObserver:self selector:@selector(backgroundNotification:) name:UIApplicationDidEnterBackgroundNotification object:nil];

}

#pragma mark - 系统的通知监听
- (void)activeNotification:(NSNotification *)notification{
    if (_conn == nil) {
        [self receiveLogs];
    }
}
- (void)backgroundNotification:(NSNotification *)notification{
    if (_conn) {
        [self.conn close];
        self.conn = nil;
    }
}

- (void)viewWillAppear:(BOOL)animated{
    [super viewWillAppear:animated];
    [self receiveRabbitServicerMessage];
}

- (void)viewWillDisappear:(BOOL)animated{
    [super viewWillDisappear:animated];
    
    if (_conn) {
        [self.conn close];
        self.conn = nil;
    }
}

- (void)receiveRabbitServicerMessage {
    
    NSString *url5 = @"amqp://servicerName:servicerPassword@host:port/Vhost";

    if (_conn == nil) {//[RMQConnectionDelegateLogger new]
        _conn = [[RMQConnection alloc] initWithUri:url5 delegate:[RMQConnectionDelegateLogger new]];
    }

    [_conn start];
    
    id<RMQChannel> ch = [_conn createChannel];
    
    RMQExchange *x = [ch fanout:@"exchangeName" options:RMQExchangeDeclareDurable];
    
    RMQQueue *q = [ch queue:@"" options:RMQQueueDeclareExclusive |RMQQueueDeclareAutoDelete];
    
    [q bind:x];
    
    NSLog(@"Waiting for logs.");
    
    [q subscribe:RMQBasicConsumeNoOptions handler:^(RMQMessage * _Nonnull message) {
        NSLog(@"Received %@", [[NSString alloc] initWithData:message.body encoding:NSUTF8StringEncoding]);
    }];
    
}

三、代码解释

(1)RMQConnection的声明

// RMQConnection 的声明采用如下方式:
_conn = [[RMQConnection alloc] initWithUri:url5 delegate:[RMQConnectionDelegateLogger new]];
// 源代码释义:

/*!
 * @brief Parses URI to obtain credentials and TLS enablement (which implies verifyPeer).
 * @param uri        The URI contains all connection information, including credentials.<br/>
                     For example, "amqps://user:pass@hostname:1234/myvhost".<br/>
                     Note: to use the default "/" vhost, omit the trailing slash (or else you must encode it as %2F).
 * @param delegate   Any object that conforms to the RMQConnectionDelegate protocol.
                     Use this to handle connection- and  channel-level errors.
                     RMQConnectionDelegateLogger is useful for development purposes.
 */
- (nonnull instancetype)initWithUri:(nonnull NSString *)uri
                           delegate:(nullable id<RMQConnectionDelegate>)delegate;
参数uri的格式:"amqps://user:pass@hostname:1234/myvhost"
user:用户名;
pass:用户密码;
hostName:域名;
1234:端口码;
myvhost:vhost;

[参数更多参考格式](http://bit.ly/ks8MXK).
参数delegate:
// 如果使用RMQConnectionDelegate
// RMQConnection声明如下:
_conn = [[RMQConnection alloc] initWithUri:url5 delegate:self];

//  遵守代理:
#import "ViewController.h"
//#import <RMQClient/RMQClient.h>
@import RMQClient;
@interface ViewController ()<RMQConnectionDelegate>
@property (nonatomic,strong) RMQConnection *conn;
@end

// 实现代理:监控RabbitMQ日志
/// @brief Called when a socket cannot be opened, or when AMQP handshaking times out for some reason.
- (void)      connection:(RMQConnection *)connection
failedToConnectWithError:(NSError *)error{
    
    if (error) {
        NSLog(@"%@",error);
        NSLog(@"连接超时");
        [self.conn close];
        self.conn = nil;
    }else{
        
    }
}

/// @brief Called when a connection disconnects for any reason
- (void)   connection:(RMQConnection *)connection
disconnectedWithError:(NSError *)error{
    if (error) {
        NSLog(@"%@",error);
        [self.conn close];
        self.conn = nil;
    }else{
        NSLog(@"连接成功");
    }
}
/// @brief Called before the configured <a href="http://www.rabbitmq.com/api-guide.html#recovery">automatic connection recovery</a> sleep.
- (void)willStartRecoveryWithConnection:(RMQConnection *)connection{
    NSLog(@"222222");
}
/// @brief Called after the configured <a href="http://www.rabbitmq.com/api-guide.html#recovery">automatic connection recovery</a> sleep.
- (void)startingRecoveryWithConnection:(RMQConnection *)connection{
    NSLog(@"333333");
}
/*!
 * @brief Called when <a href="http://www.rabbitmq.com/api-guide.html#recovery">automatic connection recovery</a> has succeeded.
 * @param connection the connection instance that was recovered.
 */
- (void)recoveredConnection:(RMQConnection *)connection{
    NSLog(@"333333");
}
/// @brief Called with any channel-level AMQP exception.
- (void)channel:(id<RMQChannel>)channel
          error:(NSError *)error{
    if (error) {
        NSLog(@"%@",error);
        [self.conn close];
        self.conn = nil;
    }
}


如果使用:[RMQConnectionDelegateLogger new]当作代理,不需要实现代理的各种方法,系统自动打印各种状态的Log。我们只需要在监控端监控log日志就可以了。
_conn = [[RMQConnection alloc] initWithUri:url5 delegate:[RMQConnectionDelegateLogger new]];

(2)开始连接

[_conn start];// 开始连接, 主要是与AMQP服务器连接和握手。
/// @brief Connect and handshake with the remote AMQP server.
- (void)start;

(3)创建Channel(通道)

// Connection利用创建Channel,主要内容是开始一些操作和发送RMQConnectionDelegate给Channel。
 id<RMQChannel> ch = [_conn createChannel];

/*!
 * @brief Create a new channel, using an internally allocated channel number.
 * @return An RMQAllocatedChannel or RMQUnallocatedChannel. The latter sends errors to the RMQConnectionDelegate.
 */
- (nonnull id<RMQChannel>)createChannel;

(3)创建Exchange(交换机)

// 这里我们采用的fanout方式。
// 注意交换机的名称(exchangeName)和操作类型(options)一定要和你们后台的格式保持一致。我这里采用的是永久类型的RMQExchangeDeclareDurable。
RMQExchange *x = [ch fanout:@"exchangeName" options:RMQExchangeDeclareDurable];
// options的类型
typedef NS_OPTIONS(NSUInteger, RMQExchangeDeclareOptions) {
    RMQExchangeDeclareNoOptions  = 0, // 没有操作类型
    /// @brief If set, the server will reply with Declare-Ok if the exchange already exists with the same name, and raise an error if not. The client can use this to check whether an exchange exists without modifying the server state. When set, all other method fields except name and no-wait are ignored. A declare with both passive and no-wait has no effect. Arguments are compared for semantic equivalence.
    RMQExchangeDeclarePassive    = 1 << 0,
    /// @brief If set when creating a new exchange, the exchange will be marked as durable. Durable exchanges remain active when a server restarts. Non-durable exchanges (transient exchanges) are purged if/when a server restarts.
    RMQExchangeDeclareDurable    = 1 << 1,  // 持久类型
    /// @brief If set, the exchange is deleted when all queues have finished using it.
    RMQExchangeDeclareAutoDelete = 1 << 2, // 自动删除类型
    /// @brief If set, the exchange may not be used directly by publishers, but only when bound to other exchanges. Internal exchanges are used to construct wiring that is not visible to applications.
    RMQExchangeDeclareInternal   = 1 << 3,
    /// @brief
    RMQExchangeDeclareNoWait     = 1 << 4,
};

(4)创建Queue(队列)

// 注意队列的名称(queueName)不用填写。
// 因为服务端队列的声明是随机的不需要指定名称,由服务器默认生成名称。
// 操作类型(options)这个要按照你们后端要求去填写,我们使用的是RMQQueueDeclareExclusive和RMQQueueDeclareExclusive。
 RMQQueue *q = [ch queue:@"" options: RMQQueueDeclareExclusive | RMQQueueDeclareAutoDelete];

(5)Bind(绑定)

// Bind是Queue和Exchange的绑定,实质是将队列名称与交换机名称进行绑定。
[q bind:x];

(6)Subscribe(订阅)

// Subscribe方法内部创建了Consumer消费者,利用消费者去订阅服务端发来消息。在handler的我们可以接收到从服务器端发来的消息。
 [q subscribe:RMQBasicConsumeNoOptions handler:^(RMQMessage * _Nonnull message) {
        NSLog(@"Received %@", [[NSString alloc] initWithData:message.body encoding:NSUTF8StringEncoding]);   
}];

四、集成错误

Error Domain=com.rabbitmq.rabbitmq-objc-client Code=406 "PRECONDITION_FAILED - cannot redeclare exchange 'xxxx' in vhost 'xxxx' with different type, durable, internal or autodelete value" UserInfo={NSLocalizedDescription=PRECONDITION_FAILED - cannot redeclare exchange 'xxxx' in vhost 'xxxx' with different type, durable, internal or autodelete value

错误原因:交换机的类型与后端交换机的类型(options)不一致。

Error Domain=com.rabbitmq.rabbitmq-objc-client Code=5 "Cannot use channel after it has been closed." UserInfo={NSLocalizedDescription=Cannot use channel after it has been closed.}

错误原因:由于我把交换机的类型与后端交换机的类型(options)不一致,所以服务器端拒绝连接,断开了channel。

五、集成注意

  • 一定要在viewDidLoad方法中添加如下代码:
[[NSNotificationCenter defaultCenter] addObserver:self selector:@selector(activeNotification:) name:UIApplicationDidBecomeActiveNotification object:nil];
[[NSNotificationCenter defaultCenter] addObserver:self selector:@selector(backgroundNotification:) name:UIApplicationDidEnterBackgroundNotification object:nil];

#pragma mark - 系统的通知监听
- (void)activeNotification:(NSNotification *)notification{
// 当程序从后台切换到前台后,程序处于活跃状态,创建连接,开始接收消息。
    if (_conn == nil) {
        [self receiveLogs];
    }
}
- (void)backgroundNotification:(NSNotification *)notification{
// 当程序从前台切换到后台,程序处于后台模式,关闭连接,停止接收消息。
// 这样做的目的是为了当程序处理非活跃状态,断开连接,有利于释放程序段和服务端资源,减少资源浪费,减少服务器压力,提高程序性能。
    if (_conn) {
        [self.conn close];
        self.conn = nil;
    }
}
#pragma mark - Memory Manage
- (void)dealloc {

  if (_conn) {
        [self.conn close];
        self.conn = nil;
    }
    [[NSNotificationCenter  defaultCenter] removeObserver:self];
    
}

六、总结。

  • 集成前,要去了解RabbitMQ的概念、原理、应用等。
  • 集成中,要参考官方文档和你与后台约定的集成的规则。
  • 集成后,有时间和精力的话,可以尝试去阅读RMQClient框架的源代码。
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,793评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 87,567评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,342评论 0 338
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,825评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,814评论 5 368
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,680评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,033评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,687评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 42,175评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,668评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,775评论 1 332
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,419评论 4 321
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,020评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,978评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,206评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,092评论 2 351
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,510评论 2 343

推荐阅读更多精彩内容

  • 来源 RabbitMQ是用Erlang实现的一个高并发高可靠AMQP消息队列服务器。支持消息的持久化、事务、拥塞控...
    jiangmo阅读 10,343评论 2 34
  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 134,596评论 18 139
  • rabbitMQ是一款基于AMQP协议的消息中间件,它能够在应用之间提供可靠的消息传输。在易用性,扩展性,高可用性...
    点融黑帮阅读 2,983评论 3 41
  • 什么叫消息队列 消息(Message)是指在应用间传送的数据。消息可以非常简单,比如只包含文本字符串,也可以更复杂...
    lijun_m阅读 1,335评论 0 1
  • 1. 历史 RabbitMQ是一个由erlang开发的AMQP(Advanced Message Queue )的...
    高广超阅读 6,092评论 3 51