Android 用RxJava模拟一个EventBus ———RxBus

** 本篇文章已授权微信公众号 guolin_blog (郭霖)独家发布*

RxBus的核心功能是基于Rxjava的,既然是模拟EventBus,我们需要搞清楚RxJava满足实现EventBus的那些条件,这样才能更好的实现RxBus。

EventBus是Android上的一个事件发布/订阅的事件总线框架,可以充分的解耦,简化了四大组件、UI线程与子线程的间的事件传递等等。它基本工作流程如下:

  • 1、订阅:EventBus.getDefault().register(this);
  • 2、发送事件:EventBus.getDefault().post(event);
  • 3、接受、处理事件:onEventXXX(Object event);
  • 2、取消订阅:EventBus.getDefault().unregister(this);

根据EventBus的工作流程,我们的RxBus首先需要自身的实例,这一点我们可以仿照EventBus的getDefault()方法,通过一个单例来实现。有了RxBus实例就可以进行订阅了,在RxJava中有个Subject类,它继承Observable类,同时实现了Observer接口,因此Subject可以同时担当订阅者和被订阅者的角色,这里我们使用Subject的子类PublishSubject来创建一个Subject对象(PublishSubject只有被订阅后才会把接收到的事件立刻发送给订阅者),在需要接收事件的地方,订阅该Subject对象,之后如果Subject对象接收到事件,则会发射给该订阅者,此时Subject对象充当被订阅者的角色。完成了订阅,在需要发送事件的地方将事件发送给之前被订阅的Subject对象,则此时Subject对象做为订阅者接收事件,然后会立刻将事件转发给订阅该Subject对象的订阅者,以便订阅者处理相应事件,到这里就完成了事件的发送与处理。最后就是取消订阅的操作了,Rxjava中,订阅操作会返回一个Subscription对象,以便在合适的时机取消订阅,防止内存泄漏,如果一个类产生多个Subscription对象,我们可以用一个CompositeSubscription存储起来,以进行批量的取消订阅。

到这里我们已经结合EventBus对RxBus的可行性以及大概的实现流程进行了分析,接下来结合实现代码再做进一步的解释:

public class RxBus {
    private static volatile RxBus mInstance;
    private SerializedSubject<Object, Object> mSubject;
    private HashMap<String, CompositeSubscription> mSubscriptionMap;

    private RxBus() {
        mSubject = new SerializedSubject<>(PublishSubject.create());
    }

    public static RxBus getInstance() {
        if (mInstance == null) {
            synchronized (RxBus.class) {
                if (mInstance == null) {
                    mInstance = new RxBus();
                }
            }
        }
        return mInstance;
    }

    /**
     * 发送事件
     *
     * @param o
     */
    public void post(Object o) {
        mSubject.onNext(o);
    }

    /**
     * 返回指定类型的Observable实例
     *
     * @param type
     * @param <T>
     * @return
     */
    public <T> Observable<T> toObservable(final Class<T> type) {
        return mSubject.ofType(type);
    }

    /**
     * 是否已有观察者订阅
     *
     * @return
     */
    public boolean hasObservers() {
        return mSubject.hasObservers();
    }

    /**
     * 一个默认的订阅方法
     *
     * @param type
     * @param next
     * @param error
     * @param <T>
     * @return
     */
    public <T> Subscription doSubscribe(Class<T> type, Action1<T> next, Action1<Throwable> error) {
        return tObservable(type)
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(next, error);
    }

    /**
     * 保存订阅后的subscription
     * @param o
     * @param subscription
     */
    public void addSubscription(Object o, Subscription subscription) {
        if (mSubscriptionMap == null) {
            mSubscriptionMap = new HashMap<>();
        }
        String key = o.getClass().getName();
        if (mSubscriptionMap.get(key) != null) {
            mSubscriptionMap.get(key).add(subscription);
        } else {
            CompositeSubscription compositeSubscription = new CompositeSubscription();
            compositeSubscription.add(subscription);
            mSubscriptionMap.put(key, compositeSubscription);
        }
    }

    /**
     * 取消订阅
     * @param o
     */
    public void unSubscribe(Object o) {
        if (mSubscriptionMap == null) {
            return;
        }

        String key = o.getClass().getName();
        if (!mSubscriptionMap.containsKey(key)){
            return;
        }
        if (mSubscriptionMap.get(key) != null) {
            mSubscriptionMap.get(key).unsubscribe();
        }

        mSubscriptionMap.remove(key);
    }
}

先看一下这个私有的构造函数:

private RxBus() {
        mSubject = new SerializedSubject<>(PublishSubject.create());
    }

由于Subject类是非线程安全的,所以我们通过它的子类SerializedSubject将PublishSubject转换成一个线程安全的Subject对象。之后可通过单例方法getInstance()进行RxBus的初始化。

toObservable()根据事件类型,通过mSubject.ofType(type);得到一个Observable对象,让其它订阅者来订阅。其实ofType()方法,会过滤掉不符合条件的事件类型,然后将满足条件的事件类型通过cast()方法,转换成对应类型的Observable对象,这点可通过源码查看。
同时封装了一个简单的订阅方法doSubscribe(),只需要传入事件类型,相应的回调即可。其实可以根据需求在RxBus中扩展满足自己需求的doSubscribe()方法,来简化使用时的代码逻辑。

在需要发送事件的地方调用post()方法,它间接的通过mSubject.onNext(o);将事件发送给订阅者。

同时RxBus提供了addSubscription()unSubscribe()方法,分别来保存订阅时返回的Subscription对象,以及取消订阅。

接下我们在具体的场景中测试一下:
1、我们在Activity的onCreate()方法中进行进行订阅操作:

private void doSubscribe() {
        Subscription subscription1 = RxBus.getInstance()
                .tObservable(String.class)
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Action1<String>() {
                    @Override
                    public void call(String s) {
                        mTv.setText("事件内容:" + s);
                    }
                }, new Action1<Throwable>() {
                    @Override
                    public void call(Throwable throwable) {

                    }
                });
        RxBus.getInstance().addSubscription(this, subscription1);
    }

可以看到我们设定事件类型为String,并且Subscriber的回调发生在主线程,同时保存了Subscription对象。
然后通过一个Button发送事件:

mBtn1.setOnClickListener(new View.OnClickListener() {
            @Override
            public void onClick(View v) {
                RxBus.getInstance().post("1024");
            }
        });

我们直接在UI线程发送了String类型的1024,看效果:

发送UI线程事件

2、同样在onCreate()方法中进行进行订阅操作:

private void doSubscribe() {
        Subscription subscription2 = RxBus.getInstance()
                .doSubscribe(Integer.class, new Action1<Integer>() {
                    @Override
                    public void call(Integer s) {
                        mTv.setText("事件内容:" + s);
                    }
                }, new Action1<Throwable>() {
                    @Override
                    public void call(Throwable throwable) {

                    }
                });
        RxBus.getInstance().addSubscription(this, subscription2);
    }

我们使用了RxBus中封装好的doSubscribe()方法,设置事件类型为Integer。
这次我们通过Button在子线程中发送一个事件:

mBtn2.setOnClickListener(new View.OnClickListener() {
            @Override
            public void onClick(View v) {
                new Thread(new Runnable() {
                    @Override
                    public void run() {
                        RxBus.getInstance().post(2048);
                    }
                }).start();
            }
        });

在子线程发送了一个Integer类型的2048,看效果:


发送子线程事件

3、我们再测试下在广播中发送事件,订阅方式按照场景1的方式。
然后定义一个检测网络状态的广播:

public class NetworkChangeReceiver extends BroadcastReceiver {
    @Override
    public void onReceive(Context context, Intent intent) {
        ConnectivityManager manager = (ConnectivityManager) context.getSystemService(Context.CONNECTIVITY_SERVICE);
        NetworkInfo networkInfo = manager.getActiveNetworkInfo();
        if (networkInfo != null && networkInfo.isAvailable()) {
            RxBus.getInstance().post("网络连接成功");
        } else {
            RxBus.getInstance().post("网络不可用");
        }
    }
}

在网络可用与不可用时发送提示事件,然后在onCreate()方法中注册广播:

private void registerReceiver() {
        IntentFilter intentFilter = new IntentFilter();
        intentFilter.addAction("android.net.conn.CONNECTIVITY_CHANGE");
        mReceiver = new NetworkChangeReceiver();
        registerReceiver(mReceiver, intentFilter);
    }

我们手动打开、关闭网络,可以看到mTv上会显示网络状态的提示信息,看效果:


在广播中发送事件

最后不要忘了在onDestory()中对广播进行取消注册,以及取消订阅。

protected void onDestroy() {
        super.onDestroy();
        unregisterReceiver(mReceiver);
        RxBus.getInstance().unSubscribe(this);
    }

其它场景有兴趣的可自行测试哦!到这里RxBus的基本功能就实现了。

但是还不够完善,一般情况我们都是先订阅事件,然后发送事件,如果我们反过来,先发送了事件,再进行订阅操作,怎么保证发送的事件不丢失呢?也就是EventBus中的StickyEven功能。
其实通过RxJava实现类似的功能很简单,Subject有一个子类BehaviorSubject,在被订阅之前,它可以缓存最近一个发送给它的事件,当被订阅后,它会立刻将缓存事件发送给订阅者,这样就解决了我们之前的疑问。RxBus需要做的修改很简单:

private RxBus() {
        mSubject = new SerializedSubject<>(BehaviorSubject.create());
    }

但是有一点需要注意BehaviorSubject只能缓存最近的一个事件,如果有多个事件怎么办?对RxJava来说都不是事,Subject还有一个子类ReplaySubject,在被订阅之前,它可以缓存多个发送给它的事件,在被订阅后会发送所有事件给订阅者,相信如何修改RxBus已经很明显了。

有兴趣的话可以下载源码测试:点我下载哦!

最后推荐一些RxJava的学习资源:RxJava入门给 Android 开发者的 RxJava 详解

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 206,378评论 6 481
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 88,356评论 2 382
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 152,702评论 0 342
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 55,259评论 1 279
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 64,263评论 5 371
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,036评论 1 285
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,349评论 3 400
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,979评论 0 259
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 43,469评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,938评论 2 323
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,059评论 1 333
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,703评论 4 323
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,257评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,262评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,485评论 1 262
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,501评论 2 354
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,792评论 2 345

推荐阅读更多精彩内容