action操作

  • reduce()
  • collect()
  • count()
  • take()
  • saveAsTextFile()
  • countByKey()
import java.util.Arrays;
import java.util.List;
import java.util.Map;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.VoidFunction;

import scala.Tuple2;

/**
 * action操作实战
 * @author Administrator
 *
 */
@SuppressWarnings("unused")
public class ActionOperation {
    
    public static void main(String[] args) {
        // reduce();
        // collect();
        // count();
        // take();
        // saveAsTextFile();
        countByKey();
    }
    
    private static void reduce() {
        // 创建SparkConf和JavaSparkContext
        SparkConf conf = new SparkConf()
                .setAppName("reduce")
                .setMaster("local");  
        JavaSparkContext sc = new JavaSparkContext(conf);
        
        // 有一个集合,里面有1到10,10个数字,现在要对10个数字进行累加
        List<Integer> numberList = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
        JavaRDD<Integer> numbers = sc.parallelize(numberList);
        
        // 使用reduce操作对集合中的数字进行累加
        // reduce操作的原理:
            // 首先将第一个和第二个元素,传入call()方法,进行计算,会获取一个结果,比如1 + 2 = 3
            // 接着将该结果与下一个元素传入call()方法,进行计算,比如3 + 3 = 6
            // 以此类推
        // 所以reduce操作的本质,就是聚合,将多个元素聚合成一个元素
        int sum = numbers.reduce(new Function2<Integer, Integer, Integer>() {
            
            private static final long serialVersionUID = 1L;

            @Override
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1 + v2;
            }
            
        });
        
        System.out.println(sum);  
        
        // 关闭JavaSparkContext
        sc.close();
    }
    
    private static void collect() {
        // 创建SparkConf和JavaSparkContext
        SparkConf conf = new SparkConf()
                .setAppName("collect")
                .setMaster("local");  
        JavaSparkContext sc = new JavaSparkContext(conf);
        
        // 有一个集合,里面有1到10,10个数字,现在要对10个数字进行累加
        List<Integer> numberList = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
        JavaRDD<Integer> numbers = sc.parallelize(numberList);
        
        // 使用map操作将集合中所有数字乘以2
        JavaRDD<Integer> doubleNumbers = numbers.map(
                
                new Function<Integer, Integer>() {

                    private static final long serialVersionUID = 1L;
        
                    @Override
                    public Integer call(Integer v1) throws Exception {
                        return v1 * 2;
                    }
                    
                });
        
        // 不用foreach action操作,在远程集群上遍历rdd中的元素
        // 而使用collect操作,将分布在远程集群上的doubleNumbers RDD的数据拉取到本地
        // 这种方式,一般不建议使用,因为如果rdd中的数据量比较大的话,比如超过1万条
            // 那么性能会比较差,因为要从远程走大量的网络传输,将数据获取到本地
            // 此外,除了性能差,还可能在rdd中数据量特别大的情况下,发生oom异常,内存溢出
        // 因此,通常,还是推荐使用foreach action操作,来对最终的rdd元素进行处理
        List<Integer> doubleNumberList = doubleNumbers.collect();
        for(Integer num : doubleNumberList) {
            System.out.println(num);  
        }
        
        // 关闭JavaSparkContext
        sc.close();
    }
    
    private static void count() {
        // 创建SparkConf和JavaSparkContext
        SparkConf conf = new SparkConf()
                .setAppName("count")
                .setMaster("local");  
        JavaSparkContext sc = new JavaSparkContext(conf);
        
        // 有一个集合,里面有1到10,10个数字,现在要对10个数字进行累加
        List<Integer> numberList = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
        JavaRDD<Integer> numbers = sc.parallelize(numberList);
        
        // 对rdd使用count操作,统计它有多少个元素
        long count = numbers.count();
        System.out.println(count);  
        
        // 关闭JavaSparkContext
        sc.close();
    }
    
    private static void take() {
        // 创建SparkConf和JavaSparkContext
        SparkConf conf = new SparkConf()
                .setAppName("take")
                .setMaster("local");  
        JavaSparkContext sc = new JavaSparkContext(conf);
        
        // 有一个集合,里面有1到10,10个数字,现在要对10个数字进行累加
        List<Integer> numberList = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
        JavaRDD<Integer> numbers = sc.parallelize(numberList);
        
        // 对rdd使用count操作,统计它有多少个元素
        // take操作,与collect类似,也是从远程集群上,获取rdd的数据
        // 但是collect是获取rdd的所有数据,take只是获取前n个数据
        List<Integer> top3Numbers = numbers.take(3);
        
        for(Integer num : top3Numbers) {
            System.out.println(num);  
        }
        
        // 关闭JavaSparkContext
        sc.close();
    }
    
    private static void saveAsTextFile() {
        // 创建SparkConf和JavaSparkContext
        SparkConf conf = new SparkConf()
                .setAppName("saveAsTextFile");  
        JavaSparkContext sc = new JavaSparkContext(conf);
        
        // 有一个集合,里面有1到10,10个数字,现在要对10个数字进行累加
        List<Integer> numberList = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
        JavaRDD<Integer> numbers = sc.parallelize(numberList);
        
        // 使用map操作将集合中所有数字乘以2
        JavaRDD<Integer> doubleNumbers = numbers.map(
                
                new Function<Integer, Integer>() {

                    private static final long serialVersionUID = 1L;
        
                    @Override
                    public Integer call(Integer v1) throws Exception {
                        return v1 * 2;
                    }
                    
                });
        
        // 直接将rdd中的数据,保存在HFDS文件中
        // 但是要注意,我们这里只能指定文件夹,也就是目录
        // 那么实际上,会保存为目录中的/double_number.txt/part-00000文件
        doubleNumbers.saveAsTextFile("hdfs://spark1:9000/double_number.txt");   
        
        // 关闭JavaSparkContext
        sc.close();
    }
    
    @SuppressWarnings("unchecked")
    private static void countByKey() {
        // 创建SparkConf
        SparkConf conf = new SparkConf()
                .setAppName("countByKey")  
                .setMaster("local");
        // 创建JavaSparkContext
        JavaSparkContext sc = new JavaSparkContext(conf);
        
        // 模拟集合
        List<Tuple2<String, String>> scoreList = Arrays.asList(
                new Tuple2<String, String>("class1", "leo"),
                new Tuple2<String, String>("class2", "jack"),
                new Tuple2<String, String>("class1", "marry"),
                new Tuple2<String, String>("class2", "tom"),
                new Tuple2<String, String>("class2", "david"));  
        
        // 并行化集合,创建JavaPairRDD
        JavaPairRDD<String, String> students = sc.parallelizePairs(scoreList);
        
        // 对rdd应用countByKey操作,统计每个班级的学生人数,也就是统计每个key对应的元素个数
        // 这就是countByKey的作用
        // countByKey返回的类型,直接就是Map<String, Object>
        Map<String, Object> studentCounts = students.countByKey();
        
        for(Map.Entry<String, Object> studentCount : studentCounts.entrySet()) {
            System.out.println(studentCount.getKey() + ": " + studentCount.getValue());  
        }
        // class1: 2
        // class2: 3

        // 关闭JavaSparkContext
        sc.close();
    }
    
}

package cn.spark.study.core

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext

/**
 * @author Administrator
 */
object ActionOperation {
  
  def main(args: Array[String]) {
    // reduce()  
    // collect()  
    // count() 
    // take() 
    countByKey()  
  }
  
  def reduce() {
    val conf = new SparkConf()
        .setAppName("reduce")
        .setMaster("local")  
    val sc = new SparkContext(conf)
    
    val numberArray = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
    val numbers = sc.parallelize(numberArray, 1)  
    val sum = numbers.reduce(_ + _)  
    
    println(sum)  
  }
  
  def collect() {
    val conf = new SparkConf()
        .setAppName("collect")
        .setMaster("local")  
    val sc = new SparkContext(conf)
    
    val numberArray = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
    val numbers = sc.parallelize(numberArray, 1)  
    val doubleNumbers = numbers.map { num => num * 2 }  
    
    val doubleNumberArray = doubleNumbers.collect()
    
    for(num <- doubleNumberArray) {
      println(num)  
    }
  }
  
  def count() {
    val conf = new SparkConf()
        .setAppName("count")
        .setMaster("local")  
    val sc = new SparkContext(conf)
    
    val numberArray = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
    val numbers = sc.parallelize(numberArray, 1)  
    val count = numbers.count()
    
    println(count)  
  }
  
  def take() {
    val conf = new SparkConf()
        .setAppName("take")
        .setMaster("local")  
    val sc = new SparkContext(conf)
    
    val numberArray = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
    val numbers = sc.parallelize(numberArray, 1)  
    
    val top3Numbers = numbers.take(3)
    
    for(num <- top3Numbers) {
      println(num)  
    }
  }
  
  def saveAsTextFile() {
    
  }
  
  def countByKey() {
    val conf = new SparkConf()
        .setAppName("countByKey")  
        .setMaster("local")  
    val sc = new SparkContext(conf)
    
    val studentList = Array(Tuple2("class1", "leo"), Tuple2("class2", "jack"),
        Tuple2("class1", "tom"), Tuple2("class2", "jen"), Tuple2("class2", "marry"))   
    val students = sc.parallelize(studentList, 1)  
    val studentCounts = students.countByKey()  
    
    println(studentCounts)  // Map(class1 -> 2, class2 -> 3)
  }
  
}
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 202,980评论 5 476
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,178评论 2 380
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 149,868评论 0 336
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,498评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,492评论 5 364
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,521评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,910评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,569评论 0 256
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,793评论 1 296
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,559评论 2 319
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,639评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,342评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,931评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,904评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,144评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,833评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,350评论 2 342

推荐阅读更多精彩内容