- sparklyr不仅提供了基于Spark的分布式机器学习算法库,还有其他的一些功能。如下:
- 使用dplry和SQL(通过DBI)交互式的操作Spark的数据。
- 过滤和聚合Spark数据集,然后将它们通过R进行分析和可视化。
- 使用Spark MLlib和H2O SparkingWater实现分布式的机器学习。
- 创建extensions,可以调用完整的SparkAPI并提供Spark包的接口。
- 支持集成连接到Spark,并通过RStudioIDE浏览Spark DataFrames。
从CRAN安装sparklyr
install.packages("sparklyr")
- 还要安装一个本地的Spark版本
- 如果使用RStudio IDE,还需要下载一个最新的IDE,这个新的IDE包含了集成Spark的功能提升。
library(sparklyr)
spark_install(version = "1.6.2")
连接到Spark
- 可以选择连接本地的Spark实例或者远程的Spark集群,如下连接到本地的Spark。
- 返回的Spark connection(sc)为Spark集群提供了一个远程的dplyr数据源。
library(sparklyr)
sc <- spark_connect(master = "local")
数据读取
- 可以使用dplyr的copy_to函数将R的data frames拷贝到Spark。
- 更典型的是可以通过spark_read的一系列函数读取Spark集群中的数据。
- 如下例子,从R拷贝一些数据集到Spark。
- 注意可能需要安装nycflights13和Lahman包才能运行这些代码。
library(dplyr)
iris_tbl <- copy_to(sc, iris)
flights_tbl <- copy_to(sc, nycflights13::flights, "flights")
batting_tbl <- copy_to(sc, Lahman::Batting, "batting")
dplyr使用
- 针对集群中的表,现在可以使用所有可用的dplyr的verbs。以下是一个简单的过滤示例:
# filter by departure delay
flights_tbl %>% filter(dep_delay == 2)
- 比如,分析航班延误的数据。
delay <- flights_tbl %>%
group_by(tailnum) %>%
summarise(count = n(), dist = mean(distance), delay = mean(arr_delay)) %>%
filter(count > 20, dist < 2000, !is.na(delay)) %>%
collect()
# plot delays
library(ggplot2)
ggplot(delay, aes(dist, delay)) +
geom_point(aes(size = count), alpha = 1/2) +
geom_smooth() +
scale_size_area(max_size = 2)
- 注意尽管上面显示的dplyr函数与在使用R的data frames时是一样的,但如果使用的是sparklyr,它们其实是被推到远端的Spark集群里执行的。
Window Functions
- dplyr同时也支持window函数,比如:
library(dplyr)
iris_tbl <- copy_to(sc, iris)
flights_tbl <- copy_to(sc, nycflights13::flights, "flights")
batting_tbl <- copy_to(sc, Lahman::Batting, "batting")
batting_tbl %>%
select(playerID, yearID, teamID, G, AB:H) %>%
arrange(playerID, yearID, teamID) %>%
group_by(playerID) %>%
filter(min_rank(desc(H)) <= 2 & H > 0)
Machine Learning
- 使用Spark MLlib或H2O SparkingWater实现分布式的机器学习。
- 它们都提供了一系列的基于DataFrames构建的high-levelAPIs,从而帮助创建和调试机器学习工作流。
Spark MLlib
- 例子:将使用ml_linear_regression来拟合线性回归模型。
- 使用内置的mtcar数据集,看看是否可以根据其重量(wt)和发动机的气缸数量(cyl)来预测汽车的燃油消耗(mpg)。
- 假设在每种情况下,mpg和features(wt和cyl)之间的关系是线性的。
# copy mtcars into spark
mtcars_tbl <- copy_to(sc, mtcars)
# transform our data set, and then partition into 'training', 'test'
partitions <- mtcars_tbl %>%
filter(hp >= 100) %>%
mutate(cyl8 = cyl == 8) %>%
sdf_partition(training = 0.5, test = 0.5, seed = 1099)
# fit a linear model to the training dataset
fit <- partitions$training %>%
ml_linear_regression(response = "mpg", features = c("wt", "cyl"))
- 对于由Spark生成的线性回归模型,可以使用summary()来更多的了解拟合质量(quality of our fit),以及每个预测变量的统计显著性(statistical significance)。
summary(fit)
- Spark机器学习支持众多的算法和特征变换,如上所示,会发现将这些功能与dplyr管道链接起来很容易。
H2O Sparkling Water
- 以mtcars为例,这次使用H2O Sparkling Water来实现。
- dplyr代码依旧是用来准备数据,当将数据分为test和training后,调用h2o.glm而不是ml_linear_regression。
# convert to h20_frame (uses the same underlying rdd)
training <- as_h2o_frame(partitions$training)
test <- as_h2o_frame(partitions$test)
# fit a linear model to the training dataset
fit <- h2o.glm(x = c("wt", "cyl"),
y = "mpg",
training_frame = training,
lamda_search = TRUE)
# inspect the model
print(fit)
- 对于由H2O产生的线性回归模型,可以使用print() 或 summary()来更多的了解拟合质量(quality of our fit)。
- summary()方法返回一些关于评分历史(scoringhistory)和变量重要性(variableimportance)的额外信息。
大数据视频推荐:
腾讯课堂
CSDN
大数据语音推荐:
企业级大数据技术应用
大数据机器学习案例之推荐系统
自然语言处理
大数据基础
人工智能:深度学习入门到精通