Regular Join
Regular joins are the most generic type of join in which any new records or changes to either side of the join input are visible and are affecting the whole join result. For example, if there is a new record on the left side, it will be joined with all of the previous and future records on the right side.
SELECT * FROM Orders
INNER JOIN Product
ON Orders.productId = Product.id
NonWindowJoin
- Stream-Sream Join的BaseClass(NonWindow);
- leftState/rightState: <Row, <Long, Long>>类型的状态结构,其中Value保存的是这个Row对应的行数及expiredTime;
NonWindowInnerJoin
- 继承NonWindowJoin;
- Stream-Stream中的InnerJoin的主要实现方式;
- 实现的基本逻辑,具体逻辑可以参考code:
- 将输入Input添加state,或者从state中Retract输入;
- 向另外的状态中查询进行关联处理;
- 如果设定了保留时间,当保留时间过期时会触发状态数据的回收;
- 其中
processElement
函数为left流和right流的共用函数,当是左流是isLeft为true,为右流是isLeft是false; - 这个函数中有几个疑点:
- 针对InnerJoin是否有必要每个Input都要遍历所有的对侧状态?
- 清理对侧状态数据判断是否过期是基于ProcessTime?
override def processElement(
value: CRow,
ctx: CoProcessFunction[CRow, CRow, CRow]#Context,
out: Collector[CRow],
timerState: ValueState[Long],
currentSideState: MapState[Row, JTuple2[Long, Long]],
otherSideState: MapState[Row, JTuple2[Long, Long]],
isLeft: Boolean): Unit = {
val inputRow = value.row
// Step1: 更新状态:
// - 写入Input到State或者从State中进行回撤;
// - 设定该row的expiredTime;
updateCurrentSide(value, ctx, timerState, currentSideState)
cRowWrapper.setCollector(out)
cRowWrapper.setChange(value.change)
// Step2: 遍历对面State状态:
// 疑点: 针对InnerJoin是否有必要每个Input都要遍历所有的对侧状态,这样当状态值很大时性能会骤减?
val otherSideIterator = otherSideState.iterator()
// join other side data
while (otherSideIterator.hasNext) {
val otherSideEntry = otherSideIterator.next()
val otherSideRow = otherSideEntry.getKey
val otherSideCntAndExpiredTime = otherSideEntry.getValue
cRowWrapper.setTimes(otherSideCntAndExpiredTime.f0)
// Step3: 调用JoinFunction执行真实的Join任务
callJoinFunction(inputRow, isLeft, otherSideRow, cRowWrapper)
// clear expired data. Note: clear after join to keep closer to the original semantics
// Step4: 清理对侧状态数据
if (stateCleaningEnabled && curProcessTime >= otherSideCntAndExpiredTime.f1) {
otherSideIterator.remove()
}
}
}
Time-windowed Joins
A time-windowed join is defined by a join predicate, that checks if the time attributes of the input records are within certain time constraints, i.e., a time window.
- 基于时间窗口的Join: join条件中带有TimeWindow,如下:
SELECT *
FROM
Orders o,
Shipments s
WHERE o.id = s.orderId AND
o.ordertime BETWEEN s.shiptime - INTERVAL '4' HOUR AND s.shiptime
DataStreamWindowJoin
- Windowed Join也支持Append输出数据,不支持Update输出;
- Update Mode: 在Flink中,如果
DataStreamRel
算子支持producesUpdates
和producesRetractions
,则每次输入都会产生数据(可能不是最终完整的结果),默认false;- DataStreamGroupAggregate: 没有window的Group时,producesUpdates=true,对于Agg即使不是最终结果也需要把结果输出;
- DataStreamJoin: 当不是InnerJoin时支持
producesRetractions
;
- Append Mode: 在Flink中,除了上述算子之外的算子则认为是
Append Mode
。
- Update Mode: 在Flink中,如果
WindowBound的计算
- 计算join时需要先确认WindowsBound的范围;
- WindowBound是基于左表的左沿(为负值)、右沿取值范围;
- Join的窗口长度即为右沿 - 左沿所得的值;
其推算逻辑如下:
--sql1: [-3600000, 3600000]
SELECT t1.a, t2.b
FROM MyTable as t1
join MyTable2 as t2
on t1.a = t2.a
and t1.proctime between t2.proctime - interval '1' hour and t2.proctime + interval '1' hour;
--sql2: [-999, 999]
t1.proctime > t2.proctime - interval '1' second and t1.proctime < t2.proctime + interval '1' second
--sql3: [-1999, 1999]
t2.c > t1.c - interval '2' second and t2.c < t1.c + interval '2' second
RowTimeBoundedStreamJoin(Join的实现逻辑)
下面基于处理左流的逻辑分析执行Join的实现。
涉及到的状态管理
- leftCache: MapStateDescriptor[Long, JList[JTuple2[Row, Boolean]]], 左表的状态结构:
- Key值:左表的eventTime;
- Value值: [Row行,该行是否emit过];
- rightCache: MapStateDescriptor[Long, JList[JTuple2[Row, Boolean]]];
- leftTimerState: ValueStateDescriptor[Long], 左表的Timer状态;
- rightTimerState: ValueStateDescriptor[Long];
涉及到的几个时间变量
- leftOperatorTime/rightOperatorTime
- 左表表的操作时间,两个值相同;
- 对于ProcTimeBoundedStreamJoin: 即为当前的处理时间;
- 对于RowTimeBoundedStreamJoin: 即为当前的watermark时间;
- rightQualifiedLowerBound/rightQualifiedUpperBound: 通过左表的WindowBound计算右表的左沿值(可以理解为对称的结构)
- rightExpirationTime:
- 计算公式:leftOperatorTime - rightRelativeSize(BoundWindow的右值) - allowedLateness - 1
- rightTime: 从右表的状态中获取每行的row的真实时间戳;
步骤梳理
- 判断是否需要同右表状态数据做Join:
- 符合Join右表的状态条件, 遍历右表的状态数据:
- 执行Join算子;
- 判断右值数据是否过期,如果过期,如果是rightJoin/outerJoin进行emit null,同时将该值删除掉;
- 符合Join右表的状态条件, 遍历右表的状态数据:
- 判断是否将输入数据写入到状态数据;
- 如果不写入状态数据,如果是leftJoin/outerJoin则emit null;
/**
* Process rows from the left stream.
*/
override def processElement1(
cRowValue: CRow,
ctx: CoProcessFunction[CRow, CRow, CRow]#Context,
out: Collector[CRow]): Unit = {
joinCollector.innerCollector = out
// 更新OperatorTime,即leftOperatorTime/rightOperatorTime的值;
updateOperatorTime(ctx)
val leftRow = cRowValue.row
val timeForLeftRow: Long = getTimeForLeftStream(ctx, leftRow)
// 通过左表的WindowBound计算右表的左沿值(可以理解为对称的结构)
val rightQualifiedLowerBound: Long = timeForLeftRow - rightRelativeSize
// 通过左表的WindowBound计算右表的右沿值
val rightQualifiedUpperBound: Long = timeForLeftRow + leftRelativeSize
var emitted: Boolean = false
// Check if we need to join the current row against cached rows of the right input.
// The condition here should be rightMinimumTime < rightQualifiedUpperBound.
// We use rightExpirationTime as an approximation of the rightMinimumTime here,
// since rightExpirationTime <= rightMinimumTime is always true.
// 通过上次计算的右表ExpriedTime来评估是否需要跟左表做Join:
// - 从反面看:因为rightExpirationTime是上次计算的值,真实值一定比该值大,如果该值都不小于rightQualifiedUpperBound,说明右流很快了,左右流的Join的窗口范围早已经过了,就不需要Join了;
if (rightExpirationTime < rightQualifiedUpperBound) {
// Upper bound of current join window has not passed the cache expiration time yet.
// There might be qualifying rows in the cache that the current row needs to be joined with.
// leftOperatorTime: 左表的操作时间;
// - 如果是ProcTimeBoundedStreamJoin处理:该时间即为processiong time,当前处理的时间;
// - 如果是RowTimeBoundedStreamJoin处理:该时间即为event time,返回的是当前的watermark(?);
// calExpirationTime函数的逻辑即为:leftOperatorTime
rightExpirationTime = calExpirationTime(leftOperatorTime, rightRelativeSize)
// Join the leftRow with rows from the right cache.
// LeftRow同右表状态做Join;
val rightIterator = rightCache.iterator()
while (rightIterator.hasNext) {
val rightEntry = rightIterator.next
val rightTime = rightEntry.getKey
// 确认右表状态的范围在预期的WindowFbound范围之内
if (rightTime >= rightQualifiedLowerBound && rightTime <= rightQualifiedUpperBound) {
val rightRows = rightEntry.getValue
var i = 0
var entryUpdated = false
while (i < rightRows.size) {
joinCollector.reset()
val tuple = rightRows.get(i)
// 执行Join的逻辑
joinFunction.join(leftRow, tuple.f0, joinCollector)
// 标记该行是否emitted过,该flag可以用作在最后确认是否在left join/outer join时最终是否emit null值使用:
// 如果已经emitted过,则最终不会emit null;
emitted ||= joinCollector.emitted
if (joinType == JoinType.RIGHT_OUTER || joinType == JoinType.FULL_OUTER) {
if (!tuple.f1 && joinCollector.emitted) {
// Mark the right row as being successfully joined and emitted.
tuple.f1 = true
entryUpdated = true
}
}
i += 1
}
if (entryUpdated) {
// Write back the edited entry (mark emitted) for the right cache.
rightEntry.setValue(rightRows)
}
}
// 处理有表状态key已经过期的情况;
if (rightTime <= rightExpirationTime) {
if (joinType == JoinType.RIGHT_OUTER || joinType == JoinType.FULL_OUTER) {
val rightRows = rightEntry.getValue
var i = 0
while (i < rightRows.size) {
val tuple = rightRows.get(i)
if (!tuple.f1) {
// Emit a null padding result if the right row has never been successfully joined.
joinCollector.collect(paddingUtil.padRight(tuple.f0))
}
i += 1
}
}
// eager remove
rightIterator.remove()
} // We could do the short-cutting optimization here once we get a state with ordered keys.
}
}
// Check if we need to cache the current row.
if (rightOperatorTime < rightQualifiedUpperBound) {
// Operator time of right stream has not exceeded the upper window bound of the current
// row. Put it into the left cache, since later coming records from the right stream are
// expected to be joined with it.
var leftRowList = leftCache.get(timeForLeftRow)
if (null == leftRowList) {
leftRowList = new util.ArrayList[JTuple2[Row, Boolean]](1)
}
leftRowList.add(JTuple2.of(leftRow, emitted))
leftCache.put(timeForLeftRow, leftRowList)
if (rightTimerState.value == 0) {
// Register a timer on the RIGHT stream to remove rows.
registerCleanUpTimer(ctx, timeForLeftRow, leftRow = true)
}
} else if (joinType == JoinType.LEFT_OUTER || joinType == JoinType.FULL_OUTER) {
if (!emitted) {
// Emit a null padding result if the left row is not cached and successfully joined.
joinCollector.collect(paddingUtil.padLeft(leftRow))
}
}
疑惑
- 针对生产级别,窗口较大时(状态数据的积累会很大),每条流数据符合条件时,都会遍历对侧所有的状态数据,这个性能是不是会很低?
参考: