Flink - 实时 - 广告分析

1.知识点

  • scala样例类
  • flink读文件
  • assignAscendingTimestamps水位线
  • KeyedProcessFunction<K, I, O>使用
  • flink状态编程之键控状态
  • flink定时器的创建
ctx.timerService().registerProcessingTimeTimer(ts)
  • flink定时器的触发操作,一般可以对状态进行清空
override def onTimer(
    timestamp: Long, 
    ctx: KeyedProcessFunction[(Long, Long), 
    AdClickLog, 
    AdClickLog]#OnTimerContext, 
    out: Collector[AdClickLog]
  ): Unit = {     
  if (timestamp == resetTimerTsState.value()){      
    isBlackState.clear()      
    countState.clear()    
  }  
}
  • flink侧输出流OutPutTag的使用
ctx.output(new OutputTag[BlackListUserWarning]("warning"),
                BlackListUserWarning(value.userId, 
                                      value.adId, 
                                      "Click ad over " + maxCount + " times today."))
  • aggregate方法的使用,预聚合中间结果(可以与processWindowFunction做比较)
  • aggregate输入(AggregateFunction , WindowFunction ) 预聚合在进行窗口内操作。
@PublicEvolving  
def aggregate[ACC: TypeInformation, V: TypeInformation, R: TypeInformation](      
  preAggregator: AggregateFunction[T, ACC, V],      
  windowFunction: WindowFunction[V, R, K, W]): DataStream[R] = {

2.业务目标

  • 页面广告点击量统计:每5s钟统计前一个小时的某个省份的广告点击总量

    窗口:1个小时

    步长:5s

    维度:省份

    指标:点击总量

    输出格式:

count result> AdClickCountByProvince(2017-11-26 11:01:30.0,beijing,11)
count result> AdClickCountByProvince(2017-11-26 11:01:30.0,zhejiang,2)
count result> AdClickCountByProvince(2017-11-26 11:01:30.0,henan,1)
count result> AdClickCountByProvince(2017-11-26 11:01:35.0,beijing,6)
count result> AdClickCountByProvince(2017-11-26 11:01:40.0,beijing,1)
  • 黑名单过滤:对当日点击某个adid大于某个值(如20000)的用户id拉入黑名单,并过滤

3.流程心法

  • 定义输入输出样例类,定义黑名单样例类
  • 主object

1)创建执行环境,设置事件时间语义,设置并行度
2)从文件中读取数据
3)转换成样例类并设置assignAscendingTimestamps自增水位线
4)定义黑名单过滤KeyedProcessFunction
5)开窗聚合统计,以province为keyBy,用aggregate或者ProcessWindowFunction
6)侧输出流获取并打印

4.模块详解

4.1 定义输入输出样例类,定义黑名单样例类

输入-广告点击日志:用户id、广告id、省份、城市、时间戳
输出-按省份统计的点击:窗口结束时间、省份、点击量
输出-黑名单报警:用户id、广告id、报警消息

// 定义输入输出样例类
case class AdClickLog(userId: Long, adId: Long, province: String, city: String, timestamp: Long)
case class AdClickCountByProvince(windowEnd: String, province: String, count: Long) 
// 侧输出流黑名单报警信息样例类
case class BlackListUserWarning(userId: Long, adId: Long, msg: String)

4.2 主object实现

4.2.1 创建执行环境、读取数据、输入数据转换成样例类

4.2.2 黑名单的过滤实现

刷单行为的用户输出到侧输出流报警,那么如何设置keyby呢?首先定义刷单行为,人刷广告,超过一定数量即为刷单行为。所以我们要以userid和adid为key,去统计条数.调用process方法,传入自定义的KeyedProcessFunction

val filterBlackListUserStream: DataStream[AdClickLog] = adLogStream      
  .keyBy(data => (data.userId, data.adId))      
  .process(new FliterBlackListUserResult(100)) //引入一个KeyedProcessFunction

** 1.定义FliterBlackListUserResult(100)继承KeyedProcessFunction<K, I, O>**

key是(Long,Long)二元组,输入是AdClickLog,输出也是AdClickLog。100表示当点击阈值达到100时即可判断为刷单行为(可调)

** 2. 定义状态**

  • 定义点击量状态:如果要判断一个人对一个广告id点击是否达成上线,必须要创建一个状态保存点击量,不断去判断此时的状态值是否达到黑名单阈值
  • 定义每日0点定时器触发时的时间戳状态:每天0点定时清空状态
  • 状态变量要么放在open里,要么加上lazy,否则无法获取到上下文
lazy val countState:ValueState[Long] = 
    getRuntimeContext.getState(new ValueStateDescriptor[Long]("count",classOf[Long]))
lazy val resetTimerTsState: ValueState[Long] = 
    getRuntimeContext.getState(new ValueStateDescriptor[Long]("reset-ts", classOf[Long]))
lazy val isBlackState:ValueState[Boolean] = 
    getRuntimeContext.getState(new ValueStateDescriptor[Boolean]("is-black",classOf[Boolean]))

3.处理每一条数据processElement
判断只要是第一个数据来了,直接注册0点的清空状态定时器,跟当前用户是否黑名单及点击数量没关系。只要在今天范围内,不管用户有没有点击达到黑名单上线,比如到0点只点了5下,也要进行清空

if( curCount == 0 ) {      
  //(ctx.timerService().currentProcessingTime() / (1000 * 60 * 60 * 24) + 1):
  //首先拿到当  前的时间。比如当前处理的时间戳转换成时间2021-06-28 16:20:25,
  //转换成当前的日期为2021-06-28,0点触发是29号的0点,所以要加1.      
  //然后转换成毫秒* (24 * 60 * 60 * 1000)      
  //因为以上转换是标准的伦敦时间,我们这边是东八区时区,有8个小时时差,
  //所以要减掉8个小时.- 8 * 60 * 60 * 1000      
  val ts = (ctx.timerService().currentProcessingTime() / (1000 * 60 * 60 * 24) + 1) * (24 * 60 * 60 * 1000) - 8 * 60 * 60 * 1000      
  resetTimerTsState.update(ts) //保存状态      
  ctx.timerService().registerProcessingTimeTimer(ts)  //注册一个处理时间的定时器    
}

4.判断count值是否已经达到阈值,超出就输出到黑名单侧流,并更新黑名单状态为true

if(curCount >= maxCount){      
  //判断是否已经在黑名单里,没有输出侧输出流      
  if(!isBlackState.value()){        
    isBlackState.update(true)        
    ctx.output(new OutputTag[BlackListUserWarning]("warning"),
                              BlackListUserWarning(value.userId, 
                                          value.adId, 
                                          "Click ad over " + maxCount + " times today."))      
  }       
  return     
}

5.定时器的触发

  • 清空点击量状态、清空黑名单状
override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[(Long, Long), AdClickLog, AdClickLog]#OnTimerContext, out: Collector[AdClickLog]): Unit = {     
  if (timestamp == resetTimerTsState.value()){      
    isBlackState.clear()      
    countState.clear()    
  }  
}

完整代码:

//自定义keyedProcessFunction,当此人在这个广告上点击量超过maxcount时即被拉入黑名单,
//key为(userid,adid),
//输入是AdClickLog,输出也是AdClickLog
//state有三种,用哪种呢? filterState?因为需要清空,定时器
class FliterBlackListUserResult(maxCount:Long) extends KeyedProcessFunction[(Long,Long),AdClickLog,AdClickLog]{   
  //定义状态,保存用户对广告的点击量,ValueState. 
  //创建一个名称是count的ValueState和每天0点定时清空状态的时间戳resetTimerTsState和标记  
  //当前用户是否进入黑名单  
  //不加lazy的话开始创建的时候拿不到上下文,要么放到open方法里,
  //要么加lazy懒加载模式,还没弄懂为什么?  
  //加入用到了很多定时器,怎么知道需要零点的时候清空呢?
  //可以把当时设置定时器的时间戳存起来,只要当前时间=保存的时间戳,
  //直接清空操作。如果不是就可能是别的定时器  
  //  
  lazy val countState:ValueState[Long] = getRuntimeContext.getState(new ValueStateDescriptor[Long]("count",classOf[Long]))  
  lazy val resetTimerTsState: ValueState[Long] = getRuntimeContext.getState(new ValueStateDescriptor[Long]("reset-ts", classOf[Long]))  
  lazy val isBlackState:ValueState[Boolean] = getRuntimeContext.getState(new ValueStateDescriptor[Boolean]("is-black",classOf[Boolean]))  
  override def processElement(value: AdClickLog, ctx: KeyedProcessFunction[(Long, Long), AdClickLog, AdClickLog]#Context, collector: Collector[AdClickLog]): Unit = {     
    
     val curCount = countState.value()    
     //判断只要是第一个数据来了,直接注册0点的清空状态定时器. 要清空状态的零点定时器什么时候注册呢?跟当前用户是否黑名单报警及curCount都没关系    
     //只要在今天范围内,只要用户来了都要注册一个定时器,到零点时就清空,不管说用户有没有点击达到黑名单的上线。比如1000是上线,到0点的时候    
     //点击了100下,那么这个0点也要出发清空状态的定时器    
     //curCount ==0时就是第一条数据来了    
    if( curCount == 0 ) {      
        //(ctx.timerService().currentProcessingTime() / (1000 * 60 * 60 * 24) + 1):首先拿到当前的时间。比如当前处理的时间戳转换成时间2021-06-28 16:20:25,转换成当前的日期为2021-06-28,0点触发是29号的0点,所以要加1.      
        //然后转换成毫秒* (24 * 60 * 60 * 1000)      
        //因为以上转换是标准的伦敦时间,我们这边是东八区时区,有8个小时时差,所以要减掉8个小时.- 8 * 60 * 60 * 1000      
      val ts = (ctx.timerService().currentProcessingTime() / (1000 * 60 * 60 * 24) + 1) * (24 *   60 * 60 * 1000) - 8 * 60 * 60 * 1000      
      resetTimerTsState.update(ts) //保存状态            
      ctx.timerService().registerProcessingTimeTimer(ts)  //注册一个处理时间的定时器    
    }     
    //判断count值是否已经达到定义的阈值,如果超过就输出到黑名单    
    if(curCount >= maxCount){      
      //判断是否已经在黑名单里,没有输出侧输出流      
      if(!isBlackState.value()){        
        isBlackState.update(true)        
        ctx.output(new OutputTag[BlackListUserWarning]("warning"),BlackListUserWarning(value.userId, value.adId, "Click ad over " + maxCount + " times today."))      
      }       
      return     
    }   
  
    //正常情况下,count加1,原样输出. 这个countStateflink保存在其一个key,state里。
    //每个key都会有一个ValueState    
    countState.update(curCount + 1)    
    collector.collect(value)  
  }    
  override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[(Long, Long), AdClickLog, AdClickLog]#OnTimerContext, out: Collector[AdClickLog]): Unit = {     
    if (timestamp == resetTimerTsState.value()){      
      isBlackState.clear()      
      countState.clear()    
    }  
  }  
}

4.2.3 aggregate(AggregateFunction,WindowFunction)的使用

val adCountResultStream = filterBlackListUserStream      
  .keyBy(_.province)      
  .timeWindow( Time.hours(1), Time.seconds(5) )      
  .aggregate(new AdCountAgg(), new AdCountWindowResult())

1.预聚合类AdCountAgg的实现
输入是AdClickLog,中间结果是加和的count既Long型,输出也是Long型

class AdCountAgg() extends AggregateFunction[AdClickLog, Long, Long]{  
  override def add(value: AdClickLog, accumulator: Long): Long = accumulator + 1   
  override def createAccumulator(): Long = 0L   
  override def getResult(accumulator: Long): Long = accumulator   
  override def merge(a: Long, b: Long): Long = a + b}

2.windowFunction AdCountWindowResult的实现

class AdCountWindowResult() extends WindowFunction[Long, AdClickCountByProvince, String, TimeWindow]{  
  override def apply(key: String, window: TimeWindow, input: Iterable[Long], out: Collector[AdClickCountByProvince]): Unit = {    
    val end = new Timestamp(window.getEnd).toString      
    out.collect(AdClickCountByProvince(end, key, input.head))  
  }
}

5.完整代码

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.api.common.functions.AggregateFunction
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector 

import java.sql.Timestamp  
// 定义输入输出样例类
case class AdClickLog(userId: Long, adId: Long, province: String, city: String, timestamp: Long)
case class AdClickCountByProvince(windowEnd: String, province: String, count: Long) 

// 侧输出流黑名单报警信息样例类
case class BlackListUserWarning(userId: Long, adId: Long, msg: String)    

object AdClickAnalysis {   

  def main(args: Array[String]): Unit = {    
    val env = StreamExecutionEnvironment.getExecutionEnvironment        
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)      
    env.setParallelism(1)     

    // 从文件中读取数据    
    val resource = getClass.getResource("/AdClickLog.csv")    
    val inputStream = env.readTextFile(resource.getPath)     

    //转换成样例类,并提取时间戳和watermark    
    val adLogStream = inputStream      
      .map(data =>{        
        val arr = data.split(",")        
        AdClickLog(arr(0).toLong, arr(1).toLong, arr(2), arr(3), arr(4).toLong)      
      })      
      .assignAscendingTimestamps(_.timestamp * 1000L)      

    //插入一步过滤操作,并将有刷单行为的用户输出到侧输出流(黑名单报警)。如何设置keyBy呢,人刷广告,那么Key自然是(userid,adid)    
    val filterBlackListUserStream: DataStream[AdClickLog] = adLogStream      
      .keyBy(data => (data.userId, data.adId))      
      .process(new FliterBlackListUserResult(100)) //引入一个KeyedProcessFunction     

    //开窗聚合统计,以province为keyBy, 每5s钟输出前一个小时的总量    
    //1.可以直接使用processWindowFunction,也可以用aggregate预聚合    
    val adCountResultStream = filterBlackListUserStream      
      .keyBy(_.province)      
      .timeWindow( Time.hours(1), Time.seconds(5) )      
      .aggregate(new AdCountAgg(), new AdCountWindowResult())       

    adCountResultStream.print("count result")    
    //测输出流获取黑名单命中数据    
    filterBlackListUserStream.getSideOutput(new OutputTag[BlackListUserWarning]("warning")).print("waring")    
    env.execute("ad count statistics job")    
  }
} 

class AdCountAgg() extends AggregateFunction[AdClickLog, Long, Long]{  
  override def add(value: AdClickLog, accumulator: Long): Long = accumulator + 1     
  override def createAccumulator(): Long = 0L   
  override def getResult(accumulator: Long): Long = accumulator   
  override def merge(a: Long, b: Long): Long = a + b
} 

class AdCountWindowResult() extends WindowFunction[Long, AdClickCountByProvince, String, TimeWindow]{  
  override def apply(key: String, window: TimeWindow, input: Iterable[Long], out: Collector[AdClickCountByProvince]): Unit = {    
    val end = new Timestamp(window.getEnd).toString    
    out.collect(AdClickCountByProvince(end, key, input.head))  
  }
}  

//自定义keyedProcessFunction,当此人在这个广告上点击量超过maxcount时即被拉入黑名单,key为(userid,adid),输入是AdClickLog,输出也是AdClickLog
//state有三种,用哪种呢? filterState?因为需要清空,定时器
class FliterBlackListUserResult(maxCount:Long) extends KeyedProcessFunction[(Long,Long),AdClickLog,AdClickLog]{   

  //定义状态,保存用户对广告的点击量,ValueState. 创建一个名称是count的ValueState和每天0点定时清空状态的时间戳resetTimerTsState和标记  
  //当前用户是否进入黑名单  //不加lazy的话开始创建的时候拿不到上下文,要么放到open方法里,要么加lazy懒加载模式,还没弄懂为什么?  
  //加入用到了很多定时器,怎么知道需要零点的时候清空呢?可以把当时设置定时器的时间戳存起来,只要当前时间=保存的时间戳,直接清空操作。如果不是就可能是别的定时器  
  //  
  lazy val countState:ValueState[Long] = getRuntimeContext.getState(new ValueStateDescriptor[Long]("count",classOf[Long]))  
  lazy val resetTimerTsState: ValueState[Long] = getRuntimeContext.getState(new ValueStateDescriptor[Long]("reset-ts", classOf[Long]))  
  lazy val isBlackState:ValueState[Boolean] = getRuntimeContext.getState(new ValueStateDescriptor[Boolean]("is-black",classOf[Boolean]))  
  override def processElement(value: AdClickLog, ctx: KeyedProcessFunction[(Long, Long), AdClickLog, AdClickLog]#Context, collector: Collector[AdClickLog]): Unit = {     

    val curCount = countState.value()    
    //判断只要是第一个数据来了,直接注册0点的清空状态定时器. 要清空状态的零点定时器什么时候注册呢?跟当前用户是否黑名单报警及curCount都没关系    
    //只要在今天范围内,只要用户来了都要注册一个定时器,到零点时就清空,不管说用户有没有点击达到黑名单的上线。比如1000是上线,到0点的时候    
    //点击了100下,那么这个0点也要出发清空状态的定时器    
    //curCount ==0时就是第一条数据来了    
    if( curCount == 0 ) {      
      //(ctx.timerService().currentProcessingTime() / (1000 * 60 * 60 * 24) + 1):首先拿到当前的时间。比如当前处理的时间戳转换成时间2021-06-28 16:20:25,转换成当前的日期为2021-06-28,0点触发是29号的0点,所以要加1.      
      //然后转换成毫秒* (24 * 60 * 60 * 1000)      
      //因为以上转换是标准的伦敦时间,我们这边是东八区时区,有8个小时时差,所以要减掉8个小时.- 8 * 60 * 60 * 1000      
      val ts = (ctx.timerService().currentProcessingTime() / (1000 * 60 * 60 * 24) + 1) * (24 * 60 * 60 * 1000) - 8 * 60 * 60 * 1000      
      resetTimerTsState.update(ts) //保存状态      
      ctx.timerService().registerProcessingTimeTimer(ts)  //注册一个处理时间的定时器    
    }         

    //判断count值是否已经达到定义的阈值,如果超过就输出到黑名单    
    if(curCount >= maxCount){      
      //判断是否已经在黑名单里,没有输出侧输出流        
      if(!isBlackState.value()){        
        isBlackState.update(true)        
        ctx.output(new OutputTag[BlackListUserWarning]("warning"),BlackListUserWarning(value.userId, value.adId, "Click ad over " + maxCount + " times today."))      
      }       
      return     
    }     

    //正常情况下,count加1,原样输出. 这个countStateflink保存在其一个key,state里。 每个key都会有一个ValueState    
    countState.update(curCount + 1)      
    collector.collect(value)  
  }    
  override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[(Long, Long), AdClickLog, AdClickLog]#OnTimerContext, out: Collector[AdClickLog]): Unit = {     
    if (timestamp == resetTimerTsState.value()){        
      isBlackState.clear()      
      countState.clear()    
    }  
  }  
}
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 206,126评论 6 481
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 88,254评论 2 382
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 152,445评论 0 341
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 55,185评论 1 278
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 64,178评论 5 371
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,970评论 1 284
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,276评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,927评论 0 259
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 43,400评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,883评论 2 323
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,997评论 1 333
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,646评论 4 322
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,213评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,204评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,423评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,423评论 2 352
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,722评论 2 345

推荐阅读更多精彩内容