1: 自定义注解
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
public @interface MessageSubscribeHandler {
@AliasFor("topic")
String value() default "";
@AliasFor("value")
String topic() default "";
}
2: 定义处理器函数接口, 用户处理MQTT消息
@FunctionalInterface
public interface Handler {
/**
* Something has happened, so handle it.
*
* @param mqttMessage the eventMessage to handle
*/
void handle(MqttPublishMessage mqttMessage);
}
3: 函数接口实现:
@MessageSubscribeHandler("/+/+/event/+")
@Component
public class EventMessageHandler implements Handler {
@Override
public void handle(MqttPublishMessage mqttMessage) {
log.debug("注册事件消息处理器");
}
}
4: 消息总线定义
public interface MessageBus {
//注册不同topic消息处理器
void registerHandler(String topic, Handler handler);
}
5: 消息总线实现:
分布式消息总线可基于分布式的消息中间件(如emqx等)实现, 不建议使用单机版的消息总线,具体实现省略
6:扫描代码中的事件处理器, 并注册到消息总线中
@Slf4j
@Component
public class EventBusLogicRegisterProvider implements EventBusLogicRegister {
@Override
public void registerTopics(MessageBus messageBus) throws Exception {
ResourceLoader resourceLoader = new PathMatchingResourcePatternResolver();
ResourcePatternResolver resolver = ResourcePatternUtils.getResourcePatternResolver(resourceLoader);
MetadataReaderFactory readerFactory = new CachingMetadataReaderFactory(resourceLoader);
String path = ResourcePatternResolver.CLASSPATH_ALL_URL_PREFIX
.concat(ClassUtils.convertClassNameToResourcePath("org.lbb.demo")
.concat("/**/*.class"));
for (Resource resource: resolver.getResources(path)) {
if (resource.isReadable()) {
MetadataReader reader = readerFactory.getMetadataReader(resource);
ClassMetadata classMetadata = reader.getClassMetadata();
if (!classMetadata.isInterface()) {
Class handlerClass = ClassUtils.forName(classMetadata.getClassName(), null);
if (handlerClass.getAnnotation(MessageSubscribeHandler.class) != null) {
MessageSubscribeHandler messageSubscribeHandler = AnnotationUtils.findAnnotation(handlerClass, MessageSubscribeHandler.class);
String topic = Optional.of(messageSubscribeHandler.topic()).orElseThrow(() ->new NullPointerException("topic不能为空!"));
Handler handler = (Handler) SpringContextUtils.getBean(handlerClass);
messageBus.registerHandler(topic, handler);
}
}
}
}
}
}
7: 系统启动时自动注册消息处理器
@Configuration
@Slf4j
public class MessageRegisterConfiguration implements CommandLineRunner {
@Autowired
private MessageBus messageBus;
@Autowired
private EventBusLogicRegisterProvider eventBusLogicRegisterProvider;
@Override
public void run(String... args) throws Exception {
eventBusLogicRegisterProvider.registerTopics(messageBus);
}
}