在Spark 1.5.x版本,增加了一系列内置函数到DataFrame API中,并且实现了code-generation的优化。与普通的函数不同,DataFrame的函数并不会执行后立即返回一个结果值,而是返回一个Column对象,用于在并行作业中进行求值。Column可以用在DataFrame的操作之中,比如select,filter,groupBy等。函数的输入值,也可以是Column。
练习:根据每天的用户访问日志和用户购买日志,统计每日的uv和销售额,统计每个种类的销售额排名前3的产品
UV:每天都有很多用户来访问,但是每个用户可能每天都会访问很多次,所以,uv,指的是,对用户进行去重以后的访问总数
1、统计每日的uv
package cn.spark.study.sql
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.types.StructField
import org.apache.spark.sql.types.StringType
import org.apache.spark.sql.types.IntegerType
import org.apache.spark.sql.functions._
object DailyUV {
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
.setMaster("local")
.setAppName("DailyUV")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
// 要使用Spark SQL的内置函数,就必须在这里导入SQLContext下的隐式转换
import sqlContext.implicits._
// 构造用户访问日志数据,并创建DataFrame
// 模拟用户访问日志,日志用逗号隔开,第一列是日期,第二列是用户id
val userAccessLog = Array(
"2015-10-01,1122",
"2015-10-01,1122",
"2015-10-01,1123",
"2015-10-01,1124",
"2015-10-01,1124",
"2015-10-02,1122",
"2015-10-02,1121",
"2015-10-02,1123",
"2015-10-02,1123");
val userAccessLogRDD = sc.parallelize(userAccessLog, 5)
// 将模拟出来的用户访问日志RDD,转换为DataFrame
// 首先,将普通的RDD,转换为元素为Row的RDD
val userAccessLogRowRDD = userAccessLogRDD
.map { log => Row(log.split(",")(0), log.split(",")(1).toInt) }
// 构造DataFrame的元数据
val structType = StructType(Array(
StructField("date", StringType, true),
StructField("userid", IntegerType, true)))
// 使用SQLContext创建DataFrame
val userAccessLogRowDF = sqlContext.createDataFrame(userAccessLogRowRDD, structType)
//聚合函数的用法(countDistinct)
// 首先,对DataFrame调用groupBy()方法,对某一列进行分组
// 然后,调用agg()方法 ,第一个参数,必须传入之前在groupBy()方法中出现的字段
// 第二个参数,传入countDistinct、sum、first等,Spark提供的内置函数
// 内置函数中,传入的参数,是用单引号作为前缀的其他字段
userAccessLogRowDF.groupBy("date")
.agg('date', countDistinct(userid))
.map { row => Row(row(1), row(2)) }
.collect()
.foreach(println)
}
}
2、统计每天的销售额
package cn.spark.study.sql
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.types.StructField
import org.apache.spark.sql.types.StringType
import org.apache.spark.sql.types.DoubleType
import org.apache.spark.sql.functions._
/**
* @author Administrator
*/
object DailySale {
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
.setMaster("local")
.setAppName("DailySale")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
import sqlContext.implicits._
// 说明一下,业务的特点
// 实际上呢,我们可以做一个,单独统计网站登录用户的销售额的统计
// 有些时候,会出现日志的上报的错误和异常,比如日志里丢了用户的信息,那么这种,我们就一律不统计了
// 模拟数据
val userSaleLog = Array("2015-10-01,55.05,1122",
"2015-10-01,23.15,1133",
"2015-10-01,15.20,",
"2015-10-02,56.05,1144",
"2015-10-02,78.87,1155",
"2015-10-02,113.02,1123")
val userSaleLogRDD = sc.parallelize(userSaleLog, 5)
// 进行有效销售日志的过滤
val filteredUserSaleLogRDD = userSaleLogRDD
.filter { log => if (log.split(",").length == 3) true else false }
val userSaleLogRowRDD = filteredUserSaleLogRDD
.map { log => Row(log.split(",")(0), log.split(",")(1).toDouble) }
val structType = StructType(Array(
StructField("date", StringType, true),
StructField("sale_amount", DoubleType, true)))
val userSaleLogDF = sqlContext.createDataFrame(userSaleLogRowRDD, structType)
// 开始进行每日销售额的统计
userSaleLogDF.groupBy("date")
.agg('date', sum(sale_amount))
.map { row => Row(row(1), row(2)) }
.collect()
.foreach(println)
}
}
3、统计每个种类的销售额排名前3的产品(分组topN,开窗函数row_number)
- row_number()开窗函数的作用:
其实就是给每个分组的数据,按照其排序顺序,打上一个分组内的行号,比如说,有一个分组date=20151001,里面有3条数据,1122、1121、1124,那么对这个分组的每一行使用row_number()开窗函数以后,三行,依次会获得一个组内的行号,行号从1开始递增,比如1122 1,1121 2,1124 3 - row_number()开窗函数的语法说明:
首先,可以在SELECT查询时,使用row_number()函数
其次,row_number()函数后面先跟上OVER关键字
然后,括号中是PARTITION BY,也就是说根据哪个字段进行分组,其次是可以用ORDER BY进行组内排序
最后row_number()就可以给每个组内的行,一个组内行号
sales.txt
Thin�Cell Phone�6000
Normal�Tablet�1500
Mini�Tablet�5500
UltraThin�Cell Phone�5000
VeryThin�Cell Phone�6000
Big�Tablet�2500
Bedable�Cell Phone�3000
Foldable�Cell Phone�3500
Pro�Tablet�4500
Pro2�Tablet�6500
package cn.spark.study.sql;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.hive.HiveContext;
public class RowNumberWindowFunction {
@SuppressWarnings("deprecation")
public static void main(String[] args) {
SparkConf conf = new SparkConf()
.setAppName("RowNumberWindowFunction");
JavaSparkContext sc = new JavaSparkContext(conf);
HiveContext hiveContext = new HiveContext(sc.sc());
// 创建销售额表,sales表
hiveContext.sql("DROP TABLE IF EXISTS sales");
hiveContext.sql("CREATE TABLE IF NOT EXISTS sales ("
+ "product STRING,"
+ "category STRING,"
+ "revenue BIGINT)");
hiveContext.sql("LOAD DATA "
+ "LOCAL INPATH '/usr/local/spark-study/resources/sales.txt' "
+ "INTO TABLE sales");
// 开始编写我们的统计逻辑,使用row_number()开窗函数
DataFrame top3SalesDF = hiveContext.sql(""
+ "SELECT product,category,revenue "
+ "FROM ("
+ "SELECT "
+ "product,"
+ "category,"
+ "revenue,"
+ "row_number() OVER (PARTITION BY category ORDER BY revenue DESC) rank "
+ "FROM sales "
+ ") tmp_sales "
+ "WHERE rank<=3");
// 将每组排名前3的数据,保存到一个表中
hiveContext.sql("DROP TABLE IF EXISTS top3_sales");
top3SalesDF.saveAsTable("top3_sales");
sc.close();
}
}