背景:
今天在线上测试时消费了下堆积的数据,结果发现siddhi的窗口数据好像不太对。于是测试下。使用了window.externalTimeBatch() 函数。siddhi没有获取窗口开始时间和结束时间的功能,于是想通过min(time),max(time)获取,但是获取后发现这两个时间差居然远远大于窗口。于是在本地测试。
demo:
import io.siddhi.core.SiddhiAppRuntime;
import io.siddhi.core.SiddhiManager;
import io.siddhi.core.event.Event;
import io.siddhi.core.stream.input.InputHandler;
import io.siddhi.core.stream.output.StreamCallback;
public class HelloWorld {
public static void main(String[] args) throws InterruptedException {
SiddhiManager siddhiManager = new SiddhiManager();
String siddhiApp =
"define stream cseEventStream (id string,symbol string, price float, volume int,time long); " +
"from cseEventStream#window.externalTimeBatch(time,120 sec) select symbol,count(id) as count,min(time) as sTime,max(time) as eTime group by symbol insert into outputStream;";
;
SiddhiAppRuntime siddhiAppRuntime = siddhiManager.createSiddhiAppRuntime(siddhiApp);
InputHandler inputHandler = siddhiAppRuntime.getInputHandler("cseEventStream");
siddhiAppRuntime.start();
siddhiAppRuntime.addCallback("outputStream", new StreamCallback() {
@Override
public void receive(Event[] events) {
for (Event event : events) {
System.out.println(event);
}
}
});
inputHandler.send(new Object[]{"1", "Welcome1", 500f, 300, 1575011611080L}); //11-29 15:13:31
inputHandler.send(new Object[]{"3", "Welcome1", 500f, 300, 1575011071000L}); //2019-11-29 15:04:31
inputHandler.send(new Object[]{"4", "Welcome1", 500f, 800, 1575011791000L}); //2019-11-29 15:16:31
siddhiAppRuntime.shutdown();
siddhiManager.shutdown();
}
}
结果:
这个结果居然统计了2个,而且时间差也不止2分钟。看来是中间乱序导致的。
看了下官方文档:
看来siddhi不支持乱序模式。这个窗口应该是只判断是不是小于结束时间,不判断是不是大于窗口开始时间。