Flink Table中双流Join的实现

Regular Join

Regular joins are the most generic type of join in which any new records or changes to either side of the join input are visible and are affecting the whole join result. For example, if there is a new record on the left side, it will be joined with all of the previous and future records on the right side.

SELECT * FROM Orders
INNER JOIN Product
ON Orders.productId = Product.id

NonWindowJoin

  • Stream-Sream Join的BaseClass(NonWindow);
  • leftState/rightState: <Row, <Long, Long>>类型的状态结构,其中Value保存的是这个Row对应的行数及expiredTime;
NonWindowInnerJoin
  • 继承NonWindowJoin;
  • Stream-Stream中的InnerJoin的主要实现方式;
  • 实现的基本逻辑,具体逻辑可以参考code:
    • 将输入Input添加state,或者从state中Retract输入;
    • 向另外的状态中查询进行关联处理;
    • 如果设定了保留时间,当保留时间过期时会触发状态数据的回收;
  • 其中processElement函数为left流和right流的共用函数,当是左流是isLeft为true,为右流是isLeft是false;
  • 这个函数中有几个疑点:
  • 针对InnerJoin是否有必要每个Input都要遍历所有的对侧状态?
  • 清理对侧状态数据判断是否过期是基于ProcessTime?
  override def processElement(
      value: CRow,
      ctx: CoProcessFunction[CRow, CRow, CRow]#Context,
      out: Collector[CRow],
      timerState: ValueState[Long],
      currentSideState: MapState[Row, JTuple2[Long, Long]],
      otherSideState: MapState[Row, JTuple2[Long, Long]],
      isLeft: Boolean): Unit = {

    val inputRow = value.row
    // Step1: 更新状态:
    // - 写入Input到State或者从State中进行回撤;
   // -  设定该row的expiredTime;
    updateCurrentSide(value, ctx, timerState, currentSideState)

    cRowWrapper.setCollector(out)
    cRowWrapper.setChange(value.change)

   // Step2: 遍历对面State状态:
  // 疑点: 针对InnerJoin是否有必要每个Input都要遍历所有的对侧状态,这样当状态值很大时性能会骤减?
    val otherSideIterator = otherSideState.iterator()
    // join other side data
    while (otherSideIterator.hasNext) {
      val otherSideEntry = otherSideIterator.next()
      val otherSideRow = otherSideEntry.getKey
      val otherSideCntAndExpiredTime = otherSideEntry.getValue
      cRowWrapper.setTimes(otherSideCntAndExpiredTime.f0)
     // Step3: 调用JoinFunction执行真实的Join任务
      callJoinFunction(inputRow, isLeft, otherSideRow, cRowWrapper)
      // clear expired data. Note: clear after join to keep closer to the original semantics
     // Step4: 清理对侧状态数据
      if (stateCleaningEnabled && curProcessTime >= otherSideCntAndExpiredTime.f1) {
        otherSideIterator.remove()
      }
    }
  }

Time-windowed Joins

A time-windowed join is defined by a join predicate, that checks if the time attributes of the input records are within certain time constraints, i.e., a time window.

  • 基于时间窗口的Join: join条件中带有TimeWindow,如下:
SELECT *
FROM
  Orders o,
  Shipments s
WHERE o.id = s.orderId AND
      o.ordertime BETWEEN s.shiptime - INTERVAL '4' HOUR AND s.shiptime

DataStreamWindowJoin

  • Windowed Join也支持Append输出数据,不支持Update输出;
    • Update Mode: 在Flink中,如果DataStreamRel算子支持producesUpdatesproducesRetractions,则每次输入都会产生数据(可能不是最终完整的结果),默认false;
      • DataStreamGroupAggregate: 没有window的Group时,producesUpdates=true,对于Agg即使不是最终结果也需要把结果输出;
      • DataStreamJoin: 当不是InnerJoin时支持producesRetractions
    • Append Mode: 在Flink中,除了上述算子之外的算子则认为是Append Mode
WindowBound的计算
  • 计算join时需要先确认WindowsBound的范围;
  • WindowBound是基于左表的左沿(为负值)、右沿取值范围;
  • Join的窗口长度即为右沿 - 左沿所得的值;

其推算逻辑如下:

--sql1: [-3600000, 3600000]
SELECT t1.a, t2.b 
FROM MyTable as t1 
join MyTable2 as t2 
on t1.a = t2.a 
and  t1.proctime between t2.proctime - interval '1' hour  and t2.proctime + interval '1' hour;

--sql2: [-999, 999]
t1.proctime > t2.proctime - interval '1' second and t1.proctime < t2.proctime + interval '1' second

--sql3: [-1999, 1999]
t2.c > t1.c - interval '2' second and t2.c < t1.c + interval '2' second

RowTimeBoundedStreamJoin(Join的实现逻辑)

下面基于处理左流的逻辑分析执行Join的实现。

涉及到的状态管理
  • leftCache: MapStateDescriptor[Long, JList[JTuple2[Row, Boolean]]], 左表的状态结构:
    • Key值:左表的eventTime;
    • Value值: [Row行,该行是否emit过];
  • rightCache: MapStateDescriptor[Long, JList[JTuple2[Row, Boolean]]];
  • leftTimerState: ValueStateDescriptor[Long], 左表的Timer状态;
  • rightTimerState: ValueStateDescriptor[Long];
涉及到的几个时间变量
  • leftOperatorTime/rightOperatorTime
    • 左表表的操作时间,两个值相同;
    • 对于ProcTimeBoundedStreamJoin: 即为当前的处理时间;
    • 对于RowTimeBoundedStreamJoin: 即为当前的watermark时间;
  • rightQualifiedLowerBound/rightQualifiedUpperBound: 通过左表的WindowBound计算右表的左沿值(可以理解为对称的结构)
  • rightExpirationTime:
    • 计算公式:leftOperatorTime - rightRelativeSize(BoundWindow的右值) - allowedLateness - 1
  • rightTime: 从右表的状态中获取每行的row的真实时间戳;
步骤梳理
  • 判断是否需要同右表状态数据做Join:
    • 符合Join右表的状态条件, 遍历右表的状态数据:
      • 执行Join算子;
      • 判断右值数据是否过期,如果过期,如果是rightJoin/outerJoin进行emit null,同时将该值删除掉;
  • 判断是否将输入数据写入到状态数据;
  • 如果不写入状态数据,如果是leftJoin/outerJoin则emit null;
/**
    * Process rows from the left stream.
    */
  override def processElement1(
      cRowValue: CRow,
      ctx: CoProcessFunction[CRow, CRow, CRow]#Context,
      out: Collector[CRow]): Unit = {

    joinCollector.innerCollector = out
    // 更新OperatorTime,即leftOperatorTime/rightOperatorTime的值;
    updateOperatorTime(ctx)
    val leftRow = cRowValue.row
    val timeForLeftRow: Long = getTimeForLeftStream(ctx, leftRow)
    // 通过左表的WindowBound计算右表的左沿值(可以理解为对称的结构)
    val rightQualifiedLowerBound: Long = timeForLeftRow - rightRelativeSize
    // 通过左表的WindowBound计算右表的右沿值
    val rightQualifiedUpperBound: Long = timeForLeftRow + leftRelativeSize
    var emitted: Boolean = false

    // Check if we need to join the current row against cached rows of the right input.
    // The condition here should be rightMinimumTime < rightQualifiedUpperBound.
    // We use rightExpirationTime as an approximation of the rightMinimumTime here,
    // since rightExpirationTime <= rightMinimumTime is always true.
    // 通过上次计算的右表ExpriedTime来评估是否需要跟左表做Join:
    // - 从反面看:因为rightExpirationTime是上次计算的值,真实值一定比该值大,如果该值都不小于rightQualifiedUpperBound,说明右流很快了,左右流的Join的窗口范围早已经过了,就不需要Join了;
    if (rightExpirationTime < rightQualifiedUpperBound) {
      // Upper bound of current join window has not passed the cache expiration time yet.
      // There might be qualifying rows in the cache that the current row needs to be joined with.
      // leftOperatorTime: 左表的操作时间;
      // - 如果是ProcTimeBoundedStreamJoin处理:该时间即为processiong time,当前处理的时间;
      // - 如果是RowTimeBoundedStreamJoin处理:该时间即为event time,返回的是当前的watermark(?);
      // calExpirationTime函数的逻辑即为:leftOperatorTime
      rightExpirationTime = calExpirationTime(leftOperatorTime, rightRelativeSize)
      // Join the leftRow with rows from the right cache.
      // LeftRow同右表状态做Join;
      val rightIterator = rightCache.iterator()
      while (rightIterator.hasNext) {
        val rightEntry = rightIterator.next
        val rightTime = rightEntry.getKey

        // 确认右表状态的范围在预期的WindowFbound范围之内
        if (rightTime >= rightQualifiedLowerBound && rightTime <= rightQualifiedUpperBound) {
          val rightRows = rightEntry.getValue
          var i = 0
          var entryUpdated = false
          while (i < rightRows.size) {
            joinCollector.reset()
            val tuple = rightRows.get(i)
            // 执行Join的逻辑
            joinFunction.join(leftRow, tuple.f0, joinCollector)
            // 标记该行是否emitted过,该flag可以用作在最后确认是否在left join/outer join时最终是否emit null值使用:
            // 如果已经emitted过,则最终不会emit null;
            emitted ||= joinCollector.emitted
            if (joinType == JoinType.RIGHT_OUTER || joinType == JoinType.FULL_OUTER) {
              if (!tuple.f1 && joinCollector.emitted) {
                // Mark the right row as being successfully joined and emitted.
                tuple.f1 = true
                entryUpdated = true
              }
            }
            i += 1
          }
          if (entryUpdated) {
            // Write back the edited entry (mark emitted) for the right cache.
            rightEntry.setValue(rightRows)
          }
        }
        // 处理有表状态key已经过期的情况;
        if (rightTime <= rightExpirationTime) {
          if (joinType == JoinType.RIGHT_OUTER || joinType == JoinType.FULL_OUTER) {
            val rightRows = rightEntry.getValue
            var i = 0
            while (i < rightRows.size) {
              val tuple = rightRows.get(i)
              if (!tuple.f1) {
                // Emit a null padding result if the right row has never been successfully joined.
                joinCollector.collect(paddingUtil.padRight(tuple.f0))
              }
              i += 1
            }
          }
          // eager remove
          rightIterator.remove()
        } // We could do the short-cutting optimization here once we get a state with ordered keys.
      }
    }

    // Check if we need to cache the current row.
    if (rightOperatorTime < rightQualifiedUpperBound) {
      // Operator time of right stream has not exceeded the upper window bound of the current
      // row. Put it into the left cache, since later coming records from the right stream are
      // expected to be joined with it.
      var leftRowList = leftCache.get(timeForLeftRow)
      if (null == leftRowList) {
        leftRowList = new util.ArrayList[JTuple2[Row, Boolean]](1)
      }
      leftRowList.add(JTuple2.of(leftRow, emitted))
      leftCache.put(timeForLeftRow, leftRowList)
      if (rightTimerState.value == 0) {
        // Register a timer on the RIGHT stream to remove rows.
        registerCleanUpTimer(ctx, timeForLeftRow, leftRow = true)
      }
    } else if (joinType == JoinType.LEFT_OUTER || joinType == JoinType.FULL_OUTER) {
      if (!emitted) {
        // Emit a null padding result if the left row is not cached and successfully joined.
        joinCollector.collect(paddingUtil.padLeft(leftRow))
      }
    }
  

疑惑

  • 针对生产级别,窗口较大时(状态数据的积累会很大),每条流数据符合条件时,都会遍历对侧所有的状态数据,这个性能是不是会很低?

参考:

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,937评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,503评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,712评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,668评论 1 276
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,677评论 5 366
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,601评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,975评论 3 396
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,637评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,881评论 1 298
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,621评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,710评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,387评论 4 319
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,971评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,947评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,189评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 44,805评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,449评论 2 342

推荐阅读更多精彩内容