Flink源码分析系列文档目录
请点击:Flink 源码分析系列文档目录
简介
Flink一个常用的场景是维表关联。一个流式的事实表和一个全量(可认为是批)的维表关联。对于条事实表数据,如果每次join的时候,都去全量扫描维表。效率会非常底下。幸好Flink为我们提供了LookupTableSource
。可以根据join的字段值来查询维表。避免了全表扫描。甚至还为我们提供了缓存功能。对于相同的join字段值,无需反复查询维表。进一步提高了运行效率。
Flink自带的数据源中JDBC,Hive和HBase实现了上述的LookupTableSource
。
LookupTableSource
可以在运行时从外部存储中按照指定的key查找数据。
和ScanTableSource
不同的是,LookupTableSource
无需扫描整个表,它可以按需从外部存储的表中提取出数据。(有关ScanTableSource
的内容,参见Flink 源码之 SQL TableSource 和 TableSink)
它具有一个方法getLookupRuntimeProvider
,定义如下:
@PublicEvolving
public interface LookupTableSource extends DynamicTableSource {
LookupRuntimeProvider getLookupRuntimeProvider(LookupContext context);
}
该方法要求返回一个provider,这个provider提供根据条件查询外部数据源中对应数据的逻辑。
LookupTableSource
拥有4个实现类:
- JdbcDynamicTableSource
- HiveDynamicTableSource
- 两个HBaseDynamicTableSource: 分别对应HBase 1.x和2.x版本。
接下来我们以外部JDBC数据源为例。分析LookupTableSource
查找数据的逻辑。
JdbcDynamicTableSource
JdbcDynamicTableSource
是JDBC外部数据源专用的TableSource
。它同时支持scan方式和lookup方式读取。我们分析它实现LookupTableSource
接口的getLookupRuntimeProvider
方法。
@Override
public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext context) {
// JDBC only support non-nested look up keys
// context.getKeys()获取的是一个int[][]二维数组
// 例如:
// ROW < i INT, s STRING, r ROW < i2 INT, s2 STRING > >
// 如果key为i和s2的话
// context.getKeys()返回的是[[0], [2, 1]]
// 第一个[0]表示i字段的索引,从0开始
// 第二个[2, 1]中2代表s2位于最外层ROW的索引为2的元素,也是一个ROW。1代表在内嵌ROW中,s2的索引为1
// 所以说对于无内嵌结构的数据,内层数组的长度一定是1
String[] keyNames = new String[context.getKeys().length];
for (int i = 0; i < keyNames.length; i++) {
int[] innerKeyArr = context.getKeys()[i];
// 不支持内嵌数据结构的查找,这里需要检查
Preconditions.checkArgument(
innerKeyArr.length == 1, "JDBC only support non-nested look up keys");
// 获取key字段的名字,存入数组
keyNames[i] = DataType.getFieldNames(physicalRowDataType).get(innerKeyArr[0]);
}
// 获取数据ROW每个字段的类型
final RowType rowType = (RowType) physicalRowDataType.getLogicalType();
// 构造出JdbcRowDataLookupFunction
JdbcRowDataLookupFunction lookupFunction =
new JdbcRowDataLookupFunction(
options,
lookupMaxRetryTimes,
// 所有字段名
DataType.getFieldNames(physicalRowDataType).toArray(new String[0]),
// 所有字段类型,index和字段名一一对应
DataType.getFieldDataTypes(physicalRowDataType).toArray(new DataType[0]),
keyNames,
rowType);
if (cache != null) {
// 如果配置缓存,使用PartialCachingLookupProvider包装lookupFunction
return PartialCachingLookupProvider.of(lookupFunction, cache);
} else {
return LookupFunctionProvider.of(lookupFunction);
}
}
在开始分析lookupFunction
之前。我们先分析下带有cache的LookupFunction
的工作逻辑。
从上面看不出来Flink对PartialCachingLookupProvider
包装的lookupFunction
做了什么。PartialCachingLookupProvider
只是一个provider,顾名思义是一个提供方。具体怎么用还得看调用方。我们追踪到调用getLookupRuntimeProvider
的地方。它位于LookupJoinUtil
的findLookupFunctionFromNewSource
方法。该方法代码很长,我们关心的片段如下:
if (provider instanceof LookupFunctionProvider) {
if (provider instanceof PartialCachingLookupProvider) {
PartialCachingLookupProvider partialCachingLookupProvider =
(PartialCachingLookupProvider) provider;
syncLookupFunction =
new CachingLookupFunction(
partialCachingLookupProvider.getCache(),
wrapSyncRetryDelegator(partialCachingLookupProvider, joinHintSpec));
} else if (provider instanceof FullCachingLookupProvider) {
// ...
} else {
syncLookupFunction =
wrapSyncRetryDelegator((LookupFunctionProvider) provider, joinHintSpec);
}
}
从上面代码片段中我们发现如果Provider是PartialCachingLookupProvider
类型,将其封装到CachingLookupFunction
中。
我们查看它的检索数据lookup
方法:
@Override
public Collection<RowData> lookup(RowData keyRow) throws IOException {
// cache中存储了关键字数据(keyRow)和根据关键字数据查找出的数据的对应关系
Collection<RowData> cachedValues = cache.getIfPresent(keyRow);
// 如果命中缓存,返回缓存数据
if (cachedValues != null) {
// Cache hit
return cachedValues;
} else {
// Cache miss
// 如果没有找到,使用包装的lookupFunction查找数据
Collection<RowData> lookupValues = lookupByDelegate(keyRow);
// Here we use keyRow as the cache key directly, as keyRow always contains the copy of
// key fields from left table, no matter if object reuse is enabled.
// 如果没有查找到数据,缓存空集合
if (lookupValues == null || lookupValues.isEmpty()) {
cache.put(keyRow, Collections.emptyList());
} else {
// 否则,缓存查询到的数据
cache.put(keyRow, lookupValues);
}
// 返回查询到的数据
return lookupValues;
}
}
上面的cache底层实现是Guava cache,可以配置cache数据写入/读取之后多长时间过期。防止外部数据源关联数据变化之后永远没有机会感知到。
JdbcRowDataLookupFunction
JdbcRowDataLookupFunction
这个方法用于查找通过JDBC连接的外部数据源中相关数据记录的。它实现了LookupFunction
抽象类。这个接口中有一个抽象方法lookup
,接受一个携带检索关键字的数据行keyRow
(keyRow
的内容为join从句关键字段的内容),返回外部数据源中有关联的数据。LookupFunction
通过LookupJoinCodeGenerator
代码生成器,最终生成Flink的ProcessFunction
。LookupFunction
共有4个实现类:
- JdbcRowDataLookupFunction: 专用于查询JDBC外部数据源
- HBaseRowDataLookupFunction: 专用于查询HBase外部数据源
- CachingLookupFunction: 也是一个包装类。加入了缓存功能。如果缓存命中,直接返回缓存中结果。如果没有命中,查询后将结果放入缓存。上面已经分析过。
- RetryableLookupFunctionDelegator: 包装类。通过外部的
ResultRetryStrategy
配置重试策略。
接下来开始分析JdbcRowDataLookupFunction
。它的构造函数如下:
public JdbcRowDataLookupFunction(
JdbcConnectorOptions options,
int maxRetryTimes,
String[] fieldNames,
DataType[] fieldTypes,
String[] keyNames,
RowType rowType) {
checkNotNull(options, "No JdbcOptions supplied.");
checkNotNull(fieldNames, "No fieldNames supplied.");
checkNotNull(fieldTypes, "No fieldTypes supplied.");
checkNotNull(keyNames, "No keyNames supplied.");
// 创建JDBC连接Provider,用来获取JDBC连接
this.connectionProvider = new SimpleJdbcConnectionProvider(options);
this.keyNames = keyNames;
List<String> nameList = Arrays.asList(fieldNames);
// 检查keyName的内容必须在nameList中
// 返回keyName对应的字段名
DataType[] keyTypes =
Arrays.stream(keyNames)
.map(
s -> {
checkArgument(
nameList.contains(s),
"keyName %s can't find in fieldNames %s.",
s,
nameList);
return fieldTypes[nameList.indexOf(s)];
})
.toArray(DataType[]::new);
// 最大重试次数
this.maxRetryTimes = maxRetryTimes;
// 根据不同数据库的dialect,创建按照key查找对应数据的SQL语句
// JdbcDialect共支持Derby, MySQL, Oracle和Postgres四种数据库
this.query =
options.getDialect()
.getSelectFromStatement(options.getTableName(), fieldNames, keyNames);
String dbURL = options.getDbURL();
// 根据数据库连接URL来判断是哪个数据库dialect
JdbcDialect jdbcDialect = JdbcDialectLoader.load(dbURL);
// 获取当前dialect对应的数据库数据类型和Flink内部数据类型转换器
this.jdbcRowConverter = jdbcDialect.getRowConverter(rowType);
// 获取key字段对应的类型转换器
this.lookupKeyRowConverter =
jdbcDialect.getRowConverter(
RowType.of(
Arrays.stream(keyTypes)
.map(DataType::getLogicalType)
.toArray(LogicalType[]::new)));
}
上面这段代码的重点是getSelectFromStatement
。
getSelectFromStatement
方法创建按照key字段查询数据的select语句。代码如下所示:
@Override
public String getSelectFromStatement(
String tableName, String[] selectFields, String[] conditionFields) {
String selectExpressions =
// 获取所有字段名
Arrays.stream(selectFields)
// 将他们按照对应数据库dialect的要求,加引号引起来
.map(this::quoteIdentifier)
// 逗号分隔
.collect(Collectors.joining(", "));
// 将conditionFields构建为类似`key1` = :key1 AND `key2` = :key2 (以MySQL dialect为例)
// 冒号+命名是JDBC PreparedStatement的命名参数,例如上面的:key1,:key2
String fieldExpressions =
Arrays.stream(conditionFields)
.map(f -> format("%s = :%s", quoteIdentifier(f), f))
.collect(Collectors.joining(" AND "));
// 返回拼装的select语句
return "SELECT "
+ selectExpressions
+ " FROM "
+ quoteIdentifier(tableName)
+ (conditionFields.length > 0 ? " WHERE " + fieldExpressions : "");
}
JdbcRowDataLookupFunction
初始化的时候会执行open
方法。该方法开启数据库连接,同时将上面创建出的查询SQL字符串转化为PreparedStatement。代码如下:
@Override
public void open(FunctionContext context) throws Exception {
try {
// 创建数据库连接,构建PreparedStatement
establishConnectionAndStatement();
} catch (SQLException sqe) {
throw new IllegalArgumentException("open() failed.", sqe);
} catch (ClassNotFoundException cnfe) {
throw new IllegalArgumentException("JDBC driver class not found.", cnfe);
}
}
一切准备工作完成之后,lookup
方法的逻辑看起来就非常明了了。它根据keyRow查找对应的数据。代码如下:
@Override
public Collection<RowData> lookup(RowData keyRow) {
// 最多重试maxRetryTimes次
for (int retry = 0; retry <= maxRetryTimes; retry++) {
try {
// 清除绑定的参数
statement.clearParameters();
// 将keyRow中的数据set到statement中
statement = lookupKeyRowConverter.toExternal(keyRow, statement);
// 执行查询
try (ResultSet resultSet = statement.executeQuery()) {
ArrayList<RowData> rows = new ArrayList<>();
while (resultSet.next()) {
// 将数据逐条转换为Flink内部的类型
RowData row = jdbcRowConverter.toInternal(resultSet);
rows.add(row);
}
// 裁剪list大小到真实包含数据的条数
rows.trimToSize();
return rows;
}
} catch (SQLException e) {
LOG.error(String.format("JDBC executeBatch error, retry times = %d", retry), e);
// 如果超出重试次数,报错返回
if (retry >= maxRetryTimes) {
throw new RuntimeException("Execution of JDBC statement failed.", e);
}
try {
// 如果连接已关闭,创建新的连接
if (!connectionProvider.isConnectionValid()) {
statement.close();
connectionProvider.closeConnection();
establishConnectionAndStatement();
}
} catch (SQLException | ClassNotFoundException exception) {
LOG.error(
"JDBC connection is not valid, and reestablish connection failed",
exception);
throw new RuntimeException("Reestablish JDBC connection failed", exception);
}
try {
// 每次重试间隔时间递增
Thread.sleep(1000L * retry);
} catch (InterruptedException e1) {
throw new RuntimeException(e1);
}
}
}
return Collections.emptyList();
}
本博客为作者原创,欢迎大家参与讨论和批评指正。如需转载请注明出处。