简介
只要实现 SourceFunction 接口的 run() 和 cancel() 方法,就可以自定义数据源。
1.创建环境
/**
 * Entry point of the demo job: wires the custom sensor source into a
 * streaming pipeline, prints every emitted reading, and starts the job.
 *
 * @param args command-line arguments (unused)
 * @throws Exception if the job fails while executing
 */
public static void main(String[] args) throws Exception {
    final StreamExecutionEnvironment environment =
            StreamExecutionEnvironment.getExecutionEnvironment();

    // Register the custom source and attach a print sink directly to it.
    environment.addSource(new MySensorSource()).print();

    // The pipeline above is only a definition; execute() actually launches it.
    environment.execute();
}
2.实现 SourceFunction 接口
public static class MySensorSource implements SourceFunction<SensorReading> {
//定义一个标识位用来控制数据
private boolean running = true;
//定义一个随机数发生器
Random random = new Random();
public void run(SourceContext<SensorReading> ctx) throws Exception {
//设置10个传感器的初识温度
HashMap<String, Double> sensorTempMap = new HashMap<String, Double>();
for (int i = 0; i < 10; i++) {
sensorTempMap.put("sensor_" + (i + 1), 60 + random.nextGaussian() * 20);
}
while (running) {
for(String sensorId : sensorTempMap.keySet()){
//在当前的温度基础上随机波动
Double newtemp = sensorTempMap.get(sensorId) + random.nextGaussian();
sensorTempMap.put(sensorId,newtemp);
ctx.collect(new SensorReading(sensorId,System.currentTimeMillis(),newtemp));
}
//控制输出频率
Thread.sleep(1000L);
}
}
public void cancel() {
running = false;
}
}