输入
{
"a": 1,
"b": [
{
"name": "name1",
"age": 11
},
{
"name": "name2",
"age": 12
},
{
"name": "name3",
"age": 13
}
]
}
期望输出
1 name1 11
1 name2 12
1 name3 13
知识点
-- Expand the tags array of each Orders row into one output row per element.
-- Note: unnesting WITH ORDINALITY is not supported yet.
SELECT
    users,
    tag
FROM Orders
CROSS JOIN UNNEST(tags) AS t (tag)
demo
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<!-- Build descriptor for the UNNEST demo: pulls in the Flink Table API (Java bridge) and planner. -->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.xp</groupId>
    <artifactId>test-flink</artifactId>
    <version>1.9.1</version>

    <properties>
        <!-- Flink version is tracked separately from this project's own version,
             so bumping the project version does not change which Flink is resolved. -->
        <flink.version>1.9.1</flink.version>
        <scala.binary.version>2.11</scala.binary.version>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.8.0</version>
                <configuration>
                    <source>${java.version}</source>
                    <target>${java.version}</target>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>
/**
 * Minimal Flink Table API demo that flattens an array-of-rows column with UNNEST.
 *
 * <p>Each input row has an INT column {@code a} and an array column {@code b} whose
 * elements are rows of (INT, STRING). The query emits one output row per array
 * element, combining {@code a} with the two fields of that element.
 */
public class TestUnnested {

    /**
     * Builds the in-memory source, registers it as table {@code source}, runs the
     * UNNEST query and prints the resulting append stream.
     *
     * @param args unused
     * @throws Exception if the Flink job fails to execute
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(environment);

        List<Row> rows = Arrays.asList(
                Row.of(1, new Row[]{Row.of(12, "sd"), Row.of(15, "sd")}),
                Row.of(2, new Row[]{Row.of(13, "sd"), Row.of(16, "sd")}),
                Row.of(3, new Row[]{Row.of(14, "sd"), Row.of(17, "sd")})
        );

        // Column "a" is an INT; column "b" is an array whose elements are ROW(INT, STRING).
        // Equivalent alternative for the array type:
        //   ObjectArrayTypeInfo.getInfoFor(new RowTypeInfo(Types.INT, Types.STRING))
        TypeInformation<?>[] types = new TypeInformation[]{
                Types.INT,
                Types.OBJECT_ARRAY(Types.ROW(Types.INT, Types.STRING))
        };
        String[] typeNames = new String[]{"a", "b"};
        RowTypeInfo rowType = new RowTypeInfo(types, typeNames);

        // Hand the row type to the source directly instead of letting Flink infer a
        // type by reflection and then overriding it with returns(...).
        DataStream<Row> source = environment.fromCollection(rows, rowType);
        tableEnvironment.registerDataStream("source", source);

        // Explicit CROSS JOIN UNNEST (rather than an implicit comma join) expands each
        // element of b into its own row; c and d name the two fields of the element.
        Table a = tableEnvironment.sqlQuery(
                "select a, t.c, t.d from source cross join unnest(b) as t (c, d)");

        tableEnvironment.toAppendStream(a, Row.class).print();
        environment.execute();
    }
}