// Start a separate thread that polls HDFS every 5 seconds for a marker file. When the
// file is detected, call ssc.stop() to shut down the SparkStreaming job. (To stop the
// job, create the directory/file at the monitored path.)
object SparkStreaming12_Stop {

  // The HDFS namenode URI. The FileSystem handle and the marker Path MUST share the
  // same authority: Hadoop's FileSystem.exists rejects a fully-qualified path whose
  // host:port differs from the filesystem's ("Wrong FS" IllegalArgumentException).
  // The original code mixed hadoop102:9000 and linux1:9000, so the check could
  // never succeed.
  private val HdfsUri = "hdfs://linux1:9000"

  // Marker path whose existence signals that the streaming job should stop.
  private val StopMarkerPath = s"$HdfsUri/stopSpark4"

  /**
   * Runs a socket word-count stream and a background monitor thread that
   * gracefully stops the StreamingContext once the HDFS marker file appears.
   *
   * @param args unused command-line arguments
   */
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[2]").setAppName("SparkStreaming12_Stop")
    // Let Spark drain in-flight batches before shutting down on a JVM shutdown hook.
    sparkConf.set("spark.streaming.stopGracefullyOnShutdown", "true")
    val ssc = new StreamingContext(sparkConf, Seconds(3))

    // Receive raw lines from the socket source.
    val lineDStream: ReceiverInputDStream[String] = ssc.socketTextStream("linux1", 9999)

    // Flatten each line into individual words.
    val wordDStream: DStream[String] = lineDStream.flatMap(line => line.split(" "))

    // Re-shape each word into a (word, one) pair for aggregation.
    val wordToOneDStream: DStream[(String, Long)] = wordDStream.map {
      word => (word, 1L)
    }

    // Aggregate counts per word within each batch.
    val wordToCountDStream: DStream[(String, Long)] = wordToOneDStream.reduceByKey(_ + _)
    // NOTE: this println runs on the executors, not the driver.
    wordToCountDStream.foreachRDD(rdd => rdd.foreach(println))

    // Print the first results of each batch on the driver.
    wordToCountDStream.print()

    // Background thread that polls HDFS and stops the streaming job when the
    // marker file shows up.
    val monitor = new Thread(new Runnable {
      override def run(): Unit = {
        // Create the FileSystem handle once, not on every 5-second poll.
        val fs: FileSystem = FileSystem.get(new URI(HdfsUri), new Configuration(), "root")
        while (true) {
          try {
            Thread.sleep(5000)
          } catch {
            // Thread.sleep only throws InterruptedException; log and keep polling.
            case ex: InterruptedException => println(ex)
          }
          val state: StreamingContextState = ssc.getState()
          // Only attempt a shutdown while the context is actually running.
          if (state == StreamingContextState.ACTIVE) {
            if (fs.exists(new Path(StopMarkerPath))) {
              // Stop receiver and driver gracefully: finish processed data first.
              ssc.stop(stopSparkContext = true, stopGracefully = true)
              System.exit(0)
            }
          }
        }
      }
    })
    // Daemon thread: never keeps the JVM alive after the driver itself exits.
    monitor.setDaemon(true)
    monitor.start()

    // Start the receiver.
    ssc.start()
    // Block the driver until the streaming computation terminates.
    ssc.awaitTermination()
  }
}