说明:
Filters the elements of an observable sequence based on a predicate, emitting only those elements for which the predicate returns true.
代码如下:
extension ObservableType {
    /// Filters the elements of an observable sequence based on a predicate.
    ///
    /// - parameter predicate: Function applied to each source element; it may throw.
    /// - returns: An observable sequence backed by a `Filter` producer wrapping this sequence.
    public func filter(predicate: (E) throws -> Bool) -> Observable<E> {
        let source = asObservable()
        return Filter(source: source, predicate: predicate)
    }
}
/// Producer behind the `filter` operator.
///
/// Stores the source sequence and the predicate; each call to `run` creates a
/// `FilterSink` for the given observer and subscribes it to the source.
class Filter<Element>: Producer<Element> {
    typealias Predicate = (Element) throws -> Bool

    private let _source: Observable<Element>
    private let _predicate: Predicate

    /// - parameter source: Sequence whose elements will be tested.
    /// - parameter predicate: Test applied to every element of `source`.
    init(source: Observable<Element>, predicate: Predicate) {
        self._source = source
        self._predicate = predicate
    }

    /// Creates a `FilterSink` for `observer`, subscribes it to the source and
    /// stores the resulting subscription on the sink.
    ///
    /// - parameter observer: Observer that receives the events forwarded by the sink.
    /// - returns: The sink itself, returned as the `Disposable`.
    override func run<O: ObserverType where O.E == Element>(observer: O) -> Disposable {
        let filterSink = FilterSink(predicate: _predicate, observer: observer)
        let subscription = _source.subscribe(filterSink)
        filterSink.disposable = subscription
        return filterSink
    }
}
/// Observer that applies the predicate to each incoming element and forwards
/// the resulting events through `forwardOn`.
class FilterSink<O: ObserverType>: Sink<O>, ObserverType {
    typealias Element = O.E
    typealias Predicate = (Element) throws -> Bool
    typealias Parent = Filter<Element>

    private let _predicate: Predicate

    /// - parameter predicate: Test applied to every `.Next` element.
    /// - parameter observer: Observer handed to the `Sink` superclass.
    init(predicate: Predicate, observer: O) {
        _predicate = predicate
        super.init(observer: observer)
    }

    /// Handles one event from the source.
    ///
    /// - `.Next`: forwarded only when the predicate returns `true`; if the
    ///   predicate throws, the thrown error is forwarded as `.Error` and the
    ///   sink is disposed.
    /// - `.Completed` / `.Error`: forwarded unchanged, then the sink is disposed.
    func on(event: Event<Element>) {
        switch event {
        case .Completed, .Error:
            // Terminal events pass straight through and end the subscription.
            forwardOn(event)
            dispose()
        case .Next(let element):
            do {
                // Elements rejected by the predicate are dropped.
                guard try _predicate(element) else { return }
                forwardOn(.Next(element))
            } catch {
                // A throwing predicate terminates the sequence with its error.
                forwardOn(.Error(error))
                dispose()
            }
        }
    }
}