Spark Streaming整合flume

两种整合方式一种方式是推送,另一种方式是拉取

Spark Streaming支持的数据源有两大类,第一大类是基础的数据源

第二大类是高级数据源

Spark Streaming和Flume的整合指南

下面是我写的样例:

第一步配置flume的agent,并启动flume,监控一个文件夹datatest/,配置文件如下

# name the components on this agent

spooldir-memory-avro-streaming.sources = spooldir-source

spooldir-memory-avro-streaming.sinks = avro-sink

spooldir-memory-avro-streaming.channels = memory-channel

# Describe/configure the source

##注意:不能往监控目中重复丢同名文件

## 通过spooldir来监控文件内容的变化

spooldir-memory-avro-streaming.sources.spooldir-source.type = spooldir

spooldir-memory-avro-streaming.sources.spooldir-source.spoolDir =/usr/local/datatest

spooldir-memory-avro-streaming.sources.spooldir-source.fileHeader = true

spooldir-memory-avro-streaming.sources.spooldir-source.deletePolicy=immediate

spooldir-memory-avro-streaming.sources.spooldir-source.ignorePattern=^(.)*\\.out$

# Describe the sink

spooldir-memory-avro-streaming.sinks.avro-sink.type = avro

spooldir-memory-avro-streaming.sinks.avro-sink.hostname=10.101.3.3

spooldir-memory-avro-streaming.sinks.avro-sink.port=44445

spooldir-memory-avro-streaming.sinks.avro-sink.channel = memory-channel

# Use a channel which buffers events in memory

##使用内存的方式

spooldir-memory-avro-streaming.channels.memory-channel.type = memory

# Bind the source and sink to the channel

spooldir-memory-avro-streaming.sources.spooldir-source.channels = memory-channel

spooldir-memory-avro-streaming.sinks.avro-sink.channel = memory-channel

第二步编写streaming与flume整合的java代码


第三步提交jar到spark上,由于是样例,我都用会话方式启动,如果想后台启动那就前加 nohup 后加 &

bin/spark-submit --class com.liushun.Flume2StreamingWordCnt --master yarn /usr/local/spark-2.1.1-bin-hadoop2.6/bin/SparkStreamTest-1.1-SNAPSHOT.jar

第四步 启动flume由于是样例,我都用会话方式启动,如果想后台启动那就前加 nohup 后加 &

./flume-ng agent --conf $FLUME_HOME/conf --conf-file $FLUME_HOME/conf/spooldir-memory-avro-streaming.conf --name spooldir-memory-avro-streaming -Dflume.root.logger=INFO,console

注意:一定不能先启动flume,因为flume在启动时会预先测试监听端口是否存在,如果存在才继续启动,否则就会中断启动,报错。

第五步 验证

创建文件移动到usr/local/datatest目录下,观察streaming打印的日志是否跟我们预想的一样。


在运行程序时你可能会遇到ClassNotFound的错误,不用紧张,只需要根据错误,去你的maven仓库种找到相应的jar包添加到spark的jars目录中即可。

截止目前位置:flume主动推送到Streaming的方式介绍完毕,接下来我们介绍另外一种。


这一种方案,相较于第一种推送的方式更加高可用,更加健壮

那么我们要使用这种方式,那么具体步骤是什么尼?看下面


第一步:配置flume

(1)添加jar包

(2)编写运行的自定义的flume的agent配置文件


第二步编写Spark Streaming应用程序

以下是我的整合样例,仅供参考

第一步、配置flume的agent的自定义Sink,并启动flume,监控一个文件夹datatest/,配置文件如下

# name the components on this agent

spooldir-memory-avro-streamingfromSink.sources = spooldir-source

spooldir-memory-avro-streamingfromSink.sinks = spark-sink

spooldir-memory-avro-streamingfromSink.channels = memory-channel

# Describe/configure the source

##注意:不能往监控目中重复丢同名文件

## 通过spooldir来监控文件内容的变化

spooldir-memory-avro-streamingfromSink.sources.spooldir-source.type = spooldir

spooldir-memory-avro-streamingfromSink.sources.spooldir-source.spoolDir =/usr/local/datatest

spooldir-memory-avro-streamingfromSink.sources.spooldir-source.fileHeader = true

spooldir-memory-avro-streamingfromSink.sources.spooldir-source.deletePolicy=immediate

spooldir-memory-avro-streamingfromSink.sources.spooldir-source.ignorePattern=^(.)*\\.out$

# Describe the sink

spooldir-memory-avro-streamingfromSink.sinks.spark-sink.type = org.apache.spark.streaming.flume.sink.SparkSink

spooldir-memory-avro-streamingfromSink.sinks.spark-sink.hostname=10.101.3.3

spooldir-memory-avro-streamingfromSink.sinks.spark-sink.port=44445

spooldir-memory-avro-streamingfromSink.sinks.spark-sink.channel = memory-channel

# Use a channel which buffers events in memory

##使用内存的方式

spooldir-memory-avro-streamingfromSink.channels.memory-channel.type = memory

# Bind the source and sink to the channel

spooldir-memory-avro-streamingfromSink.sources.spooldir-source.channels = memory-channel

spooldir-memory-avro-streamingfromSink.sinks.spark-sink.channel = memory-channel

应用程序编写样例:



第三步启动flume的,由于是样例,我都用会话方式启动,如果想后台启动那就前加 nohup 后加 &

./flume-ng agent --conf $FLUME_HOME/conf --conf-file $FLUME_HOME/conf/spooldir-memory-avro-streamingfromSink.conf --name spooldir-memory-avro-streamingfromSink -Dflume.root.logger=INFO,console

第四步启动应用程序,由于是样例,我都用会话方式启动,如果想后台启动那就前加 nohup 后加 &

./spark-submit --class com.liushun.StreamingFromFlumeWordCnt --master yarn /usr/local/spark-2.1.1-bin-hadoop2.6/bin/SparkStreamTest-1.1-SNAPSHOT.jar

验证结果:


©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 202,905评论 5 476
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,140评论 2 379
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 149,791评论 0 335
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,483评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,476评论 5 364
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,516评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,905评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,560评论 0 256
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,778评论 1 296
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,557评论 2 319
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,635评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,338评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,925评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,898评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,142评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,818评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,347评论 2 342

推荐阅读更多精彩内容