上节内容:EventBus注册方面的底层逻辑 https://www.jianshu.com/p/e324c8353fe8
本篇咱们讲发送的时候走的底层逻辑,开工。
首先咱们发送的调用代码是
EventBus.getDefault().post(obj)
来,去看看post(obj)
private final ThreadLocal<PostingThreadState> currentPostingThreadState = new ThreadLocal<PostingThreadState>() {
@Override
protected PostingThreadState initialValue() {
return new PostingThreadState();
}
};
public void post(Object event) {
1 PostingThreadState postingState = currentPostingThreadState.get();
List<Object> eventQueue = postingState.eventQueue;
eventQueue.add(event);
if (!postingState.isPosting) {
postingState.isMainThread = isMainThread();
postingState.isPosting = true;
if (postingState.canceled) {
throw new EventBusException("Internal error. Abort state was not reset");
}
try {
while (!eventQueue.isEmpty()) {
2 postSingleEvent(eventQueue.remove(0), postingState);
}
} finally {
postingState.isPosting = false;
postingState.isMainThread = false;
}
}
}
1:这里简单介绍一下currentPostingThreadState,ThreadLocal 是一个线程内部的数据存储类,通过它可以在指定的线程中存储数据,而这段数据是不会与其他线程共享的。而他包裹着的PostingThreadState 对象,则是一个简单的封装类,里面封装了一些发送时所需要的数据而已,ThreadLocal 值得深入理解,特别是有多线程业务的筒子。
2:遍历事件队列,处理单个事件类型的消息发送(顺便把自己在eventQueue队列里移除)
走起,进入postSingleEvent(eventQueue.remove(0), postingState)
eventInheritance = builder.eventInheritance;
private void postSingleEvent(Object event, PostingThreadState postingState) throws Error {
Class<?> eventClass = event.getClass();
boolean subscriptionFound = false;
1 if (eventInheritance) {
2 List<Class<?>> eventTypes = lookupAllEventTypes(eventClass);
int countTypes = eventTypes.size();
for (int h = 0; h < countTypes; h++) {
Class<?> clazz = eventTypes.get(h);
3 subscriptionFound |= postSingleEventForEventType(event, postingState, clazz);
}
} else {
3 subscriptionFound = postSingleEventForEventType(event, postingState, eventClass);
}
if (!subscriptionFound) {
if (logNoSubscriberMessages) {
logger.log(Level.FINE, "No subscribers registered for event " + eventClass);
}
if (sendNoSubscriberEvent && eventClass != NoSubscriberEvent.class &&
eventClass != SubscriberExceptionEvent.class) {
post(new NoSubscriberEvent(this, event));
}
}
}
1:eventInheritance 的值是在Builder定义的,默认为true,意思为向上继续查找事件的父类
2:查询出当前事件类型对应的class集合(父类,接口,父类的接口,父类接口的父类等等),所以比如订阅了两个event方法参数分别是object1、object2(1的父类),post的时候传入的是object1,那么两个订阅方法是都能收到消息通知的
3:调用postSingleEventForEventType方法继续处理事件
好,postSingleEventForEventType(event, postingState, clazz)
private boolean postSingleEventForEventType(Object event, PostingThreadState postingState, Class<?> eventClass) {
CopyOnWriteArrayList<Subscription> subscriptions;
synchronized (this) {
1 subscriptions = subscriptionsByEventType.get(eventClass);
}
if (subscriptions != null && !subscriptions.isEmpty()) {
for (Subscription subscription : subscriptions) {
postingState.event = event;
postingState.subscription = subscription;
boolean aborted = false;
try {
2 postToSubscription(subscription, event, postingState.isMainThread);
aborted = postingState.canceled;
} finally {
postingState.event = null;
postingState.subscription = null;
postingState.canceled = false;
}
if (aborted) {
break;
}
}
return true;
}
return false;
}
1:根据事件类型获取到订阅者集合(还记得注册时的subscriptionsByEventType对象么,就是那个关系是一对多的),在这里我们把订阅者集合subscriptions 拿了出来,并且遍历。
2:走postToSubscription()方法,要去消息发送的地方了
private void postToSubscription(Subscription subscription, Object event, boolean isMainThread) {
1 switch (subscription.subscriberMethod.threadMode) {
case POSTING:
2 invokeSubscriber(subscription, event);
break;
case MAIN:
if (isMainThread) {
invokeSubscriber(subscription, event);
} else {
3 mainThreadPoster.enqueue(subscription, event);
}
break;
case MAIN_ORDERED:
if (mainThreadPoster != null) {
mainThreadPoster.enqueue(subscription, event);
} else {
// temporary: technically not correct as poster not decoupled from subscriber
invokeSubscriber(subscription, event);
}
break;
case BACKGROUND:
if (isMainThread) {
backgroundPoster.enqueue(subscription, event);
} else {
invokeSubscriber(subscription, event);
}
break;
case ASYNC:
asyncPoster.enqueue(subscription, event);
break;
default:
throw new IllegalStateException("Unknown thread mode: " + subscription.subscriberMethod.threadMode);
}
}
1:根据事件的线程模式执行不同的方法,可以看到这里有好几种,归类出来就是
相同线程时执行invokeSubscriber(subscription, event)
不同线程时:1(主线程发出)执行mainThreadPoster.enqueue(subscription, event)
2(子线程发出)执行backgroundPoster.enqueue(subscription, event)
不管3721都是子线程发出执行asyncPoster.enqueue(subscription, event)
我们看一下invokeSubscriber方法
void invokeSubscriber(Subscription subscription, Object event) {
try {
subscription.subscriberMethod.method.invoke(subscription.subscriber, event);
} catch (InvocationTargetException e) {
handleSubscriberException(subscription, event, e.getCause());
} catch (IllegalAccessException e) {
throw new IllegalStateException("Unexpected exception", e);
}
}
利用反射的方法将event传了出去,这里的event就是你一开始post的时候的参数,subscriber是订阅者,所以你外面的订阅方法就收到消息了
那mainThreadPoster.enqueue(subscription, event)呢,首先我们看一下mainThreadPoster是个什么东西
EventBus(EventBusBuilder builder) {
mainThreadPoster = mainThreadSupport != null ? mainThreadSupport.createPoster(this) : null;
}
public interface MainThreadSupport {
boolean isMainThread();
Poster createPoster(EventBus eventBus);
class AndroidHandlerMainThreadSupport implements MainThreadSupport {
private final Looper looper;
public AndroidHandlerMainThreadSupport(Looper looper) {
this.looper = looper;
}
@Override
public boolean isMainThread() {
return looper == Looper.myLooper();
}
@Override
public Poster createPoster(EventBus eventBus) {
return new HandlerPoster(eventBus, looper, 10);
}
}
}
这个对象是一开始就实例化出来了,new了一个HandlerPoster,我们看一看HandlerPoster类
public class HandlerPoster extends Handler implements Poster {
private final PendingPostQueue queue;
private final int maxMillisInsideHandleMessage;
private final EventBus eventBus;
private boolean handlerActive;
protected HandlerPoster(EventBus eventBus, Looper looper, int maxMillisInsideHandleMessage) {
super(looper);
this.eventBus = eventBus;
this.maxMillisInsideHandleMessage = maxMillisInsideHandleMessage;
queue = new PendingPostQueue();
}
public void enqueue(Subscription subscription, Object event) {
PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
synchronized (this) {
1 queue.enqueue(pendingPost);
if (!handlerActive) {
handlerActive = true;
if (!sendMessage(obtainMessage())) {
throw new EventBusException("Could not send handler message");
}
}
}
}
@Override
public void handleMessage(Message msg) {
boolean rescheduled = false;
try {
long started = SystemClock.uptimeMillis();
while (true) {
2 PendingPost pendingPost = queue.poll();
if (pendingPost == null) {
synchronized (this) {
// Check again, this time in synchronized
pendingPost = queue.poll();
if (pendingPost == null) {
handlerActive = false;
return;
}
}
}
3 eventBus.invokeSubscriber(pendingPost);
long timeInMethod = SystemClock.uptimeMillis() - started;
if (timeInMethod >= maxMillisInsideHandleMessage) {
if (!sendMessage(obtainMessage())) {
throw new EventBusException("Could not send handler message");
}
rescheduled = true;
return;
}
}
} finally {
handlerActive = rescheduled;
}
}
}
嘿嘿,看到熟悉的Handler了吧,是的,为啥能切换到主线程发送消息,就是用的
1:先在外面调用enque()方法,将订阅者跟事件封装成PendingPost对象并存进队列里。
2:sendMessage后在handleMessage遍历队列。
3:调用eventBus.invokeSubscriber(pendingPost)
void invokeSubscriber(PendingPost pendingPost) {
Object event = pendingPost.event;
Subscription subscription = pendingPost.subscription;
PendingPost.releasePendingPost(pendingPost);
if (subscription.active) {
invokeSubscriber(subscription, event);
}
}
拿出了事件类型跟订阅者,释放PendingPost对象,最后调用invokeSubscriber(subscription, event)方法
是不是有点眼熟,就是那个利用反射发送出消息的方法,所以只是饶了个弯,最后还是回到了invokeSubscriber(subscription, event)方法。
至于还有两种模式
backgroundPoster.enqueue(subscription, event);
asyncPoster.enqueue(subscription, event);
跟这个是一个套路的,只是他们分别实现了Runnable而不是Handler,然后加入了线程池、之后去执行方法。还是贴一下代码吧。
final class BackgroundPoster implements Runnable, Poster {
private final PendingPostQueue queue;
private final EventBus eventBus;
private volatile boolean executorRunning;
BackgroundPoster(EventBus eventBus) {
this.eventBus = eventBus;
queue = new PendingPostQueue();
}
public void enqueue(Subscription subscription, Object event) {
PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
synchronized (this) {
queue.enqueue(pendingPost);
if (!executorRunning) {
executorRunning = true;
eventBus.getExecutorService().execute(this);
}
}
}
@Override
public void run() {
try {
try {
while (true) {
PendingPost pendingPost = queue.poll(1000);
if (pendingPost == null) {
synchronized (this) {
// Check again, this time in synchronized
pendingPost = queue.poll();
if (pendingPost == null) {
executorRunning = false;
return;
}
}
}
eventBus.invokeSubscriber(pendingPost);
}
} catch (InterruptedException e) {
eventBus.getLogger().log(Level.WARNING, Thread.currentThread().getName() + " was interruppted", e);
}
} finally {
executorRunning = false;
}
}
}
Ok,那整体的一个发送逻辑就是这样了,我们总结一下:
1 通过调用post方法传入参数
2 将事件放入队列里,然后遍历队列,挨个事件处理
3 查询出事件类型的父类以及接口之类的一切存放进集合
4 遍历事件集合,获取订阅者集合
4 遍历订阅者集合,根据线程模式的不同执行相应的逻辑(利用Handler或Runnable实现线程切换)
5 利用反射的原理将传入的事件发送到对应的订阅者
6 发送结束
好了,下一篇将注销。