引导
我们在前一篇已经学习了spark的相关概念,并写了一个简单的demo,那么我们本篇开始深入的学习spark其中的最核心的一个概念RDD
2.1、什么是RDD?
RDD(Resilient Distributed Dataset)叫做弹性分布式数据集,一个可并行操作的有容错机制的数据集合,是Spark中最基本的数据抽象,我们就把它当成简单的数据集合。
2.2、怎么使用RDD?
有两种方式创建RDD,
- 1、第二种是初始化一个集合,这种叫做并行集合。
- 2、第一种是读取外部数据集,如:共享的文件系统,HDFS,HBase,或者其他的Hadoop数据格式的数据源
两种创建方式都需要一样sc对象,sc同上文为JavaSparkContext
创建方法如下图:
//并行集合
List<Integer> data = Arrays.asList(1, 2, 3, 4, 5);
JavaRDD<Integer> distData = sc.parallelize(data);
//外部文件
JavaRDD<String> lines = sc.textFile("/Users/sunliangliang/Documents/personal/csv/000002.csv");
2.3、RDD操作
spark算子:英文Operator,其实就是操作的意思,我们这里面是指操作运算等等,其实就是指函数化,通过调用函数处理,一个函数可以称之为算子。
RDD的操作分为两种
2.3.1、转化操作
这类称之为Transformation
(转换/变换算子),这类操作是延迟计算,即从一个RDD转换成另外一个并不会马上执行,需要等待行动操作的时候才执行。
常见的Transformation
算子如下
- map()
public static void main(String[] args) {
SparkConf conf = new SparkConf()
.setAppName("WordCountLocal")
.setMaster("local");
JavaSparkContext sc = new JavaSparkContext(conf);
JavaRDD<Integer> datas = sc.parallelize(Arrays.asList(1,2,3,4));
/**通过map算子实现平方**/
JavaRDD<Integer> result = datas.map(new Function<Integer, Integer>() {
@Override
public Integer call(Integer x) throws Exception {
return x*x; }
});
System.out.println(StringUtils.join(result.collect(),","));
}
运行结果如下图:
1,4,9,16
- fliter()
JavaRDD<Integer> filter = datas.filter(new Function<Integer, Boolean>() {
@Override
public Boolean call(Integer x) throws Exception {
return x>2;
}
});
我们可以猜想输出为>2的值
3,4
- flatMap()
这是将每个输入元素生成多个输出元素,拍扁的意思,也就是将每个元素按照格式拆分成一行如下图
public static void main(String[] args) {
SparkConf conf = new SparkConf()
.setAppName("WordCountLocal")
.setMaster("local");
JavaSparkContext sc = new JavaSparkContext(conf);
JavaRDD<String> datas = sc.textFile("/Users/sunliangliang/Documents/personal/spark.txt");
JavaRDD<String> flatMap = datas.flatMap(new FlatMapFunction<String, String>() {
@Override
public Iterator<String> call(String s) throws Exception {
return Arrays.asList(s.split("\\s")).iterator();
}
});
JavaRDD<List<String>> map = datas.map(new Function<String, List<String>>() {
@Override
public List<String> call(String s) throws Exception {
return Arrays.asList(s.split("\\s"));
}
});
System.out.println(StringUtils.join(map.collect(),","));
System.out.println(StringUtils.join(flatMap.collect(),","));
}
输出结果
[hello, world],[a, new, line],[hello],[the, end]
hello,world,a,new,line,hello,the,end
其中spark.txt中的内容如下
hello world
a new line
hello
the end
我们看到将map是按行拆分,而flapMap 拆成了一个个单词,如下图
其他常见操作如下图
2.3.2、行动操作
行动操作主要包含,collect(),reduce(),aggregate()等
- reduce()
接收一个函数作为参数,这个函数操作两个相同类型的元素,并返回一个同样类型的数据。最常见的就是叠加等。
public static void reduce(){
JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1,2,3,4,5));
int sum = rdd.reduce(new Function2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer x, Integer y) throws Exception {
return x+y;
}
});
System.out.println(sum);
}
输出结果如下
15
- aggregate()
TODO
求平均值
2.4、不同类型间的转换
JavaRDD<Integer> datas = sc.parallelize(Arrays.asList(1,2,3,4,5));
JavaDoubleRDD result = datas.mapToDouble(new DoubleFunction<Integer>() {
@Override
public double call(Integer x) throws Exception {
return (double) x+x;
}
});
2.5、持久化
通过以下两种方式持久化一个rdd,然后将其保存在美国节点内存。该缓存是一个容错技术。
也就是缓存,有两种方式
- cache():只是缓存到默认的缓存级别:只使用内存
- persist():可以自定义缓存级别
使用方式如下
rdd.persist(StorageLevel.DISK_ONLY());
rdd.cache();