Flutter 学习之旅(四十六) Rxdart 学习

本来这张想继续说一下flutter_bloc 这个框架,这里面会涉及到 Rxdart 一些东西,如果不说一下没有连贯性,看起代码来特别牵强,由于我也是现学现卖,所以我所描述的Rxdart 的 api 基本都是flutter_bloc 所用到的,在介绍的同时 ,同时会说一下部分的源码,让大家看起来更直接

这里我使用的是 rxdart: ^0.24.1 这个版本的

我从网上看了很多关于 rxdart 的文章,里面多少都有提到 Observable 的说明,但是我在使用的时候找不到这个类,分别引入了原来的0.18.1的版本,还有新的0.24.1 这个版本,发现在新的版本当中其实是没有Observable 这个类的,那么所有的功能又回归到Stream 了,
但是在Rxdart中Subject 这个类相当于controller 的功能, 既包含 Stream 这个事件源 ,也包含Subscription 这个管理对象,

我对于rxdart理解其实就是对Stream的转换,那么既然是转换,使用Stream 和Subject 他们有什么区别呢

我们先来看一下Stream 的简单应用

  List<int> array = [1, 2, 3, 4, 5, 6, 7];
  Stream<int> stream=Stream.fromIterable(array).asBroadcastStream().map((event) => event*event);
  stream.interval(Duration(seconds: 1)).listen((event) {
    printString('listen1:$event');
  });
  stream.listen((event) { printString('listen2:$event');});

打印结果

I/flutter (25410): tian.shm =listen2:1
I/flutter (25410): tian.shm =listen2:4
I/flutter (25410): tian.shm =listen2:9
I/flutter (25410): tian.shm =listen2:16
I/flutter (25410): tian.shm =listen2:25
I/flutter (25410): tian.shm =listen2:36
I/flutter (25410): tian.shm =listen2:49
I/flutter (25410): tian.shm =listen1:1
I/flutter (25410): tian.shm =listen1:4
I/flutter (25410): tian.shm =listen1:9
I/flutter (25410): tian.shm =listen1:16
I/flutter (25410): tian.shm =listen1:25
I/flutter (25410): tian.shm =listen1:36
I/flutter (25410): tian.shm =listen1:49

很简单的应用,创建一个Stream,并将它置位广播, 再对数据做一下简单的操作, 第一个listen 是 每一秒打印一个日志,第二个listen 则是直接打印日志,
那么我们介绍这个方法有什么意义吗,
这里主要是为了说明Stream的一些静态方法,就从这个Stream.fromIterable 来说,他是同步还是异步? 既然可以监听,那么我可以在后续添加数据吗,还有是否可以多次监听,这个在上面已经体现了,既然是Stream ,那么只有BroadCastStream 可以被多次监听的,所以如果想要多次监听,所有在添加数据后必须使用asBroadcastStream()后才可以

那么剩下的就来先看一下Stream.fromIterable 这个方法他到底是同步还是异步的,

factory Stream.fromIterable(Iterable<T> elements) {
    return new _GeneratedStreamImpl<T>(
        () => new _IterablePendingEvents<T>(elements));
}

class _GeneratedStreamImpl<T> extends _StreamImpl<T> {
  final _EventGenerator<T> _pending;
  bool _isUsed = false;
  _GeneratedStreamImpl(this._pending);

  StreamSubscription<T> _createSubscription(void onData(T data)?,
      Function? onError, void onDone()?, bool cancelOnError) {
    if (_isUsed) throw new StateError("Stream has already been listened to.");
    _isUsed = true;
    return new _BufferingStreamSubscription<T>(
        onData, onError, onDone, cancelOnError)
      .._setPendingEvents(_pending());
  }
}

看到这个_BufferingStreamSubscription类大家应该很不陌生, 他维护了一下消息队列,并提供了一个相当于沙盒的运行环境, 对于是否是异步则看这个消息是怎么处理,那么只要看一下_IterablePendingEvents这个类是否维护了 Microtask ,下面我们来看一下源码,

 void handleNext(_EventDispatch<T> dispatch) {
    var iterator = _iterator;
    if (iterator == null) {
      throw new StateError("No events pending.");
    }
    bool movedNext = false;
    try {
      if (iterator.moveNext()) {
        movedNext = true;
        dispatch._sendData(iterator.current);
      } else {
        _iterator = null;
        dispatch._sendDone();
      }
    } catch (e, s) {
      if (!movedNext) {
      }
      dispatch._sendError(e, s);
    }
  }

我们看到_IterablePendingEvents这个类中只是维护了一下这个队列,并么有异步的过程,也就是说是否是异步需要只要大家看一下实现这个消息传递的event 就可以了
在学习Stream 的过程中,我们知道添加数据的过程中使用的是Sink调用controller 来添加数据,最后使用的是_BufferingStreamSubscription来管理这个队列的,而我们得到的只是Stream,并没有添加数据的功能,所以这就是直接使用Stream 相对于Subject 的缺陷,

PublishSubject

PublishSubject<int> publishSubject=PublishSubject<int>();
  publishSubject.add(1);
  publishSubject.add(2);
  publishSubject.add(3);
  publishSubject.interval(Duration(seconds: 1)).listen((value) {
    printString('listen1:$value');
  });
  publishSubject.add(4);
  publishSubject.listen((value) {
    printString('listen2:$value');
  });
  publishSubject.add(5);
  publishSubject.add(6);
  publishSubject.add(7);

结果

I/flutter (27171): tian.shm =listen2:5
I/flutter (27171): tian.shm =listen2:6
I/flutter (27171): tian.shm =listen2:7
I/flutter (27171): tian.shm =listen1:4
I/flutter (27171): tian.shm =listen1:5
I/flutter (27171): tian.shm =listen1:6
I/flutter (27171): tian.shm =listen1:7

从这里可以看出来 先添加进去的数据如果没有监听那么这个消息就被消费掉了,至于为什么,先来说一下PublishSubject他的controller,

class PublishSubject<T> extends Subject<T> {
  PublishSubject._(StreamController<T> controller, Stream<T> stream)
      : super(controller, stream);
  factory PublishSubject(
      {void Function() onListen, void Function() onCancel, bool sync = false}) {
    final controller = StreamController<T>.broadcast(
      onListen: onListen,
      onCancel: onCancel,
      sync: sync,
    );
    return PublishSubject<T>._(
      controller,
      controller.stream,
    );
  }
}

使用的是_AsyncBroadcastStreamController,默认是异步的, 记得在说BroadcastStreamController 的时候说过,每次调用listen 的时候则生成一个subscription,消息转发的时候遍历subscription

  void _sendData(T data) {
    for (var subscription = _firstSubscription;
        subscription != null;
        subscription = subscription._next) {
      subscription._addPending(new _DelayedData<T>(data));
    }
  }

从这里可以看到,如果没有添加监听,这个消息就浪费了,

BehaviorSubject

同样使用上面的代码,将PublishSubject 变成BehaviorSubject ,

  BehaviorSubject<int> publishSubject=BehaviorSubject<int>();
  publishSubject.add(1);
  publishSubject.add(2);
  publishSubject.add(3);
  publishSubject.interval(Duration(seconds: 1)).listen((value) {
    printString('listen1:$value');
  });
  publishSubject.add(4);
  publishSubject.listen((value) {
    printString('listen2:$value');
  });
  publishSubject.add(5);
  publishSubject.add(6);
  publishSubject.add(7);

打印结果,

I/flutter (27171): tian.shm =listen2:4
I/flutter (27171): tian.shm =listen2:5
I/flutter (27171): tian.shm =listen2:6
I/flutter (27171): tian.shm =listen2:7
I/flutter (27171): tian.shm =listen1:3
I/flutter (27171): tian.shm =listen1:4
I/flutter (27171): tian.shm =listen1:5
I/flutter (27171): tian.shm =listen1:6
I/flutter (27171): tian.shm =listen1:7

相对于使用 PublishSubject ,打印结果都多打印了添加监听之前的一个数据,至于为什么,我们来看一下,

  factory BehaviorSubject({
    void Function() onListen,
    void Function() onCancel,
    bool sync = false,
  }) {
    // ignore: close_sinks
    final controller = StreamController<T>.broadcast(
      onListen: onListen,
      onCancel: onCancel,
      sync: sync,
    );

    final wrapper = _Wrapper<T>();

    return BehaviorSubject<T>._(
        controller,
        Rx.defer<T>(_deferStream(wrapper, controller, sync), reusable: true),
        wrapper);
  }

在创建BehaviorSubject的时候,同时初始化了一个_Wrapper这个对象,而这个对象是什么呢,

class _Wrapper<T> {
  T latestValue;
  Object latestError;
  StackTrace latestStackTrace;
  bool latestIsValue = false, latestIsError = false;
  _Wrapper();
  _Wrapper.seeded(this.latestValue) : latestIsValue = true;
  void setValue(T event) {
    latestIsValue = true;
    latestIsError = false;
    latestValue = event;
    latestError = null;
    latestStackTrace = null;
  }
  void setError(Object error, [StackTrace stackTrace]) {
    latestIsValue = false;
    latestIsError = true;
    latestValue = null;
    latestError = error;
    latestStackTrace = stackTrace;
  }
}

这个_Wrapper保存了一些操作的数据,那么什么时候对这个数据做操作呢,

class BehaviorSubject<T> extends Subject<T> implements ValueStream<T> {
  ...
  @override
  void onAdd(T event) => _wrapper.setValue(event);
  ...
}

每次添加数据的时候,保存添加的数据,也就是说,多打印的这个数据就是这个_Wrapper保存的数据,
既然数据保存了,而Stream 对于数据时不可以操作的,那么如何将保存的数据添加到原有监听呢,这里就归功于BehaviorSubject他的Stream的延迟创建的过程Rx.defer<T>(_deferStream(wrapper, controller, sync), reusable: true),每次添加监听后重新创建这个Stream,将这个缓存的数据重新添加入Stream 中,至于是怎么实现的,来看一下源码,

class DeferStream<T> extends Stream<T> {
  final Stream<T> Function() _factory;
  final bool _isReusable;

  @override
  bool get isBroadcast => _isReusable;

  /// Constructs a [Stream] lazily, at the moment of subscription, using
  /// the [streamFactory]
  DeferStream(Stream<T> Function() streamFactory, {bool reusable = false})
      : _isReusable = reusable,
        _factory = reusable
            ? streamFactory
            : (() {
                Stream<T> stream;
                return () => stream ??= streamFactory();
              }());
  @override
  StreamSubscription<T> listen(void Function(T event) onData,
          {Function onError, void Function() onDone, bool cancelOnError}) =>
      _factory().listen(onData,
          onError: onError, onDone: onDone, cancelOnError: cancelOnError);
}

入参就是是传入一个构造stream 的方法,每次监听的时候则调用该方法重新创建Stream ,这样就可以将默认_Wrapper缓存的数据添加到Stream,从而实现每次缓存一个数据

ReplaySubject

看过了BehaviorSubject,后再看ReplaySubject后就比较简单了,就是将_Wrapper 变成了一个队列,这个队列可以设置数据个数的上限,再添加监听后,如果超过上限则删除第一个后再将新的数据添加入这个队列中,

添加数据的方法,

  @override
  void onAdd(T event) {
    if (_queue.length == _maxSize) {
      _queue.removeFirst();
    }

    _queue.add(_Event(false, event: event));
  }

Stream重新创建的方法,根据队列重新创建Stream

Rx.defer<T>(
        () => queue.toList(growable: false).reversed.fold(controller.stream,
            (stream, event) {
          if (event.isError) {
            return stream.transform(StartWithErrorStreamTransformer(
                event.errorAndStackTrace.error,
                event.errorAndStackTrace.stackTrace,
                sync));
          } else {
            return stream
                .transform(StartWithStreamTransformer(event.event, sync: sync));
          }
        }),
        reusable: true,
      )

我们再来一个简单的例子

  ReplaySubject<int> publishSubject=ReplaySubject<int>();
  publishSubject.add(1);
  publishSubject.add(2);
  publishSubject.add(3);
  publishSubject.interval(Duration(seconds: 1)).listen((value) {
    printString('listen1:$value');
  });
  publishSubject.add(4);
  publishSubject.listen((value) {
    printString('listen2:$value');
  });
  publishSubject.add(5);
  publishSubject.add(6);
  publishSubject.add(7);

打印结果,

I/flutter (27171): tian.shm =listen2:1
I/flutter (27171): tian.shm =listen2:2
I/flutter (27171): tian.shm =listen2:3
I/flutter (27171): tian.shm =listen2:4
I/flutter (27171): tian.shm =listen2:5
Reloaded 4 of 966 libraries in 726ms.
I/flutter (27171): tian.shm =listen2:6
I/flutter (27171): tian.shm =listen2:7
I/flutter (27171): tian.shm =listen1:1
I/flutter (27171): tian.shm =listen1:2
I/flutter (27171): tian.shm =listen1:3
I/flutter (27171): tian.shm =listen1:4
I/flutter (27171): tian.shm =listen1:5
I/flutter (27171): tian.shm =listen1:6
I/flutter (27171): tian.shm =listen1:7

如果给ReplaySubject 设置一个最大值,效果大家肯定能想象的到,这里就不多做介绍了,

关于rxdart的操作符,如果像我这么讲起来,感觉再写个几十篇都不行,因为每一个操作符都有他的意义,就不多做介绍了

我学习flutter的整个过程都记录在里面了
https://www.jianshu.com/c/36554cb4c804

最后附上demo 地址

https://github.com/tsm19911014/tsm_flutter

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 205,132评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 87,802评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,566评论 0 338
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,858评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,867评论 5 368
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,695评论 1 282
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,064评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,705评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 42,915评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,677评论 2 323
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,796评论 1 333
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,432评论 4 322
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,041评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,992评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,223评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,185评论 2 352
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,535评论 2 343