// Extracted from RxDart.
// To use it, create a new file and paste in the code below.
import 'dart:async';
import 'dart:collection';
/// The strategy that is used to determine how and when a new window is created.
enum WindowStrategy {
/// Cancels the open window (if any) and immediately opens a fresh one
/// for every event emitted by the source [Stream].
everyEvent,
/// Waits until the current open window completes; then, when the
/// source [Stream] emits its next event, opens a new window.
eventAfterLastWindow,
/// Opens a single recurring window right after the very first event on
/// the source [Stream] is emitted; later events do not open new windows.
firstEventOnly,
/// Does not open any windows; all events are buffered and emitted
/// whenever the handler triggers. After this trigger, the buffer is cleared.
onHandler
}
/// The [ForwardingSink] that implements the windowed buffering behind
/// [BackpressureStreamTransformer].
///
/// Source events are collected in [queue]. Depending on the
/// [WindowStrategy], a "window" [Stream] is obtained from the window stream
/// factory; when a window opens the `onWindowStart` handler may emit a value,
/// and when it ends the `onWindowEnd` handler receives a snapshot of the
/// buffer and its result is emitted downstream.
class _BackpressureStreamSink<S, T> implements ForwardingSink<S, T> {
  final WindowStrategy _strategy;
  final Stream<dynamic> Function(S event) _windowStreamFactory;
  final T Function(S event) _onWindowStart;
  final T Function(List<S> queue) _onWindowEnd;
  final int _startBufferEvery;
  final bool Function(List<S> queue) _closeWindowWhen;
  final bool _ignoreEmptyWindows;
  final bool _dispatchOnClose;

  /// The buffer holding the events of the current window.
  final Queue<S> queue = DoubleLinkedQueue<S>();

  /// Upper bound for the length of [queue], or `null` for no bound.
  final int maxLengthQueue;

  /// Number of upcoming source events that must not be buffered.
  var skip = 0;

  /// Whether the source has emitted at least one event.
  var _hasData = false;

  /// Whether the source [Stream] has closed.
  var _mainClosed = false;

  /// Subscription to the currently open window, or `null` when none is open.
  StreamSubscription<dynamic> _windowSubscription;

  _BackpressureStreamSink(
    this._strategy,
    this._windowStreamFactory,
    this._onWindowStart,
    this._onWindowEnd,
    this._startBufferEvery,
    this._closeWindowWhen,
    this._ignoreEmptyWindows,
    this._dispatchOnClose,
    this.maxLengthQueue,
  );

  /// Buffers [data] (unless it is being skipped), opening and/or closing a
  /// window as dictated by the strategy and the close predicate.
  @override
  void add(EventSink<T> sink, S data) {
    _hasData = true;
    maybeCreateWindow(data, sink);

    if (skip == 0) {
      queue.add(data);

      // Keep the buffer bounded by dropping the oldest events.
      if (maxLengthQueue != null && queue.length > maxLengthQueue) {
        queue.removeFirstElements(queue.length - maxLengthQueue);
      }
    } else if (skip > 0) {
      skip--;
    }

    maybeCloseWindow(sink);
  }

  /// Forwards source errors downstream untouched.
  @override
  void addError(EventSink<T> sink, Object e, StackTrace st) =>
      sink.addError(e, st);

  /// Handles the source [Stream] closing.
  @override
  void close(EventSink<T> sink) {
    _mainClosed = true;

    if (_strategy == WindowStrategy.eventAfterLastWindow) {
      // While a window is still open, its completion closes [sink]
      // (see [singleWindow]), so a trailing event can be flushed first.
      // Without an open window nothing else would ever close [sink], so
      // run the closing path right away.
      if (_windowSubscription == null) {
        resolveWindowEnd(sink, true);
      }
      return;
    }

    // treat the final event as a Window that opens
    // and immediately closes again
    if (_dispatchOnClose && queue.isNotEmpty) {
      resolveWindowStart(queue.last, sink);
    }

    resolveWindowEnd(sink, true);

    queue.clear();

    _windowSubscription?.cancel();
    sink.close();
  }

  @override
  FutureOr onCancel(EventSink<T> sink) => _windowSubscription?.cancel();

  @override
  void onListen(EventSink<T> sink) {}

  @override
  void onPause(EventSink<T> sink) => _windowSubscription?.pause();

  @override
  void onResume(EventSink<T> sink) => _windowSubscription?.resume();

  /// Opens a window for [event] when the strategy calls for one.
  void maybeCreateWindow(S event, EventSink<T> sink) {
    switch (_strategy) {
      // for example throttle
      case WindowStrategy.eventAfterLastWindow:
        if (_windowSubscription != null) return;

        _windowSubscription = singleWindow(event, sink);

        resolveWindowStart(event, sink);

        break;
      // for example scan
      case WindowStrategy.firstEventOnly:
        if (_windowSubscription != null) return;

        _windowSubscription = multiWindow(event, sink);

        resolveWindowStart(event, sink);

        break;
      // for example debounce
      case WindowStrategy.everyEvent:
        _windowSubscription?.cancel();

        _windowSubscription = singleWindow(event, sink);

        resolveWindowStart(event, sink);

        break;
      case WindowStrategy.onHandler:
        break;
    }
  }

  /// Ends the current window when the close predicate accepts the buffer.
  void maybeCloseWindow(EventSink<T> sink) {
    if (_closeWindowWhen != null && _closeWindowWhen(unmodifiableQueue)) {
      resolveWindowEnd(sink);
    }
  }

  /// Opens a window that ends after its first event (or when it completes).
  ///
  /// If the source has already closed by then, ending the window also closes
  /// [sink].
  StreamSubscription<dynamic> singleWindow(S event, EventSink<T> sink) =>
      buildStream(event, sink).take(1).listen(
            null,
            onError: sink.addError,
            onDone: () => resolveWindowEnd(sink, _mainClosed),
          );

  // opens a new Window which is kept open until the main Stream
  // closes.
  StreamSubscription<dynamic> multiWindow(S event, EventSink<T> sink) =>
      buildStream(event, sink).listen(
        (dynamic _) => resolveWindowEnd(sink),
        onError: sink.addError,
        onDone: () => resolveWindowEnd(sink),
      );

  /// Cancels any open window and asks the factory for a new window [Stream].
  Stream<dynamic> buildStream(S event, EventSink<T> sink) {
    Stream stream;

    _windowSubscription?.cancel();

    stream = _windowStreamFactory(event);

    return stream;
  }

  /// Emits the window-start value for [event], if a start handler is set.
  void resolveWindowStart(S event, EventSink<T> sink) {
    if (_onWindowStart != null) {
      sink.add(_onWindowStart(event));
    }
  }

  /// Ends the current window, emitting the window-end value when applicable
  /// and preparing [queue] for the next window.
  ///
  /// When [isControllerClosing] is true the source has closed; for
  /// [WindowStrategy.eventAfterLastWindow] this also closes [sink].
  void resolveWindowEnd(EventSink<T> sink, [bool isControllerClosing = false]) {
    if (isControllerClosing &&
        _strategy == WindowStrategy.eventAfterLastWindow) {
      if (_dispatchOnClose &&
          _hasData &&
          queue.length > 1 &&
          _onWindowEnd != null) {
        sink.add(_onWindowEnd(unmodifiableQueue));
      }

      queue.clear();
      _windowSubscription?.cancel();
      _windowSubscription = null;
      sink.close();
      return;
    }

    if (isControllerClosing ||
        _strategy == WindowStrategy.eventAfterLastWindow ||
        _strategy == WindowStrategy.everyEvent) {
      _windowSubscription?.cancel();
      _windowSubscription = null;
    }

    if (isControllerClosing && !_dispatchOnClose) {
      return;
    }

    if (_hasData && (queue.isNotEmpty || !_ignoreEmptyWindows)) {
      if (_onWindowEnd != null) {
        sink.add(_onWindowEnd(unmodifiableQueue));
      }

      // prepare the buffer for the next window.
      // by default, this is just a cleared buffer...
      if (!isControllerClosing && _startBufferEvery > 0) {
        // ...unless startBufferEvery is provided.
        // here we backtrack to the first event of the last buffer
        // and count forward using startBufferEvery until we reach
        // the next event.
        //
        // if the next event is found inside the current buffer,
        // then this event and any later events in the buffer
        // become the starting values of the next buffer.
        // if the next event is not yet available, then a skip
        // count is calculated.
        // this count will skip the next Future n-events.
        // when skip is reset to 0, then we start adding events
        // again into the new buffer.
        //
        // example:
        // startBufferEvery = 2
        // last buffer: [0, 1, 2, 3, 4]
        // 0 is the first event,
        // 2 is the n-th event
        // new buffer starts with [2, 3, 4]
        //
        // example:
        // startBufferEvery = 3
        // last buffer: [0, 1]
        // 0 is the first event,
        // the n-the event is not yet dispatched at this point
        // skip becomes 1
        // event 2 is skipped, skip becomes 0
        // event 3 is now added to the buffer
        skip = _startBufferEvery > queue.length
            ? _startBufferEvery - queue.length
            : 0;

        if (_startBufferEvery < queue.length) {
          queue.removeFirstElements(_startBufferEvery);
        } else {
          queue.clear();
        }
      } else {
        queue.clear();
      }
    }
  }

  /// An unmodifiable snapshot copy of [queue], safe to hand to handlers.
  List<S> get unmodifiableQueue => List<S>.unmodifiable(queue);
}
/// A highly customizable [StreamTransformer] which can be configured
/// to serve any of the common rx backpressure operators.
///
/// The [StreamTransformer] works by creating windows, during which it
/// buffers events to a [Queue]. It uses a [WindowStrategy] to determine
/// how and when a new window is created.
///
/// [onWindowStart] and [onWindowEnd] are handlers that fire when a window
/// opens and closes; the value each handler returns is the event that is
/// emitted.
///
/// [startBufferEvery] allows to skip events coming from the source [Stream].
///
/// [ignoreEmptyWindows] can be set to false, to allow events to be emitted
/// at the end of a window, even if the current buffer is empty.
/// If the buffer is empty, then [onWindowEnd] receives an empty [List].
/// If true (the default), then nothing is emitted on an empty buffer.
///
/// [dispatchOnClose] will cause the remaining values in the buffer to be
/// emitted when the source [Stream] closes.
/// When false, the remaining buffer is discarded on close.
class BackpressureStreamTransformer<S, T> extends StreamTransformerBase<S, T> {
/// Determines how the window is created
final WindowStrategy strategy;
/// Factory method used to create the window [Stream]; it receives the
/// source event that caused the window to open.
final Stream<dynamic> Function(S event) windowStreamFactory;
/// Handler which fires when the window opens; its result is emitted.
/// May be `null`, in which case nothing is emitted on window start.
final T Function(S event) onWindowStart;
/// Handler which fires when the window closes; it receives the buffered
/// events and its result is emitted.
/// May be `null`, in which case nothing is emitted on window end.
final T Function(List<S> queue) onWindowEnd;
/// Maximum length of the buffer.
/// Specify this value to avoid running out of memory when adding too many events to the buffer.
/// When the limit is exceeded, the oldest events are removed first.
/// If it's `null`, maximum length of the buffer is unlimited.
final int maxLengthQueue;
/// Used to skip an amount of events
final int startBufferEvery;
/// Predicate which determines when the current window should close.
/// It is evaluated against the buffer after every source event.
final bool Function(List<S> queue) closeWindowWhen;
/// Whether windows that contain no events are suppressed (`true`)
/// or dispatched anyway (`false`).
final bool ignoreEmptyWindows;
/// Toggle to prevent, or allow the final set of events to be dispatched
/// when the source [Stream] closes
final bool dispatchOnClose;
/// Constructs a [StreamTransformer] which buffers events emitted by the
/// source [Stream] during windows created by [windowStreamFactory].
///
/// Use the various optional parameters to precisely determine how and when
/// this buffer should be created.
///
/// For more info on the parameters, see [BackpressureStreamTransformer],
/// or see the various back pressure [StreamTransformer]s for examples.
BackpressureStreamTransformer(
this.strategy,
this.windowStreamFactory, {
this.onWindowStart,
this.onWindowEnd,
this.startBufferEvery = 0,
this.closeWindowWhen,
this.ignoreEmptyWindows = true,
this.dispatchOnClose = true,
this.maxLengthQueue,
});
/// Binds this transformer to [stream], using a fresh sink per call so
/// buffer and window state are not shared between bound streams.
@override
Stream<T> bind(Stream<S> stream) {
final sink = _BackpressureStreamSink(
strategy,
windowStreamFactory,
onWindowStart,
onWindowEnd,
startBufferEvery,
closeWindowWhen,
ignoreEmptyWindows,
dispatchOnClose,
maxLengthQueue,
);
return forwardStream(stream, sink);
}
}
extension _RemoveFirstNQueueExtension<T> on Queue<T> {
  /// Drops [count] elements from the front of this queue, oldest first.
  ///
  /// A [count] of zero or less leaves the queue untouched. [count] should
  /// not exceed [length]; otherwise [removeFirst] ends up being called on an
  /// empty queue.
  void removeFirstElements(int count) {
    var remaining = count;
    while (remaining > 0) {
      removeFirst();
      remaining--;
    }
  }
}
/// A [StreamTransformer] that emits a value from the source [Stream] and then
/// ignores subsequent source values while a window [Stream] is open.
///
/// Built on [BackpressureStreamTransformer] with
/// [WindowStrategy.eventAfterLastWindow].
class ThrottleStreamTransformer<T> extends BackpressureStreamTransformer<T, T> {
/// Construct a [StreamTransformer] that emits a value from the source [Stream],
/// then ignores subsequent source values while the window [Stream] is open,
/// then repeats this process.
///
/// [window] is called with the event that opens a window and returns the
/// [Stream] whose first event (or completion) ends that window.
///
/// If [leading] is true, then the first item in each window is emitted.
/// If [trailing] is true, then the last item in each window is emitted.
ThrottleStreamTransformer(
Stream Function(T event) window, {
bool trailing = false,
bool leading = true,
}) : super(
WindowStrategy.eventAfterLastWindow,
window,
// Emit the event that opened the window, when requested.
onWindowStart: leading ? (event) => event : null,
// Emit the most recently buffered event when the window ends, when requested.
onWindowEnd: trailing ? (queue) => queue.last : null,
dispatchOnClose: trailing,
// Only the newest event is ever read, so the buffer is kept tiny;
// with trailing off nothing needs to be retained at all.
maxLengthQueue: trailing ? 2 : 0,
);
}
/// A single-subscription [Stream] that emits one value after a delay and
/// then closes.
///
/// The countdown starts when the stream is listened to. While the
/// subscription is paused the countdown is suspended; on resume it continues
/// with the time that was still remaining.
class TimerStream<T> extends Stream<T> {
  final StreamController<T> _controller;

  /// Constructs a [Stream] which emits [value] after the specified [Duration].
  TimerStream(T value, Duration duration)
      : _controller = _buildController(value, duration);

  @override
  StreamSubscription<T> listen(void Function(T event) onData,
      {Function onError, void Function() onDone, bool cancelOnError}) {
    return _controller.stream.listen(
      onData,
      onError: onError,
      onDone: onDone,
      cancelOnError: cancelOnError,
    );
  }

  /// Builds the controller that drives the timer for [value] and [duration].
  static StreamController<T> _buildController<T>(T value, Duration duration) {
    // Runs only while the subscription is active. A Stopwatch keeps its
    // elapsed time across stop/start, so `watch.elapsed` is already the total
    // un-paused time and must not be summed up again on every resume.
    final watch = Stopwatch();
    Timer timer;
    StreamController<T> controller;
    var cancelled = false;

    void emitAndClose() {
      controller.add(value);
      controller.close();
    }

    // (Re)starts the countdown for whatever part of [duration] is left.
    void startTimer() {
      final remaining = duration - watch.elapsed;

      watch.start();
      timer = Timer(
        remaining.isNegative ? Duration.zero : remaining,
        emitAndClose,
      );
    }

    controller = StreamController(
      sync: true,
      onListen: startTimer,
      onPause: () {
        timer?.cancel();
        timer = null;
        watch.stop();
      },
      onResume: () {
        // Already cancelled or is not paused.
        if (cancelled || timer != null) return;

        startTimer();
      },
      onCancel: () {
        timer?.cancel();
        timer = null;
        cancelled = true;
      },
    );

    return controller;
  }
}
/// The contract used by [forwardStream] to process a source [Stream].
///
/// Every method receives the downstream [EventSink] so an implementation
/// can decide what (if anything) to emit in response to source events of
/// type [T] and to the listener's lifecycle callbacks. Emitted events are of
/// type [R].
abstract class ForwardingSink<T, R> {
/// Handle data event
void add(EventSink<R> sink, T data);
/// Handle error event
void addError(EventSink<R> sink, Object error, StackTrace st);
/// Handle close event
void close(EventSink<R> sink);
/// Fires when a listener subscribes on the underlying [Stream].
void onListen(EventSink<R> sink);
/// Fires when a subscriber pauses.
void onPause(EventSink<R> sink);
/// Fires when a subscriber resumes after a pause.
void onResume(EventSink<R> sink);
/// Fires when a subscriber cancels.
///
/// May return a [Future]; [forwardStream] waits for it as part of the
/// cancellation.
FutureOr onCancel(EventSink<R> sink);
}
/// @private
/// Routes everything [stream] produces through [connectedSink] and exposes
/// whatever the sink writes as a new [Stream].
///
/// A synchronous [StreamController] sits between the two: its listen, pause,
/// resume and cancel callbacks are relayed to [connectedSink] and to the
/// subscription on [stream]. The returned stream is a broadcast stream
/// exactly when [stream] is one; in that case pause and resume are not
/// relayed.
///
/// Anything thrown by [connectedSink] while handling an event is reported
/// back through [ForwardingSink.addError].
Stream<R> forwardStream<T, R>(
    Stream<T> stream, ForwardingSink<T, R> connectedSink) {
  ArgumentError.checkNotNull(stream, 'stream');
  ArgumentError.checkNotNull(connectedSink, 'connectedSink');

  StreamController<R> controller;
  StreamSubscription<T> subscription;

  // Runs [action] and reroutes anything it throws into the sink's error path.
  @pragma('vm:prefer-inline')
  @pragma('dart2js:tryInline')
  void guarded(void Function() action) {
    try {
      action();
    } catch (error, stackTrace) {
      connectedSink.addError(controller, error, stackTrace);
    }
  }

  // Notifies the sink first, then starts pulling from the source.
  void handleListen() {
    guarded(() => connectedSink.onListen(controller));

    subscription = stream.listen(
      (event) => guarded(() => connectedSink.add(controller, event)),
      onError: (Object error, StackTrace stackTrace) =>
          guarded(() => connectedSink.addError(controller, error, stackTrace)),
      onDone: () => guarded(() => connectedSink.close(controller)),
    );
  }

  // Cancels the source subscription and the sink, waiting for both.
  Future<List<dynamic>> handleCancel() {
    final sourceCancel = subscription.cancel();
    final sinkCancel = connectedSink.onCancel(controller);

    final pending = <Future>[];
    if (sourceCancel is Future) pending.add(sourceCancel);
    if (sinkCancel is Future) pending.add(sinkCancel);

    return Future.wait<dynamic>(pending);
  }

  void handlePause() {
    subscription.pause();
    guarded(() => connectedSink.onPause(controller));
  }

  void handleResume() {
    subscription.resume();
    guarded(() => connectedSink.onResume(controller));
  }

  // The controller acts as a trampoline for the forwarded events.
  controller = stream.isBroadcast
      ? StreamController<R>.broadcast(
          onListen: handleListen,
          onCancel: handleCancel,
          sync: true,
        )
      : StreamController<R>(
          onListen: handleListen,
          onPause: handlePause,
          onResume: handleResume,
          onCancel: handleCancel,
          sync: true,
        );

  return controller.stream;
}
/// A [StreamTransformer] that only emits the most recent source event once
/// the current window has fired without a newer event arriving.
///
/// Built on [BackpressureStreamTransformer] with
/// [WindowStrategy.everyEvent].
class DebounceStreamTransformer<T> extends BackpressureStreamTransformer<T, T> {
/// Constructs a [StreamTransformer] which keeps the latest event of the
/// source [Stream] and emits it whenever the current [window] fires.
///
/// The [window] is reset whenever the [Stream] that is being transformed
/// emits an event.
DebounceStreamTransformer(Stream Function(T event) window)
: super(
WindowStrategy.everyEvent,
window,
// Only the newest event is emitted when the window ends.
onWindowEnd: (Iterable<T> queue) => queue.last,
// Older events are never emitted, so a single slot is enough.
maxLengthQueue: 1,
);
}
/// Time-based rate-limiting operators for any [Stream].
extension Throttle<T> on Stream<T> {
  /// Emits an event from this stream, then ignores further events until
  /// [duration] has passed, then repeats.
  ///
  /// With [leading] set, the event that opens each time window is emitted;
  /// with [trailing] set, the last event seen during the window is emitted
  /// when the window ends.
  Stream<T> throttle(Duration duration,
      {bool trailing = false, bool leading = true}) {
    final transformer = ThrottleStreamTransformer<T>(
      (_) => TimerStream<bool>(true, duration),
      trailing: trailing,
      leading: leading,
    );
    return transform(transformer);
  }

  /// Emits the latest event of this stream only after [duration] has passed
  /// without a newer event arriving.
  Stream<T> debounce(Duration duration) {
    final transformer = DebounceStreamTransformer<T>(
      (_) => TimerStream<void>(null, duration),
    );
    return transform(transformer);
  }
}