存在一下一种应用场景需要计算某一时刻的前后一段时间的相关统计信息。
此处存在一个rangeBetween(start,end)的Sql 函数,建立某一段数据记录的窗口,然后进行相关操作的数据统计分析。
val seq = Seq(("001","events1",10,"2016-05-01 10:50:51"),("002","events2",210,"2016-05-01 10:50:56"),("001","events3",100,"2016-05-01 10:58:51"),("001","events4",50,"2016-05-01 10:51:51"))
val rdd= sc.parallelize(seq)
import sqlContext.implicits._
val df = rdd.toDf("id","event","metric","time")
val ts = col("time").cast("timestamp").cast("long")
val interval = (round(ts/300.0)*300).cast("long").cast("timestamp").alias("interval")
df.groupBy($"id",interval).sum("metric").show(false)
val w = Window.partitionBy($"id").orderBy("metric").rangeBetween(-150,150)
df.withColumn("tss",ts).withColumn("sum_metric",sum("metric").over(w)).orderBy("id","time").show(flase)
以上代码中最最为关键的是:(window 必须在HiveContext中运行)
val w = Window.partitionBy($"id").orderBy("metric").rangeBetween(-150,150)
可以没有partitionBy 但是必须有orderBy("metric").rangeBetween(-150,150) 根据该排序列计算的值计算前后Range的窗口,进行数据统计(orderBy的是时间 则根据时间排序Range,是别的列 则根据该列的值进行排序Range)
如果不添加 OrderBy("") 报错:Non-zero range offsets are not supported foe windows with multiple order expressions
如果添加多个 则会报错:Window specification is not valid because this range window Frame only accepts at most one Order by expression.