Spark SQL中所有功能的入口点是 SparkSession
类
可以利用创建rdd的SparkContext.SparkConf
创建
pyspark中的api是:
from pyspark import SparkSession, SparkConf
SparkSession.getOrCreate(SparkConf)
将rdd转换到dataframe时 需要给rdd中的数据指定schema
from pyspark.sql.types import StructField, StructType, StringType, TimestampType, IntegerType, FloatType
Schema = StructType([
StructField("hostID", IntegerType(), True),
StructField("uid", StringType(), True),
StructField("ts", IntegerType(), True),
StructField("price", FloatType(), True),
StructField("nickname", StringType(), True)
])
df = SparkSession.createDataFrame(rdd, schema=douyuGiftSchema)
df.createOrReplaceTempView("table1")
指定schema后将rdd映射在dataframe中,createOrReplaceTempView函数利用dataframe创建一个tmpView,类似关系型数据库的表,只在同一个连接(SparkSession)中可见。
result = SparkSession.sql("SELECT hostID,count(distinct(uid)) AND price >0 FROM table1 GROUP BY hostID")
在这个SparkSession中可以执行sql查询,指定table为已创建的tmpView,返回一个结果集。