在第一章节系列结构化流的博客文章中,我们展示了怎样用简单的方式用结构化流实现端到端的流式ETL程序,将json日志数据转换成Parquet格式表。该文强调构建从各种复杂格式数据源读入并对数据进行转换的管道所面临的挑战。在本篇博文中,我们将深入的研讨该问题,并展示如何用Spark SQL内置函数解决数据转换中面临的挑战。
确切的说,我们将从以下几个方面进行讨论:
有哪些不同数据格式及怎样权衡
如何简单的用Spark SQL对这些数据进行处理
针对不同的场景,如何选择正确的数据格式
数据源和格式
数据有无数不同的格式,电子表格可用用XML,CSV,TSV表示,应用程序指标可以以原始文本或JSON格式存储。每种场景都有特定的数据格式与此相适应。在大数据世界,我们一般会遇到
像Parquet,ORC,Avro,JSON,CSV,SQL以及NoSQL数据源、文本文件。我们可以将这些数据格式大致分为一下三类:结构化、半结构化和非结构化。让我们尝试了解每个类别的好处和短处。
结构化数据
结构化数据源对数据定义了一种模式。通过这些关于底层数据的额外信息,结构化数据源提供高效的存储和性能。例如,列式数据存储Parquet和ORC,使得从一个列子集中提取数据更加容易。当数据查询只需要获取一少部分列的数据时,通过遍历每行数据的方式需要查询出过多的数据。基于行的存储格式,如Avro通过高效的序列化存储数据提供了存储优势。但是,这种优势是以复杂性为代价的。例如,由于结构不够灵活,模式转换将成为挑战。
非结构化数据
相比之下,非结构化数据源是任意格式的文本或不包含标记或元数据的二进制对象(例如以逗号分隔的CSV文件)来组织数据。新闻文章,医疗记录,图像斑点,应用日志经常被当成是非结构化数据。这些数据源分类一般需要根据数据的上下文才能解析。因此,需要清楚知道某个文件是图片还是新闻,才能正确进行解析。大多数数据源都是非结构化的,要从这些非结构化的数据中获取数据价值,由于其格式本身的笨重,需要经过大量转换和特征提取技术去解释这些数据集,成本较高。
半结构化数据
半结构化数据源是每行记录一个结构,但不需要对整体记录有一个全局的模式定义。因此,每行记录是通过其自身的模式信息对其进行扩充。JSON和XML就是其中最流行的例子。半结构化数据的优势在于通过每行记录自身的描述信息,增强了展示数据信息的灵活性。由于有很多轻量级的解析器用于处理这些记录,因此半结构化数据格式在很多应用中普遍被使用,并且在可读性上存在优势。但是,它的主要缺陷也在于会产生额外的解析开销,不能专门应用于即席查询。
用Spark SQL进行数据交换(读和写)
在之前的博文中,我们讨论了如何将Cloudtrail日志从JSON转为为Parquet,以此将即席查询时间缩短10倍。Spark SQL允许用户从批处理和流式查询中获取这些数据源类的数据。它原生的支持在Parquet,ORC,JSON,CSV和文本格式中读取和写入数据,并且Spark还提供了大量其他数据源连接器的包。你可以通过JDBC DataSource链接SQL数据库。
Apache Spark能够用以下简单的方式来实现数据交换:
events = spark.readStream \
.format("json") \ # or parquet, kafka, orc...
.option() \ # format specific options
.schema(my_schema) \ # required
.load("path/to/data")
output = … # perform your transformations
output.writeStream \ # write out your data
.format("parquet") \
.start("path/to/write")
无论是批处理还是流式数据,我们知道怎样读写不同的数据源格式,但是不同的数据源支持不同的模式和数据类型。传统数据库仅仅支持原始数据类型,而像JSON允许用户在列中嵌套对象,有数组类型和键值对类型。用户经常需要擦除这些数据类型,以此达到高效存储和正确表示他们的数据。幸运的是,Spark SQL在处理原始类型和复杂数据类型方面非常容易。让我们看下如何从负责的数据烈性到原始数据类型,反之亦然。
转换复杂数据类型
在使用半结构化格式时,通常需要复杂数据类型,如结构体、Map,数组。举个例子,你可能需要一个日志接口去请求你的web服务器,这个接口请求包含了字符串键值对类型的HTTP头,另外它还包含json格式的form数据,它们包含嵌套字段或数组。有些 数据源格式可能支持或可能不支持复杂数据类型。而有些格式通过特定的数据类型来存储数据从而提供性能优势,例如,当使用Parquet时,所有结构列都将获得与顶级列相同的处理方式,因此,如果你在一个嵌套字段上进行过滤,将会获得与处理顶级列一样的优势。但是,maps被视为两个数组列,因此你将不会收到有效的过滤语义。
我们来看一些关于Spark SQL如何允许你使用一些数据转换技术来自由的构造你的数据。
从嵌套列中查询
点符号表示访问结构体或map中的嵌套列
// input
{
"a": {
"b": 1
}
}
Python: events.select("a.b")
Scala: events.select("a.b")
SQL: select a.b from events
// output
{
"b": 1
}
整平结构体
*表示结构体中所有的子字段
// input
{
"a": {
"b": 1,
"c": 2
}
}
Python: events.select("a.*")
Scala: events.select("a.*")
SQL: select a.* from events
// output
{
"b": 1,
"c": 2
}
嵌套列
结构函数或SQL能被用于创建新的结构
// input
{
"a": 1,
"b": 2,
"c": 3
}
Python: events.select(struct(col("a").alias("y")).alias("x"))
Scala: events.select(struct('a as 'y) as 'x)
SQL: select named_struct("y", a) as x from events
// output
{
"x": {
"y": 1
}
}
嵌套所有列
*能被用于包含嵌套结构中的所有列
// input
{
"a": 1,
"b": 2
}
Python: events.select(struct("*").alias("x"))
Scala: events.select(struct("*") as 'x)
SQL: select struct(*) as x from events
// output
{
"x": {
"a": 1,
"b": 2
}
}
从数组或map中选择一个
getItem( )或[ ] 可以被用于从数据或map中查询一个元素
// input
{
"a": [1, 2]
}
Python: events.select(col("a").getItem(0).alias("x"))
Scala: events.select('a.getItem(0) as 'x)
SQL: select a[0] as x from events
// output
{ "x": 1 }
// input
{
"a": {
"b": 1
}
}
Python: events.select(col("a").getItem("b").alias("x"))
Scala: events.select('a.getItem("b") as 'x)
SQL: select a['b'] as x from events
// output
{ "x": 1 }
将数组或map中的每一个元素创建一行
explode( )用于创建为数组或map中的元素创建一个新的行,这和HiveQL中的LATERAL VIEW EXPLODE和相似
// input
{
"a": [1, 2]
}
Python: events.select(explode("a").alias("x"))
Scala: events.select(explode('a) as 'x)
SQL: select explode(a) as x from events
// output
[{ "x": 1 }, { "x": 2 }]
// input
{
"a": {
"b": 1,
"c": 2
}
}
Python: events.select(explode("a").alias("x", "y"))
Scala: events.select(explode('a) as Seq("x", "y"))
SQL: select explode(a) as (x, y) from events
// output
[{ "x": "b", "y": 1 }, { "x": "c", "y": 2 }]
将多行数据收集成一个数组
collect_list()和collect_set()能被用于将多个项聚合到一个数组
// input
[{ "x": 1 }, { "x": 2 }]
Python: events.select(collect_list("x").alias("x"))
Scala: events.select(collect_list('x) as 'x)
SQL: select collect_list(x) as x from events
// output
{ "x": [1, 2] }
// input
[{ "x": 1, "y": "a" }, { "x": 2, "y": "b" }]
Python: events.groupBy("y").agg(collect_list("x").alias("x"))
Scala: events.groupBy("y").agg(collect_list('x) as 'x)
SQL: select y, collect_list(x) as x from events group by y
// output
[{ "y": "a", "x": [1]}, { "y": "b", "x": [2]}]
从数组的每个项查询一个字段
当你用点符号作用于数组上时,将会返回一个新的数组,该数组的字段是从源数据组中选择的每一个元素
// input
{
"a": [
{"b": 1},
{"b": 2}
]
}
Python: events.select("a.b")
Scala: events.select("a.b")
SQL: select a.b from events
// output
{
"b": [1, 2]
}
强大的to_json和from_json
如果你真的想保留列的复杂结构,但是你需要将其编码为字符串吗?Spark SQL提供了像to_json这样的函数用于将结构数据编码成字符串,from_json()用于将结构体恢复成复杂类型。从流式数据源像kafka中读取或写入数据时,用JSON字符串存储列非常有用,每一个kafka的键值对记录将被增加一些元数据,比如将时间戳、位置信息注入到kafka。如果value字段数据的是JSON格式的,你就能用from_json去抽提取你的数据,丰富它,清理它,然后将其再次推向下游kafka或写入文件。
用JSON编码结构体
to_json()能被用于将结构体转换成JSON字符串。当你在写入数据至kafka,需要重新编码多个列的数据成一个列时,这个方法非常有用。此方法目前在SQL中还不能用。
// input{
"a": {
"b": 1
}}
Python: events.select(to_json("a").alias("c"))
Scala: events.select(to_json('a) as 'c)
// output{
"c": "{\"b\":1}"}
解码json列为一个结构体
from_json()方法能被用于转换JSON字符串列数据为结构体,那么你可以按照上述方式展平结构体,使其具有单独的列。该方法目前也不能用于SQL.
// input{
"a": "{\"b\":1}"}
Python:
schema = StructType().add("b", IntegerType())
events.select(from_json("a", schema).alias("c"))Scala:
val schema = new StructType().add("b", IntegerType)
events.select(from_json('a, schema) as 'c)
// output{
"c": {
"b": 1
}}
有些时候你想保留JSON 字符串的一部分仍旧为JSON,以避免模式中的复杂性过高。
// input{
"a": "{\"b\":{\"x\":1,\"y\":{\"z\":2}}}"}
Python:
schema = StructType().add("b", StructType().add("x", IntegerType())
.add("y", StringType()))
events.select(from_json("a", schema).alias("c"))Scala:
val schema = new StructType().add("b", new StructType().add("x", IntegerType)
.add("y", StringType))
events.select(from_json('a, schema) as 'c)
// output{
"c": {
"b": {
"x": 1,
"y": "{\"z\":2}"
}
}}
从包含JSON的列中解析字段集合
json_tuple()能从包含JSON数据的字符串类型列中抽取可用的字段
// input{
"a": "{\"b\":1}"}
Python: events.select(json_tuple("a", "b").alias("c"))Scala: events.select(json_tuple('a, "b") as 'c)
SQL: select json_tuple(a, "b") as c from events
// output{ "c": 1 }
有时候字符串列并不完全符合JSON的描述格式,但是仍旧有良好的结构。例如,Log4j生成的日志消息格式。Spark SQL能轻松的将这些字符串进行结构化。
解析格式良好的字符串列
regexp_extract() 使用正则表达式来解析字符串数据
// input[{ "a": "x: 1" }, { "a": "y: 2" }]
Python: events.select(regexp_extract("a", "([a-z]):", 1).alias("c"))Scala: events.select(regexp_extract('a, "([a-z]):", 1) as 'c)
SQL: select regexp_extract(a, "([a-z]):", 1) as c from events
// output[{ "c": "x" }, { "c": "y" }]
以上有很多种转换,现在我们来看一下现实生活中的一些用例,把所有这些数据格式和数据处理能力用到恰到好处。
利用好所有的这些强大的转换能力
在Databricks,我们收集服务的日志信息,并在客户受到影响之前使用它们执行实时监控以检测问题。日志文件是非结构化的文件,但是由于是以良好的Log4j格式定义的,因此文件是可解析的。我们运行一个日志收集服务,将每行日志和额外的元数据信息以JSON的格式发送至kinesis。JSON格式的记录批量上传至S3作为文件。通过直接查询这些JSON日志去发现问题是非常乏味的:因为对于回答任务和查询,这些文件会包含重复项,即使它涉及单个列,整个JSON记录也可能需要反序列化。
为了解决这个问题,我们执行一个管道去读取这些JSON记录,并对元数据执行数据删除。这样我们就只剩下那些JSON格式或非结构化文本的原始日志信息。如果我们需要处理JSON,可以使用from_json()和上面描述的一些其它转换去格式化我们的数据。如果是文本,我们可以用regexp_extract()去解析Log4j格式成一个更加结构化的形式。一旦完成了我们所有的转换和重组,我们将记录按日期分区保存在Parquet中。这将能从日志中发现的时间提高10至100倍。
不再需要花很大的代价去反序列化JSON记录
不在需要对原始日志信息进行复杂的字符串对比
仅仅需要在查询中抽取两个列:时间和日志级别
下面是很多客户提供的通用用例场景:
“我想用我的数据运行机器学习管道。 我的数据已经被预处理,我将在整个管道中使用我的所有功能”
当你要访问数据的所有行,Avro是一个不错的选择:
“我有一个IoT用例,我的传感器发送给我的事件。 对于每个事件,元数据的重要性是不同的”
有些场景你的模式需要更好的灵活性,也许你可以考虑使用JSON存储你的数据。
“我想在报纸文章或情感分析上对产品评论进行语音识别算法”
如果你的数据没有固定的模式,也不是固定的模式或结构,那么将其存储为纯文本可能更易于使用。有可能你又一个管道,它在分结构化数据上进行特征提取,并将其存储为Avro,为你的机器学习流程做准备。
结论
在这片博文中,我们探讨了Spark SQL如何使用许多来源和格式的数据,并轻松的执行这些数据格式之间的转换和交换。我们分享了Databrics是如何策划数据,并考虑了其它生产用例,因为你可能想做不同事情。
Spark SQL为您提供必要的工具,以便以任何形式访问您的数据,无论其格式如何,都可以为下游应用程序做准备,并且在流式数据上具有低延时,历史数据上具有高吞吐量。