pomelo-rpc原理解析之server

原文pomelo-rpc原理解析之server

pomelo-rpc是pomelo项目底层的rpc框架,提供了一个多服务器进程间进行rpc调用的基础设施。 pomelo-rpc分为客户端和服务器端两个部分。 客户端部分提供了rpc代理生成,消息路由和网络通讯等功能,并支持动态添加代理和远程服务器配置。 服务器端提供了远程服务暴露,请求派发,网络通讯等功能。

本文主要分析pomelo-rpc中server部分的实现原理以及运行逻辑。

server初始化

Server.create(opts)

创建一个rpc server实例。根据配置信息加载远程服务代码,并生成底层acceptor。
首先看create部分源码:

module.exports.create = function(opts) {
  if(!opts || !opts.port || opts.port < 0 || !opts.paths) {
    throw new Error('opts.port or opts.paths invalid.');
  }
  // 根据paths加载远程服务
  var services = loadRemoteServices(opts.paths, opts.context);
  opts.services = services;
  var gateway = Gateway.create(opts);
  return gateway;
};

首先loadRemoteServices()方法根据opts参数中的paths加载远程服务。
其中pomelo paths的格式类似,pomelo根据约定封装,详见
https://github.com/NetEase/pomelo/blob/master/lib/components/remote.js

[
  {
    "namespace": "user",
    "serverType": "test",
    "path": "/data/pomelo/app/servers/test/remote/"
  },
  {
    "namespace": "sys",
    "serverType": "test",
    "path": "/data/pomelo/pomelo/lib/common/remote/backend/"
  }
]
var loadRemoteServices = function(paths, context) {
  var res = {}, item, m;
  for(var i=0, l=paths.length; i<l; i++) {
    item = paths[i];
    // Loader是pomelo-loader,用来加载pomelo handler和remote服务
    // 关于Loader的细节可以参考https://github.com/NetEase/pomelo-loader/
    m = Loader.load(item.path, context);

    if(m) {
      createNamespace(item.namespace, res);
      for(var s in m) {
        res[item.namespace][s] = m[s];
      }
    }
  }

  return res;
};

var createNamespace = function(namespace, proxies) {
  proxies[namespace] = proxies[namespace] || {};
};

loadRemoteServices()最终得到的services对象类似如下结构的数据:

{
  "user": {
    "testRemote": require("/path/to/testRemote.js"),
    "test2Remote": require("/path/to/test2Remote.js"),
    ...
  },
  "sys": {
    "msgRemote": require("/data/pomelo/pomelo/lib/common/remote/backend/msgRemote.js")
  }
}

Gateway.create(opts)

创建gateway对象,并初始化dispatcher和acceptor。
gateway的构造方法:

var Gateway = function(opts) {
  EventEmitter.call(this);
  this.opts = opts || {};
  this.port = opts.port || 3050;
  this.started = false;
  this.stoped = false;
  this.services = opts.services;
  if(!!this.opts.reloadRemotes) {
    // 如果remote配置reloadRemotes=true的话,gateway会通过`fs.watch()`来检测remote文件的变化,
    // 然后通过pomelo-loader重新require对应的remote文件来实现remote的热更新。
    watchServices(this, dispatcher);
  }
  var self = this;

  this.acceptors = {};
  // __defineGetter__的用法可以参考
  // https://developer.mozilla.org/zh-CN/docs/Web/JavaScript/Reference/Global_Objects/Object/__defineGetter__
  this.acceptors.__defineGetter__('tcp', utils.load.bind(null, '../rpc-server/acceptors/tcp-acceptor'));
  this.acceptors.__defineGetter__('ws', utils.load.bind(null,'../rpc-server/acceptors/ws-acceptor'));

  if(!!opts.acceptorName && opts.acceptorName === 'ws') {
    this.acceptorFactory = this.acceptors.ws;
  } else {
    // 默认是使用tcp协议的acceptors
    this.acceptorFactory = this.acceptors.tcp;
  }

  if(!!opts.acceptorFactory) {
    this.acceptorFactory = opts.acceptorFactory;
  }
  // Dispatcher初始化,没什么好看的
  var dispatcher = new Dispatcher(this.services);
  // acceptorFactory.create初始化acceptor,也没什么好看的
  this.acceptor = this.acceptorFactory.create(opts, function(tracer, msg, cb) {
    dispatcher.route(tracer, msg, cb);
  });
};

Dispatcher.route(tracer, msg, cb)

提供路由服务,路由消息到对应的service。

/**
 * route the msg to appropriate service object
 *
 * @param msg msg package {service:serviceString, method:methodString, args:[]}
 * @param services services object collection, such as {service1: serviceObj1, service2: serviceObj2}
 * @param cb(...) callback function that should be invoked as soon as the rpc finished
 */
pro.route = function(tracer, msg, cb) {
  tracer.info('server', __filename, 'route', 'route messsage to appropriate service object');
  // this.services 就是loadRemoteServices()加载的本地remote service
  var namespace = this.services[msg.namespace];
  if(!namespace) {
    tracer.error('server', __filename, 'route', 'no such namespace:' + msg.namespace);
    utils.invokeCallback(cb, new Error('no such namespace:' + msg.namespace));
    return;
  }

  var service = namespace[msg.service];
  if(!service) {
    tracer.error('server', __filename, 'route', 'no such service:' + msg.service);
    utils.invokeCallback(cb, new Error('no such service:' + msg.service));
    return;
  }

  var method = service[msg.method];
  if(!method) {
    tracer.error('server', __filename, 'route', 'no such method:' + msg.method);
    utils.invokeCallback(cb, new Error('no such method:' + msg.method));
    return;
  }

  var args = msg.args.slice(0);
  args.push(cb);
  // 调用remote service方法
  method.apply(service, args);
};

至此,pomelo-rpc的server初始化工作就完成了,接下来就是启动server。

server启动

Gateway.start()

pro.start = function() {
  if(this.started) {
    throw new Error('gateway already start.');
  }
  this.started = true;

  var self = this;
  this.acceptor.on('error', self.emit.bind(self, 'error'));
  this.acceptor.on('closed', self.emit.bind(self, 'closed'));
  // 启动acceptor并监听port端口
  this.acceptor.listen(this.port);
};

Acceptor.listen(port)

启动acceptor并监听port端口,这里以tcp acceptor为例。

pro.listen = function(port) {
  //check status
  if(!!this.inited) {
    utils.invokeCallback(this.cb, new Error('already inited.'));
    return;
  }
  this.inited = true;

  var self = this;

  this.server = net.createServer();
  this.server.listen(port);

  this.server.on('error', function(err) {
    logger.error('rpc server is error: %j', err.stack);
    self.emit('error', err, this);
  });
  // 处理链接请求
  this.server.on('connection', function(socket) {
    // 设置socket自增id
    socket.id = self.socketId++;
    // 保存socket句柄到本地sockets
    self.sockets[socket.id] = socket;
    // 设置socket的消息编解码处理器,处理各种类型的消息(ping,pong,msg等)
    // pkgSize默认-1,不限制消息长度
    socket.composer = new Composer({maxLength: self.pkgSize});
    // 心跳超时检测timer
    self.timer[socket.id] = null;
    // 启动心跳检测,如果heartbeat timeout间隔内没有心跳包过来,就断开连接。
    // 下面socket.composer.on('data' ...中的self.heartbeat才是真正连接后的心跳
    // 相当于这里只是处理初始连接时的心跳
    self.heartbeat(socket.id);

    socket.on('data', function(data) {
      // 调用composer解析数据流
      // feed读完数据后会emit data事件
      socket.composer.feed(data);
    });
    // 接收feed emit的data事件
    socket.composer.on('data', function(data) {
      self.heartbeat(socket.id);
      if(data[0] === PING) {
        //incoming::ping,response with PONG
        socket.write(socket.composer.compose(PONG));
      } else {
        try {
          var pkg = JSON.parse(data.toString('utf-8', 1));
          var id  = null;
          // 处理消息
          if(pkg instanceof Array) {
            processMsgs(socket, self, pkg, id);
          } else {
            processMsg(socket, self, pkg, id);
          }
        } catch(err) { //json parse exception
          if(err) {
            // 重置编解器状态
            socket.composer.reset();
            logger.error(err);
          }
        }
      }
    });

    socket.on('error', function(err) {
      logger.error('[pomelo-rpc] tcp socket error: %j', err);
    });

    socket.on('close', function() {
      logger.error('[pomelo-rpc] tcp socket close: %s', socket.id);
      delete self.sockets[socket.id];
      delete self.msgQueues[socket.id];
      if(self.timer[socket.id]){
        clearTimeout(self.timer[socket.id]);
      }
      delete self.timer[socket.id];
    });
  });
  // 定时flush缓存的消息数据
  if(this.bufferMsg) {
    this._interval = setInterval(function() {
      flush(self);
    }, this.interval);
  }
};

processMsg()

处理rpc消息。实际就是调用dispatcher.route()来根据namespace和service来调用对应的remote method。

var processMsg = function(socket, acceptor, pkg, id) {
  var tracer = new Tracer(acceptor.rpcLogger, acceptor.rpcDebugLog, pkg.remote, pkg.source, pkg.msg, pkg.traceId, pkg.seqId);
  tracer.info('server', __filename, 'processMsg', 'tcp-acceptor receive message and try to process message');
  // 实际就是调用dispatcher.route()
  acceptor.cb.call(null, tracer, pkg.msg, function() {
    var args = Array.prototype.slice.call(arguments, 0);
    for(var i=0, l=args.length; i<l; i++) {
      if(args[i] instanceof Error) {
        args[i] = cloneError(args[i]);
      }
    }
    var resp;
    if(tracer.isEnabled) {
      resp = {traceId: tracer.id, seqId: tracer.seq, source: tracer.source, id: pkg.id, resp: Array.prototype.slice.call(args, 0)};
    }
    else {
      resp = {id: pkg.id, resp: Array.prototype.slice.call(args, 0)};
    }
    if(acceptor.bufferMsg) {
      // 如果开启缓冲消息则将结果缓存到队列,由_interval定时flush,用以减少网络io次数
      enqueue(socket, acceptor, resp);
    } else {
      // 发送封装remote方法的返回结果
      socket.write(socket.composer.compose(RES_TYPE, JSON.stringify(resp), id));
    }
  });
};

总结

通过对pomelo-rpc server部分代码的分析,可以很清晰了解到server端主要作用就是暴露远程服务(remote目录下的.js文件)、根据消息的namespace和service信息派发到对应的remote服务处理、基于tcp/ws来提供底层的网络通讯。

在日常开发中,新手很容易遇到rpc调用超时的情况,一般来看都是因为某些remote方法没有正确回调或者根本漏写了回调。

相关文章:pomelo-rpc原理解析之client

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 205,033评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 87,725评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,473评论 0 338
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,846评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,848评论 5 368
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,691评论 1 282
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,053评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,700评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 42,856评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,676评论 2 323
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,787评论 1 333
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,430评论 4 321
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,034评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,990评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,218评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,174评论 2 352
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,526评论 2 343

推荐阅读更多精彩内容