1.读取csv
import pandas as pd
from pyspark.sql import SparkSession
spark = SparkSession \
.builder \
.appName('my_first_app_name') \
.getOrCreate()
file = r'C:\Users\Administrator\Desktop\kaggle泰坦尼克号获救率预测数据集\train.csv'
df = spark.read.csv(file,header=True,inferSchema=True)
df.show(5)
2. 查看字段类型 同pandas
df.dtypes
[('PassengerId', 'int'),
('Survived', 'int'),
('Pclass', 'int'),
('Name', 'string'),
('Sex', 'string'),
('Age', 'double'),
('SibSp', 'int'),
('Parch', 'int'),
('Ticket', 'string'),
('Fare', 'double'),
('Cabin', 'string'),
('Embarked', 'string')]
3. 查看列名 同pandas
df.columns
['PassengerId',
'Survived',
'Pclass',
'Name',
'Sex',
'Age',
'SibSp',
'Parch',
'Ticket',
'Fare',
'Cabin',
'Embarked']
4. 查看行数 pandas len(df)
df.count()
891
5. 重命名列名
pandas df.rename(columns={'Sex':sex})
#withColumnRenamed方法
df = df.withColumnRenamed('Age','age')\
.withColumnRenamed('Sex','sex')
df.columns
6.选择和切片
6.1 选择一列
df.select('Age').show(2)
6.2 选择多列
df.select('Name','Age').show(2)
6.3 多列选择和切片
df.select('Name','Age')\
.filter(df['Age']>70).show()
6.4 between 范围选择
df.filter(df['Age'].between(68,72))\
.select('Name','Age').show()
6.5 联合筛选
df.filter(df['Age']>30)\
.filter(df['Sex']=='male')\
.select('Name','Sex','Age').show()
6.6 like 包含文字信息
df.filter("Name like '%Mrs%'").show()
6.7 sql方式选择
# 首先dataframe注册为临时表,然后执行SQL查询
df.createOrReplaceTempView('df_sql')
spark.sql('select Name,Age,Sex from df_sql where Age>30 and Age<35').show()
7. 删除某行
df.drop('PassengerId').show()
8. 增加某列
import pyspark.sql.functions as fn
df.withColumn('id',fn.lit(0)).show()
9. 排序
#pandas 排序
#df.sort_values()
#排序
df.sort('age',ascending=False).show()#ascending=False 倒序
#多列排序
df.sort('age','fare',ascending=False).show()
#混合排序
df.sort(df['Age'].desc(),df['Fare'].asc()).show()#age字段倒序,fare字段正序
# orderBy也是排序,返回的Row对象列表
df.orderBy('age','fare').take(3)
10. 重复值
#查看是否有重复项
print('去重前行数',df.count())
print('去重后行数',df.distinct().count())
#去除重复行,同pandas
df.drop_duplicates()
去重前行数 891
去重后行数 891
11. 缺失值
#查看各字段缺失率
df.agg(*[(1-(fn.count(c) /fn.count('*'))).alias(c) for c in df.columns]).show()
#其中age,Cabin,Embarked字段有缺失
#Cabin 缺失率较大删除该字段
df = df.drop('cabin')
#age字段缺失率20%,填充均值
agg_mean = round(df.select(fn.mean('age')).collect()[0][0],0)
#Embarked缺失率较少,填充众数
#查看Embarked字段各分类总计
df.groupby('embarked').count().sort('count',ascending=False).show()
df = df.fillna({'age':agg_mean,'embarked':'S'})
#去除缺失值大于阈值的行
#df.drop(thresh=)
#姓名为字符型,从字符变量中提取有效信息
str1 = df.select(fn.split('name',',')[1].alias('name'))
str2 = str1.select(fn.split('name','. ')[0].alias('name'))