开篇
- EventBus是Guava的事件处理机制,是设计模式中的观察者模式的优雅实现。对于事件监听和发布订阅模式,可以使用EventBus完美的解决。这篇文章主要是想了解下EventBus底层的实现逻辑,做到使用的时候更加游刃有余。
-
EventBus的整体使用方式如下图所示,基于Event的驱动来实现Publisher和Subscriber之前的通信。
EventBus用法
public class GuavaTest {
public static void main(String[] args) {
// 1、定义EventBus对象
EventBus eventBus = new EventBus();
// 2、定义两个观察者对象
DataObserver1 observer1 = new DataObserver1();
DataObserver2 observer2 = new DataObserver2();
// 3、注册两个观察者
eventBus.register(observer1);
eventBus.register(observer2);
// 4、分发事件
eventBus.post(123);
eventBus.post("hello world");
}
}
// 定义观察者一
class DataObserver1 {
@Subscribe
public void action(Integer msg) {
System.out.println("DataObserver1 String msg: " + msg);
}
@Subscribe
public void action(String msg) {
System.out.println("DataObserver1 String msg: " + msg);
}
}
// 定义观察者二
class DataObserver2 {
@Subscribe
public void action(Integer msg) {
System.out.println("DataObserver2 String msg: " + msg);
}
@Subscribe
public void action(String msg) {
System.out.println("DataObserver2 String msg: " + msg);
}
}
- 1、EventBus的核心对象包含EventBus对象、观察者对象Observer、事件对象Event。
- 2、观察者对象通过注解@Subscribe来定义处理事件的逻辑实现方法,该方法的参数定义了事件类型。
- 3、通过EventBus的register方法实现观察者Objserver和EventBus的关联。
- 4、通过EventBus的post方法实现事件Event的分发。
EventBus定义
public class EventBus {
private final String identifier;
private final Executor executor;
private final SubscriberExceptionHandler exceptionHandler;
// SubscriberRegistry用来保存监听者的信息
private final SubscriberRegistry subscribers = new SubscriberRegistry(this);
// dispatcher使用的是PerThreadQueuedDispatcher
private final Dispatcher dispatcher;
public EventBus(String identifier) {
this(
identifier,
MoreExecutors.directExecutor(),
Dispatcher.perThreadDispatchQueue(),
LoggingHandler.INSTANCE);
}
public void register(Object object) {
subscribers.register(object);
}
}
- EventBus的核心变量subscribers用来保存订阅者, Dispatcher用来定义分发的方法。
- EventBus的注册方法register就是把观察者注册到subscribers当中。
EventBus订阅过程
final class SubscriberRegistry {
private final ConcurrentMap<Class<?>, CopyOnWriteArraySet<Subscriber>> subscribers =
Maps.newConcurrentMap();
void register(Object listener) {
// 获取监听者内部对应的@Subscribe注解的方法并解析成事件类型维度的Multimap,
Multimap<Class<?>, Subscriber> listenerMethods = findAllSubscribers(listener);
for (Map.Entry<Class<?>, Collection<Subscriber>> entry : listenerMethods.asMap().entrySet()) {
Class<?> eventType = entry.getKey();
Collection<Subscriber> eventMethodsInListener = entry.getValue();
CopyOnWriteArraySet<Subscriber> eventSubscribers = subscribers.get(eventType);
// 事件监听信息保存在subscribers当中,以事件类型为维度
if (eventSubscribers == null) {
CopyOnWriteArraySet<Subscriber> newSet = new CopyOnWriteArraySet<Subscriber>();
eventSubscribers =
MoreObjects.firstNonNull(subscribers.putIfAbsent(eventType, newSet), newSet);
}
eventSubscribers.addAll(eventMethodsInListener);
}
}
// 解析监听者对象
private Multimap<Class<?>, Subscriber> findAllSubscribers(Object listener) {
Multimap<Class<?>, Subscriber> methodsInListener = HashMultimap.create();
// 获取监听的类型实现
Class<?> clazz = listener.getClass();
// 获取指定@Subscribe注解的方法
for (Method method : getAnnotatedMethods(clazz)) {
Class<?>[] parameterTypes = method.getParameterTypes();
Class<?> eventType = parameterTypes[0];
// 按照@Subscribe注解指定方法的参数为维度进行组织
methodsInListener.put(eventType, Subscriber.create(bus, listener, method));
}
return methodsInListener;
}
}
- EventBus的subscribers用来保存观察者。
- EventBus的注册过程核心逻辑包含发现观察者解析事件方法和保存观察者。
- findAllSubscribers解析包含@Subscribe注解的方法生成事件类型为key,Subscriber对象为value的map对象。
- Subscriber保存了观察者对象以及执行的方法,用来分发的时候进行回调操作。
EventBus分发过程
public class EventBus {
public void post(Object event) {
// 获取事件的监听者
Iterator<Subscriber> eventSubscribers = subscribers.getSubscribers(event);
// 遍历监听者进行事件的dispatch
if (eventSubscribers.hasNext()) {
dispatcher.dispatch(event, eventSubscribers);
} else if (!(event instanceof DeadEvent)) {
// the event had no subscribers and was not itself a DeadEvent
post(new DeadEvent(this, event));
}
}
private static final class PerThreadQueuedDispatcher extends Dispatcher {
private final ThreadLocal<Queue<Event>> queue =
new ThreadLocal<Queue<Event>>() {
@Override
protected Queue<Event> initialValue() {
return Queues.newArrayDeque();
}
};
private final ThreadLocal<Boolean> dispatching =
new ThreadLocal<Boolean>() {
@Override
protected Boolean initialValue() {
return false;
}
};
@Override
void dispatch(Object event, Iterator<Subscriber> subscribers) {
checkNotNull(event);
checkNotNull(subscribers);
Queue<Event> queueForThread = queue.get();
queueForThread.offer(new Event(event, subscribers));
if (!dispatching.get()) {
dispatching.set(true);
try {
Event nextEvent;
while ((nextEvent = queueForThread.poll()) != null) {
while (nextEvent.subscribers.hasNext()) {
nextEvent.subscribers.next().dispatchEvent(nextEvent.event);
}
}
} finally {
dispatching.remove();
queue.remove();
}
}
}
}
- Event的分发核心以分发的事件event去查找对应的观察者进行分发,分发的过程就是遍历观察者进行分发的过程。
- PerThreadQueuedDispatcher支持的线程维度的分发,内部通过queue做了中转进行消息的分发。
- dispatchEvent是执行Subscriber的回调方法的真正逻辑。
EventBus通知过程
class Subscriber {
@Weak private EventBus bus;
@VisibleForTesting final Object target;
private final Method method;
private final Executor executor;
private Subscriber(EventBus bus, Object target, Method method) {
this.bus = bus;
this.target = checkNotNull(target);
this.method = method;
method.setAccessible(true);
this.executor = bus.executor();
}
final void dispatchEvent(final Object event) {
executor.execute(
new Runnable() {
@Override
public void run() {
try {
invokeSubscriberMethod(event);
} catch (InvocationTargetException e) {
}
}
});
}
@VisibleForTesting
void invokeSubscriberMethod(Object event) throws InvocationTargetException {
try {
method.invoke(target, checkNotNull(event));
} catch (IllegalArgumentException e) {
} catch (IllegalAccessException e) {
} catch (InvocationTargetException e) {
}
}
}
- 回调的过程就是执行invokeSubscriberMethod方法,内部就是常见的method.invoke反射进行回调实现。