What is the best way in Spark to treat each group as a new DataFrame and pass it to another function in a loop?



I am using spark-sql-2.4.1v and I am trying to find the quantiles, i.e. the 0th quantile, the 25th quantile, etc., on each given column of my data.

My data:

+----+---------+-------------+----------+-----------+--------+
|  id|     date|      revenue|con_dist_1| con_dist_2| state  |
+----+---------+-------------+----------+-----------+--------+
|  10|1/15/2018|  0.010680705|         6|0.019875458|   TX   |
|  10|1/15/2018|  0.006628853|         4|0.816039063|   AZ   |
|  10|1/15/2018|   0.01378215|         4|0.082049528|   TX   |
|  10|1/15/2018|  0.010680705|         6|0.019875458|   TX   |
|  10|1/15/2018|  0.006628853|         4|0.816039063|   AZ   |
|  10|1/15/2018|   0.01378215|         4|0.082049528|   CA   |
|  10|1/15/2018|  0.010680705|         6|0.019875458|   CA   |
|  10|1/15/2018|  0.006628853|         4|0.816039063|   CA   |
+----+---------+-------------+----------+-----------+--------+

I want to calculate this for the following states and columns:

val states = Seq("CA","AZ");
val cols = Seq("con_dist_1" ,"con_dist_2")

For each given state, I need to fetch the data from the source table and calculate the percentiles only for the given columns.

I tried the following:

for (state <- states) {
  for (c <- cols) {
    // percentile calculation for (state, c)
  }
}
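Concretely, the loop body would be something along these lines, using DataFrameStatFunctions.approxQuantile (a sketch; the variable names are illustrative). Every call triggers a separate Spark job, which is why this does not scale:

import org.apache.spark.sql.functions.col

// Sketch only: one full Spark job per (state, column) pair.
for (state <- states) {
  val stateDf = df.filter(col("state") === state)
  for (c <- cols) {
    // relativeError = 0.0 computes exact (and expensive) quantiles
    val q = stateDf.stat.approxQuantile(c, Array(0.0, 0.25, 0.5, 0.75), 0.0)
    println(s"$state / $c -> ${q.mkString(", ")}")
  }
}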

This is far too slow, and when grouping by "state" I don't get the other columns such as revenue, date, and id. How do I get those as well?

How do I find the quantiles on the "con_dist_1" and "con_dist_2" columns for each state? And what is the best way to scale this out on a cluster?

What is the best way to handle this use case?

Expected output:

+-----+---------------+---------------+---------------+---------------+---------------+---------------+
|state|col1_quantile_1|col1_quantile_2|col1_quantile_3|col2_quantile_1|col2_quantile_2|col2_quantile_3|
+-----+---------------+---------------+---------------+---------------+---------------+---------------+
|   AZ|              4|              4|              4|    0.816039063|    0.816039063|    0.816039063|
|   TX|              4|              6|              6|    0.019875458|    0.019875458|    0.082049528|
+-----+---------------+---------------+---------------+---------------+---------------+---------------+

2 Answers



You may need to do something like the following:

import org.apache.spark.sql.functions._

df.groupBy(col("state"))
  .agg(
    collect_list(col("con_dist_1")).as("col1_quant"),
    collect_list(col("con_dist_2")).as("col2_quant"))
  .withColumn("col1_quant1", col("col1_quant")(0))
  .withColumn("col1_quant2", col("col1_quant")(1))
  .withColumn("col2_quant1", col("col2_quant")(0))
  .withColumn("col2_quant2", col("col2_quant")(1))
  .show

Output:

+-----+----------+--------------------+-----------+-----------+-----------+-----------+
|state|col1_quant|          col2_quant|col1_quant1|col1_quant2|col2_quant1|col2_quant2|
+-----+----------+--------------------+-----------+-----------+-----------+-----------+
|   AZ|    [4, 4]|[0.816039063, 0.8...|          4|          4|0.816039063|0.816039063|
|   CA|    [4, 6]|[0.082049528, 0.0...|          4|          6|0.082049528|0.019875458|
|   TX| [6, 4, 6]|[0.019875458, 0.0...|          6|          4|0.019875458|0.082049528|
+-----+----------+--------------------+-----------+-----------+-----------+-----------+

Perhaps the last group of withColumn calls should go in a loop, depending on the number of records per state; see the sketch below.
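A sketch of that loop, assuming the groupBy/agg result above (before the hand-written withColumn calls) is bound to a val named grouped, and that n is an assumed upper bound on the list length per state:

// Sketch: flatten the collected lists in a loop instead of writing
// each withColumn by hand. `grouped` and `n` are assumptions, not
// part of the original answer.
val n = 2
val flattened = (1 to n).foldLeft(grouped) { (acc, i) =>
  acc
    .withColumn(s"col1_quant$i", col("col1_quant")(i - 1))
    .withColumn(s"col2_quant$i", col("col2_quant")(i - 1))
}
flattened.show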

Hope this helps!



UPDATE

I found the percentile_approx function from the Hive context, so there is no need to use the stat functions.

import org.apache.spark.sql.functions.{col, expr}
import spark.implicits._

val states = Seq("CA", "AZ")
val cols = Seq("con_dist_1", "con_dist_2")

val l = cols.map(c => expr(s"percentile_approx($c, Array(0.25, 0.5, 0.75)) as ${c}_quantiles"))
val df2 = df.filter($"state".isin(states: _*)).groupBy("state").agg(l.head, l.tail: _*)

df2.select(col("state") +: cols.flatMap(c =>
    (1 until 4).map(i => col(c + "_quantiles")(i - 1).alias(c + "_quantile_" + i))): _*)
  .show(false)

Here I tried to automate it for the given states and columns. The result will be:

+-----+---------------------+---------------------+---------------------+---------------------+---------------------+---------------------+
|state|con_dist_1_quantile_1|con_dist_1_quantile_2|con_dist_1_quantile_3|con_dist_2_quantile_1|con_dist_2_quantile_2|con_dist_2_quantile_3|
+-----+---------------------+---------------------+---------------------+---------------------+---------------------+---------------------+
|AZ   |4                    |4                    |4                    |0.816039063          |0.816039063          |0.816039063          |
|CA   |4                    |4                    |6                    |0.019875458          |0.082049528          |0.816039063          |
+-----+---------------------+---------------------+---------------------+---------------------+---------------------+---------------------+

Please note that the result is slightly different from your expected output, because I set states = Seq("CA", "AZ") as you gave, so TX is filtered out.
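If you also want the probabilities to be configurable, the expressions can be built from a list; a sketch (probs and byState are hypothetical names, not part of the original answer):

// Sketch: build the percentile_approx expressions from a
// configurable list of probabilities. `probs` is hypothetical.
val probs = Seq(0.25, 0.5, 0.75)
val aggs = cols.map(c =>
  expr(s"percentile_approx($c, array(${probs.mkString(", ")})) as ${c}_quantiles"))
val byState = df.filter($"state".isin(states: _*)).groupBy("state").agg(aggs.head, aggs.tail: _*)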

ORIGINAL

Use a Window partitioned by state and calculate the percent rank for each column.

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.percent_rank

val w1 = Window.partitionBy("state").orderBy("con_dist_1")
val w2 = Window.partitionBy("state").orderBy("con_dist_2")

df.withColumn("p1", percent_rank().over(w1))
  .withColumn("p2", percent_rank().over(w2))
  .show(false)

You could filter the DataFrame first for the specific states only. Either way, the result is:

+---+---------+-----------+----------+-----------+-----+---+---+
|id |date     |revenue    |con_dist_1|con_dist_2 |state|p1 |p2 |
+---+---------+-----------+----------+-----------+-----+---+---+
|10 |1/15/2018|0.006628853|4         |0.816039063|AZ   |0.0|0.0|
|10 |1/15/2018|0.006628853|4         |0.816039063|AZ   |0.0|0.0|
|10 |1/15/2018|0.010680705|6         |0.019875458|CA   |1.0|0.0|
|10 |1/15/2018|0.01378215 |4         |0.082049528|CA   |0.0|0.5|
|10 |1/15/2018|0.006628853|4         |0.816039063|CA   |0.0|1.0|
|10 |1/15/2018|0.010680705|6         |0.019875458|TX   |0.5|0.0|
|10 |1/15/2018|0.010680705|6         |0.019875458|TX   |0.5|0.0|
|10 |1/15/2018|0.01378215 |4         |0.082049528|TX   |0.0|1.0|
+---+---------+-----------+----------+-----------+-----+---+---+
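As mentioned, to restrict this to specific states, filter before applying the windows; a sketch reusing states from the question:

import org.apache.spark.sql.functions.col

// Sketch: filter to the states of interest first, so the window
// shuffle only has to process the rows you actually need.
df.filter(col("state").isin(states: _*))
  .withColumn("p1", percent_rank().over(w1))
  .withColumn("p2", percent_rank().over(w2))
  .show(false)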
