创建RDD
把程序中一个已有的集合传给 SparkContext 的 parallelize() 方法(主要用于测试)
JavaRDD<String> lines = sc.parallelize(Arrays.asList("pandas", "i like pandas"));
更常用的方式是从外部读取数据来创建RDD
JavaRDD<String> testFile = js.textFile("G:/sparkRS/readtest.txt");
RDD操作
转化操作:惰性求值,返回一 个新的 RDD 的操作,比如 map() 和 filter(),
行动操作:向驱动器程序返回结果或把结果写入外部系统的操作,会触发实际的计算,比如 count() 和 first()。
- 转化操作
只有在行动操作中用到这些 RDD 时才会被计算。许多转化操作都是针对各个元素的,这些转化操作每次只会操作 RDD 中的一个元素。不过并不是所有的转化操作都是这样的
//filter
SparkConf conf = new SparkConf().setMaster("local").setAppName("My App");
JavaSparkContext js = new JavaSparkContext(conf);
JavaRDD<String> lines = js.parallelize(Arrays.asList("coffe","coffe","panda","monkey","tea"));
long result = lines.filter(x-> x.contains("coffe")).count();
System.out.println(result);
js.close();
//union将两个RDD合并
SparkConf conf = new SparkConf().setMaster("local").setAppName("My App");
JavaSparkContext js = new JavaSparkContext(conf);
JavaRDD<String> lines = js.parallelize(Arrays.asList("coffe","coffe","panda","monkey","tea"));
JavaRDD<String> lines1 = js.parallelize(Arrays.asList("coffe","coffe","panda","monkey","tea"));
JavaRDD<String> result = lines.filter(x-> x.contains("coffe"));
JavaRDD<String> result1 = lines1.filter(x-> x.contains("tea"));
JavaRDD<String> outcome = result.union(result1);
System.out.println(outcome.collect());
js.close();
通过转化操作,从已有的 RDD 中派生出新的 RDD,Spark 会使用谱系图(lineage graph)来记录这些不同 RDD 之间的依赖关系。Spark 需要用这些信息来按需计算每个 RDD,也可以依靠谱系图在持久化的 RDD 丢失部分数据时恢复所丢失的数据
- 行动操作
对数据进行实际的计算,行动操作需要生成实际的输出,它们会强制执行那些求值必须用到的RDD转化操作
System.out.println(outcome.collect()); //collect将RDD中的所有数据进行收集,需要大内存
System.out.println(outcome.count());
RDD.take(10)
使用 take() 获取了RDD 中的少量元素集。然后在本地遍历这些元素,并在驱动器端打印出来。RDD还有一个 collect() 函数,可以用来获取整 个 RDD中的数据。只有当你的整个数据集能在单台机器的内存中放得下时,才能使用collect,因此,collect()不能用在大规模数据集上。在大多数情况下,RDD 不能通过 collect() 收集到驱动器进程中,因为它们一般都很大。每当我们调用一个新的行动操作时,整个 RDD 都会从头开始计算。要避免这种低效的行为,用户可以将中间结果持久化
惰性操作
惰性求值意味着当我们对 RDD 调用转化操作,操作不会立即执行。 Spark 会在内部记录下所要求执行的操作的相关信息。我们不应该把 RDD 看作存放着特定数据的数据集,而最好把每个 RDD 当作我们通过转化操作构建出来的、记录如何计算数据的指令列表。把数据读取到 RDD 的操作也同样是惰性的。和转化操作一样的是, 读取数据的操作也有可能会多次执行。虽然转化操作是惰性求值的,但还是可以随时通过运行一个行动操作来强制 Spark 执行 RDD 的转化操作,比如使用 count()。
Spark 使用惰性求值,这样就可以把一些操作合并到一起来减少计算数据的步骤。( Hadoop MapReduce 的系统中,开发者常常花费大量时间考虑如何把操作组合到一起,以减少 MapReduce 的周期数)
传递函数
Spark 的大部分转化操作和一部分行动操作,都需要依赖用户传递的函数来计算。支持的三种主要语言中都略有不同(函数接口)
Java
在 Java 中,函数需要作为实现了 Spark 的 org.apache.spark.api.java.function 包中的任 一函数接口的对象来传递,不同返回类型有不同接口
//匿名类进行函数传递
RDD<String> errors = lines.filter(new Function<String, Boolean>() {
public Boolean call(String x) {
return x.contains("error");
} });
//使用具名类进行函数传递,继承xx接口,在实例化时就可自动向上转型当做接口类型
class ContainsError implements Function<String, Boolean> {
public Boolean call(String x) {
return x.contains("error");
} }
RDD<String> errors = lines.filter(new ContainsError());
常见的转化操作和行动操作
包含特定数据类型的 RDD 还支持一些附加操作,例如,数字类型的 RDD 支持统计型函数操作,而键值对形式的 RDD 则支持诸如根据键聚合数据的键值对操作
针对各个元素的转化操作
map() 接收一个函数,把这个函数用于 RDD 中的每个元素,将函数的返回结果作为结果RDD 中对应元素的值
filter() 则接收一个函数,并将 RDD 中满足该函数的 元素放入新的 RDD 中返回
map() 的返回值类型不需要和输入类型一样
对每个输入元素生成多个输出元素。 flatMap() 返回值序列的迭代器。输出的 RDD 倒不是由迭代器得到的是一个包含各个迭代器可访问的所有元素的 RDD。flatMap() 的一个简 单用途是把输入的字符串切分为单词
//数组中的iterator方法可以将数组转换为迭代器
JavaRDD<String> words = word.flatMap(x->Arrays.asList(x.split(",")).iterator() );
伪集合操作
RDD 中最常缺失的集合属性是元素的唯一性,因为常常有重复的元素。RDD.distinct() 转化操作来生成一个只包含不同元素的新RDD。distinct() 操作的开销很大,因为它需要将所有数据通过网络进行混洗(shuffle),以确保每个元素都只有一份
集合操作 union(other),返回一个包含两个 RDD 中所有元素的 RDD。Spark 的 union() 操作也会包含这些重复数据 (可通过 distinct() 实现相同的效果)。
Spark 还提供了交集 intersection(other) 方法,与union方法相似,只返回两个 RDD 中都有的元素。但是intersection() 的性能却要差很多,它需要网络混洗数据发现共有数据
subtract(other) 函数接收另一个 RDD 作为参数,返回 一个由只存在于第一个 RDD 中而不存在于第二个 RDD 中的所有元素组成的 RDD。需要数据混洗。
计算两个 RDD 的笛卡儿积,cartesian(other) 转化操作会返回所有可能的 (a, b) 对。笛卡儿积在我们希望考虑所有可能的组合的相似度时比较有用(产品的预期兴趣程度),开销巨大。
行动操作
对RDD数据进行实际计算
基本 RDD 上最常见的行动操作 reduce()。接收一个函数作为参数,这个函数要操作两个 RDD 的元素类型的数据并返回一个同样类型的新元素
Integer results = counts.reduce((x,y)->{ return x+y; });
折叠方法fold() 和 reduce() 类似,接收一个与 reduce() 接收的函数签名相同的函数,再加上一个 “初始值”来作为每个分区第一次调用时的结果。使用你的函数对这个初始值进行多次计算不会改变结果,通过原地修改并返回两个参数中的前一个的值来节约在 fold() 中创建对象的开销fold() 和 reduce() 都要求函数的返回值类型需要和我们所操作的 RDD 中的元素类型相同。在计算平均值时,需要记录遍历过程中的计数以及元素的数量,这就需要我们返回一 个二元组。对数据使用 map() 操作,来把元素转为该元素和 1 的二元组
//reduce求平均
JavaPairRDD<String,Integer> counts = words.mapToPair(s -> new Tuple2<String, Integer>(s,1));
//reduce求总数和总次数,Tuple2的字段_1和_2是final型不能
//改变,必须有一个可以操作的变量才能对Tuple2中的数进行计算
//所以,先将第一个RDD的Tuple2赋值给a、b
//然后和y(第二个数)进行计算,返回第一次调用的计算结果
//然后第一次的计算结果再和第三个Tuple2进行计算返回第二次的调用结果。。。
Tuple2<Integer, Integer> results1 = counts.reduce((x,y)->{
Integer a = x._1();
Integer b = x._2();
a+=y._1();
b+=y._2();
return new Tuple2(a,b);
});
//fold求平均,过程与上大致一样
Integer reduce = line.fold(0, (x,y) -> x+y);
aggregate 函数则把我们从返回值类型必须与所操作的RDD类型相同的限制中解放出来。使用 aggregate() 时,需要提供我们期待返回的类型(自定义)的初始值。然后通过一个函数把 RDD 中的元素合并起来放入累加器。考虑到每个节点是在本地进行累加的,最终,还需要提供第二个函数来将累加器两两合并。
//用aggregate()来计算RDD的平均值
public class Operation {
public static void main(String[] args) throws InterruptedException {
// TODO 自动生成的方法存根
SparkConf conf = new SparkConf().setMaster("local").setAppName("My App");
JavaSparkContext jsc = new JavaSparkContext(conf);
JavaRDD<Integer> lines = jsc.parallelize(Arrays.asList(1,2,3,4));
JavaRDD<Integer> line = jsc.parallelize(Arrays.asList(4,5,6));
AvgCount a =new AvgCount(0,0);
Function2<AvgCount,Integer,AvgCount> addAndCount = new Function2<AvgCount,Integer,AvgCount>(){
private static final long serialVersionUID = 1L; @Override
public AvgCount call(AvgCount arg0, Integer arg1) throws Exception {
// TODO 自动生成的方法存根
arg0.total += arg1;
arg0.num += 1;
return arg0;
}
};
Function2<AvgCount,AvgCount,AvgCount> conbine = new Function2<AvgCount,AvgCount,AvgCount>(){
private static final long serialVersionUID = 1L;
@Override
public AvgCount call(AvgCount arg0, AvgCount arg1) throws Exception {
// TODO 自动生成的方法存根
arg0.total += arg1.total;
arg0.num += arg1.num;
return arg0;
}
};
line.aggregate(a,(x,y)->{
x.total += y;
x.num += 1;
return x;
},
(x,y)->{
x.total +=y.total;
x.num +=y.num;
return x;}
);
AvgCount sum = line.aggregate(a, addAndCount, conbine);
System.out.println( sum.total+":"+sum.num+"--------avg:"+(sum.total/sum.num));
jsc.close();
}
}
class AvgCount implements Serializable{
public int total;
public int num;
private static final long serialVersionUID = 3325529460700487293L;
public AvgCount(int total,int num){
this.total = total;
this.num = num;
}
}
RDD 的一些行动操作会以普通集合或者值的形式将 RDD 的部分或全部数据返回驱动器程序中。
collect() 通常在单元测试中使用,因为此时 RDD 的整个内容不会很大,可以放在内存中take(n) 返回 RDD 中的 n 个元素集合,并且尝试只访问尽量少的分区,因此该操作会得到一个不均衡的集合。这些操作返回元素的顺序与你预期的可能不一样。这些操作对于单元测试和快速调试都很有用,但是在处理大规模数据时会遇到瓶颈。可以用 JSON 格式把数据发送到一个网络服务器上,或者把数 据存到数据库中。都可以使用 foreach() 行动操作来对 RDD 中的每个元 素进行操作,而不需要把 RDD 发回本地。
在不同RDD类型间转换
有些函数只能用于特定类型的 RDD,比如 mean() 和 variance() 只能用在数值 RDD 上, 而 join() 只能用在键值对 RDD 上
Java
要从 T 类型的 RDD 创建出一个 DoubleRDD,我们就应当在映射操作中使用 DoubleFunction<T> 来替代 Function<T, Double>
生成JavaDoubleRDD、计算 RDD 中每个元素的平方值,这样就可以调用 DoubleRDD 独有的函数了,比如平均是 mean() 和方差 variance()。
JavaDoubleRDD result = rdd.mapToDouble(
new DoubleFunction<Integer>() {
public double call(Integer x) {
return (double) x * x;
} });
System.out.println(result.mean());
持久化(缓存)
Spark RDD 是惰性求值的,而有时我们希望能多次使用同一个 RDD。如果简单地对 RDD 调用行动操作,Spark 每次都会重算 RDD 以及它的所有依赖
迭代算法中消耗格外大,因为迭代算法常常会多次使用同一组数据
为了避免多次计算同一个 RDD,可以让 Spark 对数据进行持久化。当我们让 Spark 持久化 存储一个 RDD 时,计算出 RDD 的节点会分别保存它们所求出的分区数据。如果一个有持久化数据的节点发生故障,Spark 会在需要用到缓存的数据时重算丢失的数据分区。如果希望节点故障的情况不会拖累我们的执行速度,也可以把数据备份到多个节点上。
默认情况下persist会把数据以序列化的形式缓存在JVM的堆空间中(实际数据区)
Java 中,默认情况下 persist() 会把数据以序列化的形式缓存在 JVM 的堆空间中
//对result进行缓存
result.persist(StorageLevel.DISK_ONLY)
result.persist(StorageLevel.DISK_ONLY_2)
persist() 调用本身不会触发强制求值
如果要缓存的数据太多,内存中放不下,Spark 会自动利用最近最少使用(LRU)的缓存策略把最老的分区从内存中移除。对于仅把数据存放在内存中的缓存级别,下一次要用到已经被移除的分区时,这些分区就需要重新计算。但是对于使用内存与磁盘的缓存级别的分区来说,被移除的分区都会写入磁盘
RDD 还有一个方法叫作 unpersist(),调用该方法可以手动把持久化的 RDD 从缓 存中移除