序
Esper是一个比较经典的CEP(Complex Event Processing,复杂事件处理)开源实现(开源协议为GPL v2),这里简单介绍一下如何在Spring Boot中使用。
maven
<dependency>
<groupId>com.espertech</groupId>
<artifactId>esper</artifactId>
<version>6.1.0</version>
</dependency>
config
/**
 * Creates the default Esper engine with {@code PersonEvent} registered as an event type.
 *
 * <p>No explicit {@code initialize()} call is made here: with Esper 6.1.0 the provider returned by
 * {@code EPServiceProviderManager} has already been initialized.
 *
 * <p>{@code destroyMethod} is named explicitly because Spring only infers public
 * {@code close()}/{@code shutdown()} methods; without it the engine's {@code destroy()} would
 * never be invoked when the application context shuts down.
 *
 * @return the default engine instance, ready to accept statements and events
 */
@Bean(destroyMethod = "destroy")
public EPServiceProvider epServiceProvider() {
    // Fully qualified, presumably to avoid a clash with Spring's @Configuration annotation.
    com.espertech.esper.client.Configuration config = new com.espertech.esper.client.Configuration();
    // Register PersonEvent so that EPL statements can refer to it by its simple class name.
    config.addEventType(PersonEvent.class);
    return EPServiceProviderManager.getDefaultProvider(config);
}
/**
 * Exposes the administrative interface of the engine bean, used to create EPL statements.
 *
 * @return the {@code EPAdministrator} obtained from {@link #epServiceProvider()}
 */
@Bean
public EPAdministrator epAdministrator() {
    final EPServiceProvider provider = epServiceProvider();
    return provider.getEPAdministrator();
}
旧版本需要手动调用epServiceProvider.initialize();新版本(6.1.0)已经不需要了。getDefaultProvider最终会调用getProvider,而esper-6.1.0-sources.jar!/com/espertech/esper/client/EPServiceProviderManager.java中的getProvider在创建新的runtime时已经完成了初始化:
/**
 * Returns the engine registered under the given URI, creating and registering a new one
 * when no entry exists yet.
 *
 * @param providerURI engine URI; {@code null} is mapped to {@code EPServiceProviderSPI.DEFAULT_ENGINE_URI}
 * @param configuration configuration used to build a new engine, or applied to an existing live one
 * @return the cached, replaced, or newly created provider
 * @throws ConfigurationException declared by the signature; NOTE(review): presumably raised while
 *         building an engine from an invalid configuration - confirm against getProviderInternal
 */
public static EPServiceProvider getProvider(String providerURI, Configuration configuration) throws ConfigurationException {
// A null URI selects the default engine.
String providerURINonNull = (providerURI == null) ? EPServiceProviderSPI.DEFAULT_ENGINE_URI : providerURI;
if (runtimes.containsKey(providerURINonNull)) {
EPServiceProviderSPI provider = runtimes.get(providerURINonNull);
if (provider.isDestroyed()) {
// The cached engine was destroyed: build a replacement and re-register it.
// Note that this branch does not itself call postInitialize(), unlike the new-runtime path below.
provider = getProviderInternal(configuration, providerURINonNull);
runtimes.put(providerURINonNull, provider);
} else {
// Engine is still alive: hand it the supplied configuration and reuse it.
provider.setConfiguration(configuration);
}
return provider;
}
// New runtime
EPServiceProviderSPI runtime = getProviderInternal(configuration, providerURINonNull);
runtimes.put(providerURINonNull, runtime);
// Second initialization phase, run only after the runtime has been registered in the map.
runtime.postInitialize();
return runtime;
}
在esper-6.1.0-sources.jar!/com/espertech/esper/core/service/EPServiceProviderImpl.java的构造器中也调用了初始化
/**
 * Constructor - validates its arguments, stores a snapshot of the configuration and
 * initializes services by calling {@code doInitialize(null)}.
 *
 * @param configuration is the engine configuration; must not be null
 * @param engineURI is the engine URI; must not be null at this stage (callers resolve a null URI
 *                  to the default engine URI before constructing)
 * @param runtimes map of URI and runtime
 * @throws ConfigurationException is thrown to indicate a configuration error
 * @throws NullPointerException if {@code configuration} or {@code engineURI} is null
 */
public EPServiceProviderImpl(Configuration configuration, String engineURI, Map<String, EPServiceProviderSPI> runtimes) throws ConfigurationException {
// Fail fast on missing arguments before any field is assigned.
if (configuration == null) {
throw new NullPointerException("Unexpected null value received for configuration");
}
if (engineURI == null) {
throw new NullPointerException("Engine URI should not be null at this stage");
}
this.runtimes = runtimes;
this.engineURI = engineURI;
verifyConfiguration(configuration);
serviceListeners = new CopyOnWriteArraySet<EPServiceStateListener>();
// Keep the result of takeSnapshot rather than the caller's configuration object itself.
configSnapshot = takeSnapshot(configuration);
statementListeners = new CopyOnWriteArraySet<EPStatementStateListener>();
// First initialization phase; null is passed for the current-time argument.
doInitialize(null);
}
这里一共调用了两个初始化方法:一个是EPServiceProviderImpl构造器中的doInitialize(null),另一个是getProvider中的runtime.postInitialize(),正好与epServiceProvider.initialize()内部依次调用的两个初始化方法一一对应:
/**
 * Initializes the engine by delegating to {@code initializeInternal} with a null current time.
 */
public void initialize() {
initializeInternal(null);
}
/**
 * Runs both initialization phases in order: {@code doInitialize} followed by {@code postInitialize}.
 *
 * @param currentTime forwarded unchanged to {@code doInitialize}; may be null (as passed by {@code initialize()})
 */
private void initializeInternal(Long currentTime) {
doInitialize(currentTime);
postInitialize();
}
监听事件
// Create the statement from the EPL text and print the name and age of every event it delivers.
EPStatement statement = epAdministrator.createEPL(epl);
statement.addListener((newData, oldData) -> {
    // Esper passes null (or an empty array) for newData when an update carries only old events.
    if (newData == null) {
        return;
    }
    // A single update may deliver several events, so handle each one rather than only the first.
    for (int i = 0; i < newData.length; i++) {
        String name = (String) newData[i].get("name");
        int age = (int) newData[i].get("age");
        System.out.println(String.format("Name: %s, Age: %d", name, age));
    }
});
发送事件
epServiceProvider.getEPRuntime().sendEvent(new PersonEvent("Peter", 10));