python—如何将函数逐行应用于PySpark DataFrame 的一组列?


0

给定一个 DataFrame ,如:

   A0  A1  A2  A3
0   9   1   2   8
1   9   7   6   9
2   1   7   4   6
3   0   8   4   8
4   0   1   6   0
5   7   1   4   3
6   6   3   5   9
7   3   3   2   8
8   6   3   0   8
9   3   2   7   1

我需要对一组列逐行应用一个函数,以便用此函数的结果创建一个新列。

pandas的一个例子是:

df = pd.DataFrame(data=None, columns=['A0', 'A1', 'A2', 'A3'])
df['A0'] = np.random.randint(0, 10, 10)
df['A1'] = np.random.randint(0, 10, 10)
df['A2'] = np.random.randint(0, 10, 10)
df['A3'] = np.random.randint(0, 10, 10)

df['mean'] = df.mean(axis=1)
df['std'] = df.iloc[:, :-1].std(axis=1)
df['any'] = df.iloc[:, :-2].apply(lambda x: np.sum(x), axis=1)

结果是:

   A0  A1  A2  A3  mean       std  any
0   9   1   2   8  5.00  4.082483   20
1   9   7   6   9  7.75  1.500000   31
2   1   7   4   6  4.50  2.645751   18
3   0   8   4   8  5.00  3.829708   20
4   0   1   6   0  1.75  2.872281    7
5   7   1   4   3  3.75  2.500000   15
6   6   3   5   9  5.75  2.500000   23
7   3   3   2   8  4.00  2.708013   16
8   6   3   0   8  4.25  3.500000   17
9   3   2   7   1  3.25  2.629956   13

我怎么能在PySpark中做类似的事情?

2 答案


0

对于Spark 2.4+,可以使用聚合函数。首先,使用所有dataframe列创建 array列值。然后,计算std,means和任何这样的列:

    any:聚合以对 array元素求和

以下是相关代码:

from pyspark.sql.functions import expr, sqrt, size, col, array

data = [
(9, 1, 2, 8), (9, 7, 6, 9), (1, 7, 4, 6),
(0, 8, 4, 8), (0, 1, 6, 0), (7, 1, 4, 3),
(6, 3, 5, 9), (3, 3, 2, 8), (6, 3, 0, 8),
(3, 2, 7, 1)
]
df = spark.createDataFrame(data, ['A0', 'A1', 'A2', 'A3'])

cols = df.columns

df.withColumn("values", array(*cols))
.withColumn("any", expr("aggregate(values, 0D, (acc, x) -> acc + x)"))
.withColumn("mean", col("any") / size(col("values")))
.withColumn("std", sqrt(expr("""aggregate(values, 0D,
(acc, x) -> acc + power(x - mean, 2),
acc -> acc / (size(values) -1))"""
)
))
.drop("values")
.show(truncate=False)

#+---+---+---+---+----+----+------------------+

#|A0 |A1 |A2 |A3 |any |mean|std |

#+---+---+---+---+----+----+------------------+

#|9 |1 |2 |8 |20.0|5.0 |4.08248290463863 |

#|9 |7 |6 |9 |31.0|7.75|1.5 |

#|1 |7 |4 |6 |18.0|4.5 |2.6457513110645907|

#|0 |8 |4 |8 |20.0|5.0 |3.8297084310253524|

#|0 |1 |6 |0 |7.0 |1.75|2.8722813232690143|

#|7 |1 |4 |3 |15.0|3.75|2.5 |

#|6 |3 |5 |9 |23.0|5.75|2.5 |

#|3 |3 |2 |8 |16.0|4.0 |2.70801280154532 |

#|6 |3 |0 |8 |17.0|4.25|3.5 |

#|3 |2 |7 |1 |13.0|3.25|2.6299556396765835|

#+---+---+---+---+----+----+------------------+

spark<2.4:

你可以用functools.reduce函数以及运算符.add对列求和。逻辑与上述相同:

from functools import reduce
from operator import add

df.withColumn("any", reduce(add, [col(c) for c in cols]))
.withColumn("mean", col("any") / len(cols))
.withColumn("std", sqrt(reduce(add, [(col(c) - col("mean")) ** 2 for c in cols]) / (len(cols) -1)))
.show(truncate=False)


0

上面的答案很好,但是我看到OP使用的是Python/PySpark,如果您不理解Spark SQL表达式,那么上面的逻辑并不是100%清楚。

我建议使用 pandasUDAF,与UDF不同,UDF是矢量化的,非常有效。这已经添加到Spark API中,以降低从 pandas迁移到Spark所需的学习曲线。这也意味着,如果像我这样的大多数同事更熟悉Pandas/Python,那么您的代码就更易于维护。

这些是可用的大 pandasUDAF的类型和它们的大 pandas等效物

E、 g SparkUdafType→等位(…) job→返回

标量→测向变换(…), map序列→序列

分组 map → DataFrame 应用(…),组和 map DataFrame → DataFrame

分组聚集→df.骨料(…),减少级数→标量


我来回答

写文章

提问题

面试题