DataFrame的去重,none值填充及异常值处理2018-05-23

spark 数据建模准备

去重

#初始化spark
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").appName("shuangyu").getOrCreate()
df = spark.createDataFrame([(1,144.5,5.9,33,'M'),
                           (2,167.2,5.4,45,'M'),
                           (3,124.1,5.2,23,'F'),
                           (4,144.5,5.9,33,'M'),
                           (5,133.2,5.7,54,'F'),
                           (3,124.1,5.2,23,'F'),
                           (5,129.2,5.3,42,'M')],["id","weight","height","age","gender"])
#分别打印dataframe未去重和去重后的行数
print("count of rows: {}".format(df.count()))
print("count of distinct rows: {}".format(df.distinct().count()))

count of rows: 7
count of distinct rows: 6

#去掉重复的行
df = df.dropDuplicates()
df.show()

+---+------+------+---+------+
| id|weight|height|age|gender|
+---+------+------+---+------+
| 5| 133.2| 5.7| 54| F|
| 5| 129.2| 5.3| 42| M|
| 1| 144.5| 5.9| 33| M|
| 4| 144.5| 5.9| 33| M|
| 2| 167.2| 5.4| 45| M|
| 3| 124.1| 5.2| 23| F|
+---+------+------+---+------+

#计算排除id后是否有重复的数据
print("counts of ids: {}".format(df.count()))
print("counts of distinct ids: {}".format(df.select([c for c in df.columns if c != "id"]).distinct().count()))

counts of ids: 6
counts of distinct ids: 5

#发现有2行出去ID外其它都是重复的,现在要去掉其中的一行
df = df.dropDuplicates(subset = [c for c in df.columns if c != "id"])
df.show()

+---+------+------+---+------+
| id|weight|height|age|gender|
+---+------+------+---+------+
| 5| 133.2| 5.7| 54| F|
| 1| 144.5| 5.9| 33| M|
| 2| 167.2| 5.4| 45| M|
| 3| 124.1| 5.2| 23| F|
| 5| 129.2| 5.3| 42| M|
+---+------+------+---+------+

#ok.现在来计算下是否有重复的ID
import pyspark.sql.functions as fn #导入spark sql的一些函数

df.agg(fn.count("id").alias("count"),
       fn.countDistinct("id").alias("distinct")).show()

+-----+--------+
|count|distinct|
+-----+--------+
| 5| 4|
+-----+--------+

#发现有重复的ID,我们可能需要重新给每行数据分分配唯一的新的ID来标示它们
df.withColumn("newId",fn.monotonically_increasing_id()).show()
#withColums 新增一列
#monotonically_increasing_id 生成唯一自增ID

+---+------+------+---+------+-------------+
| id|weight|height|age|gender| newId|
+---+------+------+---+------+-------------+
| 5| 133.2| 5.7| 54| F| 25769803776|
| 1| 144.5| 5.9| 33| M| 171798691840|
| 2| 167.2| 5.4| 45| M| 592705486848|
| 3| 124.1| 5.2| 23| F|1236950581248|
| 5| 129.2| 5.3| 42| M|1365799600128|
+---+------+------+---+------+-------------+

数据缺失

df_miss = spark.createDataFrame([(1,143.5,5.6,28,'M',10000),
                                (2,167.2,5.4,45,'M',None),
                                (3,None,5.2,None,None,None),
                                (4,144.5,5.9,33,'M',None),
                                (5,133.2,5.7,54,'F',None),
                                (6,124.1,5.2,None,'F',None),
                                (7,129.2,5.3,42,'M',76000)],
                               ['id','weight','height','age','gender','income'])
#统计每一行缺失的数据量
df_miss.rdd.map(lambda row: (row['id'],sum([c == None for c in row]))).collect()

[(1, 0), (2, 1), (3, 4), (4, 1), (5, 1), (6, 2), (7, 0)]

#第三行数据缺失有点多,来看一下第三行数据
df_miss.where('id == 3').show()

+---+------+------+----+------+------+
| id|weight|height| age|gender|income|
+---+------+------+----+------+------+
| 3| null| 5.2|null| null| null|
+---+------+------+----+------+------+

#统计每列数据缺失情况
df_miss.agg(*[(1-(fn.count(c)/fn.count('*'))).alias(c + "_miss") for c in df_miss.columns]).show()

+-------+------------------+-----------+------------------+------------------+------------------+
|id_miss| weight_miss|height_miss| age_miss| gender_miss| income_miss|
+-------+------------------+-----------+------------------+------------------+------------------+
| 0.0|0.1428571428571429| 0.0|0.2857142857142857|0.1428571428571429|0.7142857142857143|
+-------+------------------+-----------+------------------+------------------+------------------+

#income列缺失太多,基本无用了,现在要去掉这一列数据
df_miss_no_income = df_miss.select([c for c in df_miss.columns if c != "income"])
df_miss_no_income.show()

+---+------+------+----+------+
| id|weight|height| age|gender|
+---+------+------+----+------+
| 1| 143.5| 5.6| 28| M|
| 2| 167.2| 5.4| 45| M|
| 3| null| 5.2|null| null|
| 4| 144.5| 5.9| 33| M|
| 5| 133.2| 5.7| 54| F|
| 6| 124.1| 5.2|null| F|
| 7| 129.2| 5.3| 42| M|
+---+------+------+----+------+

#某些行缺失的数据也比较多,现在去除掉这些行
#thresh=3 表示一行中非NONE的数据少于3个则去除该行
df_miss_no_income.dropna(thresh=3).show()

+---+------+------+----+------+
| id|weight|height| age|gender|
+---+------+------+----+------+
| 1| 143.5| 5.6| 28| M|
| 2| 167.2| 5.4| 45| M|
| 4| 144.5| 5.9| 33| M|
| 5| 133.2| 5.7| 54| F|
| 6| 124.1| 5.2|null| F|
| 7| 129.2| 5.3| 42| M|
+---+------+------+----+------+

#只要含有NONE则去除该行
df_miss_no_income.dropna().show()

+---+------+------+---+------+
| id|weight|height|age|gender|
+---+------+------+---+------+
| 1| 143.5| 5.6| 28| M|
| 2| 167.2| 5.4| 45| M|
| 4| 144.5| 5.9| 33| M|
| 5| 133.2| 5.7| 54| F|
| 7| 129.2| 5.3| 42| M|
+---+------+------+---+------+

#为none值填充新值
means = df_miss_no_income.agg(*[fn.mean(c).alias(c) 
                                for c in df_miss_no_income.columns if c != 'gender'])\
                                .toPandas().to_dict('records')[0]
means['gender'] = "missing"
print(means)
#df.fillna(dict) 填充df中的none值,dict中以各个col字段作为key,要填充的值作为value 
df_miss_no_income.fillna(means).show()

{'age': 40.4, 'height': 5.471428571428571, 'gender': 'missing', 'weight': 140.28333333333333, 'id': 4.0}
+---+------------------+------+---+-------+
| id| weight|height|age| gender|
+---+------------------+------+---+-------+
| 1| 143.5| 5.6| 28| M|
| 2| 167.2| 5.4| 45| M|
| 3|140.28333333333333| 5.2| 40|missing|
| 4| 144.5| 5.9| 33| M|
| 5| 133.2| 5.7| 54| F|
| 6| 124.1| 5.2| 40| F|
| 7| 129.2| 5.3| 42| M|
+---+------------------+------+---+-------+

异常值

df_outliers = spark.createDataFrame([(1,143.5,5.3,28),
                                    (2,154.2,5.5,45),
                                    (3,342.3,5.1,99),
                                    (4,144.5,5.5,33),
                                    (5,133.2,5.4,54),
                                    (6,124.1,5.1,21),
                                    (7,129.2,5.3,42)],["id","weight","height","age"])
cols = ["weight","height","age"]
#bounds,用来存储后面生成的各个字段值的边界
bounds = {}
for col in cols:
    #涉及统计中的4分位。计算Q1和Q3
    quantiles = df_outliers.approxQuantile(col, [0.25,0.75], 0.05)
    #计算4分位距
    IQR = quantiles[1] - quantiles[0]
    #计算内限
    bounds[col] = [quantiles[0] - 1.5*IQR, quantiles[1] + 1.5*IQR]
    
print("bounds: ",bounds)
#判断是否为异常值,在内限之外的值为异常值
outliers = df_outliers.select(*['id'] + \
                              [((df_outliers[c] < bounds[c][0]) | (df_outliers[c] > bounds[c][1]) )\
                               .alias(c +"_o") for c in cols])
outliers.show()

bounds: {'age': [-11.0, 93.0], 'height': [4.499999999999999, 6.1000000000000005], 'weight': [91.69999999999999, 191.7]}
+---+--------+--------+-----+
| id|weight_o|height_o|age_o|
+---+--------+--------+-----+
| 1| false| false|false|
| 2| false| false|false|
| 3| true| false| true|
| 4| false| false|false|
| 5| false| false|false|
| 6| false| false|false|
| 7| false| false|false|
+---+--------+--------+-----+

#查询出异常值
df_outliers = df_outliers.join(outliers,on = 'id')
#上面的join语句不要写成 df_outliers.join(outliers, df_outliers.id == outliers.id) 否则在
#新生成的 df_outliers中会有2列id,后面在select时会报错AnalysisException: "Reference 'id' is ambiguous
df_outliers.show()

+---+------+------+---+--------+--------+-----+
| id|weight|height|age|weight_o|height_o|age_o|
+---+------+------+---+--------+--------+-----+
| 7| 129.2| 5.3| 42| false| false|false|
| 6| 124.1| 5.1| 21| false| false|false|
| 5| 133.2| 5.4| 54| false| false|false|
| 1| 143.5| 5.3| 28| false| false|false|
| 3| 342.3| 5.1| 99| true| false| true|
| 2| 154.2| 5.5| 45| false| false|false|
| 4| 144.5| 5.5| 33| false| false|false|
+---+------+------+---+--------+--------+-----+

df_outliers.filter('weight_o').select('id','weight').show()

+---+------+
| id|weight|
+---+------+
| 3| 342.3|
+---+------+

df_outliers.filter("age_o").select("id","age").show()

+---+---+
| id|age|
+---+---+
| 3| 99|
+---+---+

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 194,670评论 5 460
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 81,928评论 2 371
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 141,926评论 0 320
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 52,238评论 1 263
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 61,112评论 4 356
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 46,138评论 1 272
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 36,545评论 3 381
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 35,232评论 0 253
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 39,496评论 1 290
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 34,596评论 2 310
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 36,369评论 1 326
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,226评论 3 313
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 37,600评论 3 299
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 28,906评论 0 17
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,185评论 1 250
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 41,516评论 2 341
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 40,721评论 2 335

推荐阅读更多精彩内容