原文地址:https://davidwalsh.name/concurrent-generators
作者:Kyle Simpson
发布时间:2014/4/12
如果已经读过本系列的前三部分,那么此时你对 ES6 生成器应该是信心满满的。希望你喜欢这种探索它们还能做什么的挑战。
我们最后要讨论的主题其实是个前沿问题,你可能会觉得有点虐脑(老实说,我现在也还在被虐中)。深入并思考这些问题需要花费时间,当然,你还要再多读一些关于这个主题的文章。
不过你现在的投资从长远来说会是非常有价值的,我非常确信未来 JS 的复杂异步编程能力,会从这里得到提升。
正统 CSP(通信顺序进程,Communicating Sequential Processes)
首先,我是受到了 David Nolen 非凡的工作的鼓舞,才投入到这一主题的。认真讲,他写的有关这一主题的文章都值得阅读。以下是一些他的文章,可以用来入门:
- "Communicating Sequential Processes"
- "ES6 Generators Deliver Go Style Concurrency"
- "Extracting Processes"
OK,接下来是我对这一主题的理解。在使用 JS 前,我并没有 Clojure 语言的背景,或者 Go、ClojureScript 语言的经验。很快我就在这些文章中迷失了,我必须做大量的试验和学习,才能从中收集一点知识。
在这个过程中,我认为我得到了一些有着相同思想和目标的东西,但却是以一种并不那么正统的思考方式得出的。
我尝试做的是建立比 Go 语言风格的 CSP(以及 ClojureScript core.async)更简单的 API,同时最大程度地保留(希望如此!)各种潜在的能力。完全有可能,比我更聪明的人很快发现我的探索所错过的东西。如果是这样的话,希望我的探索能够不断完善和发展,我也会和读者们持续分享我的新发现!
破坏 CSP 理论(一点点)
CSP 到底是什么呢?“通信”是什么意思?“顺序”?“进程”又是什么?
首先,CSP 来源于 Tony Hoare 的书《通信顺序进程》。这是非常深奥的计算机科学理论,但如果你喜欢这些学术方面的东西,那这本书是最好的开始。我不想以深奥、晦涩的计算机科学的方式来讨论这个话题,我采用的是非常不正式的方式。
我们先从“顺序”开始。这应该是你已经熟悉的部分了。这其实是换了个方式讨论 ES6 生成器的单线程行为以及类似同步模式的代码。
别忘了生成器的语法是这样的:
function *main() {
var x = yield 1;
var y = yield x;
var z = yield (y * 2);
}
这些语句都是同步顺序(按照出现的前后顺序)执行的,一次执行一条。yield
关键字标记了那些会出现打断式的暂停(只是在生成器代码内部打断,而非外部的程序)的位置,而不会改变处理*main()
的外部代码。很简单,不是吗?
接下来,我们来看“进程”。这些是什么呢?
本质上来说,生成器的各种行为就像是虚拟的“进程”。如果 JavaScript 允许的话,它就像是程序中并行于其他部分运行的一部分代码。
实际上,这有点乱说了一点。如果生成器可以访问共享内存(这是指,它可以访问其内部的局部变量以为的“自由变量”),那么它就并没有那么独立。但是让我们假设有一个没有访问外部变量的生成器(这样 FP 理论会称之为“连接器(combinator)”),这样理论上它可以运行在自己的进程中,或者说作为单独的进程运行。
不过我们说的是“进程(processes)”——复数——因为最重要的是有两个或多个进程同时存在。也就是说,两个或多个生成器匹配在一起,共同完成某个更大的任务。
为什么要把生成器拆分开呢?最重要的原因:功能或关注点的分离。对于任务 XYZ,如果能将其拆分为子任务 X、Y、Z,然后在单独的生成器中进行实现,这会使得代码更容易理解和维护。
也是基于同样的原因,才会将类似 function XYZ()
的代码拆分为 X()
、Y()
、Z()
函数,然后 X()
调用 Y()
,Y()
调用 Z()
,等等。我们将函数进行拆分使得代码更好地分离,从而更容易维护。
我们可以用多个生成器来实现相同的事情。
最后,“通信”。这是什么呢?它延续自上面 —— 合作 —— 如果生成器需要一起工作,它们需要一个通信通道(不仅仅是访问共享的词法作用域,而是一个真实共享的排外的通信通道)。
通信通道里有什么呢?任何需要传递的东西(数值,字符串,等等)。实际上,并不需要真的在通道发送消息。“通信”可以像协作一样简单 —— 例如将控制权从一个转移到另一个。
为什么要转移控制权?主要是由于 JS 是单线程的,某一时刻只能有一个生成器在执行。其他的处于暂停状态,这意味着它们在执行任务的过程中,但因为需要等待在必要的时候继续执行而挂起。
任意的独立的“线程”都可以神奇地协作并通信好像并不现实。这种松耦合的目标是好的,但是不切实际。
相反,任何成功的 CSP 的实现,都是对于已有的问题领域的逻辑集合进行内部分解,并且每一部分都被设计为能够与其他部分共同工作。
或许在这方面我完全错了,但我还没有看到有什么有效的方式, 可以使得两个任意的生成器函数能够简单地粘在一起作为 CSP 配对使用。它们都需要被设计为可以与另一个一同工作,遵循通信协议,等等。
JS 中的 CSP
有几种有趣的 CSP 探索应用于 JS 了。
前面提及的 David Nolen,有几个有趣的项目,包括 Om,以及 core.async。Koa 库(用于 node.js)有一个有趣的特性,主要通过其 use(..)
方法。另一个与 core.async/Go CSP 接口一致的库是 js-csp。
建议你将这些项目检出来看看各种在 JS 中应用 CSP 的方式和例子。
asynquence 的 runner(..)
:设计 CSP
既然我一直在尝试将 CSP 模式应用于自己的代码,那么为我的异步流程控制库 asynquence 增加 CSP 能力就是很自然的选择了。
我之前演示过使用 runner(..)
插件来处理生成器的异步运行(见第三部分),所以对我而言以类似 CSP 的方式同时支持处理多个生成器是很容易的。
第一个设计问题是:怎样知道哪个生成器来控制下一个(next)?
让进程有某种 ID,从而可以彼此知道,这有点笨重,不过这样它们就可以直接传递消息和将控制权转移给另一个进程。在经过一些试验后,我选择了简单的循环调度方法。对于三个生成器 A、B、C,A 首先获得控制权,然后当 A 抛出(yield)控制权后由 B 接手,接着由 C 接手 B,再然后是 A,如此往复。
但我们实际转移控制权呢?需要有对应的 API 吗?再一次,经过一些试验后,我选择了更隐蔽的方法,和 Koa 的做法类似(完全是偶然地):每个生成器获得一个共享的“token”—— yield
返回它时表示进行控制转移。
另一个问题是消息通道应该是什么样的。或许是一个正式的通信接口,如 core.async 和 js-csp 那样(put(..)
和 take(..)
)。根据我自己的实验,我更倾向于另一种方式,一个不那么正式的方法(甚至不是 API,而是类似 array
的共享的数据结构)就够用了。
我决定使用数组(称为 messages
),可以任意地根据需要写入和提出数据。可以将数据 push()
到数组,从数组 pop()
出来,给不同的数据分配不同的位置,或者在里面存储更复杂的数据结构,等等。
我觉得对于一些任务来说只需要简单的数据传递,对于另一些则要更复杂些,所以与其让简单的情况变复杂,我选择不将消息通道正式化,而是只有一个 array
(于是没有 API,只剩下 array
本身)。如果你觉得有必要,也很容易给数据传递增加一些规范性(见下面的 状态机 例子)。
最终,我发现这些生成器“进程”仍然可以获得异步生成器的那些好处。换句话说,如果不是抛出控制 token,而是 Promise(或一个 asynquence 序列),runner(..)
的机制会暂停来等待这个值,而 不会转移控制权 —— 相反,它会将数据返回给当前的进程(生成器)使其重新获得控制权。
后面的观点可能(如果我解释地正确的话)是最有争议或最不像其他库的地方。或许真正的 CSP 会不屑于这些方法。不过,我觉得有这些想法是很有用的。
一个简单的 FooBar 示例
理论已经够多了,让我们来看看代码:
// 注意:略去了 `multBy20(..)` 和 `addTo2(..)` 这些异步数学函数
function *foo(token) {
// 从通道的顶部获取数据
var value = token.messages.pop(); // 2
// 将另一个数据放到通道上
// `multBy20(..)` 是一个产生 promise 的函数,
// 在延迟一会之后将一个值乘以 `20`
token.messages.push( yield multBy20( value ) );
// 转义控制权
yield token;
// CSP 运行返回的最后的数据
yield "meaning of life: " + token.messages[0];
}
function *bar(token) {
// 从通道的顶部获取数据
var value = token.messages.pop(); // 40
// 将另一个数据放到通道上
// `addTo2(..)` 是一个产生 promise 的函数,
// 在延迟一会之后将一个值加上 `2`
token.messages.push( yield addTo2( value ) );
// transfer control
yield token;
}
OK,以上是两个生成器“进程”,*foo()
和 *bar()
。可以注意到,两个都是处理 token
对象(当然,你也可以随便怎么称呼它)。token
的 message
属性就是共享的消息通道。它由 CSP 初始化运行时传入的数据填充(见后面)。
yield token
隐含地转移控制到“下一个”生成器(循环顺序)。不过,yield multBy20(value)
和 yield addTo2(value)
都是抛出 promise(从略去的延迟数学函数),这意味着生成器会暂停,直到 promise 完成。当 promise 完成,当前出于控制状态的生成器会继续执行。
无论最后的 yield
值是什么,在 yield "meaning of...
表达式语句中,这都是 CSP 运行的完成消息(见后面)。
现在我们有两个 CSO 进程生成器,怎么运行呢?使用 asynquence:
// 使用初始数据 `2` 启动一个序列
ASQ( 2 )
// 一起运行这两个 CSP 进程
.runner(
foo,
bar
)
// 无论最后得到什么消息都向下一步传递
.val( function(msg){
console.log( msg ); // "meaning of life: 42"
} );
显然,这只是一个测试示例。不过我想这已经很好地展示了相关概念。
现在你可以自己来试试(试着改变下数据!)从而确信这些概念有用,并且你能自己写出代码。
另一个玩具示例
现在我们来看一个经典的 CSP 的例子,不过是以前面介绍的我的方式,而不是以学术上的视角。
乒乓。很有意思的运动是不是!?这是我最喜欢的运动。
我们假设你已经实现了一个乒乓游戏的代码。你有一个循环以运行游戏,并且你有两部分代码(例如,使用 if
或 switch
语句的分支)分别代表两个选手。
你的代码运行良好,你的游戏就像乒乓比赛那样运行!
但是关于 CSP 为什么有效我说过什么呢?关注点或功能的分离。乒乓游戏中的分离的功能是什么呢?这两个选手嘛!
所以,从一个较高的层面上,我们可以将游戏建模为两个“进程”(生成器),分别对应每个选手。当我们进入实现的细节,我们会发现在两个选手间转移控制的“胶水代码”是一个单独的任务,这部分代码可以是第三个生成器,我们可以将其建模为游戏裁判。
我们将会跳过所有的领域特定的问题,例如比分、游戏机制、物理、游戏策略、AI、控制,等等。我们唯一关心的部分是模拟来回的击打(这其实是对 CSP 控制转移的比喻)。
想看看 demo 吗?运行一下吧(注意:使用一个较新版本的 FF 或 Chrome,支持 ES6 从而可以运行生成器)
现在,我们来一段一段看下代码。
首先,asynquence 序列长什么样呢?
ASQ(
["ping","pong"], // 选手名字
{ hits: 0 } // 乒乓球
)
.runner(
referee,
player,
player
)
.val( function(msg){
message( "referee", msg );
} );
我们使用两个初始数据:["ping","pong"]
和 { hits: 0 }
。我们很快会讨论这些。
然后我们建立了 CSP 来运行 3 个进程(协程(coroutine)):一个 *referee()
和两个 *player()
实例。
游戏最终的数据会传入序列中的下一步骤,然后我们会输出来自裁判的数据。
裁判的实现:
function *referee(table){
var alarm = false;
// 裁判在自己的定时器上设置警报(10秒)
setTimeout( function(){ alarm = true; }, 10000 );
// 让游戏保持运行直到警报响起
while (!alarm) {
// 让选手继续
yield table;
}
// 告知选手游戏结束
table.messages[2] = "CLOSED";
// 然后裁判说了什么呢?
yield "Time's up!";
}
我调用控制 token table
来匹配问题域(乒乓游戏)。当选手将球击回的时候“转移(yield) table”是很好的语义,不是吗?
*referee()
中的 while
循环保持转移 table
,只要他的定时器上的警报没有响起。警报响的时候,他会接管游戏,然后通过 "Time's up!"
宣布游戏结束。
现在,我们来看下 *player()
生成器(我们使用了它的两个实例):
function *player(table) {
var name = table.messages[0].shift();
var ball = table.messages[1];
while (table.messages[2] !== "CLOSED") {
// 击球
ball.hits++;
message( name, ball.hits );
// 当球返回另一个选手时产生延迟
yield ASQ.after( 500 );
// 游戏还在继续?
if (table.messages[2] !== "CLOSED") {
// 球现在在另一个选手那边了
yield table;
}
}
message( name, "Game over!" );
}
第一个选手从数据的数组中取出他的名字("ping"
),然后第二个选手获取他的名字("pong"
),所以他们都能正确识别自己。两个选手记录了一个到共享的 ball
对象的引用(包含一个 hits
计数器)。
如果选手们没有从裁判那里听到结束的消息,他们通过增加 hits
计数器来“击打” ball
(并输出一个消息来发布出来),然后等待 500
ms(因为球不能以光速传播!)。
如果游戏仍在继续,他们紧接着“转移球台”给另一个选手。
就是这样!
看下 demo 的代码,可以了解到让这些部分一起工作的完整上下文代码。
状态机:生成器协程
最后一个例子:定义一个状态机,即由一个辅助工具来驱动的一组生成器协程。
Demo(注意:使用一个较新版本的 FF 或 Chrome,支持 ES6 从而可以运行生成器)
首先,定义一个控制有限状态处理器的辅助工具:
function state(val,handler) {
// 为状态创建一个协程处理器(包装)
return function*(token) {
// 状态变化处理器
function transition(to) {
token.messages[0] = to;
}
// 缺省的初始状态(如果没有设置)
if (token.messages.length < 1) {
token.messages[0] = val;
}
// 保持运行直到达到最终状态(false)
while (token.messages[0] !== false) {
// 当前状态匹配处理器?
if (token.messages[0] === val) {
// 委托到处理器
yield *handler( transition );
}
// 转移控制到另一个状态处理器?
if (token.messages[0] !== false) {
yield token;
}
}
};
}
state(..)
辅助工具函数创建了一个对应特定状态值的委托生成器的包装对象,该对象会自动运行状态机,并在每次状态改变时转移控制权。
纯粹是由于个人喜好,我决定由共享的 token.messages[0]
来记录状态机的当前状态。这意味着将序列的上一步传入的数据作为初始状态使用。不过如果没有设置初始数据,则缺省使用第一个状态作为初始状态。同样是个人喜好的缘故,最终状态被设为 false
。这个很容易根据你自己的喜欢进行修改。
状态值可以是你喜欢的任意类型的值:number
、string
,等等。只要可以通过 ===
严格测试的值,你都可以用来作为状态值。
在接下来的例子中,我会演示一个变化四个 number
状态值的状态机,按照特定的顺序:1 -> 4 -> 3 -> 2
。仅为了演示目的,会使用一个计数器,从而可以执行该变化循环不止一次。但状态机最终达到最终状态(false
)时,asynquence 序列向下一步移动,和预期的一样。
// 计数器(仅为了演示的目的)
var counter = 0;
ASQ( /* 可选的:初始化状态值 */ )
// 运行状态机,变化:1 -> 4 -> 3 -> 2
.runner(
// 状态 `1` 处理器
state( 1, function*(transition){
console.log( "in state 1" );
yield ASQ.after( 1000 ); // 暂停 1s
yield transition( 4 ); // 跳转到状态 `4`
} ),
// 状态 `2` 处理器
state( 2, function*(transition){
console.log( "in state 2" );
yield ASQ.after( 1000 ); // 暂停 1s
// 仅为了演示的目的,判断是否继续状态循环?
if (++counter < 2) {
yield transition( 1 ); // 跳转到状态 `1`
}
// 全部完成!
else {
yield "That's all folks!";
yield transition( false ); // 跳转到退出状态
}
} ),
// 状态 `3` 处理器
state( 3, function*(transition){
console.log( "in state 3" );
yield ASQ.after( 1000 ); // 暂停 1s
yield transition( 2 ); // 跳转到状态 `2`
} ),
// 状态 `4` 处理器
state( 4, function*(transition){
console.log( "in state 4" );
yield ASQ.after( 1000 ); // 暂停 1s
yield transition( 3 ); // 跳转到状态 `3`
} )
)
// 状态机完成,所以继续下一步
.val(function(msg){
console.log( msg );
});
很容易可以跟踪这里的过程。
yield ASQ.after(1000)
说明这些生成器可以做任何基于 promise/sequence 的异步处理,这个与之前看到过一样。yield transition(..)
用于转换到新的状态。
上面的 state(..)
辅助函数完成了工作中困难的部分,处理 yield*
委托和状态跳转,使得状态处理器可以非常简单和自然。
总结
CSP 的关键在于将两个或更多的生成器“进程”连接在一起,提供一个共享的通信通道,以及可以在彼此间转移控制权的方法。
已经有一些 JS 库以正统的方式实现了和 Go、Clojure/ClojureScript 差不多的 API 和语义。这些库背后都有些聪明的开发者,并且他们都提供了很多有关进一步探索的资源。
asynquence 尝试采用一个不那么正统的但希望仍保留了主要的机制的方式。如果没有更多的需求,asynquence 的 runner(..)
对于开始探索类似 CSP 的生成器已经非常容易了。
不过最好的地方是将 asynquence 的 CSP 与其他的异步功能一起使用(promise、生成器、流程控制,等等)。这样,你就有了所有领域的最好的部分,从而在处理手头的工作时可以选用任何更适合的工具,而这些都在一个较小的库中。
在过去的四篇文章中,我们在非常多的细节上探索了生成器,希望你会因为发现了可以如何革新自己的异步 JS 代码而感到兴奋和鼓舞!你会使用生成器来创造什么呢?
译注
翻译的过程并不轻松,不仅要理解原文,还要尽我所能以较为通顺的中文重新表达出来,这方面显然我还有很多要学。
尽管已经尽力避免译文出现歧义或错误,但个人能力有限,仍不能确保不会有。各位同学如有发现,欢迎指正,先谢过!