一、常用计算方法
二、时间相关
三、数组类型操作
四、数据处理
五、编码与进制
六、from解析
七、字符串操作
八、字典操作
九、窗口函数
十、其它操作
pandas_udf(f = None,returnType = None,functionType = None )
pandas udf接口,可直接使用该方法定义的pandas_udf进行pandas的一些操作而不用toPandas。
Pandas UDF是用户定义的函数,由Spark使用Arrow来传输数据,并通过Pandas与数据一起使用来执行,从而可以进行矢量化操作。使用pandas_udf作为装饰器或包装函数来定义Pandas UDF ,并且不需要其他配置。Pandas UDF通常表现为常规的PySpark函数API。
用法
import pandas as pd
from pyspark.sql.functions import pandas_udf
#字符串大写
@pandas_udf("string")
def to_upper(s: pd.Series) -> pd.Series:
return s.str.upper()
df = spark.createDataFrame([("john doe",)], ("name",))
df.select(to_upper("name")).show()
udf(f = None,returnType = StringType )
自定义用户函数。
一、常用计算方法:
abs(col)
计算绝对值
exp(col)
计算指数
factorial(col)
计算阶乘
kurtosis(col)
返回组中值的峰度。
skewness(col)
返回组中值的偏度。
sqrt(col)
计算指定浮点值的平方根。
sum(col)
返回表达式中所有值的总和。
sumDistinct(col)
返回表达式中不同值的总和。
avg(col)
计算平均值
corr(col1,col2)
计算两列的皮尔逊相关系数。
count(col)
统计组中个数
countDistinct(col)
统计组中去重后的个数
stddev_pop(col)
返回一组表达式的总体标准偏差。
stddev_samp(col), stddev(col)
返回一组表达式的无偏样本标准差。
var_pop(col)
返回组中值的总体方差。
var_samp(col),variance(col)
返回组中值的无偏样本方差
covar_pop(col1,col2)
计算两列的总体协方差
covar_samp(col1,col2)
计算两列的样本协方差
log(arg1,arg2 = None)
返回第二个参数的第一个基于参数的对数。
如果只有一个参数,则采用参数的自然对数。
log10(col)
计算以10为底的给定值的对数。
log1p(col)
计算给定值加一的自然对数。
log2(col)
返回参数的以2为底的对数。
hypot(a,b)
计算sqrt(a^2 + b^2),无中间上溢或下溢
size(col)
返回存储在列中的数组或映射的长度。
max(col)
计算最大值
mean(col)
计算均值
min(col)
计算最小值
pow(col1,col2)
计算幂值
sin(col)
计算正弦值
sinh
计算双曲正弦值
cos(col)
计算余弦值
cosh(col)
计算双曲余弦值
tan(col)
计算正切值
tanh(col)
计算双曲正切值
acos(col)
计算反余弦值
asin(col)
计算反正弦值
atan(col)
计算反正切值
atan2(col1,col2)
返回以弧度表示的 y/x 的反正切
degrees(col)
将以弧度为单位的角度转换为以度为单位的近似等效角度。
radians(col)
将以度为单位的角度转换为以弧度为单位的近似等效角度。
rand(seed=None)
生成一个随机列,该列具有以[0.0,1.0)均匀分布的独立且均匀分布的(iid)样本。
randn(seed=None)
从标准正态分布生成具有独立且均匀分布(iid)样本的列。
rint(col)
返回值最接近该数且等于数学整数的double值。
round(col,scale=0)
四舍五入
bround(col,scale)
四舍五入,有点问题,2.5和3.5一个舍一个入
spark.createDataFrame([(2.5,)], ['a']).select(F.bround('a').alias('r')).show()
+---+
| r|
+---+
|2.0|
+---+
spark.createDataFrame([(3.5,)], ['a']).select(F.bround('a').alias('r')).show()
+---+
| r|
+---+
|4.0|
+---+
ceil(col)
直接四舍五入向上收
floor(col)
直接四舍五入向下收
cbrt(col)
计算给定值的立方根
signum(col)
计算给定值的符号。
二、时间相关:
add_months(col,months)
返回当前时间往后几个月的时间
import pyspark.sql.functions as F
df = spark.createDataFrame([('2021-02-02',),('2020-12-31',)], ['dt'])
df.select(F.add_months(df.dt, 3).alias('next_month')).show()
out:
+----------+
|next_month|
+----------+
|2021-05-02|
|2021-03-31|
+----------+
current_date
以DateType列的形式返回当前日期
current_timestamp
以timestamp列的形式返回当前日期
date_add
返回开始后几天的日期
date_format
timestamp
date_sub
返回开始前几天的日期
date_trunc
返回时间戳,该时间戳将被截断为格式指定的单位
datediff
返回从开始到结束的天数。
dayofmonth
返回该日期为月中几号。
dayofweek
返回该日期为周几。
weekofyear(col)
将给定日期的星期数提取为整数。
dayofyear
返回该日期为一年中第几天。
second(col)
将给定日期的秒数提取为整数。
minute(col)
将给定日期的分钟数提取为整数。
hour(col)
将给定日期的小时数提取为整数
month(col)
将给定日期的月份提取为整数。
year(col)
提取给定日期的年份为整数。
quarter(col)
将给定日期的四分之一提取为整数。
last_day(col)
返回给定日期所属月份的最后一天。
months_between(date1,date2,roundOff = True)
返回日期date1和date2之间的月数。如果date1晚于date2,则结果为正。如果date1和date2在一个月的同一天,或者都在一个月的最后一天,则返回一个整数(一天中的时间将被忽略)。除非将roundOff设置为False,否则结果将四舍五入为8位数字。
next_day(date,dayofweek)
返回date后的第一个日期。
星期几参数不区分大小写,并接受:
[“Mon”, “Tue”, “Wed”, “Thu”, “Fri”, “Sat”, “Sun”]
df = spark.createDataFrame([('2020-02-02',)], ['d'])
df.select(F.next_day(df.d, 'Sun').alias('date')).collect()
Out[62]: [Row(date=datetime.date(2020, 2, 9))]
trunc(date,format)
返回截断为格式指定单位的日期。
- format – ‘year’, ‘yyyy’, ‘yy’ or ‘month’, ‘mon’, ‘mm’
df = spark.createDataFrame([('1997-02-28',)], ['d'])
df.select(F.trunc(df.d, 'year').alias('year')).collect()
[Row(year=datetime.date(1997, 1, 1))]
df.select(F.trunc(df.d, 'mon').alias('month')).collect()
[Row(month=datetime.date(1997, 2, 1))]
unix_timestamp(timestamp = None,format ='yyyy-MM-dd HH:mm:ss' )
使用默认时区和默认区域设置,将具有给定模式(默认为'yyyy-MM-dd HH:mm:ss')的时间字符串转换为Unix时间戳(以秒为单位),如果失败,则返回null。
如果timestamp为None,则返回当前时间戳。
spark.conf.set("spark.sql.session.timeZone", "America/Los_Angeles")
time_df = spark.createDataFrame([('2015-04-08',)], ['dt'])
time_df.select(F.unix_timestamp('dt', 'yyyy-MM-dd').alias('unix_time')).collect()
Out[93]: [Row(unix_time=1428476400)]
三、数组类型操作
以下带array的用法都是针对列中元素是 数组[]的情况。
array(*cols)
将所有传入的列生成一个新的array数组。
array_contains(col,value)
判断元素是否在数组中,如果数组为null,则返回null;如果数组包含给定值,则返回true,否则返回false
df = spark.createDataFrame([(["a", "b", "c"],), ([],)], ['data'])
df.show()
df.select(F.array_contains(df.data, "a")).show()
+---------+
| data|
+---------+
|[a, b, c]|
| []|
+---------+
+-----------------------+
|array_contains(data, a)|
+-----------------------+
| true|
| false|
+-----------------------+
array_distinct(col)
从数组中删除重复的值。
array_except(col1,col2)
返回col1中独有的元素且去重,但不返回col2中的元素。
from pyspark.sql import Row
df = spark.createDataFrame([Row(c1=["b","b","a","c"], c2=["c", "d", "a", "f"])])
df.show()
df.select(F.array_except(df.c1, df.c2)).show()
+------------+------------+
| c1| c2|
+------------+------------+
|[b, b, a, c]|[c, d, a, f]|
+------------+------------+
+--------------------+
|array_except(c1, c2)|
+--------------------+
| [b]|
+--------------------+
array_intersect(col1,col2)
返回col1和col2的交集且去重。
array_union(col1,col2)
返回col1和col2的并集数组并去重
array_join(col, delimiter, null_replacement=None)
根据分隔符将数组变为字符串
参数:delimiter ---分隔符,null_replacement ---设置后会将空值替换为它
df = spark.createDataFrame([(["a", "b", "c"],), (["a", None],)], ['data'])
df.show()
df.select(F.array_join(df.data, ",").alias("joined")).show()
df.select(F.array_join(df.data, ",", "NULL").alias("joined")).show()
+---------+
| data|
+---------+
|[a, b, c]|
| [a,]|
+---------+
+------+
|joined|
+------+
| a,b,c|
| a|
+------+
+------+
|joined|
+------+
| a,b,c|
|a,NULL|
+------+
array_max(col)
返回数组的最大值
array_min(col)
返回数组的最小值
array_position(col,value)
返回数组中指定值第一次出现的索引
注意:该位置不是基于零的索引,而是基于1的索引。如果在数组中找不到给定值,则返回0
df = spark.createDataFrame([(["c", "b", "a"],), ([],),([None,"a"],)], ['data'])
df.select(F.array_position(df.data, "a")).show()
+-----------------------+
|array_position(data, a)|
+-----------------------+
| 3|
| 0|
| 2|
+-----------------------+
array_remove(col,value)
删除数组中所有等于value的元素
array_repeat(col,count)
创建一个原始值*count倍的数组,原始值可以是直接值,也可以是数组。
df = spark.createDataFrame([('a',),('b',)], ['data'])
df.select(F.array_repeat(df.data, 3).alias('r')).show()
df = spark.createDataFrame([(["a","b"],), (["a"],)], ['data'])
df.select(F.array_repeat(df.data, 3)).show()
+---------+
| r|
+---------+
|[a, a, a]|
|[b, b, b]|
+---------+
+---------------------+
|array_repeat(data, 3)|
+---------------------+
| [[a, b], [a, b], ...|
| [[a], [a], [a]]|
+---------------------+
array_sort(col)
以升序对输入数组进行排序。输入数组的元素必须是可排序的。空元素将放置在返回数组的末尾
arrays_overlap(col1,col2)
如果col1至少包含一个col22中也存在的非空元素,则返回true。如果数组没有公共元素,并且它们都是非空的,并且其中任何一个包含空元素,则返回null,否则返回false。
df = spark.createDataFrame([(["a", "b"], ["b", "c"]), (["a"], ["b", "c"]),(["a"],None)], ['x', 'y'])
df.show()
df.select(F.arrays_overlap(df.x, df.y).alias("overlap")).show()
+------+------+
| x| y|
+------+------+
|[a, b]|[b, c]|
| [a]|[b, c]|
| [a]| null|
+------+------+
+-------+
|overlap|
+-------+
| true|
| false|
| null|
+-------+
arrays_zip(*cols)
返回所有传入数组列的合并数组
from pyspark.sql.functions import arrays_zip
df = spark.createDataFrame([(([1, 2, 3], [2, 3, 4],[5,5]))], ['vals1', 'vals2','vals3'])
df.show()
df.select(F.arrays_zip(df.vals1,df.vals2,df.vals3).alias('zipped')).show()
+---------+---------+---------+
| vals1| vals2| vals3|
+---------+---------+---------+
|[1, 2, 3]|[2, 3, 4]|[5, 5]|
+---------+---------+---------+
+--------------------+
| zipped|
+--------------------+
|[[1, 2, 5], [2, 3, 5], [3, 4, None]]|
+--------------------+
greatest(*cols)
返回选中列中值的最大值。
df = spark.createDataFrame([(1, 4, 3)], ['a', 'b', 'c'])
df.select(greatest(df.a, df.b, df.c).alias("greatest")).collect()
[Row(greatest=4)]
least(*cols)
和上面一样用法返回选中列中值的最小值。
last(col,ignoreNulls)
返回组中的最后一个值。
默认情况下,该函数返回其看到的最后一个值。当将ignoreNulls设置为true时,它将返回它看到的最后一个非null值。如果所有值都为null,则返回null。
shuffle(col)
生成给定数组的随机排列。
slice(x,start,len)
切片,以指定的长度返回一个数组,该数组包含从索引开头 (数组索引从1开始,如果start为负,则从结尾开始)中包含x中所有元素的数组。
sort_array(col,asc=True)
根据数组元素的自然顺序对输入数组进行升序或降序排序。空元素将以升序放置在返回数组的开头,或者以降序放置在返回数组的结尾。
四、数据处理
approx_count_distinct(col, rsd=None)
近似估算去重统计函数,原理是hyperloglog算法,能很快的估算出数据的近似量。
参数:rsd –允许的最大相对标准偏差(默认= 0.05)
first(col,ignoreNulls)
默认情况下,该函数返回它看到的第一个值。当ignoreNulls设置为true时,它将返回它看到的第一个非空值。如果所有值都为null,则返回null。
- 注意:该函数是不确定的,因为其结果取决于行的顺序,这些顺序在洗牌后可能是不确定的。
flatten(col)
降维,将多维数组拉平,如果嵌套数组的结构深于两层,则仅除去一层嵌套。
asc(col)
根据给定列名的升序返回排序表达式
desc(col)
根据给定列名的降序返回排序表达式。
asc_nulls_first(col)
根据给定列名的升序返回排序表达式,并且null值在非null值之前返回。
asc_nulls_last(col)
根据给定列名的升序返回排序表达式,并且null值出现在非null值之后。
desc_nulls_first(col)
根据给定列名的降序返回排序表达式,并且null值在非null值之前返回。
desc_nulls_last(col)
根据给定列名的降序返回排序表达式,并且null值出现在非null值之后。
coalesce(col1,col2)
返回两列不为空的值,如果两列都有值,则只返回第一列的值。
cDf = spark.createDataFrame([(None, None), (1, 2), (None, 2)], ("a", "b"))
cDf.show()
+----+----+
| a| b|
+----+----+
|null|null|
| 1|2|
|null| 2|
+----+----+
cDf.select(F.coalesce(cDf["a"], cDf["b"])).show()
+--------------+
|coalesce(a, b)|
+--------------+
| null|
| 1|
| 2|
+--------------+
collect_list(col)
返回该列值的列表,含重复值。
df2 = spark.createDataFrame([(2,), (5,), (5,)], ('age',))
df2.agg(F.collect_list('age')).show()
+-----------------+
|collect_list(age)|
+-----------------+
| [2, 5, 5]|
+-----------------+
collect_set(col)
返回该列值的集合,无重复值。
concat(*cols)
将选定列拼接成一列,兼容数组,字符串等,如果有缺失值则拼接后为空。
df = spark.createDataFrame([([1, 2], [3, 4], [5]), ([1, 2], None, [3])], ['a', 'b', 'c'])
df.show()
df.select(F.concat(df.a, df.b, df.c).alias("arr")).show()
+------+------+---+
| a| b| c|
+------+------+---+
|[1, 2]|[3, 4]|[5]|
|[1, 2]| null|[3]|
+------+------+---+
+---------------+
| arr|
+---------------+
|[1, 2, 3, 4, 5]|
| null|
+---------------+
concat_ws(sep,*cols)
使用给定的分隔符将多个输入字符串列连接到一个字符串列中。
df = spark.createDataFrame([('abcd','123','hhh')], ['s', 'd','b'])
df.select(F.concat_ws('-', df.s, df.d, df.b).alias('s')).show()
+------------+
| s|
+------------+
|abcd-123-hhh|
+------------+
explode(col)
拆分数组列或字典列,去掉缺失值。
explode_outer(col)
拆分数组列或字典列,保留缺失值,数组按行加,字典按列加。
posexplode(col)
为每个元素在给定数组或地图中的位置返回一个带索引的新行。数组纵向扩展,字典横向扩展。
from pyspark.sql import Row
eDF = spark.createDataFrame([Row(a=1, intlist=[1,2,3], mapfield={"a": "b"})])
eDF.show()
eDF.select(F.posexplode(eDF.intlist)).show()
eDF.select(F.posexplode(eDF.mapfield)).show()
+---+---------+--------+
| a| intlist|mapfield|
+---+---------+--------+
| 1|[1, 2, 3]|[a -> b]|
+---+---------+--------+
+---+---+
|pos|col|
+---+---+
| 0| 1|
| 1| 2|
| 2| 3|
+---+---+
+---+---+-----+
|pos|key|value|
+---+---+-----+
| 0| a| b|
+---+---+-----+
posexplode_outer
为每个元素在给定数组或地图中的位置返回一个带索引新行。与posexplode不同,如果数组/映射为null或为空,则将生成行(null,null)。
expm1
计算给定值减去一的指数。
expr(str)
传入一个操作字符串,然后转成python代码执行,就像python的eval一样。
df = spark.createDataFrame([(["a", "b", "c"],), ([],)], ['data'])
df.show()
df.select(F.expr("count(data)")).show()
+---------+
| data|
+---------+
|[a, b, c]|
| []|
+---------+
+-----------+
|count(data)|
+-----------+
| 2|
+-----------+
format_number(col,d)
格式化数字,比如给数字添加1几位小数保留形式
format_string(format,*cols)
以printf样式设置参数格式,并将结果作为字符串列返回。
df = spark.createDataFrame([(5, "hello")], ['a', 'b'])
df.select(F.format_string('%d %s', df.a, df.b).alias('v')).show()
+-------+
| v|
+-------+
|5 hello|
+-------+
element_at(col,index)
返回选中列的索引对应的值,如果col中数据类型是数组则对应索引,如果是字典则对应键值。
isnan(col)
如果该列为NaN,则返回true的表达式。
isnull(col)
如果列为空,则返回true的表达式。
nanvl(col1,col2)
如果不是NaN,则返回col1;如果col1是NaN,则返回col2。
- 注意:两个输入都应为浮点列(DoubleType或FloatType)。
when(condition, value)
计算条件列表,并返回多个可能的结果表达式之一。如果Column.otherwise()未调用,则为不匹配的条件返回None。
- condition–布尔Column表达式。
- value–文字值或Column表达式。
df.show()
+---+---+
| C1| C2|
+---+---+
| 1| 2|
| 3| 5|
| 5| 6|
+---+---+
df.withColumn('when', F.when(df['C1'] < 2, 0).otherwise(1)).show()
+---+---+----+
| C1| C2|when|
+---+---+----+
| 1| 2| 0|
| 3| 5| 1|
| 5| 6| 1|
+---+---+----+
五、编码与进制
ascii(col)
计算字符串列第一个字符的ascll码值,注意"ab"只返回"a"的ascll码值
base64
计算二进制列的BASE64编码,并将其作为字符串列返回,用于字符串类型
unbase64(col)
解码BASE64编码的字符串列,并将其作为二进制列返回。
bin(col)
计算数值的二进制值,返回字符串形式,用于数值类型
bitwiseNOT(col)
对二进制数据进行非操作, 原值+现值=-1
md5(col)
计算MD5摘要,并以32个字符的十六进制字符串形式返回值。
conv(col,frombase,tobase)
进制转换
参数:frombase为转换前进制,tobase为转换需要成的进制。
df = spark.createDataFrame([("010101",)], ['n'])
...: df.select(F.conv(df.n, 2, 8).alias('hex')).collect()
Out[36]: [Row(hex='25')]
df = spark.createDataFrame([("010101",)], ['n'])
...: df.select(F.conv(df.n, 2, 10).alias('hex')).collect()
Out[37]: [Row(hex='21')]
crc32(col)
计算二进制列的循环冗余校验值(CRC32),并将该值作为bigint返回
decode(col,charset)
修改编码,可设置(“ US-ASCII”,“ ISO-8859-1”,“ UTF-8”,“ UTF-16BE”,“ UTF-16LE”,“ 'UTF-16')
encode(col,charset)
使用提供的字符集(“ US-ASCII”,“ ISO-8859-1”,“ UTF-8”,“ UTF-16BE”,“ UTF-16LE”,“ 'UTF-16'
hash(col)
计算给定列的哈希码,并将结果作为int列返回。
unhex(col)
十六进制的倒数。将每对字符解释为十六进制数字,并转换为数字的字节表示形式。
xxhash64(*cols)
使用xxHash算法的64位变体计算给定列的哈希码,并将结果作为长列返回。
sha1(col)
返回SHA-1的十六进制字符串结果。
sha2(col,numBits)
返回SHA-2系列哈希函数(SHA-224,SHA-256,SHA-384和SHA-512)的十六进制字符串结果。numBits指示结果的所需位长,该位的值必须为224、256、384、512或0(等于256)。
shiftLeft(col,numBits)
将给定值numBits左移,其实就是将二进制编码向左移。
ep: 21的二进制为10101,左移一位为101010=42
spark.createDataFrame([(21,)], ['a']).select(shiftLeft('a', 1).alias('r')).collect()
[Row(r=42)]
shiftRight(col,numBits)
将给定值numBits左移,其实就是将二进制编码向右移。
shiftRightUnsigned(col,numBits)
无符号将给定值numBits右移。
六、from解析
from_csv(col,schema,options = {} )
将包含CSV字符串的列解析为具有指定架构的行。如果是不可解析的字符串,则返回null。
- col – CSV格式的字符串列
- schema –具有DDL格式的架构的字符串,在解析CSV列时使用。
- options –控制解析的选项。接受与CSV数据源相同的选项
data = [("1,2,3",)]
df = spark.createDataFrame(data, ("value",))
df.select(F.from_csv(df.value, "a INT, b INT, c INT").alias("csv")).collect()
[Row(csv=Row(a=1, b=2, c=3))]
value = data[0][0]
df.select(F.from_csv(df.value, F.schema_of_csv(value)).alias("csv")).collect()
[Row(csv=Row(_c0=1, _c1=2, _c2=3))]
data = [(" abc",)]
df = spark.createDataFrame(data, ("value",))
options = {'ignoreLeadingWhiteSpace': True}
df.select(F.from_csv(df.value, "s string", options).alias("csv")).collect()
[Row(csv=Row(s='abc'))]
from_json(col,schema,options = {} )
将包含JSON字符串的列解析为MapType带有StringType as键类型StructType或ArrayType指定架构的。如果是不可解析的字符串,则返回null。
- col – json格式的字符串列
- schema –解析json列时使用的StructType或StructType的ArrayType。
- options –控制解析的选项。接受与json数据源相同的选项
from_unixtime(timestamp,format ='yyyy-MM-dd HH:mm:ss'
将unix纪元(1970-01-01 00:00:00 UTC)的秒数转换为表示给定格式的字符串,该字符串表示当前时间在当前系统时区中的时间戳。)
spark.conf.set("spark.sql.session.timeZone", "America/Los_Angeles")
time_df = spark.createDataFrame([(1428476400,)], ['unix_time'])
time_df.select(F.from_unixtime('unix_time').alias('ts')).collect()
[Row(ts='2015-04-08 00:00:00')]
from_utc_timestamp(timestamp,tz)
将时间戳值移到另一个时区里。
- timestamp戳–包含时间戳的列
- tz –一个字符串,详细说明应将输入调整为的时区ID。它应采用基于区域的区域ID或区域偏移量的格式。区域ID的格式必须为“区域/城市”,例如“ America / Los_Angeles”。区域偏移量必须采用“(+ |-)HH:mm”格式,例如“ -08:00”或“ +01:00”。还支持'UTC'和'Z'作为'+00:00'的别名。不建议使用其他短名称,因为它们可能不明确。
get_json_object(col,path)
根据指定的json路径从json字符串中提取json对象,并返回提取的json对象的json字符串。如果输入的json字符串无效,它将返回null。
- col – json格式的字符串列
- path –要提取的json对象的路径
data = [("1", '''{"f1": "value1", "f2": "value2"}'''), ("2", '''{"f1": "value12"}''')]
df = spark.createDataFrame(data, ("key", "jstring"))
df.show()
df.select(df.key, F.get_json_object(df.jstring, '$.f1').alias("c0"), \
F.get_json_object(df.jstring, '$.f2').alias("c1") ).show()
+---+--------------------+
|key| jstring|
+---+--------------------+
| 1|{"f1": "value1", ...|
| 2| {"f1": "value12"}|
+---+--------------------+
+---+-------+------+
|key| c0| c1|
+---+-------+------+
| 1| value1|value2|
| 2|value12| null|
+---+-------+------+
七、字符串操作
initcap(col)
将每个单词的第一个字母变为大写
input_file_name
为当前Spark任务的文件名创建一个字符串列。
instr(str,substr)
在给定的字符串中找到第一个出现的substr列的位置。如果任何一个参数为null,则返回null。
locate(substr,str,pos = 1)
在位置pos之后,在字符串列中找到第一个出现的substr的位置。
data = [("1", '''{"f1": "value1", "f2": "value2"}'''), ("2", '''{"f1": "value12"}''')]
df = spark.createDataFrame(data, ("key", "jstring"))
df.show()
df.select(df.key, F.json_tuple(df.jstring, 'f1', 'f2')).show()
+---+--------------------+
|key| jstring|
+---+--------------------+
| 1|{"f1": "value1", ...|
| 2| {"f1": "value12"}|
+---+--------------------+
+---+-------+------+
|key| c0| c1|
+---+-------+------+
| 1| value1|value2|
| 2|value12| null|
+---+-------+------+
length(col)
计算字符串数据的字符长度或二进制数据的字节数。字符数据的长度包括尾随空格。二进制数据的长度包括二进制零。
levenshtein(col1,col2)
计算两个给定字符串的Levenshtein距离。
lower(col)
将字符串表达式转换为小写。
upper(col)
将字符串表达式转换为大写。
lpad(col,len,pad)
从左边开始填充字符串,保证长度为len,pad为填充符。
df = spark.createDataFrame([('abcd',)], ['s',])
df.select(F.lpad(df.s, 6, '#').alias('s')).collect()
[Row(s='##abcd')]
rpad(col,len,pad)
从右边开始填充字符串,保证长度为len,pad为填充符。
ltrim(col)
修剪左端的空格以获取指定的字符串值。
rtrim(col)
修剪右端端的空格以获取指定的字符串值。
trim(col)
修剪指定字符串列两端的空格。
overlay(src,replace,pos,len = -1 )
从给定位置开始pos开始使用replace字符串替换src中的字符串内容,被替换字符串长度=len,-1表示后面全部替换。
df = spark.createDataFrame([("SPARK_SQL", "CORE")], ("x", "y"))
df.select(F.overlay(src="x", replace="y", pos=7,len=-1).alias("overlayed")).show()
df.select(F.overlay(src="x", replace="y", pos=7,len=1).alias("overlayed")).show()
+----------+
| overlayed|
+----------+
|SPARK_CORE|
+----------+
+------------+
| overlayed|
+------------+
|SPARK_COREQL|
+------------+
regexp_extract(str,pattern,idx)
从指定的字符串列中提取与Java正则表达式匹配的特定组。如果正则表达式不匹配,或者指定的组不匹配,则返回一个空字符串。
regexp_replace(str,pattern,replacement )
将与regexp匹配的指定字符串值的所有子字符串替换为rep。
repeat(col,n)
重复一次字符串列n次,并将其作为新的字符串列返回
reverse(col)
返回反向字符串或元素顺序相反的数组。
soundex(col)
返回字符串的SoundEx编码。
split(str,pattern,limit = -1)
在给定模式的匹配项周围拆分str。
- str –要拆分的字符串表达式
- pattern –代表正则表达式的字符串。regex字符串应为Java正则表达式。
- limit–一个整数,控制应用图案的次数。
limit > 0:结果数组的长度将不超过limit,并且
结果数组的最后一个条目将包含除最后一个匹配模式以外的所有输入。
limit <= 0:模式将被尽可能多地应用,并且结果
数组可以是任何大小。
df = spark.createDataFrame([('oneAtwoBthreeC',)], ['s',])
df.select(F.split(df.s, '[ABC]', 2).alias('s')).collect()
[Row(s=['one', 'twoBthreeC'])]
df.select(F.split(df.s, '[ABC]', -1).alias('s')).collect()
[Row(s=['one', 'two', 'three', ''])]
substring_index(str,delim,count )
返回出现count个定界符delim之前的所有内容,如果count为负,则从右边往左数,且返回右侧所有内容。
df = spark.createDataFrame([('a.b.c.d',)], ['s'])
df.select(substring_index(df.s, '.', 2).alias('s')).collect()
[Row(s='a.b')]
df.select(substring_index(df.s, '.', -3).alias('s')).collect()
[Row(s='b.c.d')]
八、字典操作
map_concat(*cols)
拼接字典列。
map_entries(col)
将字典转为无序数组。
map_from_arrays(col1,col2)
从两个数组创建一个新地图。
- col1 –包含一组键的列的名称。所有元素都不应为null
- col2 –包含一组值的列名
map_from_entries(col)
将无序数组转为字典
map_keys(col)
返回字典键的无序数组。
map_values(col)
返回字典值的无序数组
create_map(cols)
创建一个字典,列表或Column以键值对分组的表达式列表,例如(key1,value1,key2,value2等)。
九、窗口函数
window(timeColumn,windowDuration,slideDuration = None,startTime = None )
给定时间戳记指定列,将行存储到一个或多个时间窗口中。窗口开始是包含的,但窗口结束是排斥的,例如12:05将在窗口[12:05,12:10)中,但不在[12:00,12:05)中。Windows可以支持微秒精度。不支持数月之内的Windows。
持续时间以字符串形式提供,例如“ 1秒”,“ 1天12小时”,“ 2分钟”。有效的时间间隔字符串是“星期”,“天”,“小时”,“分钟”,“秒”,“毫秒”,“微秒”。如果slideDuration未提供,则窗口将是滚动窗口。
startTime是相对于1970-01-01 00:00:00 UTC的偏移量,使用该偏移量可以启动窗口间隔。例如,为了使每小时的滚动窗口在每小时的15分钟之后开始,例如12:15-13:15、13:15-14:15…提供startTime为15分钟。
默认情况下,输出列为名为“ window”的结构,嵌套列为“ start”和“ end”。
cume_dist
返回窗口分区内值的累积分布,即当前行之下的行的分数。
dense_rank
返回窗口分区内的行的级别,没有任何间隔。
rank和density_rank之间的区别在于,当有联系时,densed_rank在排序序列中不留任何空隙。也就是说,如果您使用density_rank对比赛进行排名,并且有3个人并列第二,那么您将说所有3个人都排在第二位,而下一个人则排在第三位。排名会给我连续的数字,使得排在第三位的人(并列之后)将排在第五位。
这等效于SQL中的DENSE_RANK函数。
lag(col,offset = 1,default = None)
返回当前行之前偏移行的值,如果当前行之前偏移行少于行, 则返回defaultValue。例如,偏移量为1将返回窗口分区中任意给定点的上一行。
这等效于SQL中的LAG函数。
- col –列或表达式的名称
- offse-要扩展的行数
- default–默认值
lead(col,offset = 1,default = None)
返回当前行之后的偏移行 的值,如果当前行之后的偏移行少于此值,则返回defaultValue。例如,偏移量为1将返回窗口分区中任意给定点的下一行。
这等效于SQL中的LEAD函数。
- col –列或表达式的名称
- offse-要扩展的行数
- default–默认值
ntile(n)
在有序窗口分区中返回ntile组ID(从1到n,包括1和n)。例如,如果n为4,则行的第一季度将获得值1,第二季度将获得2,第三季度将获得3,最后一行将获得4。
这等效于SQL中的NTILE函数。
percent_rank
返回窗口分区内行的相对排名(即百分位)。
rank
返回窗口分区内的行的排名。
rank和density_rank之间的区别在于,当有联系时,densed_rank在排序序列中不留任何空隙。也就是说,如果您使用density_rank对比赛进行排名,并且有3个人并列第二,那么您将说所有3个人都排在第二位,而下一个人则排在第三位。排名会给我连续的数字,使得排在第三位的人(并列之后)将排在第五位。
这等效于SQL中的RANK函数。
row_number
返回窗口分区中从1开始的序列号。
开窗函数
没有时间格式的窗口的话,需要用如下方法。
from pyspark.sql.window import Window
from pyspark.sql.window import Window
df = spark.createDataFrame([(1, 2),(3,5),(5,6),(1,9)], ('C1', 'C2'))
df.show()
df.select('C1','C2',F.rank().over(
Window.partitionBy('C1').orderBy(F.col("C2").desc())).alias("rank")).show()
+---+---+
| C1| C2|
+---+---+
| 1| 2|
| 3| 5|
| 5| 6|
| 1| 9|
+---+---+
+---+---+----+
| C1| C2|rank|
+---+---+----+
| 5| 6| 1|
| 1| 9| 1|
| 1| 2| 2|
| 3| 5| 1|
+---+---+----+
十、其它操作
spark_partition_id
分区ID列。
注意 这是不确定的,因为它取决于数据分区和任务调度
df.repartition(1).select(F.spark_partition_id().alias("pid")).collect()
[Row(pid=0), Row(pid=0)]
struct(*cols)
创建一个新的结构体列。
substring(str,pos,len )
从位置pos处开始,返回字符串str的长度为len的截取。
df = spark.createDataFrame([('abcd',)], ['s',])
df.select(substring(df.s, 1, 2).alias('s')).collect()
[Row(s='ab')]
to_date(col,format = None)
等同于'col.cast("date")'。
to_json(col,options = {})
含有一列转换StructType,ArrayType或MapType 成JSON字符串。在不受支持的类型的情况下引发异常。
- col –包含结构,数组或映射的列的名称。
- options –控制转换的选项。接受与JSON数据源相同的选项。另外,该函数支持pretty选项,该选项可启用漂亮的JSON生成。
to_str
str()的包装器,但将布尔值转换为小写字符串。如果给出None,则只返回None,而不是将其转换为字符串“ None”.
to_timestamp(col,format = None )
等同于col.cast("timestamp")
to_utc_timestamp(timestamp,tz)
时间戳时区转换。
translate(srcCol, matching, replace)
匹配srcCol中任何出现matching的字符,用replace依次替换。
spark.createDataFrame([('translate',)], ['a']).select(F.translate('a', "rnlt", "123").alias('r')).collect()
Out[89]: [Row(r='1a2s3ae')]
spark.createDataFrame([('translate',)], ['a']).select(F.translate('a', "rnlt", "1234").alias('r')).collect()
Out[90]: [Row(r='41a2s3a4e')]
spark.createDataFrame([('translate',)], ['a']).select(F.translate('a', "rnlt", "12345").alias('r')).collect()
Out[91]: [Row(r='41a2s3a4e')]
spark.createDataFrame([('translate',)], ['a']).select(F.translate('a', "rn", "12345").alias('r')).collect()
Out[92]: [Row(r='t1a2slate')]
lit(x)
创建一列值全为x的列。
broadcast(col)
对datafram数据广播,对于优化spark运行效率很有效,比如下面例子大表join小表,广播后效率高。
from pyspark.sql.functions import broadcast
small_df = ...
large_df = ...
large_df.join(broadcast(small_df), ["foo"])
grouping(col)
显示是否做了聚合操作,在结果集中返回1表示聚合,返回0表示未聚合
grouping_id(*cols)
返回分组级别
json_tuple(col,*fields)
根据给定的字段名称为json列创建新行。
- col – json格式的字符串列
- fields – 要提取的字段列表
monotonically_increasing_id()
生成单调递增的64位整数的列。
保证生成的ID是单调递增且唯一的,但不是连续的。当前实现将分区ID放在高31位中,将记录号放在每个分区的低33位中。假定数据帧的分区少于10亿,每个分区的记录少于80亿。
- 该函数是不确定的,因为其结果取决于分区ID。
sequence(col1,col2,step=None)
生成整数序列开始到停止,通过增加一步。如果未设置step,则如果start小于或等于stop,则加1,否则为-1。
df1 = spark.createDataFrame([(-2, 2)], ('C1', 'C2'))
df1.select(F.sequence('C1', 'C2').alias('r')).show()
df2 = spark.createDataFrame([(4, -4, -2)], ('C1', 'C2', 'C3'))
df2.select(F.sequence('C1', 'C2', 'C3').alias('r')).show()
+-----------------+
| r|
+-----------------+
|[-2, -1, 0, 1, 2]|
+-----------------+
+-----------------+
| r|
+-----------------+
|[4, 2, 0, -2, -4]|
+-----------------+