适合的才是最好的-RxJava篇

原创博客地址
对于程序猿来说,Demo是最好的起手
而对于RxJava来说,你可以简单理解成:

  • 是一个观察者模式框架
  • 替代AsyncTask成为更好的异步操作工具
  • 即便逻辑再复杂,对于RxJava来说就是:简洁

首先上Demo

public static void main(String[] args) {
    // 0.准备一些数据
    Integer[] numbers = { 1, 2, 3, 4 };
    List<Integer> lists = Arrays.asList(numbers);
    
    // 1.创建一个被观察者
    //      被观察者很明显从List集合获取数据,现在就等着有人来订阅~
    Observable<Integer> observable = Observable.from(lists);

    // 2.创建一个观察者
    //      SubScriber是Observer的实现类,所以也是一个观察者
    Observer<Integer> observer = new Observer<Integer>() {
        @Override
        public void onNext(Integer data) {
            // 被观察者发送的数据都会送到这里
            System.out.println("Rx -- onNext:" + data);
        }

        @Override
        public void onCompleted() {
            // 被观察者发送完数据会调用该方法
            System.out.println("Rx -- Complete!");
        }

        @Override
        public void onError(Throwable e) {
            // 被观察者传输数据中发生异常会调用该方法
            System.out.println("Rx -- Error!");
        }
    };

    // 3.订阅
    //      正常来说应该是:observer.subscribe(observable); 看起来更合乎逻辑
    //      这样反而像是:被观察者 订阅了 观察者(报纸 订阅了 读者)
    //      这涉及到流式编程,姑且先这样记住吧
    observable.subscribe(observer);
}

运行结果:

运行结果.png
  • 在观察者订阅的顺间,被观察者就发送数据过来了
  • 数据发送过来调用的方法:onNext()
  • 数据发送完成调用的方法:onCompleted()
  • 数据发送期间出现异常调用的方法:onError()

不要看代码多了,但逻辑很简洁!只有逻辑上的简洁才是真正的简洁!

上面的Demo看完一遍,大概知道有什么样的角色在扮演。

现在分析下每个角色:

观察者

作用:接收数据并进行处理

观察者毫无疑问就是Observer,但它是接口。在实际操作中,一般都使用它的抽象实现类Subscriber。两者使用方式完全一样。

public abstract class Subscriber<T> implements Observer<T>, Subscription

现在来看看观察者常用的创建方式

第一种:new Observer()接口

Observer<Integer> observer = new Observer<Integer>(){
    @Override
    public void onCompleted() {
    }

    @Override
    public void onError(Throwable e) {
    }

    @Override
    public void onNext(Integer i) {
    }
};

第二种:new Subscriber()抽象类

Subscriber<Integer> subscriber = new Subscriber<Integer>(){
    @Override
    public void onCompleted() {
    }

    @Override
    public void onError(Throwable e) {
    }

    @Override
    public void onNext(Integer i) {
    }
};

Subscriber中有一个方法:

/**
 * This method is invoked when the Subscriber and Observable have been connected but the Observable has
 * not yet begun to emit items or send notifications to the Subscriber. Override this method to add any
 * useful initialization to your subscription, for instance to initiate backpressure.
 */
public void onStart() {
    // do nothing by default
}
  • 很明显是留给调用者自己重写
  • 英文好的可以自己看注释
  • 这里大致说下意思:这个方法是在观察者和被观察者已连接,但是被观察者还没有向观察者发送数据时进行调用
  • 所以,这个方法就是用来做初始化用的。

除此之外,Subscriber实现的Subscription接口还有两个方法:

public interface Subscription {
    void unsubscribe(); // 取消订阅
    boolean isUnsubscribed(); // 是否已经取消订阅
}
  • 取消订阅后,观察者将不会再接收事件
  • 取消之前先判断一下isUnsubscribed()
  • 如果程序中没有调用取消订阅方法,被观察者会始终持有观察者引用。造成内存泄漏

被观察者

作用:作为数据的发送方,它决定什么时候发送,怎么发送

被观察者ObservableJava里也有。很多地方都喜欢用这个单词作为被观察者,这也是它的直译。但是就因为都一样,所以小心不要导错包了。

现在来看看被观察者常用的创建方式

第一种:Observable.create()

Observable observable = Observable.create(new Observable.OnSubscribe<Integer>(){
    @Override
    public void call(Subscriber<? super Integer> subscriber) {
        
    }
});
  • create()方法接收一个OnSubscribe接口参数
  • OnSubscribeObservable的内部接口
public interface OnSubscribe<T> extends Action1<Subscriber<? super T>> {
    // cover for generics insanity
}
  • 根据接口名,顾名思义。当观察者被订阅的时候,会调用这个call()方法
  • 下面举个小例子:
public static void main(String[] args) {
    // 观察者
    Observer<Integer> observer = new Observer<Integer>(){
        @Override
        public void onCompleted() {
            System.out.println("接收数据结束");
        }
        @Override
        public void onError(Throwable e) {
            
        }
        @Override
        public void onNext(Integer t) {
            System.out.println("接收数据:" + t);
        }
    };
    // 被观察者
    Observable observable = Observable.create(new Observable.OnSubscribe<Integer>(){
        @Override
        public void call(Subscriber<? super Integer> subscriber) {
            subscriber.onNext(1);
            subscriber.onNext(2);
            subscriber.onNext(3);
            subscriber.onCompleted();
        }
    });
    
    // 订阅
    observable.subscribe(observer);
}

运行结果:

运行结果2.png

注意:

这个方法已经被废弃了,推荐使用SyncOnSubscribeAsyncOnSubscribe

看名字应该知道是什么意思

create方法废弃.png

第二种:Observable.from()

Integer[] nums = {1, 2, 3};
Observable observable = Observable.from(nums);
  • 从一个数组Iterable中依次发送数据元素

第三种:Observable.just()

Observable observable = Observable.just(1, 2, 3);
  • 这个更直接。将参数依次发送过来。

订阅

observable.subscribe(observer);

其内部实现:

subscribe.png
  • subscriber.onStart()就是观察者中内置的用于初始化的方法
  • 被观察者.call(subscriber)就是
observable.call.png
  • 最后把观察者当成订阅者返回。前面说过
public abstract class Subscriber<T> implements Observer<T>, Subscription
  • 所以,你可以:
// 订阅
Subscription subscription = observable.subscribe(observer);
// 取消订阅
subscription.unsubscribe();
  • 形成链式编程

关于Action

前面在被观察者的第一种创建方式Observable.create()中,接收的参数是OnSubscribe接口。它继承了Action1

public interface OnSubscribe<T> extends Action1<Subscriber<? super T>> {
    // cover for generics insanity
}
  • 被观察者订阅时,OnSubscribecall()方法才会被调用
  • 这个call()就是Action1
public interface Action1<T> extends Action {
    void call(T t);
}

至于这个Action,你可以理解为就是一次单纯的行为,一个单纯的回调

有很多的Actionx

public interface Action0 extends Action {
    void call();
}

public interface Action1<T> extends Action {
    void call(T t);
}

public interface Action2<T1, T2> extends Action {
    void call(T1 t1, T2 t2);
}

public interface Action3<T1, T2, T3> extends Action {
    void call(T1 t1, T2 t2, T3 t3);
}
  • 0就代表call()方法没有参数
  • 1就代表call()方法有1个参数
  • 2就代表call()方法有2个参数
  • 至于ActionN接口
public interface ActionN extends Action {
    void call(Object... args);
}

Observable.subscribe(..)的时候,里面除了ObserverSubscriber这两个观察者之外。还可以接受一个Action

Action1<Integer> action1 = new Action1<Integer>() {
    @Override
    public void call(Integer num) {
        System.out.println("接收到数据:" + num);
    }
};
observable.subscribe(action1);

常用方法

map

People[] peoples = new People[]{
        new People("张三", 18, new String[]{"睡觉", "吃饭", "打豆豆"}),
        new People("李四", 19, new String[]{"编程", "泡妞", "LOL"})
};
// 观察者
Subscriber<String> subscriber = new Subscriber<String>() {
    @Override
    public void onNext(String name) {
        System.out.println("接收信息:" + name);
    }
    @Override
    public void onCompleted() {
    }
    @Override
    public void onError(Throwable e) {
    }
};
// 被观察者
Observable.from(peoples).map(new Func1<People, String>() {
        @Override
        public String call(People people) {
            return people.getName();
        }
    }).subscribe(subscriber);
  • 可以看到被观察者从People数组里读取每一个元素
  • map方法里找到每一个元素对象的name传递给观察者
  • 观察者接收并使用
  • 这里转换范围很大,不仅仅只是提取属性。

运行结果

运行结果4.png

flatMap

People[] peoples = new People[]{
        new People("张三", 18, new String[]{"睡觉", "吃饭", "打豆豆"}),
        new People("李四", 19, new String[]{"编程", "泡妞", "LOL"})
};
// 观察者
Subscriber<String> subscriber = new Subscriber<String>() {
    @Override
    public void onNext(String hobby) {
        System.out.println("接收信息:" + hobby);
    }
    @Override
    public void onCompleted() {
    }
    @Override
    public void onError(Throwable e) {
    }
};
// 被观察者
Observable.from(peoples).flatMap(new Func1<People, Observable<String>>() {
        @Override
        public Observable<String> call(People people) {
            return Observable.from(people.getHobby());
        }
    }).subscribe(subscriber);
  • 效果和map是类似的
  • 区别在于map是用于一对一,而flatMap是用于一对多
  • 被观察者从People数组读取每一个对象,call()里读取每一个对象的hobby属性,并依次返回其中的一个元素

运行结果

运行结果3.png

filter

People[] peoples = new People[]{
        new People("张三", 18, new String[]{"睡觉", "吃饭", "打豆豆"}),
        new People("李四", 19, new String[]{"编程", "泡妞", "LOL"})
};
// 观察者
Subscriber<String> subscriber = new Subscriber<String>() {
    @Override
    public void onNext(String name) {
        System.out.println("接收信息:" + name);
    }
    @Override
    public void onCompleted() {
    }
    @Override
    public void onError(Throwable e) {
    }
};
// 被观察者
Observable.from(peoples).filter(new Func1<People, Boolean>() {

    @Override
    public Boolean call(People t) {
        return t.getAge() > 18;
    }
}).map(new Func1<People, String>() {
    @Override
    public String call(People people) {
        return people.getName();
    }
}).subscribe(subscriber);

运行结果

运行结果5.png

线程

RxJava遵循的线程原则在那个线程订阅,则被观察者和观察者的操作都在该线程

通过Schedulers切换线程

  • Schedulers.immediate()默认值。在当前线程运行。
  • AndroidSchedulers.mainThread():在Android主线程运行。
    • 注意:这个是RxAndroid里的。必须要导入RxAndroidjar包。RxJava里是没有的。
  • Schedulers.newThread()总是开启新线程运行。
  • Schedulers.io():如果操作涉及到I/O使用该项。
    • 也是总是开启新线程运行
    • 内部有线程池和复用
  • Schedulers.computation():如果操作涉及到图形计算等使用该项。

还是之前例子,但是增加两行代码:

People[] peoples = new People[]{
        new People("张三", 18, new String[]{"睡觉", "吃饭", "打豆豆"}),
        new People("李四", 19, new String[]{"编程", "泡妞", "LOL"})
};
// 观察者
Subscriber<String> subscriber = new Subscriber<String>() {
    @Override
    public void onNext(String name) {
        System.out.println("接收信息:" + name);
    }
    @Override
    public void onCompleted() {
    }
    @Override
    public void onError(Throwable e) {
    }
};
// 被观察者
Observable.from(peoples).filter(new Func1<People, Boolean>() {

    @Override
    public Boolean call(People t) {
        return t.getAge() > 18;
    }
}).map(new Func1<People, String>() {
    @Override
    public String call(People people) {
        return people.getName();
    }
}).subscribeOn(Schedulers.immediate()) // 当前线程
.observeOn(Schedulers.io()) // io线程
.subscribe(subscriber);
  • 被观察者在新开起的IO线程读取/过滤/转换操作
  • 数据传给观察者
  • 观察者当前线程显示数据

运行结果

运行结果5.png

总结

  • RxJava确实是一个非常强大的流式编程工具
  • 再复杂的逻辑,RxJava都能很简洁的表示
  • 一句代码完成线程切换,很方便
  • 用多了才知道它的美~
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 205,033评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 87,725评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,473评论 0 338
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,846评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,848评论 5 368
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,691评论 1 282
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,053评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,700评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 42,856评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,676评论 2 323
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,787评论 1 333
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,430评论 4 321
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,034评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,990评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,218评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,174评论 2 352
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,526评论 2 343

推荐阅读更多精彩内容