也谈rxjava2

众所周知Android开发不能在主线程中进行耗时操作,所以一些操作必须放在子线程中进行,这样一来就就会涉及到涉及线程的创建及线程间的通信。当然Android系统也提供了AsyncTask,但是在处理嵌套处理方面做的并不优雅。rxjava采用事件流的方式来解决了这一问题,当然rxjava的作用及优点不止是这个,还有很多的功能在使用起来也是让人爱不释手。本文不是对rxjava的用法及功能进行介绍,而是对rxjava的内部原理进行分析。

rxjava主要是通过发布/订阅模式来实现事件的控制和处理。两个接口和简单的一行代码就能明白rxjava的原理:

//发布者
public interface ObservableSource<T> {
    void subscribe(@NonNull Observer<? super T> observer);
}
//订阅者
public interface Observer<T> {

void onSubscribe(@NonNull Disposable d);

void onNext(@NonNull T t);

void onError(@NonNull Throwable e);

void onComplete();

}
//订阅
observableSource.subsrcibe(observer)
简单用法
Observable.create(ObservableOnSubscribe<String> {
            //代码1
            it.onNext("hello world")
        }).flatMap(Function<String, ObservableSource<String>> {
            val value = it
            ObservableSource {
            //代码2
                it.onNext("flatmap-->$value")
            }
        }).subscribe(object : Observer<String> {
            override fun onComplete() {
            }
            override fun onSubscribe(d: Disposable) {
            }
            override fun onNext(t: String) {
            //代码3
                e("MainActivity", "t--->$t")
            }
            override fun onError(e: Throwable) {
            }

        })

上面这段段代码创建了三个主要的对象:ObservableOnSubscribe(A)、ObservableFlatMap(B)和Observer(C),然后通过subscribe()方法将这个链串了起来。首先来看下Observable.create()方法:

public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
    ObjectHelper.requireNonNull(source, "source is null");
    return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}

这里创建了一个ObservableCreate对象,ObservableCreate继承自Obsevable,Observable实现了ObservableSource接口。然后调用了flatMap方法,最终会创建ObservableFlatMap对象,这相当于B订阅了A。当调用了subsrcibe方法时,相当于C订阅了B。我们来看下ObservableFlatMap中的subsrcibe()方法,然后又调用了subscribeActual()这个核心方法:

 @Override
public void subscribeActual(Observer<? super U> t) {
    if (ObservableScalarXMap.tryScalarXMapSubscribe(source, t, mapper)) {
        return;
    }
    source.subscribe(new MergeObserver<T, U>(t, mapper, delayErrors, maxConcurrency, bufferSize));
}

这里的source对象就是他订阅的A对象,调用的subscribe(Observer o)方法就是代码方法,这里的o对象就是上面代码中的MergeObserver对象本身,紧接着又调用了MergeObserver.onNext()方法:

 @Override
    public void onNext(T t) {
        // safeguard against misbehaving sources
        if (done) {
            return;
        }
        ObservableSource<? extends U> p;
        try {
            p = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper returned a null ObservableSource");
        } catch (Throwable e) {
            Exceptions.throwIfFatal(e);
            s.dispose();
            onError(e);
            return;
        }
       //省略部分代码
    }

这里的mapper对象就是代码2处创建的Function对象,然后返回一个ObservaleSource对象,之后继续调用了下一个订阅者(也就是对象A)的onNext()方法。至此这个事件流就通过订阅链依次到每一个订阅者。

我们通过简单的代码来快速了解下rxjava的原理:

//发布者
public interface ObservableSource<T> {
    void subscribe(Observer<? super T> observer);}
//订阅者
public interface Observer<T> {
    void onNext(T t);
}
//中间订阅者
public interface Function<V, K> {
  K apply(V v);
}
public abstract class FlatMapObservable<T> implements Observer<T>, ObservableSource<T> {
    private T mT;
    public <K> ObservableSource<K> flatMap(Function<T, ? extends ObservableSource<K>> function) {
    return function.apply(mT);
}
    @Override
    public void onNext(T t) {
    mT = t;
    }
}

//dome
public class Test {
    public static void main(String[] args) {

    new FlatMapObservable<String>() {
        @Override
        public void subscribe(Observer<? super String> observer) {
            observer.onNext("hello  world");
        }
    }.flatMap(new Function<String, ObservableSource<Boolean>>() {
        @Override
        public ObservableSource<Boolean> apply(String s) {
            return new ObservableSource<Boolean>() {
                @Override
                public void subscribe(Observer<? super Boolean> observer) {
                    observer.onNext(false);
                }
            };
        }
    }).subscribe(new Observer<Boolean>() {
        @Override
        public void onNext(Boolean aBoolean) {
            System.out.println(aBoolean);

        }
    });
 }
}
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,772评论 6 477
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,458评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,610评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,640评论 1 276
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,657评论 5 365
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,590评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,962评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,631评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,870评论 1 297
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,611评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,704评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,386评论 4 319
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,969评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,944评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,179评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 44,742评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,440评论 2 342

推荐阅读更多精彩内容