Spark MLlib机器学习开发指南(3)--Pipelines
在这个章节,我们介绍管道Pipelines的概念。ML Pipelines提供了一套构建在DataFrame之上的统一的高级API,帮助用户创建和调试实际的机器学习管道。
目录
- 管道(Pipeline)主要概念
- DataFrame
- Pipeline组件
- 转换器(Transformers)
- 估计器
- 管道(Pipeline)组件属性
- 管道(Pipeline)
- 怎样工作
- 细节
- 参数
- 保存和加载Pipelines
- 代码示例
- 估计器,转换器和参数示例
- Pipeline示例
- 模型选择(超参调试)
管道主要概念
MLlib标准的机器学习算法API,使得更容易将多个算法组合成单个管道或工作流。
本节覆盖了Pipelines API关键概念介绍,其中管道(Pipelines)概念主要受到scikit-learn项目的启发
- DataFrame: 该ML API使用Spark SQL的DataFrame作为ML数据集,可以容纳各种数据类型。例如,DataFrame可以具有存储文本,特征向量,真实标签和预测值
- Transformer: 转换器是一种可以将一个DataFrame转换为另一个DataFrame的算法。
例如,ML模型是一个Transformer,它将具有特征的DataFrame转换成具有预测值的DataFrame。 - Estimator: 估计器是一种可以用DataFrame配合来生成Transformer的算法。
例如,学习算法是在DataFrame上训练并产生模型的估计器。 - Pipeline: 管道将多个转换器和估计器连在一起,被指定为一个ML工作流程。
- Parameter: 所有转换器和估计器现在共享用于指定参数的通用API。
DataFrame
机器学习可以应用于各种各样的数据类型,例如向量,文本,图像和结构化数据。
该API采用Spark SQL的DataFrame来支持各种数据类型。
DataFrame支持许多基本和结构化类型;请参阅Spark SQL数据类型参考,以获取受支持类型的列表。除了Spark SQL指南中列出的类型之外,DataFrame还可以使用ML Vector类型。
可以从常规的RDD隐式或显式创建DataFrame。有关示例,请参阅下面的代码示例和Spark SQL编程指南。
DataFrame中的列可以被命名。下面的代码示例使用如“text,” “features,” 和“label.”这样的名称。
Pipeline 组件
Transformers (转换器)
转换器是一种包含特征转换和学习模型的抽象。技术上,转换器实现了一个transform()方法,它通过追加一个或多个列来将一个DataFrame转换成另一个。例如:
- 一个特征变换器可能需要一个DataFrame,读取一列(例如文本),将其映射到一个新的列(例如特征向量)中,并输出一个追加了映射列的新DataFrame
- 学习模型可能需要一个DataFrame,读取包含特征向量的列,预测每个特征向量的标签,并输出一个新的DataFrame,并追加预测的标签作为新的列。
Estimators (估计器)
一个估计器抽象了学习算法的概念,或者fit(拟合)或训练数据的任何算法。
技术实现上,一个估计器实现一个方法fit(),它接受一个DataFrame并产生一个Model,Model它是一个Transformer。例如,诸如Logistic回归的学习算法是一个估计器,并调用fit()训练一个LogisticRegressionModel,它是一个Model,模型是一个Transformer。
管道组件的属性
转换器(Transformer)的transform和估计器的fit()方法都是无状态的。在将来,可能通过新的概念支持有状态的算法
转换器(Transformer)或者估计器的每个实例都有唯一id,这对指定参数非常有用(下面将会讨论到)
管道(Pipeline)
在机器学习中,通常通过一系列算法来处理和学习数据。例如,一个简单的文本处理工作流可能包含以下几个阶段
- 切分每个文档的文本到词
- 将每个文档的词转成数值化特征向量
- 使用特征向量和标签学习一个预测模型
MLlib表示为像管道一样的工作流,该管道由要按特定顺序运行的一系列流水线阶段(转换器和估计器)组成。
我们将使用这个简单的工作流作为本节中的运行示例。
怎样工作
一个管道包含一系列阶段,每个阶段包含转换器或者估计器。这些阶段按顺序运行,输入的DataFrame在通过每个阶段时被转换。在转换器阶段,transform()方法在DataFrame上被调用。在估计器阶段,fit()方法被调用产生一个估计器(它成为PipelineModel的一部分,或者拟合Pipeline),并且在DataFrame上面调用估计器的transform方法。
下面演示简单的文本文档工作流,下图为训练时使用管道
上图中,顶行代表一个具有三个阶段的管道。
前两个(Tokenizer和HashingTF)是转换器(蓝色),第三个(Logistic回归)是一个估计器(红色)。
底行表示流经管道的数据,其中柱面表示DataFrames。
在原始DataFrame上调用Pipeline.fit()方法,它具有原始文本文档和标签。
Tokenizer.transform()方法将原始文本文档分割成单词,向DataFrame添加带有单词的新列。
HashingTF.transform()方法将单词列转换为特征向量,将这些向量的新列添加到DataFrame。
现在,由于LogisticRegression是一个估计器,所以Pipeline首先调用LogisticRegression.fit()来生成LogisticRegressionModel。
如果流水线有更多的估计器,那么在将DataFrame传递到下一个阶段之前,它会在DataFrame上调用LogisticRegressionModel的transform()方法。
管道是一个估计器。
因此,在Pipeline的fit()方法运行之后,它会生成一个PipelineModel,它是一个Transformer。
此管道模型在测试时使用;
下图说明了这种用法。
在上图中,PipelineModel具有与原始管道相同的步骤,但原始流水线中的所有估计器都已成为转换器。
当在测试数据集上调用PipelineModel的transform()方法时,数据按顺序通过拟合的管道传递。
每个阶段的transform()方法更新数据集并将其传递到下一个阶段。
管道和管道模型有助于确保训练和测试数据通过相同的功能处理步骤。
细节
DAG管道:管道的阶段被指定为有序数组。这里给出的示例全部用于线性管道,即其中每个阶段使用由前一阶段产生管道的数据。
只要数据流图形成有向无环图(DAG),就可以创建非线性流水线。DAG是基于每个阶段的输入和输出列名(通常指定为参数)隐含地指定的。如果管道形成DAG,则必须以拓扑顺序指定阶段。
运行时检查:由于管道可以对具有不同类型的DataFrames进行操作,因此不能使用编译时类型检查。
管道和管道模型是在运行时检查,而不是在实际运行管道之前进行检查。类型检查是使用DataFrame schema完成的,这是DataFrame中列的数据类型的描述。
唯一管道阶段:管道的阶段应该是唯一的实例。例如,同一个实例myHashingTF不应该被插入管道两次,因为流水线阶段必须有唯一的ID。然而,不同的实例myHashingTF1和myHashingTF2(HashingTF类型)都可以放入同一个管道中,因为使用不同的ID创建不同的实例
参数
MLlib的估计器和转换器使用统一的API来指定参数
Param是具有独立文档的命名参数,ParamMap是一组(参数,值)对。
这里有2种给算法指定参数的方法:
1.设置一个实例的参数。例如,如果lr是LogisticRegression的一个实例,可以调用lr.setMaxIter(10)使lr.fit()最多使用10次迭代。该API类似于spark.mllib软件包中使用的API。
2.将ParamMap传递给fit()或transform()方法。任何在ParamMap中的指定参数将覆盖先前通过setter方法指定的参数。
参数属于估计器和转换器的特定实例。例如,如果我们有两个LogisticRegression实例lr1和lr2,那么我们可以使用指定的maxIter参数构建ParamMap:ParamMap(lr1.maxIter - > 10,lr2.maxIter - > 20)。如果在管道中存在两个算法,指定不同的maxIter参数的这将非常有用。
保存和加载Pipelines
通常情况下,将模型或管道保存到磁盘以备以后使用是值得的。在Spark 1.6中,模型导入/导出功能已添加到Pipeline API中。
大多数基本ML模型的转换器都得到很好的支持。请参阅具体的算法的API文档,以查看保存和加载功能是否被支持。
Code examples
本节将给出说明上述功能的示例代码。有关更多信息,请参阅API文档(Scala,Java和Python)。
示例:估计器,转换器和参数
有关API的详细信息,请参阅估计器Scala文档,转换器Scala文档和Params Scala文档。
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.linalg.{Vector, Vectors}
import org.apache.spark.ml.param.ParamMap
import org.apache.spark.sql.Row
// Prepare training data from a list of (label, features) tuples.
val training = spark.createDataFrame(Seq(
(1.0, Vectors.dense(0.0, 1.1, 0.1)),
(0.0, Vectors.dense(2.0, 1.0, -1.0)),
(0.0, Vectors.dense(2.0, 1.3, 1.0)),
(1.0, Vectors.dense(0.0, 1.2, -0.5))
)).toDF("label", "features")
// Create a LogisticRegression instance. This instance is an Estimator.
val lr = new LogisticRegression()
// Print out the parameters, documentation, and any default values.
println("LogisticRegression parameters:\n" + lr.explainParams() + "\n")
// We may set parameters using setter methods.
lr.setMaxIter(10)
.setRegParam(0.01)
// Learn a LogisticRegression model. This uses the parameters stored in lr.
val model1 = lr.fit(training)
// Since model1 is a Model (i.e., a Transformer produced by an Estimator),
// we can view the parameters it used during fit().
// This prints the parameter (name: value) pairs, where names are unique IDs for this
// LogisticRegression instance.
println("Model 1 was fit using parameters: " + model1.parent.extractParamMap)
// We may alternatively specify parameters using a ParamMap,
// which supports several methods for specifying parameters.
val paramMap = ParamMap(lr.maxIter -> 20)
.put(lr.maxIter, 30) // Specify 1 Param. This overwrites the original maxIter.
.put(lr.regParam -> 0.1, lr.threshold -> 0.55) // Specify multiple Params.
// One can also combine ParamMaps.
val paramMap2 = ParamMap(lr.probabilityCol -> "myProbability") // Change output column name.
val paramMapCombined = paramMap ++ paramMap2
// Now learn a new model using the paramMapCombined parameters.
// paramMapCombined overrides all parameters set earlier via lr.set* methods.
val model2 = lr.fit(training, paramMapCombined)
println("Model 2 was fit using parameters: " + model2.parent.extractParamMap)
// Prepare test data.
val test = spark.createDataFrame(Seq(
(1.0, Vectors.dense(-1.0, 1.5, 1.3)),
(0.0, Vectors.dense(3.0, 2.0, -0.1)),
(1.0, Vectors.dense(0.0, 2.2, -1.5))
)).toDF("label", "features")
// Make predictions on test data using the Transformer.transform() method.
// LogisticRegression.transform will only use the 'features' column.
// Note that model2.transform() outputs a 'myProbability' column instead of the usual
// 'probability' column since we renamed the lr.probabilityCol parameter previously.
model2.transform(test)
.select("features", "label", "myProbability", "prediction")
.collect()
.foreach { case Row(features: Vector, label: Double, prob: Vector, prediction: Double) =>
println(s"($features, $label) -> prob=$prob, prediction=$prediction")
}
完整代码在Spark仓库代码的examples/src/main/scala/org/apache/spark/examples/ml/EstimatorTransformerParamExample.scala
示例:管道(Pipeline)
该示例遵循上图中所示的简单文本文档处理管道。
有关API的详细信息,请参阅管道Scala文档。
import org.apache.spark.ml.{Pipeline, PipelineModel}
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.feature.{HashingTF, Tokenizer}
import org.apache.spark.ml.linalg.Vector
import org.apache.spark.sql.Row
// Prepare training documents from a list of (id, text, label) tuples.
val training = spark.createDataFrame(Seq(
(0L, "a b c d e spark", 1.0),
(1L, "b d", 0.0),
(2L, "spark f g h", 1.0),
(3L, "hadoop mapreduce", 0.0)
)).toDF("id", "text", "label")
// Configure an ML pipeline, which consists of three stages: tokenizer, hashingTF, and lr.
val tokenizer = new Tokenizer()
.setInputCol("text")
.setOutputCol("words")
val hashingTF = new HashingTF()
.setNumFeatures(1000)
.setInputCol(tokenizer.getOutputCol)
.setOutputCol("features")
val lr = new LogisticRegression()
.setMaxIter(10)
.setRegParam(0.001)
val pipeline = new Pipeline()
.setStages(Array(tokenizer, hashingTF, lr))
// Fit the pipeline to training documents.
val model = pipeline.fit(training)
// Now we can optionally save the fitted pipeline to disk
model.write.overwrite().save("/tmp/spark-logistic-regression-model")
// We can also save this unfit pipeline to disk
pipeline.write.overwrite().save("/tmp/unfit-lr-model")
// And load it back in during production
val sameModel = PipelineModel.load("/tmp/spark-logistic-regression-model")
// Prepare test documents, which are unlabeled (id, text) tuples.
val test = spark.createDataFrame(Seq(
(4L, "spark i j k"),
(5L, "l m n"),
(6L, "spark hadoop spark"),
(7L, "apache hadoop")
)).toDF("id", "text")
// Make predictions on test documents.
model.transform(test)
.select("id", "text", "probability", "prediction")
.collect()
.foreach { case Row(id: Long, text: String, prob: Vector, prediction: Double) =>
println(s"($id, $text) --> prob=$prob, prediction=$prediction")
}
详细代码位于Spark仓库的examples/src/main/scala/org/apache/spark/examples/ml/PipelineExample.scala
模型选择(超参调试)
使用ML管道的一大优点是超参数调试。有关自动模型选择的更多信息,请参阅ML调试指南。