本书用到的代码:https://github.com/databricks/Spark-The-Definitive-Guide
第二章 Spark浅析
Spark应用程序
驱动器:维护Spark应用程序相关信息、回应用户程序、分析并分发任务给执行器
执行器:执行分发来的代码、将执行器的计算状态报告给运行驱动器的节点
几个基本概念
SparkSession
在spark解压缩文件处打开shell,输入,进入交互式操作界面
./bin/spark-shell
可以通过SparkSession的驱动器来控制Spark应用程序
val myrange = spark.range(1000).toDF("number")
DataFrame
包含行和列的数据表,可以跨越上千台设备存储
数据分区
分区是位于集群中的一台物理机上的多行数据的集合,通常一个执行器处理一个分区的数据
转换操作
要更改DataFrame需要告诉Spark如何修改它以执行想要的操作,这个过程叫做转换
val divisBy2 = myRange.where("number%2=0")
在调用动作操作之前,spark不进行转换
窄依赖:每个输入分区决定一个输出分区的转换
宽依赖:每个分区决定了多个输出分区
惰性评估
等到绝对需要时才执行计算,用户进行数据操作时,不立刻修改数据,而是建立一个作用到原始数据的转换计划
动作操作
用于触发计算,一个动作指示Spark在一系列转换操作后计算一个结果,比如
divisBy2.count()
Spark用户接口
本地是在localhost:4040
一个例子
spark.sql生成的是一个新的DataFrame
执行的例子是
import org.apache.spark.sql.functions.desc
flightData2015
.groupBy("DEST_COUNTRY_NAMWE")
.sum("counmt")
.withColumnRenamed("sum(count)", "destination_total")
.sort(desc("destination_total"))
.limit(5)
.show()
这个执行计划是一个有向无环图,每个转换生成一个新的不可变的DataFrame,可以在DataFrame上调用一个动作产生一个结果。
第三章 Spark工具集
Dataset
类型安全的结构化API
用于在Java和Scala中编写静态类型的代码,让用户可以通过API使用Java/Scala定义DataFrame中的每条记录
结构化流处理样例
本章做了简单介绍,有一个实例
import org.apache.spark.sql.functions.{window, column, desc, col}
// 生成一个dataframe
staticDataFrame
.selectExpr(
"CustomerId",
"(UnitPrice * Quantity) as total_cost",
"InvoiceDate")
.groupBy(
col("CustomerId"), window(col("InvoiceDate"), "1 day"))
.sum("total_cost")
.show(5)
// 生成一个流式数据
spark.conf.set("spark.sql.shuffle.partitions", "5")
val streamingDataFrame = spark.readStream
.schema(staticSchema)
.option("maxFilesPerTrigger", 1) // 1次读入的文件数量
.format("csv")
.option("header", "true")
.load("/data/retail-data/by-day/*.csv")
// 对流式数据进行操作并落内存
val purchaseByCustomerPerHour = streamingDataFrame
.selectExpr(
"CustomerId",
"(UnitPrice * Quantity) as total_cost",
"InvoiceDate")
.groupBy(
$"CustomerId", window($"InvoiceDate", "1 day"))
.sum("total_cost")
purchaseByCustomerPerHour.writeStream
.format("memory") // memory = store in-memory table
.queryName("customer_purchases") // the name of the in-memory table
.outputMode("complete") // complete = all the counts should be in the table
.start()
// 运用流式数据做一个简单查询
spark.sql("""
SELECT *
FROM customer_purchases
ORDER BY `sum(total_cost)` DESC
""")
.show(5)
低级API
主要用于支持通过弹性分布式数据集对任意的Java Python对象进行操作
RDD的面试题 https://blog.csdn.net/weixin_41919236/article/details/84780252