(23)Route订阅模式(Dns部分)-【Lars-基于C++负载均衡远程服务器调度系统教程】

【Lars教程目录】

Lars源代码
https://github.com/aceld/Lars


【Lars系统概述】
第1章-概述
第2章-项目目录构建


【Lars系统之Reactor模型服务器框架模块】
第1章-项目结构与V0.1雏形
第2章-内存管理与Buffer封装
第3章-事件触发EventLoop
第4章-链接与消息封装
第5章-Client客户端模型
第6章-连接管理及限制
第7章-消息业务路由分发机制
第8章-链接创建/销毁Hook机制
第9章-消息任务队列与线程池
第10章-配置文件读写功能
第11章-udp服务与客户端
第12章-数据传输协议protocol buffer
第13章-QPS性能测试
第14章-异步消息任务机制
第15章-链接属性设置功能


【Lars系统之DNSService模块】
第1章-Lars-dns简介
第2章-数据库创建
第3章-项目目录结构及环境构建
第4章-Route结构的定义
第5章-获取Route信息
第6章-Route订阅模式
第7章-Backend Thread实时监控


【Lars系统之Report Service模块】
第1章-项目概述-数据表及proto3协议定义
第2章-获取report上报数据
第3章-存储线程池及消息队列


【Lars系统之LoadBalance Agent模块】
第1章-项目概述及构建
第2章-主模块业务结构搭建
第3章-Report与Dns Client设计与实现
第4章-负载均衡模块基础设计
第5章-负载均衡获取Host主机信息API
第6章-负载均衡上报Host主机信息API
第7章-过期窗口清理与过载超时(V0.5)
第8章-定期拉取最新路由信息(V0.6)
第9章-负载均衡获取Route信息API(0.7)
第10章-API初始化接口(V0.8)
第11章-Lars Agent性能测试工具
第12章- Lars启动工具脚本


6) Route订阅模式

6.1 订阅模块的设计与实现

​ 订阅模式整体的设计.

lars_dns/include/subscribe.h

#pragma once

#include <vector>
#include <pthread.h>
#include <ext/hash_set>
#include <ext/hash_map>
#include "lars_reactor.h"
#include "lars.pb.h"
#include "dns_route.h"

using namespace __gnu_cxx;


//定义订阅列表数据关系类型,key->modid/cmdid, value->fds(订阅的客户端文件描述符)
typedef hash_map<uint64_t, hash_set<int>> subscribe_map;

//定义发布列表的数据关系类型, key->fd(订阅客户端的文件描述符), value->modids
typedef hash_map<int, hash_set<uint64_t>> publish_map;

class SubscribeList {
public:
    //设计单例
    static void init()  {
        _instance = new SubscribeList();
    }

    static SubscribeList *instance() {
        //保证init方法在这个进程执行中,只执行一次
        pthread_once(&_once, init);
        return _instance;
    }

    //订阅
    void subscribe(uint64_t mod, int fd);
    
    //取消订阅
    void unsubscribe(uint64_t mod, int fd);
    
    //发布
    void publish(std::vector<uint64_t> &change_mods);

    //根据在线用户fd得到需要发布的列表
    void make_publish_map(listen_fd_set &online_fds, 
                          publish_map &need_publish);
    
    
private:
    //设计单例
    SubscribeList();
    SubscribeList(const SubscribeList &);
    const SubscribeList& operator=(const SubscribeList);

    static SubscribeList *_instance;
    static pthread_once_t _once;


    subscribe_map _book_list; //订阅清单
    pthread_mutex_t _book_list_lock;

    publish_map _push_list; //发布清单
    pthread_mutex_t _push_list_lock;
};

​ 首先SubscribeList采用单例设计。这里面定义了两种数据类型

//定义订阅列表数据关系类型,key->modid/cmdid, value->fds(订阅的客户端文件描述符)
typedef hash_map<uint64_t, hash_set<int>> subscribe_map;

//定义发布列表的数据关系类型, key->fd(订阅客户端的文件描述符), value->modids
typedef hash_map<int, hash_set<uint64_t>> publish_map;

subscribe_map是目前dns系统的总体订阅列表,记录了订阅的modid/cmdid都有哪些fds已经订阅了,其实一个fd就代表一个客户端。

publish_map是即将发布的表,其实这里面是subscribe_map的一个反表,key是订阅的客户端fd,而value是该客户端需要接收的订阅modid/cmdid数据。

属性

_book_list:目前dns已经全部的订阅信息清单。

_push_list:目前dns即将发布的客户端及订阅信息清单。

方法

void subscribe(uint64_t mod, int fd): 加入modid/cmdid 和订阅的客户端fd到_book_list中。

void unsubscribe(uint64_t mod, int fd):取消一条订阅数据。

void publish(std::vector<uint64_t> &change_mods): 发布订阅数据,其中change_mods是需要发布的那些modid/cmdid组合。

void make_publish_map(listen_fd_set &online_fds, publish_map &need_publish): 根据目前在线的订阅用户,得到需要通信的发布订阅列表。

具体实现如下:

lars_dns/src/subscribe.cpp

#include "subscribe.h"

extern tcp_server *server;

//单例对象
SubscribeList *SubscribeList::_instance = NULL;

//用于保证创建单例的init方法只执行一次的锁
pthread_once_t SubscribeList::_once = PTHREAD_ONCE_INIT;

SubscribeList::SubscribeList()
{
}

//订阅
void SubscribeList::subscribe(uint64_t mod, int fd)
{
    //将mod->fd的关系加入到_book_list中
    pthread_mutex_lock(&_book_list_lock);
    _book_list[mod].insert(fd);
    pthread_mutex_unlock(&_book_list_lock);
}

//取消订阅
void SubscribeList::unsubscribe(uint64_t mod, int fd)
{
    //将mod->fd关系从_book_list中删除
    pthread_mutex_lock(&_book_list_lock);
    if (_book_list.find(mod) != _book_list.end()) {
        _book_list[mod].erase(fd);
        if (_book_list[mod].empty() == true) {
            _book_list.erase(mod);
        }
    }
    pthread_mutex_unlock(&_book_list_lock);
}

void push_change_task(event_loop *loop, void *args)
{
    SubscribeList *subscribe = (SubscribeList*)args;

    //1 获取全部的在线客户端fd
    listen_fd_set online_fds;
    loop->get_listen_fds(online_fds);
    
    //2 从subscribe的_push_list中 找到与online_fds集合匹配,放在一个新的publish_map里
    publish_map need_publish;
    subscribe->make_publish_map(online_fds, need_publish);

    //3 依次从need_publish取出数据 发送给对应客户端链接
    publish_map::iterator it; 
    for (it = need_publish.begin(); it != need_publish.end(); it++) {
        int fd = it->first; //fd

        //遍历 fd对应的 modid/cmdid集合
        hash_set<uint64_t>::iterator st;
        for (st = it->second.begin(); st != it->second.end(); st++) {
            //一个modid/cmdid
            int modid = int((*st) >> 32);
            int cmdid = int(*st);

            //组装pb消息,发送给客户
            lars::GetRouteResponse rsp; 
            rsp.set_modid(modid);
            rsp.set_cmdid(cmdid);

            //通过route查询对应的host ip/port信息 进行组装
            host_set hosts = Route::instance()->get_hosts(modid, cmdid) ;
            for (host_set_it hit = hosts.begin(); hit != hosts.end(); hit++) {
                uint64_t ip_port_pair = *hit;
                lars::HostInfo host_info;
                host_info.set_ip((uint32_t)(ip_port_pair >> 32));
                host_info.set_port((int)ip_port_pair);

                //添加到rsp中
                rsp.add_host()->CopyFrom(host_info);
            }

            //给当前fd 发送一个更新消息
            std::string responseString;
            rsp.SerializeToString(&responseString);

            //通过fd取出链接信息
            net_connection *conn = tcp_server::conns[fd];
            if (conn != NULL) {
                conn->send_message(responseString.c_str(), responseString.size(), lars::ID_GetRouteResponse);
            }
        }
    }
}

//根据在线用户fd得到需要发布的列表
void SubscribeList::make_publish_map(
            listen_fd_set &online_fds, 
            publish_map &need_publish)
{
    publish_map::iterator it;

    pthread_mutex_lock(&_push_list_lock);
    //遍历_push_list 找到 online_fds匹配的数据,放到need_publish中
    for (it = _push_list.begin(); it != _push_list.end(); it++)  {
        //it->first 是 fd
        //it->second 是 modid/cmdid
        if (online_fds.find(it->first) != online_fds.end()) {
            //匹配到
            //当前的键值对移动到need_publish中
            need_publish[it->first] = _push_list[it->first];
            //当该组数据从_push_list中删除掉
            _push_list.erase(it);
        }
    }

    pthread_mutex_unlock(&_push_list_lock);
}



//发布
void SubscribeList::publish(std::vector<uint64_t> &change_mods)
{
    //1 将change_mods已经修改的mod->fd 
    //  放到 发布清单_push_list中 
    pthread_mutex_lock(&_book_list_lock);
    pthread_mutex_lock(&_push_list_lock);

    std::vector<uint64_t>::iterator it;

    for (it = change_mods.begin(); it != change_mods.end(); it++) {
        uint64_t mod = *it;
        if (_book_list.find(mod) != _book_list.end()) {
            //将mod下面的fd set集合拷迁移到 _push_list中
            hash_set<int>::iterator fds_it;
            for (fds_it = _book_list[mod].begin(); fds_it != _book_list[mod].end(); fds_it++) {
                int fd = *fds_it;
                _push_list[fd].insert(mod);
            }
        }
    }

    pthread_mutex_unlock(&_push_list_lock);
    pthread_mutex_unlock(&_book_list_lock);

    //2 通知各个线程去执行推送任务
    server->thread_poll()->send_task(push_change_task, this);
}

​ 这里需要注意的是publish()里的server变量是全局变量,全局唯一的server句柄。

6.2 开启订阅

​ 那么订阅功能实现了,该如何是调用触发订阅功能能,我们可以在一个客户端建立连接成功之后来调用.

lars_dns/src/dns_service.cpp

#include <ext/hash_set>
#include "lars_reactor.h"
#include "subscribe.h"
#include "dns_route.h"
#include "lars.pb.h"

tcp_server *server;

using __gnu_cxx::hash_set;

typedef hash_set<uint64_t> client_sub_mod_list;

// ...

//订阅route 的modid/cmdid
void create_subscribe(net_connection * conn, void *args)
{
    conn->param = new client_sub_mod_list;
}

//退订route 的modid/cmdid
void clear_subscribe(net_connection * conn, void *args)
{
    client_sub_mod_list::iterator it;
    client_sub_mod_list *sub_list = (client_sub_mod_list*)conn->param;

    for (it = sub_list->begin(); it  != sub_list->end(); it++) {
        uint64_t mod = *it;
        SubscribeList::instance()->unsubscribe(mod, conn->get_fd());
    }

    delete sub_list;

    conn->param = NULL;
}

int main(int argc, char **argv)
{
    event_loop loop;

    //加载配置文件
    config_file::setPath("conf/lars_dns.conf");
    std::string ip = config_file::instance()->GetString("reactor", "ip", "0.0.0.0");
    short port = config_file::instance()->GetNumber("reactor", "port", 7778);


    //创建tcp服务器
    server = new tcp_server(&loop, ip.c_str(), port);

    //==========注册链接创建/销毁Hook函数============
    server->set_conn_start(create_subscribe);
    server->set_conn_close(clear_subscribe);
    //============================================

    //注册路由业务
    server->add_msg_router(lars::ID_GetRouteRequest, get_route);

    //开始事件监听    
    printf("lars dns service ....\n");
    loop.event_process();

    return 0;
}

​ 这里注册了两个链接Hook。create_subscribe()clear_subscribe()

client_sub_mod_list为当前客户端链接所订阅的route信息列表。主要存放当前客户订阅的modid/cmdid的集合。因为不同的客户端订阅的信息不同,所以要将该列表与每个conn进行绑定。


关于作者:

作者:Aceld(刘丹冰)

mail: danbing.at@gmail.com
github: https://github.com/aceld
原创书籍gitbook: http://legacy.gitbook.com/@aceld

原创声明:未经作者允许请勿转载, 如果转载请注明出处

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 205,132评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 87,802评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,566评论 0 338
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,858评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,867评论 5 368
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,695评论 1 282
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,064评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,705评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 42,915评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,677评论 2 323
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,796评论 1 333
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,432评论 4 322
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,041评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,992评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,223评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,185评论 2 352
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,535评论 2 343

推荐阅读更多精彩内容