Apache Kafka -8 与Spark集成

Apache Kafka教程 之 与Spark集成

http://blogxinxiucan.sh1.newtouch.com/

原文地址: http://blogxinxiucan.sh1.newtouch.com/2017/07/13/Apache-Kafka-与Spark集成/

Apache Kafka - 与Spark集成

关于Spark

Spark Streaming API支持实时数据流的可扩展,高吞吐量,容错流处理。数据可以从诸如Kafka,Flume,Twitter等许多来源中获取,并且可以使用诸如地图,缩小,连接和窗口之类的高级功能的复杂算法进行处理。最后,处理后的数据可以推送到文件系统,数据库和现场仪表盘上。弹性分布式数据集(RDD)是Spark的基础数据结构。它是一个不可变的分布式对象集合。RDD中的每个数据集分为逻辑分区,可以在集群的不同节点上进行计算。

与Spark集成

Kafka是Spark流媒体的潜在消息传递和集成平台。卡夫卡作为实时数据流的中心枢纽,并使用Spark Streaming中的复杂算法进行处理。处理数据后,Spark Streaming可能会将结果发布到HDFS,数据库或仪表板中的另一个Kafka主题或存储。下图描绘了概念流程。

与Spark集成

现在,让我们详细介绍一下Kafka-Spark API。

SparkConf API

它代表Spark应用程序的配置。用于将各种Spark参数设置为键值对。

SparkConf类有以下方法 -

  • set(string key,string value) - 设置配置变量。
  • remove(string key) - 从配置中删除密钥。
  • setAppName(string name) - 为应用程序设置应用程序名称。
  • get(string key) - 获取密钥

StreamingContext API

这是Spark功能的主要入口点。SparkContext表示与Spark集群的连接,可用于在集群上创建RDD,累加器和广播变量。签名的定义如下图所示。

public StreamingContext(String master, String appName, Duration batchDuration, 
   String sparkHome, scala.collection.Seq<String> jars, 
   scala.collection.Map<String,String> environment)
  • master - 要连接的群集URL(e.g. mesos://host:port, spark://host:port, local[4])。
  • appName - 您的工作的名称,以在集群Web UI上显示
  • batchDuration - 流数据将分批的时间间隔
public StreamingContext(SparkConf conf, Duration batchDuration)

通过提供新的SparkContext所需的配置来创建StreamingContext。

  • conf - Spark参数
  • batchDuration - 流数据将分批的时间间隔

KafkaUtils API

KafkaUtils API用于将Kafka集群连接到Spark流。此API具有如下定义的重要方法createStream签名。

public static ReceiverInputDStream<scala.Tuple2<String,String>> createStream(
   StreamingContext ssc, String zkQuorum, String groupId,
   scala.collection.immutable.Map<String,Object> topics, StorageLevel storageLevel)

上面显示的方法用于创建从卡夫卡经纪公司提取消息的输入流。

  • ssc - StreamingContext对象。
  • zkQuorum - Zookeeper法定人数。
  • groupId - 此消费者的组ID。
  • topics- 返回主题地图消费。
  • storageLevel - 用于存储接收到的对象的存储级别。

KafkaUtils API有另一种方法createDirectStream,用于创建一个直接从卡夫卡经纪公司提取消息而不使用任何接收器的输入流。该流可以保证来自Kafka的每条消息都被包含在转换中一次。

示例应用程序在Scala中完成。要编译应用程序,请下载并安装sbt,scala构建工具(类似于maven)。主要应用代码如下。

import java.util.HashMap

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, Produc-erRecord}
import org.apache.spark.SparkConf
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka._

object KafkaWordCount {
   def main(args: Array[String]) {
      if (args.length < 4) {
         System.err.println("Usage: KafkaWordCount <zkQuorum><group> <topics> <numThreads>")
         System.exit(1)
      }

      val Array(zkQuorum, group, topics, numThreads) = args
      val sparkConf = new SparkConf().setAppName("KafkaWordCount")
      val ssc = new StreamingContext(sparkConf, Seconds(2))
      ssc.checkpoint("checkpoint")

      val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
      val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)
      val words = lines.flatMap(_.split(" "))
      val wordCounts = words.map(x => (x, 1L))
         .reduceByKeyAndWindow(_ + _, _ - _, Minutes(10), Seconds(2), 2)
      wordCounts.print()

      ssc.start()
      ssc.awaitTermination()
   }
}

构建脚本

spark-kafka集成取决于spark,spark流和spark卡夫卡集成罐。创建一个新的文件build.sbt并指定应用程序细节及其依赖关系。该SBT在编译和打包应用程序会下载必要的罐子。

  • name := "Spark Kafka Project" version := "1.0" scalaVersion :=
    "2.10.5"

    libraryDependencies += "org.apache.spark" %% "spark-core" % "1.6.0"
    libraryDependencies += "org.apache.spark" %% "spark-streaming" %
    "1.6.0" libraryDependencies += "org.apache.spark" %%
    "spark-streaming-kafka" % "1.6.0"

汇编/包装

运行以下命令来编译和打包应用程序的jar文件。我们需要将jar文件提交到spark控制台来运行应用程序。

sbt package

提交到Spark
启动Kafka Producer CLI(在上一章中介绍),创建一个名为my-first-topic的新主题,并提供一些示例消息,如下所示。

Another spark test message

运行以下命令将应用程序提交到spark控制台。

/usr/local/spark/bin/spark-submit --packages org.apache.spark:spark-streaming
-kafka_2.10:1.6.0 --class "KafkaWordCount" --master local[4] target/scala-2.10/spark
-kafka-project_2.10-1.0.jar localhost:2181 <group name> <topic name> <number of threads>

此应用程序的示例输出如下所示。

spark console messages ..
(Test,1)
(spark,1)
(another,1)
(message,1)
spark console message ..
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,098评论 5 476
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,213评论 2 380
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 149,960评论 0 336
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,519评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,512评论 5 364
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,533评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,914评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,574评论 0 256
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,804评论 1 296
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,563评论 2 319
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,644评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,350评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,933评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,908评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,146评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,847评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,361评论 2 342

推荐阅读更多精彩内容