概述
跟传统的关系型数据库类似,分布式环境中的join在提供优化器“hint”(提示)以告诉优化器选择一些执行策略。目前一些优化提示主要针对批处理中的连接(join)。在批处理中共有三个跟连接有关的转换函数:
join:默认为等值连接(Equi-join),即我们平时看到的inner join;
outerjoin:外连接,具体细分为left-outer join、righ-outer join、full-outer join;
cross:交叉连接,求两个数据集的笛卡尔积。
1.算法分析
常用来实现连接的算法有:hash join、sort-merge join以及nested loop join,下面我们对这三种算法进行简单介绍。首先hash算法实现连接时,通常分为两个阶段:
build:为参与连接的两个数据集中较小的数据集准备好哈希表,哈希表中的记录包含着连接的属性以及它对应的行。因为哈希表是通过对连接属性应用一个哈希函数来访问的,因此通过它将比扫描初始数据集更快地发现给定的连接属性对应的行;
probe:一旦哈希表构建完成,会扫描更大的数据集并通过从更小的数据集匹配哈希表以发现相关的行。
而使用sort-merge算法实现连接时,通常也划分为两个阶段:
sort:对两个数据集在他们的连接属性上进行排序;
merge:合并排过序的数据集。
nested loop实现连接相对更容易理解,它使用两层嵌套循环分别作用于两个参与连接的数据集。
2.连接策略
通过上面的介绍,我们得知当选择hash算法来实现连接时,需要确定以哪个输入端作为build端,哪个输入端作为probe端,这是影响其执行效率的因素之一(因为通常选择数据量较小的数据集作为build端)。而当以sort-merge算法来实现连接时,不会区分输入端的特殊职责,也就不存在build阶段和probe阶段。
为了理清算法跟参与连接的输入端的关系,Flink将它们区分成两种不同策略的:本地策略以及传输(ship)策略。其中传输策略表示如何移动两个输入端中的数据使得它们具备连接的条件;本地策略则指两个已在本地的输入端数据集所执行的连接算法。
我们来解释一下这两种策略,假设有两个待连接的数据集(R和S)。传输策略有如下两种:
Broadcast-Forward strategy (BF):该策略会将一个完整的数据集,比如R,广播到数据集S的每一个分区上,而数据集S的所有数据则一直处于本地,无需网络传输;
Repartition-Repartition strategy (RR):以相同的分区函数以及用于连接的键属性分区两个数据集R、S;
正如上面已经提及的,本地策略也即连接的实现算法也有两种:
Sort-Merge-Join strategy (SM):首先对两个输入端的数据集在它们的连接键属性上进行排序(排序阶段),然后合并排过序的数据集(合并阶段);
Hybrid-Hash-Join strategy (HH):分为构建阶段和探索阶段;
在不指定“Hint”的情况下,Flink在进行批处理优化时会根据成本自动选择传输策略以及本地策略。优化器的一个关键特征是它会根据已经存在的数据属性来进行推理。就连接运算而言,如果某一个输入端的数据量远小于另一输入端,Flink会倾向于选择BF传输策略,将较小的输入端广播给较大的输入端的每一个分区,并在本地策略中选择HH且以较小的输入端作为HH的构建端;如果优化器得知某个(或两个)输入端已排好序,那么生成的候选计划将不再重分区该输入端,此时它更倾向于选择RR传输策略以及SM本地策略。
除了优化器的自动选择,当用户对数据集非常了解的情况下,Flink定义了JoinHint允许用户为join(inner join)指定连接策略给予优化器提示。JoinHint提供了人为选择连接策略的灵活性,其使用方式有两种,一种是直接指定两个输入端的大小:
另一种是直接指定连接策略: