背景
目前flink sql是不支持source/sink并行度配置的,flink sql中各算子并行度默认是根据source的partition数或文件数来决定的,比如常用的kafka source topic的partition是100,那么fink sql任务的并发就是100。但有时任务相对简单,比如datax任务,没有逻辑根本不需要很大的并发,100并发显然会造成资源的严重浪费。那么就有必要扩展connector使其支持并发度配置。
如何做
一、首先,需要在ddl的with参数中支持并发的配置,比如定义’parallelism‘ = ’10‘。
二、需要各connector支持这个参数的解析,这里分1.11版本之前还是之后。
1.11前
1.11版本,对Table接口进行了重构,在这之前实现一个connector需要做哪些,请见Flink实战之自定义flink sql connector。这里就以kafka为例来说明。
当ddl增加了parallelism配置之后,如何让connector识别呢?
- 需要在KafkaTableSourceSinkFactoryBase#supportedProperties增加一行properties.add("parallelism");
- parallelism的配置所在的properties会传递到KafkaTableSourceBase或KafkaTableSinkBase
- 这是就看你是需要扩展source还是sink或者两者都要了,source的话在KafkaTableSourceBase#getDataStream方法中取出parallelism参数,设置到datastream中env.addSource(kafkaConsumer).name(explainSource()).setParallelism(parallelism);
sink的话在KafkaTableSinkBase#consumeDataStream方法中将parallelism替换到datastream的并行度中。
这样kafka的source和sink的并发就设置完成了,其他connector也是类似。
1.11后
1.11后Factory只支持返回RunTimeProvider了,框架直接利用source或sink function生成Transformation,所以在各自的connector已经不好再设置并发了,但是依然有办法--直接修改框架代码,注入parallelism配置。具体怎么做如下:
如果是source在CommonPhysicalTableSourceScan#createSourceTransformation中从tableSourceTable获取catalogTable,进而可以获取with properties配置,然后再给transformation设置并发
具体代码如下:
val parallelism = tableSourceTable.catalogTable.toProperties.get("parallelism")
if (parallelism != null)
transformation.setParallelism(parallelism.toInt)
如果是sink在CommonPhysicalSink#createSinkTransformation中类似source 的方法设置并发。代码如下:
val parallelism = if (catalogTable.toProperties.get("parallelism") != null) catalogTable.toProperties.get("parallelism").toInt else inputTransformation.getParallelism
new SinkTransformation(
inputTransformation,
getRelDetailedDescription,
SimpleOperatorFactory.of(operator),
if (parallelism <= 0) inputTransformation.getParallelism else parallelism).asInstanceOf[Transformation[Any]]
这种配置都是可选的,如果配了就用你的配置,没有配的话则使用上游transformation配置。
经过上面的处理所有connector都具有了设置并发的功能,各connector只需要支持properties的校验即可。
2020-11-08 更新
FLIP-146已经支持sink并发支持,提供ParallelismProvider接口,SinkFunctionProvider和OutputFormatProvider已经实现了该接口,所以各connector只需要支持sink.parallelism属性的支持,并将并行度提供给Provider即可,参考FLINK-19937