1. RxJava 内存泄漏原因
- Disposable 基本原理:
(1)这里仅看下 Observer 的执行,如在主线程执行 Observer,会走 Observable 的 observeOn 方法,然后会把 Observable 包装成 ObservableObserveOn。
当被订阅者如 PublishSubject 通过 onNext 发送事件时,会调用 ObservableObserveOn 中的订阅者 ObserveOnObserver 的 onNext 方法。之后会通过 Worker 来执行 ObserveOnObserver(实现了Runnable接口) 的 run() 方法。
(2)Worker 是每个动作的执行者,通过线程池执行,而 Worker 的创建和管理是通过 Schedulers 来完成的,在 subscribe 订阅时 Schedulers 负责创建该订阅者执行的 Worker。Schedulers 是我们在使用 RxJava 时指定的,如 Schedulers.io(),Schedulers.computation() 或 AndroidSchedulers.mainThread()。
这里以 Schedulers.computation() 创建的 Worker 为例:
ObserveOnObserver.oNext() --> EventLoopWorker.schedule() --> PoolWorker.scheduleActual() --> ScheduledExecutorService.submit()
(3)scheduleActual() 函数中通过线程池执行任务,传入的 Runnable 使用 ScheduledRunnable 包装了一层,ScheduledRunnable 实现了 Callable,通过线程池执行时返回 Future,通过 Future 可以获取任务执行状态以及可以取消任务。
scheduleActual() 返回结果是 ScheduledRunnable,ScheduledRunnable 同时实现了 Disposable 接口,在 dispose() 方法中通过 Future.cancel() 来取消任务执行。
@NonNull
public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
// ScheduledRunnable 实现了 Disposable接口,函数返回 Disposable
ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);
if (parent != null) {
if (!parent.add(sr)) {
return sr;
}
}
Future<?> f;
try {
if (delayTime <= 0) {
// 通过线程池执行
f = executor.submit((Callable<Object>)sr);
} else {
f = executor.schedule((Callable<Object>)sr, delayTime, unit);
}
sr.setFuture(f);
} catch (RejectedExecutionException ex) {
if (parent != null) {
parent.remove(sr);
}
RxJavaPlugins.onError(ex);
}
return sr;
}
- 上下游的 Disposable 传递
上游 Single.just(true)
| .delay(2500, TimeUnit.MILLISECONDS)
| .subscribeOn(Schedulers.io())
| .observeOn(AndroidSchedulers.mainThread())
下游 .subscribe(new Consumer<Boolean>()
订阅时的基本流程:简单来说就是下游调用 subscribe(),向上游调用subscribe(),上游 subscribe() 中创建 Disposable,再往下游调用 Observer 的 onSubscribe(Disposable),下游会对上游传过来的 Disposable 进行包装,所以最终调用 dispose() 方法时,下游的 dispose() 方法中也会调用上游的 dispose() 方法。
例如:
(1)LambdaObserver 中 DisposableHelper.setOnce(this, d)
(2)ObserveOnObserver 中变量 Disposable upstream;
调用 dispose(),会调用自己的 dispose() 方法和上游的 dispose() 方法
2. 解决方法
内存泄漏示例一:
@Override
protected void onStart() {
super.onStart();
mDisposable = Single.just(true)
.delay(2500, TimeUnit.MILLISECONDS)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<Boolean>() {
@Override
public void accept(Boolean aBoolean) throws Exception {
// do something
}
}, Functions.emptyConsumer());
}
由于在 subscribe 方法中创建 Consumer 或者 Observer 时,属于匿名内部类,所以会持有外部类对象,若果外部类是 Activity 或者 Fragment 或者 View,当页面销毁或者 View 销毁时,RxJava 的线程还在执行,就会一直持有 Activity 、Fragment 或 View,导致内存泄漏。对于这种情况的解决,就是取消订阅,以及结束 RxJava 的线程执行,保证 RxJava 中的订阅者能够被回收。常见的处理方式有以下三种:
- 解决方式一:
@Override
protected void onDestroy() {
super.onDestroy();
// 手动解除订阅
if (mDisposable != null && !mDisposable.isDisposed()) {
mDisposable.dispose();
mDisposable = null;
}
}
- 解决方式二:使用 CompositeDisposable
CompositeDisposable 是一个 disposable 的容器,可以容纳多个 disposable,添加和去除的复杂度为O(1)。
// 成员变量
private CompositeDisposable mCompositeDisposable = new CompositeDisposable();
protected void onStart() {
super.onStart();
mCompositeDisposable.add(dispose1);
mCompositeDisposable.add(dispose2);
mCompositeDisposable.add(dispose3);
...
}
@Override
protected void onDestroy() {
super.onDestroy();
// 手动解除订阅
if (mCompositeDisposable != null && !mCompositeDisposable.isDisposed()) {
mCompositeDisposable.dispose();
}
}
- 解决方式二:
@Override
protected void onStart() {
super.onStart();
mDisposable = Single.just(true)
.delay(2500, TimeUnit.MILLISECONDS)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.compose(((BaseActivity)getActivity()).bindUntilEvent(ActivityEvent.DESTROY)) // 根据生命周期自动解除订阅
.subscribe(new Consumer<Boolean>() {
@Override
public void accept(Boolean aBoolean) throws Exception {
// do something
}
}, Functions.emptyConsumer());
}