spark-mllib - Basic information

mllib 数据类型
Local Vector :整数索引下标,从0开始;存储double类型的值,而且只存储在单台机器上。

两种类型的vector
dense:密集型向量,就是将所有值存储在数组中。包括0值; [1.0, 0.0, 3.0] ; 类型对象: DenseVector
sparse:稀疏型向量,两个平行数组,分别表示小标和值 ; (3, [0,2], [1.0, 3.0]); 3表示向量有三个值,第一个数组[0,2] 表示有非零值的数组下标;[1.0,3.0] 表示数值;类型对象:SparseVector

 import org.apache.spark.mllib.linalg.{Vector, Vectors}
  // Create a dense vector (1.0, 0.0, 3.0).
 val dv: Vector = Vectors.dense(1.0, 0.0, 3.0)
 // Create a sparse vector (1.0, 0.0, 3.0) by specifying its indices and values corresponding to nonzero entries.
val sv1: Vector = Vectors.sparse(3, Array(0, 2), Array(1.0, 3.0))
 // Create a sparse vector (1.0, 0.0, 3.0) by specifying its nonzero entries.
val sv2: Vector = Vectors.sparse(3, Seq((0, 1.0), (2, 3.0)))

需要显示引用 org.apache.spark.mllib.linalg.Vector

**Labeled point :
**标签; 本地向量,可以是密集或稀疏型的,与对应的标签或答复。该数值类型,通常用于监督学习算法。用浮点数值表示标签值,那样我们机会可以在归回和分类中一起使用了。比如二元分类,标签值就表示(0 or 1);多元分类中,标签值可以是,0,1,2,3......

     类型对象为 LabeledPoint      
import org.apache.spark.mllib.linalg.Vectorsimport org.apache.spark.mllib.regression.LabeledPoint
// Create a labeled point with a positive label and a dense feature vector.
val pos = LabeledPoint(1.0, Vectors.dense(1.0, 0.0, 3.0))// Create a labeled point with a negative label and a sparse feature vector.
val neg = LabeledPoint(0.0, Vectors.sparse(3, Array(0, 2), Array(1.0, 3.0)))

稀疏值表示:该类型比较常见和通用,在LIBSVM格式,libsvm和liblinear默认的存储标签数据存储格式。在文本中每一行表示 标签值和稀疏表示的特征向量。

label index1:value1 index2:value2 ...
    indexN表示向量下标,从0开始。并且是递增的。

加载该类型数据的接口表示

import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.util.MLUtilsimport org.apache.spark.rdd.RDD
val examples: RDD[LabeledPoint] = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_libsvm_data.txt")

Local matrix: ​存储在本地的矩阵,下标类型为整数、值类型为浮点;mllib也支持稀疏矩阵和密集型矩阵;所有整体数值存储在单维浮点数组中。

import org.apache.spark.mllib.linalg.{Matrix, Matrices}// Create a dense matrix ((1.0, 2.0), (3.0, 4.0), (5.0, 6.0))
val dm: Matrix = Matrices.dense(3, 2, Array(1.0, 3.0, 5.0, 2.0, 4.0, 6.0))
  通过工厂类构建矩阵,3,2分表表示,行、列数,后面Array中的表示矩阵中具体值

Distributed matrix: 分布式存储矩阵,行、列下标将有长整型类型。值还是浮点类型;分布在多个RDDs中。对于大数据量的矩阵选择合适的格式存储就会显得很重要了。因为转换一次矩阵格式,需要全局shuffle,这是非常消耗性能。

目前有三种类型的分布式矩阵可使用:

RowMatrix:整个大矩阵会被拆分成多个row,然后以Row为对象存储在不同节点的RDD中,可以与驱动器进行交互运算处理。该类型的前提是单向量不宜过大,不能超出单节点的容量;
IndexedRowMatrix: 该类型与rowMatrix类型相似,只不过还存储了index,可以进行快速定位获取row和连接操作。
CoordinateMatix:坐标矩阵,将矩阵值分撒存储,单值存储表示,有行、列坐标和对应值组成的tuple对象组合成整体的矩阵,存储在RDD中的实体;主要应用在维度和矩阵非常巨大的时候并且矩阵的值稀疏型的情况下

    rowMatrix:应用
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.mllib.linalg.distributed.RowMatrix
val rows: RDD[Vector] = ... // an RDD of local vectors// Create a RowMatrix from an RDD[Vector].
val mat: RowMatrix = new RowMatrix(rows)// Get its size.
val m = mat.numRows()
val n = mat.numCols()

IndexedRowMatrix:应用

import org.apache.spark.mllib.linalg.distributed.{IndexedRow, IndexedRowMatrix, RowMatrix}
val rows: RDD[IndexedRow] = ... // an RDD of indexed rows// Create an IndexedRowMatrix from an RDD[IndexedRow].
val mat: IndexedRowMatrix = new IndexedRowMatrix(rows)// Get its size.
val m = mat.numRows()
val n = mat.numCols()// Drop its row indices.
val rowMat: RowMatrix = mat.toRowMatrix()

**CoordinateMatrix: 应用 entries ==》 (Long, Long, Double) tuple

**

import org.apache.spark.mllib.linalg.distributed.{CoordinateMatrix, MatrixEntry}
val entries: RDD[MatrixEntry] = ... // an RDD of matrix entries// Create a CoordinateMatrix from an RDD[MatrixEntry].
val mat: CoordinateMatrix = new CoordinateMatrix(entries)// Get its size.
val m = mat.numRows()val n = mat.numCols()// Convert it to an IndexRowMatrix whose rows are sparse vectors.
val indexedRowMatrix = mat.toIndexedRowMatrix()

概要统计:
Summary statistics
colStats() 接口返回的是MultivariateStatisticalSummary, 它包含了对列向量的概要信息,最大值、最小值、平均值、方差、非零个数和总个数

import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.mllib.stat.{MultivariateStatisticalSummary, Statistics}
val observations: RDD[Vector] = ... // an RDD of Vectors// Compute column summary statistics.
val summary: MultivariateStatisticalSummary = Statistics.colStats(observations)println(summary.mean) // a dense vector containing the mean value for each columnprintln(summary.variance) // column-wise 
varianceprintln(summary.numNonzeros) // number of nonzeros in each column

Correlations 相关性统计:两个系列的数据统计分析,目前主要是Pearson 和Spearman相关性;统计参数可以为RDD[double] 或RDD[Vector] 对应的返回值分别为 double和 Matrix

import  org.apache.spark.SparkContext
import org.apache.spark.mllib.linalg._
import org.apache.spark.mllib.stat.Statistics
val sc : SparkContext = ...
val seriesX: RDD[Double] = ... // a series
val seriesY : RDD[Double]  = ... 
// must have the same number of partitions and cardinality as seriesX
// compute the correlation using Pearson's method. Enter "spearman" for Spearman's method. If a 
// method is not specified, Pearson's method will be used by default. 
val correlation: Double = Statistics.corr(seriesX, seriesY, "pearson”) // 参数pearson 或 spearman 
val data: RDD[Vector] = ... 
// note that each Vector is a row and not a column
// calculate the correlation matrix using Pearson's method. Use "spearman" for Spearman's method.
// If a method is not specified, Pearson's method will be used by default. 
val correlMatrix: Matrix = Statistics.corr(data, "pearson")

Stratified sampling 抽样分层函数:
sampleByKey()
:根据外部传入参数,制定key,抽样占比等映射关系表来抽样;是随机抽样的
sampleByKeyExact()
:根据映射对照表参数,精确的获取抽样数据,置信度可到99.99%;
以上两者关键区别是样本获取的期望值,待exact的必须要得到期望的比率,而不带exact的就只要不超过设置的期望占比就行,

接口调用案例

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.rdd.PairRDDFunctions
val sc: SparkContext = ...
val data = ... // an RDD[(K, V)] of any key value pairs
val fractions: Map[K, Double] = ... // specify the exact fraction desired from each key// Get an exact sample from each stratum
val approxSample = data.sampleByKey(withReplacement = false, fractions)
val exactSample = data.sampleByKeyExact(withReplacement = false, fractions)

Hypothesis testing
假设检验是 总体的特征作出某种假设,然后通过抽样研究的统计推理,对此假设应该被拒绝还是接受作出推断

在mllib中有 Pearson chi-squared 检验

案例应用:

import org.apache.spark.SparkContext
import org.apache.spark.mllib.linalg._
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.stat.Statistics._
val sc: SparkContext = ...
val vec: Vector = ... // a vector composed of the frequencies of events
// compute the goodness of fit. If a second vector to test against is not supplied as a parameter, 
// the test runs against a uniform distribution. 
val goodnessOfFitTestResult = Statistics.chiSqTest(vec)
println(goodnessOfFitTestResult) 
// summary of the test including the p-value, degrees of freedom, 
 // test statistic, the method used, and the null hypothesis.
val mat: Matrix = ... // a contingency matrix
// conduct Pearson's independence test on the input contingency matrix
val independenceTestResult = Statistics.chiSqTest(mat) 
println(independenceTestResult) 
// summary of the test including the p-value, degrees of freedom...
val obs: RDD[LabeledPoint] = ... // (feature, label) pairs.
// The contingency table is constructed from the raw (feature, label) pairs and used to conduct
// the independence test. Returns an array containing the ChiSquaredTestResult for every feature
 // against the label.
val featureTestResults: Array[ChiSqTestResult] = Statistics.chiSqTest(obs)
var i = 1
featureTestResults.foreach { result => println(s"Column $i:\n$result") i += 1} // summary of the test 

** Random data generation**
随机生成数据,MLlib提供了随机生成RDDs的方法,随机类型有标准均匀分布、正态分布、泊松分布

标准正态分布示例:

import org.apache.spark.SparkContext
import org.apache.spark.mllib.random.RandomRDDs._
val sc: SparkContext = ...// Generate a random double RDD that contains 1 million i.i.d. values drawn from the
// standard normal distribution `N(0, 1)`, evenly distributed in 10 partitions.
val u = normalRDD(sc, 1000000L, 10)// Apply a transform to get a random double RDD following `N(1, 4)`.val v = u.map(x => 1.0 + 2.0 * x)

===========================================================================================================
Spark 2.0版本新增加内容

**BlockMatrix
**
块矩阵是存储在RDD中分布式的矩阵。MatrixBlock是tuple存储格式为((Int, Int),Matrix),(Int,Int)表示块索引,Matrix表示指定索引下的子矩阵,rowPerBlock x colsPerBlock BlockMatrix 支持 矩阵相加、矩阵乘法运算。BlockMatrix 校验接口函数 validate,可通过该接口判断该BlockMatrix设置是否正确

BlockMatrix的生成,比较容易,可以通过IndexedRowMatrix 或CoordinateMatrix调用toBlockMatrix,块矩阵默认行列值为1024*1024,可以通过构造函数指定行列值 toBlockMatrix(rowsPerBlock colsPerBlock)

import org.apache.spark.mllib.linalg.distributed.{BlockMatrix, CoordinateMatrix, MatrixEntry}
val entries: RDD[MatrixEntry] = ... // an RDD of (i, j, v) matrix entries// Create a CoordinateMatrix from an RDD[MatrixEntry].
val coordMat: CoordinateMatrix = new CoordinateMatrix(entries)// Transform the CoordinateMatrix to a BlockMatrix
val matA: BlockMatrix = coordMat.toBlockMatrix().cache()
// Validate whether the BlockMatrix is set up properly. Throws an Exception when it is not valid.// Nothing happens if it is valid.
matA.validate()// Calculate A^T A.
val ata = matA.transpose.multiply(matA)

Streaming Significance Testing
在线实时应用校验用于支持A/B testing,应用于Spark Streaming DStream(Boolean, Double), boolean值参数 false表示控制类,true表示treatment类,double 测试结果值

可设置参数有如下

peacePeriod:静默时期时长,该段时间忽略数据; 为减轻新奇效应

windowSize: 窗口期接受RDD个数用于假设检验,如果设置0,就累加整个周期的RDD

val data = ssc.textFileStream(dataDir).map(line => line.split(",") 
match { case Array(label, value) => BinarySample(label.toBoolean, value.toDouble)})
val streamingTest = new StreamingTest() .setPeacePeriod(0) .setWindowSize(0) .setTestMethod("welch")
val out = streamingTest.registerStream(data)out.print()

Kernel density estimation
核密度估计:
是一种有用的可视化经验概率分布的技术,而不需要假设的特定分布,所观察到的样本;计算随机变量的估计概率密度函数,评价给定点集合。通过表达的经验分布的PDF在特定点的平均正常分布的PDF围绕每个样品达到这一估计。

import org.apache.spark.mllib.stat.KernelDensityimport org.apache.spark.rdd.RDD// an RDD of sample data
val data: RDD[Double] = sc.parallelize(Seq(1, 1, 1, 2, 3, 4, 5, 5, 6, 7, 8, 9, 9))
// Construct the density estimator with the sample data and a standard deviation
// for the Gaussian kernels
val kd = new KernelDensity() .setSample(data) .setBandwidth(3.0)
// Find density estimates for the given values
val densities = kd.estimate(Array(-1.0, 2.0, 5.0))
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 194,524评论 5 460
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 81,869评论 2 371
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 141,813评论 0 320
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 52,210评论 1 263
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 61,085评论 4 355
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 46,117评论 1 272
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 36,533评论 3 381
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 35,219评论 0 253
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 39,487评论 1 290
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 34,582评论 2 309
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 36,362评论 1 326
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,218评论 3 312
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 37,589评论 3 299
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 28,899评论 0 17
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,176评论 1 250
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 41,503评论 2 341
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 40,707评论 2 335

推荐阅读更多精彩内容