从说说Egg.js中的多进程增强模型(一)中我们了解到了多进程模型之间的通信方式和各个类之间的关系,可以用下面👇这张图进行回顾:
所有对于
APIClient
的方法调用,最终都会将调用执行到follower.js / leader.js
这两个实例中,在follower.js
中会通过tcp将方法调用发送给leader.js
,在leader.js
中无论是APIClient
或是tcp请求过来的方法调用都会调用内部的_realClient
。
第一篇的整个主从模式的介绍还是非常笼统的,整体上对于多进程模型以及类关系图有一个全貌的印象,这样我们在使用
Clueter-client
类库时就不会只是调用一个黑盒了。但是类库真正的细节,制定的规则和约束还是需要具体分析的,这也是本篇的重点:
思路整理
跨进程调用协议
worker进程调用agent进程内实例的方法,双方肯定需要进行协议的约定,这样当接受请求时才能执行正确的调用逻辑并返回相应的数据。
API调用
对于一个业务客户端(如:zookeeper客户端 -> zkClient)
的调用,每一个Worker进程都希望自己是独占的,如原生API一般的使用多进程模型(如:zkClient.getData(path)
调用,多进程中依然可以调用同样的api)。因此多进程模型需要考虑的一点就是不能改变这一使用习惯。
API代理
worker中所有关于原生client的调用都是需要经过底层的协议转换之后请求agent中的leader进行执行,不可能每一个方法都去编写这样的逻辑,需要将所有的方法调用最终全部代理到一个方法或者若干个确定的方法上,这样只要在底层一次性实现相关的协议转换和tcp请求处理的逻辑,上层业务完全透明。
源码分析
经过上面的思路整理,我们就可以在代码中找到相应的实现,以及也会清晰的明白为什么会需要有这些类,以及每个类存在的职责。代码分析我们还是从上层使用到底层实现这一的顺序来分析比较顺畅。
api_client.js --> APIClientBase
APIClientBase
类是库给业务提供的一个基类,业务层的每一个worker所持有的APIClient
都是继承这个基类,这个类就是用来解决上面👆所提到的“API调用”的问题,业务层在这个类中需要对原生client的API进行定义,不用真正实现,只需要像下面这个直接调用_client
即可:
APIClient extends APIClientBase {
getData(path) {
this._client.getData(path);
}
}
通过上一篇文章的分析我们知道这里的_client
属性实际是client.js 内定义的 ClusterClient类
。
client.js --> ClusterClient
由上面的代码我们知道,getData
这个方法会直接调用 ClusterClient
的getData
方法,这样问题就来了,ClusterClient
作为一个底层的API代理类不可能实现所有的业务需要的API。进到ClusterClient
内部会发现有下面几个方法:
/**
* do subscribe
*
* @param {Object} reg - subscription info
* @param {Function} listener - callback function
* @return {void}
*/
[subscribe](reg, listener) { ... }
/**
* do unSubscribe
*
* @param {Object} reg - subscription info
* @param {Function} listener - callback function
* @return {void}
*/
[unSubscribe](reg, listener) { ... }
/**
* do publish
*
* @param {Object} reg - publish info
* @return {void}
*/
[publish](reg) { ... }
/**
* invoke a method asynchronously
*
* @param {String} method - the method name
* @param {Array} args - the arguments list
* @param {Function} callback - callback function
* @return {void}
*/
[invoke](method, args, callback) { ... }
async [close]() { ... }
这几个方法的内部都是调用了innerClient
,这之后就是本篇开始梳理的流程。那么既然CluserClient
只有这个几个方法,怎么可以成功调用getData(path)
? 也许我们观察到了[invoke](method, args, callback) { ... }
这个方法,这个方法的实现很像是一个动态代理,是不是所有的方法都收敛到这个方法上了呢?如果真的是这样的话,那么必须要对其进行hook或者其它heck的方式,一般做这种事情都是在实例创建的时候干的,我们就去index.js --> ClientWrapper
的create方法(删减):
const autoGenerateMethods = [
'subscribe',
'unSubscribe',
'publish',
'close',
];
...
create(...args) {
...
// auto generate description
if (this._options.autoGenerate) {
this._generateDescriptors();
}
for (const name of descriptors.keys()) {
let value;
const descriptor = descriptors.get(name);
switch (descriptor.type) {
case 'override':
value = descriptor.value;
break;
case 'delegate':
if (/^invoke|invokeOneway$/.test(descriptor.to)) {
if (is.generatorFunction(proto[name])) {
value = function* (...args) {
return yield cb => { client[symbols.invoke](name, args, cb); };
};
} else if (is.function(proto[name])) {
if (descriptor.to === 'invoke') {
value = (...args) => {
let cb;
if (is.function(args[args.length - 1])) {
cb = args.pop();
}
// whether callback or promise
if (cb) {
client[symbols.invoke](name, args, cb);
} else {
return new Promise((resolve, reject) => {
client[symbols.invoke](name, args, function(err) {
if (err) {
reject(err);
} else {
resolve.apply(null, Array.from(arguments).slice(1));
}
});
});
}
};
} else {
value = (...args) => {
client[symbols.invoke](name, args);
};
}
} else {
throw new Error(`[ClusterClient] api: ${name} not implement in client`);
}
} else {
value = client[Symbol.for(`ClusterClient#${descriptor.to}`)];
}
break;
default:
break;
}
Object.defineProperty(client, name, {
value,
writable: true,
enumerable: true,
configurable: true,
});
}
return client;
}
_generateDescriptors() {
const clientClass = this._clientClass;
const proto = clientClass.prototype;
const needGenerateMethods = new Set(autoGenerateMethods);
for (const entry of this._descriptors.entries()) {
const key = entry[0];
const value = entry[1];
if (needGenerateMethods.has(key) ||
(value.type === 'delegate' && needGenerateMethods.has(value.to))) {
needGenerateMethods.delete(key);
}
}
for (const method of needGenerateMethods.values()) {
if (is.function(proto[method])) {
this.delegate(method, method);
}
}
const keys = Reflect.ownKeys(proto)
.filter(key => typeof key !== 'symbol' &&
!key.startsWith('_') &&
!this._descriptors.has(key));
for (const key of keys) {
const descriptor = Reflect.getOwnPropertyDescriptor(proto, key);
if (descriptor.value &&
(is.generatorFunction(descriptor.value) || is.asyncFunction(descriptor.value))) {
this.delegate(key);
}
}
}
}
这里一下子就明朗了:
- create逻辑里面会根据
descriptors
这个Map内存储的内容做方法自动创建. -
descriptors
内存放的内容来源是APIClient --> delegates
方法返回内容、autoGenerateMethods数组固定值以及RegistryClient
内的异步方法。 - 经过
_generateDescriptor
之后所有的方法最终都会被归类(subscribe/unSubscribe/publish/close/invoke/invokeOneway
)正好对应到前面ClusterClient
类的5个方法(invoke|invokeOneway 都对应 [invoke])。 - 归类好的
descriptors
在create内所有invoke|invokeOneway
会被全部指向ClusterClient --> [invoke]
。
上面的那个例子补充完整如下:
APIClient extends APIClientBase {
get delegates() {
return {
'getData':'invoke'
}
}
getData(path, callback) {
this._client.getData(path, callback);
}
}
tcp 调用相关
协议的定义在/protocol
目录内,底层tcp的调用是基于另一个库 tcp-base
。调用的细节在源码follower.js / leader.js
中都可以清晰看到。
补充
如果是完全自己编写一个插件业务(如:etcd的client),那么RegistryClient
可以直接作为原生API的实现类,然后在APIClient
的delegates方法然后一个api的mapping并定义相应的mock api。但是往往在真实开发过程中,业务的client的已经有实现好的Node包,而Egg插件只需要封装它就行,那么这样就需要将RegistryClient
作为业务client的代理类,再次进行调用静态或动态转发,具体可以看一下我写的Cat的egg插件egg-cat-client
。
总结: 经过整个调用链路的梳理和底层一些规则的说明,我们已经对这样一个多进程的实现了然于胸了,这样在真实的开发使用中才可以写出更加符合自己需要的代码。