Swift Concurrency框架之Async Squences/Streams

文章系列:

Async Squences/Streams

在介绍Concurrency Framework中的Async Squences/Streams,我们先回顾一下swift的集合中的 Sequence和Iterators。

Swift集合中的Sqeuence

swift集合中的Sequence是一系列相同类型值的集合,并提供了对这些值的迭代能力。

for element in someSequence {
    doSomething(with: element)
}

Sequence 协议的定义:

protocol Sequence {
    associatedtype Iterator: IteratorProtocol
    func makeIterator() -> Iterator
}

Sequence 协议需要实现makeIterator方法,并返回一个Iterator,Iterator遵循IteratorProtocol:

public protocol IteratorProtocol {
    associatedtype Element
    public mutating func next() -> Self.Element?
}

IteratorProtocol需要实现next方法,返回存储的值对象。当没有下一个元素返回nil。
我们以下载一系列的url的任务为例子,使用同步Squence方式:

struct RemoteDataSequence: Sequence {
    var urls: [URL]

    func makeIterator() -> RemoteDataIterator {
        RemoteDataIterator(urls: urls)
    }
}

为了返回数据,我们需要实现RemoteDataIterator类型,设计上我们使用index跟踪下一个待下载的urls数组索引

struct RemoteDataIterator: IteratorProtocol {
    var urls: [URL]
    fileprivate var index = 0

    mutating func next() -> Data? {
        guard index < urls.count else {
            return nil
        }

        let url = urls[index]
        index += 1

        // If a download fails, we simply move on to
        // the next URL in this case:
        guard let data = try? Data(contentsOf: url) else {
            return next()
        }

        return data
    }
}

我们现在可以通过for循环来遍历访问下载的所有图片数据

for data in RemoteDataSequence(urls: urls) {
    ...
}

虽然我们通过Sqeuence实现了一个简洁的批量下载器,但是批量下载使用同步的方式显然比较难于接受,这样会完全阻塞线程。接下来我们通过使用asynchronous sequence来达到我们的要求。

Asynchronous iterations

Swift 5.5中Concurrency为了方便并行任务的开发,提供了AsyncSequence,使用方式类似同步版本的Sequence。针对批量下载器我们可以这样改造一下:

struct RemoteDataSequence: AsyncSequence {
    typealias Element = Data

    var urls: [URL]

    func makeAsyncIterator() -> RemoteDataIterator {
        RemoteDataIterator(urls: urls)
    }
}

AsyncSequence重要实现其实是在RemoteDataIterator中,Concurrency为RemoteDataIterator的next返回方法添加了async。通过URLSession的async-wait API,我们可以异步下载我们的数据:

struct RemoteDataIterator: AsyncIteratorProtocol {
    var urls: [URL]
    fileprivate var urlSession = URLSession.shared
    fileprivate var index = 0

    mutating func next() async throws -> Data? {
        guard index < urls.count else {
            return nil
        }

        let url = urls[index]
        index += 1

        let (data, _) = try await urlSession.data(from: url)
        return data
    }
}

通过AsyncSequence的改造,现在我们的批量下载器已经可以全异步执行,不过在我们访问数据时还是需要调用awaittry,数据会通过后台线程下载并允许我们使用for循环来遍历访问

for try await data in RemoteDataSequence(urls: urls) {
    ...
}

在for循环中,如果一个步骤抛出了异常则循环会中止,这样有利于简化异常捕获的处理。如果不想要异常导致循环中断,也可以实现无异常的方法。

Asynchronous streams

通过实现AsyncIteratorProtocol有时候还是稍嫌麻烦(需要自定义 AsyncIteratorProtocol 的类型),Concurrency提供了AsyncStreamAsyncThrowingStream。在AsyncStreamAsyncThrowingStream构造闭包函数中,需要使用Task来执行异步任务,使用yield方法来返回数据,同时调用finish来告知是否存在异常。上面的例子可以改造为:

func remoteDataStream(
    forURLs urls: [URL],
    urlSession: URLSession = .shared
) -> AsyncThrowingStream<Data, Error> {
    AsyncThrowingStream { continuation in
        Task {
            do {
                for url in urls {
                    let (data, _) = try await urlSession.data(from: url)
                    continuation.yield(data)
                }

                continuation.finish(throwing: nil)
            } catch {
                continuation.finish(throwing: error)
            }
        }
    }
}

现在我们可以同样使用for来遍历我们的下载数据:

for try await data in remoteDataStream(forURLs: urls) {
    ...
}

AsyncStreamAsyncThrowingStream可以认为是AsyncSequence协议的具体实现,相当于ArraySequence的具体实现。在开发中使用stream可以简化我们的异步程序编写。
在Apple的响应式框架Combine也提供了对AsyncSequence的兼容,可以轻松地将任何publisher都转换为AsyncSequence的值对象。上面的下载器可以使用Combine来改写:

func remoteDataPublisher(
    forURLs urls: [URL],
    urlSession: URLSession = .shared
) -> AnyPublisher<Data, URLError> {
    urls.publisher
        .setFailureType(to: URLError.self)
        .flatMap(maxPublishers: .max(1)) {
            urlSession.dataTaskPublisher(for: $0)
        }
        .map(\.data)
        .eraseToAnyPublisher()
}

AnyPublisher转换为AsyncSequence,我们只需要访问publisher的values属性:

let publisher = remoteDataPublisher(forURLs: urls)

for try await data in publisher.values {
    ...
}

文章参考: async sequences streams and combine

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,772评论 6 477
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,458评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,610评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,640评论 1 276
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,657评论 5 365
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,590评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,962评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,631评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,870评论 1 297
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,611评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,704评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,386评论 4 319
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,969评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,944评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,179评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 44,742评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,440评论 2 342

推荐阅读更多精彩内容