Spark The Definitive Guide(Spark权威指南) 中文版。本书详细介绍了Spark2.x版本的各个模块,目前市面上最好的Spark2.x学习书籍!!!
关注:登峰大数据,阅读中文Spark权威指南(完整版),系统学习Spark大数据框架!
如果您觉得作者翻译的内容有帮助,请分享给更多人。您的分享,是作者翻译的动力
本书前几章已经从用户的角度介绍了结构化流。这自然是应用程序的核心。本章将介绍在开发应用程序之后,在生产环境中稳定运行结构化流所需的一些操作工具。在Apache Spark 2.2.0中,结构化流被标记为可生产的,这意味着该版本具有生产使用所需的所有特性和稳定的API。许多组织已经在生产中使用该系统,因为坦率地说,它与运行其他生产Spark应用程序没有太大的不同。的确,通过诸如事务source/sink和精确的一次处理等特性,结构化流设计人员试图使其尽可能易于操作。本章将带您了解一些特定于结构化流的关键操作任务。这将补充我们在第2部分中看到和了解到的有关Spark操作的所有内容。
23.1.容错和检查点
流应用程序最重要的操作关注点是故障恢复。错误是不可避免的:例如丢失集群中的一台机器,模式将在没有适当迁移的情况下意外更改,甚至可能故意重启集群或应用程序。在这些情况下,结构化流允许您通过重新启动应用程序来恢复应用程序。为此,必须将应用程序配置为使用检查点和WAL预写日志,这两个操作都由引擎自动处理。具体地说,您必须配置一个查询来写入可靠文件系统(例如,HDFS、S3或任何兼容的文件系统)上的检查点位置。然后,结构化流将定期将所有相关的进度信息(例如,给定触发器中处理的偏移量范围)以及当前中间状态值保存到检查点位置。在失败场景中,您只需重新启动应用程序,确保指向相同的检查点位置,它就会自动恢复其状态,并从停止的地方开始处理数据。您不必代表应用程序手动管理此状态----结构化流媒体为您做这些功能。
要使用检查点,请在通过writeStream上的checkpointLocation选项启动应用程序之前指定检查点位置。你可以这样做:
// in Scalaval static = spark.read.json("/data/activity-data")val streaming = spark .readStream .schema(static.schema) .option("maxFilesPerTrigger", 10) .json("/data/activity-data") .groupBy("gt") .count()val query = streaming .writeStream .outputMode("complete") .option("checkpointLocation", "/some/location/") .queryName("test_stream") .format("memory") .start()# in Pythonstatic = spark.read.json("/data/activity-data")streaming = spark\ .readStream\ .schema(static.schema)\ .option("maxFilesPerTrigger", 10)\.json("/data/activity-data")\ .groupBy("gt")\ .count()query = streaming\ .writeStream\ .outputMode("complete")\ .option("checkpointLocation", "/some/python/location/")\ .queryName("test_python_stream")\ .format("memory")\ .start()
如果丢失检查点目录或其中的信息,应用程序将无法从失败中恢复,您将不得不从头开始重新启动流。
23.2.更新程序
为了在生产环境中运行应用程序,启用检查点可能是最重要的。这是因为检查点将存储到目前为止您的流所处理的所有信息,以及它可能存储的中间状态。然而,检查点确实带来了一个小陷阱——当您更新流应用程序时,您将不得不对旧检查点数据进行推理。当您更新您的应用程序时,您必须确保您的更新不是破坏性的更改。当我们查看这两种类型的更新时,我们将详细讨论这些内容:对应用程序代码的更新或运行新的Spark版本。
23.2.1.对应用程序代码的更新
结构化流的设计允许在应用程序重启之间对应用程序代码进行某些类型的更改。最重要的是,允许您更改用户定义函数(udf),只要它们具有相同的类型签名。这个特性对于bug修复非常有用。例如,假设应用程序开始接收一种新类型的数据,并且当前逻辑中的数据解析函数之一崩溃。使用结构化流,您可以使用该函数的新版本重新编译应用程序,并在流中它之前崩溃的同一位置重新编译。
虽然添加新列或更改UDF之类的小调整不会破坏更改,也不需要新的检查点目录,但是较大的更改确实需要一个全新的检查点目录。例如,如果更新流应用程序以添加新的聚合键或从根本上更改查询本身,Spark就不能从旧检查点目录为新查询构造所需的状态。在这些情况下,结构化流将抛出一个异常,说它不能从检查点目录开始,您必须从头开始使用一个新的(空的)目录作为检查点位置。
23.2.2.运行新的Spark版本
结构化的流处理应用程序应该能够从一个旧的检查点目录重新启动到Spark的补丁版本更新(例如,从Spark 2.2.0到2.2.1再到2.2.2)。检查点格式被设计为向前兼容的,所以它可能被破坏的唯一方法是由于关键的错误修复。如果Spark发行版不能从旧检查点恢复,那么它的发行说明中将清楚地记录这一点。结构化流媒体开发人员还希望在较小的版本更新(如Spark 2.2)之间保持格式的兼容性。但是您应该检查发布说明,看看是否支持每次升级。在这两种情况下,如果不能从检查点启动,都需要使用新的检查点目录重新启动应用程序。
23.2.3.调整应用程序的规模大小
通常,集群的大小应该能够轻松地处理高于数据速率的突发事件。下面讨论应用程序和集群中应该监控的关键指标。一般来说,如果您看到您的输入速率远远高于您的处理速率(稍后将进行详细说明),那么就该扩展您的集群或应用程序了。根据您的资源管理器和部署,您可能只是能够动态地将executor添加到应用程序中。当需要时,您可以用同样的方法缩小应用程序的规模——删除执行器(可能通过您的云提供商)或使用更低的资源计数重新启动应用程序。这些更改可能会导致一些处理延迟(当删除执行器时,将重新计算数据或重新分配分区)。最后,是否值得创建一个具有更复杂的资源管理功能的系统是一个业务决策。
虽然对集群或应用程序进行底层基础设施更改有时是必要的,但有时更改可能只需要重新启动应用程序或使用新配置的流。例如在流处理程序运行过程中,更改 spark.sql.shuffle.partitions参数是不会生效的。这需要重新启动实际的流处理程序,而不一定是整个应用程序。较重的更改(如更改任意Spark应用程序配置)可能需要重新启动应用程序。
23.3.度量和监控
流应用程序中的度量和监视与使用第18章中描述的工具的一般Spark应用程序基本相同。不过,结构化流确实添加了一些更具体的内容,以帮助您更好地理解应用程序的状态。您可以使用两个关键api来查询流查询的状态并查看其最近的执行进度。使用这两个api,您可以了解您的流是否按预期运行。
23.3.1.Query Status 查询状态
查询状态是最基本的监控API,因此它是一个很好的起点。它的目的是回答这个问题:“我的流现在正在执行什么处理?” 此信息在startStream返回的查询对象的status字段中报告。例如,您可能有一个简单的计数流,它提供了由以下查询定义的物联网设备的计数(这里我们只是使用了与前一章相同的查询,没有初始化代码):
query.status
要获得给定查询的状态,只需运行命令query.status将返回流的当前状态。这为我们提供了关于流中在那个时间点上发生的事情的详细信息。下面是查询此状态时将返回的示例:
{ "message" : "Getting offsets from ...", "isDataAvailable" : true, "isTriggerActive" : true}
上面的代码片段描述了从结构化流数据源获取偏移量(因此描述获取偏移量的消息)。有多种消息描述流的状态。
注意我们以在Spark shell中调用的方式在这里内联显示了status命令。但是,对于独立应用程序,您可能没有附加shell来在流程中运行任意代码。在这种情况下,您可以通过实现监控服务器来公开它的状态,例如一个小型HTTP服务器,它监听端口并返回查询。获取请求时的状态。或者,您可以使用稍后描述的更丰富的StreamingQueryListener API来侦听更多事件。
23.3.2.Recent Progress当前进展
虽然可以查看查询的当前状态,但是查看查询过程的能力同样重要。progress API允许我们回答诸如“我处理元组的速度是多少?”或者“元组从源到达的速度有多快?” 通过运行query.recentProgress命令,流查询过程还包括关于流内部的输入源和输出接收器的信息。
query.recentProgress
这是我们运行之前的代码后Scala版本的结果;Python版本将是类似的:
Array({ "id" : "d9b5eac5-2b27-4655-8dd3-4be626b1b59b", "runId" : "f8da8bc7-5d0a-4554-880d-d21fe43b983d", "name" : "test_stream", "timestamp" : "2017-08-06T21:11:21.141Z", "numInputRows" : 780119, "processedRowsPerSecond" : 19779.89350912779, "durationMs" : { "addBatch" : 38179, "getBatch" : 235, "getOffset" : 518, "queryPlanning" : 138, "triggerExecution" : 39440, "walCommit" : 312 }, "stateOperators" : [ { "numRowsTotal" : 7, "numRowsUpdated" : 7 } ], "sources" : [ { "description" : "FileStreamSource[/some/stream/source/]", "startOffset" : null, "endOffset" : { "logOffset" : 0 }, "numInputRows" : 780119, "processedRowsPerSecond" : 19779.89350912779 } ], "sink" : { "description" : "MemorySink" }})
正如您从刚才显示的输出中所看到的,这包括关于流状态的许多细节。需要注意的是,这是一个实时快照(根据查询进度的时间)。为了一致地获得关于流状态的输出,您需要反复查询这个API以获得更新后的状态。前面输出中的大多数字段应该是不言自明的。但是,让我们详细回顾一些更重要的字段。
Input rate and processing rate输入速率和处理速率
输入速率指定有多少数据从输入源流向结构化流。处理速率是应用程序分析数据的速度。在理想情况下,输入和处理速率应该同时变化。另一种情况可能是输入速率远远大于处理速率。当这种情况发生时,流就会落后,您需要将集群扩展到更高的级别来处理更大的负载。
Batch duration批次间隔
几乎所有的流系统都利用批处理以任何合理的吞吐量进行操作(有些系统可以选择高延迟来换取较低的吞吐量)。结构化流实现了这两种功能。当它对数据进行操作时,您可能会看到批处理持续时间随着结构化流处理事件数量的变化而振荡。当然,当连续处理引擎成为执行选项时,这个度量将几乎没有相关性。
提示通常,将批处理持续时间、输入和处理速率的变化可视化是最佳实践。它比简单地报告随时间的变化更有帮助。
23.3.3.Spark UI
Spark web UI(在第18章中详细介绍)还显示了结构化流应用程序的任务、作业和数据处理指标。在Spark UI上,每个流应用程序都将显示为一系列短作业,每个触发器对应一个短作业。但是,您可以使用相同的UI查看来自应用程序的度量、查询计划、任务持续时间和日志。与DStream API不同的一点是,流选项卡不用于结构化流。
23.4.预警
理解和查看结构化流查询的指标是重要的第一步。然而,这需要不断地监控仪表板或度量,以发现潜在的问题。您将需要健壮的自动警报,以便在不手动监控作业的情况下,在作业失败或无法跟上输入数据速率时通知您。有几种方法可以将现有的警报工具集成到Spark中,通常是基于我们前面介绍的recent progress API。例如,您可以直接将度量数据提供给监控系统,比如开源Coda Hale度量库或Prometheus,或者您可以简单地对它们进行日志记录,并使用Splunk这样的日志聚合系统。除了监控和警告查询之外,还需要监控和警告集群和整个应用程序的状态(如果同时运行多个查询)。
23.5.使用流监听器进行高级监控
我们已经讨论了结构化流中的一些高级监控工具。使用一些粘合逻辑,您可以使用status和queryProgress api将监控事件输出到您选择的监控平台(例如,日志聚合系统或Prometheus仪表板) 除了这些方法之外,还有一种更底层但更强大的方法来观察应用程序的执行:StreamingQueryListener类。
StreamingQueryListener类允许您从流查询接收异步更新,以便自动将此信息输出到其他系统,并实现健壮的监控和警报机制。首先开发自己的对象来扩展StreamingQueryListener,然后将其附加到正在运行的SparkSession。一旦您使用sparkssession .streams. addlistener()附加自定义侦听器,当查询启动或停止时,或者在活动查询上取得进展时,您的类将收到通知。下面是结构化流文档中侦听器的一个简单示例:
val spark: SparkSession = ...spark.streams.addListener(new StreamingQueryListener() { override def onQueryStarted(queryStarted: QueryStartedEvent): Unit = { println("Query started: " + queryStarted.id) } override def onQueryTerminated( queryTerminated: QueryTerminatedEvent): Unit = { println("Query terminated: " + queryTerminated.id) } override def onQueryProgress(queryProgress: QueryProgressEvent): Unit = { println("Query made progress: " + queryProgress.progress) }})
流监听器允许您使用自定义代码处理每个进度更新或状态更改,并将其传递给外部系统。例如,StreamingQueryListener的以下代码将把所有查询进度信息转发给Kafka。从Kafka读取数据后,您必须解析这个JSON字符串,以便访问实际的指标:
classKafkaMetrics(servers: String) extends StreamingQueryListener {valkafkaProperties = new Properties() kafkaProperties.put("bootstrap.servers", servers) kafkaProperties.put("key.serializer","kafkashaded.org.apache.kafka.common.serialization.StringSerializer") kafkaProperties.put("value.serializer","kafkashaded.org.apache.kafka.common.serialization.StringSerializer")valproducer = new KafkaProducer[String, String](kafkaProperties)importorg.apache.spark.sql.streaming.StreamingQueryListenerimportorg.apache.kafka.clients.producer.KafkaProduceroverridedef onQueryProgress(event:StreamingQueryListener.QueryProgressEvent):Unit= {producer.send(new ProducerRecord("streaming-metrics", event.progress.json)) }overridedef onQueryStarted(event:StreamingQueryListener.QueryStartedEvent):Unit= {}overridedef onQueryTerminated(event:StreamingQueryListener.QueryTerminatedEvent):Unit= {}}
使用StreamingQueryListener接口,您甚至可以通过在同一个(或另一个)集群上运行结构化流应用程序来监控一个集群上的结构化流应用程序。您还可以用这种方式管理多个流。
23.6.结束语
在本章中,我们讨论了在生产环境中运行结构化流所需的主要工具:容错检查点和各种监控api,这些api允许您观察应用程序如何运行。幸运的是,如果您已经在生产环境中运行Spark,那么许多概念和工具都是类似的,因此您应该能够重用大量现有知识。请务必检查第4部分,以查看监控Spark应用程序的其他一些有用工具。