Spark SQL(十一):与Spark Core整合

每日top3热点搜索词统计Demo

1、数据格式:

日期 用户 搜索词 城市 平台 版本

2、需求:

1、筛选出符合查询条件(城市、平台、版本)的数据
2、统计出每天搜索uv排名前3的搜索词
3、按照每天的top3搜索词的uv搜索总次数,倒序排序
4、将数据保存到hive表中

3、实现思路:

  • 1、针对原始数据(HDFS文件),获取输入的RDD
  • 2、使用filter算子,去针对输入RDD中的数据,进行数据过滤,过滤出符合查询条件的数据;
    • 2.1 普通的做法:直接在fitler算子函数中,使用外部的查询条件(Map),但是,这样做的话,是不是查询条件Map,会发送到每一个task上一份副本。(性能并不好);
    • 2.2 优化后的做法:将查询条件,封装为Broadcast广播变量,在filter算子中使用Broadcast广播变量进行数据筛选;
  • 3、将数据转换为“(日期搜索词, 用户)”格式,然后呢,对它进行分组,然后再次进行映射,对每天每个搜索词的搜索用户进行去重操作,并统计去重后的数量,即为每天每个搜索词的uv。最后,获得“(日期搜索词, uv)” ;
  • 4、将得到的每天每个搜索词的uv,RDD,映射为元素类型为Row的RDD,将该RDD转换为DataFrame;
  • 5、将DataFrame注册为临时表,使用Spark SQL的开窗函数,来统计每天的uv数量排名前3的搜索词,以及它的搜索uv,最后获取,是一个DataFrame;
  • 6、将DataFrame转换为RDD,继续操作,按照每天日期来进行分组,并进行映射,计算出每天的top3搜索词的搜索uv的总数,然后将uv总数作为key,将每天的top3搜索词以及搜索次数,拼接为一个字符串
  • 7、按照每天的top3搜索总uv,进行排序,倒序排序
  • 8、将排好序的数据,再次映射回来,变成“日期_搜索词_uv”的格式
  • 9、再次映射为DataFrame,并将数据保存到Hive中即可
package cn.spark.study.sql;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.hive.HiveContext;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import scala.Tuple2;

/**
 * 每日top3热点搜索词统计案例
 */
public class DailyTop3Keyword {

    @SuppressWarnings("deprecation")
    public static void main(String[] args) {
        SparkConf conf = new SparkConf()
                .setAppName("DailyTop3Keyword");  
        JavaSparkContext sc = new JavaSparkContext(conf);
        HiveContext sqlContext = new HiveContext(sc.sc());  
        
        // 伪造出一份数据,查询条件
        // 备注:实际上,在工作中,这个查询条件,是通过J2EE平台插入到某个MySQL表中的
        // 然后,在这里通过Spring框架和ORM框架(MyBatis),去提取MySQL表中的查询条件
        Map<String, List<String>> queryParamMap = new HashMap<String, List<String>>();
        queryParamMap.put("city", Arrays.asList("beijing"));  
        queryParamMap.put("platform", Arrays.asList("android"));  
        queryParamMap.put("version", Arrays.asList("1.0", "1.2", "1.5", "2.0"));  
        
        // 根据我们实现思路中的分析,这里最合适的方式,
        //是将该查询参数Map封装为一个Broadcast广播变量
        // 这样可以进行优化,每个Worker节点,只拷贝一份数据即可
        final Broadcast<Map<String, List<String>>> queryParamMapBroadcast = 
                sc.broadcast(queryParamMap);
        
        // 1、针对HDFS文件中的日志,获取输入RDD
        JavaRDD<String> rawRDD = sc.textFile("hdfs://spark1:9000/spark-study/keyword.txt"); 
        
        // 2、使用查询参数Map广播变量,进行筛选
        JavaRDD<String> filterRDD = rawRDD.filter(new Function<String, Boolean>() {
            
            private static final long serialVersionUID = 1L;

            @Override
            public Boolean call(String log) throws Exception {
                // 切割原始日志,获取城市、平台和版本
                String[] logSplited = log.split("\t");  
                
                String city = logSplited[3];
                String platform = logSplited[4];
                String version = logSplited[5];
                
                // 与查询条件进行比对,任何一个条件,只要该条件设置了,且日志中的数据没有满足条件
                // 则直接返回false,过滤该日志
                // 否则,如果所有设置的条件,都有日志中的数据,则返回true,保留日志
                Map<String, List<String>> queryParamMap = queryParamMapBroadcast.value();
                
                List<String> cities = queryParamMap.get("city");  
                if(cities.size() > 0 && !cities.contains(city)) {
                    return false;
                }
                
                List<String> platforms = queryParamMap.get("platform");  
                if(platforms.size() > 0 && !platforms.contains(platform)) {
                    return false;
                }
                
                List<String> versions = queryParamMap.get("version");  
                if(versions.size() > 0 && !versions.contains(version)) {
                    return false;
                }
                
                return true;
            }
            
        });
        
        // 3、过滤出来的原始日志,映射为(日期_搜索词, 用户)的格式
        JavaPairRDD<String, String> dateKeywordUserRDD = filterRDD.mapToPair(
                
                new PairFunction<String, String, String>() {

                    private static final long serialVersionUID = 1L;

                    @Override
                    public Tuple2<String, String> call(String log) throws Exception {
                        String[] logSplited = log.split("\t");  
                        
                        String date = logSplited[0];
                        String user = logSplited[1];
                        String keyword = logSplited[2];
                        
                        return new Tuple2<String, String>(date + "_" + keyword, user);
                    }
                    
                });
        
        // 进行分组,获取每天每个搜索词,有哪些用户搜索了(没有去重)
        JavaPairRDD<String, Iterable<String>> dateKeywordUsersRDD = dateKeywordUserRDD.groupByKey();
        
        // 对每天每个搜索词的搜索用户,执行去重操作,获得其uv
        JavaPairRDD<String, Long> dateKeywordUvRDD = dateKeywordUsersRDD.mapToPair(
                
                new PairFunction<Tuple2<String,Iterable<String>>, String, Long>() {

                    private static final long serialVersionUID = 1L;
        
                    @Override
                    public Tuple2<String, Long> call(
                            Tuple2<String, Iterable<String>> dateKeywordUsers) throws Exception {
                        String dateKeyword = dateKeywordUsers._1;
                        Iterator<String> users = dateKeywordUsers._2.iterator();
                        
                        // 对用户进行去重,并统计去重后的数量
                        List<String> distinctUsers = new ArrayList<String>();
                        
                        while(users.hasNext()) {
                            String user = users.next();
                            if(!distinctUsers.contains(user)) {
                                distinctUsers.add(user);
                            }
                        }
                        
                        // 获取uv
                        long uv = distinctUsers.size();
                        
                        return new Tuple2<String, Long>(dateKeyword, uv);  
                    }
                    
                });
        
        //4、 将每天每个搜索词的uv数据,转换成DataFrame
        JavaRDD<Row> dateKeywordUvRowRDD = dateKeywordUvRDD.map(
                
                new Function<Tuple2<String,Long>, Row>() {

                    private static final long serialVersionUID = 1L;
        
                    @Override
                    public Row call(Tuple2<String, Long> dateKeywordUv) throws Exception {
                        String date = dateKeywordUv._1.split("_")[0];
                        String keyword = dateKeywordUv._1.split("_")[1];
                        long uv = dateKeywordUv._2;
                        return RowFactory.create(date, keyword, uv);
                    }
                    
                });
        
        List<StructField> structFields = Arrays.asList(
                DataTypes.createStructField("date", DataTypes.StringType, true),
                DataTypes.createStructField("keyword", DataTypes.StringType, true),
                DataTypes.createStructField("uv", DataTypes.LongType, true));
        StructType structType = DataTypes.createStructType(structFields);
        // 将该RDD转换为DataFrame
        DataFrame dateKeywordUvDF = sqlContext.createDataFrame(dateKeywordUvRowRDD, structType);
        
        // 5、使用Spark SQL的开窗函数,统计每天搜索uv排名前3的热点搜索词
        dateKeywordUvDF.registerTempTable("daily_keyword_uv");  
        
        DataFrame dailyTop3KeywordDF = sqlContext.sql(""
                + "SELECT date,keyword,uv "
                + "FROM ("
                    + "SELECT "
                        + "date,"
                        + "keyword,"
                        + "uv,"
                        + "row_number() OVER (PARTITION BY date ORDER BY uv DESC) rank "
                    + "FROM daily_keyword_uv"  
                + ") tmp "
                + "WHERE rank<=3");  
        
        // 6、将DataFrame转换为RDD,然后映射,计算出每天的top3搜索词的搜索uv总数
        JavaRDD<Row> dailyTop3KeywordRDD = dailyTop3KeywordDF.javaRDD();
        
        JavaPairRDD<String, String> top3DateKeywordUvRDD = dailyTop3KeywordRDD.mapToPair(
                new PairFunction<Row, String, String>() {

                    private static final long serialVersionUID = 1L;

                    @Override
                    public Tuple2<String, String> call(Row row)
                            throws Exception {
                        String date = String.valueOf(row.get(0));  
                        String keyword = String.valueOf(row.get(1));  
                        Long uv = Long.valueOf(String.valueOf(row.get(2)));                         
                        return new Tuple2<String, String>(date, keyword + "_" + uv);
                    }
                    
                });
        
        JavaPairRDD<String, Iterable<String>> top3DateKeywordsRDD = top3DateKeywordUvRDD.groupByKey();
        
        JavaPairRDD<Long, String> uvDateKeywordsRDD = top3DateKeywordsRDD.mapToPair(
                new PairFunction<Tuple2<String,Iterable<String>>, Long, String>() {

                    private static final long serialVersionUID = 1L;

                    @Override
                    public Tuple2<Long, String> call(
                            Tuple2<String, Iterable<String>> tuple)
                            throws Exception {
                        String date = tuple._1;
                        
                        Long totalUv = 0L;
                        String dateKeywords = date;  
                        
                        Iterator<String> keywordUvIterator = tuple._2.iterator();
                        while(keywordUvIterator.hasNext()) {
                            String keywordUv = keywordUvIterator.next();
                            
                            Long uv = Long.valueOf(keywordUv.split("_")[1]);  
                            totalUv += uv;
                            
                            dateKeywords += "," + keywordUv;
                        }
                        
                        return new Tuple2<Long, String>(totalUv, dateKeywords);
                    }
                    
                });
        
        // 7、按照每天的总搜索uv进行倒序排序
        JavaPairRDD<Long, String> sortedUvDateKeywordsRDD = uvDateKeywordsRDD.sortByKey(false);
        
        //8、再次进行映射,将排序后的数据,映射回原始的格式,Iterable<Row>
        JavaRDD<Row> sortedRowRDD = sortedUvDateKeywordsRDD.flatMap(
                
                new FlatMapFunction<Tuple2<Long,String>, Row>() {

                    private static final long serialVersionUID = 1L;

                    @Override
                    public Iterable<Row> call(Tuple2<Long, String> tuple)
                            throws Exception {
                        String dateKeywords = tuple._2;
                        String[] dateKeywordsSplited = dateKeywords.split(",");  
                        
                        String date = dateKeywordsSplited[0];
                        
                        List<Row> rows = new ArrayList<Row>();
                        rows.add(RowFactory.create(date, 
                                dateKeywordsSplited[1].split("_")[0],
                                Long.valueOf(dateKeywordsSplited[1].split("_")[1]))); 
                        rows.add(RowFactory.create(date, 
                                dateKeywordsSplited[2].split("_")[0],
                                Long.valueOf(dateKeywordsSplited[2].split("_")[1]))); 
                        rows.add(RowFactory.create(date, 
                                dateKeywordsSplited[3].split("_")[0],
                                Long.valueOf(dateKeywordsSplited[3].split("_")[1]))); 
                        
                        return rows;
                    }
                    
                });
        
        //9、将最终的数据,转换为DataFrame,并保存到Hive表中
        DataFrame finalDF = sqlContext.createDataFrame(sortedRowRDD, structType);
        
        finalDF.saveAsTable("daily_top3_keyword_uv");
        
        sc.close();
    }
}
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,445评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,889评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,047评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,760评论 1 276
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,745评论 5 367
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,638评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,011评论 3 398
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,669评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,923评论 1 299
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,655评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,740评论 1 330
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,406评论 4 320
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,995评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,961评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,197评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,023评论 2 350
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,483评论 2 342

推荐阅读更多精彩内容