使用Spark KMeans对地点发生重量进行聚类

聚类

Spark的机器学习库分成两类,一类是针对RDD的,在org.apache.spark.mllib包下,另一类则是针对DataFrame的,在org.apache.spark.ml包下。
本次实践从最简单的数据入手,找出一个地点的所有发生重量,然后使用KMeans算法聚类分析,先上RDD版本的。
需要引入的库是

import org.apache.spark.mllib.clustering.KMeans
import org.apache.spark.mllib.linalg.Vectors

实际调用的代码是

val model = KMeans.train(weightRDD, 3, 20)
val cost = model.computeCost(weightRDD)
val lines = mutable.ArrayBuffer.empty[String]
model.clusterCenters.foreach(v => {
    lines.append(v.toString.replace("[", "").replace("]", ""))
})

KMeans.train()接受三个参数,含义依次是:RDD[Vector]类型的训练数据,聚类的中心点个数,计算的迭代次数。通过之前的分析,由于大部分地点发生的重量都有1到2个密集出现的数值范围,所以姑且先把中心点个数定为3。官方给出的例子里面的计算迭代次数是20,实践中试过100,50,20三种取值,发现cost结果几乎相同,可以理解为已经达到最优解,所以判断20次足够。model.clusterCenters得到的是一组Vector就是第一个参数中所有Vector的几个中心位置,维数与传入的Vector相同。本例中,传入的Vector很无耻的只有1维,所以传出的结果也是1维的:

十九区 5977.1371889452075 ArrayBuffer(11.102791095890407, 4.854385150812066, 17.634308681672035)

结果依次是地点名称,各个点到中心点的距离之和以及三个中心点数值。

效果检验

聚类的效果如何?需要写个小算法检验一下,对于训练数据集中每个重量,找到其和多个中心点之间的最小距离,和一个阈值比对,如果小于阈值即合法。阈值的选取应该是有讲究的,目前为了简化问题,选择一个定值。本例中,阈值为2kg。
之前已经把聚类的最新结果写入数据库中,表结构大概如下:



读取聚类中心点的代码如下:

val teamCluserMap = spark.read.jdbc(mysqlHelper.DB_URL_W, "team_weight_centers",
    Array("org_id = " + orgId), mysqlHelper.PROPERTIES)
.select("team_name", "centers").rdd.map(item => {
    val centers = item.getAs[String]("centers").split(",").map(_ toDouble);
    (item.getAs[String]("team_name"), centers)
}) collect() toMap

这样就得到了一个Map<String,Array<Double>>用来存放各个地点的中心点数据。
比对的时候代码如下:

val judgeRDD = weightDataSet.filter(item => item._1 == teamName).map(item => {
    val centers = teamCluserMap.get(teamNameShort).get
    var dis = Double.MaxValue
    for (c <- centers) {
        dis = Math.min(Math.abs(item._2 - c), dis)
    }
    if (dis < 2) ("Y", 1)
    else ("N", 1)
}).rdd.reduceByKey((A, B) => A + B)
var yCount=0
var nCount=0
for(item<-judgeRDD.collect()) {
    println(teamNameShort+"\t"+item._1+"\t"+item._2)
    if(item._1 == "Y") yCount+=item._2
    if(item._1 == "N") nCount+=item._2
}
println("\t\t"+yCount.toDouble/(yCount.toDouble+nCount.toDouble))

将输出结果贴到excel中可以了。

优化聚类结果

现在可以迭代聚类-检验过程,进行参数调优了。
在保持阈值不变、聚类迭代次数不变的情况下,分别尝试3、4、5个中心点的时候,匹配率明显上升中,还是以上文中的地点为例:


3个中心点

4个中心点

5个中心点

对于定值的阈值而言,显然中心点数越多,覆盖率越大。但这个不符合实际,我们需要引入一些变化,采用每个样本点到中心点的平均距离作为阈值看起来是一个合理的方法:中心点越多,平均距离就越小,阈值也越小;中心点越多,平均距离就越大,阈值也就越大。
聚类的算法变化如下:

val cost = model.computeCost(weightRDD)

变成了

val cost = model.computeCost(weightRDD)/weightRDD.count()

这样重新运行检验时,样本上文中地点的5个中心点的覆盖率为0.785822021116138,4个中心点覆盖率为0.950980392156862,3个中心点的覆盖率则为0.981900452488687,更进一步的,2个中心点的覆盖率达到了0.993966817496229,可想而知1个中心点的覆盖率应该能达到1,但显然没有意义。
所以我们需要对每个地点进行训练,调整参数使其覆盖率在0.95~0.96之间,这样才能得到比较符合实际的聚类。

新的ml库

一开始我们说过,新的ml库是基于DataFrame的,而且Spark的官方文档上说到3.0的时候会取消基于RDD的机器学习库,所以使用新库是大势所趋。
在引入库的时候就要注意了:

import org.apache.spark.ml.clustering.KMeans
import org.apache.spark.ml.linalg.Vectors

同时聚类算法的定义变成了:

val kmeans = new KMeans().setK(3).setSeed(1L)

中心点还是3个,随机数种子是1,如果不指定,源码指示会给出参数类的hashcode作为默认值:

final val seed: LongParam = new LongParam(this, "seed", "random seed")
setDefault(seed, this.getClass.getName.hashCode.toLong)

适应算法需要的DataFrame需要包含名为“features”的列,列的每行记录都是一个Vector,这样的数据集才能被算法识别。所以我们需要先创建类型是Vector的列,再将其命名为“features”。

val weightDataset = medicalWasteDataFrame.map(record => {
    (rfidCardMap.get(record.getAs[String]("team_id")) toString,
        Vectors.dense(record.getAs[Double]("mw_weight")))
}) cache()

以上得到的是Dataset[(String,Vector)]

val weightDataSetFilted = weightDataset
    //隐藏了一些敏感的条件过滤语句...
    .rdd.toDF("team_name","features").cache()

以上先将DataSet转成RDD,再从RDD转成DataFrame,这样我们就有了算法可用的训练数据集了。

val model = kmeans.fit(weightDataSetFilted)

其他的就和之前的写法没有区别。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 196,099评论 5 462
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 82,473评论 2 373
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 143,229评论 0 325
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 52,570评论 1 267
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 61,427评论 5 358
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 46,335评论 1 273
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 36,737评论 3 386
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 35,392评论 0 254
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 39,693评论 1 294
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 34,730评论 2 312
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 36,512评论 1 326
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,349评论 3 314
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 37,750评论 3 299
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,017评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,290评论 1 251
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 41,706评论 2 342
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 40,904评论 2 335

推荐阅读更多精彩内容