前言
日常开发中经常遇到一个业务发生之后需要触发好几个业务点,比如订单支付完成之后需要发送短信、送会员积分、发送优惠券等。在分布式系统中我们可以通过消息队列实现,各个系统之间订阅支付成功事件,然后实现各自的业务,达到一个系统之间的解耦和异步的目的。
如果在同一个进程中也存在类似的通知需求,通过消息队列显得太笨重而且也没有跨进程或者系统架构中都没引入消息队列。这时候要实现进程内的消息通讯就可以通过Spring自带的Event事件或者google的EventBus。
观察者模式
不管是SpringEvent还是EventBus都是对观察者模式的实现。与传统的观察者模式不同的是,它是观察者模式的非显示实现,说白了就是通过第三方将消息发布者与订阅者解耦。
传统的观察的模式需要在消息发布方维护一个订阅者的队列,耦合性是比较强的。而SpringEvent和EventBus是通过自身来管理发布者与订阅者的关系,发布者不再关心有多少订阅者,达到一个解耦的效果。
-
传统观察者模式
如上图传统的观察者模式,事件发布方需要自身维护和监听者的关系,这样做耦合性比较高。
-
消息订阅
采用这种形式,事件发布者和监听者通过第三方来管理它们之间的对应关系,这样不要显示的去订阅和发布,达到一个解耦的效果。
SpringEvent
使用
- 事件
定义一个消息事件非常简单,继承ApplicationEvent就可以。
public class SpringObjectEvent extends ApplicationEvent {
private String msg;
public SpringObjectEvent(Object source) {
super(source);
}
public String getMsg() {
return msg;
}
public void setMsg(String msg) {
this.msg = msg;
}
}
- 发布
通过ApplicationContext发布事件
@Resource
private ApplicationContext applicationContext;
public void publishSpringEvent(String msg){
SpringObjectEvent springObjectEvent = new SpringObjectEvent(this);
springObjectEvent.setMsg(msg);
applicationContext.publishEvent(springObjectEvent);
}
- 订阅
订阅消息有两种方式一个是通过继承另外一个是通过注解的方式。
继承ApplicationListener
@Component
public class SpringObjectEventListener implements ApplicationListener<SpringObjectEvent> {
@Override
public void onApplicationEvent(SpringObjectEvent event) {
System.out.println("Object:"+event.getMsg());
}
}
注解 @EventListener
@Component
public class SpringAnnotationEventListener {
@EventListener
public void processSpringObjectEvent(SpringObjectEvent springObjectEvent){
System.out.println("Annotation:"+springObjectEvent.getMsg());
}
}
原理分析
本着知其然也知其所以然的原则,简单分析一下SpringEvent的原理。
监听加载
第一个问题通过继承或者注解定义的观察者是什么时候添加到spring中去的。
这时候就需要找到spring的经典方法org.springframework.context.support.AbstractApplicationContext#refresh
,这方法在spring容器加载或者刷新的时候将会被调用,如果要分析Spring的源码这个方法是必须要看的。本文主要研究SpringEvent,这里也不会去详细分析这个方法,只分析其中的两步
// 创建消息分发通知器
initApplicationEventMulticaster();
......
// 注册监听器
registerListeners();
initApplicationEventMulticaster()
protected void initApplicationEventMulticaster() {
ConfigurableListableBeanFactory beanFactory = getBeanFactory();
//如果通知器已经存在,给applicationEventMulticaster做一个简单的赋值
if (beanFactory.containsLocalBean(APPLICATION_EVENT_MULTICASTER_BEAN_NAME)) {
this.applicationEventMulticaster =
beanFactory.getBean(APPLICATION_EVENT_MULTICASTER_BEAN_NAME, ApplicationEventMulticaster.class);
if (logger.isTraceEnabled()) {
logger.trace("Using ApplicationEventMulticaster [" + this.applicationEventMulticaster + "]");
}
}
else {
//不存在则创建一个SimpleApplicationEventMulticaster
this.applicationEventMulticaster = new SimpleApplicationEventMulticaster(beanFactory);
beanFactory.registerSingleton(APPLICATION_EVENT_MULTICASTER_BEAN_NAME, this.applicationEventMulticaster);
if (logger.isTraceEnabled()) {
logger.trace("No '" + APPLICATION_EVENT_MULTICASTER_BEAN_NAME + "' bean, using " +
"[" + this.applicationEventMulticaster.getClass().getSimpleName() + "]");
}
}
}
整体流程很简单如果applicationEventMulticaster在Spring容器中已经存在则做一个赋值操作,如果不存在就创建一个SimpleApplicationEventMulticaster。
registerListeners()
protected void registerListeners() {
// 首先添加静态指定的listener
for (ApplicationListener<?> listener : getApplicationListeners()) {
getApplicationEventMulticaster().addApplicationListener(listener);
}
//将继承自ApplicationListener接口的bean加载到通知器中
String[] listenerBeanNames = getBeanNamesForType(ApplicationListener.class, true, false);
for (String listenerBeanName : listenerBeanNames) {
getApplicationEventMulticaster().addApplicationListenerBean(listenerBeanName);
}
// 开始发布一些早期的事件
Set<ApplicationEvent> earlyEventsToProcess = this.earlyApplicationEvents;
this.earlyApplicationEvents = null;
if (earlyEventsToProcess != null) {
for (ApplicationEvent earlyEvent : earlyEventsToProcess) {
getApplicationEventMulticaster().multicastEvent(earlyEvent);
}
}
}
这个方法主要就是将定义的listener加载到通知器中,有两个来源:一是Spring内置的一些监听器,二是继承ApplicationListener的listener。最后一步是将一些预存的事件通知出去。
在这个方法里面其实是没有将通过@EventListener注解定义的listener加载进去的。那它是在什么时候加载进去的呢?找到类EventListenerMethodProcessor,看命名就知道是在这里做的处理,简单分析一下。
EventListenerMethodProcessor中有个关键方法afterSingletonsInstantiated()
,在bean实例化之后可以做一个切入,跟进去找到方法processBean
简化流程
//带有@EventListener的方法集合
Map<Method, EventListener> annotatedMethods = null;
//遍历集合
for (Method method : annotatedMethods.keySet()) {
for (EventListenerFactory factory : factories) {
if (factory.supportsMethod(method)) {
Method methodToUse = AopUtils.selectInvocableMethod(method, context.getType(beanName));
//创建listener为ApplicationListenerMethodAdapter
ApplicationListener<?> applicationListener =
factory.createApplicationListener(beanName, targetType, methodToUse);
if (applicationListener instanceof ApplicationListenerMethodAdapter) {
//初始化listener
((ApplicationListenerMethodAdapter) applicationListener).init(context, this.evaluator);
}
//添加listener到ioc容器中
context.addApplicationListener(applicationListener);
break;
}
}
}
上诉流程可以知道,spring在单例的bean实例化之后做了一个切入,将类中所有带@EventListener注解的方法转换为ApplicationListenerMethodAdapter(继承自ApplicationListener),然后添加到容器中。
listener加载到这里分析完了,做一个简单的总结
- 在容器初始化的时候加载listener
- 如果applicationEventMulticaster不存在则创建applicationEventMulticaster
- 添加listener,包含内置的listener,继承ApplicationListener接口的listener,带有@EventListener注解的方法。
通知
void publishEvent(Object event)
找到具体的实现org.springframework.context.event.SimpleApplicationEventMulticaster#multicastEvent(final ApplicationEvent event, @Nullable ResolvableType eventType)
public void multicastEvent(final ApplicationEvent event, @Nullable ResolvableType eventType) {
ResolvableType type = (eventType != null ? eventType : resolveDefaultEventType(event));
Executor executor = getTaskExecutor();
for (ApplicationListener<?> listener : getApplicationListeners(event, type)) {
if (executor != null) {
executor.execute(() -> invokeListener(listener, event));
}
else {
invokeListener(listener, event);
}
}
}
介绍流程之前,先说明SimpleApplicationEventMulticaster的两个类变量
//调用监听器的执行器
private Executor taskExecutor;
//调用监听器的异常处理机制
private ErrorHandler errorHandler;
事件通知流程非常的简单,获取当前事件监听的listener,如果执行器不为空就交个执行器执行,否则交给调用线程执行。这里的流程很简单,就不具体分析了,这里有个效率优化的地方就是获取当前事件监听的listener时,其实是维护了一个ConcurrentHashMap作为缓存,避免每次通知都遍历所有的listener。
同步异步
通过上面的源码分析可以知道SpringEvent默认是同步实现,如果要实现异步通知两种方式
- 监听者异步
public class SpringObjectEventListener implements ApplicationListener<SpringObjectEvent> {
//添加异步注解
@Async
@Override
public void onApplicationEvent(SpringObjectEvent event) {
System.out.println("Object:"+event.getMsg());
}
}
- 自定义applicationEventMulticaster
前面源码分析当容器中不存在applicationEventMulticaster才会创建一个默认的SimpleApplicationEventMulticaster,而且SimpleApplicationEventMulticaster默认taskExecutor为null,所以我们可以定义一个SimpleApplicationEventMulticaster并将taskExecutor设置一个执行listener的线程池,从而达到异步执行的效果。
@Bean(name = "applicationEventMulticaster")
public ApplicationEventMulticaster createApplicationEventMulticaster(){
SimpleApplicationEventMulticaster applicationEventMulticaster = new SimpleApplicationEventMulticaster();
//创建线程池
DefaultThreadFactory threadFactory = new DefaultThreadFactory("listener-executor", false);
ThreadPoolExecutor executor = new ThreadPoolExecutor(4, 8,
60, TimeUnit.SECONDS, new LinkedBlockingDeque<>(1000), threadFactory);
applicationEventMulticaster.setTaskExecutor(executor);
return applicationEventMulticaster;
}
EventBus
使用
- 引入
EventBus在google的guava包中,通过maven引入。
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>29.0-jre</version>
</dependency>
- 事件
EventBus的事件定义只要是Object类型就行
public class EventBusEvent {
private String msg;
public String getMsg() {
return msg;
}
public void setMsg(String msg) {
this.msg = msg;
}
}
- 初始化
EventBus eventBus = new EventBus();
- 发布
EventBusEvent eventBusEvent = new EventBusEvent();
eventBusEvent.setMsg(msg);
eventBus.post(eventBusEvent);
- 订阅
定于一个事件需要在方法上添加@Subscribe注解,然后再注册到eventBus中
public class EventBusListener {
@Subscribe
public void subscribe(EventBusEvent e) {
System.out.println(e.getMsg());
}
}
eventBus.register(new EventBusListener());
原理分析
在具体分析源码之前,我们通过已经分析过的SpringEvent的一个基本原理,参考EventBus的一个使用流程。在不分析源码的情况下,去猜测它的一个实现原理是怎么样的。当一个监听者注册到EventBus中的时候,是不是会将这个类上带有@Subscribe的方法都当做一个监听者,然后有一个数据结构去保存事件和监听者的一个关联关系。
然后通过EventBus发布一个事件的时候,通过内部保存的一个事件与监听者的管理关系,就可以找到监听者,然后就可以对其通知了呢?
接下来简单分析一下EventBus的源码,看对它的猜测对不对。
监听加载
注册监听器
public void register(Object object) {
subscribers.register(object);
}
这里的subscribers就是SubscriberRegistry
先看SubscriberRegistry的成员变量subscribers
private final ConcurrentMap<Class<?>, CopyOnWriteArraySet<Subscriber>> subscribers =
Maps.newConcurrentMap();
可以知道EventBus是通过一个ConcurrentMap保存事件与监听者的关系的,监听者是读多写少的创建所以用一个CopyOnWriteArraySet存储。
register
void register(Object listener) {
//查找带有@Subscribe的方法、并封装成Subscriber
Multimap<Class<?>, Subscriber> listenerMethods = findAllSubscribers(listener);
//遍历集合,将监听器添加到subscribers中
for (Entry<Class<?>, Collection<Subscriber>> entry : listenerMethods.asMap().entrySet()) {
Class<?> eventType = entry.getKey();
Collection<Subscriber> eventMethodsInListener = entry.getValue();
CopyOnWriteArraySet<Subscriber> eventSubscribers = subscribers.get(eventType);
if (eventSubscribers == null) {
CopyOnWriteArraySet<Subscriber> newSet = new CopyOnWriteArraySet<>();
eventSubscribers =
MoreObjects.firstNonNull(subscribers.putIfAbsent(eventType, newSet), newSet);
}
eventSubscribers.addAll(eventMethodsInListener);
}
}
上面流程比较简单清晰,和我们预先分析的差不多,先解析带有@Subscriber注解的方法,然后将这些监听器添加到一个Map中,这个Map用来保存事件与监听者的关系。
通知
public void post(Object event) {
//获取事件的监听者
Iterator<Subscriber> eventSubscribers = subscribers.getSubscribers(event);
if (eventSubscribers.hasNext()) {
//分发事件
dispatcher.dispatch(event, eventSubscribers);
} else if (!(event instanceof DeadEvent)) {
// the event had no subscribers and was not itself a DeadEvent
post(new DeadEvent(this, event));
}
}
上诉流程也很简单,从subscribers获取该事件的监听者,获取方式就是map.get(Object key)。如果有监听者则开始分发事情,如果又没有监听者也不是DeadEvent,将会把该事件封装一个DeadEvent然后再调用post方法。
这里需要分析一下dispatcher事件分发器。
/** Dispatches the given {@code event} to the given {@code subscribers}. */
abstract void dispatch(Object event, Iterator<Subscriber> subscribers);
这是一个抽象方法,一共有三个实现类ImmediateDispatcher、PerThreadQueuedDispatcher、LegacyAsyncDispatcher。
- ImmediateDispatcher:直接遍历subscribers,并且立即执行。
- PerThreadQueuedDispatcher:(EventBus默认的分发器)内部用ThreadLocal为每个调用线程维护了一个队列,保证各个调用线程的subscribers执行的有序性。主要流程为先把subscribers加入队列,然后再从队列中取出执行,保证监听者在所有调用线程执行的顺序性。
- LegacyAsyncDispatcher:传统异步分发器AsyncEventBus(AsyncEventBus默认的分发器),内部通过一个全局的queue去保存subscribers。执行流程与PerThreadQueuedDispatcher差不多,subscribers加入队列,然后再从队列中取出执行,区别在于PerThreadQueuedDispatcher为每个调用线程都维护一个队列,而这里是所有线程共享一个全局队列。这不过这里的任务执行是异步执行的,所以其实这个queue并没有保证subscribers执行的总体有序性,而是尽量去保证一个执行顺序。
同步异步
EventBus默认是同步的,要构造异步的通知使用AsyncEventBus,传入任务执行的线程池就可以了。
public AsyncEventBus(Executor executor) {
super("default", executor, Dispatcher.legacyAsync(), LoggingHandler.INSTANCE);
}
总结
分析了上面两种关于进程内的通讯组件,其实有很多相同的设计思路。如果现在让你自己实现一个类似的组件相信也可以轻松的实现了。那如果是实现进程之间的消息队列呢?一个可持久化的消息队列、一个高可用的消息队列、一个高吞吐的消息队列......这时候就需要去学习Kafka、RocketMQ才能回答这个问题了。
对比了SpringEvent与EventBus,感觉两个的功能都差不多,都能实现同步与异步,实现消息通知也能简单。但是个人觉得如果项目中使用了Spring,还是使用Spring自带的通知机制比较好,不用引入第三方的包,而且idea也自带快捷键查找消息的订阅者,会比较方便。
大家这做项目中如果需要一个业务的触发点需要多个地方去处理的情况下,不妨考虑这种实现方式。