今天用spark的时候碰到的问题,直接对一个大的dataframe做agg,导致buffer超了。
可以人为的在dataframe上append一个新的字段,根据字段先做一个agg,最后再agg,就不会超了
import random
def get_rand(i):
return random.randint(1,10000)
randUdf = udf(get_rand,IntegerType())
getP = udf(get_placement, ArrayType(IntegerType()))
tmp_df.withColumn("salt_key",randUdf(col('placement_ids'))).groupby('salt_key').agg(getP(collect_list(struct('placement_ids'))).alias("ids")).show()
这样再agg一次,就没有问题啦