如下就是代码:
/**
 * Flink Table/SQL example: reads comma-separated sensor records from a text file,
 * registers them as the temporary view `t_sensor`, and prints every row whose
 * `temperature` column is not equal to 16.0.
 *
 * `Sensor` is declared elsewhere; it is built here from three fields as
 * (String, Long, Double).
 */
object FlinkSQLPOJO {

  /**
   * Builds the streaming pipeline and submits it for execution.
   *
   * @param args command-line arguments (not used)
   */
  def main(args: Array[String]): Unit = {
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.table.api.scala._

    val streamEnvironment: StreamExecutionEnvironment =
      StreamExecutionEnvironment.getExecutionEnvironment
    // Run every operator with a single parallel instance.
    streamEnvironment.setParallelism(1)

    val stream: DataStream[Sensor] = streamEnvironment
      .readTextFile("/Users/run/Downloads/workspaceIDEA/flink-tutorials/data/sensor.txt")
      // Skip blank lines: splitting them yields a single empty field, so indexing
      // fields(1) below would throw ArrayIndexOutOfBoundsException.
      .filter(line => line.trim.nonEmpty)
      .map { line =>
        // Trim each field: toLong does not tolerate surrounding whitespace, so a
        // record written as "id, 123, 1.0" would otherwise fail to parse.
        val fields: Array[String] = line.split(",").map(_.trim)
        new Sensor(fields(0), fields(1).toLong, fields(2).toDouble)
      }

    val tableEnvironment: StreamTableEnvironment =
      StreamTableEnvironment.create(streamEnvironment)

    // Convert the stream to a Table and expose it to SQL under the name t_sensor.
    val sensorTable: Table = tableEnvironment.fromDataStream(stream)
    tableEnvironment.createTemporaryView("t_sensor", sensorTable)

    // Keep every row whose temperature differs from 16.0, convert the result back
    // to an append-only stream of Sensor, and print it.
    tableEnvironment
      .sqlQuery(
        """
          |select * from t_sensor where temperature <> 16.0
          |""".stripMargin)
      .toAppendStream[Sensor]
      .print()

    // Nothing runs until execute() is called; the job is named after this object's class.
    streamEnvironment.execute(this.getClass.getName)
  }
}
如图: