参考资料
Pissay https://blog.piasy.com/AdvancedRxJava/2016/10/04/subjects-part-2/
一、重点对象
Flowable -----FlowableCreate
Subscribe----FlowableOnBackpressureBuffer
request-----AtomicLong 相当于一个计数,记录事件下发了多少次
BackpressureHelper---控制请求个数
drain:下发和整理事件
上游发送对象的emitter和下游onSubscribe持有的subscription是一个对象,所以反复操作它的request
BackPressure背压
在异步模型中,如果上游产生数据速度过快而下游消费事件过慢。会出现数据堆积导致内存不断增加而溢出的问题。为了解决这种问题RxJava提出了节流以及背压的策略。它是一种流速控制的策略。
1.响应式拉去实现流速控制
在普通的RxJava模型中,上游主动推送事件给下游,下游的被动接收数据(下游的onNext方法是被动触发的)。而在响应式拉取模型中,由下游来请求上游发送事件。
二、简单流程分析
Flowable.create(new FlowableOnSubscribe<String>() {
@Override
public void subscribe(@NonNull FlowableEmitter<String> e) throws Exception {
//doSomething
}
}, BackpressureStrategy.BUFFER).subscribe(new Subscriber<String>() {
@Override
public void onSubscribe(Subscription s) {
s.request(20);
}
@Override
public void onNext(String o) {
//doSomething
}
@Override
public void onError(Throwable t) {
}
});
FlowableCreate对象
1.create(FlowableOnSubscribe<T> source, BackpressureStrategy mode) 负责产生一个Flowable对象,它和Obserable的功能近似也有subscribe方法完成对下游观察者事件的订阅。不同的是它提供了Backpressure策略的支持。所以在性能上低于Observable,因为内部为了完成背压操作添加了许多其他操作。
2.create实际产生一个FlowableCreate对象,这个对象会持有我们创建的FlowableOnSubscribe和背压策略
final FlowableOnSubscribe<T> source;
final BackpressureStrategy backpressure;
public FlowableCreate(FlowableOnSubscribe<T> source, BackpressureStrategy backpressure) {
this.source = source;
this.backpressure = backpressure;
}
3.FlowableCreate负责根据背压策略mode来决定使用什么样的发射器emitter(Subscription),这一步是在订阅方法触发时完成的
- t.onSubscribe(emitter)会把发射器对象交给subscribe对象持有。
@Override
public void subscribeActual(Subscriber<? super T> t) {
BaseEmitter<T> emitter;
switch (backpressure) {
case MISSING: {
emitter = new MissingEmitter<T>(t);
break;
}
case ERROR: {
emitter = new ErrorAsyncEmitter<T>(t);
break;
}
case DROP: {
emitter = new DropAsyncEmitter<T>(t);
break;
}
case LATEST: {
emitter = new LatestAsyncEmitter<T>(t);
break;
}
default: {
emitter = new BufferAsyncEmitter<T>(t, bufferSize());
break;
}
}
t.onSubscribe(emitter);
try {
source.subscribe(emitter);
} catch (Throwable ex) {
.....
}
}
三、LatestAsyncEmitter分发器
我们采用的是LATEST策略:如果缓存池满了,会将后续的数据丢掉,但是不管缓存池的状态如何,LATEST都会将最后一条数据强行放入缓存池中。
1.LatestAsyncEmitter分发器继承的是BaseEmitter,而BaseEmitter实际上是一个AtomicLong
2.request方法内部其实是对Emitter维护的计数进行修改
3.emitter内部维护了下游的Subscriber对象,用来调用下游的onNext方法传递事件
继承AtomicLong为了优化性能。AtomicLong实际维护的是request个数,这个操作可能是在异步线程操作的,如果使用volatile关键词来维护,而volatile为防止指令重排会加入内存栅栏,频繁操作会有性能损耗,所以由AtomicLong来维护。
abstract static class BaseEmitter<T>
extends AtomicLong
implements FlowableEmitter<T>, Subscription {
final Subscriber<? super T> actual;
final SequentialDisposable serial;
BaseEmitter(Subscriber<? super T> actual) {
this.actual = actual;
this.serial = new SequentialDisposable();
}
@Override
public final void request(long n) {
if (SubscriptionHelper.validate(n)) {
BackpressureHelper.add(this, n);
onRequested();
}
}
}
// BackpressureHelper.add(this, n) 内部实现
for (;;) {
//如果request维护的是一个Long.MAX_VALUE的值,不做任何操作
long r = requested.get();
if (r == Long.MAX_VALUE) {
return Long.MAX_VALUE;
}
//将新值n和旧值相加
long u = addCap(r, n);
//更新requested维护的值
if (requested.compareAndSet(r, u)) {
return r;
}
}
//addCap
public static long addCap(long a, long b) {
long u = a + b;
if (u < 0L) {
return Long.MAX_VALUE;
}
return u;
}
三、LatestAsyncEmitter的发送事件方法
只有在下游调用了request方法修改了BaseEmitter维护的引用计数,且触发了drain方法时才会下发事件吗,否则会因为在drain函数中判断了引用计数是否为0而终止发送
下游函数在onSubcrible函数中拿到BaseEmitter对象,调用它的request请求开始发送数据
@Override
public final void request(long n) {
if (SubscriptionHelper.validate(n)) {
BackpressureHelper.add(this, n);
onRequested();
}
}
//在LatestAsyncEmitter类中
@Override
void onRequested() {
drain();
}
done:是否忽略该事件
queue:负责存储下发的事件
drain:对事件是否下发做处理
上游在调用onNext时,事件被queue缓存起来,但是如果下游没有调用request函数进行修改计数,在drain函数中也会被拦截
//LatestAsyncEmitter的onNext方法
if (done || isCancelled()) {
return;
}
if (t == null) {
onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
return;
}
queue.set(t);
drain();
get():拿到emitter维护的计数,就是下游请求发送多少个数据
e:代表当前处理了多少个数据的计数
wip:保证同一时刻只有一个线程操作drain函数,它是AtomicInteger类。getAndIncrement会返回AtomicInteger维护的数据后再进行加1、
wip.addAndGet(-missed):相当于wip.addAndGet(wip.get()-1)
这段代码会通过get()方法拿到下游请求了多少个数据,以及e用来计录已经处理了多少个数据。接着会判断订阅关系是否已经取消,如果取消就不发送。
否则会不断下发数据直到e != r 即将所有下游请求的数据都发送完毕
当所有数据都发送完毕后,会调用 BackpressureHelper.produced(this, e)修改维护的request计数,触发onComplete以及onError
调用 if (missed == 0)来判断是否有新的任务,如果missed为0代表所有任务都执行完毕,如果大于0代表有新的任务,需要再次执行for(;;)的代码
void drain() {
if (wip.getAndIncrement() != 0) {
return;
}
int missed = 1;
final Subscriber<? super T> a = actual;
final AtomicReference<T> q = queue;
for (;;) {
long r = get();
long e = 0L;
while (e != r) {
if (isCancelled()) {
q.lazySet(null);
return;
}
boolean d = done;
T o = q.getAndSet(null);
boolean empty = o == null;
if (d && empty) {
Throwable ex = error;
if (ex != null) {
error(ex);
} else {
complete();
}
return;
}
if (empty) {
break;
}
a.onNext(o);
e++;
}
if (e == r) {
if (isCancelled()) {
q.lazySet(null);
return;
}
boolean d = done;
boolean empty = q.get() == null;
if (d && empty) {
Throwable ex = error;
if (ex != null) {
error(ex);
} else {
complete();
}
return;
}
}
if (e != 0) {
BackpressureHelper.produced(this, e);
}
missed = wip.addAndGet(-missed);
if (missed == 0) {
break;
}
}
}
思路总结
使用Flowable和Subsrcibe对象完成订阅时,并不会像与Observable订阅完成后立即在subscribe方法中下发数据,而是在下游调用request时,通过触发drain函数(函数内部持有Subsrcibe,调用该对象的onNext方法)来去启动下发数据的流程,实现响应式拉取。拉取的数量由下游决定,通过更新BaseEmitter维护的任务计数来,来修改BaseEmitter处理事件的个数