from pyspark import SparkConf, SparkContext,SQLContext
from pyspark.sql import Row
conf = SparkConf()
sc = SparkContext(conf=conf)
#设置错误级别
sc.setLogLevel("error")
sqlContext = SQLContext(sc)
def run(outfile):
origin_data = sc.textFile("filepath").map(lambda x: x.split("\t"))
first = origin_data.first()
# 过滤第一行
whole= origin_data.filter(lambda x: x != first)
course_order = whole.map(lambda x: (int(x[0]), int(x[1]), int(x[2]), int(x[3]))). \
filter(lambda x: int(x[3]) == 3). \
filter(lambda x: x[2] <= 100). \
filter(lambda x: x[0] != 0). \
map(lambda x: (int(x[0]), int(x[1]))). \
map(lambda x: Row(user_id=int(x[0]), num=int(x[1])))
out = sqlContext.createDataFrame(course_order).\
#分组
groupBy("user_id"). \
#聚合
agg({"num": "sum"}). \
#列重命名
withColumnRenamed("sum(num)", "num")
#csv file
out.repartition(1).write.format("csv").option("header", "false").mode("append").save(outfile)
#sql file
out.rdd.map(lambda x:sq % (x['user_id'] %10, x['num'], x['user_id'])).repartition(1).saveAsTextFile('sql.csv')
if __name__ == '__main__':
run("out")
spark常用操作
©著作权归作者所有,转载或内容合作请联系作者
- 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
- 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
- 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
推荐阅读更多精彩内容
- 聚合函数combineByKey 将RDD[k,v]转化为RDD[k,c],利用该函数可以实现reduceByKe...
- RDD 操作二 常用的 Transformations 与 Actions 方法 原文地址: http://spa...
- DataSet 的函数 详细API常见此链接 Action 操作 1、collect() ,返回值是一个数组,返回...
- Transformations map,filterspark最长用的两个Transformations:map,...