背景
Egg.js原理简介
稍微熟悉Egg.js原理的应该都知道master / agent / worker
这三个进程的职责以及agent.js / app.js
这两个js文件,agent进程对应于agent.js,worker进程对应的是app.js,而worker进程是有多个的以集群方式进行工作的,并且最终部署的应用也是集群的方式部署在不同的机器上的,因此实际的worker是一个n x
m的数量。
服务长链
服务端应用最典型的就是数据库连接(如: MySQL),尤其是微服务化后出现了各种各样的中间件(如:Eureka/Zookeeper/Disconf), 这样每一个应用都需要维护各种各样的长链接。
Egg的支持
对于长链接的创建方式,Egg提供了两种支持分别是:app.addSingleton(name, creator)
和 多进程增强模型
。两种方式分别在什么时候使用? addSingleton
的方式可以直接参考Egg提供的例子MySQL,它可以保证一个application
的对象(一个worker)只会有一个mysql实例,但是多个worker还是会有多个,对于MySQL这种在server端有链接池的是没有问题的,而且这样实现也简单易用,但是如果没有链接池的中间件来讲这样是一种极大的资源浪费 (n x
m), 因此就会用到了多进程增强模型
,下面具体说说。
多进程增强模型
Egg中的多进程增强模型实际上完全使用的就是Cluser-Client库(也是阿里开源),在GitHub上面有它工作原理和使用方式的介绍,只是不知道大家会不会和我一样看了一遍之后依然不知所云和无从下手的感觉,因此才写了这篇博客将源码阅读的理解记录下来。
Egg文档引用
首先创建RegistryClient
代码如下:
const Base = require('sdk-base');
class RegistryClient extends Base {
...
}
然后创建一个APIClient
类继承框架提供的快捷类APIClientBase
, 代码如下:
const APIClientBase = require('cluster-client').APIClientBase;
const RegistryClient = require('./registry_client');
class APIClient extends APIClientBase {
// 返回原始的客户端类
get DataClient() {
return RegistryClient;
}
subscribe(reg, listener) {
this._client.subscribe(reg, listener);
}
publish(reg) {
this._client.publish(reg);
}
}
这里需要注意的是:
-
DataClient
方法需要返回前面定义好的RegistryClient
类。 -
_client
属性是继承自父类, 直接就可以使用。
在Egg中嵌入上面的代码:
// app.js || agent.js
const APIClient = require('./APIClient'); // 上面那个模块
module.exports = app => {
const config = app.config.apiClient;
app.apiClient = new APIClient(Object.assign({}, config, { cluster: app.cluster });
app.beforeStart(async () => {
await app.apiClient.ready();
});
};
根据上面的代码进行下面的梳理:
- 每一个
agent / application
都会有一个APIClient
实例。 - 所有的
APIClient
实例都会知晓RegistryClient
类名。 -
APIClient
里面的方法会实际的调用一个_client
属性。
理解:
这就是一个静态代理模式,所有想要对RegistryClient
类的调用都要经过APIClient
进行一次代理,所以只要保证RegistryClient
的实例只有一个,其它所有的APIClient
都可以通过某种方式将操作(请求)传达给RegistryClient
就可以实现多进程单实例模式了。
注:上面使用方式是cluster-client的最佳实践,虽然抛开APIClient这个类也可以,这里直接跳过了是因为这样拆解更灵活并易于扩展,实际这里是需要进行两层的代理, RegistryClient会代理真正的业务client的调用(可以动态代理实现)并维护业务client的链接和事件接收,APIClient是用来mock所有业务client的api,让业务的使用更贴近真正业务client的调用。如(示意):
APIClient.getData() --> RegistryClient.<DynamicDispatcher> --> zkClient.getData()
源码分析
有了上面的例子和思路,带着下面两个问题进行源码的分析:
- 如何保证
RegistryClient
的实例只有一个。 -
APIClient
类是如何和真实的client类进行交互的。
主从模式(Leader / Follower)
将多进程分为主(Leader)进程和从(Follower)进程,Leader只有一个并负责维护实际的第三方应用的链接及事件处理,Follower用于订阅Leader的一些事件及主动推送数据给Leader,也可以主动调用Leader执行一些操作,它们之间可以通过进程间通信的方式进行信息交换。在Egg中规定了agent进程是Leader,而其他worker进程作为Follower,代码如下isLeader: this.type === 'agent'
:
// node_modules/egg/egg.js
class EggApplication extends EggCore {
constructor(options) {
...
...
/**
* Wrap the Client with Leader/Follower Pattern
*
* @description almost the same as Agent.cluster API, the only different is that this method create Follower.
*
* @see https://github.com/node-modules/cluster-client
* @param {Function} clientClass - client class function
* @param {Object} [options]
* - {Boolean} [autoGenerate] - whether generate delegate rule automatically, default is true
* - {Function} [formatKey] - a method to tranform the subscription info into a string,default is JSON.stringify
* - {Object} [transcode|JSON.stringify/parse]
* - {Function} encode - custom serialize method
* - {Function} decode - custom deserialize method
* - {Boolean} [isBroadcast] - whether broadcast subscrption result to all followers or just one, default is true
* - {Number} [responseTimeout] - response timeout, default is 3 seconds
* - {Number} [maxWaitTime|30000] - leader startup max time, default is 30 seconds
* @return {ClientWrapper} wrapper
*/
this.cluster = (clientClass, options) => {
options = Object.assign({}, this.config.clusterClient, options, {
// cluster need a port that can't conflict on the environment
port: this.options.clusterPort,
// agent worker is leader, app workers are follower
isLeader: this.type === 'agent',
logger: this.coreLogger,
});
const client = cluster(clientClass, options);
this._patchClusterClient(client);
return client;
};
...
...
}
...
...
}
上面👆代码是在agent 和 application对象上挂了一个名为cluser
的创建方法,方法返回一个ClientWrapper
实例。
Cluster-Client代码结构
|--cluster-client
|--lib
|--protocol
--byte_buffer.js
--packet.js
--request.js
--response.js
--api_client.js
--client.js
--connections.js
--const.js
--default_logger.js
--default_transcode.js
--follower.js
--index.js
--leader.js
--server.js
--symbol.js
--utils.js
这里我们先重点关注api_client.js / index.js / client.js
这三个源码。回想到上面Egg文档给我提供的创建apiClient
的例代码👇 :
new APIClient(Object.assign({}, config, { cluster: app.cluster });
我们就来到了cluster-client/lib/api_client.js
, 这里将app.cluster
方法传入,参考源码:
1 constructor(options) {
2 options = options || {};
3 super(options);
4 const wrapper = (options.cluster || cluster)(
5 this.DataClient, this.clusterOptions
6 );
7 for (const from in this.delegates) {
8 const to = this.delegates[from];
9 wrapper.delegate(from, to);
10 }
11 this._client = wrapper.create(options);
12 utils.delegateEvents(this._client, this);
13 if (!options.initMethod) {
14 this._client.ready(err => {
15 this.ready(err ? err : true);
16 });
17 }
18 }
第4行代码直接就调用了cluster
方法创建了一个ClientWrapper
实例,第11行调用了wrapper的create方法,这样我们就来到了cluster-client/lib/index.js
:
// 去掉不分析的代码
...
create (...args) {
...
function createRealClient() {
return Reflect.construct(clientClass, args);
}
const client = new ClusterClient(Object.assign({
createRealClient,
descriptors: this._descriptors,
}, this._options));
...
}
...
create
方法主要是做了一些方法delegate生成和方法校验(下回分析),这里调用了包装了一个反射创建真实RegistryClient
实例的方法并传入ClusterClient
生成了一个实例最终返回给调用者其实就是APIClient
中的_client
,那么这样就来到了重点的cluster-client/lib/client.js
, 方便查看这里直接就贴出[init]
部分代码:
async [init]() {
const name = this.options.name;
const port = this.options.port;
let server;
if (this.options.isLeader === true) {
server = await ClusterServer.create(name, port);
if (!server) {
throw new Error(`create "${name}" leader failed, the port:${port} is occupied by other`);
}
} else if (this.options.isLeader === false) {
// wait for leader active
await ClusterServer.waitFor(port, this.options.maxWaitTime);
} else {
debug('[ClusterClient:%s] init cluster client, try to seize the leader on port:%d', name, port);
server = await ClusterServer.create(name, port);
}
if (server) {
this[innerClient] = new Leader(Object.assign({ server }, this.options));
debug('[ClusterClient:%s] has seized port %d, and serves as leader client.', name, port);
} else {
this[innerClient] = new Follower(this.options);
debug('[ClusterClient:%s] gives up seizing port %d, and serves as follower client.', name, port);
}
...
}
代码非常清晰,如果是leader就会创建一个server并监听<port>, 如果是follower就链接server的<port>端口(可以查看server.js代码)。然后分别new了Leader和Follower两个实例并赋值给[innerClient]
。当我们再查看Leader.js
的代码时,发现在构造函数里有 this._realClient = this.options.createRealClient();
, 原来真正的client是在这个时间创建的,而查看Follower.js
的代码时发现都是发送的tcp请求。这样上面的两个问题我们就都有了答案。
- agent进程起来后加载agent.js的时候设置了
cluster
方法,在beforeStart
时通过new APIClient
初始化_client
属性的同时启动了一个tcp server并在new Leader
对象时初始化正在的client。Egg的agent进程只有一个因此真正的client实例也只有一个。 - 当调用
APIClient
的方法时就会通过_client
属性调用到ClusterClient
,然后再调用它内部[innerClient]
, 而[innerClient]
分别是Leader
和Follower
的实例,所以如果是leader就直接调用realClient否则就发送tcp请求。
至此cluster-client的多进程增强模式的主从原理就分析完成了,在实际的实现过程中具体的调用还是有一些规则和约束,如:
delegates
的设置以及subscribe / publish / invoke / invokeOneway
分别是如何使用的还需要进一步了解。