RxSwift迁移Combine指南

译自 RxSwift to Combine: The Complete Transition Guide

不逐字翻译了,只翻主要的知识点信息。

简介

Combine 是swift新推出的一种面向 响应式编程的框架。
很多开发者都想从RxSwift 切换到Combine,两者之间有很多的相似之处。
但细节方面还是有很多区别的,如果你想切换过来,这篇文章会列出 RxSwift和Combine之间的 操作符,函数,类型等等的映射。

响应式编程可以让 数据状态在对象之间,工程之间,甚至app之间互相同步。
在ios开发中,最常见的就是在UI和Model之间进行状态同步。在之前很长一段时间里,RxSwift都是最好的选择。但apple推出了 跟SwiftUI 更契合的Combine 作为响应式编程的框架,基本可以完全替代RxSwift的所有功能了。

此文章将分为 Combine是怎么工作的 切换到Combine是好的选择吗 怎么更简单地从RxSwift切换到Combine 三个部分。

RxSwift 和 Combine的主要区别

RxSwift Combine
支持的iOS版本 iOS 8.0+ iOS 13.0+
支持的平台 iOS, macOS, tvOS, watchOS, Linux iOS, macOS, tvOS, watchOS, UIKit for Mac
框架所属 第三方 Apple第一方,SDK内置
谁来维护? 开源社区 Apple技术团队
协作的UI框架 RxCocoa SwiftUI

Combine是怎么工作的

考虑到本文章是介绍如何从RxSwift迁移到Combine, 假设读者已经对RxSwift有一定的了解了。 如果需要了解RxSwift,可以参考 RxSwift repository on GitHub.

Combine在很多方面跟RxSwift都有相似之处以及对等的映射概念映射,比如 方法,类型声明等。
如果要找出两者之间更深层次的差异,需要对Combine进行更深地挖掘,看看它的底层机制。

Publisher

跟RxSwift的Observable 相对应的是Combine里的Publisher。
在RxSwift里是一个类,而在Combine里是一个协议。

protocol Publisher {
    associatedtype Output
    associatedtype Failure: Error
    func receive<S: Subscriber>(subscriber: S) 
        where Self.Failure == S.Failure, 
              Self.Output == S.Input
}

Combine 不会指定 Wrapper types来描述它的唯一特征,比如RxSwift里的Infallible, Maybe or Single。
但每个Publisher也有自己的自定义类型,借由该类型,Publisher的特征也能被推导出来。

RxSwift的Observable 需要实现subscribe 函数,对应的Publisher需要实现 receive函数,它们的功能基本一致。相比于RxSwift而言,Publisher指定了一个 Failure类型,可以表明该publisher 是否/如何 失败。对于那些不用处理失败的Publisher,我们设定Failure类型为Never 即可。

Combine的Publisher 可以是 值类型(比如struct)或是 引用类型(比如类)。大多数的Publisher都是值类型的。

在RxSwift里,操作符一般就是简单地返回一个Observable 类型,而Combine里返回的是一个混合类型,比较复杂;比如Timer.TimerPublisherCombineLatest<Just<Int>, Just<Int>>. 鉴于自己定义一个混合类型比较麻烦,我们可以就用 AnyPublisher 来简化Publisher的使用。
举个例子:AnyPublisher<Int, Never>,可以作为一个 不会失败的抛出Int值的Publisher。

Image

Subscriber

Subscriber (RxSwift: Observer)可以接收订阅信息(subscriptions), 输入数据(inputs),以及结束回调(completions). 这点跟RxSwift不同,没有声明包含了数据回调以及 结束回调的event的枚举,但提供了单独的方法来分别处理对应的事件。

protocol Subscriber: CustomCombineIdentifierConvertible {
    associatedtype Input
    associatedtype Failure: Error

    func receive(subscription: Subscription)
    func receive(_ input: Self.Input) -> Subscribers.Demand
    func receive(completion: Subscribers.Completion<Self.Failure>)
}

在上面的代码片段里,你可以看到Subscriber 也声明了一个Failure 类型,决定了它能失败,以及失败的相关类型。对应的RxSwift也声明了独立的Error事件。Combine非常明显的表明了 失败会导致Publisher 终结(RxSwift里,error事件也表明了 Observable的终结)。

Subscribers 必须是类,因为 subscription要引用它来做数据传递,而不能在订阅期间重新创建多次Subscribers(值类型比如struct 会在数据变化时会重新创建实例)。

Subscription

每当你订阅时,就会创建一个 Subscription,它会持有所有必要的资源的引用,而且可以随时cancel(对应RxSwift里的DisposeBag)。这也是为什么Subscription 必须是类。

protocol Subscription: Cancellable, ... {
    func request(_ demand: Subscribers.Demand)
}

protocol Cancellable {
    func cancel()
}

与RxSwift相比,Combine允许订阅者随时可以去获取更多数据,并以 backpressure support 的形式返回;因此 实际上是订阅者在主动请求数据而不是 Publisher来决定什么时候将数据传出去。
这能让订阅者不会被大量数据堵塞而导致无法及时处理,这看上去更像是可控制的数据流。

如果想更深入地了解 backpressure support ,可以参考 Processing Published Elements with Subscribers

Subject

Subject 跟RxSwift的差不多,可以看作是 Subscriber 和 Publisher 的组合,既可以发送数据也能接收数据。

protocol Subject: AnyObject, Publisher {
    func send(_ value: Self.Output)
    func send(completion: Subscribers.Completion<Self.Failure>)
    func send(subscription: Subscription)
}

一方面你可以将数据传给 Subject,另一方面由于 Subject 遵从了 Publisher协议,它也可以被监听来获取数据。

在Combine里,订阅者 receive 而 Subject send 事件。尽管如此,在每个subject里,每当value 遵从 Subscriber协议时,你都必须创建一个 AnySubscriber

综上所述,RxSwift和Combine 大部分功能都是一样的,但在接口方面有一些很小的区别。 那么哪个更适合我们呢?

切换到Combine是好的选择吗

最大的争议点在于,Combine是apple的第一方框架,使用它你就不用做一些多余的framework 依赖配置工作,app也体积也会小一些。

当然,RxSwift也没有明确表明是否会有错误发生。一般情况下,直接就用 Observable 然后等着数据过来就行了。 但在Combine里,就明确表明了Publisher 可能会失败,而且失败后产生的错误是啥。

但这些可失败和不会失败的publisher的交互可能会非常复杂,你得在每个使用处都加上 eraseToAnyPublisher(),但帮助也不大。

backpressure support 的帮助下,Combine 能考虑得更加周到。订阅者可以随时获取数据,而不用等待Publisher 来发送数据。这更符合我们的直觉,特别是在一些很耗时的操作发生时,大量的数据同时在publisher 进出。

简单来说,Combine 更适应整个swift的环境。很多扩展方法已经在常用的类型里实现了(比如 Timer.publisher, (0...3).publisher, Optional<String>.none.publisher),这些方法让Combine的功能更加的全面。

不过Combine不像RxSwift那样全面和强大,是么?

RxSwift 没有像Combine 那么多的编译问题。RxSwift不用管 失败的错误类型,而且单个泛型类型的检查比两个要简单得多。

RxSwift支持iOS9以及以上版本,但Combine只能用在iOS13 以及以上版本。它俩在所有的apple 平台上都能work,但Combine对于linux的支持还是有缺失。OpenCombine 可以帮助解决这个问题,它用的是跟Combine一样的接口,且能支持更多平台。

Combine的命名规则更契合iOS平台,而RxSwift 更契合跨平台模式的规则。

RxSwift的技术社区已经存在很长一段时间了,在维护和扩展框架方面已经做了很多改进了。社区成员们为RxSwift 扩展了很多关于特定场景的功能,包括很多自定义的操作符和类型等。而Combine是一个封闭的框架,而且只推出了三年而已,只在少部分的项目里被使用到,如果你需要自己写一些字定义的操作符,还不如直接使用现有的知识点来实现。

现在你要写一个字定义的Publisher比自定义的Observable要麻烦许多。在RxSwift里,你只要用 Observable.create 来发送数据,发送错误,以及完成事件即可。在Combine里却没有对应的操作,你可以用Future (基本上就是一个带completion回调的Publisher,且completion只会调用一次) 或是写一个自定义的Publisher 来模拟 Observable.create 的行为(我们接下来就会讲到这点)

总结一下

回到我们之前的 该用RxSwift还是Combine的问题,这个要看情况的。
如果你想要干掉之前那些“多余的”依赖关系,而且你的app支持iOS13 以及以上版本,且不用支持iOS 以外的平台,那Combine就是比较好的选择了。

怎么更简单地从RxSwift切换到Combine

我们搜集了下面的切换引导信息,你可以直接搜索RxSwift的方法,操作符来找到Combine里的对应替换方案。

一些简单的说明

一般来说,Combine在处理错误时,相关接口看上去比较复杂。给个小提示, setFailureType(to: Failure.self) 更容易理解,而不是当作 mapError { _ -> Failure in } 来看待。

下面的这些链接在我们从RxSwift切换到Combine时提供了很大的帮助。

  • CombineExt 里是Combine技术社区提供的一些 当前系统Combine里没有支持的操作符。
  • cheat sheet 也提供了不少帮助。

类型

Disposable

RxSwift Combine
Disposable Cancellable Use AnyCancellable to create them like RxSwift’s Disposables.create
DisposeBag Set<AnyCancellable> any RandomAccessCollection of AnyCancellable of course, you can also create references one by one

ℹ️ 注意,subscriptions 会在 Cancellable 析构后直接cancel掉。

Publishers

RxSwift Combine
Observable<Element> AnyPublisher<Element, Error>或者 其他的 Publisher 类型
同样根据抛出的错误类型,Combine也能创建对应的Failure类型
Single<Element> AnyPublisher<Element, Error>
Future<Element, Error>
可惜不能保证只抛出一个值,除非使用Future publisher
ConnectableObservable<Element> ConnectablePublisher
Infallible<Element> AnyPublisher<Element, Never>
注意:Failure在这里的类型是 Never
Maybe<Element> 在Combine里没有对应的东东

Subjects

RxSwift Combine
BehaviorSubject CurrentValueSubject
PublishSubject PassthroughSubject
ReplaySubject Combine里没有
👉 CombineExt里的替代方案
AsyncSubject Combine 里没有

Relays

Relay是Subject的一个变种,他跟Subject的不同之处在于 无法发送/接收 完成事件。

class Relay<SubjectType: Subject>: Publisher,
    CustomCombineIdentifierConvertible where SubjectType.Failure == Never {

    typealias Output = SubjectType.Output
    typealias Failure = SubjectType.Failure

    let subject: SubjectType

    init(subject: SubjectType) {
        self.subject = subject
    }

    func send(_ value: Output) {
        subject
            .send(value)
    }

    func receive<S: Subscriber>(subscriber: S) 
        where Failure == S.Failure, Output == S.Input {
        subject
            .subscribe(on: DispatchQueue.main)
            .receive(subscriber: subscriber)
    }

}

typealias CurrentValueRelay<Output> = Relay<CurrentValueSubject<Output, Never>>
typealias PassthroughRelay<Output> = Relay<PassthroughSubject<Output, Never>>

extension Relay {

    convenience init<O>(_ value: O) 
        where SubjectType == CurrentValueSubject<O, Never> {
        self.init(subject: CurrentValueSubject(value))
    }

    convenience init<O>() 
        where SubjectType == PassthroughSubject<O, Never> {
        self.init(subject: PassthroughSubject())
    }

}

Observer Operators

RxSwift Combine
asObserver() AnySubscriber
on(_:) Subscriber
receive(_:)
receive(completion:)
Subject
send(_:)
send(completion:)
onNext(_:) Subscriber
receive(_:)
Subject
send(_:)
onError(_:) Subscriber
receive(completion: .failure(<error>))
Subject
send(completion: .failure(<error>))
onCompleted() Subscriber
receive(completion: .finished)
Subject
send(completion: .finished)
mapObserver(_:) Combine里没有对应的,不过实现起来很简单
👉replacement in CombineExt

如果要修改subscriber的 InputFailure 类型,可以使用下面这个扩展方式

extension Subscriber {

    func map<Input>(
        _ map: @escaping (Input) -> Self.Input
    ) -> AnySubscriber<Input, Failure> {
        .init(
            receiveSubscription: receive,
            receiveValue: { self.receive(map($0)) },
            receiveCompletion: receive
        )
    }

    func mapError<Failure>(
        _ map: @escaping (Failure) -> Self.Failure
    ) -> AnySubscriber<Input, Failure> {
        .init(
            receiveSubscription: receive,
            receiveValue: receive,
            receiveCompletion: { completion in
                switch completion {
                case let .failure(error):
                    self.receive(completion: .failure(map(error)))
                case .finished:
                    self.receive(completion: .finished)
                }
            }
        )
    }
}

Scheduling

RxSwift 有很多不同的Scheduler 类型,比如:MainSchedulerConcurrentDispatchQueueScheduler, SerialDispatchQueueScheduler, CurrentThreadScheduler, HistoricalScheduler, OperationQueueScheduler,以及其他更多类型。

而在Combine里,这个就很简单了。可以直接使用GCD(Grand Central Dispatc)的相关类型就可以了。比如 DispatchQueue, OperationQueue 以及 RunLoop
Combine里唯一的Scheduler类型只有 ImmediateScheduler,它能让任务直接同步执行(跟CurrentThreadScheduler 差不多)。

这篇文章里可以获取更多关于Combine里的scheduling 的信息。

Factory Functions

RxSwift Combine
amb(...) Combine里没有
👉replacement in CombineExt
catch(sequence:) Combine里没有
combineLatest(...) Publishers.CombineLatest
Publishers.CombineLatest3
Publishers.CombineLatest4
Publisher.combineLatest(_:)
注意:你可以把它们串起来使用,然后用map转成特殊的数据类型来使用
concat(...) Publishers.Concatenate
Publisher.append
不支持多于两个值,可以用reduce 来达到目的
create(_:) 单个值的Publisher可以用 Future
多个值的Publisher可以参考 replacement in CombineExt
deferred(_:) Deferred
empty() Empty
error(_:) Fail
from(_:) Collection.publisher
Optional.publisher
generate(
initialState:
condition:
iterate:
)
Combine里没有
just(_:) Just
merge(...) Publishers.Merge
Publishers.Merge3
Publishers.Merge4
Publishers.Merge5
Publishers.Merge6
Publishers.Merge7
Publishers.Merge8
Publishers.MergeMany
never() Empty(completeImmediately: false)
of(...) Collection.publisher
range() Collection.publisher可以用Range或ClosedRange
用 stride
repeatElement(_:) Combine里没有
timer(_:period:scheduler:) Timer.publish
+ delay
+ autoconnect
using(_:observableFactory:) Combine里没有
zip(...) Publishers.Zip
Publishers.Zip3
Publishers.Zip4
👉replacement in CombineExt

Publisher / Observable Operators

RxSwift Combine
amb(_:) Combine里没有
👉 replacement in CombineExt
asCompletable() Combine里没有
asObservable() eraseToAnyPublisher()
buffer(
timeSpan:
count:
scheduler:
)
collect(
.byTimeOrCount(::_:),
options: )
catch(:)
catchError(
:)
catch(_:)
tryCatch(_:)
catchAndReturn(:)
catchErrorJustReturn(
:)
replaceError(with:)
compactMap(_:) compactMap(_:)
tryCompactMap(_:)
concat(...) append
Publishers.Concatenate
concatMap(_:) Combine里没有
可以用 reduce(::)append(_:)来实现
debounce(_:scheduler:) debounce(
for:
scheduler:
options: )
debug(
_:
trimOutput:
file:
line:
function:
)
print(_:to:)
delay(_:scheduler:) delay(
for:
tolerance:
scheduler:
options:
)
delaySubscription(
_:
scheduler:
)
Combine里没有
可以用 Deferreddelay(for:scheduler:options:)的组合来替代
dematerialize() Combine里没有
👉 replacement in CombineExt
distinctUntilChanged(...) removeDuplicates()
当Output是Equatable时用
removeDuplicates(by:)
tryRemoveDuplicates(by:)
do(
onNext:
afterNext:
onError:
afterError:
onCompleted:
afterCompleted:
onSubscribe:
onSubscribed:
onDispose:
)
handleEvents(
receiveSubscription:
receiveOutput:
receiveCompletion:
receiveCancel:
receiveRequest:
)

没有onDispose: 在publisher complete时,cancel不会被调用。
element(at:) output(at:)
enumerated() Combine里没有
建议可以用 scan(::)魔改一下
filter(_:) filter(_:)
tryFilter(_:)
first() first()
flatMapFirst(_:) Combine里没有
flatMap(_:) flatMap(_:)
tryFlatMap(_:)
flatMapLatest(_:) map(_:)接上 switchToLatest()
👉 replacement in CombineExt
groupBy(keySelector:) 没有
ifEmpty(default:) replaceEmpty(with:)
ifEmpty(switchTo:) 没有
ignoreElements() ignoreOutput()
map(_:) map(_:)
tryMap(_:)
materialize() 没有
👉 replacement in CombineExt
multicast(_:) multicast(subject:)
multicast(makeSubject:) multicast(_:)
observe(on:) observe(on:)
Combine用的是不同的scheduler类型!
publish() multicast { PassthroughSubject() }
makeConnectable()
reduce(into:_:) 没有
不过可以用 reduce 来实现
reduce(::) reduce(::)
tryReduce(::)
reduce(
_:
accumulator:
mapResult:
)
reduce(::)接上 map(_:)tryMap(_:)
refCount() autoconnect()
replay(_:) 没有
可以用 multicast(_:)和 ReplaySubject 来实现
replayAll() 没有
retry() retry(.max)
可能有微小的意义上的差别,但实际使用中一般没区别
retry(_:) retry(_:)
retry(when:) 没有
sample(_:defaultValue:) 没有
scan(into:_:) 没有
可以用一般的 scan(::)来实现
scan(::) scan(::)
tryScan(::)
share(replay:scope:) share()
或者用 multicast(_:)加上 PassthroughSubjectautoconnect()
或者用 multicast(_:)加上 ReplaySubject
👉 extension in CombineExt
single() 没有
当有且只有一个值时,用 first(),否则会报错
single(_:) 没有
可以用 filter(_:)接上 single()的替代方案
skip(_:) dropFirst(_:)
skip(while:) drop(while:)
skip(until:) drop(untilOutputFrom:)
startWith(...) prepend(...)
subscribe(_:) 最类似的接口:
sink(
receiveValue:
receiveCompletion:
)
subscribe(on:) subscribe(on:options:)
Combine用的是不同的scheduler类型!
subscribe(
onNext:
onError:
onCompleted:
onDisposed:
)
最类似的接口:
sink(
receiveValue:
receiveCompletion:
)
subscribe(
with:
onNext:
onError:
onCompleted:
onDisposed:
)
最类似的接口:
sink(
receiveValue:
receiveCompletion:
)
[weak object]
switchLatest() switchToLatest()
take(_:) prefix(_:)
take(for:scheduler:) 没有
可以用以下方式解决:
prefix(untilOutputFrom:
Timer.publish(
every: <time>,
on: <scheduler> )
.autoconnect()
.prefix(1)
)

👉 replacement in CombineExt
take(until:) prefix(untilOutputFrom:)
take(until:behavior:) prefix(while:)的相反条件来替代,但没有 behavior参数功能了
take(while:behavior:) prefix(while:),但没有 behavior参数功能了
takeLast(_:) 没有
最接近的方案:
reduce([]) { 0 + [1] }
.flatMap { $0.suffix(<count>).publisher
}
throttle(
_:
latest:
scheduler:
)
throttle(
for:
scheduler:
latest:
)
timeout(
_:
other:
scheduler:
)
最接近的方案:
timeout(
_:
scheduler:
options:
customError:
)

然后再 catch(_:), 用map转到其他publisher上
timeout(_:scheduler:) timeout(
_:
scheduler:
options:
customError:
)
toArray() collect()
window(
timeSpan:
count:
scheduler:
)
collect(
.byTime(<scheduler>, <time>)
)


collect(
.byTimeOrCount(
<scheduler>,
<time>,
<count>
)
)
withLatestFrom(_:) 没有
👉 replacement in CombineExt
withLatestFrom(
_:
resultSelector:
)
没有
👉 replacement in CombineExt
withUnretained(_:) 最接近的方案:
compactMap { [weak object] value in
object.map { ($0, value) }
}
withUnretained(
_:
resultSelector:
)
最接近的方案:
compactMap { [weak object] value in
object.map {
resultSelector($0, value)
}
}

总结一下

对我们来说,总是要调用 setFailureType(to:)eraseToAnyPublisher() 感觉不舒坦,除此之外,我们还是觉得Combine 很棒的。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 206,482评论 6 481
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 88,377评论 2 382
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 152,762评论 0 342
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 55,273评论 1 279
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 64,289评论 5 373
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,046评论 1 285
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,351评论 3 400
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,988评论 0 259
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 43,476评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,948评论 2 324
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,064评论 1 333
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,712评论 4 323
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,261评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,264评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,486评论 1 262
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,511评论 2 354
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,802评论 2 345

推荐阅读更多精彩内容