FRP 实例简介(RxJS / RxSwift / ReactiveCocoa)

FRP是异步数据流编程


这不是什么新鲜的东西了。在前端编程中(用Javascript),监听某个按钮的点击事件,并在事件被触发以后回调一个函数做一些操作,这个过程就是异步数据流编程,也就是FRP。FRP的灵感来源于细胞的激素刺激,你可以回想一下初中生物学的“生物应激”。我们可以为任何东西创建数据流(Stream),不仅仅局限于click和hover事件。Stream是随处可见的,任何东西都可以成为Stream:变量、用户的输入、属性、缓存、数据结构等等。举个例子,微博的推荐(推荐好友,推荐好新闻)就是一种和click事件一样的Stream,你可以监听它的里面(推荐事件)并按需作出响应。
单一的一个Stream可以用来作为另一个Stream的输入,甚至多个Stream也可以输入给某一个Stream。假设现在你有一个超牛逼的工具集,包含了很多功能可以让你合并、创建和过滤Stream。那么你就可以merge(合并)多个Stream,对Stream做你只感兴趣的事件的filter(过滤),也可以把一个Streammap(映射)为另外一个新的Stream。
既然Stream是FRP的核心,那我们就从最熟悉的“click a button”来仔细的了解Stream。

zoom-图1
zoom-图1

从上面可以看出,Stream是一个不间断的按照时间顺序排列的event序列。它可以发出三样信号:值(value,对应于某些类型)、错误(error)和完成(completed)。只有当按钮所在的窗口或者视窗被关闭的时候,才会发出“完成”信号。
既然知道Stream会发出(emit)三种信号,那么我们就可以为其定义三个对应的执行函数异步的捕捉并处理这三种信号量。有时候,对于从Stream里面发出的error和completed,你可以按照需要选择捕捉处理或不捕捉处理。对Stream的“监听”叫做“订阅(subscribe)”,这些执行函数就是“观察者(observeers)”,Stream就是被观察的“主体(subject)”或者“可观察序列(observable)”。这正是观察者模式的设计体现。
在本教程的某些地方,我会用ASCII图来表示Stream,就叫做StreamGraph吧:

--a---b-c---d---X---|->
a, b, c, d :事件对应的类型或者值
X :错误
| :完成
---> :时间轴

下面一起来做些有趣的尝试:把“Stream(origin click event)”转换为“Stream(counter click event)”。
使用FRP的时候,Stream会被拓展了很多方法,如map
filter
scan
等。当调用其中一个方法时,如clickStream.map(f)
,它将返回一个“new Stream”。这意味着“origin Stream”没有发生改变。在FRP里,Stream具备恒定(immutability)的特性,说白了Stream是一个发生后即不可变的序列。所以clickStream.map(f).scan(g)
这样链式调用可以在Stream上操作。

clickStream: ---c----c--c----c------c--> 
                        vvvvv map(c becomes 1) vvvv
                        ---1----1--1----1------1--> 
                        vvvvvvvvv scan(+) vvvvvvvvv
counterStream: ---1----2--3----4------5-->

上面的例子,map(f)
方法用传入的f
函数替代每个发出的信号量,把每次的点击事件映射为数值1。map
产生的流会被scan(g)
方法扫描并用g(accumulated, current)
来聚合,在例子中就是简单地相加。最后的countStream
统计了每次点击时,当前一共产生了多少次点击事件。
为了体现FRP的强大,假设现在需要一个double-click(双击)Stream(短时间内两次或三次的单击认为是双击)。深呼吸并回想你是怎么用传统的方式来实现的。我敢打赌一定让人非常抓狂,因为会需要大量的变量去描述每个阶段的状态,还会用到定时处理。
但用FRP会让事情变得非常简单。事实上,仅需要4行逻辑代码。不过,我们先忽略代码来看个图:

图2
图2

不用需要理解底层是如何实现的,只管用就是了。灰色框里面的函数把一个Stram转换为另外一个Stream。先用buffer(stream.throttle(250ms))
判定出那些单击可以归为一次双击,从而获得一个新的归并后的单击Stream。然后用map()
做事件数量统计,获得一个新的含有每个归并单元中事件数量的Stream。最后用filter(x >= 2)
来忽略数量为1
的归并单元。就这样,通过3步操作获得所需要的Stream。现在可以按照需要,通过subscribe对双击Stream进行监听并做出响应。
我希望你会享受这种编程方式。这个例子仅仅是冰山一角:你可以用FRP实现的类库如“Rx*”来做更多。

为什么要用FRP?

FRP提高了抽象的层次,因此你可以专注于业务逻辑里面事件间的相互依赖,而不需要关心一大堆实现的细节。用FRP将会使代码变得更加简洁。

FRP的优势在富“UI事件和数据事件交互”的现代Web、移动应用得到了证明。10年前,Web页面的交互基本上就是提交一个大表单到后端,然后在前端执行简单的渲染。应用逐渐变得更加实时:修改单个表单域能够自动触发保存到后台,内容会根据个人的“喜好”匹配到相关用户等等。

现今的应用需要通过丰富多样的实时事件来提供高水平的用户体验,而FRP可以很好解决。

实例讲解FRP

让我们来点干货。通过一个真实的例子一步步的理解FRP。在教程的最后会给出所有的代码,同时了解每行代码在做什么。
我用Javascript和RxJS作为工具,那是因为:Javascript是目前比较熟悉的语言,同时Rx*库系列提供了对多种语言和平台的(.NETJavaScalaClojureJavaScriptRubyPythonC++Objective-C/CocoaGroovy等等)支持。所以无论你用什么工具,都可以按照本教程享受FRP的好处。

实现“推荐关注”

在微博里,有个专门推荐新用户给你关注的面板:


图3
图3

这个“推荐关注”的实现包含以下这些核心特点:
启动的时候,从API中加载并显示3条其他账户信息。
点击“Refresh”按钮时,重新加载3条其他账户信息。
点击每条账户信息的“x”按钮时,清掉当前这条账户,并显示另外一条。

微博里面对未通过认证的开发着不公开这个“推荐关注”的API,因此我们用Github的API来实现。

请求和响应

你会怎么用FRP来解决这个问题呢?嗯,开始的时候(几乎)把任何东西都流化**。这几乎成了FRP的魔咒。我们从最简单的功能开始:“启动的时候,从API中加载并显示3条其他账户信息。”。这里没有任何问题,就是简单的(1)发送一个请求,(2)接收一个响应,(3)渲染这个响应。因此,我们用Stream来代表(1)的请求。起初这感觉像杀鸡用牛刀,但是我们需要从基本的东西开始,对吧?
启动的时候只需要发送一次请求,那么对应的StreamModel(流模型)是一个只含有一个值的Stream。最后会发现启动的时候会有很多请求,但目前只有一个:

--a------|->
a :'https://api.github.com/users'这个字符串

这是一个请求地址(URL)Stream。这条Stream告诉了我们两件事情:“什么时候”和“是什么”(When & What)。“什么时候”意味着当产生事件(emit event)时要发送请求,而“是什么”说明了产生的事件(emitted event)是一串请求地址字符。
用“Rx”创建Stream是非常简单的。Stream的术语是“Observable(可观察序列)”,这说明它是可以被其他人观察的。但我发现这真是个愚蠢的名词,所以我更喜欢称它为“Stream(流)”*。

RxJS

var requestStream = Rx.Observable.returnValue('https://api.github.com/users');

__ RxSwift__

let requestStream = Observable.just("https://api.github.com/users")

RAC

RACSignal *requestStream = [RACSignal return:@"https://api.github.com/users"];

这里现在有一个只含一个字符串事件的Stream,但是没有任何操作,所以我们需要添加一个处理即将到来的字符串事件的函数。下面给requestStream
加上subscribing

RxJS

requestStream.subscribe(function(requestUrl){ 
        // 发送请求 
        jQuery.getJSON(requestUrl, function(responseData){ 
                   // ...             
        });
});

RxSwift

requestStream.subscribe(onNext: { (urlString) in
            URLSession.shared.dataTask(with: NSURL(string: urlString) as! URL, completionHandler: { (data, response, error) in
  
            }).resume()
})

RAC

RACSignal *requestStream = [RACSignal return:@"https://api.github.com/users"];
    [requestStream subscribeNext:^(id x) {
       [[[NSURLSession sharedSession] dataTaskWithURL:[NSURL URLWithString:x] completionHandler:^(NSData * _Nullable data, NSURLResponse * _Nullable response, NSError * _Nullable error) {
           
       }] resume];
    }];

注意上面我们用了jQuery的Ajax回调处理响应的结果。( ⊙ o ⊙ )!,等等,FRP不是擅长于处理异步数据流吗?能不能把jQuery的结果交给RxJS处理?感觉好像没什么问题,我们来试试:
RxJS

requestStream.subscribe(function(requestUrl){ 
        // 响应也是个流
         var responseStream = Rx.Observable.create(function(observer){ 
                   jQuery.getJSON(requestUrl) // 当jQuery成功调用以后,就把结果交给RxJS处理 
                    .done(function(response){ observer.onNext(response); }) // 当jQuery失败时,把失败交给RxJS处理 
                    .fail(function(jqXHR, status, error){ observer.onError(error); }) // 当jQuery完成时,告知RxJS调用完成的处理 
                    .always(function(){ observer.onCompleted(); }) 
           }); responseStream.subscribe(function(response){ 
                          // 为响应做处理 
            });
});

RxSwift

requestStream.subscribe(onNext: { (urlString) in
            let responseStream = Observable<Any>.create { (observer) -> Disposable in
                URLSession.shared.dataTask(with: NSURL(string:urlString ) as! URL, completionHandler: { (data, response, error) in
                    if error != nil {
                        observer.onError(error!)
                    } else {
                        observer.onNext(data!)
                    }
                    observer.onCompleted()
                }).resume()   
                return Disposables.create()
            }
            responseStream.subscribe { (x) in
                // 响应处理
                print(x)
            }
})

RAC

[requestStream subscribeNext:^(id x) {
        [RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) {
            [[NSURLSession sharedSession] dataTaskWithURL:[NSURL URLWithString:x] completionHandler:^(NSData * _Nullable data, NSURLResponse * _Nullable response, NSError * _Nullable error) {
                if (error) {
                    [subscriber sendError:error];
                } else {
                    [subscriber sendNext:data];
                }
                [subscriber sendCompleted];
            }];
            return [RACDisposable disposableWithBlock:^{
                
            }];
        }];
    }];

因为我们需要发送一个Ajax请求,所以我们就用jQuery包装一下我们的RxJS。上面看起来非常直观,而Rx.Observable.create()
通过传入一个包含observer参数的函数,会返回一个自定义的Stream。当Stream产生任何事件的时候,都会调用这个传入的方法,并传入当前observer。打扰一下,这是不是意味着Promise也是一个“Observable(可观察序列)”?【注:作者这里不用Stream,是为了更加官方的描述Promise。】

是的!
Observable是Promise++。在RxJS里面,你可以很容易的把Promise转换为Observable通过调用var stream = Rx.Observable.fromPromise(promise)
,所以我们来用用它。值得一提的是,Observable和Promises/A+是不兼容的,但概念上并没有冲突。你可以这样理解,Promise就是Observable的单值版本。

可以看到RxJS比起jQuery这类框架实现的Promise要强大多了。当别人大肆吹捧Promises的时候,你给给他说说RxJS。

好吧,回到我们的例子来。你注意到下面这些问题了吗?
把一个subscribe()
调用嵌入了另外一个subscribe()
里面,这可能会陷入“callback hell”
resposneStream
紧密依赖于requestStream
。【注:这里涉及“关注点分离”】

哎呀,那么多问题。幸亏,FRP提供了大量的操作函数来解决上面的问题。
现在相信你已经很清楚基础函数map(f)了。
这是一个把生产流(Stream A)里面的所有值你拿出来执行 f 转换,把转换的结果放入到消费流(Stream B)中。例如,我们正好需要把请求地址(URL)对应的转成一个响应的Stream(Promise可以包装成Stream)。
RxJS

var responseMetastream = requestStream
         .map(function(requestUrl) {
         return Rx.Observable.fromPromise(jQuery.getJSON(requestUrl));
});

RxSwift

let responseMetastream = requestStream
        .map { (urlString: String) -> Observable<Any> in
         return URLSession.shared.rx.json(url: NSURL(string:urlString ) as! URL)
}

RAC

// 2.5.0 还没出NSURLSession的扩展,先用NSURLConnetcion代替
RACSignal *responseMetastream = [requestStream map:^id(id value) {
        return [NSURLConnection rac_sendAsynchronousRequest:[NSURLRequest requestWithURL:[NSURL URLWithString:value]]];
    }];

不过,上面的代码创建了一个怪兽:“metastream”。“metastream”的每个值是一个Stream的指针:每个值指向另外一个Stream【注:map转换以后是流,但是流里面的东西是指向Promise的指针】。在我们的例子中,每个请求URL都被映射为一个指针指向对应包含响应的promise流。

zoom-图5
zoom-图5

响应的“metastream”让人看起来非常困惑,而且实际上我们需要的是一个包含Promise【注:Promise是流】的Stream,而不是一条包含Stream指针的“metastream”。向Flatmap先生说“你好”吧。flatmap()
是map()
的一个“扁平化”处理版本,就像是从“主干”流里分出“支流”,然后对“支流”处理。【注:flatmap和map的对比可以看这里,可以这样理解:map就是在源流的每个事件上用一个“返回值的函数”做了计算并返回值,然后组合再返回新的流。而flatmap是在源流的每个事件上用一个“会回流的函数”做了计算并返回流,然后把返回的流(子流)组合再返回新的流。】值得注意的时候,flatmap()
不是在修复map()
,“metastream”也不是一个错误,它们都是真实的工具用于在FRP中解决异步响应的问题。

RxJS

var responseStream = requestStream
        .flatmap(function(requestUrl){
        return Rx.Observable.fromPromise(jQuery.getJSON(requestUrl));
});

RxSwift

let responseStream = requestStream
            .flatMap { (urlString: String) -> Observable<Any> in
                return URLSession.shared.rx.json(url: NSURL(string:urlString ) as! URL)
}

RAC
objc
RACSignal *responseStream = [requestStream
flattenMap:^RACStream *(id value) {
return [NSURLConnection rac_sendAsynchronousRequest:[NSURLRequest requestWithURL:[NSURL URLWithString:value]]];
}];

![zoom-图6](https://camo.githubusercontent.com/0b0ac4a249e1c15d7520c220957acfece1af3e95/687474703a2f2f692e696d6775722e636f6d2f4869337a4e7a4a2e706e67)
很好。因为响应的Stream是基于请求的Stream而定义的,所以如果以后我们有更多的事件在请求的Stream中产生,就会有对应的事件在响应的Stream中产生。

requestStream: --a-----b--c------------|->
responseStream: -----A--------B-----C---|->
(小写的是请求,大写的是响应)

既然我们好不容易拥有了响应的Stream,那么我们就可以渲染所接收的数据:

__RxJS__
```js
responseStream.subscribe(function(response){ 
    // 按照你的意愿在DOM树里面渲染response对象
});

RxSwift

responseStream.subscribe(onNext: { (x) in
      // 按照你的意愿渲染View对象         
})

RAC

[responseStream subscribeNext:^(id x) {
        // 按照你的意愿渲染View对象   
}];

我们把前面所有的代码合在一起,那样就是:

RxJS

var requestStream = Rx.Observable.returnValue('https://api.github.com/users');
var responseStream = requestStream
         .flatMap(function(requestUrl) { 
        return Rx.Observable.fromPromise(jQuery.getJSON(requestUrl));
 });
responseStream.subscribe(function(response) { 
        // 按照你的意愿在DOM树里面渲染response对象
});

RxSwift

let requestStream = Observable.just("https://api.github.com/users")
let responseStream = requestStream
            .flatMap { (urlString: String) -> Observable<Any> in
                return URLSession.shared.rx.json(url: NSURL(string:urlString ) as! URL)
            }
responseStream.subscribe(onNext: { (x) in
              // 按照你的意愿渲染View对象
})

RAC

RACSignal *requestStream = [RACSignal return:@"https://api.github.com/users"];
RACSignal *responseStream = [requestStream
                                     flattenMap:^RACStream *(id value) {
        return [NSURLConnection rac_sendAsynchronousRequest:[NSURLRequest requestWithURL:[NSURL URLWithString:value]]];
}];
[responseStream subscribeNext:^(id x) {
        // 按照你的意愿渲染View对象
}];

刷新按钮

我没有提及一件事情就是上面的响应返回的JSON制式的用户信息有100条。这个API仅仅允许我们传页偏移值,而不允许传页限制数,所以导致我们只能用3条数据对象而浪费97条。我们现在先忽略这些问题,后面将会看到如何缓存这些响应。
每次刷新按钮被点击的时候,请求的Stream就会产生一个String事件。我们需要两样东西:
刷新按钮上产生点击事件Stream;
上述的刷新按钮的点击事件Stream可以改变请求的Stream。

可喜的是,RxJS具备相应的工具给DOM元素构建指定的事件的Stream:

var refreshButton = document.querySelector('.refresh');
var refreshClickStream = Rx.Observable.fromEvent(refreshButton, 'click');

接下来,让刷新按钮的点击事件Stream改变请求的Stream。通过传一个每次都随机产生的参数作为偏移值发送请求给Github:

var requestStream = refreshClickStream
        .map(function(){ 
        var randomOffset = Math.floor(Math.random() * 500); 
        return 'https://api.github.com/users?since=' + randomOffset;
 });

不过现在有个问题,就是请求在启动的时候并不会马上被发送,只会在刷新按钮被点击时才会执行。如何才能在启动的时候马上发送请求并且点击刷新按钮的时候也能发送请求?
首先,我们都知道如何为上面说的两种情况创建对应的Stream:

var requestOnRefreshStream = refreshClickStream
        .map(function() { 
        var randomOffset = Math.floor(Math.random()*500); 
        return 'https://api.github.com/users?since=' + randomOffset; 
});
var startupRequestStream =Rx.Observable.returnValue('https://api.github.com/users');

但是如何“合并”上面这两个Stream为一个Stream呢?不用担心,这里有merge()
。用StreamGraph来描述:

stream A: ---a--------e-----o----->
stream B: -----B---C-----D--------> 
        vvvvvvvvv merge vvvvvvvvv 
        ---a-B---C--e--D--o----->

现在事情就变得简单了:

var requestOnRefreshStream = refreshClickStream 
        .map(function() { 
        var randomOffset = Math.floor(Math.random()*500);
        return 'https://api.github.com/users?since=' + randomOffset;
 });
var startupRequestStream = Rx.Observable.returnValue('https://api.github.com/users');
var requestStream = Rx.Observable.merge(
        requestOnRefreshStream, startupRequestStream
);

这里有另外一个干净简单的方式去书写上面的代码:

var requestStream = refreshClickStream 
        .map(function() { 
        var randomOffset = Math.floor(Math.random()*500);
        return 'https://api.github.com/users?since=' + randomOffset;
 }) 
.merge(Rx.Observable.returnValue('https://api.github.com/users'));

甚至更短,可读性更强:

var requestStream = refreshClickStream 
        .map(function() { 
        var randomOffset = Math.floor(Math.random()*500); 
        return 'https://api.github.com/users?since=' + randomOffset; 
}) 
.startWith('https://api.github.com/users');

这个startWith()方法恰好精准的反映了你想要做的事情。无论你传入的Stream是什么样子的,但最后调用startWith(x)
,就会以x作为开始。不过这不够DRY,我重复了访问Github的请求地址。解决这个问题的方法就是通过移动startWith()
到refreshClickStream后,然后在启动时“模拟”一次刷新点击:

var requestStream = refreshClickStream.startWith('startup click') 
      .map(function() {
           var randomOffset = Math.floor(Math.random()*500); 
           return 'https://api.github.com/users?since=' + randomOffset; 
});

给“推荐关注”的每项建模

现在,我们只能在responseStream的subscribe()里,才能够对每项推荐的UI元素做渲染操作。可是,如果你用最快的速度点击刷新按钮的时候,当前的3条推荐都没有被清掉,而新的推荐只有在请求到达以后才会到达。这就看起来好像是点了刷新和不点刷新没有两样似的。但为了让UI看起来更舒服点,我们需要在点击刷新时清除当前的3条推荐。

refreshClickStream.subscribe(function(){
    // 清除3条推荐的DOM元素
});

如果你这么干,现在就会有两个订阅者(一个是refreshClickStream.subscribe()
,另外一个是responseStream.subscribe()
)关联着这3条推荐的DOM元素,事情会变得很糟糕。因为这不是“关注点分离”【注:关注点分离是指只对与“特定概念、目标”(关注点)相关联的软件组成部分进行“标识、封装和操纵”的能力。这是处理复杂性的一个原则。因为关注点混杂在一起将会加大软件的复杂度,而分离开关注点进行处理能够降解复杂度。面向切面编程的核心就是关注点分离。】。

图7
图7

因此我们需要给每项推荐做Stream处理,并使得每个事件都包含响应JSON值。我们将会针对3条推荐中的每条做分离。第1条推荐分离后的样子:

var suggestion1Stream = responseStream
  .map(function(listUsers) {
    // 随机从列表中获取一个
    return listUsers[Math.floor(Math.random()*listUsers.length)];
  });

对此,suggestion2Streamsuggestion3Stream可以简单的从suggestion1Stream里面复制过来。虽然这不够DRY,不过保证了我们的例子足够的简单。

取代resposneStream.subscribe()里面的渲染操作,我们可以这样做:

suggestion1Stream.subscribe(function(){
    // 渲染第1个推荐到DOM
});

回到“在刷新的时候,清除所有推荐”,我们可以通过映射刷新点击到null的推荐数据,那么在suggestion1Stream里面,就像:

var suggestion1Stream = responseStream.map(function(listUsers){
    return listUsers[Math.floor(Math.random() * listUsers.length)];
})
.merge(
    refreshClickStream.map(function(){ return null; })
);

在渲染的时候,我们把null解析为“没有数据”,从而隐藏对应的UI元素。

suggestion1Stream.subscribe(function(suggestion){
    if (suggestion === null) {
        // 隐藏第1个推荐的DOM元素
    } else {
        // 或者展示第1个推荐的DOM元素并渲染对应的数据
    }
});

那么现在对应的流图:

refreshClickStream: ----------o--------o---->
     requestStream: -r--------r--------r---->
    responseStream: ----R---------R------R-->
 suggestion1Stream: ----s-----N---s----N-s-->
 suggestion2Stream: ----q-----N---q----N-q-->
 suggestion3Stream: ----t-----N---t----N-t-->

 N :代表null

为了更完善,我们也可以在开始的时候渲染“空”推荐。通过添加startWith(null)到第一条推荐的Stream:

var suggestion1Stream = responseStream
  .map(function(listUsers) {
    return listUsers[Math.floor(Math.random()*listUsers.length)];
  })
  .merge(
    refreshClickStream.map(function(){ return null; })
  )
  .startWith(null);

最终的StreamGraph如下:

refreshClickStream: ----------o---------o---->
     requestStream: -r--------r---------r---->
    responseStream: ----R----------R------R-->
 suggestion1Stream: -N--s-----N----s----N-s-->
 suggestion2Stream: -N--q-----N----q----N-q-->
 suggestion3Stream: -N--t-----N----t----N-t-->

 N :代表null

关闭一个推荐以及使用缓存结果集

还有最后一个功能还未实现:每个推荐都会有对应的“x”按钮用于清除当前的推荐并加载新的推荐。刚开始弄的时候,你可能会选择在关闭某个推荐的时候发起一个新的请求:

var close1Button = document.querySelector('.close1');
var close1ClickStream = Rx.Observable.fromEvent(close1Button, 'click');
// 同理于 close2Button和close3Button
var requestStream = refreshClickStream.startWith('startup click')
  .merge(close1ClickStream) // 我们添加了这个,使得点击close1的时候,会触发新的请求
  .map(function() {
    var randomOffset = Math.floor(Math.random()*500);
    return 'https://api.github.com/users?since=' + randomOffset;
  });

但如果我们点击任意一个关闭按钮的时候,它会清除当前所有的推荐并重新加载。有很多方法可以解决,为了让事情更有趣,我们将会重用上次请求后响应的数据去解决这个问题。Github的响应中每页的大小为100个用户信息,然而我们只需要使用到其中的3个,因此会存在大量有效的新数据。不需要再发起新的请求。

我们再把它想象成为Stream。当第一条推荐的“关闭”按钮被点击时,我们需要在resposneStream中上一个的响应数据中随机获取一个用户数据。就像:

requestStream:     --r--------------->
responseStream:    ------R----------->
close1ClickStream: ------------c----->
suggestion1Stream: ------s-----s----->

 c :代表关闭

在“Rx*”里面有一个联合函数combineLatest看起来可以实现我们的需求。它把两个不同Stream作为输入,无论其中哪个Stream产生一个事件,combineLatest会组合两个Stream的“上一个”事件,以参数ab的形式然后输出值c = f(x,y),而f是你所定义的函数。用StreamGraph解析:

stream A: --a-----------e--------i-------->
stream B: -----b----c--------d-------q---->
          vvvvvvvv combineLatest(f) vvvvvvv
          ----AB---AC--EC---ED--ID--IQ---->

f :大写函数

我们可以在close1ClickStream上调用combineLatest(),传入responseStream。这样当点击“关闭”按钮1时,我们都会获得responseStream的上一个事件并计算出新值给suggestion1Stream。另外一方面combineLatest()函数是对称的:responseStream产生新的事件会组合“关闭”按钮1的上一个事件,计算出新值传给suggestion1Stream。这样我们就可以简化之前suggestion1Stream的代码:

var suggestion1Stream = close1ClickStream
  .combineLatest(responseStream,
    function(click, listUsers) {
      return listUsers[Math.floor(Math.random()*listUsers.length)];
    }
  )
  .merge(
    refreshClickStream.map(function(){ return null; })
  )
  .startWith(null);

但这里还有一个问题:combineLatest()使用最近两个源,但是如果其中一个没有产生事件,那么组合的Streamsuggestion1Stream)是不会产生事件的。认真观察前面的StreamGraph,你会发现当A流产生a事件时,suggestion1Stream不会产生事件。只有在B流产生b事件的时候,组合的Stream才会产生事件。

我们用最简单的方法来解决这个问题,就是在启动时“模拟”点击了’关闭’按钮1:

var suggestion1Stream = close1ClickStream.startWith('startup click') // 我们增加了这个
  .combineLatest(responseStream,
    function(click, listUsers) {l
      return listUsers[Math.floor(Math.random()*listUsers.length)];
    }
  )
  .merge(
    refreshClickStream.map(function(){ return null; })
  )
  .startWith(null);

总结

终于弄完了。下面是全部代码

var refreshButton = document.querySelector('.refresh');
var closeButton1 = document.querySelector('.close1');
var closeButton2 = document.querySelector('.close2');
var closeButton3 = document.querySelector('.close3');

var refreshClickStream = Rx.Observable.fromEvent(refreshButton, 'click');
var close1ClickStream = Rx.Observable.fromEvent(closeButton1, 'click');
var close2ClickStream = Rx.Observable.fromEvent(closeButton2, 'click');
var close3ClickStream = Rx.Observable.fromEvent(closeButton3, 'click');

var requestStream = refreshClickStream.startWith('startup click')
    .map(function() {
        var randomOffset = Math.floor(Math.random()*500);
        return 'https://api.github.com/users?since=' + randomOffset;
    });

var responseStream = requestStream
    .flatMap(function (requestUrl) {
        return Rx.Observable.fromPromise($.getJSON(requestUrl));
    });

function createSuggestionStream(closeClickStream) {
    return closeClickStream.startWith('startup click')
        .combineLatest(responseStream,
            function(click, listUsers) {
                return listUsers[Math.floor(Math.random()*listUsers.length)];
            }
        )
        .merge(
            refreshClickStream.map(function(){ 
                return null;
            })
        )
        .startWith(null);
}

var suggestion1Stream = createSuggestionStream(close1ClickStream);
var suggestion2Stream = createSuggestionStream(close2ClickStream);
var suggestion3Stream = createSuggestionStream(close3ClickStream);


//渲染 ---------------------------------------------------
function renderSuggestion(suggestedUser, selector) {
    var suggestionEl = document.querySelector(selector);
    if (suggestedUser === null) {
        suggestionEl.style.visibility = 'hidden';
    } else {
        suggestionEl.style.visibility = 'visible';
        var usernameEl = suggestionEl.querySelector('.username');
        usernameEl.href = suggestedUser.html_url;
        usernameEl.textContent = suggestedUser.login;
        var imgEl = suggestionEl.querySelector('img');
        imgEl.src = "";
        imgEl.src = suggestedUser.avatar_url;
    }
}

suggestion1Stream.subscribe(function (suggestedUser) {
    renderSuggestion(suggestedUser, '.suggestion1');
});

suggestion2Stream.subscribe(function (suggestedUser) {
    renderSuggestion(suggestedUser, '.suggestion2');
});

suggestion3Stream.subscribe(function (suggestedUser) {
    renderSuggestion(suggestedUser, '.suggestion3');
});

上面的代码非常简短,但是十分紧凑:它体现了使用适当的关注点分离甚至响应捕获可以控制符合的事件。函数化的代码风格使得其看起来像描述而不是编码:我们从来没有给出一系列的执行过程,仅仅是通过定义流之间的关系来描述这是什么。例如,通过FRP,我们告诉计算机suggestion1Stream是“关闭”按钮1的Stream和上一次请求的响应的Stream做组合,当刷新发生或者程序启动的时候会变成null

值得一提的是,上面的代码既没有诸如ifforwhile这类流程控制元素,也没有经典的回调控制流。通过使用subscribe()filter(),你终于可以摆脱ifelse了。

如果你认为“Rx”会成为FRP的首选库,那么花些时间去看看如何用函数来转换、组合和创建Observables。如果你想通过StreamGraph的形式来理解这些函数,你可以访问这个地址。这就是我开发的经验总结:当你使用FRP时有任何疑问,可以先画些图再思考如何解决。
一旦你着手开始用“Rx
”进行编程的时候,非常有必要理解“冷和热的可观察序列”【注:“冷”是指只有订阅者需要的时候才从Stream里面产生一个事件,而且订阅者之间没有任何关联。“热”是指Stream会自动产生事件,而订阅者之间存在关联。而我们上面的例子中,Stream是个“冷”序列】。如果你忽略这个,后面可能会遇到很多奇怪的问题。你已经被提醒了。在日后通过学习真实的函数编程来磨练你的能力,并且理解“Rx”所带来的其他副作用。
不仅仅只有“Rx
”可以实现FRP。还有Bacon.jsElm等。
FRP在富事件的Web前端和移动应用中有着不俗的问题解决能力。但它不仅仅只适用于客户端,它也可在后端工作和访问数据库。事实上,RxJava是Netflix后端服务里面一个非常重要的组件。请记住,FRP不是一个基于某种语言或者应用的框架,它是一种应用于事件驱动的编程范式。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,921评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 87,635评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,393评论 0 338
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,836评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,833评论 5 368
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,685评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,043评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,694评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 42,671评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,670评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,779评论 1 332
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,424评论 4 321
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,027评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,984评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,214评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,108评论 2 351
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,517评论 2 343

推荐阅读更多精彩内容

  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 134,598评论 18 139
  • Android 自定义View的各种姿势1 Activity的显示之ViewRootImpl详解 Activity...
    passiontim阅读 171,463评论 25 707
  • 发现 关注 消息 iOS 第三方库、插件、知名博客总结 作者大灰狼的小绵羊哥哥关注 2017.06.26 09:4...
    肇东周阅读 12,019评论 4 62
  • RxJS能够让我们很轻松地创建和操控事件和streams,虽然会让开发变得复杂,但是会让异步代码变得易读。 创建大...
    flyingjimmy阅读 2,377评论 0 0
  • 天阴欲雨 远处靛青的山林和浅墨的云 从此看见美好的风景 我都会想起你
    罗不息阅读 169评论 0 2