上一篇:深扒 EventBus:register
PostingThreadState
在看 EventBus.post 源码之前,先了解一下 PostingThreadState 这个类是干什么的
public class EventBus {
/** 对于 ThreadLocal,设置(和获取多个值)要快得多。 */
/** For ThreadLocal, much faster to set (and get multiple values). */
// Posting Thread State 翻译:发送线程状态
final static class PostingThreadState {
final List<Object> eventQueue = new ArrayList<>(); // 事件队列
boolean isPosting; // 是否正在发送
boolean isMainThread; // 当前是否在主线程
Subscription subscription; // 这个包装着被订阅对象和它被订阅的方法
Object event; // 事件对象:EventBus.post(Object event)
boolean canceled; // //终止事件往下传递的标记:EventBus.cancelEventDelivery(Object event);
}
}
方法上面的注释已经说得很明白了,这个 PostingThreadState 类是用于放置到 ThreadLocal 中,接下来 的 EventBus.post 的源码可以清楚地看到
EventBus.post
多余的话就不说了,简单过一遍源码先
private final ThreadLocal<PostingThreadState> currentPostingThreadState = new ThreadLocal<PostingThreadState>() {
@Override
protected PostingThreadState initialValue() {
return new PostingThreadState();
}
};
/** 将给定的事件发布到事件总线 */
/** Posts the given event to the event bus. */
public void post(Object event) {
PostingThreadState postingState = currentPostingThreadState.get();
List<Object> eventQueue = postingState.eventQueue;
eventQueue.add(event);
// 这个线程必须没有事件正在发送
if (!postingState.isPosting) {
// 当前是否在主线程
postingState.isMainThread = isMainThread();
// 这个线程正在发送事件
postingState.isPosting = true;
if (postingState.canceled) {
throw new EventBusException("Internal error. Abort state was not reset");
}
try {
// 遍历事件队列(其实只有一个事件)
while (!eventQueue.isEmpty()) {
// 让我们重点看一下这段代码
postSingleEvent(eventQueue.remove(0), postingState);
}
} finally {
// 发送完事件了,重置一下 PostingThreadState 里面的对象
postingState.isPosting = false;
postingState.isMainThread = false;
}
}
}
// 遍历整个事件队列(实际只有一个事件)
while (!eventQueue.isEmpty()) {
// 将这个事件移除掉
postSingleEvent(eventQueue.remove(0), postingState);
}
postSingleEvent
private final boolean eventInheritance; // event Inheritance 翻译:事件继承
// post Single Event 翻译: 发送单个事件
private void postSingleEvent(Object event, PostingThreadState postingState) throws Error {
// 获取这个事件的类型
Class<?> eventClass = event.getClass();
// 是否找到了订阅的方法
boolean subscriptionFound = false;
if (eventInheritance) {
// 向上查找所有的超类(父类)的类型
List<Class<?>> eventTypes = lookupAllEventTypes(eventClass);
int countTypes = eventTypes.size();
for (int h = 0; h < countTypes; h++) {
Class<?> clazz = eventTypes.get(h);
// 让我们重点看一下这段代码
subscriptionFound |= postSingleEventForEventType(event, postingState, clazz);
}
} else {
// eventInheritance = false 表示忽略事件的超类(父类)
subscriptionFound = postSingleEventForEventType(event, postingState, eventClass);
}
// 如果没有找到订阅的方法
if (!subscriptionFound) {
if (logNoSubscriberMessages) {
// 打印日志:事件没有注册过这个订阅者
logger.log(Level.FINE, "No subscribers registered for event " + eventClass);
}
// sendNoSubscriberEvent 默认为 false,需要通过 EventBusBuilder 配置
if (sendNoSubscriberEvent && eventClass != NoSubscriberEvent.class &&
eventClass != SubscriberExceptionEvent.class) {
// 发送一个没有找到订阅的事件
post(new NoSubscriberEvent(this, event));
}
}
}
postSingleEventForEventType
// post Single Event For Event Type 翻译:为事件类型发布单个事件
private boolean postSingleEventForEventType(Object event, PostingThreadState postingState, Class<?> eventClass) {
CopyOnWriteArrayList<Subscription> subscriptions;
synchronized (this) {
subscriptions = subscriptionsByEventType.get(eventClass);
}
if (subscriptions != null && !subscriptions.isEmpty()) {
for (Subscription subscription : subscriptions) {
postingState.event = event;
postingState.subscription = subscription;
boolean aborted = false;
try {
// 让我们重点看一下这段代码的源码
postToSubscription(subscription, event, postingState.isMainThread);
// 标记这个事件的是否被拦截了,类似广播接收者
aborted = postingState.canceled;
} finally {
// 回收这个对象的属性
postingState.event = null;
postingState.subscription = null;
postingState.canceled = false;
}
// 如果事件被拦截了,就直接跳出循环,终止这个事件的发送
if (aborted) {
break;
}
}
// 发送成功
return true;
}
// 发送失败
return false;
}
postToSubscription
// post To Subscription 翻译:发布订阅
private void postToSubscription(Subscription subscription, Object event, boolean isMainThread) {
switch (subscription.subscriberMethod.threadMode) {
// 表示该方法和消息发送方在同一个线程中执行
case POSTING:
// 直接执行
invokeSubscriber(subscription, event);
break;
// 表示这个方法在主线程中执行
case MAIN:
if (isMainThread) {
// 如果当前是在主线程就直接执行
invokeSubscriber(subscription, event);
} else {
// 调度到主线程队列中执行
mainThreadPoster.enqueue(subscription, event);
}
break;
// 和 MAIN 不同的是:事件始终排队等待。必须保证 post 调用是非阻塞的线程
case MAIN_ORDERED:
if (mainThreadPoster != null) {
// 调度到主线程队列中执行
mainThreadPoster.enqueue(subscription, event);
} else {
// temporary: technically not correct as poster not decoupled from subscriber
invokeSubscriber(subscription, event);
}
break;
// 表示该方法在后台执行,不能并发处理
case BACKGROUND:
if (isMainThread) {
// 调度到后台线程队列中执行
backgroundPoster.enqueue(subscription, event);
} else {
// 如果不是在主线程就直接执行
invokeSubscriber(subscription, event);
}
break;
// 表示在后台执行,可以异步并发处理
case ASYNC:
// 直接放到异步中执行
asyncPoster.enqueue(subscription, event);
break;
default:
throw new IllegalStateException("Unknown thread mode: " + subscription.subscriberMethod.threadMode);
}
}
// invoke Subscriber 翻译: 调用订阅者
void invokeSubscriber(Subscription subscription, Object event) {
try {
// 反射执行订阅的方法,将事件对象作为参数传入
subscription.subscriberMethod.method.invoke(subscription.subscriber, event);
} catch (InvocationTargetException e) {
handleSubscriberException(subscription, event, e.getCause());
} catch (IllegalAccessException e) {
throw new IllegalStateException("Unexpected exception", e);
}
}
post 源码总结
看完了 EventBus.post 源码,总体感觉源码还是写得非常不错,方法之间的解耦很好,代码没有嵌套得很深,注释也很完善,方法命名十分人性化。现在总结一下它的实现步骤,涉及到的类可能比较多,建议你在看这个总结的时候边看源码边理解
EventBus 创建了一个 ThreadLocal<PostingThreadState> 对象,还不知道 ThreadLocal 请自行百度,简单点说就是一个 Map 集合,以当前线程名称作为 key,而 value 就是 ThreadLocal 的泛型 PostingThreadState,这个对象是用于记录当前线程是不是主线程,有没有事件正在发送,事件对象,事件是否被拦截了等等信息,需要注意的是 EventBus.post 调用前如果当前线程已经有事件正在发送了,则这个事件会被忽略掉,所以不要在被订阅的方法中往同个线程再发一次事件,否则是无法接收到这个事件的,这个就是看源码带给我们的好处,让我们避免使用上的误区
post 方法就是将事件放到 PostingThreadState 对象的事件队列中,然后再搞个循环发送队列中的事件(队列里面只有一个事件还要整个 while 循环,整得那么高大上),循环是调用了 postSingleEvent 方法,它是用来专门发送单个事件的
既然要发送单个事件,那是不是得先找到订阅这个事件的所有方法,那是肯定的,如果你想用某一样东西的,是不是得找一下它在哪里,找到了才能用,EventBus 也是一样,它会从事件类型的集合中找,看看有多少个方法订阅了这个事件,然后一个个执行,我们发现一个问题,APT 插件只能解决 EventBus.register 获取方法的时候可以不用反射获取,可以直接通过自动生成的 MyEventBusIndex 索引类获取,但是到了 EventBus.post 中,不能避免它去反射执行这个方法,因为除此之外并无它法,这个可能就是 EventBus 的一个硬伤吧
线程调度
EventBus.post 最后调用了 postToSubscription,再看一次源码
private final Poster mainThreadPoster;
private final BackgroundPoster backgroundPoster;
private final AsyncPoster asyncPoster;
private void postToSubscription(Subscription subscription, Object event, boolean isMainThread) {
switch (subscription.subscriberMethod.threadMode) {
case POSTING:
invokeSubscriber(subscription, event);
break;
case MAIN:
if (isMainThread) {
invokeSubscriber(subscription, event);
} else {
mainThreadPoster.enqueue(subscription, event);
}
break;
case MAIN_ORDERED:
if (mainThreadPoster != null) {
mainThreadPoster.enqueue(subscription, event);
} else {
// temporary: technically not correct as poster not decoupled from subscriber
invokeSubscriber(subscription, event);
}
break;
case BACKGROUND:
if (isMainThread) {
backgroundPoster.enqueue(subscription, event);
} else {
invokeSubscriber(subscription, event);
}
break;
case ASYNC:
asyncPoster.enqueue(subscription, event);
break;
default:
throw new IllegalStateException("Unknown thread mode: " + subscription.subscriberMethod.threadMode);
}
}
让我们看 Poster 这个类,哦不,这是一个接口
/**
* 发送事件
* Posts events.
*
* @author William Ferguson
*/
interface Poster {
/**
* 为特定的订阅加入要发布的事件队列。
*
* @param 订阅将接收事件的订阅。
* @param 事件,该事件将发布到订阅服务器。
*/
/**
* Enqueue an event to be posted for a particular subscription.
*
* @param subscription Subscription which will receive the event.
* @param event Event that will be posted to subscribers.
*/
void enqueue(Subscription subscription, Object event);
}
如果我猜得没错,Ctrl + T
果真,我真的没猜错
Poster mainThreadPoster; // 具体类型为 HandlerPoster
BackgroundPoster backgroundPoster;
AsyncPoster asyncPoster;
PendingPost
啥?上面三个类你都不讲了?No,我们再先来一碗开胃菜
final class PendingPost {
private final static List<PendingPost> pendingPostPool = new ArrayList<PendingPost>();
Object event;
Subscription subscription;
PendingPost next;
private PendingPost(Object event, Subscription subscription) {
this.event = event;
this.subscription = subscription;
}
static PendingPost obtainPendingPost(Subscription subscription, Object event) {
synchronized (pendingPostPool) {
int size = pendingPostPool.size();
if (size > 0) {
PendingPost pendingPost = pendingPostPool.remove(size - 1);
pendingPost.event = event;
pendingPost.subscription = subscription;
pendingPost.next = null;
return pendingPost;
}
}
return new PendingPost(event, subscription);
}
static void releasePendingPost(PendingPost pendingPost) {
pendingPost.event = null;
pendingPost.subscription = null;
pendingPost.next = null;
synchronized (pendingPostPool) {
// Don't let the pool grow indefinitely
if (pendingPostPool.size() < 10000) {
pendingPostPool.add(pendingPost);
}
}
}
}
你给我看这个干嘛,我又看不懂,老规矩,精简一波
final class PendingPost {
Object event; // 事件对象
Subscription subscription; // 订阅信息(包含订阅对象、订阅方法)
PendingPost next; // 使用了链表结构
private PendingPost(Object event, Subscription subscription) {
this.event = event;
this.subscription = subscription;
}
}
你以为真的精简了,其实被精简的部分也要看看,这里不难发现,这个 PendingPost 类跟之前讲过的 SubscriberMethod 类一样用了享元设计模式,建立对象缓存池,然后进行复用,代码很简单,这里不再细说
final class PendingPost {
private final static List<PendingPost> pendingPostPool = new ArrayList<PendingPost>();
static PendingPost obtainPendingPost(Subscription subscription, Object event) {
synchronized (pendingPostPool) {
int size = pendingPostPool.size();
if (size > 0) {
PendingPost pendingPost = pendingPostPool.remove(size - 1);
pendingPost.event = event;
pendingPost.subscription = subscription;
pendingPost.next = null;
return pendingPost;
}
}
return new PendingPost(event, subscription);
}
static void releasePendingPost(PendingPost pendingPost) {
pendingPost.event = null;
pendingPost.subscription = null;
pendingPost.next = null;
synchronized (pendingPostPool) {
// Don't let the pool grow indefinitely
if (pendingPostPool.size() < 10000) {
pendingPostPool.add(pendingPost);
}
}
}
}
PendingPostQueue
老哥你逗我呢,到底讲不讲那三个类,不讲我可要走了
老哥莫急,看完这个类咱们就看,虽然这个 PendingPostQueue 带了 Queue,但是并没有实现 Queue 队列的接口,还是有点看头的
// Pending Post Queue 的翻译:等待发送队列
final class PendingPostQueue {
// 队列头部
private PendingPost head;
// 队列尾部
private PendingPost tail;
// 入列(添加 PendingPost)
synchronized void enqueue(PendingPost pendingPost) {
if (pendingPost == null) {
throw new NullPointerException("null cannot be enqueued");
}
if (tail != null) {
tail.next = pendingPost;
tail = pendingPost;
} else if (head == null) {
head = tail = pendingPost;
} else {
throw new IllegalStateException("Head present, but no tail");
}
notifyAll();
}
// 出列(移除 PendingPost)
synchronized PendingPost poll() {
PendingPost pendingPost = head;
if (head != null) {
head = head.next;
if (head == null) {
tail = null;
}
}
return pendingPost;
}
synchronized PendingPost poll(int maxMillisToWait) throws InterruptedException {
if (head == null) {
// 这个是 Object 类的方法,等待一段时间后执行,调用这个方法后会释放锁
wait(maxMillisToWait);
}
return poll();
}
}
HandlerPoster
// 这个类继承 Handler,你应该懂我的意思吧?
public class HandlerPoster extends Handler implements Poster {
// PendingPost 队列
private final PendingPostQueue queue;
// 方法最大的执行时间,查看源码得知给的是 10
private final int maxMillisInsideHandleMessage;
private final EventBus eventBus;
// 当前是否正在处理事件
private boolean handlerActive;
protected HandlerPoster(EventBus eventBus, Looper looper, int maxMillisInsideHandleMessage) {
super(looper);
this.eventBus = eventBus;
this.maxMillisInsideHandleMessage = maxMillisInsideHandleMessage;
queue = new PendingPostQueue();
}
public void enqueue(Subscription subscription, Object event) {
// 这个刚刚讲过,从对象缓存池复用一个 PendingPost
PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
// 由于这个方法是被外部调用,所以需要处理线程安全的问题
synchronized (this) {
// 加入队列
queue.enqueue(pendingPost);
if (!handlerActive) {
// 标记当前正在处理事件
handlerActive = true;
if (!sendMessage(obtainMessage())) {
throw new EventBusException("Could not send handler message");
}
}
}
}
@Override
public void handleMessage(Message msg) {
boolean rescheduled = false;
try {
long started = SystemClock.uptimeMillis();
// 开个循环
while (true) {
// 循环就在这了,不断从队列中取数据,直到队列中没有数据了为止
PendingPost pendingPost = queue.poll();
if (pendingPost == null) {
synchronized (this) {
// 双重校验
pendingPost = queue.poll();
if (pendingPost == null) {
// 标记事件处理完毕
handlerActive = false;
return;
}
}
}
// 反射执行这个订阅的方法
eventBus.invokeSubscriber(pendingPost);
long timeInMethod = SystemClock.uptimeMillis() - started;
if (timeInMethod >= maxMillisInsideHandleMessage) {
if (!sendMessage(obtainMessage())) {
throw new EventBusException("Could not send handler message");
}
rescheduled = true;
return;
}
}
} finally {
handlerActive = rescheduled;
}
}
}
BackgroundPoster
final class BackgroundPoster implements Runnable, Poster {
private final PendingPostQueue queue;
private final EventBus eventBus;
// 有没有任务正在执行
private volatile boolean executorRunning;
BackgroundPoster(EventBus eventBus) {
this.eventBus = eventBus;
queue = new PendingPostQueue();
}
public void enqueue(Subscription subscription, Object event) {
// 从对象缓存池复用一个 PendingPost
PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
synchronized (this) {
// 将这个 PendingPost 加入到队列中
queue.enqueue(pendingPost);
// 标记线程池正在执行任务
if (!executorRunning) {
executorRunning = true;
// 使用线程池执行这个任务(划重点)
eventBus.getExecutorService().execute(this);
}
}
}
@Override
public void run() {
try {
try {
// 开个循环
while (true) {
// 等个 1 秒后才再从队列中获取
PendingPost pendingPost = queue.poll(1000);
if (pendingPost == null) {
synchronized (this) {
// 双重校验
pendingPost = queue.poll();
if (pendingPost == null) {
// 没有任务正在执行
executorRunning = false;
return;
}
}
}
// 反射执行这个订阅的方法
eventBus.invokeSubscriber(pendingPost);
}
} catch (InterruptedException e) {
eventBus.getLogger().log(Level.WARNING, Thread.currentThread().getName() + " was interruppted", e);
}
} finally {
// 没有任务正在执行
executorRunning = false;
}
}
}
AsyncPoster
class AsyncPoster implements Runnable, Poster {
private final PendingPostQueue queue;
private final EventBus eventBus;
AsyncPoster(EventBus eventBus) {
this.eventBus = eventBus;
queue = new PendingPostQueue();
}
public void enqueue(Subscription subscription, Object event) {
// 从对象缓存池复用一个 PendingPost
PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
// 将这个 PendingPost 加入到队列中
queue.enqueue(pendingPost);
// 使用线程池执行这个任务(划重点)
eventBus.getExecutorService().execute(this);
}
@Override
public void run() {
// 从队列中获取 PendingPost 对象
PendingPost pendingPost = queue.poll();
if(pendingPost == null) {
throw new IllegalStateException("No pending post available");
}
// 反射执行这个订阅的方法
eventBus.invokeSubscriber(pendingPost);
}
}
BackgroundPoster 和 AsyncPoster 区别
BackgroundPoster 是在后台执行,AsyncPoster是在异步执行,那么这两个到底有啥区别呢,咱们看源码不能白看,得会区分,得会总结
现在看过源码后,会发现 BackgroundPoster 和 AsyncPoster 的代码基本差不多,只不过相比 BackgroundPoster 的代码,而 AsyncPoster 的代码少了很多,那么我们把 BackgroundPoster 有的而 AsyncPoster 没有的代码单独拎出来看看
final class BackgroundPoster implements Runnable, Poster {
......
private volatile boolean executorRunning;
.......
public void enqueue(Subscription subscription, Object event) {
.......
synchronized (this) {
queue.enqueue(pendingPost);
if (!executorRunning) {
executorRunning = true;
eventBus.getExecutorService().execute(this);
}
}
}
@Override
public void run() {
try {
try {
while (true) {
PendingPost pendingPost = queue.poll(1000);
if (pendingPost == null) {
synchronized (this) {
// Check again, this time in synchronized
pendingPost = queue.poll();
if (pendingPost == null) {
executorRunning = false;
return;
}
}
}
eventBus.invokeSubscriber(pendingPost);
}
} catch (InterruptedException e) {
eventBus.getLogger().log(Level.WARNING, Thread.currentThread().getName() + " was interruppted", e);
}
} finally {
executorRunning = false;
}
}
}
你确定这是精简了?yes,谁让这个 AsyncPoster 类的代码本身就很少呢,先说多出来的一个字段吧
// 刚刚看过源码了,这个是记录线程池有没有任务正在执行的标记
private volatile boolean executorRunning;
看一下 enqueue 方法的源码
public void enqueue(Subscription subscription, Object event) {
.......
// 同步锁
synchronized (this) {
// 添加任务
queue.enqueue(pendingPost);
// 当前必须没有任务正在执行
if (!executorRunning) {
// 标记当前有任务正在执行
executorRunning = true;
// 使用线程池执行任务
eventBus.getExecutorService().execute(this);
}
}
}
再看一下 run 方法的源码
// 搞个循环干啥?
while (true) {
// 从队列中等待 1 秒后再把数据取出来
PendingPost pendingPost = queue.poll(1000);
if (pendingPost == null) {
synchronized (this) {
// 再次检查,这次是同步的
// Check again, this time in synchronized
pendingPost = queue.poll();
if (pendingPost == null) {
// 标记当前没有任务正在执行
executorRunning = false;
return;
}
}
}
// 反射执行方法
eventBus.invokeSubscriber(pendingPost);
}
我们可以很直观地看出,BackgroundPoster 处理了线程安全的问题,还有另外一个是 BackgroundPoster.enqueue 会先判断当前线程池没有任务之后再执行,那么这样做的话会不会丢失任务呢?其实不然,BackgroundPoster.run 会进行循环遍历,每隔一秒遍历一次队列,直到没有消息了为止。这样就能有效地保证线程池里面的任务执行顺序是有序的,AsyncPoster 同样也是用到了线程池但是没有使用同步锁,所以 AsyncPoster 线程池中的任务执行是没有顺序的。
invokeSubscriber
这三个 Poster 最后还是回调的是 EventBus.invokeSubscriber 方法,让我们看看到底是啥
void invokeSubscriber(PendingPost pendingPost) {
// 事件对象
Object event = pendingPost.event;
// 订阅的对象
Subscription subscription = pendingPost.subscription;
PendingPost.releasePendingPost(pendingPost);
if (subscription.active) {
// 调用订阅者
invokeSubscriber(subscription, event);
}
}
// 这个方法在 EventBus.post 源码解析中已经讲过了,这里不再细说
void invokeSubscriber(Subscription subscription, Object event) {
try {
// 反射执行方法,传入:订阅的对象、事件对象
subscription.subscriberMethod.method.invoke(subscription.subscriber, event);
} catch (InvocationTargetException e) {
handleSubscriberException(subscription, event, e.getCause());
} catch (IllegalAccessException e) {
throw new IllegalStateException("Unexpected exception", e);
}
}
你会发现万变不离其宗,调来调去最后还是会调到 EventBus.invokeSubscriber 方法中,那些线程调度无非都是基于 Handler 和线程池实现