RxJava学习:一个简单的Demo让你了解Rxjava的原理

客户需求

/**
 * 根据一个Cat-Sdk提供的API,通过关键字来搜索网络中符合要求的所有猫的图片,根据
 * 其可爱程度的数据,将这张最可爱的图片保存到本地中。
 * 完成任务:
 * 1、下载返回的猫的图片
 * 2、找到最可爱的那张图片
 * 3、保存到本地
 */

Cat-Sdk API (Version 1.0)

  • model类

      public class Cat implements Comparable<Cat> {
    
          private Bitmap mBitmap;
          private int mCuteness;
      
          @TargetApi(Build.VERSION_CODES.KITKAT)
          @Override
          public int compareTo(Cat another) {
              return Integer.compare(mCuteness, another.mCuteness);
          }
      }
    
  • 阻塞式interface

      public interface Api {
      
          // 通过查询条件查询所有符合条件的猫
          List<Cat> queryCats1(String query);
          // 保存最可爱的猫,并返回保存路径
          Uri store1(Cat cat);
      }
    

业务逻辑

最理想的做法:阻塞调用
    public class CatsHelper {
        private Api api;
        public Uri saveTheCutestCat1(String query) {
            List<Cat> catList = api.queryCats1(query);
            Cat cutestCat = findCutestCat(catList);
            Uri savedUri = api.store1(cutestCat);
            return savedUri;
        } 
        private Cat findCutestCat(List<Cat> catList) {
            return Collections.max(catList);
        }
     }

上面的代码是不是很清晰?是的。saveTheCutestCat1方法使用参数来调用这些方法。当一个方法执行完并返回结果后,再利用这个返回结果去调用下一个方法。除了简单有效以外,还有其他优点:

  • 组合功能(Composition)

    saveTheCutestCat1 由其他三个方法调用所组成,通过方法来把一个大功能分割为每个容易理解的小功能。通过方法调用来组合使用这些小功能,使用和理解起来都相当简单。

  • 异常处理

    每个方法都可以通过抛出异常来结束运行,该异常可以在抛出异常的方法里面处理,也可以在调用该方法的外面处理。无需每次都处理每个异常,我们还可以在一个地方处理所有可能抛出的异常,直接使用try...catch语句或者向外抛出。

理想是好的,但现实是残酷的。在Android中不可能只使用阻塞调用,而我们更多的是使用异步回调。

异步回调

这里我们先不去关心Cat-Sdk API使用异步调用的api去访问网络资源,暂时假设我们已经获取到了网络资源的结果了。那么新的API接口变为这样了:

public interface Api {
        
     interface CatsQueryCallback {
        void onCatListReceived(List<Cat> catList);

        void onError(Exception e);
    }
    interface StoreCallback {
        void onCatStored(Uri uri);

        void onStoredFail(Exception e);
    }
    List<Cat> queryCats2(String query, CatsQueryCallback callback);
    Uri store2(Cat cat, StoreCallback callback);
}

那我们的的CatsHelper也要发生改变了:

public class CatsHelper {
    private Api api;

    public interface CutestCatCallback {
        void onCutestCatSaved(Uri uri);

        void onQueryFailed(Exception e);
    }

    public void saveTheCutestCat2(String query, final CutestCatCallback cutestCatCallback) {

        api.queryCats2(query, new Api.CatsQueryCallback() {
            @Override
            public void onCatListReceived(List<Cat> catList) {
                Cat cutestCat = findCutestCat(catList);
                api.store2(cutestCat, new Api.StoreCallback() {
                    @Override
                    public void onCatStored(Uri uri) {
                        cutestCatCallback.onCutestCatSaved(uri);
                    }

                    @Override
                    public void onStoredFail(Exception e) {
                        cutestCatCallback.onQueryFailed(e);
                    }
                });
            }

            @Override
            public void onError(Exception e) {
                cutestCatCallback.onQueryFailed(e);
            }
        });
    }

这个这个...。与阻塞调用比起来,一个是天堂, 一个是地狱啊。业务逻辑虽然是一样的,但是有太多的干扰代码了;太多的的匿名内部类了;组合功能也不见了,需通过回调接口手动处理;异常也不会自动传递了,也需要手动的处理。上面的代码不仅长得恶心,并且更难发现潜在的bug。那怎么办了?怎么办了?

泛型接口

通过观察这三个接口(CatsQueryCallback, StoreCallback, CutestCatCallback),会发现一个共同点:

(1) 都有一个方法来返回结果(onCutestCatSaved, onCatListReceived, onCatStored)

(2) 都有一个方法来返回异常(onQueryFailed, onError, onStoredFail)

所以,我们可以使用一个泛型接口来替代这三个接口

public interface Callback<T> {
    void onResult(T result);

    void onError(Exception e);
}

由于我们无法修改SDK中api中方法的参数,所以需新创建一个包装类

public class ApiWrapper {
    private Api api;

    public void queryCats(String query, final Callback<List<Cat>> catsCallback) {
        api.queryCats2(query, new Api.CatsQueryCallback() {
            @Override
            public void onCatListReceived(List<Cat> catList) {
                catsCallback.onResult(catList);
            }

            @Override
            public void onError(Exception e) {
                catsCallback.onError(e);
            }
        });
    }

    public void store(Cat cat, final Callback<Uri> uriCallback) {
        api.store2(cat, new Api.StoreCallback() {
            @Override
            public void onCatStored(Uri uri) {
                uriCallback.onResult(uri);
            }

            @Override
            public void onStoredFail(Exception e) {
                uriCallback.onError(e);
            }
        });
    }       
}

再来看看我们的CatsHelper类

public class CatsHelper {
    private ApiWrapper wrapper;
    public void saveTheCutestCat3(String query, final Callback<Uri> cutestCallback) {
        wrapper.queryCats(query, new Callback<List<Cat>>() {
            @Override
            public void onResult(List<Cat> result) {
                Cat cutestCat = findCutestCat(result);
                wrapper.store(cutestCat, cutestCallback);
            }

            @Override
            public void onError(Exception e) {
                cutestCallback.onError(e);
            }
     });
}

嗯,不错。与上一版的CatsHelper比较来,看起来确实舒服多了。那么还可以再优化吗?

分离参数和回调接口

我们继续来观察它们的共同点:queryCats,store,saveTheCutestCat3这三个异步方法的参数都是一个回调接口和一般参数,那么把这个一般参数和回调接口分离,让异步操作只管理一般参数,而返回一个临时对象来管理回调接口。

首先创建一个管理回调接口的临时对象

public abstract class AsyncJob<T> {

    public abstract void start(Callback<T> callback);
}

再修改我们的api包装类

public class ApiWrapper {
    private Api api;
    public AsyncJob<List<Cat>> queryCats2(final String query) {

        return new AsyncJob<List<Cat>>() {
            @Override
            public void start(final Callback<List<Cat>> callback) {
                api.queryCats2(query, new Api.CatsQueryCallback() {
                    @Override
                    public void onCatListReceived(List<Cat> catList) {
                        callback.onResult(catList);
                    }
                    @Override
                    public void onError(Exception e) {
                        callback.onError(e);
                    }
                });
            }
        };
    }
    public AsyncJob<Uri> store2(final Cat cat) {

        return new AsyncJob<Uri>() {
            @Override
            public void start(final Callback<Uri> callback) {
                api.store2(cat, new Api.StoreCallback() {
                    @Override
                    public void onCatStored(Uri uri) {
                        callback.onResult(uri);
                    }
                    @Override
                    public void onStoredFail(Exception e) {
                        callback.onError(e);
                    }
                });
            }
        };
    }
}

最后修改我们的CatsHelper类

public class CatsHelper {
    private ApiWrapper wrapper;
    public AsyncJob<Uri> saveTheCutestCat4(final String query) {
        final AsyncJob<List<Cat>> catListAsyncJob = wrapper.queryCats2(query);
        
        final AsyncJob<Cat> cutestCatAsyncJob = new AsyncJob<Cat>() {
            @Override
            public void start(final Callback<Cat> callback) {
                catListAsyncJob.start(new Callback<List<Cat>>() {
                    @Override
                    public void onResult(List<Cat> result) {
                        callback.onResult(findCutestCat(result));
                    }

                    @Override
                    public void onError(Exception e) {
                        callback.onError(e);
                    }
                });
            }
        };
        AsyncJob<Uri> storedUriAsyncJob = new AsyncJob<Uri>() {
            @Override
            public void start(final Callback<Uri> callback) {
                cutestCatAsyncJob.start(new Callback<Cat>() {
                    @Override
                    public void onResult(Cat result) {
                        wrapper.store2(result)
                                .start(new Callback<Uri>() {
                                    @Override
                                    public void onResult(Uri result) {
                                        callback.onResult(result);
                                    }

                                    @Override
                                    public void onError(Exception e) {
                                        callback.onError(e);
                                    }
                                });
                    }

                    @Override
                    public void onError(Exception e) {
                        callback.onError(e);
                    }
                });
            }
        };
        return storedUriAsyncJob;
    }
}

上面的代码将整个流程分解为几个操作,数据流向为:

         (async)                 (sync)           (async)
query ===========> List<Cat> -------------> Cat ==========> Uri
    queryCats              findCutest          store

虽然findCutest标记的是sync的,是相对qurey和store来说的,但在整个操作中,他还是属于异步的。因为如果一个操作是异步的,则每个调用该异步操作的方法也是异步的。

上面的代码量虽然增多了,但看起来还是比较清晰的,并且更加容易理解上面的异步操作:catListAsyncJob, cutestCatAsyncJob, storedUriAsyncJob. 不过,这个saveTheCutestCat4比saveTheCutestCat3有什么优势吗?这个这个...。好像是没有哦。好吧。我们继续优化。

简单的映射

在cutestCatAsyncJob那一块代码中,其实核心的逻辑只有findCutestCat(result),其他的代码只是为了启动AnsyJob并接收结果和处理异常的干扰代码。但是这些代码是通用的,我们可以把他们放到其他地方来让我们更加专注业务逻辑代码。How to do?

通过一个转换方法来转换AsyncJob 的结果。但由于Java的限制,无法把方法作为参数,所以需要用一个接口(或者类)并在里面定义一个转换函数:

public interface Func<T, R> {

    R call(T t);
}

当我们把 AsyncJob 的结果转换为其他类型的时候, 我们需要把一个结果值映射为另外一种类型,这个操作我们称之为 map。 把该函数定义到 AsyncJob 类中比较方便,这样就可以通过 this 来访问 AsyncJob 对象了。

public abstract class AsyncJob<T> {

    public abstract void start(Callback<T> callback);

    public <R> AsyncJob<R> map(final Func<T, R> func) {
        final AsyncJob<T> source = this;
        return new AsyncJob<R>() {
            @Override
            public void start(final Callback<R> callback) {
                source.start(new Callback<T>() {
                    @Override
                    public void onResult(T result) {
                        R mapped = func.call(result);
                        callback.onResult(mapped);
                    }

                    @Override
                    public void onError(Exception e) {
                        callback.onError(e);
                    }
                });
            }
        };
    }
}

再来看看CatsHelper类

public class CatsHelper {
    private ApiWrapper wrapper;
    public AsyncJob<Uri> saveTheCutestCat5(final String query) {
        final AsyncJob<List<Cat>> catListAsyncJob = wrapper.queryCats2(query);
        final AsyncJob<Cat> cutestCatAsyncJob = catListAsyncJob.map(new Func<List<Cat>, Cat>() {

            @Override
            public Cat call(List<Cat> catList) {
                return findCutestCat(catList);
            }
        });
         //方式一:
        AsyncJob<Uri> storedUriAsyncJob = cutestCatAsyncJob.map(new Func<Cat, Uri>() {
            @Override
            public Uri call(Cat cat) {
                AsyncJob<Uri> store2 = wrapper.store2(cat);
                //这里的返回值不正确,无法编译
                return store2;
            }
        });
        //方式二:
        AsyncJob<AsyncJob<Uri>> storedUriAsyncJob2 = cutestCatAsyncJob.map(new Func<Cat, AsyncJob<Uri>>() {
        @Override
            public AsyncJob<Uri> call(Cat cat) {
                AsyncJob<Uri> uriAsyncJob = wrapper.store2(cat);
                //这里的返回值不符合要求
                return uriAsyncJob;
            }
       });          
       return storedUriAsyncJob;
}               
}

根据方式二,我们只能够得到一个AsyncJob<AsyncJob>的双层异步结果,看来我们需要把它再压缩成一层AsyncJob才能满足我们的需求。在这里称之为flatMap。那么我们转换的结果不再是R,而是AsyncJob<R>

public abstract class AsyncJob<T> {

    public abstract void start(Callback<T> callback);

    public <R> AsyncJob<R> map(final Func<T, R> func) {
        final AsyncJob<T> source = this;
        return new AsyncJob<R>() {
            @Override
            public void start(final Callback<R> callback) {
                source.start(new Callback<T>() {
                    @Override
                    public void onResult(T result) {
                        R mapped = func.call(result);
                        callback.onResult(mapped);
                    }

                    @Override
                    public void onError(Exception e) {
                        callback.onError(e);
                    }
                });
            }
        };
    }
    public <R> AsyncJob<R> flatMap(final Func<T, AsyncJob<R>> func) {
        final AsyncJob<T> source = this;
        return new AsyncJob<R>() {
            @Override
            public void start(final Callback<R> callback) {
                source.start(new Callback<T>() {
                    @Override
                    public void onResult(T result) {

                        AsyncJob<R> mapped = func.call(result);
                        mapped.start(new Callback<R>() {
                            @Override
                            public void onResult(R result) {
                                callback.onResult(result);
                            }

                            @Override
                            public void onError(Exception e) {
                                callback.onError(e);
                            }
                        });
                    }

                    @Override
                    public void onError(Exception e) {
                        callback.onError(e);
                    }
                });
            }
        };
    }
}

最后回到我们的CatsHelper类

public class CatsHelper {
    private ApiWrapper wrapper;
    public AsyncJob<Uri> saveTheCutestCat6(final String query) {
        final AsyncJob<List<Cat>> catListAsyncJob = wrapper.queryCats2(query);
        final AsyncJob<Cat> cutestCatAsyncJob = catListAsyncJob.map(new Func<List<Cat>, Cat>() {

            @Override
            public Cat call(List<Cat> catList) {
                return findCutestCat(catList);
            }
        });
        AsyncJob<Uri> uriAsyncJob = cutestCat.flatMap(new Func<Cat, AsyncJob<Uri>>() {
       
            @Override
            public AsyncJob<Uri> call(Cat cat) {
                return wrapper.store2(cat);
            }
        });
        return uriAsyncJob;
    }               
}

上面的代码是不是似曾相识?是的。回过头再去看看前面我们说过的最理想的做法的代码。可能不是很明显,我们用Java8表达式来看看

public class CatsHelper {
    private ApiWrapper wrapper;
    public AsyncJob<Uri> saveTheCutestCat7(final String query) {
        AsyncJob<List<Cat>> listAsyncJob = wrapper.queryCats2(query);
        AsyncJob<Cat> cutestCat = listAsyncJob.map(cats -> findCutestCat(cats));
        AsyncJob<Uri> uriAsyncJob = cutestCat.flatMap(cat -> wrapper.store2(cat));
        return uriAsyncJob;
    }
    private Cat findCutestCat(List<Cat> catList) {
        return Collections.max(catList);
    }
}

使用RxJava

public class RxJavaApiWrapper {
private Api api;        
        
    public Observable<List<Cat>> rxJavaQueryCats(String query) {
        return Observable.create(new Observable.OnSubscribe<List<Cat>>() {
            @Override
            public void call(Subscriber<? super List<Cat>> subscriber) {
                api.queryCats2(query, new Api.CatsQueryCallback() {
                    @Override
                    public void onCatListReceived(List<Cat> catList) {
                        subscriber.onNext(catList);
                    }

                    @Override
                    public void onError(Exception e) {
                        subscriber.onError(e);
                    }
                });
            }
        });
    }

    public Observable<Uri> rxJavaStore(Cat cat) {
        return Observable.create(new Observable.OnSubscribe<Uri>() {
            @Override
            public void call(Subscriber<? super Uri> subscriber) {
                api.store2(cat, new Api.StoreCallback() {
                    @Override
                    public void onCatStored(Uri uri) {
                        subscriber.onNext(uri);
                    }

                    @Override
                    public void onStoredFail(Exception e) {
                        subscriber.onError(e);
                    }
                });
            }
        });
    }

}
----------------------------------------------------------------------------

public class RxJavaCatsHelper {
    private ApiWrapper wrapper;
    public Observable<Uri> rxJavaSaveTheCutestCat(String query) {
        Observable<List<Cat>> listObservable = wrapper.rxJavaQueryCats(query);
        Observable<Cat> cutestCat = listObservable.map(new Func1<List<Cat>, Cat>() {
            @Override
            public Cat call(List<Cat> catList) {
                return findCutestCat(catList);
            }
        });
        Observable<Uri> uriObservable = cutestCat.flatMap(new Func1<Cat, Observable<Uri>>() {
            @Override
            public Observable<Uri> call(Cat cat) {
                return wrapper.rxJavaStore(cat);
            }
        });

        return uriObservable;
    }

    private Cat findCutestCat(List<Cat> catList) {
        return Collections.max(catList);
    }
}

总结

  • AsyncJob 等同于 Observable, 不仅仅可以返回一个结果,还可以返回一系列的结果,当然也可能没有结果返回。

  • Callback 等同于 Observer, 除了onNext(T t), onError(Throwable t)以外,还有一个onCompleted()函数,该函数在结束继续返回结果的时候通知Observable 。

  • abstract void start(Callback callback) 和 Subscription subscribe(final Observer observer) 类似,返回一个Subscription ,如果你不再需要后面的结果了,可以取消该任务。

  • 除了 map 和 flatMap 以外, Observable 还有很多其他常见的转换操作。

看看我们的代码是不是和RxJava的代码很相似。当然我们的只是一个部分,RxJava还有很多好东西值得我们去挖掘的。

参考资料

http://yarikx.github.io/NotRxJava/

http://blog.chengyunfeng.com/?p=729

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,772评论 6 477
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,458评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,610评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,640评论 1 276
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,657评论 5 365
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,590评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,962评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,631评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,870评论 1 297
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,611评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,704评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,386评论 4 319
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,969评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,944评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,179评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 44,742评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,440评论 2 342

推荐阅读更多精彩内容