EventBus源码分析(三)
在前面的两篇文章中分别对EventBus中的构建,分发事件以及注册方法的查询做了分析,本篇文章是最后一部分,主要针对EventBus对于注册方法的调用作学习了解,前面两篇文章的链接如下:
EventBus源码分析(一)
EventBus源码分析(二)
EventBus方法调用机制需要从EventBus的postToSubscription方法开始,该方法在第一篇文章中也曾分析过,其代码如下:
private void postToSubscription(Subscription subscription, Object event, boolean isMainThread) {
switch (subscription.subscriberMethod.threadMode) {
case POSTING:
invokeSubscriber(subscription, event);
break;
case MAIN:
if (isMainThread) {
invokeSubscriber(subscription, event);
} else {
mainThreadPoster.enqueue(subscription, event);
}
break;
case BACKGROUND:
if (isMainThread) {
backgroundPoster.enqueue(subscription, event);
} else {
invokeSubscriber(subscription, event);
}
break;
case ASYNC:
asyncPoster.enqueue(subscription, event);
break;
default:
throw new IllegalStateException("Unknown thread mode: " + subscription.subscriberMethod.threadMode);
}
}
这个方法的代码也可以清晰地说明了在不同的threadMode情况下,注册方法在不同线程调用的情况。
在理解注册方法在哪个线程被调用之前需要首先明确postToSubscription()方法在哪个线程被调用,该方法是由EventBus对象的post()方法经过一系列方法调用的,所以它运行在用户发送事件的线程中,参数isMainThread表示该运行线程是否为主线程,下面四种threadMode中,注册方法被调用的线程分别为:
- POSTING:最简单的一种模式,即直接在事件发送的线程中调用注册方法;
- MAIN:在主线程中调用注册方法,如果事件发送线程不是主线程,则由mainThreadPoster将方法调用派送到主线程中调用,其实就是使用的Handler机制,后面再做分析;
- BACKGROUND:在后台线程中调用,如果事件发送不是在主线程,则注册方法则直接在该线程中被调用,如果在主线程发送事件,则注册方法由backgroundPoster派发到工作线程中调用;
- ASYNC:直接由asyncPoster派发到工作线程中调用,至于它与BACKGROUND的区别,后面会根据源码做分析。
所以从该方法中可以看出,在切换不同线程调用注册方法,是通过三个Poster完成的,下面分别对它们的源码做分析。
1. HandlerPoster
首先mainThreadPoster的类型为HandlerPoster,从名字可以看出它应该与Handler有关。在android的编程中,如果一些耗时操作的结果需要更新UI,那么需要通过定义Handler的形式,将结果加入到主线程的消息队列,然后由自定义在主线程的Handler对消息做处理。那么这里的原理也是一致的。首先看HandlerPoster的代码:
final class HandlerPoster extends Handler {
private final PendingPostQueue queue;
private final int maxMillisInsideHandleMessage;
private final EventBus eventBus;
private boolean handlerActive;
HandlerPoster(EventBus eventBus, Looper looper, int maxMillisInsideHandleMessage) {
super(looper);
this.eventBus = eventBus;
this.maxMillisInsideHandleMessage = maxMillisInsideHandleMessage;
queue = new PendingPostQueue();
}
...
}
可以看到HandlerPoster是继承自Handler,那么它则是根据构造器中传入的Looper对象,在handleMessage()方法中处理对应线程中消息队列的消息,那么我们再看在EventBus对象初始化时,mainThreadPoster初始化是否为主线程的Looper, EventBus的构造器中的部分代码为:
mainThreadPoster = new HandlerPoster(this, Looper.getMainLooper(), 10);
Looper.getMainLooper()说明了mainThreadPoster的handleMessage方法是在主线程中被调用,就可以实现将注册方法派发到主线程中调用了。其他参数后面部分一一分析。
首先来看PendingPostQueue这个明显是一个队列,其代码有兴趣的可以自行查看,它是自定义的队列,没有实现Java定义的Queue接口,这里只是简单定义了内部的队列数据结构以及入队和出队方法,只不过有一个与时间相关的出队方法,后面用到时会提到。PendingPostQueue内部的队列结构中的元素即为PendingPost,它封装了一次注册方法调用所需要的注册信息和事件类型,即Subscription类型对象和事件对象。类的代码很简单,不再贴出,但是PendingPost中有一个对象的缓存池,最多可以保存10000个PendingPost对象,避免了对象的创建和回收。有兴趣的可以查看源码。
下面看HandlerPoster的入队方法:
void enqueue(Subscription subscription, Object event) {
PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
synchronized (this) {
queue.enqueue(pendingPost);
if (!handlerActive) {
handlerActive = true;
if (!sendMessage(obtainMessage())) {
throw new EventBusException("Could not send handler message");
}
}
}
}
首先根据注册信息和事件对象构建一个PendingPost加入队列,然后通过发送消息通知Handler自身调用handleMessage()方法,这里的handlerActive是一个标志位,标志当前是否正在执行处理PendingPostQueue队列中的PendingPost,也就是正在调用队列中的注册方法。下面为handleMessage()方法:
@Override
public void handleMessage(Message msg) {
boolean rescheduled = false;
try {
long started = SystemClock.uptimeMillis();
while (true) {
PendingPost pendingPost = queue.poll();
if (pendingPost == null) {
synchronized (this) {
// Check again, this time in synchronized
pendingPost = queue.poll();
if (pendingPost == null) {
handlerActive = false;
return;
}
}
}
eventBus.invokeSubscriber(pendingPost);
long timeInMethod = SystemClock.uptimeMillis() - started;
if (timeInMethod >= maxMillisInsideHandleMessage) {
if (!sendMessage(obtainMessage())) {
throw new EventBusException("Could not send handler message");
}
rescheduled = true;
return;
}
}
} finally {
handlerActive = rescheduled;
}
}
这里代码的逻辑就是循环遍历队列,通过eventBus.invokeSubscriber(pendingPost)方法执行注册方法,invokeSubscriber()方法在第一篇文章中已经分析过。两次判断pendingPost是否为空是处理多线程的同步问题,这里与单例模式是同样的道理。不过这里需要注意的是EventBus为HandlerPoster处理调用注册方法设定了时间上限,也就是构造器中初始化的maxMillisInsideHandleMessage,在EventBus中初始化mainThreadPost时为其指定了10ms, 也就是while循环超过10ms之后会退出handleMessage方法,并将handleActive置位true,并再次发送消息,使得该handler再次调用handleMessage()方法,继续处理队列中的注册信息,这是为了避免队列过长是,while循环阻塞主线程造成卡顿。
以上就是HandlerPoster的全部代码,十分简短,通过Handler的方式将注册方法的调用派发到其他线程,在EventBus中只是将其用于在主线程中调用注册方法的情况。
2. BackgroundPoster
下面为BackgroundPoster的源码:
final class BackgroundPoster implements Runnable {
private final PendingPostQueue queue;
private final EventBus eventBus;
private volatile boolean executorRunning;
BackgroundPoster(EventBus eventBus) {
this.eventBus = eventBus;
queue = new PendingPostQueue();
}
...
}
首先BackgroundPoster是继承自Runnable,是一个可执行的任务,这里看到BackgroundPoster中也有队列和标志位,其执行逻辑与HandlerPoster应该极为相似,下面看一下它的入队方法:
public void enqueue(Subscription subscription, Object event) {
PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
synchronized (this) {
queue.enqueue(pendingPost);
if (!executorRunning) {
executorRunning = true;
eventBus.getExecutorService().execute(this);
}
}
}
代码逻辑很简单,构建PendingPost,并在同步块中将该对象入队,然后调用线程池中的线程执行该Runnable,即在其他线程中执行run方法,进而调用注册方法,下面看run方法代码:
@Override
public void run() {
try {
try {
while (true) {
PendingPost pendingPost = queue.poll(1000);
if (pendingPost == null) {
synchronized (this) {
// Check again, this time in synchronized
pendingPost = queue.poll();
if (pendingPost == null) {
executorRunning = false;
return;
}
}
}
eventBus.invokeSubscriber(pendingPost);
}
} catch (InterruptedException e) {
Log.w("Event", Thread.currentThread().getName() + " was interruppted", e);
}
} finally {
executorRunning = false;
}
}
这里的while循环,除了没有设定时间上限之外,几乎与HandlerPoster中handleMessage方法中的处理逻辑完全一致,只不过标志位改成了executorRunning,其作用也几乎是一致的。但是这里需要注意有另外一个与时间有关的方法,即queue.poll(1000),这里有一个等待过程,即从队列中取PendingPost对象时,如果没有PendingPost会等待1000毫秒,其代码如下:
synchronized PendingPost poll(int maxMillisToWait) throws InterruptedException {
if (head == null) {
wait(maxMillisToWait);
}
return poll();
}
这里是典型的生产者和消费者模型,并且设定了时间上限。因此也就说明了BackgroundPoster会将1000毫秒以内入队的注册信息在同一个线程中调用,这一点也是与AsyncPoster最重要的区别。BackgroundPoster的代码也介绍完了,同样十分简单,下面继续分析AsyncPoster中的代码,下面你会看到,其实现更简单。
3. AsyncPoster
由于AsyncPoster的代码实现比较简单,这里一次性全部贴出它的代码:
class AsyncPoster implements Runnable {
private final PendingPostQueue queue;
private final EventBus eventBus;
AsyncPoster(EventBus eventBus) {
this.eventBus = eventBus;
queue = new PendingPostQueue();
}
public void enqueue(Subscription subscription, Object event) {
PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
queue.enqueue(pendingPost);
eventBus.getExecutorService().execute(this);
}
@Override
public void run() {
PendingPost pendingPost = queue.poll();
if(pendingPost == null) {
throw new IllegalStateException("No pending post available");
}
eventBus.invokeSubscriber(pendingPost);
}
}
AsyncPoster同样是继承自Runnable,但是可以看到这里没有了标志位和while循环,入队的逻辑就是构建PendingPost,然后加入队列,这里注意enqueue是同步的方法,不用担心同步问题,然后调用线程池的线程执行该Runnable,run方法的逻辑就是取出队列的pendingPost, 并执行invokeSubscriber方法,由于是入队一次调用一次execute方法,所以正常情况下队列中不会取不出pendingPost, 也不会剩余pendingPost。
这里AsyncPoster更为简单,但是重点是需要思考一下它与BackgroundPoster的区别,前面也提到过BackgroundPoster设定一个时间上限,并且在该段时间内入队的注册信息会在同一个线程中调用,而AsyncPoster则是完全异步的调用。也就是说BackgroundPoster会尽可能在同一线程中调用注册方法,但不保证,可是它保证有一定的顺序,也就是注册信息收集的顺序,而AsyncPoster对注册方法的调用则是完全异步,不确定在哪个线程,也不确定顺序。
至此,对于注册方法调用的线程调度分析源码已经分析完了,总结起来只是三个不同的Poster, HandlerPoster是继承自Handler, 通过handler机制可以将线程调度分派给主线程,而BackgroundPoster和AsyncPoster则是继承自Runnable,覆写run方法定义调用注册方法的任务,并有线程池负责调度启动该任务,进而完成对注册方法的调用。
通过三篇文章,对EventBus的源码大致分析了一遍,下图对分析的流程大致做一个总结,EventBus主要负责不同对象间传递消息,并且可以很方便地切换线程调用注册方法,EventBus的工作机制大概可以分为图中三部分,即注册,发送和调度三个步骤,其中构建部分主要是几个数据结构用来记录信息,后面三个则是三个步骤,图中总结了其中最主要的方法调用。
后记
EventBus可以很方便地在不同对象之间传递消息或者对象,并且很容易做到线程的切换,但是它也有它的局限性,就是不适用于多进程的应用,在多进程的情况下,EventBus中无论是单例还是同步方法以及同步块都会失效,所以便会失去了传递消息的作用。