创建型操作符
-
create
操作符
// 上游
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
// 上游发射的
e.onNext("A"); // 使用者自己发射
}
})
// 订阅
.subscribe(
// 下游
new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(String s) {
Log.d(TAG, "下游接收 onNext: " + s);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
})
;
-
just
操作符
// 上游
Observable.just("A", "B") // 内部会去发射 A B
// 订阅
.subscribe(
// 下游
new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(String s) {
Log.d(TAG, "onNext: " + s);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
}
-
fromArray
操作符
String[] strings = {"张三", "李四", "王五"};
Observable.fromArray(strings)
.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.d(TAG, "accept: " + s);
}
});
-
empty
操作符
// 上游没有发射有值得事件,下游无法确定类型,默认Object,RxJava泛型 泛型默认类型==Object
// 上游无法指定 事件类型
Observable.empty() // 内部一定会只调用 发射 onComplete 完毕事件
// 订阅
.subscribe(
// 下游 观察者
new Observer<Object>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Object integer) {
// 没有事件可以接受
Log.d(TAG, "onNext: " + integer);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete: ");
// 隐藏 加载框...
}
}
-
range
操作符
Observable.range(80, 5) // 80开始 加1 数量共5个
// 订阅
.subscribe(
// 下游
new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
//80 81 82 83 84
Log.d(TAG, "accept: " + integer);
}
});