Window Operation
Spark Streaming 也可以提供基于窗口的计算,这样允许你操作一个滑动窗口时间内的数据。下图展示了滑动窗口
如图所示,
每当窗口在输入数据流上滑动一次,在这个窗口内的源RDDs 就会被聚合和操作然后产生 基于窗口流的RDDs。在这个例子中,过去三个时间单元的数据会被操作一次,然后每次滑动两个时间单元。这就是说 任何窗口操作都需要指定两个参数:
- 窗口长度:窗口持续时间(图中是值3)
- 滑动间隔:每个窗口操作的时间间隔(图中是值2)
这两个参数必须是输入源数据流间隔时间的倍数,(图中是值 1)
让我们来用例子演示一下。比方说,你想要扩展一下之前的例子,要求能够每隔10s中,计算出过去30s的单词的统计值。为了做到这一点,我们需要在过去30s的键值对(word,1)的数据流(DataStream)留上使用 reduceByKey 这个操作.使用reduceByKeyAndWindow这个可以实现这个功能。
//Reduce last 30 seconds of data, every 10 seconds
val windowedWordCounts = pairs.reduceByKeyAndWindow((a:Int,b:Int) => (a+b), Seconds(30), Seconds(10))
一些窗口的操作展示如下。所有的操作都需要刚刚说的两个参数- 窗口长度和滑动间隔
转化 | 简述 |
---|---|
window(windowLength,slideInterval) | 返回一个新的DStream ,它是基于窗口的源Dstream的batches 集合 |
countByWindow(windowLength,slideInterval) | 返回数据流的滑动窗口中的元素的数量 |
reduceByWindow(func,windowLength,slideInterval) | 在一个滑动间隔内,使用函数func 聚合元素,产生新的氮元素的流。这个func必须是联合交替的,才能正确的并行处理数据。 |
reduceByKeyAndWindow(func,windowLength,slideInterval, [numTasks]) | 当kv 键值对的数据流,返回一个新的kv键值对的新数据流,新数据流每个key通过 给定的reduce 函数func 在一个窗口内进行值得聚合。需要注意的: 这个使用spark默认并行数量(local模式的话是2 ,cluster模式的话取决于 配置参数 spark.default.parallelism)进行分组。你可以传入一个可选的参数 numTasks 参数设置一个不同的task的数量 |
reduceByKeyAndWindow(func, invFunc, windowLength, slideInterval、[numTasks]) | 比上面的reduceByKeyAndWindow 更有效的一个版本,能够在之前window的reduce 值 加上当前窗口计算reduce的值 。这个实现是通过reducing 新进入到窗口的数据,反向reducing 离开窗口的老数据。举个例子,随着窗口的滑动,对key的统计值进行加减。然后这个只适用于可以逆转的函数。也就是说,这些reduce的函数,有一个相关的逆向的函数。注意: 这个操作必须设置 checkpointing。 |
countByValueAndWindow (windowLength, slideInterval, [numTasks]) | 当kv键值对的数据流被调用的时候,返回一个新的kv键值对的数据流。就像 reduceByKeyAndWindow ,reduce的task的数量是可以通过配置修改的。 |
Join操作
最后,你如何在Spark Streaming中轻松的使用不同类型的join操作 是很值得强调的额。
Stream-stream joins
数据流可以轻松的和其他的流进行join。
val stream1: DStream[String, String] = ...
val stream2: DStream[String, String] = ...
val joinedStream = stream1.join(stream2)