第七章 处理函数

Flink本身提供了多层API,前面介绍的DataStream API只是其中的一环。


在前面的章节介绍了诸多Flink提供的算子(如map、filter、widow等)。除了使用这些已有的算子,我们也可以自定义算子,也即本章的处理函数(process function)。

基本处理函数(ProcessFunction)

之前讲的转换算子,一般只是针对某种具体操作来定义的,能够拿到的信息比较有限,如map算子,只能获取到当前的数据。而窗口聚合算子AggregateFuntion以及富函数(如RichMapFunction),可以拿到上下文状态等信息。但是无论那种算子,想要访问时间戳,或者当前水位线信息,都是完全做不到的,这时就需要处理函数(ProcessFunction)。

处理函数提供了一个“定时服务”(TimeService),我们可以通过它访问流中的时间(event),时间戳(timestamp),水位线(Watermark),甚至可以注册“定时事件”。而且处理函数继承了AbstractRichFunction抽象类,所以拥有富函数类的所有特性,同样可以访问状态(state)和其他运行时信息。此外,处理函数汉可以直接将数据输出到侧输出流(side output)中。所以,处理函数是最为灵活的处理方法,可以实现各种自定义的业务逻辑;同时也是整个DataStream API的最底层基础。

处理函数的使用与基本的转换操作类似,只需要直接基于DataStream调用process()方法就可以了。方法需要传入一个ProcessFunction作为参数,用来定义处理逻辑。stream.process(new MyProcessFunction)

一个梨子:

package com.whu.chapter07

import com.whu.chapter05.{ClickSource, Event}
import org.apache.flink.streaming.api.functions._
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

object ProcessFunctionDemo {
  def main(args:Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    env.addSource(new ClickSource())
      .assignAscendingTimestamps(_.timeStamp)
      .process(new ProcessFunction[Event, String] {
        // 每来一条元素都会调用一次
        override def processElement(i:Event, context:ProcessFunction[Event,String]#Context, collector: Collector[String]):Unit = {
          if(i.user.equals("Mary")){
            // 向下游发送数据
            collector.collect(i.user)
          }else if(i.user.equals("bob")){
            collector.collect(i.user)
          }
          // 打印当前水位线
          println(context.timerService.currentWatermark())
        }
      })
      .print()

    env.execute()
  }
}

ProcessFunction解析:

public abstract class ProcessFunction<I, O> extends AbstractRichFunction {
  ...
  public abstract void processElement(I value, Context ctx, Collector<O> out) 
throws Exception;
  public void onTimer(long timestamp, OnTimerContext ctx, Collector<O> out) 
throws Exception {}
  ...
}

processElement方法用于处理原始,定义了处理的核心逻辑。这个方法对于流中的每个元素都会调用一次,参数包括三个:输入数据值value,上下文ctx,以及收集器(Collector)out。方法没有返回值,处理之后的输出数据通过收集器的out定义。

  • value:当前流中的输入元素,也就是正在处理的数据,类型与流中数据类型一致;
  • ctx: 类型是ProcessFunction中定义的内部抽象类Context,表示当前运行的上下文,可以获取到当前的时间戳,并提供了用于查询时间和注册定时器的“定时服务”(TimerService),以及可以将数据发送到“侧输出流”(side output)的方法output();
  • out: “收集器”(类型为Collector),用于返回输出数据。使用方式与flatMap()算子中的收集器完全一样,直接调用out.collect()方法就可以像下游发出一个数据。这个方法可多次调用,也可以不调用。

onTimer用于定时触发的操作,这个方法只有在注册好的定时器触发的时候才会调用,而定时器是通过“定时服务”TimerService 来注册的。onTimer()也有三个参数:时间戳(timestamp),上下文(ctx),以及收集器(out)。这里的 timestamp 是指设定好的触发时间,事件时间语义下当然就是水位线了。另外这里同样有上下文和收集器,所以也可以调用定时服务(TimerService),以及任意输出处理之后的数据。

在 Flink 中,只有“按键分区流”KeyedStream才支持设置定时器的操作,所以之前的代码中我们并没有使用定时器。

处理函数的分类

Flink提供了8个不同的处理函数:

  • ProcessFunction,最基本的处理函数,基于 DataStream 直接调用 process()时作为参数传入;
  • KeyedProcessFunction,对流按键分区后的处理函数,基于 KeyedStream 调用 process()时作为参数传入。要想使用定时器,必须基于 KeyedStream;
  • ProcessWindowFunction,开窗之后的处理函数,也是全窗口函数的代表。基于 WindowedStream 调用 process()时作为参数传入;
  • ProcessAllWindowFunction,同样是开窗之后的处理函数,基于 AllWindowedStream 调用 process()时作为参数传入;
  • CoProcessFunction,合并(connect)两条流之后的处理函数,基于 ConnectedStreams 调用 process()时作为参数传入;
  • ProcessJoinFunction,间隔连接(interval join)两条流之后的处理函数,基于 IntervalJoined 调用 process()时作为参数传入;
  • BroadcastProcessFunction,广播连接流处理函数,基于 BroadcastConnectedStream 调用 process()时作为参数传入;
  • KeyedBroadcastProcessFunction,按键分区的广播连接流处理函数,同样是基于 BroadcastConnectedStream 调用 process()时作为参数传入。与 BroadcastProcessFunction 不同的是,这时的广播连接流,是一个 KeyedStream
    与广播流(BroadcastStream)做连接之后的产物。

按键分区处理函数(KeyedProcessFunction)

在 Flink 程序中,为了实现数据的聚合统计,或者开窗计算之类的功能,我们一般都要先用 keyBy()算子对数据流进行“按键分区”,得到一个 KeyedStream。而只有在 KeyedStream 中,才支持使用 TimerService 设置定时器的操作。所以一般情况下,我们都是先做了 keyBy()分区之后,再去定义处理操作;代码中更加常见的处理函数是 KeyedProcessFunction。

package com.whu.chapter07

import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import com.whu.chapter05.{ClickSource, Event}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

import java.sql.Timestamp

object KeyedProcessFunctionDemo {
  def main(args: Array[String]):Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    env.addSource(new ClickSource())
      .keyBy(r=>true)
      .process(new KeyedProcessFunction[Boolean, Event, String]{
        override def processElement(i: Event, context: KeyedProcessFunction[Boolean, Event, String]#Context, collector: Collector[String]): Unit = {
          val currTs = context.timerService.currentProcessingTime()
          collector.collect("数据到达时间:"+new Timestamp(currTs))
          // 注册10秒钟之后的处理时间定时器
          context.timerService().registerProcessingTimeTimer(currTs+10*1000L)
        }

        // 定时器逻辑
        override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[Boolean, Event, String]#OnTimerContext, out: Collector[String]): Unit = {
          out.collect("定时器触发时间:"+new Timestamp(timestamp))
        }
      }).print()

    env.execute()
  }
}

窗口处理函数

窗 口 处 理 函 数 ProcessWindowFunction 的 使 用 与 其 他 窗 口 函 数 类 似 , 也 是 基 于WindowedStream 直接调用方法就可以,只不过这时调用的是 process()。

ProcessWindowFunction 既是处理函数又是全窗口函数。从名字上也可以推测出,它的本质似乎更倾向于“窗口函数”一些。事实上它的用法也确实跟其他处理函数有很大不同。我们可以从源码中的定义看到这一点:

public abstract class ProcessWindowFunction<IN, OUT, KEY, W extends Window> extends AbstractRichFunction {
...
  public abstract void process(KEY key, Context context, Iterable<IN> elements, Collector<OUT> out) throws Exception;
  public void clear(Context context) throws Exception {}
  public abstract class Context implements java.io.Serializable {...}
}

其有四个类型的参数:

  • IN: input, 数据流中窗口任务的输入数据类型;
  • OUT: output, 窗口任务进行计算之后的输出数据类型;
  • KEY: 数据中key的类型;
  • W:窗口的类型,是window的子类型,一般情况下我们定义时间窗口,w就是TimeWindow;

由于全窗口函数不是逐个处理元素的,所以处理数据的方法在这里并不是 processElement(),
而是改成了 process()。方法包含四个参数:

  • key:窗口做统计计算基于的键,也就是之前keyBy()用来区分的字段;
  • context:当前窗口进行计算的上下文,他的类型就是ProcessWindowFunction内部定义的抽象类context;
  • elements:窗口收集到用来计算的所有数据,这是一个可迭代的集合类型;
  • out:用来发送数据输出计算结果的收集器,类型为Collector;

一个计算网页浏览量topN的例子:

package com.whu.chapter07


import org.apache.flink.streaming.api.functions._
import com.whu.chapter05.{ClickSource, Event}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.ProcessAllWindowFunction
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

import scala.collection.JavaConverters._
import java.sql.Timestamp
import scala.collection.convert.ImplicitConversions.`map AsJavaMap`
import scala.collection.mutable.ListBuffer

object ProcessWindowFunctionDemo {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    val stream = env.addSource(new ClickSource())
      .assignAscendingTimestamps(_.timeStamp)

    // 只需要url就可以统计数量,所以抽取url转换成String,直接开窗统计
    stream.map(_.url)
      // 开窗
      .windowAll(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
      .process(new ProcessAllWindowFunction[String, String, TimeWindow] {
        override def process(context: Context, elements: Iterable[String], out: Collector[String]): Unit = {
          // 初始化一个map,key为url,value为url的pv数据
          val urlCountMap = Map[String, Long]()
          // 将url和pv数据写入Map中
          elements.foreach(r=>urlCountMap.get(r) match {
            case Some(count) => urlCountMap.put(r, count+1L)
            case None=> urlCountMap.put(r, 1L)
          })
          
          // 将Map中的键值对转换成列表数据结构
          // 列表中的元素是(K,V)元组
          var mapList = new ListBuffer[(String, Long)]()
          urlCountMap.keys.foreach(key=>
            urlCountMap.get(key) match {
              case Some(count) => mapList+=((key, count))
              case None => mapList
            }
          )
          // 按照浏览量进行降序排列
          mapList.sortBy(-_._2)
          // 输出
          val result = new StringBuilder
          result.append("=======\n")
          for(i <- 0 to 1){
            val temp = mapList(i)
            result.append("浏览量No."+(i+1)+" ")
              .append("url:"+temp._1+" ")
              .append("浏览量No."+(i+2)+" ")
              .append("url:"+temp._2+" ")
              .append("窗口结束时间是:"+ new Timestamp((context.window.getEnd))+"\n")
          }
        }
      })
      .print()
    
    env.execute()
  }
}
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 206,126评论 6 481
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 88,254评论 2 382
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 152,445评论 0 341
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 55,185评论 1 278
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 64,178评论 5 371
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,970评论 1 284
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,276评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,927评论 0 259
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 43,400评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,883评论 2 323
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,997评论 1 333
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,646评论 4 322
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,213评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,204评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,423评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,423评论 2 352
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,722评论 2 345

推荐阅读更多精彩内容