Apache Flink——窗口(Window)

前言

在真实的场景中数据流往往都是没有界限的,无休止的,就像是一个通道中水流持续不断地通过管道流向别处,这样显然是无法进行处理、计算的,可以将这些无界限的数据流进行切割、拆分,将其得到一个有界限的数据集合然后进行处理、计算就方便多了。

Window 是处理无限流的核心。Window 将流分成有限大小的“存储桶”,我们可以在其上应用计算。Flink 是一个优秀的流计算引擎,数据是源源不断的,它认为批处理 Batch 是一种特殊的流计算,在流中分割出一个个窗口,每个窗口相当于有限大小的空间,汇聚了待处理的数据。

Flink中窗口(Window)就是来处理无界限的数据流的,将无线的数据流切割成为有限流,然后将切割后的有限流数据分发到指定有限大小的桶中进行分析计算。

窗口更像一个“桶”,将流切割成有限大小的多个存储桶,每个数据都会分发到对应的桶中,当到达窗口结束时间时,就对每个桶中收集的数据进行计算处理。

一、窗口类型

1.1 按照驱动类型分类

Flink中的窗口类型有两种:时间窗口(Time Window)、计数窗口(Count Window)。

窗口本身是截取有界数据的一种方式,所以窗口一个非常重要的信息其实就是“怎样截取
数据”。以什么标准来开始和结束数据的截取,我们把它叫作窗口的“驱动类型”。

  • 时间窗口(Time Window):按照时间段去截取数据,这在实际应用中最常见。

  • 计数窗口(Count Window):以由数据驱动,也就是说按照固定的个数,来截取一段数据集。

时间窗口中又包含了:滚动时间窗口(Tumbling Window)、滑动时间窗口(Sliding Window)、会话窗口(Session Window)。计数窗口包含了:滚动计数窗口和滑动计数窗口。

1.1.1 时间窗口(Time Window)

时间窗口以时间点到来定义窗口的开始(start)和结束(end),所以截取出的就是某一时间段的数据。到达时间时,窗口不再收集数据,触发计算输出结果,并将窗口关闭销毁。

窗口大小 = 结束时间 - 开始时间

Flink中有一个专门的TimeWindow类来表示时间窗口,这个类只有两个私有属性,表示窗口的开始和结束的时间戳,单位为毫秒。

private final long start;
private final long end;

我们可以调用公有的getStart()和getEnd()方法直接获取这两个时间戳。另外TimeWindow还提供了maxTimestamp()方法,用来获取窗口中能够包含数据的最大时间戳。

public long maxTimestamp(){
    return end - 1;
}

很明显,窗口中的数据,最大允许的时间戳就是 end - 1,这也就代表了我们定义的窗口
时间范围都是左闭右开的区间[start,end)。

1.1.2 计数窗口(Count Window)

基于元素的个数来截取数据,到达固定的个数时就触发计算并关闭窗口。

计数窗口理解简单,只需指定窗口大小,就可以把数据分配到对应的窗口中,Flink内部对应的类来表示计数窗口,底层通过全局窗口(Global Window)实现。

1.2、按照窗口分配数据的规则分类

时间窗口、计数窗口只是对窗口的一个大致划分。在具体应用时,还需要定义更加精细的规则,来控制数据应该划分到哪个窗口中去。不同的分配数据的方式,就可以由不同的功能应用。

根据分配数据的规则,窗口的具体实现可以分为 4 类:滚动窗口(Tumbling Window)、滑动窗口(Sliding Window)、会话窗口(Session Window),以及全局窗口(Global Window)。

1.2.1 滚动窗口(Tumbling Windows)

滚动窗口有固定的大小,是一种对数据进行“均匀切片”的划分方式,首尾相接。我们之前所举的例子都是滚动窗口,也正是因为滚动窗口无缝衔接,所以每个数据都会被分配到一个窗口,而且只会属于一个窗口。

滚动窗口可以基于时间定义,也可以基于数据个数定义;需要的参数只有窗口大小,我们可以定义一个长度为1小时的滚动时间窗口,那么每个小时就会进行一次统计;或者定义一个长度为10的滚动计数窗口,就会每10个数进行一次统计。

当固定了窗口大小之后,所有分区的窗口划分都是一致的;窗口没有重叠,每个数据只属于一个窗口。

滚动窗口应用非常广泛,它可以对每个时间段做聚合统计,很多 BI 分析指标都可以用它
来实现。

1.2.2 滑动窗口(Sliding Windows)

滑动窗口的大小固定,但窗口之间不是首尾相接,而有部分重合。滑动窗口可以基于时间定义、数据个数。

定义滑动窗口的参数与两个:窗口大小,滑动步长。滑动步长是固定的,且代表了两个个窗口开始/结束的时间间隔。数据分配到多个窗口的个数 = 窗口大小/滑动步长。

在一些场景中,可能需要统计最近一段时间内的指标,而结果的输出频率要求又很高,甚至要求实时更新,比如股票价格的 24 小时涨跌幅统计,或者基于一段时间内行为检测的异常报警。这时滑动窗口无疑就是很好的实现方式。

1.2.3 会话窗口(Session Windows)

会话窗口只能基于时间来定义,“会话”终止的标志就是隔一段时间没有数据来。

size:两个会话窗口之间的最小距离。我们可以设置静态固定的size,也可以通过一个自定义的提取器(gap extractor)动态提取最小间隔gap的值。

在Flink底层,对会话窗口有比较特殊的处理:每来一个新的数据,都会创建一个新的会话窗口,然后判断已有窗口之间的距离,如果小于给定的size,就对它们进行合并操作。在Winodw算子中,对会话窗口有单独的处理逻辑。

会话窗口的长度不固定、起始和结束时间不确定,各个分区窗口之间没有任何关联。会话窗口之间一定是不会重叠的,且会留有至少为size的间隔

1.2.4 全局窗口(Global Windows)

相同key的所有数据都分配到一个同一个窗口中;无界流的数据永无止境,窗口没有结束的时候,默认不做触发计算,如果希望对数据进行计算处理,还需要自定义“触发器”(Trigger)。

二、窗口 API 概览

2.1 按键分区窗口(Keyed)和非按键分区(Non-Keyed)

在定义窗口操作之前,首先需要确定,到达是基于按键分区(Keyed)的数据流KeyedStream来开窗,还是直接在没有按键分区的DataStream上开窗。也就是在调用窗口算子之前是否有keyBy操作。

2.1.1 按键分区窗口(Keyed Windows)

经过按按键分区keyBy操作后,数据流会按照key被分为多条逻辑流(logical streams),也就是KeyedStream。基于KeyedStream进行窗口操作时,窗口计算会在多个并行子任务上同时执行。相同key的数据被发送到同一个并行子任务,而窗口操作会基于每个key单独的处理。可以认为每个key上都定义了一组窗口,各自独立地进行统计计算。

在代码实现上,我们需要先对 DataStream 调用.keyBy()进行按键分区,然后再调
用.window()定义窗口。

stream.keyBy(...)
    .window(...)

2.1.2 非按键分区(Non-Keyed Windows)

如果没有进行 keyBy,那么原始的 DataStream 就不会分成多条逻辑流。这时窗口逻辑只能在一个任务(task)上执行,就相当于并行度变成了 1。所以在实际应用中一般不推荐使用这种方式。

在代码中,直接基于 DataStream 调用.windowAll()定义窗口。

stream.windowAll(...)

这里需要注意的是,对于非按键分区的窗口操作,手动调大窗口算子的并行度也是无效的,windowAll 本身就是一个非并行的操作。

2.2 代码中窗口 API 的调用

窗口的操作主要有两个部分:窗口分配器(Window Assigners)和窗口函数(Window Functions)

stream.keyBy(<key selector>)
     .window(<window assigner>)
    .aggregate(<window function>)

其中.window()方法需要传入一个窗口分配器,它指明了窗口的类型;而后面的.aggregate()方法传入一个窗口函数作为参数,它用来定义窗口具体的处理逻辑。窗口分配器有各种形式,而窗口函数的调用方法也不只.aggregate()一种。

另外,在实际应用中,一般都需要并行执行任务,非按键分区很少用到,所以我们之后都以按键分区窗口为例;如果想要实现非按键分区窗口,只要前面不做 keyBy,后面调用.window()时直接换成.windowAll()就可以了。

2.2.1 窗口分配器(Window Assigners)

定义窗口分配器(Window Assigners)是构建窗口算子的第一步,它的作用就是定义数据应该被“分配”到哪个窗口。

窗口分配数据的规则,其实就对应着不同的窗口类型。所以可以说,窗口分配器其实就是在指定窗口的类型。

窗口分配器最通用的定义方式,就是调用.window()方法。这个方法需要传入一个
WindowAssigner 作为参数,返回 WindowedStream。如果是非按键分区窗口,那么直接调用.windowAll()方法,同样传入一个 WindowAssigner,返回的是AllWindowedStream。

窗口按照驱动类型可以分成时间窗口和计数窗口,而按照具体的分配规则,又有滚动窗口、滑动窗口、会话窗口、全局窗口四种。除去需要自定义的全局窗口外,其他常用的类型 Flink中都给出了内置的分配器实现,我们可以方便地调用实现各种需求。

2.2.2 时间窗口

时间窗口是最常用的窗口类型,又可以细分为滚动、滑动和会话三种。

  • 1、滚动处理时间窗口
    窗口分配器由类 TumblingProcessingTimeWindows 提供,需要调用它的静态方法.of()。
stream.keyBy(...)
    .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
    .aggregate(...)

这里.of()方法需要传入一个 Time 类型的参数 size,表示滚动窗口的大小,我们这里创建
了一个长度为 5 秒的滚动窗口。

另外,.of()还有一个重载方法,可以传入两个 Time 类型的参数:size 和 offset。第一个参数当然还是窗口大小,第二个参数则表示窗口起始点的偏移量。比如,标志时间戳是1970年1月1日0时0分0秒0毫秒开始计算的一个毫秒数,这个时间时UTC时间,以0时区为标准,而我们所在的时区为东八区(UTC+8)。我们定义一天滚动窗口时,伦敦时间0但对应北京时间早上8点。那么设定如下就可以得到北京时间每天0点开开启滚动窗口

.window(TumblingProcessingTimeWindows.of(Time.days(1), Time.hours(-8)))
  • 2、滑动处理时间窗口
    窗口分配器由类 SlidingProcessingTimeWindows 提供,同样需要调用它的静态方法.of()。
stream.keyBy(...)
.window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)))
.aggregate(...)

这里.of()方法需要传入两个 Time 类型的参数:size 和 slide,前者表示滑动窗口的大小,后者表示滑动窗口的滑动步长。我们这里创建了一个长度为 10 秒、滑动步长为 5 秒的滑动窗口。

滑动窗口同样可以追加第三个参数,用于指定窗口起始点的偏移量,用法与滚动窗口完全一致。

  • 3、处理时间会话窗口
    窗口分配器由类 ProcessingTimeSessionWindows 提供,需要调用它的静态方法.withGap()或者.withDynamicGap()。
stream.keyBy(...)
    .window(ProcessingTimeSessionWindows.withGap(Time.seconds(10)))
    .aggregate(...)

这里.withGap()方法需要传入一个 Time 类型的参数 size,表示会话的超时时间,也就是最小间隔 session gap。我们这里创建了静态会话超时时间为 10 秒的会话窗口。

.window(ProcessingTimeSessionWindows.withDynamicGap(new
    SessionWindowTimeGapExtractor<Tuple2<String, Long>>() {
     @Override
     public long extract(Tuple2<String, Long> element) {
        // 提取 session gap 值返回, 单位毫秒
        return element.f0.length() * 1000;
     }
}))

这里.withDynamicGap()方法需要传入一个 SessionWindowTimeGapExtractor 作为参数,用来定义 session gap 的动态提取逻辑。在这里,我们提取了数据元素的第一个字段,用它的长度乘以 1000 作为会话超时的间隔。

  • 4、滚动事件时间窗口
    窗口分配器由类 TumblingEventTimeWindows 提供,用法与滚动处理事件窗口完全一致。
stream.keyBy(...)
    .window(TumblingEventTimeWindows.of(Time.seconds(5)))
    .aggregate(...)
  • 5、滑动事件时间窗口
    窗口分配器由类 SlidingEventTimeWindows 提供,用法与滑动处理事件窗口完全一致。
stream.keyBy(...)
    .window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
    .aggregate(...)
  • 6、事件时间会话窗口
    窗口分配器由类 EventTimeSessionWindows 提供,用法与处理事件会话窗口完全一致。
stream.keyBy(...)
    .window(EventTimeSessionWindows.withGap(Time.seconds(10)))
    .aggregate(...)

2.2.3 计数窗口

计数窗口概念非常简单,本身底层是基于全局窗口(Global Window)实现的。Flink 为我们提供了非常方便的接口:直接调用.countWindow()方法。根据分配规则的不同,又可以分为滚动计数窗口和滑动计数窗口两类,下面我们就来看它们的具体实现。

  • 1、滚动计数窗口
    滚动计数窗口只需要传入一个长整型的参数 size,表示窗口的大小。
stream.keyBy(...)
    .countWindow(10)

这里定义了一个长度为 10 的滚动计数窗口,当窗口中元素数量达到 10 的时候,就会触发计算执行并关闭窗口。

  • 2、滑动计数窗口
    与滚动计数窗口类似,不过需要在.countWindow()调用时传入两个参数:size 和 slide,前者表示窗口大小,后者表示滑动步长。
stream.keyBy(...)
    .countWindow(10,3)

这里定义了一个长度为 10、滑动步长为 3 的滑动计数窗口。每个窗口统计 10 个数据,每隔 3 个数据就统计输出一次结果。

  • 3、全局窗口
    全局窗口是计数窗口的底层实现,一般在需要自定义窗口时使用。它的定义同样是直接调用.window(),分配器由 GlobalWindows 类提供。
stream.keyBy(...)
    .window(GlobalWindows.create());

需要注意使用全局窗口,必须自行定义触发器才能实现窗口计算,否则起不到任何作用。

2.3 窗口函数(Window Functions)

定义了窗口分配器,我们只是知道了数据属于哪个窗口,可以将数据收集起来了;所以在窗口分配器之后,必须再接上一个定义窗口如何进行计算的操作,这就是所谓的“窗口函数”(window functions)。

经窗口分配器处理之后,数据可以分配到对应的窗口中,而数据流经过转换得到的数据类型是 WindowedStream。这个类型并不是 DataStream,所以并不能直接进行其他转换,而必须进一步调用窗口函数,对收集到的数据进行处理计算之后,才能最终再次得到 DataStream。

窗口函数定义了要对窗口中收集的数据做的计算操作,根据处理的方式可以分为两类:增量聚合函数和全窗口函数。

2.3.1 增量聚合函数(incremental aggregation functions)

窗口将数据收集起来,最基本的处理操作当然就是进行聚合。窗口对无限流的切分,可以看作得到了一个有界数据集。如果我们等到所有数据都收集齐,在窗口到了结束时间要输出结果的一瞬间再去进行聚合,显然就不够高效了——这相当于真的在用批处理的思路来做实时流处理。

为了提高实时性,我们可以再次将流处理的思路发扬光大:就像 DataStream 的简单聚合一样,每来一条数据就立即进行计算,中间只要保持一个简单的聚合状态就可以了;区别只是在于不立即输出结果,而是要等到窗口结束时间。等到窗口到了结束时间需要输出计算结果的时候,我们只需要拿出之前聚合的状态直接输出,这无疑就大大提高了程序运行的效率和实时性。

典型的增量聚合函数有两个:ReduceFunction 和 AggregateFunction。

  • 1、归约函数(ReduceFunction)
    最基本的聚合方式就是归约(reduce)。在基本转换的聚合算子中介绍过 reduce 的用法,窗口的归约聚合也非常类似,就是将窗口中收集到的数据两两进行归约。当我们进行流处理时,就是要保存一个状态;每来一个新的数据,就和之前的聚合状态做归约,这样就实现了增量式的聚合。

窗口函数中也提供了 ReduceFunction:只要基于 WindowedStream 调用.reduce()方法,然后传入 ReduceFunction 作为参数,就可以指定以归约两个元素的方式去对窗口中数据进行聚合了。这里的 ReduceFunction 其实与简单聚合时用到的 ReduceFunction 是同一个函数类接口,所以使用方式也是完全一样的。

import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

import java.time.Duration;

public class WindowReduceExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // 从自定义数据源读取数据,并提取时间戳、生成水位线
        SingleOutputStreamOperator<Event> stream = env.addSource(new ClickSource())
                .assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ZERO)
                        .withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
                            @Override
                            public long extractTimestamp(Event element, long recordTimestamp) {
                                return element.timestamp;
                            }
                        }));

        stream.map(new MapFunction<Event, Tuple2<String, Long>>() {
            @Override
            public Tuple2<String, Long> map(Event value) throws Exception {
                // 将数据转换成二元组,方便计算
                return Tuple2.of(value.user, 1L);
            }
        }).keyBy(r -> r.f0)
                // 设置滚动事件时间窗口
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                .reduce(new ReduceFunction<Tuple2<String, Long>>() {
                    @Override
                    public Tuple2<String, Long> reduce(Tuple2<String, Long> value1, Tuple2<String, Long> value2) throws Exception {
                        // 定义累加规则,窗口闭合时,向下游发送累加结果
                        return Tuple2.of(value1.f0, value1.f1 + value2.f1);
                    }
                }).print();

        env.execute();
    }
}
  • 2、聚合函数(AggregateFunction)
    ReduceFunction 可以解决大多数归约聚合的问题,但是这个接口有一个限制,就是聚合状态的类型、输出结果的类型都必须和输入数据类型一样。这就迫使我们必须在聚合前,先将数据转换(map)成预期结果类型;而在有些情况下,还需要对状态进行进一步处理才能得到输出结果,这时它们的类型可能不同,使用 ReduceFunction 就会非常麻烦。

Flink 的 Window API 中的 aggregate 就提供了这样的操作。直接基于 WindowedStream 调用.aggregate()方法,就可以定义更加灵活的窗口聚合操作。这个方法需要传入一个
AggregateFunction 的实现类作为参数。AggregateFunction 在源码中的定义如下:

public interface AggregateFunction<IN, ACC, OUT> extends Function, Serializable{
    ACC createAccumulator();
    ACC add(IN value, ACC accumulator);
    OUT getResult(ACC accumulator);
    ACC merge(ACC a, ACC b);
}

AggregateFunction 可以看作是 ReduceFunction 的通用版本,这里有三种类型:输入类型(IN)、累加器类型(ACC)和输出类型(OUT)。输入类型 IN 就是输入流中元素的数据类型;累加器类型 ACC 则是我们进行聚合的中间状态类型;而输出类型当然就是最终计算结果的类型了。

接口中有四个方法:

  • createAccumulator():创建一个累加器,这就是为聚合创建了一个初始状态,每个聚
    合任务只会调用一次。

  • add():将输入的元素添加到累加器中。这就是基于聚合状态,对新来的数据进行进
    一步聚合的过程。方法传入两个参数:当前新到的数据 value,和当前的累加器
    accumulator;返回一个新的累加器值,也就是对聚合状态进行更新。每条数据到来之
    后都会调用这个方法。

  • getResult():从累加器中提取聚合的输出结果。也就是说,我们可以定义多个状态,
    然后再基于这些聚合的状态计算出一个结果进行输出。比如之前我们提到的计算平均
    值,就可以把 sum 和 count 作为状态放入累加器,而在调用这个方法时相除得到最终
    结果。这个方法只在窗口要输出结果时调用。

  • merge():合并两个累加器,并将合并后的状态作为一个累加器返回。这个方法只在
    需要合并窗口的场景下才会被调用;最常见的合并窗口(Merging Window)的场景
    就是会话窗口(Session Windows)。

所以可以看到,AggregateFunction 的工作原理是:首先调用 createAccumulator()为任务初始化一个状态(累加器);而后每来一个数据就调用一次 add()方法,对数据进行聚合,得到的结果保存在状态中;等到了窗口需要输出时,再调用 getResult()方法得到计算结果。很明显,与 ReduceFunction 相同,AggregateFunction 也是增量式的聚合;而由于输入、中间状态、输出的类型可以不同,使得应用更加灵活方便。

下面来看一个具体例子。我们知道,在电商网站中,PV(页面浏览量)和 UV(独立访客数)是非常重要的两个流量指标。一般来说,PV 统计的是所有的点击量;而对用户 id 进行去重之后,得到的就是 UV。所以有时我们会用 PV/UV 这个比值,来表示“人均重复访问量”,也就是平均每个用户会访问多少次页面,这在一定程度上代表了用户的粘度。

import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

import java.time.Duration;
import java.util.HashSet;

public class WindowAggregateTest_PVUV {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.getConfig().setAutoWatermarkInterval(100);

        SingleOutputStreamOperator<Event> stream = env.addSource(new ClickSource())
                //乱序流的watermark生成
                .assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(0))
                        .withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
                            @Override
                            public long extractTimestamp(Event element, long recordTimestamp) {
                                return element.timestamp;
                            }
                        }));
        stream.print("data");
        // 所有数据放在一起统计pv和uv
        stream.keyBy(data -> true)
                .window(SlidingEventTimeWindows.of(Time.seconds(10),Time.seconds(2)))
                .aggregate(new AvgPv()).print();

        env.execute();
    }

    //自定义一个AggregateFunction,用Long保存pv个数,用HashSet做uv去重
    public static class AvgPv implements AggregateFunction<Event, Tuple2<Long, HashSet<String>>,Double>{

        @Override
        public Tuple2<Long, HashSet<String>> createAccumulator() {
            return Tuple2.of(0L,new HashSet<>());
        }

        @Override
        public Tuple2<Long, HashSet<String>> add(Event value, Tuple2<Long, HashSet<String>> accumulator) {
            //每来一条数据,pv个数+1,将user放入HashSet中
            accumulator.f1.add(value.user);
            return Tuple2.of(accumulator.f0 + 1,accumulator.f1);
        }

        @Override
        public Double getResult(Tuple2<Long, HashSet<String>> accumulator) {
            //窗口触发时,输出pv和uv的比值
            return (double) accumulator.f0 / accumulator.f1.size();
        }

        @Override
        public Tuple2<Long, HashSet<String>> merge(Tuple2<Long, HashSet<String>> a, Tuple2<Long, HashSet<String>> b) {
            return null;
        }
    }
    
}

代码中我们创建了事件时间滑动窗口,统计 10 秒钟的“人均 PV”,每 2 秒统计一次。由
于聚合的状态还需要做处理计算,因此窗口聚合时使用了更加灵活的 AggregateFunction。为了统计 UV,我们用一个 HashSet 保存所有出现过的用户 id,实现自动去重;而 PV 的统计则类似一个计数器,每来一个数据加一就可以了。所以这里的状态,定义为包含一个 HashSet 和一个 count 值的二元组(Tuple2<HashSet<String>, Long>),每来一条数据,就将 user 存入 HashSet,
同时 count 加 1。这里的 count 就是 PV,而 HashSet 中元素的个数(size)就是 UV;所以最终窗口的输出结果,就是它们的比值。

这里没有涉及会话窗口,所以 merge()方法可以不做任何操作。

另外,Flink 也为窗口的聚合提供了一系列预定义的简单聚合方法,可以直接基于
WindowedStream 调用。主要包括.sum()/max()/maxBy()/min()/minBy(),与 KeyedStream 的简单聚合非常相似。它们的底层,其实都是通过 AggregateFunction 来实现的。

通过 ReduceFunction 和 AggregateFunction 我们可以发现,增量聚合函数其实就是在用流处理的思路来处理有界数据集,核心是保持一个聚合状态,当数据到来时不停地更新状态。这就是 Flink 所谓的“有状态的流处理”,通过这种方式可以极大地提高程序运行的效率,所以在实际应用中最为常见。

2.3.2 全窗口函数(full window functions)

窗口操作中的另一大类就是全窗口函数。与增量聚合函数不同,全窗口函数需要先收集窗口中的数据,并在内部缓存起来,等到窗口要输出结果的时候再取出数据进行计算。

很明显,这就是典型的批处理思路了——先攒数据,等一批都到齐了再正式启动处理流程。这样做毫无疑问是低效的:因为窗口全部的计算任务都积压在了要输出结果的那一瞬间,而在之前收集数据的漫长过程中却无所事事。

全窗口函数存在是因为在有些场景下,我们要做的计算必须基于全部的数据才有效,这时做增量聚合就没什么意义了;另外,输出的结果有可能要包含上下文中的一些信息(比如窗口的起始时间),这是增量聚合函数做不到的。所以,我们还需要有更丰富的
窗口计算方式,这就可以用全窗口函数来实现。

在 Flink 中,全窗口函数也有两种:WindowFunction 和 ProcessWindowFunction。

  • 1、窗口函数(WindowFunction)
    WindowFunction 字面上就是“窗口函数”,它其实是老版本的通用窗口函数接口。我们可
    以基于 WindowedStream 调用.apply()方法,传入一个 WindowFunction 的实现类。
stream
    .keyBy(<key selector>)
    .window(<window assigner>)
    .apply(new MyWindowFunction());

这个类中可以获取到包含窗口所有数据的可迭代集合(Iterable),还可以拿到窗口
(Window)本身的信息。WindowFunction 接口在源码中实现如下:

public interface WindowFunction<IN, OUT, KEY, W extends Window> extends Function,Serializable {
    void apply(KEY key, W window, Iterable<IN> input, Collector<OUT> out) throws Exception;
}

当窗口到达结束时间需要触发计算时,就会调用这里的 apply 方法。我们可以从 input 集
合中取出窗口收集的数据,结合 key 和 window 信息,通过收集器(Collector)输出结果。这里 Collector 的用法,与 FlatMapFunction 中相同。

不过我们也看到了,WindowFunction 能提供的上下文信息较少,也没有更高级的功能。
事实上,它的作用可以被 ProcessWindowFunction 全覆盖,所以之后可能会逐渐弃用。一般在实际应用,直接使用 ProcessWindowFunction 就可以了。

  • 1、处理窗口函数(ProcessWindowFunction)
    ProcessWindowFunction 是 Window API 中最底层的通用窗口函数接口。之所以说它“最底层”,是因为除了可以拿到窗口中的所有数据之外,ProcessWindowFunction 还可以获取到一个“上下文对象”(Context)。这个上下文对象非常强大,不仅能够获取窗口信息,还可以访问当前的时间和状态信息。这里的时间就包括了处理时间(processing time)和事件时间水位线(event time watermark)。这就使得 ProcessWindowFunction 更加灵活、功能更加丰富。事实上,ProcessWindowFunction 是 Flink 底层 API——处理函数(process function)中的一员。

当 然 , 这 些 好 处 是 以 牺 牲 性 能 和 资 源 为 代 价 的 。 作 为 一 个 全 窗 口 函 数 ,ProcessWindowFunction 同样需要将所有数据缓存下来、等到窗口触发计算时才使用。它其实就是一个增强版的 WindowFunction。

具体使用跟 WindowFunction 非常类似,我们可以基于 WindowedStream 调用.process()方法,传入一个 ProcessWindowFunction 的实现类。下面是一个电商网站统计每小时 UV 的例子:

import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.sql.Timestamp;
import java.time.Duration;
import java.util.HashSet;

public class WindowProcessTest {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.getConfig().setAutoWatermarkInterval(100);

        SingleOutputStreamOperator<Event> stream = env.addSource(new ClickSource())
                //乱序流的watermark生成
                .assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(0))
                        .withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
                            @Override
                            public long extractTimestamp(Event element, long recordTimestamp) {
                                return element.timestamp;
                            }
                        }));
        //使用ProcessWindowFunction计算UV
        stream.keyBy(data -> true)
                .window(TumblingEventTimeWindows.of(Time.seconds(10)))
                .process(new UvCountByWindow())
                .print();

        env.execute();
    }

    //实现自定义的ProcessWindowFunction,输出一条统计信息
    public static class UvCountByWindow extends ProcessWindowFunction<Event,String,Boolean, TimeWindow>{

        @Override
        public void process(Boolean aBoolean, Context context, Iterable<Event> elements, Collector<String> out) throws Exception {
            //用一个HashSet保存user
            HashSet<String> userSet = new HashSet<>();
            //从elements中遍历数据,放到set中去重
            for (Event element : elements) {
                userSet.add(element.user);
            }

            Integer uv = userSet.size();
            //结合窗口信息
            Long start = context.window().getStart();
            Long end = context.window().getEnd();
            out.collect("窗口" + new Timestamp(start) + "~" + new Timestamp(end) + "UV值为:" + uv);
        }
    }
}

这里使用的是事件时间语义。定义 10 秒钟的滚动事件窗口后,直接使用ProcessWindowFunction 来定义处理的逻辑。我们可以创建一个 HashSet,将窗口所有数据的userId 写入实现去重,最终得到 HashSet 的元素个数就是 UV 值。

当 然 , 这 里 我 们 并 没 有 用 到 上 下 文 中 其 他 信 息 , 所以其实没有必要使用ProcessWindowFunction。全窗口函数因为运行效率较低,很少直接单独使用,往往会和增量聚合函数结合在一起,共同实现窗口的处理计算。

2.3.3 增量聚合和全窗口函数的结合使用

增量聚合的优点:高效,输出更加实时。
增量聚合相当于把计算量“均摊”到了窗口收集数据的过程中,自然就会比全窗口聚合更加高效、输出更加实时。

全窗口的优点:提供更多的信息,可以认为是更加“通用”的窗口操作。
它只负责收集数据、提供上下文相关信息,把所有的原材料都准备好,至于拿来做什么我们完全可以任意发挥。这就使得窗口计算更加灵活,功能更加强大。

在实际应用中,我们往往希望兼具这两者的优点,把它们结合在一起使用。Flink 的
Window API 就给我们实现了这样的用法。

之前在调用 WindowedStream 的.reduce()和.aggregate()方法时,只是简单地直接传入
了一个 ReduceFunction 或 AggregateFunction 进行增量聚合。除此之外,其实还可以传入第二个参数:一个全窗口函数,可以是 WindowFunction 或者ProcessWindowFunction。

// ReduceFunction 与 WindowFunction 结合
public <R> SingleOutputStreamOperator<R> reduce(ReduceFunction<T> reduceFunction, WindowFunction<T, R, K, W> function)

// ReduceFunction 与 ProcessWindowFunction 结合
public <R> SingleOutputStreamOperator<R> reduce(ReduceFunction<T> reduceFunction, ProcessWindowFunction<T, R, K, W> function)

// AggregateFunction 与 WindowFunction 结合
public <ACC, V, R> SingleOutputStreamOperator<R> aggregate(AggregateFunction<T, ACC, V> aggFunction, WindowFunction<V, R, K, W> windowFunction)

// AggregateFunction 与 ProcessWindowFunction 结合
public <ACC, V, R> SingleOutputStreamOperator<R> aggregate(AggregateFunction<T, ACC, V> aggFunction, ProcessWindowFunction<V, R, K, W> windowFunction)

这样调用的处理机制是:基于第一个参数(增量聚合函数)来处理窗口数据,每来一个数据就做一次聚合;等到窗口需要触发计算时,则调用第二个参数(全窗口函数)的处理逻辑输出结果。需要注意的是,这里的全窗口函数就不再缓存所有数据了,而是直接将增量聚合函数的结果拿来当作了 Iterable 类型的输入。一般情况下,这时的可迭代集合中就只有一个元素了。

下面我们举一个具体的实例来说明。在网站的各种统计指标中,一个很重要的统计指标就是热门的链接;想要得到热门的 url,前提是得到每个链接的“热门度”。一般情况下,可以用url 的浏览量(点击量)表示热门度。我们这里统计 10 秒钟的 url 浏览量,每 5 秒钟更新一次;另外为了更加清晰地展示,还应该把窗口的起始结束时间一起输出。我们可以定义滑动窗口,并结合增量聚合函数和全窗口函数来得到统计结果。

import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

public class UrlCountViewExample {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.getConfig().setAutoWatermarkInterval(100);

        SingleOutputStreamOperator<Event> stream = env.addSource(new ClickSource())
                //乱序流的watermark生成
                .assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(0))
                        .withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
                            @Override
                            public long extractTimestamp(Event element, long recordTimestamp) {
                                return element.timestamp;
                            }
                        }));
        stream.print("input");

        //统计每个url的访问量
        stream.keyBy(data -> data.url)
                .window(TumblingEventTimeWindows.of(Time.seconds(10)))
                .aggregate(new UrlViewCountAgg(),new UrlViewCountResult())
                .print();


        env.execute();
    }

    //增量聚合,来一条数据 + 1
    public static class UrlViewCountAgg implements AggregateFunction<Event,Long,Long>{

        @Override
        public Long createAccumulator() {
            return 0L;
        }

        @Override
        public Long add(Event value, Long accumulator) {
            return accumulator + 1;
        }

        @Override
        public Long getResult(Long accumulator) {
            return accumulator;
        }

        @Override
        public Long merge(Long a, Long b) {
            return null;
        }
    }

    //包装窗口信息,输出UrlViewCount
    public static class UrlViewCountResult extends ProcessWindowFunction<Long,UrlViewCount,String, TimeWindow>{

        @Override
        public void process(String s, Context context, Iterable<Long> elements, Collector<UrlViewCount> out) throws Exception {
            Long start = context.window().getStart();
            Long end = context.window().getEnd();
            Long count = elements.iterator().next();
            out.collect(new UrlViewCount(s,count,start,end));
        }
    }
}

为了方便处理,单独定义了一个POJO类,来表示输出结果的数据类型

public class UrlViewCount {
    public String url;
    public Long count;
    public Long windowStart;
    public Long windowEnd;

    public UrlViewCount() {
    }

    public UrlViewCount(String url, Long count, Long windowStart, Long windowEnd) {
        this.url = url;
        this.count = count;
        this.windowStart = windowStart;
        this.windowEnd = windowEnd;
    }

    @Override
    public String toString() {
        return "UrlViewCount{" +
                "url='" + url + '\'' +
                ", count=" + count +
                ", windowStart=" + new Timestamp(windowStart) +
                ", windowEnd=" + new Timestamp(windowEnd) +
                '}';
    }
}

代码中用一个 AggregateFunction 来实现增量聚合,每来一个数据就计数加一;得到的结果交给 ProcessWindowFunction,结合窗口信息包装成我们想要的 UrlViewCount,最终输出统计结果。

注:ProcessWindowFunction 是处理函数中的一种,后面我们会详细讲解。这里只用它来将增量聚合函数的输出结果包裹一层窗口信息。

窗口处理的主体还是增量聚合,而引入全窗口函数又可以获取到更多的信息包装输出,这样的结合兼具了两种窗口函数的优势,在保证处理性能和实时性的同时支持了更加丰富的应用场景。

测试水位线和窗口的使用

之前讲过,当水位线到达窗口结束时间时,窗口就会闭合不再接收迟到的数据,因为根据水位线的定义,所有小于等于水位线的数据都已经到达,所以显然 Flink 会认为窗口中的数据都到达了(尽管可能存在迟到数据,也就是时间戳小于当前水位线的数据)。我们可以在之前生成水位线代码 WatermarkTest 的基础上,增加窗口应用做一下测试:

import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.time.Duration;

public class WatermarkTest2 {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // 将数据源改为 socket 文本流,并转换成 Event 类型
        env.socketTextStream("hadoop102", 7777)
                .map(new MapFunction<String, Event>() {
                    @Override
                    public Event map(String value) throws Exception {
                        String[] fields = value.split(",");
                        return new Event(fields[0].trim(), fields[1].trim(),
                                Long.valueOf(fields[2].trim()));
                    }
                })
                // 插入水位线的逻辑
                .assignTimestampsAndWatermarks(
                        // 针对乱序流插入水位线,延迟时间设置为 5s
                        WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                                .withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
                                    // 抽取时间戳的逻辑
                                    @Override
                                    public long extractTimestamp(Event element, long recordTimestamp) {
                                        return element.timestamp;
                                    }
                                })
                )
                // 根据 user 分组,开窗统计
                .keyBy(data -> data.user)
                .window(TumblingEventTimeWindows.of(Time.seconds(10)))
                .process(new WatermarkTestResult())
                .print();
        env.execute();
    }

    // 自定义处理窗口函数,输出当前的水位线和窗口信息
    public static class WatermarkTestResult extends ProcessWindowFunction<Event, String, String, TimeWindow> {
        @Override
        public void process(String s, Context context, Iterable<Event> elements, Collector<String> out) throws Exception {
            Long start = context.window().getStart();
            Long end = context.window().getEnd();
            Long currentWatermark = context.currentWatermark();
            Long count = elements.spliterator().getExactSizeIfKnown();
            out.collect("窗口" + start + " ~ " + end + "中共有" + count + "个元素, 窗口闭合计算时,水位线处于:" + currentWatermark);
        }
    }

}

当最后输入Alice, ./prod?id=300, 15000时,流中会周期性地插入一个时间戳为15000L – 5 * 1000L – 1L = 9999 毫秒的水位线,已经到达了[0,10000)的结束时间。

三、其他API

对于一个窗口算子而言,窗口分配器和窗口函数是必不可少的。除此之外,Flink 还提供
了其他一些可选的 API,让我们可以更加灵活地控制窗口行为。

3.1 触发器(Trigger)

触发器主要是用来控制窗口什么时候触发计算。所谓的“触发计算”,本质上就是执行窗
口函数,所以可以认为是计算得到结果并输出的过程。

基于 WindowedStream 调用.trigger()方法,就可以传入一个自定义的窗口触发器(Trigger)。

stream.keyBy(...)
    .window(...)
    .trigger(new MyTrigger())

Trigger 是窗口算子的内部属性,每个窗口分配器(WindowAssigner)都会对应一个默认
的触发器;对于 Flink 内置的窗口类型,它们的触发器都已经做了实现。例如,所有事件时间窗口,默认的触发器都是 EventTimeTrigger;类似还有 ProcessingTimeTrigger 和 CountTrigger。所以一般情况下是不需要自定义触发器的,不过我们依然有必要了解它的原理。

Trigger 是一个抽象类,自定义时必须实现下面四个抽象方法:

  • onElement():窗口中每到来一个元素,都会调用这个方法。
  • onEventTime():当注册的事件时间定时器触发时,将调用这个方法。
  • onProcessingTime ():当注册的处理时间定时器触发时,将调用这个方法。
  • clear():当窗口关闭销毁时,调用这个方法。一般用来清除自定义的状态。

可以看到,除了 clear()比较像生命周期方法,其他三个方法其实都是对某种事件的响应。onElement()是对流中数据元素到来的响应;而另两个则是对时间的响应。这几个方法的参数中都有一个“触发器上下文”(TriggerContext)对象,可以用来注册定时器回调(callback)。这里提到的“定时器”(Timer),其实就是我们设定的一个“闹钟”,代表未来某个时间点会执行的事件;当时间进展到设定的值时,就会执行定义好的操作。很明显,对于时间窗口(TimeWindow)而言,就应该是在窗口的结束时间设定了一个定时器,这样到时间就可以触发窗口的计算输出了。关于定时器的内容,我们在后面讲解处理函数(process function)时还会提到。

上面的前三个方法可以响应事件,那它们又是怎样跟窗口操作联系起来的呢?这就需要了解一下它们的返回值。这三个方法返回类型都是 TriggerResult,这是一个枚举类型(enum),其中定义了对窗口进行操作的四种类型。

  • CONTINUE(继续):什么都不做
  • FIRE(触发):触发计算,输出结果
  • PURGE(清除):清空窗口中的所有数据,销毁窗口
  • FIRE_AND_PURGE(触发并清除):触发计算输出结果,并清除窗口

可以看到,Trigger 除了可以控制触发计算,还可以定义窗口什么时候关闭(销毁)。
上面的四种类型,其实也就是这两个操作交叉配对产生的结果。一般我们会认为,到了窗口的结束时间,那么就会触发计算输出结果,然后关闭窗口——似乎这两个操作应该是同时发生的;但 TriggerResult 的定义告诉我们,两者可以分开。稍后我们就会看到它们分开操作的场景。

下面我们举一个例子。在日常业务场景中,我们经常会开比较大的窗口来计算每个窗口的pv 或者 uv 等数据。但窗口开的太大,会使我们看到计算结果的时间间隔变长。所以我们可以使用触发器,来隔一段时间触发一次窗口计算。我们在代码中计算了每个 url 在 10 秒滚动窗口的 pv 指标,然后设置了触发器,每隔 1 秒钟触发一次窗口的计算。

import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;


public class TriggerExample {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.addSource(new ClickSource()).assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forMonotonousTimestamps()
                .withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
                    @Override
                    public long extractTimestamp(Event event, long l) {
                        return event.timestamp;
                    }
                })
        )
                .keyBy(r -> r.url)
                .window(TumblingEventTimeWindows.of(Time.seconds(10)))
                .trigger(new MyTrigger())
                .process(new WindowResult())
                .print();

        env.execute();
    }

    public static class WindowResult extends ProcessWindowFunction<Event, UrlViewCount, String, TimeWindow> {
        @Override
        public void process(String s, Context context, Iterable<Event> iterable, Collector<UrlViewCount> collector) throws Exception {
            collector.collect(
                    new UrlViewCount(
                            s,
                            // 获取迭代器中的元素个数
                            iterable.spliterator().getExactSizeIfKnown(),
                            context.window().getStart(),
                            context.window().getEnd()
                    )
            );
        }
    }

    public static class MyTrigger extends Trigger<Event, TimeWindow> {
        @Override
        public TriggerResult onElement(Event event, long l, TimeWindow timeWindow, TriggerContext triggerContext) throws Exception {
            ValueState<Boolean> isFirstEvent = triggerContext.getPartitionedState(new ValueStateDescriptor<Boolean>("first-event", Types.BOOLEAN));
            if (isFirstEvent.value() == null) {
                for (long i = timeWindow.getStart(); i < timeWindow.getEnd(); i = i + 1000L) {
                    triggerContext.registerEventTimeTimer(i);
                }
                isFirstEvent.update(true);
            }
            return TriggerResult.CONTINUE;
        }

        @Override
        public TriggerResult onEventTime(long l, TimeWindow timeWindow, TriggerContext triggerContext) throws Exception {
            return TriggerResult.FIRE;
        }

        @Override
        public TriggerResult onProcessingTime(long l, TimeWindow timeWindow, TriggerContext triggerContext) throws Exception {
            return TriggerResult.CONTINUE;
        }

        @Override
        public void clear(TimeWindow timeWindow, TriggerContext triggerContext) throws Exception {
            ValueState<Boolean> isFirstEvent = triggerContext.getPartitionedState(
                    new ValueStateDescriptor<Boolean>("first-event", Types.BOOLEAN)
            );
            isFirstEvent.clear();
        }
    }
}

3.2 移除器(Evictor)

移除器主要用来定义移除某些数据的逻辑。基于 WindowedStream 调用.evictor()方法,就可以传入一个自定义的移除器(Evictor)。Evictor 是一个接口,不同的窗口类型都有各自预实现的移除器。

stream.keyBy(...)
    .window(...)
    .evictor(new MyEvictor())

Evictor 接口定义了两个方法:

  • evictBefore():定义执行窗口函数之前的移除数据操作
  • evictAfter():定义执行窗口函数之后的以处数据操作

默认情况下,预实现的移除器都是在执行窗口函数(window fucntions)之前移除数据的。

3.3 允许延迟(Allowed Lateness)

在事件时间语义下,窗口中可能会出现数据迟到的情况。这是因为在乱序流中,水位线
(watermark)并不一定能保证时间戳更早的所有数据不会再来。当水位线已经到达窗口结束时间时,窗口会触发计算并输出结果,这时一般也就要销毁窗口了;如果窗口关闭之后,又有本属于窗口内的数据姗姗来迟,默认情况下就会被丢弃。

不过在多数情况下,直接丢弃数据也会导致统计结果不准确,我们还是希望该上车的人都能上来。为了解决迟到数据的问题,Flink 提供了一个特殊的接口,可以为窗口算子设置一个“允许的最大延迟”(Allowed Lateness)。也就是说,我们可以设定允许延迟一段时间,在这段时间内,窗口不会销毁,继续到来的数据依然可以进入窗口中并触发计算。直到水位线推进到了 窗口结束时间 + 延迟时间,才真正将窗口的内容清空,正式关闭窗口。

基于 WindowedStream 调用.allowedLateness()方法,传入一个 Time 类型的延迟时间,就可以表示允许这段时间内的延迟数据。

水位线 = 窗口结束时间 + 延迟时间

stream.keyBy(...)
    .window(TumblingEventTimeWindows.of(Time.hours(1)))
    .allowedLateness(Time.minutes(1))

就可以看到,窗口的触发计算(Fire)和清除(Purge)操作确实可以分开。不
过在默认情况下,允许的延迟是 0,这样一旦水位线到达了窗口结束时间就会触发计算并清除窗口,两个操作看起来就是同时发生了。当窗口被清除(关闭)之后,再来的数据就会被丢弃。

3.4 将迟到的数据放入侧输出流

Flink 还提供了另外一种方式处理迟到数据。我们可以将未收入窗口的迟到数据,放入“侧
输出流”(side output)进行另外的处理。所谓的侧输出流,相当于是数据流的一个“分支”,这个流中单独放置那些错过了、本该被丢弃的数据。

基于 WindowedStream 调用.sideOutputLateData() 方法,就可以实现这个功能。方法需要传入一个“输出标签”(OutputTag),用来标记分支的迟到数据流。因为保存的就是流中的原始数据,所以 OutputTag 的类型与流中数据类型相同。

sideOutputLateData() 方法,传入一个输出标签,用来标记分治的迟到数据流

DataStream<Event> stream = env.addSource(...);
OutputTag<Event> outputTag = new OutputTag<Event>("late") {};
stream.keyBy(...)
    .window(TumblingEventTimeWindows.of(Time.hours(1)))
    .sideOutputLateData(outputTag)

将迟到数据放入侧输出流之后,还应该可以将它提取出来。基于窗口处理完成之后的
DataStream,调用.getSideOutput()方法,传入对应的输出标签,就可以获取到迟到数据所在的流了。

SingleOutputStreamOperator<AggResult> winAggStream = stream.keyBy(...)
    .window(TumblingEventTimeWindows.of(Time.hours(1)))
    .sideOutputLateData(outputTag)
    .aggregate(new MyAggregateFunction())
DataStream<Event> lateStream = winAggStream.getSideOutput(outputTag);

这里注意,getSideOutput()是 SingleOutputStreamOperator 的方法,获取到的侧输出流数据类型应该和 OutputTag 指定的类型一致,与窗口聚合之后流中的数据类型可以不同。

四、窗口的生命周期

4.1 窗口的创建

窗口的类型和基本信息由窗口分配器(window assigners)指定,但窗口不会预先创建好,而是由数据驱动创建。当第一个应该属于这个窗口的数据元素到达时,就会创建对应的窗口。

4.2 窗口计算的触发

除了窗口分配器,每个窗口还会有自己的窗口函数(window functions)和触发器(trigger)。窗口函数可以分为增量聚合函数和全窗口函数,主要定义了窗口中计算的逻辑;而触发器则是指定调用窗口函数的条件。

对于不同的窗口类型,触发计算的条件也会不同。例如,一个滚动事件时间窗口,应该在水位线到达窗口结束时间的时候触发计算,属于“定点发车”;而一个计数窗口,会在窗口中元素数量达到定义大小时触发计算,属于“人满就发车”。所以 Flink 预定义的窗口类型都有对应内置的触发器。

对于事件时间窗口而言,除去到达结束时间的“定点发车”,还有另一种情形。当我们设
置了允许延迟,那么如果水位线超过了窗口结束时间、但还没有到达设定的最大延迟时间,这期间内到达的迟到数据也会触发窗口计算。这类似于没有准时赶上班车的人又追上了车,这时车要再次停靠、开门,将新的数据整合统计进来。

4.3 窗口的销毁

一般情况下,当时间达到了结束点,就会直接触发计算输出结果、进而清除状态销毁窗口。这时窗口的销毁可以认为和触发计算是同一时刻。这里需要注意,Flink 中只对时间窗口(TimeWindow)有销毁机制;由于计数窗口(CountWindow)是基于全局窗口(GlobalWindw)实现的,而全局窗口不会清除状态,所以就不会被销毁。

在特殊的场景下,窗口的销毁和触发计算会有所不同。事件时间语义下,如果设置了允许延迟,那么在水位线到达窗口结束时间时,仍然不会销毁窗口;窗口真正被完全删除的时间点,是窗口的结束时间加上用户指定的允许延迟时间。

4.4 窗口 API 调用总结

Window API 首先按照时候按键分区分成两类。keyBy 之后的 KeyedStream,可以调
用.window()方法声明按键分区窗口(Keyed Windows);而如果不做 keyBy,DataStream 也可以直接调用.windowAll()声明非按键分区窗口。之后的方法调用就完全一样了。

接下来首先是通过.window()/.windowAll()方法定义窗口分配器,得到 WindowedStream;然 后 通 过 各 种 转 换 方 法 ( reduce/aggregate/apply/process ) 给 出 窗 口 函 数(ReduceFunction/AggregateFunction/ProcessWindowFunction),定义窗口的具体计算处理逻辑,转换之后重新得到 DataStream。这两者必不可少,是窗口算子(WindowOperator)最重要的组成部分。

此外,在这两者之间,还可以基于 WindowedStream 调用.trigger()自定义触发器、调
用.evictor()定义移除器、调用.allowedLateness()指定允许延迟时间、调用.sideOutputLateData()将迟到数据写入侧输出流,这些都是可选的 API,一般不需要实现。而如果定义了侧输出流,可以基于窗口聚合之后的 DataStream 调用.getSideOutput()获取侧输出流。

五、迟到数据的处理

所谓的“迟到数据”(late data),是指某个水位线之后到来的数据,它的时间戳其实是在水位线之前的。所以只有在事件时间语义下,讨论迟到数据的处理才是有意义的。

事件时间里用来表示时钟进展的就是水位线(watermark)。对于乱序流,水位线本身就可以设置一个延迟时间;而做窗口计算时,我们又可以设置窗口的允许延迟时间;另外窗口还有将迟到数据输出到测输出流的用法。

5.1 设置水位线延迟时间

水位线是事件时间的进展,它是我们整个应用的全局逻辑时钟。水位线生成之后,会随着数据在任务间流动,从而给每个任务指明当前的事件时间。所以从这个意义上讲,水位线是一个覆盖万物的存在,它并不只针对事件时间窗口有效。

之前我们讲到触发器时曾提到过“定时器”,时间窗口的操作底层就是靠定时器来控制触
发的。既然是底层机制,定时器自然就不可能是窗口的专利了;事实上它是 Flink 底层 API——处理函数(process function)的重要部分。

所以水位线其实是所有事件时间定时器触发的判断标准。那么水位线的延迟,当然也就是全局时钟的滞后,相当于是上帝拨动了琴弦,所有人的表都变慢了。

既然水位线这么重要,那一般情况就不应该把它的延迟设置得太大,否则流处理的实时性就会大大降低。因为水位线的延迟主要是用来对付分布式网络传输导致的数据乱序,而网络传输的乱序程度一般并不会很大,大多集中在几毫秒至几百毫秒。所以实际应用中,我们往往会给水位线设置一个“能够处理大多数乱序数据的小延迟”,视需求一般设在毫秒~秒级。

当我们设置了水位线延迟时间后,所有定时器就都会按照延迟后的水位线来触发。如果一个数据所包含的时间戳,小于当前的水位线,那么它就是所谓的“迟到数据”。

5.2 允许窗口处理迟到数据

对于窗口计算而言,如果水位线已经到了窗口结束时间,默认窗口就会关闭,那么之后再来的数据就要被丢弃了。

Flink 的窗口也是可以设置延迟时间,允许继续处理迟到数据的。

这种情况下,由于大部分乱序数据已经被水位线的延迟等到了,所以往往迟到的数据不会太多。这样,我们会在水位线到达窗口结束时间时,先快速地输出一个近似正确的计算结果;然后保持窗口继续等到延迟数据,每来一条数据,窗口就会再次计算,并将更新后的结果输出。这样就可以逐步修正计算结果,最终得到准确的统计值了。

5.3 将迟到数据放入窗口侧输出流

即使我们有了前面的双重保证,可窗口不能一直等下去,最后总要真正关闭。窗口一旦关闭,后续的数据就都要被丢弃了。那如果真的还有漏网之鱼又该怎么办呢?

那就要用到最后一招了:用窗口的侧输出流来收集关窗以后的迟到数据。这种方式是最后“兜底”的方法,只能保证数据不丢失;因为窗口已经真正关闭,所以是无法基于之前窗口的结果直接做更新的。我们只能将之前的窗口计算结果保存下来,然后获取侧输出流中的迟到数据,判断数据所属的窗口,手动对结果进行合并更新。尽管有些烦琐,实时性也不够强,但能够保证最终结果一定是正确的。

所以总结起来,Flink 处理迟到数据,对于结果的正确性有三重保障:水位线的延迟,窗
口允许迟到数据,以及将迟到数据放入窗口侧输出流。

我们可以回忆一下之前 统计每个 url 浏览次数的代码 UrlViewCountExample,稍作改进,增加处理迟到数据的功能。具体代码如下。

import com.yibo.flink.datastream.Event;
import com.yibo.flink.sourcecustom.ClickSource;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

import java.time.Duration;


public class LateDataTest {

    public static void main(String[] args) throws Exception {
        //创建执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        //设置生成水位线的时间间隔
        env.getConfig().setAutoWatermarkInterval(100);

        //乱序流的Watermark生成
        SingleOutputStreamOperator<Event> streamOperator = env.addSource(new ClickSource())
                // 插入水位线的逻辑 设置 watermark 延迟时间,2 秒
                .assignTimestampsAndWatermarks(
                        // 针对乱序流插入水位线,延迟时间设置为 2s
                        WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(2))
                                // 抽取时间戳的逻辑
                                .withTimestampAssigner((SerializableTimestampAssigner<Event>) (element, recordTimestamp) -> element.getTimestamp())
                );

        //定义一个输出标签
        OutputTag<Event> lateTag = new OutputTag<Event>("late"){};

        //统计每个url的访问量
        SingleOutputStreamOperator<UrlCountView> result = streamOperator.keyBy(Event::getUrl)
                //滚动事件时间窗口
                .window(TumblingEventTimeWindows.of(Time.seconds(10)))
                //允许窗口处理迟到数据 允许1分钟的延迟
                .allowedLateness(Time.minutes(1))
                //将最后的迟到数据输出到侧输出流
                .sideOutputLateData(lateTag)
                .aggregate(new UrlViewCountAgg(), new UrlViewCountResult());

        result.print("result");
        result.getSideOutput(lateTag).print("late");

        env.execute();
    }

    
    /**
     * 自定义实现AggregateFunction, 增量计算url页面的访问量,来一条数据就 +1
     */
    public static class UrlViewCountAgg implements AggregateFunction<Event, Long, Long> {

        @Override
        public Long createAccumulator() {
            return 0L;
        }

        @Override
        public Long add(Event value, Long accumulator) {
            return accumulator + 1;
        }

        @Override
        public Long getResult(Long accumulator) {
            return accumulator;
        }

        @Override
        public Long merge(Long a, Long b) {
            return null;
        }
    }


    /**
     * 自定义实现ProcessWindowFunction, 包装窗口信息输出
     */
    public static class UrlViewCountResult extends ProcessWindowFunction<Long, UrlCountView, String, TimeWindow> {
        
        @Override
        public void process(String url, Context context, Iterable<Long> iterable, Collector<UrlCountView> out) throws Exception {
            Long urlCount = iterable.iterator().next();
            //集合窗口信息输出
            long start = context.window().getStart();
            long end = context.window().getEnd();
            UrlCountView urlCountView = new UrlCountView();
            urlCountView.setUrl(url);
            urlCountView.setCount(urlCount);
            urlCountView.setWindowStart(start);
            urlCountView.setWindowEnd(end);
            out.collect(urlCountView);
        }
    }
}

总结

在流处理中,由于对实时性的要求非常高,同时又要求能够保证窗口操作结果的正确,所以必须引入水位线来描述事件时间。而窗口正是时间相关的最佳应用场景,所以 Flink 为我们提供了丰富的窗口类型和处理操作;与此同时,在实际应用中很难对乱序流给出一个最佳延迟时间,单独依赖水位线去保证结果正确性是不够的,所以需要结合窗口(Window)处理迟到数据的相关 API。

Flink 中时间语义和水位线的概念、窗口 API 的用法以及处理迟到数据的相关知识,这些内容对于实时流处理来说非常重要。

Flink 的时间语义和窗口,主要就是为了处理大规模的乱序数据流时,同时保证低延迟、
高吞吐和结果的正确性。这部分设计基本上是对谷歌(Google)著名论文《数据流模型:一种在大规模、无界、无序数据处理中平衡正确性、延迟和性能的实用方法》(The Dataflow Model: A Practical Approach to Balancing Correctness, Latency, and Cost in Massive-Scale, Unbounded, Out-of-Order Data Processing)的具体实现,如果读者有兴趣可以读一下原始论文,会对流处理有更加深刻的理解。

参考:
https://blog.csdn.net/Zp_insist/article/details/124935249

https://blog.csdn.net/mynameisgt/article/details/124223193

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 206,126评论 6 481
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 88,254评论 2 382
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 152,445评论 0 341
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 55,185评论 1 278
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 64,178评论 5 371
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,970评论 1 284
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,276评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,927评论 0 259
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 43,400评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,883评论 2 323
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,997评论 1 333
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,646评论 4 322
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,213评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,204评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,423评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,423评论 2 352
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,722评论 2 345

推荐阅读更多精彩内容