Spring事件原理
spring的事件监听由三部分组成
事件(ApplicationEvent)
描述发生事情的对象,比如一个订单完成事件发布器(ApplicationEventMulticaster)
对应于观察者模式中的被观察者/主题, 负责通知观察者(监听器) 对外提供发布事件和增删事件监听器的接口,维护事件和事件监听器之间的映射关系,并在事件发生时负责通知相关监听器。监听器(ApplicationListener)
对应于观察者模式中的观察者。监听器监听特定事件,并在内部定义了onApplicationEvent方法实现事件发生后的响应逻辑。
Sping事件体系使用的是观察者模式。ApplicationListener是观察者(监听者),其中定义了 onApplicationEvent 方法实现对 ApplicationEvent 事件的处理逻辑。除了发布者和监听者之外,还有一个EventMultiCaster负责把事件转发给监听者。 puhlisher -> EventMulticaster -> Listener
Spring在ApplicationContext接口类的抽象实现类AbstractApplicationContext 的容器启动方法 refresh( ) 中完成了 spring 事件体系的搭建。AbstractApplicationContext拥有一个applicationEventMulticaster成员变量,applicationEventMulticaster提供了容器监听器的注册表。
Spring 发布订阅实现
简单实现
定义订单类
public class Order {
private Integer id;
private String name;
public Integer getId() {
return id;
}
public void setId(Integer id) {
this.id = id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
}
定义订单事件类, 继承 ApplicationEvent 类。
public class OrderEvent extends ApplicationEvent implements Serializable {
private static final long serialVersionUID = 1L;
private String name;
public OrderEvent(Object source, String name) {
super(source);
this.name = name;
}
public String getName() {
return name;
}
}
定义订单事件处理类, 需实现 ApplicationListener 接口类, 重写 onApplicationEvent 方法。
@Component
public class OrderEventListener implements ApplicationListener<OrderEvent> {
@Override
public void onApplicationEvent(OrderEvent event) {
if("orderListener".equals(event.getName())){
System.out.println("order exector");
}
}
}
编写测试用例
@org.junit.jupiter.api.Test
public void test() throws Exception{
Order order = new Order();
order.setId(1);
System.out.println("MainThread Begin .....");
applicationContext.publishEvent(new OrderEvent(order,"orderListener"));
System.out.println("MainThread End .....");
}
进阶实现
创建发布主题枚举类,需要发布不同主体的消息时,只需要在枚举类中新增一个枚举值,将其发布到spring上下文中。
public enum EventTopic {
ORDER,
TEST,
;
}
自定义事件,实现ApplicationEvent接口, 内置主题成员变量,用以区分发布主题。
import org.springframework.context.ApplicationEvent;
import java.io.Serializable;
/**
* 自定义事件,实现ApplicationEvent接口
*/
public class CustomeEvent<T> extends ApplicationEvent implements Serializable {
private static final long serialVersionUID = 1L;
private EventTopic topic;
public CustomeEvent(EventTopic topic, T source) {
super(source);
this.topic = topic;
}
public EventTopic getTopic() {
return topic;
}
}
使用工厂模式 + 策略模式使得发布主题自行分发到订阅者并执行事件逻辑,执行方法采用异步处理方式,如此便不会阻塞主线程。如果有新增发布主题,那么只需适配一个主题执行者就能实现发布订阅逻辑。
public interface EventHandler {
EventTopic getTopic();
@Async
void exec(CustomeEvent event);
}
创建工厂类,加载所有订阅者到工厂类缓存,通过发布主题适配订阅者
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Component;
import java.util.*;
@Component
public class ExectorFactory implements InitializingBean, ApplicationContextAware {
private static final Map<String, List<EventHandler>> SREAREGY_MAP = new HashMap<>(32);
private ApplicationContext appContext;
public List<EventHandler> getHandler(EventTopic topic) {
return SREAREGY_MAP.get(topic.name());
}
@Override
public void afterPropertiesSet() throws Exception {
appContext.getBeansOfType(EventHandler.class)
.values()
.forEach(handler -> {
List<EventHandler> handlers = SREAREGY_MAP.get(handler.getTopic().name());
handlers = Objects.isNull(handlers) ? new ArrayList<EventHandler>() : handlers;
handlers.add(handler);
SREAREGY_MAP.put(handler.getTopic().name(), handlers);
});
}
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
appContext = applicationContext;
}
}
实现订阅者逻辑,其中同一主题,订阅者可以有多个,只需限定实现类中的主题相同即可,即:getTopic() 方法中返回同一主题。
@Component
public class OrderEexector implements EventHandler {
@Override
public EventTopic getTopic() {
return EventTopic.ORDER;
}
@Override
public void exec(CustomeEvent event) {
System.out.println("order exector");
}
}
需要多个订阅者时,增加一个相同主题的订单处理类2
@Component
public class Order2Eexector implements EventHandler {
@Override
public EventTopic getTopic() {
return EventTopic.ORDER;
}
@Override
public void exec(CustomeEvent event) {
try{
Thread.sleep(5000);
}catch (InterruptedException e){
e.printStackTrace();
}
System.out.println("order2 exector");
}
}
定义Spring 事件的监听类,通过订阅者工厂类完成主题的自动分发。
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationListener;
import org.springframework.stereotype.Component;
import java.util.List;
import java.util.Objects;
@Component
public class CustomeEventListener implements ApplicationListener<CustomeEvent> {
private static final Logger log = LoggerFactory.getLogger(CustomeEventListener.class);
@Autowired
ApplicationContext context;
@Override
public void onApplicationEvent(CustomeEvent event) {
ExectorFactory exector = context.getBean(ExectorFactory.class);
List<EventHandler> handlers = exector.getHandler(event.getTopic());
if (Objects.isNull(handlers)) {
log.warn("no exector for this topic: " + event.getTopic().name());
return;
}
handlers.forEach(h -> {
h.exec(event);
});
}
}
编写 junit 测试方法
@org.junit.jupiter.api.Test
public void test3() throws Exception{
Order order = new Order();
order.setId(1);
System.out.println("MainThread Begin .....");
applicationContext.publishEvent(new CustomeEvent<Order>(EventTopic.ORDER, order));
applicationContext.publishEvent(new CustomeEvent<Order>(EventTopic.TEST, order));
System.out.println("MainThread End .....");
}
运行结果如下
思考
1、我们实现了两个订单类的处理逻辑,但是为什么只打印了一个逻辑呢? 正确打印结果应该如下图所示才对吧,那么如何才能实现呢?
答:注意观察订单逻辑处理类,我们发现其中一个订单逻辑中,线程休眠了5秒,主线程在子线程休眠的过程中退出了,所以我们看不到订单逻辑处理类2的打印信息。那么解决方案就很明显了,我们只需要使得主线程在子线程执行完之后再退出就好了,主线程中休眠7秒等待子线程结束。
@org.junit.jupiter.api.Test
public void test3() throws Exception{
Order order = new Order();
order.setId(1);
System.out.println("MainThread Begin .....");
applicationContext.publishEvent(new CustomeEvent<Order>(EventTopic.ORDER, order));
applicationContext.publishEvent(new CustomeEvent<Order>(EventTopic.TEST, order));
try{
Thread.sleep(7000);
}catch (Exception e){
e.printStackTrace();
}
System.out.println("MainThread End .....");
}