如图:
object Stream2Table4Bean {
def main(args: Array[String]):Unit = {
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.scala._
val streamEnvironment: StreamExecutionEnvironment =
StreamExecutionEnvironment.getExecutionEnvironment
streamEnvironment.setParallelism(1)
val stream: DataStream[Sensor] = streamEnvironment
.readTextFile("/Users/run/Downloads/workspaceIDEA/flink-tutorials/data/sensor.txt")
.map(line => {
val arr: Array[String] = line.split(",")
new Sensor(arr(0), arr(1).toLong, arr(2).toDouble)
})
val tableEnvironment: StreamTableEnvironment = StreamTableEnvironment.create(streamEnvironment)
val tableRes: Table = tableEnvironment.fromDataStream(stream)
.select("id,timestamp,temperature")
.filter("temperature>17.0")
val result: DataStream[Sensor] = tableRes.toAppendStream[Sensor]
result.print()
streamEnvironment.execute(this.getClass.getName)