四.创建数据流
- repeat:重复数据的数据流
实例操作符
repeat功能:可以重复上游Observable中的数据若干
import { of ,Observable} from 'rxjs';
import { repeat } from 'rxjs/operators';
const source$ = of('source')
const example = source$.pipe(repeat(2))
example.subscribe(
console.log,
null,
() => {
console.log('complete')}
)
// source
// source
// complete
const observable = new Observable(subscriber => {
console.log('on sunscribe')
setTimeout(() => subscriber.next(1), 100);
setTimeout(() => subscriber.next(2), 200);
setTimeout(() => subscriber.next(3), 300);
setTimeout(() => subscriber.complete(), 400);
return {
unsubscribe: () => {
console.log('unsubscribe')
}
}
});
const sour = observable.pipe(repeat(2))
sour.subscribe(console.log,
null,
() => {
console.log('complete')
})
/// on sunscribe
// 1
// 2
// 3
// unsubscribe
// on sunscribe
// 1
// 2
// 3
// complete
// unsubscribe
repeat订阅上游的Observable对象,把上游的数据传递给下游,但是只有在上游Observable对象完结之后才会重新订阅,因为在完结之前repeat不知道还会不会有新数据从上游推送下来,如果上游数据一直未完结,则repeat也就没有机会去调用unsubscribe。
const observable = new Observable(subscriber => {
console.log('on sunscribe')
setTimeout(() => subscriber.next(1), 100);
setTimeout(() => subscriber.next(2), 200);
setTimeout(() => subscriber.next(3), 300);
return {
unsubscribe: () => {
console.log('unsubscribe')
}
}
});
const sour$ = observable.pipe(repeat(2))
sour.subscribe(console.log,
null,
() => {
console.log('complete')
})
/// on sunscribe
// 1
// 2
// 3
- never / empty / throwError
- never:它不会向Observer发送任何操作,也永远不会完成。既不发送值,也不报错也不完结,就一直待着,知道永远
- throwError:不会给Observer发送任何值,只会在一瞬间传递一个error对象
- empty:创建一个完结的Observable对象,没有参数,不产生任何数据,直接完结
创建异步数据的Observable对象
- interval和timer
timer:在dueTime之后每隔period的时间间隔产生一个不断增加数字
interval:创建一个Observable,在指定的每个指定的时间间隔内产生一个不断增加的数字
如果
timer(dueTime: number | Date = 0,
periodOrScheduler?: number | SchedulerLike,
scheduler?: SchedulerLike): Observable<number>
dueTime:延迟事件
periodOrScheduler:间隔时间
当period没有填的时候,只会产生一个
interval(period: 0 = 0, scheduler: SchedulerLike = async): Observable<number>
import { interval } from 'rxjs';
const numbers = interval(1000);
numbers.subscribe(x => console.log(x))
// 间隔1秒 1, 2, 3, ......
import { timer } from 'rxjs';
const numbers = timer(3000, 1000);
numbers.subscribe(x => console.log(x));
// 3秒之后每间隔1秒 1,2,3,。。。。。
const numbers = timer(5000);
numbers.subscribe(x => console.log(x));
// 5秒之后 0
- from:可以把一切转换成Observable
可以把一个数组或者类数组或者一个Promise对象或者iterable对象或者类似于Observable的对象都可以转换成Observable
from<T>(input: ObservableInput<T>, scheduler?: SchedulerLike): Observable<T>
import { from } from 'rxjs';
const source$ = from([7,9,8]);
source$ .subscribe(x => console.log(x))
// 7 9 8
function *generateNumber(max){
for (let i = 0; i < max; i++) {
yield i
}
}
const iter$ = from(generateNumber(2))
iter$.subscribe(x => console.log(x))
// 0 1
异步
console.log('start')
const asy$ = from([1,2,3],asyncScheduler)
asy$.subscribe(x => console.log(x))
console.log('end')
// start end 1,2,3
- fromEvent
创建一个Observable对象,把指定事件类型传递给指定事件
fromEvent<T>(target: FromEventTarget<T>,
eventName: string,
options?: EventListenerOptions | ((...args: any[]) => T),
resultSelector?: ((...args: any[]) => T)): Observable<T>
target:The DOM EventTarget, Node.js EventEmitter,
JQuery-like event target, NodeList or HTMLCollection to attach the event
handler to.
eventName:事件名称例如 click,mousemove...
import { fromEvent } from 'rxjs'
import EventEmitter from 'events'
const clicks = fromEvent(document,'click')
clicks.subscribe( x => console.log(x) )
// node
const emitter = new EventEmitter();
const source$ = fromEvent(emitter, 'msg')
source$.subscribe(x => {
console.log(x),
error => console.log('error', error),
() => console.log('complete')
})
emitter.emit('msg',1)
emitter.emit('msg',2)
emitter.emit('msg',3)
// 1,2,3
fromEvent 其实产生的是 hot observable,也就是说数据的产生和订阅是无关的。
- fromEventPattern
fromEvent能够从事件源产生observable,但是对数据源的要求比较严格,要求是DOM或者Node的EventEmitter,所以就出现了fromEventPattern
fromEventPattern<T>(addHandler: (handler: NodeEventHandler) => any,
removeHandler?: (handler: NodeEventHandler, signal?: any) => void,
resultSelector?: (...args: any[]) => T): Observable<T | T[]>
import { fromEventPattern } from 'rxjs';
function addClickHandler(handler) {
document.addEventListener('click', handler);
}
function removeClickHandler(handler) {
document.removeEventListener('click', handler);
}
const clicks = fromEventPattern(
addClickHandler,
removeClickHandler
);
clicks.subscribe(x => console.log(x));
// Whenever you click anywhere in the browser, DOM MouseEvent
// object will be logged.
fromEventPattern接受两个函数参数,分别对应产生Observable对象被订阅和退订时的动作。这是两个函数,所以可以定义的十分灵活.
- ajax
根据ajax请求返回结果产生Obserfvable对象
import { ajax } from 'rxjs/ajax'
import { map, catchError } from 'rxjs/Operators'
import { of } from 'rxjs'
const ons$ = ajax(`https://api.github.com/users?per_page=5`).pipe(
map(res => console.log('res', res)),
catchError(error => {
console.log('error', error)
return of(error)
})
)
// 或则 ajax.getJSON()
// 或者ajax({
// url: 'https://httpbin.org/delay/2',
// method: 'POST',
// headers: {
// 'Content-Type': 'application/json',
// 'rxjs-custom-header': 'Rxjs'
// },。。。。。
- repeatWhen
repeat可以反复订阅上有的observable但是不能控制订阅时间所以repeatWhen可以实现在特定的时间内反复订阅某件事情
const notifer = ()=> {
return interval(1000)
}
const source$ = of(1,2,3)
const replace$ = source$.pipe(repeatWhen(notifer))
.subscribe( x => console.log(x) )
// 1 2 3 1 2 3.....
- defer
由于数据源头的observable需要占用资源,但是有时候创建一个observable的代价又比价大,所以希望尽可能延迟对应observable的创建,但是为了方便代码,又希望一个observable预先存在,这样方便订阅。所以为解决这个问题,就是依然会创建一个observable但是这个observable只是一个代理,在创建之时并不会分配资源,只有在订阅时才会去真正创建占用资源的obsevable
import { defer, fromEvent, interval } from 'rxjs';
const clicksOrInterval = defer(function () {
return Math.random() > 0.5
? fromEvent(document, 'click')
: interval(1000);
});
clicksOrInterval.subscribe(x => console.log(x));
小结
在创建数据流的时候应该明确区分是同步数据还是异步数据,对于同步数据流,关系的应该似乎产生什么样的数据以及已经产生数据的顺序由于数据之间是没有时间间隔的,所以不需要考虑异步,但是,对于异步数据流,除了要考虑产生什么样的数据,还应该考虑产生数据之间的间隔
rxjs的创建类操作符只是数据的搬运工,产生数据内容和节奏都是由外部数据流控制的