java代码:
Table table = tenv.sqlQuery("select row_time,host_name from table where host_name = 'sujun-centos'")
DataStream<Row> datastream = table.toAppendStram\[Row]()
执行过程:
processOperator
实际就是select操作, 这一步用到了SqlFunctions.toLong方法将timestamp字段转为long, 时间加8小时periodicWatermarksOperator
将上一步的结果作为watermark传下去(+8小时后的结果)where
实际就是where操作
4.outputRowTimeOperator(toAppendStram操作触发的)
将CROW 转为Row , 这一步会之前+8小时的rowtime字段 -8h
打印结果
这里获取的row结果和读进来的一致, 但是下游收到的watermark是+8小时的如果后续对datastream进行时间窗口操作,应该会发生数据在+8小时的窗口里
根本上解决此类问题应该是flink允许设置时区, 目前版本应该还不支持.