RxJava在Android移动端开发中的实战应用之一

响应式编程是以异步和数据流来构建事务关系的编程模型,异步和数据流是以构建事务关系而存在,异步是为了区分无关的事务,数据流是为了联系有关的事务

版权声明:本文来自门心叼龙的博客,属于原创内容,转载请注明出处
https://blog.csdn.net/geduo_83/article/details/89736624

1.时间段选择器,把一天24小时,按照每半个小时作为一个时间段进行分割,共生成48个时间段字符串集合进行返回

时间格式如下:
[00:00-00:30,
00:30-01:00,
01:00-01:30,
...
23:00-23:30,
23:30-00:00]

思路如下:

    1. 首先我们需要得到48个时间点如:
      2019-01-01 00:00
      2019-01-01 00:30
      2019-01-01 01:00
      2019-01-01 01:30
      ...
      ...
      2019-01-01 23:00
      2019-01-01 23:30
      2019-01-01 24:00
    1. 计算完毕,两两组合

实现步骤:

  • 1.创建了一个日期发射器
  • 2.创建一个0-49范围的有序整数序列的合集发射器
  • 3.通过combineLatest将两个两个发射器的事件进行两两结合
  • 4.计算得到48个时间点的事件
  • 5.通过buffer缓存48个时间点事件
  • 6.将得到的事件(00:00,00:30) 转换为另外一个事件(00:00-00:30)
  • 7.通过blockingFirst将计算结果返回

知识点:

  • 1.Observable.create创建一个发射器
  • 2.Observable.range创建一个发射指定范围的整数序列的发射器
  • 3.Observable.combineLatest组合两个发射器
  • 4.Observable.buffer缓存事件
  • 5.Observable.blockingFirst阻塞返回第一个事件

代码实现:

public static List<String> getTimeData() {
        //创建了一个日期发射器
        Observable<Calendar> calendarObservable = Observable.create(new ObservableOnSubscribe<Calendar>() {
            @Override
            public void subscribe(ObservableEmitter<Calendar> emitter) throws Exception {
                Calendar calendar = Calendar.getInstance();
                calendar.set(2019, 0, 1, 0, 0);
                emitter.onNext(calendar);
            }
        });
        //创建了一个发射0到48的一个数字发射器,目的是给日期做累加计算
        Observable<Integer> integerObservable = Observable.range(0, 49);
        //对日期发射器和数字发射器进行两两组合
        List<String> strings = Observable.combineLatest(calendarObservable, integerObservable, new BiFunction<Calendar, Integer, String>() {
            @Override
            public String apply(Calendar calendar, Integer integer) throws Exception {
                calendar.set(Calendar.MINUTE, calendar.get(Calendar.MINUTE) + (integer == 0 ? 0 : 30));
                return DateUtil.formatDate(calendar.getTime(), DateUtil.FormatType.HHmm);
            }
        }).buffer(49).map(new Function<List<String>, List<String>>() {
            @Override
            public List<String> apply(List<String> list) throws Exception {
                List<String> timeLine = new ArrayList<>();
                for (int i = 0; i < list.size() - 1; i++) {
                    timeLine.add(list.get(i) + "-" + list.get(i + 1));
                }
                return timeLine;
            }
        }).blockingFirst();
        return strings;
    }

2.地图加载完毕且定位成功之后,显示当前定位点

在RxJava出现之前解决这类问题的确是一件很麻烦的事情,地图加载和定位成功两件事都完成之后才能进行显示当前点的操作,否则就有可能出现,当定位完成了,但是地图还没有加载完毕,而是导致定位点的缩放级别不能显示的问题,有了RxJava这些问题已经变得迎刃而解了。

实现步骤:

1.创建一个地图加载的发射器

ObservableEmitter<Boolean> mMapLoadEmitter;
Observable<Boolean> mapLoadObservable = Observable.create(new ObservableOnSubscribe<Boolean>() {
            @Override
            public void subscribe(ObservableEmitter<Boolean> emitter) throws Exception {
                mMapLoadEmitter = emitter;
                KLog.v(TAG,"mapLoadObservable succ");
            }
});

2.创建一个地图定位的发射器

ObservableEmitter<PoiInfo> mLocationEmitter;  
Observable<PoiInfo> mapLocationObservable = Observable.create(new ObservableOnSubscribe<PoiInfo>() {
            @Override
            public void subscribe(ObservableEmitter<PoiInfo> emitter) throws Exception {
                KLog.v(TAG,"mapLocationObservable succ");
                mLocationEmitter = emitter;
            }
        });

3.开启地图加载监听,加载成功发射事件

当地图加载成功的时候,发射地图加载成功的事件

 mMap.setOnMapLoadedListener(new AMap.OnMapLoadedListener() {
            @Override
            public void onMapLoaded() {
                KLog.v(TAG,"onMapLoaded succ");
                KLog.v(TAG,"mMapLoadEmitter:"+(mMapLoadEmitter == null ? "null":"not null"));
                if(mMapLoadEmitter != null){
                    mMapLoadEmitter.onNext(true);
                }
            }
        });

4.开启地图定位监听,定位成功发射事件

当定位成功的时候,发射定位成功的事件

  mMap.setOnMyLocationChangeListener(new AMap.OnMyLocationChangeListener() {

            @Override
            public void onMyLocationChange(Location location) {
                KLog.json(TAG, location.toString());
                KLog.v(TAG,"onMyLocationChange succ");
                KLog.v(TAG,"mLocationEmitter:"+(mLocationEmitter == null ? "null":"not null"));
                mPoiInfo = new PoiInfo(location.getLatitude(), location.getLongitude());
                if(mLocationEmitter != null){
                    mLocationEmitter.onNext(mPoiInfo);
                }
            }
        });

5. 对地图加载发射器和地图定位发射器的事件进行打包合并处理

当地图加载成功,且地图定位成功,则显示当前位置点信息

 Observable.zip(mapLoadObservable, mapLocationObservable, new BiFunction<Boolean, PoiInfo, PoiInfo>() {
            @Override
            public PoiInfo apply(Boolean aBoolean, PoiInfo poiInfo) throws Exception {
                return poiInfo;
            }
        }).subscribe(new Consumer<PoiInfo>() {
            @Override
            public void accept(PoiInfo poiInfo) throws Exception {
                KLog.v(TAG,"zip succ");
                mMap.animateCamera(CameraUpdateFactory.newLatLngZoom(new LatLng(poiInfo.getLat(), poiInfo.getLon()),14));
            }
        });
    }

知识点:

  • 1.Observable.create创建发射器
  • 2.Observable.zip打包发射器

3. 网络请求完毕数据返回之后,根据头像的url对头像bitmap数据进行下载,最后最终将结果数据返回

实现步骤:

  • 1.将GetAppyDetailResponse事件转化为List<ApprovalProgress>事件
  • 2.将List<ApprovalProgress>事件转为一个能发射多个事件的发射器
  • 3.将每个ApprovalProgress事件都转换为另外一个能下载图片的发射器
  • 4.图片下载完毕,将事件进行发射
  • 5.将图片下载的事件进行缓存
  • 6.所有的图片都下载完毕之后最终将所有数据返回

知识点:

  • 1.Observable.map 将一个事件转换为另外一个事件
  • 2.Observable.flatMap 将一个事件转换为另外一个发射器
  • 3.Observable.toList 将所有的发射事件进行打包

代码实现:

 public Single getApplyDetail(GetAppyDetailRequest request) {
        cacheAppyDetailResponse = null;
        return mUseCarService
                .getApplyDeatail(request)
                .compose(RxAdapter.bindUntilEvent(getLifecycle()))
                .compose(RxAdapter.schedulersTransformer())
                .compose(RxAdapter.exceptionTransformer())
                // 把一个事件转化为另外一个事件
                .map(new Function<GetAppyDetailResponse, List<ApprovalProgress>>() {
                    @Override
                    public List<ApprovalProgress> apply(GetAppyDetailResponse getAppyDetailResponse) throws Exception {
                        cacheAppyDetailResponse = getAppyDetailResponse;
                        List<ApprovalProgress> handleprogress = getAppyDetailResponse.handleprogress;
                        // count = handleprogress.size();
                        // KLog.v(TAG,"count:"+count);
                        for(int i = 0; i < handleprogress.size(); i++){
                            handleprogress.get(i).orderid = i;
                        }
                        return handleprogress;
                    }
                })
                // 把一个事件转化为另外一个发射器
                .flatMap(new Function<List<ApprovalProgress>, Observable<ApprovalProgress>>() {
                    @Override
                    public Observable<ApprovalProgress> apply(List<ApprovalProgress> approvalProgresses)
                            throws Exception {
                        return Observable.fromIterable(approvalProgresses);
                    }
                    // 继续把一个事件转化为另外一个发射器
                })
                .flatMap(new Function<ApprovalProgress, ObservableSource<ApprovalProgress>>() {
                    @Override
                    public ObservableSource<ApprovalProgress> apply(final ApprovalProgress approvalProgress)
                            throws Exception {

                        return Observable.create(new ObservableOnSubscribe<ApprovalProgress>() {
                            @Override
                            public void subscribe(final ObservableEmitter<ApprovalProgress> emitter) throws Exception {
                                String headphoto = approvalProgress.headphoto;
                                int  resid = R.drawable.usercar_pic_user_bg;

                                KLog.v(TAG, "headurl:" + headphoto);

                                GlideApp.with(getContext()).asBitmap().load(headphoto)
                                        .into(new SimpleTarget<Bitmap>() {
                                            @Override
                                            public void onResourceReady(Bitmap resource,
                                                                        Transition<? super Bitmap> transition) {
                                                KLog.v(TAG, "header bitmap download succ");
                                                approvalProgress.header = BitmapUtil.scaleTo(resource,
                                                        DisplayUtil.dip2px(40), DisplayUtil.dip2px(40));
                                                emitter.onNext(approvalProgress);
                                                emitter.onComplete();
                                            }

                                            @Override
                                            public void onLoadFailed(@Nullable Drawable errorDrawable) {
                                                //super.onLoadFailed(errorDrawable);
                                                approvalProgress.header = BitmapFactory.decodeResource(getContext().getResources(),R.drawable.usercar_pic_user_bg);
                                                emitter.onNext(approvalProgress);
                                                emitter.onComplete();
                                            }
                                        });
                            }
                        });
                    }
                }).toList().map(new Function<List<ApprovalProgress>, GetAppyDetailResponse>() {
                    @Override
                    public GetAppyDetailResponse apply(List<ApprovalProgress> approvalProgresses) throws Exception {
                        KLog.v(TAG, "emitter succ");
                        Collections.sort(approvalProgresses);
                        cacheAppyDetailResponse.handleprogress = approvalProgresses;
                        return cacheAppyDetailResponse;
                    }
                }).compose(RxAdapter.singleSchedulersTransformer()).compose(RxAdapter.singleExceptionTransformer())
                .compose(RxAdapter.<GetAppyDetailResponse> bindUntilEvent(getLifecycle())); // 最后返回到主线程;
    }   

4.进入首页要进行权限检查,检查完毕再进行升级检查

实现步骤:

  • 1.创建一个权限检查的发射器
  • 2.对检查的结果事件转换为另外一个升级的发射器
  • 3.订阅该发射器处理事件结果

知识点:

  • 1.Observable.flatMap将一个事件转换为另外一个发射器

代码实现:

@Override
    public void checkPermisionAndUpgrade(FragmentActivity activity) {
        //先进行权限检查,在进行升级检查,避免同时出现权限授权对话框和升级对话框
        new RxPermissions(activity).request(Manifest.permission.ACCESS_FINE_LOCATION, Manifest.permission.READ_EXTERNAL_STORAGE, Manifest.permission.WRITE_EXTERNAL_STORAGE).flatMap(new Function<Boolean, ObservableSource<ClientVersionResponse>>() {
            @Override
            public ObservableSource<ClientVersionResponse> apply(Boolean aBoolean) throws Exception {
                if (!aBoolean) {
                    ToastUtil.showToast("缺少定位权限、存储权限,这会导致地图、导航、拍照等部分功能无法使用");
                }
                int versionCode = EnvironmentUtil.getAppVersionCode(mContext);
                return  mModel.getAppVersion(new ClientVersionRequest(String.valueOf(versionCode)));
            }
        }).subscribe(new Observer<ClientVersionResponse>() {
            @Override
            public void onSubscribe(Disposable d) {

            }

            @Override
            public void onNext(ClientVersionResponse clientVersionResponse) {
                KLog.v(TAG,"upgrade check succ...");
                if (UpgradeType.MUST.type.equalsIgnoreCase(clientVersionResponse.getVersion().upgrade) || UpgradeType.SUGGEST.type.equalsIgnoreCase(clientVersionResponse.getVersion().upgrade)) {
                    //存储版本升级信息
                    VersionManagerUtils.setLastVersion(clientVersionResponse.getVersion());
                    mView.showUpdateDialog(clientVersionResponse.getVersion());
                }
            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onComplete() {

            }
        });
    }

5.多图片上传,先进行token请求,请求完毕再进行图片上传

图片上传可以说是一个在常见不过的功能了,在app的个人设置中上传一个个人头像,用微信发一个带几张图的朋友圈,都会用到图片上传的功能,一般情况下图片上传的时候,先要获取上传的权限,也就是先要获取Token才能进行进行下一步的图片上传操作

实现步骤:

  • 1.创建一个获取token的Observable
  • 2.将token事件转换为另外一个发送多个事件的Observable
  • 3.将每一个MediaRequest事件再转换为图片上传的Observable
  • 4.图片上传完毕发射该事件
  • 5.将图片下载成功的事件机型缓存
  • 6.全部下载完毕再返回数据

知识点:

  • 1.Observable.flatMap
  • 2.Obserable.create
/**
     * 批量上传文件,带进度条的
     *
     * @param context
     * @param type : 上传的业务类型 详见UploadImageUtil.FILE_TYPE_BLOG
     * @param callback
     * @param urls
     * @param listener
     */
    public static void uploadFile(final Context context, final int type, final String callback, final List<String> urls, final ResponseProgressListener listener) {
        Log.v(TAG, "uploadImages start...");
        final List<MediaResponse> list = new ArrayList<>();
        // 1.通过服务创建了一个请求Token的命令
        Observable<UserTokenResponse> observable = RetrofitManager.getInstance().getCommonService().getToken(new TokenRequest(type));
        // 2.将执行的结果和要上传的多条数据转化为一个新的Observable
        observable.flatMap(new Function<UserTokenResponse, ObservableSource<MediaRequest>>() {
            @Override
            public ObservableSource<MediaRequest> apply(UserTokenResponse userTokenResponse) throws Exception {
                Log.v(TAG, "get token succ...");
                Log.v(TAG, "thread id" + Thread.currentThread().getName());
                List<MediaRequest> mList = new ArrayList<>();
                STSV2 stsinfo = userTokenResponse.getStsinfo();
                for (int i = 0; i < urls.size(); i++) {
                    Log.v(TAG, urls.get(i));
                    mList.add(new MediaRequest(stsinfo, urls.get(i), i));
                }
                return Observable.fromIterable(mList);
            }
        }).flatMap(new Function<MediaRequest, ObservableSource<Object>>() {
            @Override
            public ObservableSource<Object> apply(final MediaRequest mediaRequest) throws Exception {
                // 3.对发射过来的每一条数据再次进行转换成一个可以执行上传任务的发射器,并将执行结果进行发送
                return Observable.create(new ObservableOnSubscribe<Object>() {
                    @Override
                    public void subscribe(final ObservableEmitter<Object> emitter) throws Exception {
                        Log.v(TAG, "start upload...");
                        Log.v(TAG, "thread id" + Thread.currentThread().getName());
                        try {
                            final STSV2 stsv2 = mediaRequest.getSTSV2();
                            OSSCredentialProvider credentialProvider = new OSSStsTokenCredentialProvider(stsv2.keyid, stsv2.keysecret, stsv2.securitytoken);
                            OSSLog.enableLog();
                            final OSS oss = new OSSClient(context, stsv2.endpoint, credentialProvider);
                            String currurl = mediaRequest.getUrl();
                            final String fileName = MD5Util.MD5(UUID.randomUUID().toString()) + "." + currurl.substring(currurl.lastIndexOf(".") + 1);
                            String fileOssName = stsv2.filepath + fileName;
                            byte[] imageCompressByte;
                            if (FileUtil.isImageFile(currurl)) {
                                imageCompressByte = BitMapUtils.getImageCompressByte(currurl);
                            } else {
                                imageCompressByte = FileUtil.getFileByte(currurl);
                            }
                            final PutObjectRequest put = new PutObjectRequest(stsv2.bucket, fileOssName, imageCompressByte);
                            ObjectMetadata metadata = new ObjectMetadata();
                            metadata.setContentType("application/octet-stream");
                            put.setMetadata(metadata);

                            final StringBuilder backUrlSB = new StringBuilder();
                            backUrlSB.append("bucket=").append(stsv2.bucket);
                            backUrlSB.append("&filename=").append(fileOssName);
                            backUrlSB.append("&filetype=").append(type);
                            backUrlSB.append("&").append(callback);

                            put.setCallbackParam(new HashMap<String, String>() {
                                {
                                    put("callbackUrl", stsv2.callbackurl);
                                    put("callbackBody", backUrlSB.toString());
                                }
                            });
                            put.setProgressCallback(new OSSProgressCallback<PutObjectRequest>() {
                                @Override
                                public void onProgress(PutObjectRequest putObjectRequest, long l, long l1) {
                                    if (l > 0 && l1 > 0) {
                                        emitter.onNext(l + "|" + l1 + "|" + urls.size() + "|" + mediaRequest.getId());
                                    }
                                }
                            });

                            oss.asyncPutObject(put, new OSSCompletedCallback<PutObjectRequest, PutObjectResult>() {
                                @Override
                                public void onSuccess(PutObjectRequest putObjectRequest, PutObjectResult putObjectResult) {
                                    Log.v(TAG, "video upload succ...");
                                    Log.v(TAG, "thread id" + Thread.currentThread().getName());
                                    int[] imgSize = BitMapUtils.getImageSize(mediaRequest.getUrl());
                                    emitter.onNext(new MediaResponse(TextUtils.concat(stsv2.fileurl, fileName).toString(), imgSize[0], imgSize[1]));
                                }

                                @Override
                                public void onFailure(PutObjectRequest putObjectRequest, ClientException e, ServiceException e1) {
                                    Log.v(TAG, "video upload fail...");
                                    emitter.onError(e1);
                                }
                            });
                        } catch (Exception e1) {
                            e1.printStackTrace();
                        }

                    }
                });
            }
        }).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe(new Observer<Object>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.v(TAG, "onSubscribe start...");
                if (listener != null) {
                    listener.onStart();
                }
            }

            @Override
            public void onNext(Object object) {
                Log.v(TAG, "onNext start...");
                Log.v(TAG, "thread id" + Thread.currentThread().getName());
                if (listener != null) {
                    if (object instanceof String) {
                        String original = (String) object;
                        if (original.contains("|")) {
                            String[] split = original.split("\\|");
                            int progress = Integer.parseInt(split[0]);
                            int total = Integer.parseInt(split[1]);
                            int size = Integer.parseInt(split[2]);
                            int position = Integer.parseInt(split[3]);
                            //当前的图片所占自己的百分比
                            float v = (float) progress / total;
                            //当前图片所占所有图片的百分比
                            float v1 = (float) 1 / size * v;
                            //之前上传过得百分比
                            float v2 = (float) 1 / size * position;
                            //所占总的百分比
                            int v3 = (int) ((v1 + v2) * 100);
                            Log.v(TAG, "progress:" + progress + ";total:" + total + ";present:" + v3);
                            listener.onProgress(v3, 100);
                        }
                    } else if (object instanceof MediaResponse) {
                        list.add((MediaResponse) object);
                        if (list.size() == urls.size()) {
                            listener.onSuccess(list);
                        }
                        Log.v(TAG, "observer receive succ");
                    }
                }
            }

            @Override
            public void onError(Throwable e) {
                Log.v(TAG, "onError start...");
                if (listener != null) {
                    listener.onError(e);
                }
            }

            @Override
            public void onComplete() {
                Log.v(TAG, "onComplete start...");
                if (listener != null) {
                    listener.onComplete();
                }
            }
        });

    }

6.对于请求的事件数据进行转换

实现步骤:

    1. 将GetUseStatusListResponse事件转化为List<VehicleUseStatus>事件
  • 2.List<VehicleUseStatus>事件转换为另外一个Observable
  • 3.对VehicleUseStatus事件进行操作
  • 4.缓存 VehicleUseStatus事件
  • 5.订阅Observable并返回处理的最终结果

知识点:

    1. Observable.map
    1. Observable.flatMap
    1. Observable.toList
  mModel.getCarStateList(new GetUseStatusListRequest(currDate, enterpriseid, stardid)).map(new Function<GetUseStatusListResponse, List<VehicleUseStatus>>() {
            @Override
            public List<VehicleUseStatus> apply(GetUseStatusListResponse getUseStatusListResponse) throws Exception {
                stardid = getUseStatusListResponse.nextid;
                return getUseStatusListResponse.statuslist;
            }
        }).flatMap(new Function<List<VehicleUseStatus>, ObservableSource<VehicleUseStatus>>() {
            @Override
            public ObservableSource<VehicleUseStatus> apply(List<VehicleUseStatus> vehicleUseStatuses) throws Exception {
                return Observable.fromIterable(vehicleUseStatuses);
            }
        }).map(new Function<VehicleUseStatus, VehicleUseStatus>() {
            @Override
            public VehicleUseStatus apply(VehicleUseStatus o) throws Exception {
                String usetime = o.usetime;
                if (!TextUtils.isEmpty(usetime)) {
                    String[] split = usetime.split("\\|");
                    if (split != null && split.length > 0) {
                        o.useposition = getTimePositon(Arrays.asList(split));
                    }
                }
                return o;
            }
        }).toList().subscribe(new SingleObserver<List<VehicleUseStatus>>() {
            @Override
            public void onSubscribe(Disposable d) {

            }

            @Override
            public void onSuccess(List<VehicleUseStatus> vehicleUseStatuses) {
                isfirst = false;
                if (isrefresh) {
                    String nowDate = DateUtil.formatDate(new Date(), DateUtil.FormatType.MMdd);
                    String tempDate = DateUtil.formatDate(currDate, DateUtil.FormatType.yyyyMMdd,DateUtil.FormatType.MMdd);
                    mView.showTopBarCurrDate(nowDate.equals(tempDate) ? tempDate + "今天" : tempDate);
                    mView.enableTopBarLeftBtn(nowDate.equals(tempDate) ? false : true);
                    if(vehicleUseStatuses != null && vehicleUseStatuses.size() > 0){
                        mView.refreshData(vehicleUseStatuses,currDate);
                    }else{
                        mView.showNoDataView();
                    }
                    mView.stopRefresh();
                } else {
                    mView.loadMoreData(vehicleUseStatuses);
                    mView.stopLoadMore();
                }

            }

            @Override
            public void onError(Throwable e) {
                if (isrefresh) {
                    mView.stopRefresh();
                    if(isfirst){
                        if(e instanceof ResponseThrowable){
                            ResponseThrowable throwable =  (ResponseThrowable) e;
                            if(throwable.code == ExceptionHandler.SYSTEM_ERROR.TIMEOUT_ERROR){
                                mView.showNetWorkErrView();
                            }
                        }else{
                            mView.showNoDataView();
                        }
                    }
                } else {
                    mView.stopLoadMore();
                }

            }
        }); 

7.车辆设备id过滤

实现步骤:

  • 1.对车辆集合中的车辆进行过滤,过滤有设备的车辆
  • 2.对过滤后的车辆事件进行转换为设备事件
  • 3.将设备集合事件转换为一个Observable
  • 4.对设备事件进行过滤,过滤为0的设备
  • 5.对设备事件转换为设备的id事件
  • 6.缓存设备id事件
  • 7.订阅这个事件并返回结果

知识点:

  • 1.Observable.fromIterable
  • 2.Observable.filter
  • 3.Observable.map
  • 4.Obserable.flatMap
  • 5.Obserable.buffer
 @Override
    public void filterVehicleDevice(List<VehicleInfo> vehicleInfoList, final VehicleInfo vehicleInfo) {
        Observable.fromIterable(vehicleInfoList).filter(new Predicate<VehicleInfo>() {
            @Override
            public boolean test(VehicleInfo vehicleInfo) throws Exception {
                // 将有设备的车辆过滤下来
                return vehicleInfo.getDevices() != null
                        && vehicleInfo.getDevices().size() > 0;
            }
        }).map(new Function<VehicleInfo, List<DeviceInfo>>() {
            @Override
            public List<DeviceInfo> apply(VehicleInfo vehicleInfo) throws Exception {
                // 对事件进行转换,将VehicleInfo转化为DeviceInfo
                return vehicleInfo.getDevices();
            }
        }).flatMap(new Function<List<DeviceInfo>, ObservableSource<DeviceInfo>>() {
            @Override
            public ObservableSource<DeviceInfo> apply(List<DeviceInfo> deviceInfos) throws Exception {
                // 再次将device事件转化为Observable
                return Observable.fromIterable(deviceInfos);
            }
        }).filter(new Predicate<DeviceInfo>() {
            @Override
            public boolean test(DeviceInfo deviceInfo) throws Exception {
                // 对传递过来的DeviceInfo进行过滤
                return "0".equals(deviceInfo.devicetype);
            }
        }).map(new Function<DeviceInfo, String>() {

            @Override
            public String apply(DeviceInfo deviceInfo) throws Exception {
                // 再次对事件进行转换
                return deviceInfo.deviceid;
            }
        }).buffer(Integer.MAX_VALUE).subscribe(new Observer<List<String>>() {
            ArrayList<String> listBox;

            @Override
            public void onSubscribe(Disposable d) {

            }

            @Override
            public void onNext(List<String> strings) {
                // 对过滤的数据把它缓存起来,然后进行打包输出
                if (mView == null) {
                    return;
                }
                listBox = new ArrayList<>(strings);
            }

            @Override
            public void onError(Throwable e) {
                mView.filterVehicleDeviceComplete(listBox, vehicleInfo);
            }

            @Override
            public void onComplete() {limView.filterVehicleDeviceComplete(listBox, vehicleInfo);
            }
        });
    }
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,732评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 87,496评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,264评论 0 338
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,807评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,806评论 5 368
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,675评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,029评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,683评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 41,704评论 1 299
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,666评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,773评论 1 332
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,413评论 4 321
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,016评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,978评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,204评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,083评论 2 350
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,503评论 2 343

推荐阅读更多精彩内容

  • 一、RxJava操作符概述 RxJava中的操作符就是为了提供函数式的特性,函数式最大的好处就是使得数据处理简洁易...
    BrotherChen阅读 1,592评论 0 10
  • 国家电网公司企业标准(Q/GDW)- 面向对象的用电信息数据交换协议 - 报批稿:20170802 前言: 排版 ...
    庭说阅读 10,863评论 6 13
  • 一、RxJava操作符概述 RxJava中的操作符就是为了提供函数式的特性,函数式最大的好处就是使得数据处理简洁易...
    测天测地测空气阅读 626评论 0 1
  • 记录RxJava操作符,方便查询(2.2.2版本) 英文文档地址:http://reactivex.io/docu...
    凌云飞鱼阅读 811评论 0 0
  • 注:只包含标准包中的操作符,用于个人学习及备忘参考博客:http://blog.csdn.net/maplejaw...
    小白要超神阅读 906评论 0 3