1. Key points
- Scala case classes
- Reading a file with Flink
- assignAscendingTimestamps for watermarks on ascending timestamps
- Using KeyedProcessFunction<K, I, O>
- Flink stateful programming with keyed state
- Creating a Flink timer
    ctx.timerService().registerProcessingTimeTimer(ts)
- Handling a Flink timer when it fires; this is typically where state is cleared
    override def onTimer(
      timestamp: Long,
      ctx: KeyedProcessFunction[(Long, Long),
        AdClickLog,
        AdClickLog]#OnTimerContext,
      out: Collector[AdClickLog]
    ): Unit = {
      if (timestamp == resetTimerTsState.value()) {
        isBlackState.clear()
        countState.clear()
      }
    }
- Using Flink side outputs with OutputTag
    ctx.output(new OutputTag[BlackListUserWarning]("warning"),
      BlackListUserWarning(value.userId,
        value.adId,
        "Click ad over " + maxCount + " times today."))
- Using the aggregate method to pre-aggregate intermediate results (compare with ProcessWindowFunction)
- aggregate takes (AggregateFunction, WindowFunction): elements are pre-aggregated first, then the window function runs on the aggregated result.
    @PublicEvolving
    def aggregate[ACC: TypeInformation, V: TypeInformation, R: TypeInformation](
        preAggregator: AggregateFunction[T, ACC, V],
        windowFunction: WindowFunction[V, R, K, W]): DataStream[R] = {
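2. Business goals
- Page ad click statistics: every 5 seconds, compute the total ad clicks per province over the preceding hour
  Window: 1 hour
  Slide: 5 seconds
  Dimension: province
  Metric: total clicks
  Output format:
  count result> AdClickCountByProvince(2017-11-26 11:01:30.0,beijing,11)
  count result> AdClickCountByProvince(2017-11-26 11:01:30.0,zhejiang,2)
  count result> AdClickCountByProvince(2017-11-26 11:01:30.0,henan,1)
  count result> AdClickCountByProvince(2017-11-26 11:01:35.0,beijing,6)
  count result> AdClickCountByProvince(2017-11-26 11:01:40.0,beijing,1)
- Blacklist filtering: a user ID that clicks the same adId more than a threshold (e.g. 20000) within one day is put on the blacklist and filtered out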
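To make the one-hour window with a 5-second slide concrete, here is a minimal plain-Scala sketch (no Flink dependency) of which sliding windows a single event falls into. The object name `SlidingWindowSketch`, the helper `windowStarts`, and the sample timestamp are illustrative assumptions; the arithmetic assumes a zero window offset and non-negative timestamps.

```scala
object SlidingWindowSketch {
  // Start timestamps (ms) of every sliding window that contains `ts`,
  // for windows of length `size` advancing by `slide` (offset 0, ts >= 0).
  def windowStarts(ts: Long, size: Long, slide: Long): Vector[Long] = {
    // Latest window start at or before ts, aligned to the slide.
    val lastStart = ts - (ts % slide)
    // Walk backwards while the window [start, start + size) still covers ts.
    Iterator.iterate(lastStart)(_ - slide).takeWhile(_ > ts - size).toVector
  }

  def main(args: Array[String]): Unit = {
    val hour = 60 * 60 * 1000L
    val fiveSec = 5 * 1000L
    // Arbitrary sample event time in milliseconds (hypothetical).
    val starts = windowStarts(1511658000000L, hour, fiveSec)
    println(s"windows containing the event: ${starts.size}")
    println(s"first window to close ends at: ${starts.last + hour}")
  }
}
```

Each click therefore contributes to size / slide = 720 overlapping windows, which is one reason incremental pre-aggregation matters for this job.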
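3. Workflow
- Define the input and output case classes, and the blacklist case class
- Main object
  1) Create the execution environment, set event-time semantics, set the parallelism
  2) Read the data from a file
  3) Convert to the case class and assign ascending timestamps and watermarks with assignAscendingTimestamps
  4) Define the blacklist-filtering KeyedProcessFunction
  5) Windowed aggregation keyed by province, using aggregate or a ProcessWindowFunction
  6) Read the side output and print it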
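4. Modules in detail
4.1 Define the input and output case classes, and the blacklist case class
Input, ad click log: user ID, ad ID, province, city, timestamp
Output, clicks per province: window end time, province, click count
Output, blacklist warning: user ID, ad ID, warning message
// Input and output case classes
case class AdClickLog(userId: Long, adId: Long, province: String, city: String, timestamp: Long)
case class AdClickCountByProvince(windowEnd: String, province: String, count: Long)
// Case class for blacklist warnings sent to the side output
case class BlackListUserWarning(userId: Long, adId: Long, msg: String)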
4.2 Implementing the main object
4.2.1 Create the execution environment, read the data, and convert the input to the case class
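The conversion from a CSV line to the case class can be tried out in plain Scala before wiring it into the job. The sketch below is an assumption-laden illustration: the object `ParseSketch`, the `parse` helper, and the sample rows are hypothetical and not taken from the real AdClickLog.csv. Unlike a bare split-and-index, it returns None for malformed rows instead of throwing.

```scala
import scala.util.Try

case class AdClickLog(userId: Long, adId: Long, province: String, city: String, timestamp: Long)

object ParseSketch {
  // Parse one CSV line; malformed rows yield None rather than an exception.
  def parse(line: String): Option[AdClickLog] = {
    val arr = line.split(",").map(_.trim)
    if (arr.length != 5) None
    else Try(AdClickLog(arr(0).toLong, arr(1).toLong, arr(2), arr(3), arr(4).toLong)).toOption
  }

  def main(args: Array[String]): Unit = {
    // Hypothetical sample rows for illustration only.
    println(parse("1001,2002,beijing,beijing,1511658000"))
    println(parse("not,a,valid,row"))
  }
}
```

Whether silently dropping bad rows is acceptable depends on the data; for a tutorial data set that is known to be clean, the simpler map in the job is enough.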
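4.2.2 Implementing the blacklist filter
Users who commit click fraud are sent to a side output as warnings. How should keyBy be set up? Start from the definition of click fraud: one user clicking one ad more than a given number of times. So we key by (userId, adId), count the records per key, and call process with a custom KeyedProcessFunction.
val filterBlackListUserStream: DataStream[AdClickLog] = adLogStream
  .keyBy(data => (data.userId, data.adId))
  .process(new FliterBlackListUserResult(100)) // plug in a KeyedProcessFunction
1. Define FliterBlackListUserResult(100), extending KeyedProcessFunction<K, I, O>
The key is a (Long, Long) tuple, the input is AdClickLog, and the output is also AdClickLog. 100 is the click threshold at which a user counts as committing click fraud (adjustable).
2. Define the state
- Click-count state: to decide whether a user has hit the limit for an ad ID, we need state that holds the click count and is checked against the blacklist threshold on every record
- Timestamp state for the daily midnight timer: state is cleared at 00:00 every day
- Blacklist flag state: records whether a warning has already been emitted for this key
- State handles must be created either in open() or as lazy vals; otherwise the runtime context is not yet available when they are initialized
lazy val countState: ValueState[Long] =
  getRuntimeContext.getState(new ValueStateDescriptor[Long]("count", classOf[Long]))
lazy val resetTimerTsState: ValueState[Long] =
  getRuntimeContext.getState(new ValueStateDescriptor[Long]("reset-ts", classOf[Long]))
lazy val isBlackState: ValueState[Boolean] =
  getRuntimeContext.getState(new ValueStateDescriptor[Boolean]("is-black", classOf[Boolean]))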
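Why lazy helps can be shown without Flink. The sketch below uses a hypothetical class `Fn` whose context is injected after construction, which is an assumed simplification of how a framework initializes a function object; none of these names are Flink APIs.

```scala
object LazySketch {
  // Hypothetical stand-in for a function whose context is set after construction.
  class Fn {
    private var context: Option[String] = None // not available in the constructor

    // In this sketch the "framework" calls this after instantiating the class.
    def setContext(c: String): Unit = { context = Some(c) }

    private def getContext: String =
      context.getOrElse(throw new IllegalStateException("context not initialized"))

    // Evaluated on first access, by which time setContext has already run.
    // A plain `val` here would call getContext during construction and throw.
    lazy val state: String = "state from " + getContext
  }

  def main(args: Array[String]): Unit = {
    val fn = new Fn          // constructing does not touch the context
    fn.setContext("runtime") // simulated framework initialization
    println(fn.state)        // first access resolves the lazy val
  }
}
```

Creating the state in open() achieves the same thing explicitly, since open() runs after the runtime context has been set.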
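3. Process each record in processElement
As soon as the first record for a key arrives, register the midnight state-clearing timer, regardless of whether the user is blacklisted or how many clicks they have. Within the current day the state must be cleared at midnight even if the user never reached the blacklist limit, for example after only 5 clicks.
if (curCount == 0) {
  // Timer for the next midnight in UTC+8.
  // Epoch time is UTC, so first shift the current processing time by +8h; integer
  // division by one day then truncates to the local date, e.g. 2021-06-28 16:20:25 -> 2021-06-28.
  // Adding 1 gives the next day (the 29th); multiplying converts back to milliseconds;
  // subtracting the 8h offset turns local midnight back into a UTC-based timestamp.
  // Without the shift, a first click between 00:00 and 08:00 local time would get a
  // timer that is already in the past.
  val offset = 8 * 60 * 60 * 1000L
  val ts = ((ctx.timerService().currentProcessingTime() + offset) / (1000 * 60 * 60 * 24) + 1) * (24 * 60 * 60 * 1000) - offset
  resetTimerTsState.update(ts) // remember which timer is the reset timer
  ctx.timerService().registerProcessingTimeTimer(ts) // register a processing-time timer
}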
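The midnight arithmetic above can be checked in isolation. This is a minimal sketch assuming the java.time API; the object `MidnightSketch` and both helper names are illustrative. One version asks java.time for the next local midnight in a named zone, the other uses fixed-offset arithmetic (which ignores daylight-saving changes, a non-issue for Asia/Shanghai today).

```scala
import java.time.{Instant, ZoneId, ZonedDateTime}

object MidnightSketch {
  // Epoch millis of the next local midnight strictly after `nowMillis` in `zone`.
  def nextMidnight(nowMillis: Long, zone: ZoneId): Long =
    Instant.ofEpochMilli(nowMillis)
      .atZone(zone)
      .toLocalDate
      .plusDays(1)
      .atStartOfDay(zone)
      .toInstant
      .toEpochMilli

  // Same idea with plain arithmetic for a fixed UTC offset (no DST handling).
  def nextMidnightFixedOffset(nowMillis: Long, offsetMillis: Long): Long = {
    val day = 24 * 60 * 60 * 1000L
    ((nowMillis + offsetMillis) / day + 1) * day - offsetMillis
  }

  def main(args: Array[String]): Unit = {
    val zone = ZoneId.of("Asia/Shanghai")
    val now = ZonedDateTime.of(2021, 6, 28, 16, 20, 25, 0, zone).toInstant.toEpochMilli
    println(Instant.ofEpochMilli(nextMidnight(now, zone)).atZone(zone))
    println(Instant.ofEpochMilli(nextMidnightFixedOffset(now, 8 * 60 * 60 * 1000L)).atZone(zone))
  }
}
```

The java.time version is easier to read and handles zones with daylight saving; the arithmetic version avoids object allocation per call. Which one to prefer is a judgment call.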
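4. Check whether the count has reached the threshold; if so, emit to the blacklist side output and set the blacklist flag to true
if (curCount >= maxCount) {
  // Emit the warning only once: skip the side output if the key is already blacklisted
  if (!isBlackState.value()) {
    isBlackState.update(true)
    ctx.output(new OutputTag[BlackListUserWarning]("warning"),
      BlackListUserWarning(value.userId,
        value.adId,
        "Click ad over " + maxCount + " times today."))
  }
  // Over the limit: drop the record instead of forwarding it downstream
  return
}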
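The per-key decision logic can be exercised without a Flink cluster. The sketch below is a plain-Scala stand-in for one key's state; `BlacklistSketch`, `KeyState`, and the `Outcome` values are hypothetical names introduced for illustration, and ordinary variables replace the ValueState handles.

```scala
object BlacklistSketch {
  sealed trait Outcome
  case object Forwarded extends Outcome // click passed downstream
  case object Warned extends Outcome    // first click over the limit: warning emitted, click dropped
  case object Dropped extends Outcome   // already blacklisted: click dropped silently

  // Stand-in for the state of a single (userId, adId) key.
  final class KeyState(maxCount: Long) {
    private var count = 0L
    private var isBlack = false

    def onClick(): Outcome =
      if (count >= maxCount) {
        if (!isBlack) { isBlack = true; Warned } else Dropped
      } else {
        count += 1
        Forwarded
      }

    // Mirrors what the midnight timer does: forget the count and the flag.
    def reset(): Unit = { count = 0L; isBlack = false }
  }

  def main(args: Array[String]): Unit = {
    val state = new KeyState(3)
    println(Seq.fill(5)(state.onClick()))
    state.reset()
    println(state.onClick())
  }
}
```

With a threshold of 3, the first three clicks are forwarded, the fourth triggers the single warning, and later clicks are dropped until the reset.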
5. When the timer fires
- Clear the click-count state and the blacklist state
override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[(Long, Long), AdClickLog, AdClickLog]#OnTimerContext, out: Collector[AdClickLog]): Unit = {
  // Only react to the timer that was registered as the midnight reset
  if (timestamp == resetTimerTsState.value()) {
    isBlackState.clear()
    countState.clear()
  }
}
Complete code:
// Custom KeyedProcessFunction: once a user's clicks on one ad reach maxCount, the user is blacklisted.
// Key: (userId, adId)
// Input: AdClickLog; output: AdClickLog
// Which state type to use? A ValueState per key is enough, since each value is simply cleared by a timer.
class FliterBlackListUserResult(maxCount: Long) extends KeyedProcessFunction[(Long, Long), AdClickLog, AdClickLog] {
  // State: countState holds this key's click count, resetTimerTsState holds the timestamp
  // of the daily midnight reset timer, and isBlackState marks whether the key is blacklisted.
  // The handles are lazy because getRuntimeContext is not usable while the constructor runs:
  // Flink sets the runtime context only after the function object has been created.
  // Either create the state in open() or declare it lazy so it is resolved on first access.
  // If several timers are in use, how do we know which one is the midnight reset?
  // Store the timestamp the timer was registered with; when a timer fires with exactly that
  // timestamp, clear the state. Any other timestamp belongs to a different timer.
  lazy val countState: ValueState[Long] = getRuntimeContext.getState(new ValueStateDescriptor[Long]("count", classOf[Long]))
  lazy val resetTimerTsState: ValueState[Long] = getRuntimeContext.getState(new ValueStateDescriptor[Long]("reset-ts", classOf[Long]))
  lazy val isBlackState: ValueState[Boolean] = getRuntimeContext.getState(new ValueStateDescriptor[Boolean]("is-black", classOf[Boolean]))

  override def processElement(value: AdClickLog, ctx: KeyedProcessFunction[(Long, Long), AdClickLog, AdClickLog]#Context, collector: Collector[AdClickLog]): Unit = {
    val curCount = countState.value()
    // When is the midnight reset timer registered? As soon as the first record for the key
    // arrives, independent of the blacklist flag. Every key seen today gets a timer and is
    // cleared at midnight, whether or not it reached the limit: with a limit of 1000 and
    // only 100 clicks by midnight, the reset timer still has to fire.
    // curCount == 0 means this is the first record for the key.
    if (curCount == 0) {
      // Shift by +8h (UTC+8) so that integer division by one day truncates to the local date,
      // e.g. 2021-06-28 16:20:25 -> 2021-06-28; add 1 for the next day (the 29th),
      // multiply back to milliseconds, then subtract the 8h offset because epoch time is UTC.
      // Without the shift, a first click between 00:00 and 08:00 local time would get a
      // timer that is already in the past.
      val offset = 8 * 60 * 60 * 1000L
      val ts = ((ctx.timerService().currentProcessingTime() + offset) / (1000 * 60 * 60 * 24) + 1) * (24 * 60 * 60 * 1000) - offset
      resetTimerTsState.update(ts) // remember which timer is the reset timer
      ctx.timerService().registerProcessingTimeTimer(ts) // register a processing-time timer
    }
    // If the count has reached the threshold, send a warning to the blacklist side output
    if (curCount >= maxCount) {
      // Emit the warning only once: skip the side output if the key is already blacklisted
      if (!isBlackState.value()) {
        isBlackState.update(true)
        ctx.output(new OutputTag[BlackListUserWarning]("warning"), BlackListUserWarning(value.userId, value.adId, "Click ad over " + maxCount + " times today."))
      }
      return
    }
    // Normal case: increment the count and forward the record unchanged.
    // countState is keyed state, so Flink keeps a separate ValueState for every key.
    countState.update(curCount + 1)
    collector.collect(value)
  }

  override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[(Long, Long), AdClickLog, AdClickLog]#OnTimerContext, out: Collector[AdClickLog]): Unit = {
    if (timestamp == resetTimerTsState.value()) {
      isBlackState.clear()
      countState.clear()
    }
  }
}
4.2.3 Using aggregate(AggregateFunction, WindowFunction)
val adCountResultStream = filterBlackListUserStream
  .keyBy(_.province)
  .timeWindow(Time.hours(1), Time.seconds(5))
  .aggregate(new AdCountAgg(), new AdCountWindowResult())
1. Implementing the pre-aggregation class AdCountAgg
The input is AdClickLog, the accumulator is the running count (a Long), and the output is also a Long.
class AdCountAgg() extends AggregateFunction[AdClickLog, Long, Long] {
  override def add(value: AdClickLog, accumulator: Long): Long = accumulator + 1
  override def createAccumulator(): Long = 0L
  override def getResult(accumulator: Long): Long = accumulator
  override def merge(a: Long, b: Long): Long = a + b
}
2. Implementing the WindowFunction AdCountWindowResult
class AdCountWindowResult() extends WindowFunction[Long, AdClickCountByProvince, String, TimeWindow] {
  override def apply(key: String, window: TimeWindow, input: Iterable[Long], out: Collector[AdClickCountByProvince]): Unit = {
    val end = new Timestamp(window.getEnd).toString
    // input holds a single element: the pre-aggregated count from AdCountAgg
    out.collect(AdClickCountByProvince(end, key, input.head))
  }
}
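How the four methods of a pre-aggregator cooperate can be sketched in plain Scala. The trait `Agg` below is a local, hypothetical stand-in that only mirrors the method names of Flink's AggregateFunction; `PreAggSketch`, `CountAgg`, and `run` are likewise illustrative and not part of any Flink API.

```scala
object PreAggSketch {
  // Local stand-in with the same four methods as an aggregate function.
  trait Agg[IN, ACC, OUT] {
    def createAccumulator(): ACC
    def add(value: IN, acc: ACC): ACC
    def getResult(acc: ACC): OUT
    def merge(a: ACC, b: ACC): ACC
  }

  // Counts elements; the accumulator is just the running total.
  object CountAgg extends Agg[String, Long, Long] {
    def createAccumulator(): Long = 0L
    def add(value: String, acc: Long): Long = acc + 1
    def getResult(acc: Long): Long = acc
    def merge(a: Long, b: Long): Long = a + b
  }

  // Fold elements into the accumulator one at a time, as they would arrive in a window,
  // so only the accumulator is kept rather than the elements themselves.
  def run[IN, ACC, OUT](agg: Agg[IN, ACC, OUT], elems: Seq[IN]): OUT =
    agg.getResult(elems.foldLeft(agg.createAccumulator())((acc, v) => agg.add(v, acc)))

  def main(args: Array[String]): Unit = {
    println(run(CountAgg, Seq("beijing", "beijing", "henan")))
    println(CountAgg.merge(2L, 3L))
  }
}
```

The window function then only has to attach the key and the window end to that one number, which is the division of labor the aggregate(AggregateFunction, WindowFunction) overload expresses.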
5. Complete code
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.api.common.functions.AggregateFunction
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector
import java.sql.Timestamp
// Input and output case classes
case class AdClickLog(userId: Long, adId: Long, province: String, city: String, timestamp: Long)
case class AdClickCountByProvince(windowEnd: String, province: String, count: Long)
// Case class for blacklist warnings sent to the side output
case class BlackListUserWarning(userId: Long, adId: Long, msg: String)

object AdClickAnalysis {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.setParallelism(1)
    // Read the data from a file
    val resource = getClass.getResource("/AdClickLog.csv")
    val inputStream = env.readTextFile(resource.getPath)
    // Convert to the case class, then extract timestamps and watermarks
    val adLogStream = inputStream
      .map(data => {
        val arr = data.split(",")
        AdClickLog(arr(0).toLong, arr(1).toLong, arr(2), arr(3), arr(4).toLong)
      })
      .assignAscendingTimestamps(_.timestamp * 1000L)
    // Insert a filtering step that sends click-fraud users to a side output (blacklist warning).
    // Click fraud is one user clicking one ad, so the key is (userId, adId).
    val filterBlackListUserStream: DataStream[AdClickLog] = adLogStream
      .keyBy(data => (data.userId, data.adId))
      .process(new FliterBlackListUserResult(100)) // plug in a KeyedProcessFunction
    // Windowed aggregation keyed by province: every 5 seconds, output the total for the preceding hour.
    // Either a ProcessWindowFunction alone or aggregate with pre-aggregation would work here.
    val adCountResultStream = filterBlackListUserStream
      .keyBy(_.province)
      .timeWindow(Time.hours(1), Time.seconds(5))
      .aggregate(new AdCountAgg(), new AdCountWindowResult())
    adCountResultStream.print("count result")
    // Read the blacklist hits from the side output
    filterBlackListUserStream.getSideOutput(new OutputTag[BlackListUserWarning]("warning")).print("warning")
    env.execute("ad count statistics job")
  }
}
class AdCountAgg() extends AggregateFunction[AdClickLog, Long, Long] {
  override def add(value: AdClickLog, accumulator: Long): Long = accumulator + 1
  override def createAccumulator(): Long = 0L
  override def getResult(accumulator: Long): Long = accumulator
  override def merge(a: Long, b: Long): Long = a + b
}

class AdCountWindowResult() extends WindowFunction[Long, AdClickCountByProvince, String, TimeWindow] {
  override def apply(key: String, window: TimeWindow, input: Iterable[Long], out: Collector[AdClickCountByProvince]): Unit = {
    val end = new Timestamp(window.getEnd).toString
    out.collect(AdClickCountByProvince(end, key, input.head))
  }
}
// Custom KeyedProcessFunction: once a user's clicks on one ad reach maxCount, the user is blacklisted.
// Key: (userId, adId); input: AdClickLog; output: AdClickLog.
// Which state type to use? A ValueState per key is enough, since each value is simply cleared by a timer.
class FliterBlackListUserResult(maxCount: Long) extends KeyedProcessFunction[(Long, Long), AdClickLog, AdClickLog] {
  // State: countState holds this key's click count, resetTimerTsState holds the timestamp
  // of the daily midnight reset timer, and isBlackState marks whether the key is blacklisted.
  // The handles are lazy because getRuntimeContext is not usable while the constructor runs:
  // Flink sets the runtime context only after the function object has been created.
  // Either create the state in open() or declare it lazy so it is resolved on first access.
  // If several timers are in use, how do we know which one is the midnight reset?
  // Store the timestamp the timer was registered with; when a timer fires with exactly that
  // timestamp, clear the state. Any other timestamp belongs to a different timer.
  lazy val countState: ValueState[Long] = getRuntimeContext.getState(new ValueStateDescriptor[Long]("count", classOf[Long]))
  lazy val resetTimerTsState: ValueState[Long] = getRuntimeContext.getState(new ValueStateDescriptor[Long]("reset-ts", classOf[Long]))
  lazy val isBlackState: ValueState[Boolean] = getRuntimeContext.getState(new ValueStateDescriptor[Boolean]("is-black", classOf[Boolean]))

  override def processElement(value: AdClickLog, ctx: KeyedProcessFunction[(Long, Long), AdClickLog, AdClickLog]#Context, collector: Collector[AdClickLog]): Unit = {
    val curCount = countState.value()
    // When is the midnight reset timer registered? As soon as the first record for the key
    // arrives, independent of the blacklist flag. Every key seen today gets a timer and is
    // cleared at midnight, whether or not it reached the limit: with a limit of 1000 and
    // only 100 clicks by midnight, the reset timer still has to fire.
    // curCount == 0 means this is the first record for the key.
    if (curCount == 0) {
      // Shift by +8h (UTC+8) so that integer division by one day truncates to the local date,
      // e.g. 2021-06-28 16:20:25 -> 2021-06-28; add 1 for the next day (the 29th),
      // multiply back to milliseconds, then subtract the 8h offset because epoch time is UTC.
      // Without the shift, a first click between 00:00 and 08:00 local time would get a
      // timer that is already in the past.
      val offset = 8 * 60 * 60 * 1000L
      val ts = ((ctx.timerService().currentProcessingTime() + offset) / (1000 * 60 * 60 * 24) + 1) * (24 * 60 * 60 * 1000) - offset
      resetTimerTsState.update(ts) // remember which timer is the reset timer
      ctx.timerService().registerProcessingTimeTimer(ts) // register a processing-time timer
    }
    // If the count has reached the threshold, send a warning to the blacklist side output
    if (curCount >= maxCount) {
      // Emit the warning only once: skip the side output if the key is already blacklisted
      if (!isBlackState.value()) {
        isBlackState.update(true)
        ctx.output(new OutputTag[BlackListUserWarning]("warning"), BlackListUserWarning(value.userId, value.adId, "Click ad over " + maxCount + " times today."))
      }
      return
    }
    // Normal case: increment the count and forward the record unchanged.
    // countState is keyed state, so Flink keeps a separate ValueState for every key.
    countState.update(curCount + 1)
    collector.collect(value)
  }

  override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[(Long, Long), AdClickLog, AdClickLog]#OnTimerContext, out: Collector[AdClickLog]): Unit = {
    if (timestamp == resetTimerTsState.value()) {
      isBlackState.clear()
      countState.clear()
    }
  }
}