Spark Streaming provides a high-level abstraction called a DStream (Discretized Stream). A DStream represents a continuous stream of input data and can be created from sources such as Kafka, TCP sockets, or Flume. Internally, a DStream is represented as a sequence of RDDs. Because Spark Streaming is built on the Spark platform, it inherits Spark's characteristics such as fault tolerance, scalability, and high throughput.
In Spark Streaming, each DStream holds the data items that arrive within one batch interval; each such batch forms one RDD. A DStream is therefore an ordered sequence of batches: time flows continuously, and the input stream is cut by the batch interval into a series of discrete RDDs.
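As a minimal sketch of this batching idea (the local TCP source on port 9999 and the object name are assumptions for illustration, not part of the example that follows), each batch interval yields one RDD that can be inspected with foreachRDD:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object DStreamBatchSketch {
  def main(args: Array[String]): Unit = {
    // Two local threads: one for the socket receiver, one for processing
    val conf = new SparkConf().setMaster("local[2]").setAppName("DStreamBatchSketch")
    // Every 2-second interval of received lines becomes one RDD in the DStream
    val ssc = new StreamingContext(conf, Seconds(2))
    val lines = ssc.socketTextStream("localhost", 9999)
    lines.foreachRDD((rdd, time) => println(s"Batch at $time has ${rdd.count()} records"))
    ssc.start()
    ssc.awaitTermination()
  }
}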
In essence, it behaves like a timer that periodically pulls data from Kafka; the only difference is that the consumed data is held in RDDs. The complete example below consumes user click events from a Kafka topic and aggregates per-user click counts into Redis.
import kafka.serializer.StringDecoder
import net.sf.json.JSONObject
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

object UserClickCountAnalytics {
  def main(args: Array[String]): Unit = {
    var masterUrl = "local[1]"
    if (args.length > 0) {
      masterUrl = args(0)
    }
    // Create a StreamingContext with the given master URL
    val conf = new SparkConf().setMaster(masterUrl).setAppName("UserClickCountStat")
    val ssc = new StreamingContext(conf, Seconds(5))
    // Kafka configurations
    val topics = Set("user_events")
    val brokers = "10.10.4.126:9092,10.10.4.127:9092"
    val kafkaParams = Map[String, String](
      "metadata.broker.list" -> brokers, "serializer.class" -> "kafka.serializer.StringEncoder")
    // Redis database index and hash key used for the aggregated click counts
    val dbIndex = 1
    val clickHashKey = "app::users::click"
    // Create a direct stream
    val kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)
    // Each Kafka message value is a JSON string; parse it into a JSONObject
    val events = kafkaStream.flatMap(line => {
      val data = JSONObject.fromObject(line._2)
      Some(data)
    })
    // Alternatively, the stream object can be built from multiple receiver-based input DStreams.
    // This is a separate sketch and is not part of the direct-stream example above:
    // val ssc: StreamingContext = ???
    // val kafkaParams: Map[String, String] = Map("group.id" -> "terran", ...)
    // val numDStreams = 5
    // val topics = Map("zerg.hydra" -> 1)
    // val kafkaDStreams = (1 to numDStreams).map { _ => KafkaUtils.createStream(ssc, kafkaParams, topics, ...) }
    // //> collection of five *input* DStreams = handled by five receivers/tasks
    // val unionDStream = ssc.union(kafkaDStreams) // often unnecessary, just showcasing how to do it
    // //> single DStream
    // val processingParallelism = 20
    // val processingDStream = unionDStream.repartition(processingParallelism)
    // //> single DStream but now with 20 partitions
    // Compute user click times
    /**
     * userClicks.foreachRDD yields the RDD of one micro-batch;
     * rdd.foreachPartition yields that batch's partition data on each Spark node;
     * partitionOfRecords.foreach yields the individual records of that partition.
     */
    val userClicks = events.map(x => (x.getString("uid"), x.getInt("click_count"))).reduceByKey(_ + _)
    // Persist the results via foreachRDD
    userClicks.foreachRDD(rdd => {
      rdd.foreachPartition(partitionOfRecords => {
        partitionOfRecords.foreach(pair => {
          val uid = pair._1
          val clickCount = pair._2
          // Borrow a Jedis connection from the pool, increment the user's click count, return the connection
          val jedis = RedisClient.pool.getResource
          jedis.select(dbIndex)
          jedis.hincrBy(clickHashKey, uid, clickCount)
          RedisClient.pool.returnResource(jedis)
        })
      })
    })
    ssc.start()
    ssc.awaitTermination()
  }
}
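The example above assumes a RedisClient helper that exposes a shared Jedis connection pool; it is not shown in the listing. A minimal sketch of such an object might look like the following (the host, port, and pool settings are placeholders, not values from the original):

import redis.clients.jedis.{JedisPool, JedisPoolConfig}

object RedisClient extends Serializable {
  // Placeholder connection settings; adjust to the actual Redis deployment
  val redisHost = "127.0.0.1"
  val redisPort = 6379
  lazy val pool = new JedisPool(new JedisPoolConfig(), redisHost, redisPort)
  // Release the pool when the JVM exits
  sys.addShutdownHook(pool.destroy())
}

The example also assumes the spark-streaming-kafka artifact for the Kafka 0.8 direct API (KafkaUtils, StringDecoder), the json-lib library for net.sf.json.JSONObject, and the Jedis client on the classpath.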