Building a Classification Model with Spark (Part 1)

The dataset in this post comes from a Kaggle competition and was provided by StumbleUpon. The task is to classify web pages recommended on the site as either ephemeral (short-lived, quickly losing popularity) or evergreen (popular over a long period).
The field descriptions on the dataset page (linked in the code below) list the available columns. The first four columns contain the URL, the page ID, the raw text content, and the category assigned to the page. The next 22 columns contain a variety of numeric or categorical features. The last column is the target value: 1 for evergreen, 0 for ephemeral.

  1. Extracting features from the classification dataset

    # First, download the training data from http://www.kaggle.com/c/stumbleupon/data and strip the first (header) line
    sed 1d train.tsv > train_noheader.tsv
    # Upload to HDFS
    [hadoop@master spark]$ hdfs dfs -put train_noheader.tsv ML/
    # Start spark-shell
    [hadoop@master spark]$ spark-shell --master yarn --driver-memory 4G
    # Load the training dataset
    scala> val rawData = sc.textFile("ML/train_noheader.tsv")
    scala> val records = rawData.map(line => line.split("\t"))
    scala> records.first()
    res3: Array[String] = Array("http://www.bloomberg.com/news/2010-12-23/ibm-predicts-holographic-calls-air-breathing-batteries-by-2015.html", "4042", "{""title"":""IBM Sees Holographic Calls Air Breathing Batteries ibm sees holographic calls, air-breathing batteries"",""body"":""A sign stands outside the International Business Machines Corp IBM Almaden Research Center campus in San Jose California Photographer Tony Avelar Bloomberg Buildings stand at the International Business Machines Corp IBM Almaden Research Center campus in the Santa Teresa Hills of San Jose California Photographer Tony Avelar Bloomberg By 2015 your mobile phone will project a 3 D image of anyone who calls and your laptop will be powered by kinetic energy At least that s what International Business Machines Corp sees ...
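    # To make the column layout concrete, this sketch (hypothetical: it assumes the original
    # train.tsv, header row included, was also uploaded to the ML/ directory) prints each
    # column name with its index:
    val header = sc.textFile("ML/train.tsv").first.split("\t")
    header.zipWithIndex.foreach { case (name, i) => println(s"$i: $name") }
    // Expect 27 columns: 0-3 metadata (URL, ID, raw text, category), 4-25 features, 26 the label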
    # Clean the raw data
    scala> import org.apache.spark.mllib.regression.LabeledPoint
    scala> import org.apache.spark.mllib.linalg.Vectors
    val data = records.map { r =>
      val trimmed = r.map(_.replaceAll("\"", ""))
      // Extract the label from the last column, and the features from column 5 up to
      // (but not including) the label column; missing values marked "?" become 0.0
      val label = trimmed(r.size - 1).toInt
      val features = trimmed.slice(4, r.size - 1).map(d => if (d == "?") 0.0 else d.toDouble)
      LabeledPoint(label, Vectors.dense(features))
    }
    
    # Cache the parsed data, since it will be scanned repeatedly below
    data.cache
    val numData = data.count
    
    # Naive Bayes requires non-negative feature values, so clamp negatives to zero
    val nbData = records.map { r =>
      val trimmed = r.map(_.replaceAll("\"", ""))
      val label = trimmed(r.size - 1).toInt
      val features = trimmed.slice(4, r.size - 1)
        .map(d => if (d == "?") 0.0 else d.toDouble)
        .map(d => if (d < 0) 0.0 else d)
      LabeledPoint(label, Vectors.dense(features))
    }
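    # The two cleaning passes above differ only in the clamping of negative values needed by
    # naive Bayes; a small helper (a sketch, parseRecord and clampNegatives are hypothetical
    # names not in the original session) makes the shared logic explicit:
    def parseRecord(r: Array[String], clampNegatives: Boolean): LabeledPoint = {
      val trimmed = r.map(_.replaceAll("\"", ""))
      val label = trimmed(r.size - 1).toInt
      val features = trimmed.slice(4, r.size - 1).map { d =>
        val v = if (d == "?") 0.0 else d.toDouble
        if (clampNegatives && v < 0) 0.0 else v
      }
      LabeledPoint(label, Vectors.dense(features))
    }
    val dataAlt = records.map(r => parseRecord(r, clampNegatives = false))  // same as data
    val nbDataAlt = records.map(r => parseRecord(r, clampNegatives = true)) // same as nbData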
    
  2. Training the classification models

    • First, import the required classes and set up some basic input parameters for each model. We need to set the number of iterations for logistic regression and the SVM, and the maximum tree depth for the decision tree.
    import org.apache.spark.mllib.classification.LogisticRegressionWithSGD
    import org.apache.spark.mllib.classification.SVMWithSGD
    import org.apache.spark.mllib.classification.NaiveBayes
    import org.apache.spark.mllib.tree.DecisionTree
    import org.apache.spark.mllib.tree.configuration.Algo
    import org.apache.spark.mllib.tree.impurity.Entropy
    val numIterations = 10
    val maxTreeDepth = 5
    
    • First, train the logistic regression model:
    scala> val lrModel = LogisticRegressionWithSGD.train(data, numIterations)
    warning: there was one deprecation warning; re-run with -deprecation for details
    lrModel: org.apache.spark.mllib.classification.LogisticRegressionModel = org.apache.spark.mllib.classification.LogisticRegressionModel: intercept = 0.0, numFeatures = 22, numClasses = 2, threshold = 0.5
    
    
    • Next, train the SVM model:
    scala> val svmModel = SVMWithSGD.train(data, numIterations)
    svmModel: org.apache.spark.mllib.classification.SVMModel = org.apache.spark.mllib.classification.SVMModel: intercept = 0.0, numFeatures = 22, numClasses = 2, threshold = 0.0
    
    
    • Next, train naive Bayes; remember to use the cleaned dataset without negative feature values:
    scala> val nbModel = NaiveBayes.train(nbData)
    nbModel: org.apache.spark.mllib.classification.NaiveBayesModel = org.apache.spark.mllib.classification.NaiveBayesModel@20ba16b5
    
    
    • Finally, train the decision tree:
    scala> val dtModel = DecisionTree.train(data, Algo.Classification, Entropy, maxTreeDepth)
    dtModel: org.apache.spark.mllib.tree.model.DecisionTreeModel = DecisionTreeModel classifier of depth 5 with 61 nodes
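    # The trained tree can be inspected directly: toDebugString prints the full structure of the
    # 61-node, depth-5 tree reported above (output omitted here)
    println(dtModel.toDebugString)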
    
    
  3. Using the classification models

    scala> val dataPoint = data.first
    dataPoint: org.apache.spark.mllib.regression.LabeledPoint = (0.0,[0.789131,2.055555556,0.676470588,0.205882353,0.047058824,0.023529412,0.443783175,0.0,0.0,0.09077381,0.0,0.245831182,0.003883495,1.0,1.0,24.0,0.0,5424.0,170.0,8.0,0.152941176,0.079129575])
    
    scala> val prediction = lrModel.predict(dataPoint.features)
    17/07/07 16:40:45 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeSystemBLAS
    17/07/07 16:40:45 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeRefBLAS
    prediction: Double = 1.0
    scala> val trueLabel = dataPoint.label
    trueLabel: Double = 0.0
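    # In this case the model got it wrong: it predicted 1.0, but the true label is 0.0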
    scala> val predictions = lrModel.predict(data.map(lp => lp.features))
    predictions: org.apache.spark.rdd.RDD[Double] = MapPartitionsRDD[88] at mapPartitions at GeneralizedLinearAlgorithm.scala:70
    
    scala> predictions.take(5)
    res7: Array[Double] = Array(1.0, 1.0, 1.0, 1.0, 1.0)
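    # All five predictions are 1.0, which suggests the model may be predicting class 1 for most
    # records; a quick check (a sketch using the session's variables) makes that measurable:
    val positiveRate = predictions.filter(_ == 1.0).count.toDouble / numData
    println(f"predicted-positive rate: ${positiveRate * 100}%2.2f%%")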
    
    
  4. Prediction accuracy and error rate

    val lrTotalCorrect = data.map { point =>
      if (lrModel.predict(point.features) == point.label) 1 else 0
    }.sum
    scala> val lrAccuracy = lrTotalCorrect / data.count
    lrAccuracy: Double = 0.5146720757268425
    
    val svmTotalCorrect = data.map { point =>
      if (svmModel.predict(point.features) == point.label) 1 else 0
    }.sum
    # Both models predict only about half of the training data correctly, little better than random guessing.
    scala> val svmAccuracy = svmTotalCorrect / data.count
    svmAccuracy: Double = 0.5146720757268425
    
    val nbTotalCorrect = nbData.map { point =>
      if (nbModel.predict(point.features) == point.label) 1 else 0
    }.sum
    scala> val nbAccuracy = nbTotalCorrect / data.count
    nbAccuracy: Double = 0.5803921568627451
    
    # Note that the decision tree returns a raw score, so the 0.5 prediction threshold must be applied explicitly
    val dtTotalCorrect = data.map { point =>
      val score = dtModel.predict(point.features)
      val predicted = if (score > 0.5) 1 else 0
      if (predicted == point.label) 1 else 0
    }.sum
    scala> val dtAccuracy = dtTotalCorrect / data.count
    dtAccuracy: Double = 0.6482758620689655
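    # The accuracy computations above all repeat the same pattern; a generic helper (a sketch,
    # `accuracy` is a hypothetical name, assuming a 0.5 threshold on raw scores) removes the duplication:
    import org.apache.spark.rdd.RDD
    import org.apache.spark.mllib.linalg.Vector
    def accuracy(dataset: RDD[LabeledPoint])(score: Vector => Double): Double = {
      val correct = dataset.map { p =>
        val predicted = if (score(p.features) > 0.5) 1.0 else 0.0
        if (predicted == p.label) 1 else 0
      }.sum
      correct / dataset.count
    }
    println(accuracy(data)(v => lrModel.predict(v)))   // logistic regression
    println(accuracy(nbData)(v => nbModel.predict(v))) // naive Bayes
    println(accuracy(data)(v => dtModel.predict(v)))   // decision tree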
    
    
  5. Precision and recall
    In information retrieval, precision is commonly used to evaluate the quality of the results, while recall measures their completeness. For a binary classifier, precision is TP / (TP + FP) (the fraction of predicted positives that are truly positive) and recall is TP / (TP + FN) (the fraction of actual positives that are found). The precision-recall (PR) and ROC curves trace these trade-offs across all thresholds; the code below computes the area under each curve.

    import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics
    val metrics = Seq(lrModel, svmModel).map { model =>
      val scoreAndLabels = data.map { point =>
        (model.predict(point.features), point.label)
      }
      val metrics = new BinaryClassificationMetrics(scoreAndLabels)
      // Each result is a triple: (model name, area under PR curve, area under ROC curve)
      (model.getClass.getSimpleName, metrics.areaUnderPR, metrics.areaUnderROC)
    }
    metrics: Seq[(String, Double, Double)] = List((LogisticRegressionModel,0.7567586293858841,0.5014181143280931), (SVMModel,0.7567586293858841,0.5014181143280931))
    
    val nbMetrics = Seq(nbModel).map { model =>
      val scoreAndLabels = nbData.map { point =>
        val score = model.predict(point.features)
        (if (score > 0.5) 1.0 else 0.0, point.label)
      }
      val metrics = new BinaryClassificationMetrics(scoreAndLabels)
      (model.getClass.getSimpleName, metrics.areaUnderPR, metrics.areaUnderROC)
    }
    nbMetrics: Seq[(String, Double, Double)] = List((NaiveBayesModel,0.6808510815151734,0.5835585110136261))
    
    val dtMetrics = Seq(dtModel).map { model =>
      val scoreAndLabels = data.map { point =>
        val score = model.predict(point.features)
        (if (score > 0.5) 1.0 else 0.0, point.label)
      }
      val metrics = new BinaryClassificationMetrics(scoreAndLabels)
      (model.getClass.getSimpleName, metrics.areaUnderPR, metrics.areaUnderROC)
    }
    val allMetrics = metrics ++ nbMetrics ++ dtMetrics
    allMetrics.foreach { case (m, pr, roc) =>
      println(f"$m, Area under PR: ${pr * 100.0}%2.4f%%, Area under ROC: ${roc * 100.0}%2.4f%%")
    }
    LogisticRegressionModel, Area under PR: 75.6759%, Area under ROC: 50.1418%
    SVMModel, Area under PR: 75.6759%, Area under ROC: 50.1418%
    NaiveBayesModel, Area under PR: 68.0851%, Area under ROC: 58.3559%
    DecisionTreeModel, Area under PR: 74.3081%, Area under ROC: 64.8837%
    
    
  6. Feature standardization

    # Represent the feature vectors as an MLlib distributed matrix using the RowMatrix class. A RowMatrix is an RDD of vectors, where each vector is one row of the distributed matrix.
    # RowMatrix offers several convenient matrix operations, including one that computes summary statistics for each column:
    import org.apache.spark.mllib.linalg.distributed.RowMatrix
    val vectors = data.map(lp => lp.features)
    val matrix = new RowMatrix(vectors)
    # The computeColumnSummaryStatistics method computes various statistics for each column of the feature matrix,
    # including the mean and variance; each statistic is stored in a Vector with one entry per column
    val matrixSummary = matrix.computeColumnSummaryStatistics()
    scala> println(matrixSummary.mean)
    [0.4122580529952672,2.761823191986608,0.4682304732861389,0.21407992638350232,0.09206236071899916,0.04926216043908053,2.255103452212041,-0.10375042752143335,0.0,0.05642274498417851,0.02123056118999324,0.23377817665490194,0.2757090373659236,0.615551048005409,0.6603110209601082,30.07707910750513,0.03975659229208925,5716.598242055447,178.75456389452353,4.960649087221096,0.17286405047031742,0.10122079189276552]
    scala> println(matrixSummary.min)
    [0.0,0.0,0.0,0.0,0.0,0.0,0.0,-1.0,0.0,0.0,0.0,0.045564223,-1.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0]
    scala> println(matrixSummary.variance)
    [0.10974244167559001,74.30082476809639,0.04126316989120241,0.02153343633200108,0.009211817450882448,0.005274933469767946,32.53918714591821,0.09396988697611545,0.0,0.0017177410346628928,0.020782634824610638,0.0027548394224293036,3.683788919674426,0.2366799607085986,0.22433071201674218,415.8785589543846,0.03818116876739597,7.877330081138463E7,32208.116247426184,10.45300904576431,0.03359363403832393,0.006277532884214705]
    
    # Use StandardScaler to standardize the feature vectors
    import org.apache.spark.mllib.feature.StandardScaler
    val scaler = new StandardScaler(withMean = true, withStd = true).fit(vectors)
    val scaledData = data.map(lp => LabeledPoint(lp.label, scaler.transform(lp.features)))
    scala> println(data.first.features)
    [0.789131,2.055555556,0.676470588,0.205882353,0.047058824,0.023529412,0.443783175,0.0,0.0,0.09077381,0.0,0.245831182,0.003883495,1.0,1.0,24.0,0.0,5424.0,170.0,8.0,0.152941176,0.079129575]
    scala> println(scaledData.first.features)
    [1.137647336497678,-0.08193557169294771,1.0251398128933331,-0.05586356442541689,-0.4688932531289357,-0.3543053263079386,-0.3175352172363148,0.3384507982396541,0.0,0.828822173315322,-0.14726894334628504,0.22963982357813484,-0.14162596909880876,0.7902380499177364,0.7171947294529865,-0.29799681649642257,-0.2034625779299476,-0.03296720969690391,-0.04878112975579913,0.9400699751165439,-0.10869848852526258,-0.2788207823137022]
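    # StandardScaler transforms each feature as (x - mean) / sqrt(variance); as a quick check
    # (a sketch using the session's variables), the first feature of the first record works out by hand:
    // (0.789131 - 0.4122580529952672) / sqrt(0.10974244167559001) ≈ 1.1376
    val check = (data.first.features(0) - matrixSummary.mean(0)) / math.sqrt(matrixSummary.variance(0))
    println(check) // matches scaledData.first.features(0)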
    
    
  7. Retraining on the standardized data: only logistic regression is retrained here, since
    decision trees and naive Bayes are not affected by feature standardization

    val lrModelScaled = LogisticRegressionWithSGD.train(scaledData, numIterations)
    val lrTotalCorrectScaled = scaledData.map { point =>
      if (lrModelScaled.predict(point.features) == point.label) 1 else 0
    }.sum
    val lrAccuracyScaled = lrTotalCorrectScaled / numData
    val lrPredictionsVsTrue = scaledData.map { point =>
      (lrModelScaled.predict(point.features), point.label)
    }
    val lrMetricsScaled = new BinaryClassificationMetrics(lrPredictionsVsTrue)
    val lrPr = lrMetricsScaled.areaUnderPR
    val lrRoc = lrMetricsScaled.areaUnderROC
    println(f"${lrModelScaled.getClass.getSimpleName}\nAccuracy: ${lrAccuracyScaled * 100}%2.4f%%\nArea under PR: ${lrPr * 100.0}%2.4f%%\nArea under ROC: ${lrRoc * 100.0}%2.4f%%")
    
    # As the results below show, simply standardizing the features improves the logistic regression accuracy, lifting the AUC from a random-like 50% to about 62%:
    
    LogisticRegressionModel
    Accuracy: 62.0419%
    Area under PR: 72.7254%
    Area under ROC: 61.9663%
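    # Note that all of the metrics in this post are computed on the training data itself; an unbiased estimate of generalization would require evaluating on a held-out test set.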
    
    