linux下使用hiredis异步API实现sub/pub消息订阅和发布的功能

转载自:http://blog.csdn.net/chenzba/article/details/51224715

最近使用redis的c接口——hiredis,使客户端与redis服务器通信,实现消息订阅和发布(PUB/SUB)的功能,我把遇到的一些问题和解决方法列出来供大家学习。

废话不多说,先贴代码。

redis_publisher.h

[cpp]view plaincopy

/*************************************************************************

> File Name: redis_publisher.h

> Author: chenzengba

> Mail: chenzengba@gmail.com

> Created Time: Sat 23 Apr 2016 10:15:09 PM CST

> Description: 封装hiredis,实现消息发布给redis功能

************************************************************************/

#ifndef REDIS_PUBLISHER_H

#define REDIS_PUBLISHER_H

#include 

#include 

#include 

#include 

#include 

#include 

#include 

#include 

#include 

classCRedisPublisher

{

public:

CRedisPublisher();

~CRedisPublisher();

boolinit();

booluninit();

boolconnect();

booldisconnect();

boolpublish(conststd::string &channel_name,

conststd::string &message);

private:

// 下面三个回调函数供redis服务调用

// 连接回调

staticvoidconnect_callback(constredisAsyncContext *redis_context,

intstatus);

// 断开连接的回调

staticvoiddisconnect_callback(constredisAsyncContext *redis_context,

intstatus);

// 执行命令回调

staticvoidcommand_callback(redisAsyncContext *redis_context,

void*reply,void*privdata);

// 事件分发线程函数

staticvoid*event_thread(void*data);

void*event_proc();

private:

// libevent事件对象

event_base *_event_base;

// 事件线程ID

pthread_t _event_thread;

// 事件线程的信号量

sem_t _event_sem;

// hiredis异步对象

redisAsyncContext *_redis_context;

};

#endif

redis_publisher.cpp

[cpp]view plaincopy

/*************************************************************************

> File Name: redis_publisher.cpp

> Author: chenzengba

> Mail: chenzengba@gmail.com

> Created Time: Sat 23 Apr 2016 10:15:09 PM CST

> Description:

************************************************************************/

#include 

#include 

#include 

#include "redis_publisher.h"

CRedisPublisher::CRedisPublisher():_event_base(0), _event_thread(0),

_redis_context(0)

{

}

CRedisPublisher::~CRedisPublisher()

{

}

boolCRedisPublisher::init()

{

// initialize the event

_event_base = event_base_new();// 创建libevent对象

if(NULL == _event_base)

{

printf(": Create redis event failed.\n");

returnfalse;

}

memset(&_event_sem, 0,sizeof(_event_sem));

intret = sem_init(&_event_sem, 0, 0);

if(ret != 0)

{

printf(": Init sem failed.\n");

returnfalse;

}

returntrue;

}

boolCRedisPublisher::uninit()

{

_event_base = NULL;

sem_destroy(&_event_sem);

returntrue;

}

boolCRedisPublisher::connect()

{

// connect redis

_redis_context = redisAsyncConnect("127.0.0.1", 6379);// 异步连接到redis服务器上,使用默认端口

if(NULL == _redis_context)

{

printf(": Connect redis failed.\n");

returnfalse;

}

if(_redis_context->err)

{

printf(": Connect redis error: %d, %s\n",

_redis_context->err, _redis_context->errstr);// 输出错误信息

returnfalse;

}

// attach the event

redisLibeventAttach(_redis_context, _event_base);// 将事件绑定到redis context上,使设置给redis的回调跟事件关联

// 创建事件处理线程

intret = pthread_create(&_event_thread, 0, &CRedisPublisher::event_thread,this);

if(ret != 0)

{

printf(": create event thread failed.\n");

disconnect();

returnfalse;

}

// 设置连接回调,当异步调用连接后,服务器处理连接请求结束后调用,通知调用者连接的状态

redisAsyncSetConnectCallback(_redis_context,

&CRedisPublisher::connect_callback);

// 设置断开连接回调,当服务器断开连接后,通知调用者连接断开,调用者可以利用这个函数实现重连

redisAsyncSetDisconnectCallback(_redis_context,

&CRedisPublisher::disconnect_callback);

// 启动事件线程

sem_post(&_event_sem);

returntrue;

}

boolCRedisPublisher::disconnect()

{

if(_redis_context)

{

redisAsyncDisconnect(_redis_context);

redisAsyncFree(_redis_context);

_redis_context = NULL;

}

returntrue;

}

boolCRedisPublisher::publish(conststd::string &channel_name,

conststd::string &message)

{

intret = redisAsyncCommand(_redis_context,

&CRedisPublisher::command_callback,this,"PUBLISH %s %s",

channel_name.c_str(), message.c_str());

if(REDIS_ERR == ret)

{

printf("Publish command failed: %d\n", ret);

returnfalse;

}

returntrue;

}

voidCRedisPublisher::connect_callback(constredisAsyncContext *redis_context,

intstatus)

{

if(status != REDIS_OK)

{

printf(": Error: %s\n", redis_context->errstr);

}

else

{

printf(": Redis connected!\n");

}

}

voidCRedisPublisher::disconnect_callback(

constredisAsyncContext *redis_context,intstatus)

{

if(status != REDIS_OK)

{

// 这里异常退出,可以尝试重连

printf(": Error: %s\n", redis_context->errstr);

}

}

// 消息接收回调函数

voidCRedisPublisher::command_callback(redisAsyncContext *redis_context,

void*reply,void*privdata)

{

printf("command callback.\n");

// 这里不执行任何操作

}

void*CRedisPublisher::event_thread(void*data)

{

if(NULL == data)

{

printf(": Error!\n");

assert(false);

returnNULL;

}

CRedisPublisher *self_this =reinterpret_cast(data);

returnself_this->event_proc();

}

void*CRedisPublisher::event_proc()

{

sem_wait(&_event_sem);

// 开启事件分发,event_base_dispatch会阻塞

event_base_dispatch(_event_base);

returnNULL;

}

redis_subscriber.h

[cpp]view plaincopy

/*************************************************************************

> File Name: redis_subscriber.h

> Author: chenzengba

> Mail: chenzengba@gmail.com

> Created Time: Sat 23 Apr 2016 10:15:09 PM CST

> Description: 封装hiredis,实现消息订阅redis功能

************************************************************************/

#ifndef REDIS_SUBSCRIBER_H

#define REDIS_SUBSCRIBER_H

#include 

#include 

#include 

#include 

#include 

#include 

#include 

#include 

#include 

classCRedisSubscriber

{

public:

typedef  std::tr1::function <void(const char*,const char*,int)> NotifyMessageFn;// 回调函数对象类型,当接收到消息后调用回调把消息发送出去

CRedisSubscriber();

~CRedisSubscriber();

boolinit(const NotifyMessageFn &fn);// 传入回调对象

bool uninit();

bool connect();

bool disconnect();

// 可以多次调用,订阅多个频道

bool subscribe(conststd::string &channel_name);

private:

// 下面三个回调函数供redis服务调用

// 连接回调

static void connect_callback(constredisAsyncContext *redis_context,

intstatus);

// 断开连接的回调

staticvoiddisconnect_callback(constredisAsyncContext *redis_context,

intstatus);

// 执行命令回调

staticvoidcommand_callback(redisAsyncContext *redis_context,

void*reply,void*privdata);

// 事件分发线程函数

staticvoid*event_thread(void*data);

void*event_proc();

private:

// libevent事件对象

event_base *_event_base;

// 事件线程ID

pthread_t _event_thread;

// 事件线程的信号量

sem_t _event_sem;

// hiredis异步对象

redisAsyncContext *_redis_context;

// 通知外层的回调函数对象

NotifyMessageFn _notify_message_fn;

};

#endif

redis_subscriber.cpp:

[cpp]view plaincopy

/*************************************************************************

> File Name: redis_subscriber.cpp

> Author: chenzengba

> Mail: chenzengba@gmail.com

> Created Time: Sat 23 Apr 2016 10:15:09 PM CST

> Description:

************************************************************************/

#include 

#include 

#include 

#include "redis_subscriber.h"

CRedisSubscriber::CRedisSubscriber():_event_base(0), _event_thread(0),

_redis_context(0)

{

}

CRedisSubscriber::~CRedisSubscriber()

{

}

boolCRedisSubscriber::init(constNotifyMessageFn &fn)

{

// initialize the event

_notify_message_fn = fn;

_event_base = event_base_new();// 创建libevent对象

if(NULL == _event_base)

{

printf(": Create redis event failed.\n");

returnfalse;

}

memset(&_event_sem, 0,sizeof(_event_sem));

intret = sem_init(&_event_sem, 0, 0);

if(ret != 0)

{

printf(": Init sem failed.\n");

returnfalse;

}

returntrue;

}

boolCRedisSubscriber::uninit()

{

_event_base = NULL;

sem_destroy(&_event_sem);

returntrue;

}

boolCRedisSubscriber::connect()

{

// connect redis

_redis_context = redisAsyncConnect("127.0.0.1", 6379);// 异步连接到redis服务器上,使用默认端口

if(NULL == _redis_context)

{

printf(": Connect redis failed.\n");

returnfalse;

}

if(_redis_context->err)

{

printf(": Connect redis error: %d, %s\n",

_redis_context->err, _redis_context->errstr);// 输出错误信息

returnfalse;

}

// attach the event

redisLibeventAttach(_redis_context, _event_base);// 将事件绑定到redis context上,使设置给redis的回调跟事件关联

// 创建事件处理线程

intret = pthread_create(&_event_thread, 0, &CRedisSubscriber::event_thread,this);

if(ret != 0)

{

printf(": create event thread failed.\n");

disconnect();

returnfalse;

}

// 设置连接回调,当异步调用连接后,服务器处理连接请求结束后调用,通知调用者连接的状态

redisAsyncSetConnectCallback(_redis_context,

&CRedisSubscriber::connect_callback);

// 设置断开连接回调,当服务器断开连接后,通知调用者连接断开,调用者可以利用这个函数实现重连

redisAsyncSetDisconnectCallback(_redis_context,

&CRedisSubscriber::disconnect_callback);

// 启动事件线程

sem_post(&_event_sem);

returntrue;

}

boolCRedisSubscriber::disconnect()

{

if(_redis_context)

{

redisAsyncDisconnect(_redis_context);

redisAsyncFree(_redis_context);

_redis_context = NULL;

}

returntrue;

}

boolCRedisSubscriber::subscribe(conststd::string &channel_name)

{

intret = redisAsyncCommand(_redis_context,

&CRedisSubscriber::command_callback,this,"SUBSCRIBE %s",

channel_name.c_str());

if(REDIS_ERR == ret)

{

printf("Subscribe command failed: %d\n", ret);

returnfalse;

}

printf(": Subscribe success: %s\n", channel_name.c_str());

returntrue;

}

voidCRedisSubscriber::connect_callback(constredisAsyncContext *redis_context,

intstatus)

{

if(status != REDIS_OK)

{

printf(": Error: %s\n", redis_context->errstr);

}

else

{

printf(": Redis connected!");

}

}

voidCRedisSubscriber::disconnect_callback(

constredisAsyncContext *redis_context,intstatus)

{

if(status != REDIS_OK)

{

// 这里异常退出,可以尝试重连

printf(": Error: %s\n", redis_context->errstr);

}

}

// 消息接收回调函数

voidCRedisSubscriber::command_callback(redisAsyncContext *redis_context,

void*reply,void*privdata)

{

if(NULL == reply || NULL == privdata) {

return;

}

// 静态函数中,要使用类的成员变量,把当前的this指针传进来,用this指针间接访问

CRedisSubscriber *self_this =reinterpret_cast(privdata);

redisReply *redis_reply =reinterpret_cast(reply);

// 订阅接收到的消息是一个带三元素的数组

if(redis_reply->type == REDIS_REPLY_ARRAY &&

redis_reply->elements == 3)

{

printf(": Recieve message:%s:%d:%s:%d:%s:%d\n",

redis_reply->element[0]->str, redis_reply->element[0]->len,

redis_reply->element[1]->str, redis_reply->element[1]->len,

redis_reply->element[2]->str, redis_reply->element[2]->len);

// 调用函数对象把消息通知给外层

self_this->_notify_message_fn(redis_reply->element[1]->str,

redis_reply->element[2]->str, redis_reply->element[2]->len);

}

}

void*CRedisSubscriber::event_thread(void*data)

{

if(NULL == data)

{

printf(": Error!\n");

assert(false);

returnNULL;

}

CRedisSubscriber *self_this =reinterpret_cast(data);

returnself_this->event_proc();

}

void*CRedisSubscriber::event_proc()

{

sem_wait(&_event_sem);

// 开启事件分发,event_base_dispatch会阻塞

event_base_dispatch(_event_base);

returnNULL;

}

问题1:hiredis官网没有异步接口的实现例子。

hiredis提供了几个异步通信的API,一开始根据API名字的理解,我们实现了跟redis服务器建立连接、订阅和发布的功能,可在实际使用的时候,程序并没有像我们预想的那样,除了能够建立连接外,任何事情都没发生。

网上查了很多资料,原来hiredis的异步实现是通过事件来分发redis发送过来的消息的,hiredis可以使用libae、libev、libuv和libevent中的任何一个实现事件的分发,网上的资料提示使用libae、libev和libuv可能发生其他问题,这里为了方便就选用libevent。hireds官网并没有对libevent做任何介绍,也没用说明使用异步机制需要引入事件的接口,所以一开始走了很多弯路。

关于libevent的使用这里就不再赘述,详情可以见libevent官网。

libevent官网:http://libevent.org/

libevent api文档:https://www.monkey.org/~provos/libevent/doxygen-2.0.1/include_2event2_2event_8h.html#6e9827de8c3014417b11b48f2fe688ae

CRedisPublisher和CRedisSubscriber的初始化过程:

初始化事件处理,并获得事件处理的实例:

[cpp]view plaincopy

_event_base = event_base_new();

在获得redisAsyncContext *之后,调用

[cpp]view plaincopy

redisLibeventAttach(_redis_context, _event_base);

这样就将事件处理和redis关联起来,最后在另一个线程调用

[cpp]view plaincopy

event_base_dispatch(_event_base);

启动事件的分发,这是一个阻塞函数,因此,创建了一个新的线程处理事件分发,值得注意的是,这里用信号灯_event_sem控制线程的启动,意在程序调用

[cpp]view plaincopy

redisAsyncSetConnectCallback(_redis_context,

&CRedisSubscriber::connect_callback);

redisAsyncSetDisconnectCallback(_redis_context,

&CRedisSubscriber::disconnect_callback);

之后,能够完全捕捉到这两个回调。

问题2 奇特的‘ERR only (P)SUBSCRIBE / (P)UNSUBSCRIBE / QUIT allowed in this context’错误

有些人会觉得这两个类设计有点冗余,我们发现CRedisPublisher和CRedisSubscriber很多逻辑是一样的,为什么不把他们整合到一起成一个类,既能够发布消息也能够订阅消息。其实一开始我就是这么干的,在使用的时候发现,用同个redisAsynContex *对象进行消息订阅和发布,与redis服务连接会自动断开,disconnect_callback回调会被调用,并且返回奇怪的错误:ERR only (P)SUBSCRIBE / (P)UNSUBSCRIBE / QUIT allowed in this context,因此,不能使用同个redisAsyncContext *对象实现发布和订阅。这里为了减少设计的复杂性,就将两个类的逻辑分开了。

当然,你也可以将相同的逻辑抽象到一个基类里,并实现publish和subscribe接口。

问题3 相关依赖的库

编译之前,需要安装hiredis、libevent和boost库,我是用的是Ubuntu x64系统。

hiredis官网:https://github.com/redis/hiredis

下载源码解压,进入解压目录,执行make && make install命令。

libevent官网:http://libevent.org/下载最新的稳定版

解压后进入解压目录,执行命令

./configure -prefix=/usr

sudo make && make install

boost库:直接执行安装:sudo apt-get install libboost-dev

如果你不是用std::tr1::function的函数对象来给外层通知消息,就不需要boost库。你可以用接口的形式实现回调,把接口传给CRedisSubscribe类,让它在接收到消息后调用接口回调,通知外层。

问题4 如何使用

最后贴出例子代码。

publisher.cpp,实现发布消息:

[cpp]view plaincopy

/*************************************************************************

> File Name: publisher.cpp

> Author: chenzengba

> Mail: chenzengba@gmail.com

> Created Time: Sat 23 Apr 2016 12:13:24 PM CST

************************************************************************/

#include "redis_publisher.h"

intmain(intargc,char*argv[])

{

CRedisPublisher publisher;

boolret = publisher.init();

if(!ret)

{

printf("Init failed.\n");

return0;

}

ret = publisher.connect();

if(!ret)

{

printf("connect failed.");

return0;

}

while(true)

{

publisher.publish("test-channel","Test message");

sleep(1);

}

publisher.disconnect();

publisher.uninit();

return0;

}

subscriber.cpp实现订阅消息:

[cpp]view plaincopy

/*************************************************************************

> File Name: subscriber.cpp

> Author: chenzengba

> Mail: chenzengba@gmail.com

> Created Time: Sat 23 Apr 2016 12:26:42 PM CST

************************************************************************/

#include "redis_subscriber.h"

voidrecieve_message(constchar*channel_name,

constchar*message,intlen)

{

printf("Recieve message:\n    channel name: %s\n    message: %s\n",

channel_name, message);

}

intmain(intargc,char*argv[])

{

CRedisSubscriber subscriber;

CRedisSubscriber::NotifyMessageFn fn =

bind(recieve_message, std::tr1::placeholders::_1,

std::tr1::placeholders::_2, std::tr1::placeholders::_3);

boolret = subscriber.init(fn);

if(!ret)

{

printf("Init failed.\n");

return0;

}

ret = subscriber.connect();

if(!ret)

{

printf("Connect failed.\n");

return0;

}

subscriber.subscribe("test-channel");

while(true)

{

sleep(1);

}

subscriber.disconnect();

subscriber.uninit();

return0;

}

关于编译的问题:在g++中编译,注意要加上-lhiredis -levent参数,下面是一个简单的Makefile:

[cpp]view plaincopy

EXE=server_main client_main

CC=g++

FLAG=-lhiredis -levent

OBJ=redis_publisher.o publisher.o redis_subscriber.o subscriber.o

all:$(EXE)

$(EXE):$(OBJ)

$(CC) -o publisher redis_publisher.o publisher.o $(FLAG)

$(CC) -o subscriber redis_subscriber.o subscriber.o $(FLAG)

redis_publisher.o:redis_publisher.h

redis_subscriber.o:redis_subscriber.h

publisher.o:publisher.cpp

$(CC) -c publisher.cpp

subscriber.o:subscriber.cpp

$(CC) -c subscriber.cpp

clean:

rm publisher subscriber *.o

致谢:

redis异步API使用libevent:http://www.tuicool.com/articles/N73uuu

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 206,723评论 6 481
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 88,485评论 2 382
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 152,998评论 0 344
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 55,323评论 1 279
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 64,355评论 5 374
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,079评论 1 285
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,389评论 3 400
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,019评论 0 259
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 43,519评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,971评论 2 325
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,100评论 1 333
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,738评论 4 324
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,293评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,289评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,517评论 1 262
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,547评论 2 354
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,834评论 2 345

推荐阅读更多精彩内容