Implementation:
First, create a Kafka DStream from the topic:
import kafka.serializer.StringDecoder
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

val sparkConf = new SparkConf().setAppName(appParams.appName)
val sc = new SparkContext(sparkConf)
val streamingContext = new StreamingContext(sc, Seconds(appParams.batchProcInterval))
val kafkaParams = Map[String, String]("metadata.broker.list" -> appParams.brokers)
val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](streamingContext, kafkaParams, Set[String](appParams.messageInTopic))
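The appParams object is never defined in these snippets; a minimal sketch of what it could look like, with the field names taken from the code above and the types inferred from usage (the types are assumptions):
case class AppParams(
  appName: String,          // Spark application name
  brokers: String,          // Kafka broker list, e.g. "host1:9092,host2:9092"
  messageInTopic: String,   // Kafka topic to consume from
  batchProcInterval: Long,  // micro-batch interval in seconds
  windownTime: Long         // window length in seconds (name kept as in the original code)
)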
Create the time window:
val windows = messages.map(_._2).window(Seconds(appParams.windownTime), Seconds(appParams.windownTime))
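Because the window length and the slide interval are equal here, these are non-overlapping (tumbling) windows. A slide shorter than the window length would produce overlapping windows instead; both durations must be multiples of the batch interval. Illustrative variant only:
val sliding = messages.map(_._2).window(Seconds(appParams.windownTime), Seconds(appParams.batchProcInterval))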
Run the computation on each window:
windows.foreachRDD { rdd =>
  // ... per-window processing, detailed step by step below
}
Processing inside each window:
Define a case class (with Spark 1.x, declare it at the top level rather than inside a method, so that toDF can locate its TypeTag):
case class Record(channelid: String, starttime: Long)
Create the SQLContext:
val sqlContext = SQLContext.getOrCreate(rdd.sparkContext)
import sqlContext.implicits._
Convert the messages in the current window into a DataFrame:
// field 5 of the pipe-delimited message is the channel id, field 2 the play start time
val df = rdd.map(_.split("\\|")).map(line => Record(line(5), line(2).toLong)).toDF()
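For illustration only: the parse above implies a pipe-delimited message whose field at index 2 is a Long play start time and whose field at index 5 is the channel id. The other fields are not described in the original, so a record might look something like:
// index:   0   |  1  |      2       | 3 | 4 |     5     | ...
//         uid  | dev |  starttime   | . | . | channelid | ...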
Register it as a temporary table:
df.registerTempTable("UserPlayChannel")
Read the Parquet data and register it as a second table:
val parquetFile = sqlContext.read.parquet(filePath)
parquetFile.registerTempTable("ProgramMetaData")
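The original does not show the Parquet schema; inferred from the SQL below, a ProgramMetaData row would need at least:
// ProgramMetaData: programid | channelid | starttime | endtime
// one row per program aired on a channel during [starttime, endtime]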
Now that both tables exist, the join is just a matter of writing the SQL: each play record in UserPlayChannel is matched to the program airing on its channel at the play's start time, and matches are counted per program. For example:
select b.programid, count(b.programid) as count
from UserPlayChannel a
join ProgramMetaData b
  on a.channelid = b.channelid
 and a.starttime >= b.starttime
 and a.starttime <= b.endtime
group by b.programid
order by count DESC
Executing it in code:
val hotProgramList = sqlContext.sql("select b.programid, count(b.programid) as count from UserPlayChannel a join ProgramMetaData b on a.channelid = b.channelid and a.starttime >= b.starttime and a.starttime <= b.endtime group by b.programid order by count DESC")
hotProgramList now holds the result of the join: the program ids ranked by play count within the window.
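From here it can be consumed like any other DataFrame; a hypothetical follow-up, not from the original:
// print the ten hottest programs of this window
hotProgramList.show(10)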