Spark Streaming supports two broad categories of data sources. The first category is basic sources, such as file systems and socket connections, which the core API supports directly; the second category is advanced sources, such as Flume and Kafka, which require extra dependency jars.
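For contrast with the Flume examples below, here is a minimal sketch of a basic source that needs no jars beyond Spark itself; the class name SocketSourceDemo and the host/port are placeholders of my own choosing:
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

public class SocketSourceDemo {
    public static void main(String[] args) throws InterruptedException {
        SparkConf conf = new SparkConf().setAppName("SocketSourceDemo");
        JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(5));
        // Basic source: read lines from a TCP socket, no extra dependencies required
        JavaDStream<String> lines = jssc.socketTextStream("localhost", 9999);
        lines.print();
        jssc.start();
        jssc.awaitTermination();
    }
}
To try it, pipe some text into the socket, for example with nc -lk 9999 in another terminal.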
Below is a working example I put together.
Step 1: Configure the Flume agent that monitors the directory datatest/ (the agent itself is started in Step 4). The configuration file is as follows:
# name the components on this agent
spooldir-memory-avro-streaming.sources = spooldir-source
spooldir-memory-avro-streaming.sinks = avro-sink
spooldir-memory-avro-streaming.channels = memory-channel
# Describe/configure the source
##Note: never drop a file whose name has already been used into the monitored directory
## The spooldir source watches the directory for newly added files (it does not track edits inside existing files)
spooldir-memory-avro-streaming.sources.spooldir-source.type = spooldir
spooldir-memory-avro-streaming.sources.spooldir-source.spoolDir = /usr/local/datatest
spooldir-memory-avro-streaming.sources.spooldir-source.fileHeader = true
spooldir-memory-avro-streaming.sources.spooldir-source.deletePolicy=immediate
spooldir-memory-avro-streaming.sources.spooldir-source.ignorePattern=^(.)*\\.out$
# Describe the sink
spooldir-memory-avro-streaming.sinks.avro-sink.type = avro
spooldir-memory-avro-streaming.sinks.avro-sink.hostname=10.101.3.3
spooldir-memory-avro-streaming.sinks.avro-sink.port=44445
# Use a channel which buffers events in memory
##Buffer events in memory
spooldir-memory-avro-streaming.channels.memory-channel.type = memory
# Bind the source and sink to the channel
spooldir-memory-avro-streaming.sources.spooldir-source.channels = memory-channel
spooldir-memory-avro-streaming.sinks.avro-sink.channel = memory-channel
Step 2: Write the Java code that integrates Spark Streaming with Flume.
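A minimal sketch of what com.liushun.Flume2StreamingWordCnt (the class submitted in Step 3) can look like in push mode, assuming the spark-streaming-flume_2.11 dependency (matching your Spark version, e.g. 2.1.1) is on the classpath; the host/port must match the avro sink configuration above, and the word-count logic is illustrative:
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.flume.FlumeUtils;
import org.apache.spark.streaming.flume.SparkFlumeEvent;
import scala.Tuple2;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class Flume2StreamingWordCnt {
    public static void main(String[] args) throws InterruptedException {
        SparkConf conf = new SparkConf().setAppName("Flume2StreamingWordCnt");
        JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(5));

        // Push mode: Spark starts an avro receiver on this host/port,
        // and the Flume avro sink connects to it (must match the sink config)
        JavaReceiverInputDStream<SparkFlumeEvent> flumeStream =
                FlumeUtils.createStream(jssc, "10.101.3.3", 44445);

        // Decode each Flume event body, then count words per batch
        JavaDStream<String> words = flumeStream
                .map(e -> new String(e.event().getBody().array(), StandardCharsets.UTF_8))
                .flatMap(line -> Arrays.asList(line.split(" ")).iterator());
        JavaPairDStream<String, Integer> counts = words
                .mapToPair(w -> new Tuple2<>(w, 1))
                .reduceByKey((a, b) -> a + b);
        counts.print();

        jssc.start();
        jssc.awaitTermination();
    }
}
Note that in push mode the receiver runs inside a Spark executor, so the hostname must be the address of one of the cluster's worker machines.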
Step 3: Submit the jar to Spark. Since this is a demo, I start everything in a foreground session; to run it in the background, prepend nohup and append &.
bin/spark-submit --class com.liushun.Flume2StreamingWordCnt --master yarn /usr/local/spark-2.1.1-bin-hadoop2.6/bin/SparkStreamTest-1.1-SNAPSHOT.jar
Step 4: Start Flume. As above, I start it in a foreground session; to run it in the background, prepend nohup and append &.
./flume-ng agent --conf $FLUME_HOME/conf --conf-file $FLUME_HOME/conf/spooldir-memory-avro-streaming.conf --name spooldir-memory-avro-streaming -Dflume.root.logger=INFO,console
Note: be sure not to start Flume first. At startup the avro sink tests whether anything is listening on its target port; it continues only if the port is already open, and otherwise aborts with an error. The Spark Streaming receiver therefore has to be up before Flume starts.
Step 5: Verify.
Create a file and move it into the /usr/local/datatest directory, then check whether the output printed by the streaming job matches what we expect.
While running the program you may hit ClassNotFound errors. Don't panic: use the error message to locate the corresponding jar in your local Maven repository (typically spark-streaming-flume and its transitive dependencies) and copy it into Spark's jars directory.
So far we have covered the push approach, in which Flume actively pushes data to Spark Streaming. Next we introduce the other approach, in which Spark Streaming pulls data from Flume through a custom sink.
So what are the concrete steps for using the pull approach? See below.
Step 1: Configure Flume
(1) Add the required jars to Flume's lib directory (for the pull approach this means the spark-streaming-flume-sink jar plus its scala-library and commons-lang3 dependencies)
(2) Write the custom agent configuration file to run
Step 2: Write the Spark Streaming application
Below is my integration example, for reference only.
Step 1: Configure the Flume agent with the custom SparkSink, monitoring the directory datatest/ (the agent is started in Step 3). The configuration file is as follows:
# name the components on this agent
spooldir-memory-avro-streamingfromSink.sources = spooldir-source
spooldir-memory-avro-streamingfromSink.sinks = spark-sink
spooldir-memory-avro-streamingfromSink.channels = memory-channel
# Describe/configure the source
##Note: never drop a file whose name has already been used into the monitored directory
## The spooldir source watches the directory for newly added files (it does not track edits inside existing files)
spooldir-memory-avro-streamingfromSink.sources.spooldir-source.type = spooldir
spooldir-memory-avro-streamingfromSink.sources.spooldir-source.spoolDir = /usr/local/datatest
spooldir-memory-avro-streamingfromSink.sources.spooldir-source.fileHeader = true
spooldir-memory-avro-streamingfromSink.sources.spooldir-source.deletePolicy=immediate
spooldir-memory-avro-streamingfromSink.sources.spooldir-source.ignorePattern=^(.)*\\.out$
# Describe the sink
spooldir-memory-avro-streamingfromSink.sinks.spark-sink.type = org.apache.spark.streaming.flume.sink.SparkSink
spooldir-memory-avro-streamingfromSink.sinks.spark-sink.hostname=10.101.3.3
spooldir-memory-avro-streamingfromSink.sinks.spark-sink.port=44445
# Use a channel which buffers events in memory
##Buffer events in memory
spooldir-memory-avro-streamingfromSink.channels.memory-channel.type = memory
# Bind the source and sink to the channel
spooldir-memory-avro-streamingfromSink.sources.spooldir-source.channels = memory-channel
spooldir-memory-avro-streamingfromSink.sinks.spark-sink.channel = memory-channel
Application code example:
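A minimal sketch of what com.liushun.StreamingFromFlumeWordCnt (the class submitted in Step 4) can look like in pull mode, again assuming the spark-streaming-flume_2.11 dependency is on the classpath; the host/port must match the SparkSink configuration above, and the word-count logic is illustrative:
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.flume.FlumeUtils;
import org.apache.spark.streaming.flume.SparkFlumeEvent;
import scala.Tuple2;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class StreamingFromFlumeWordCnt {
    public static void main(String[] args) throws InterruptedException {
        SparkConf conf = new SparkConf().setAppName("StreamingFromFlumeWordCnt");
        JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(5));

        // Pull mode: Spark polls the SparkSink that Flume exposes on this host/port
        JavaReceiverInputDStream<SparkFlumeEvent> flumeStream =
                FlumeUtils.createPollingStream(jssc, "10.101.3.3", 44445);

        // Same illustrative word count as in the push example
        JavaPairDStream<String, Integer> counts = flumeStream
                .map(e -> new String(e.event().getBody().array(), StandardCharsets.UTF_8))
                .flatMap(line -> Arrays.asList(line.split(" ")).iterator())
                .mapToPair(w -> new Tuple2<>(w, 1))
                .reduceByKey((a, b) -> a + b);
        counts.print();

        jssc.start();
        jssc.awaitTermination();
    }
}
Compared with push mode, the polling receiver pulls events inside Flume transactions, so events leave the channel only after Spark has received them, which gives stronger delivery guarantees.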
Step 3: Start Flume. Note that the order is reversed relative to the push approach: here Flume must be started first, because the SparkSink has to be up and listening before the Spark application can poll it. As before, I start it in a foreground session; to run it in the background, prepend nohup and append &.
./flume-ng agent --conf $FLUME_HOME/conf --conf-file $FLUME_HOME/conf/spooldir-memory-avro-streamingfromSink.conf --name spooldir-memory-avro-streamingfromSink -Dflume.root.logger=INFO,console
Step 4: Start the application. As before, I start it in a foreground session; to run it in the background, prepend nohup and append &.
./spark-submit --class com.liushun.StreamingFromFlumeWordCnt --master yarn /usr/local/spark-2.1.1-bin-hadoop2.6/bin/SparkStreamTest-1.1-SNAPSHOT.jar
Verify the result: as in the push example, create a file, move it into /usr/local/datatest, and check whether the streaming output matches expectations.