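Realtime Compute supports three kinds of user-defined functions (UDX):
UDF (User Defined Function): a user-defined scalar function. It takes zero, one, or more values from a single record and returns one value.
UDAF (User Defined Aggregation Function): a user-defined aggregate function. It aggregates multiple records into a single value.
UDTF (User Defined Table Function): a user-defined table function. It takes the values of an input record and emits zero, one, or more rows, so the number of output rows need not match the number of input rows. It is also the only kind of user-defined function that can return multiple fields.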
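To make the three shapes concrete, here is a rough plain-Java analogy with no Flink or Blink dependency. The class and method names are hypothetical, and the real UDX base classes have their own lifecycle methods that this sketch ignores.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration only; none of these names come from Blink.
final class UdxShapes {
    // UDF shape: the values of one record in, one value out.
    static String scalar(String name, int age) {
        return name + ":" + age;
    }

    // UDAF shape: one value from each of many records in, one value out.
    static int aggregate(List<Integer> ages) {
        int sum = 0;
        for (int age : ages) {
            sum += age;
        }
        return sum;
    }

    // UDTF shape: one input value in, zero or more rows out,
    // and each row may carry several fields.
    static List<Object[]> table(String pairs) {
        List<Object[]> rows = new ArrayList<>();
        for (String pair : pairs.split(";")) {
            String[] kv = pair.split("=");
            if (kv.length == 2) {
                rows.add(new Object[] {kv[0], kv[1]});
            }
        }
        return rows;
    }

    public static void main(String[] args) {
        System.out.println(scalar("Alice", 13));
        System.out.println(aggregate(Arrays.asList(13, 14, 15)));
        for (Object[] row : table("name=Alice;grade=A")) {
            System.out.println(Arrays.toString(row));
        }
    }
}
```

An empty input string yields no rows from `table`, which is the "zero rows" case that a scalar function cannot express.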
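This document uses a UDTF to parse a byte array into multiple fields.
For example, if the stored value is the byte array of {"name":"Alice", "age":13, "grade":"A"}, the UDTF turns it into three columns, name, age, and grade, with the values Alice, 13, and A.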
1. UDTF
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.table.api.functions.TableFunction;
import org.apache.flink.table.api.types.DataType;
import org.apache.flink.table.api.types.TypeInfoWrappedDataType;
import org.apache.flink.types.Row;

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

public class kafkaUDTF extends TableFunction<Row> {

    // Java class of each output field, in declaration order.
    private final List<Class<?>> clazzes = new ArrayList<>();
    // Name of each output field; also the JSON key the value is read from.
    private final List<String> fieldName = new ArrayList<>();

    public kafkaUDTF() {
    }

    // Each argument has the form "fieldName,SQL_TYPE", for example "age,INTEGER".
    // Arguments without a comma are ignored.
    public kafkaUDTF(String... args) {
        for (String arg : args) {
            if (arg.contains(",")) {
                String[] parts = arg.split(",");
                fieldName.add(parts[0]);
                // ClassUtil (not shown in this document) maps "VARCHAR" to String.class,
                // "INTEGER" to Integer.class, and so on.
                clazzes.add(ClassUtil.stringConvertClass(parts[1]));
            }
        }
    }

    // Local smoke test. Outside a running job no collector may be attached, in which
    // case collect() fails; the job in section 2 exercises the function end to end.
    public static void main(String[] args) {
        kafkaUDTF udtf = new kafkaUDTF("name,VARCHAR", "age,INTEGER", "grade,VARCHAR");
        udtf.eval("{\"name\":\"Alice\", \"age\":13, \"grade\":\"A\"}".getBytes(StandardCharsets.UTF_8));
    }

    // Decodes the message as UTF-8 JSON and emits one row with the configured fields.
    public void eval(byte[] message) {
        String mess = new String(message, StandardCharsets.UTF_8);
        JSONObject json = JSON.parseObject(mess);
        Row row = new Row(fieldName.size());
        for (int i = 0; i < fieldName.size(); i++) {
            // The JSON value is passed through as parsed, so its type should match
            // the type declared for this field. A missing key yields null.
            row.setField(i, json.get(fieldName.get(i)));
        }
        collect(row);
    }

    // When the return type is Row, this method must be overridden to tell the
    // system the field types explicitly.
    @Override
    public DataType getResultType(Object[] arguments, Class[] argTypes) {
        TypeInformation[] typeInformations = new TypeInformation[clazzes.size()];
        for (int i = 0; i < clazzes.size(); i++) {
            typeInformations[i] = BasicTypeInfo.of(clazzes.get(i));
        }
        RowTypeInfo rowType = new RowTypeInfo(typeInformations);
        return new TypeInfoWrappedDataType(rowType);
    }
}
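The UDTF calls ClassUtil.stringConvertClass, which this document does not show. The following is a minimal stand-in, assuming the helper only has to map a few SQL type names to boxed Java classes. The set of supported names and the error handling are assumptions, not the original implementation.

```java
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;

// Hypothetical stand-in for the ClassUtil helper referenced by kafkaUDTF.
final class ClassUtil {
    // Assumed mapping from SQL type name to Java class; extend as needed.
    private static final Map<String, Class<?>> TYPES = new HashMap<>();

    static {
        TYPES.put("VARCHAR", String.class);
        TYPES.put("BOOLEAN", Boolean.class);
        TYPES.put("TINYINT", Byte.class);
        TYPES.put("SMALLINT", Short.class);
        TYPES.put("INTEGER", Integer.class);
        TYPES.put("BIGINT", Long.class);
        TYPES.put("FLOAT", Float.class);
        TYPES.put("DOUBLE", Double.class);
    }

    private ClassUtil() {
    }

    // Looks up the class for a type name, ignoring case and surrounding spaces.
    // Unknown names fail fast instead of producing a row type with a hole in it.
    static Class<?> stringConvertClass(String sqlType) {
        Class<?> clazz = TYPES.get(sqlType.trim().toUpperCase(Locale.ROOT));
        if (clazz == null) {
            throw new IllegalArgumentException("Unsupported type name: " + sqlType);
        }
        return clazz;
    }

    public static void main(String[] args) {
        System.out.println(stringConvertClass("VARCHAR").getName());
        System.out.println(stringConvertClass("INTEGER").getName());
    }
}
```

Throwing on an unknown name surfaces a typo such as "INTEGR" when the function is constructed rather than when the job is already running.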
2. Main
// Besides the Blink classes, this snippet needs java.nio.charset.StandardCharsets and java.util.TimeZone.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

// A single-element stream of raw JSON bytes stands in for the Kafka source.
DataStreamSource<byte[]> byteSource = env.fromElements(
        "{\"name\":\"Alice\", \"age\":13, \"grade\":\"A\"}".getBytes(StandardCharsets.UTF_8));
Table byteSourceTable = tableEnv.fromDataStream(byteSource, "message");
tableEnv.registerTable("b", byteSourceTable);

// Register the UDTF together with the output schema it should produce.
tableEnv.registerFunction("kafkaUDTF", new kafkaUDTF("name,VARCHAR", "age,INTEGER", "grade,VARCHAR"));

Table res1 = tableEnv.sqlQuery("select T.name, T.age, T.grade\n" +
        "from b as S\n" +
        "LEFT JOIN LATERAL TABLE(kafkaUDTF(message)) as T(name, age, grade) ON TRUE");
res1.writeToSink(new PrintTableSink(TimeZone.getDefault()));
tableEnv.execute();
// Prints: task-1> (+)Alice,13,A
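The query joins each row of b with the rows that kafkaUDTF emits for that row's message. As a rough plain-Java model of that per-row expansion (no Flink involved; the class, the method names, and the toy comma-splitting "table function" are all hypothetical), LEFT JOIN ... ON TRUE keeps an input row even when the function emits nothing and pads the function's columns with nulls:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.function.Function;

// Hypothetical model of the join semantics; not how the engine is implemented.
final class LateralJoinModel {
    // Toy stand-in for a table function: one input value in, zero or one row out.
    static List<String[]> splitFields(String message) {
        if (message.isEmpty()) {
            return Collections.emptyList();
        }
        List<String[]> rows = new ArrayList<>();
        rows.add(message.split(","));
        return rows;
    }

    // Models "LEFT JOIN LATERAL TABLE(f(message)) ON TRUE", projecting only
    // the function's columns, of which there are `width`.
    static List<String[]> leftJoinLateral(List<String> input,
                                          Function<String, List<String[]>> f,
                                          int width) {
        List<String[]> out = new ArrayList<>();
        for (String message : input) {
            List<String[]> emitted = f.apply(message);
            if (emitted.isEmpty()) {
                out.add(new String[width]); // input row kept, function columns null
            } else {
                out.addAll(emitted);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<String[]> result = leftJoinLateral(
                Arrays.asList("Alice,13,A", ""), LateralJoinModel::splitFields, 3);
        for (String[] row : result) {
            System.out.println(Arrays.toString(row));
        }
    }
}
```

Running main prints [Alice, 13, A] followed by [null, null, null]. The second line is the input row for which the function emitted nothing.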
3. Dependencies
<dependency>
    <groupId>com.alibaba.blink</groupId>
    <artifactId>flink-core</artifactId>
    <version>1.5.1</version>
    <type>pom</type>
</dependency>
<dependency>
    <groupId>com.alibaba.blink</groupId>
    <artifactId>flink-streaming-java_2.11</artifactId>
    <version>1.5.1</version>
</dependency>
<dependency>
    <groupId>com.alibaba.blink</groupId>
    <artifactId>flink-streaming-scala_2.11</artifactId>
    <version>1.5.1</version>
</dependency>
<dependency>
    <groupId>com.alibaba.blink</groupId>
    <artifactId>flink-table_2.11</artifactId>
    <version>1.5.1</version>
</dependency>
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <version>1.2.9</version>
</dependency>
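4. Extensibility
Because Blink's Kafka source only emits byte arrays, this UDTF can be used to parse the desired fields out of them. The output schema is passed to the constructor as "fieldName,TYPE" strings, so the same class can be registered for messages with other fields.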
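Since the UDTF receives raw bytes and decodes them as UTF-8, the producer has to encode with the same charset. The following self-contained sketch (the class name and the sample value are made up for illustration) shows how a mismatch corrupts non-ASCII text before JSON parsing even starts:

```java
import java.nio.charset.StandardCharsets;

// Hypothetical demo of the decoding step used in eval().
final class CharsetRoundTrip {
    // Mirrors the first line of eval(): bytes are always read as UTF-8.
    static String decodeUtf8(byte[] message) {
        return new String(message, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        String json = "{\"name\":\"Zo\u00eb\"}";
        byte[] utf8 = json.getBytes(StandardCharsets.UTF_8);
        byte[] latin1 = json.getBytes(StandardCharsets.ISO_8859_1);

        // Same charset on both sides: the text survives the round trip.
        System.out.println(decodeUtf8(utf8).equals(json));   // true
        // Producer used ISO-8859-1: the non-ASCII character is lost.
        System.out.println(decodeUtf8(latin1).equals(json)); // false
    }
}
```

For the same reason, calling getBytes() without an argument is fragile, because it uses the platform default charset, which may not be UTF-8.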