文章系列:
Async Squences/Streams
在介绍Concurrency Framework中的Async Squences/Streams,我们先回顾一下swift的集合中的 Sequence和Iterators。
Swift集合中的Sqeuence
swift集合中的Sequence是一系列相同类型值的集合,并提供了对这些值的迭代能力。
for element in someSequence {
doSomething(with: element)
}
Sequence 协议的定义:
protocol Sequence {
associatedtype Iterator: IteratorProtocol
func makeIterator() -> Iterator
}
Sequence 协议需要实现makeIterator方法,并返回一个Iterator,Iterator遵循IteratorProtocol:
public protocol IteratorProtocol {
associatedtype Element
public mutating func next() -> Self.Element?
}
IteratorProtocol需要实现next方法,返回存储的值对象。当没有下一个元素返回nil。
我们以下载一系列的url的任务为例子,使用同步Squence方式:
struct RemoteDataSequence: Sequence {
var urls: [URL]
func makeIterator() -> RemoteDataIterator {
RemoteDataIterator(urls: urls)
}
}
为了返回数据,我们需要实现RemoteDataIterator类型,设计上我们使用index
跟踪下一个待下载的urls数组索引
struct RemoteDataIterator: IteratorProtocol {
var urls: [URL]
fileprivate var index = 0
mutating func next() -> Data? {
guard index < urls.count else {
return nil
}
let url = urls[index]
index += 1
// If a download fails, we simply move on to
// the next URL in this case:
guard let data = try? Data(contentsOf: url) else {
return next()
}
return data
}
}
我们现在可以通过for循环来遍历访问下载的所有图片数据
for data in RemoteDataSequence(urls: urls) {
...
}
虽然我们通过Sqeuence实现了一个简洁的批量下载器,但是批量下载使用同步的方式显然比较难于接受,这样会完全阻塞线程。接下来我们通过使用asynchronous sequence来达到我们的要求。
Asynchronous iterations
Swift 5.5中Concurrency为了方便并行任务的开发,提供了AsyncSequence,使用方式类似同步版本的Sequence。针对批量下载器我们可以这样改造一下:
struct RemoteDataSequence: AsyncSequence {
typealias Element = Data
var urls: [URL]
func makeAsyncIterator() -> RemoteDataIterator {
RemoteDataIterator(urls: urls)
}
}
AsyncSequence重要实现其实是在RemoteDataIterator中,Concurrency为RemoteDataIterator的next
返回方法添加了async
。通过URLSession
的async-wait API,我们可以异步下载我们的数据:
struct RemoteDataIterator: AsyncIteratorProtocol {
var urls: [URL]
fileprivate var urlSession = URLSession.shared
fileprivate var index = 0
mutating func next() async throws -> Data? {
guard index < urls.count else {
return nil
}
let url = urls[index]
index += 1
let (data, _) = try await urlSession.data(from: url)
return data
}
}
通过AsyncSequence
的改造,现在我们的批量下载器已经可以全异步执行,不过在我们访问数据时还是需要调用await
和try
,数据会通过后台线程下载并允许我们使用for循环来遍历访问
for try await data in RemoteDataSequence(urls: urls) {
...
}
在for循环中,如果一个步骤抛出了异常则循环会中止,这样有利于简化异常捕获的处理。如果不想要异常导致循环中断,也可以实现无异常的方法。
Asynchronous streams
通过实现AsyncIteratorProtocol有时候还是稍嫌麻烦(需要自定义 AsyncIteratorProtocol 的类型),Concurrency提供了AsyncStream
和AsyncThrowingStream
。在AsyncStream
和AsyncThrowingStream
构造闭包函数中,需要使用Task
来执行异步任务,使用yield
方法来返回数据,同时调用finish
来告知是否存在异常。上面的例子可以改造为:
func remoteDataStream(
forURLs urls: [URL],
urlSession: URLSession = .shared
) -> AsyncThrowingStream<Data, Error> {
AsyncThrowingStream { continuation in
Task {
do {
for url in urls {
let (data, _) = try await urlSession.data(from: url)
continuation.yield(data)
}
continuation.finish(throwing: nil)
} catch {
continuation.finish(throwing: error)
}
}
}
}
现在我们可以同样使用for来遍历我们的下载数据:
for try await data in remoteDataStream(forURLs: urls) {
...
}
AsyncStream
和AsyncThrowingStream
可以认为是AsyncSequence
协议的具体实现,相当于Array
是Sequence
的具体实现。在开发中使用stream可以简化我们的异步程序编写。
在Apple的响应式框架Combine
也提供了对AsyncSequence
的兼容,可以轻松地将任何publisher
都转换为AsyncSequence
的值对象。上面的下载器可以使用Combine来改写:
func remoteDataPublisher(
forURLs urls: [URL],
urlSession: URLSession = .shared
) -> AnyPublisher<Data, URLError> {
urls.publisher
.setFailureType(to: URLError.self)
.flatMap(maxPublishers: .max(1)) {
urlSession.dataTaskPublisher(for: $0)
}
.map(\.data)
.eraseToAnyPublisher()
}
将AnyPublisher
转换为AsyncSequence
,我们只需要访问publisher的values属性:
let publisher = remoteDataPublisher(forURLs: urls)
for try await data in publisher.values {
...
}