The dataset in this article comes from a Kaggle competition and was provided by StumbleUpon. The competition's problem is to classify whether a page recommended on the web is ephemeral (short-lived, quickly falling out of popularity) or evergreen (popular for a long time).
The overview on the competition's data page describes the available fields. The first four columns contain the URL, the page ID, the raw text content, and the category assigned to the page. The next 22 columns contain various numeric or categorical features. The last column
is the target value: 1 for evergreen, 0 for ephemeral.
-
Extracting features from the classification dataset
# First download the training data from http://www.kaggle.com/c/stumbleupon/data, then strip the header line
sed 1d train.tsv > train_noheader.tsv

# Upload it to HDFS
[hadoop@master spark]$ hdfs dfs -put train_noheader.tsv ML/

# Start spark-shell
[hadoop@master spark]$ spark-shell --master yarn --driver-memory 4G

// Load the training dataset
scala> val rawData = sc.textFile("ML/train_noheader.tsv")
scala> val records = rawData.map(line => line.split("\t"))
scala> records.first()
res3: Array[String] = Array("http://www.bloomberg.com/news/2010-12-23/ibm-predicts-holographic-calls-air-breathing-batteries-by-2015.html", "4042", "{""title"":""IBM Sees Holographic Calls Air Breathing Batteries ibm sees holographic calls, air-breathing batteries"",""body"":""A sign stands outside the International Business Machines Corp IBM Almaden Research Center campus in San Jose California Photographer Tony Avelar Bloomberg Buildings stand at the International Business Machines Corp IBM Almaden Research Center campus in the Santa Teresa Hills of San Jose California Photographer Tony Avelar Bloomberg By 2015 your mobile phone will project a 3 D image of anyone who calls and your laptop will be powered by kinetic energy At least that s what International Business Machines Corp sees ...

// Clean the raw data
scala> import org.apache.spark.mllib.regression.LabeledPoint
scala> import org.apache.spark.mllib.linalg.Vectors

val data = records.map { r =>
  val trimmed = r.map(_.replaceAll("\"", ""))
  // Extract the label from the last column and the feature matrix from columns 5 through 25
  val label = trimmed(r.size - 1).toInt
  val features = trimmed.slice(4, r.size - 1).map(d => if (d == "?") 0.0 else d.toDouble)
  LabeledPoint(label, Vectors.dense(features))
}
data.cache
val numData = data.count

// Naive Bayes requires non-negative feature values
val nbData = records.map { r =>
  val trimmed = r.map(_.replaceAll("\"", ""))
  val label = trimmed(r.size - 1).toInt
  val features = trimmed.slice(4, r.size - 1).map(d => if (d == "?") 0.0 else d.toDouble)
    .map(d => if (d < 0) 0.0 else d)
  LabeledPoint(label, Vectors.dense(features))
}
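Before training any model it is worth sanity-checking the parsed RDD. A minimal sketch (using the data RDD built above) that counts how many examples fall into each class:

// Count examples per class label (0 = ephemeral, 1 = evergreen)
val classCounts = data.map(lp => lp.label).countByValue()
classCounts.foreach { case (label, count) => println(s"label $label: $count examples") }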
-
Training classification models
- First, import the required classes and configure some basic input parameters for each model. In particular, set the number of iterations for logistic regression and the SVM, and the maximum tree depth for the decision tree.
import org.apache.spark.mllib.classification.LogisticRegressionWithSGD
import org.apache.spark.mllib.classification.SVMWithSGD
import org.apache.spark.mllib.classification.NaiveBayes
import org.apache.spark.mllib.tree.DecisionTree
import org.apache.spark.mllib.tree.configuration.Algo
import org.apache.spark.mllib.tree.impurity.Entropy

val numIterations = 10
val maxTreeDepth = 5
- First, train the logistic regression model:
scala> val lrModel = LogisticRegressionWithSGD.train(data, numIterations)
warning: there was one deprecation warning; re-run with -deprecation for details
lrModel: org.apache.spark.mllib.classification.LogisticRegressionModel = org.apache.spark.mllib.classification.LogisticRegressionModel: intercept = 0.0, numFeatures = 22, numClasses = 2, threshold = 0.5
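The deprecation warning above appears because the SGD-based trainers were deprecated in later MLlib versions in favour of LogisticRegressionWithLBFGS (and ultimately the spark.ml API). A sketch of the L-BFGS variant, assuming the same data RDD:

import org.apache.spark.mllib.classification.LogisticRegressionWithLBFGS

// L-BFGS typically converges faster and more reliably than plain SGD
val lrModelLBFGS = new LogisticRegressionWithLBFGS()
  .setNumClasses(2)
  .run(data)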
- Next, train the SVM model:
scala> val svmModel = SVMWithSGD.train(data, numIterations)
svmModel: org.apache.spark.mllib.classification.SVMModel = org.apache.spark.mllib.classification.SVMModel: intercept = 0.0, numFeatures = 22, numClasses = 2, threshold = 0.0
- Next, train the naive Bayes model; remember to use the processed dataset with no negative feature values:
scala> val nbModel = NaiveBayes.train(nbData)
nbModel: org.apache.spark.mllib.classification.NaiveBayesModel = org.apache.spark.mllib.classification.NaiveBayesModel@20ba16b5
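NaiveBayes.train also accepts an additive-smoothing parameter lambda (the default is 1.0); making it explicit can be useful when tuning. A sketch:

// Train with explicit Laplace smoothing; lambda = 1.0 reproduces the default above
val nbModelSmoothed = NaiveBayes.train(nbData, lambda = 1.0)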
- Finally, train the decision tree:
scala> val dtModel = DecisionTree.train(data, Algo.Classification, Entropy, maxTreeDepth)
dtModel: org.apache.spark.mllib.tree.model.DecisionTreeModel = DecisionTreeModel classifier of depth 5 with 61 nodes
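The train overload above is the low-level entry point; the trainClassifier method makes the number of classes, impurity, and bin count explicit, and lets you declare categorical features. A sketch that treats all 22 features as continuous (an assumption; the dataset does contain categorical columns):

// numClasses = 2; an empty categoricalFeaturesInfo map means all features are continuous
val dtModelExplicit = DecisionTree.trainClassifier(
  data,
  2,
  Map[Int, Int](),
  "entropy",
  maxTreeDepth,
  32 // maxBins
)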
-
Using the classification models
scala> val dataPoint = data.first
dataPoint: org.apache.spark.mllib.regression.LabeledPoint = (0.0,[0.789131,2.055555556,0.676470588,0.205882353,0.047058824,0.023529412,0.443783175,0.0,0.0,0.09077381,0.0,0.245831182,0.003883495,1.0,1.0,24.0,0.0,5424.0,170.0,8.0,0.152941176,0.079129575])

scala> val prediction = lrModel.predict(dataPoint.features)
17/07/07 16:40:45 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeSystemBLAS
17/07/07 16:40:45 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeRefBLAS
prediction: Double = 1.0

scala> val trueLabel = dataPoint.label
trueLabel: Double = 0.0

scala> val predictions = lrModel.predict(data.map(lp => lp.features))
predictions: org.apache.spark.rdd.RDD[Double] = MapPartitionsRDD[88] at mapPartitions at GeneralizedLinearAlgorithm.scala:70

scala> predictions.take(5)
res7: Array[Double] = Array(1.0, 1.0, 1.0, 1.0, 1.0)
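The model predicts 1.0 for the first data point even though its true label is 0.0, and the first five batch predictions are all 1.0, which suggests the classifier may be labelling almost everything as positive. A quick sketch to check the overall predicted-positive fraction, using the predictions RDD above:

// Fraction of all examples the model assigns to class 1
val positiveFraction = predictions.filter(_ == 1.0).count.toDouble / numData
println(f"Predicted positive fraction: $positiveFraction%1.4f")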
-
Prediction accuracy and error rate
val lrTotalCorrect = data.map { point =>
  if (lrModel.predict(point.features) == point.label) 1 else 0
}.sum
scala> val lrAccuracy = lrTotalCorrect / data.count
lrAccuracy: Double = 0.5146720757268425
// The model predicts only about half of the training examples correctly, no better than random guessing.

val svmTotalCorrect = data.map { point =>
  if (svmModel.predict(point.features) == point.label) 1 else 0
}.sum
scala> val svmAccuracy = svmTotalCorrect / data.count
svmAccuracy: Double = 0.5146720757268425

val nbTotalCorrect = nbData.map { point =>
  if (nbModel.predict(point.features) == point.label) 1 else 0
}.sum
scala> val nbAccuracy = nbTotalCorrect / data.count
nbAccuracy: Double = 0.5803921568627451

// Note: the decision tree's prediction threshold must be applied explicitly
val dtTotalCorrect = data.map { point =>
  val score = dtModel.predict(point.features)
  val predicted = if (score > 0.5) 1 else 0
  if (predicted == point.label) 1 else 0
}.sum
scala> val dtAccuracy = dtTotalCorrect / data.count
dtAccuracy: Double = 0.6482758620689655
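The same accuracy computation is repeated for every model; it can be factored into a small helper that takes a scoring function and thresholds raw scores at 0.5, so it also covers the decision tree. A sketch (this accuracy helper is hypothetical, not part of MLlib):

import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.rdd.RDD

// Training accuracy for any scoring function, thresholding scores at 0.5
def accuracy(score: Vector => Double, points: RDD[LabeledPoint]): Double = {
  val correct = points.map { point =>
    val predicted = if (score(point.features) > 0.5) 1.0 else 0.0
    if (predicted == point.label) 1 else 0
  }.sum
  correct / points.count
}

println(accuracy(p => lrModel.predict(p), data))
println(accuracy(p => dtModel.predict(p), data))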
-
Precision and recall
In information retrieval, precision is commonly used to evaluate the quality of results, while recall is used to evaluate their completeness.

import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics

val metrics = Seq(lrModel, svmModel).map { model =>
  val scoreAndLabels = data.map { point =>
    (model.predict(point.features), point.label)
  }
  val metrics = new BinaryClassificationMetrics(scoreAndLabels)
  // The result is a triple: (model name, area under PR, area under ROC)
  (model.getClass.getSimpleName, metrics.areaUnderPR, metrics.areaUnderROC)
}
metrics: Seq[(String, Double, Double)] = List((LogisticRegressionModel,0.7567586293858841,0.5014181143280931), (SVMModel,0.7567586293858841,0.5014181143280931))

val nbMetrics = Seq(nbModel).map { model =>
  val scoreAndLabels = nbData.map { point =>
    val score = model.predict(point.features)
    (if (score > 0.5) 1.0 else 0.0, point.label)
  }
  val metrics = new BinaryClassificationMetrics(scoreAndLabels)
  (model.getClass.getSimpleName, metrics.areaUnderPR, metrics.areaUnderROC)
}
nbMetrics: Seq[(String, Double, Double)] = List((NaiveBayesModel,0.6808510815151734,0.5835585110136261))

val dtMetrics = Seq(dtModel).map { model =>
  val scoreAndLabels = data.map { point =>
    val score = model.predict(point.features)
    (if (score > 0.5) 1.0 else 0.0, point.label)
  }
  val metrics = new BinaryClassificationMetrics(scoreAndLabels)
  (model.getClass.getSimpleName, metrics.areaUnderPR, metrics.areaUnderROC)
}

val allMetrics = metrics ++ nbMetrics ++ dtMetrics
allMetrics.foreach { case (m, pr, roc) =>
  println(f"$m, Area under PR: ${pr * 100.0}%2.4f%%, Area under ROC: ${roc * 100.0}%2.4f%%")
}
LogisticRegressionModel, Area under PR: 75.6759%, Area under ROC: 50.1418%
SVMModel, Area under PR: 75.6759%, Area under ROC: 50.1418%
NaiveBayesModel, Area under PR: 68.0851%, Area under ROC: 58.3559%
DecisionTreeModel, Area under PR: 74.3081%, Area under ROC: 64.8837%
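Note that predict here returns hard 0/1 labels, so the ROC curve is built from a single operating point. BinaryClassificationMetrics is more informative when fed raw scores; for logistic regression (and the SVM) this can be done by clearing the model's threshold. A sketch:

// With the threshold cleared, predict returns the raw score instead of a 0/1 label
lrModel.clearThreshold()
val rawScoreAndLabels = data.map { point =>
  (lrModel.predict(point.features), point.label)
}
val rawMetrics = new BinaryClassificationMetrics(rawScoreAndLabels)
println(rawMetrics.areaUnderROC)
// Restore the default decision threshold afterwards
lrModel.setThreshold(0.5)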
-
Feature standardization
// Represent the feature vectors as an MLlib distributed matrix using the RowMatrix class.
// A RowMatrix is an RDD of vectors, where each vector is one row of the distributed matrix.
// RowMatrix provides several convenient matrix operations, including one that computes
// per-column statistics:
import org.apache.spark.mllib.linalg.distributed.RowMatrix
val vectors = data.map(lp => lp.features)
val matrix = new RowMatrix(vectors)

// computeColumnSummaryStatistics computes statistics for each column of the feature matrix,
// including the mean and variance; each statistic is stored as a Vector with one entry per column
val matrixSummary = matrix.computeColumnSummaryStatistics()
scala> println(matrixSummary.mean)
[0.4122580529952672,2.761823191986608,0.4682304732861389,0.21407992638350232,0.09206236071899916,0.04926216043908053,2.255103452212041,-0.10375042752143335,0.0,0.05642274498417851,0.02123056118999324,0.23377817665490194,0.2757090373659236,0.615551048005409,0.6603110209601082,30.07707910750513,0.03975659229208925,5716.598242055447,178.75456389452353,4.960649087221096,0.17286405047031742,0.10122079189276552]
scala> println(matrixSummary.min)
[0.0,0.0,0.0,0.0,0.0,0.0,0.0,-1.0,0.0,0.0,0.0,0.045564223,-1.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0]
scala> println(matrixSummary.variance)
[0.10974244167559001,74.30082476809639,0.04126316989120241,0.02153343633200108,0.009211817450882448,0.005274933469767946,32.53918714591821,0.09396988697611545,0.0,0.0017177410346628928,0.020782634824610638,0.0027548394224293036,3.683788919674426,0.2366799607085986,0.22433071201674218,415.8785589543846,0.03818116876739597,7.877330081138463E7,32208.116247426184,10.45300904576431,0.03359363403832393,0.006277532884214705]

// Use StandardScaler to standardize the feature vectors
import org.apache.spark.mllib.feature.StandardScaler
val scaler = new StandardScaler(withMean = true, withStd = true).fit(vectors)
val scaledData = data.map(lp => LabeledPoint(lp.label, scaler.transform(lp.features)))
scala> println(data.first.features)
[0.789131,2.055555556,0.676470588,0.205882353,0.047058824,0.023529412,0.443783175,0.0,0.0,0.09077381,0.0,0.245831182,0.003883495,1.0,1.0,24.0,0.0,5424.0,170.0,8.0,0.152941176,0.079129575]
scala> println(scaledData.first.features)
[1.137647336497678,-0.08193557169294771,1.0251398128933331,-0.05586356442541689,-0.4688932531289357,-0.3543053263079386,-0.3175352172363148,0.3384507982396541,0.0,0.828822173315322,-0.14726894334628504,0.22963982357813484,-0.14162596909880876,0.7902380499177364,0.7171947294529865,-0.29799681649642257,-0.2034625779299476,-0.03296720969690391,-0.04878112975579913,0.9400699751165439,-0.10869848852526258,-0.2788207823137022]
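To confirm the transformation worked, the column statistics can be recomputed on the scaled vectors; every column should now have a mean of approximately 0 and a variance of approximately 1. A sketch:

// Recompute per-column statistics on the standardized features
val scaledSummary = new RowMatrix(scaledData.map(lp => lp.features))
  .computeColumnSummaryStatistics()
println(scaledSummary.mean)     // entries should be close to 0.0
println(scaledSummary.variance) // entries should be close to 1.0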
-
Retrain the model using the standardized data; here only logistic regression is retrained, because decision trees and naive Bayes are not affected by feature standardization.

val lrModelScaled = LogisticRegressionWithSGD.train(scaledData, numIterations)
val lrTotalCorrectScaled = scaledData.map { point =>
  if (lrModelScaled.predict(point.features) == point.label) 1 else 0
}.sum
val lrAccuracyScaled = lrTotalCorrectScaled / numData
val lrPredictionsVsTrue = scaledData.map { point =>
  (lrModelScaled.predict(point.features), point.label)
}
val lrMetricsScaled = new BinaryClassificationMetrics(lrPredictionsVsTrue)
val lrPr = lrMetricsScaled.areaUnderPR
val lrRoc = lrMetricsScaled.areaUnderROC
println(f"${lrModelScaled.getClass.getSimpleName}\nAccuracy: ${lrAccuracyScaled * 100}%2.4f%%\nArea under PR: ${lrPr * 100.0}%2.4f%%\nArea under ROC: ${lrRoc * 100.0}%2.4f%%")
LogisticRegressionModel
Accuracy: 62.0419%
Area under PR: 72.7254%
Area under ROC: 61.9663%

As the results show, simply standardizing the features improves the accuracy of logistic regression and lifts the AUC from a random-guess 50% to about 62%.
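The claim that tree models are insensitive to standardization can be verified directly by retraining the decision tree on scaledData; because its splits are threshold-based, the accuracy should be essentially unchanged. A sketch:

// Retrain the decision tree on the standardized features;
// the accuracy should match the ~64.83% obtained on the raw features
val dtModelScaled = DecisionTree.train(scaledData, Algo.Classification, Entropy, maxTreeDepth)
val dtTotalCorrectScaled = scaledData.map { point =>
  val predicted = if (dtModelScaled.predict(point.features) > 0.5) 1 else 0
  if (predicted == point.label) 1 else 0
}.sum
println(dtTotalCorrectScaled / numData)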