Table api案例:来自官网https://flink.sojb.cn/dev/table/common.html#structure-of-table-api-and-sql-programs
Table对应两种api ,table Api query和table Sql query.
Flink内部提供的隐式函数。
implicit def table2RowDataStream(table: Table): DataStream[Row] = {
val tableEnv =table.tableEnv.asInstanceOf[ScalaStreamTableEnv] tableEnv.toAppendStream[Row](table)
}
将DataStream注册为表:
将DataStream转换为表
将dataStream转换为Table.调用函数fromDataStream.
创建一个唯一的表名字,并参数replace确定是否替换已经存在的表。registerDataStreamInternal方法根据DataStream创建一个DataStreamTable。
将DataStreamTable注册到CataLog中,利用CatalogManager实现。
利用datastreamtable创建一个FlinkTempTable,其实就是一个CataLogTable.Catalog内部有tableschema,column.最后调用createTable将catalogtable注入内存中。这个table是一个Map 数据结构。
最后调用scan方法将之前注册的变返回,利用的catalogManager
将表转换为DataStream
调用方法toAppendStream方法.创建返回的Typeinformation,将javastream转换为scala stream,返回
将DataSet转换为表,注册表通过FlinkInMemoryCatalog注册实现的。转换表和DataStream转换为表逻辑一样,先注册表然后从CataManager中取出这个表
将表转换为DataSet
toDataSet方法。