scala-在groupby/aggregate中激发 coalesce /组合 array


0

下面的spark代码正确地演示了我想要做的事情,并用一个小的演示 DataSet 生成了正确的输出。

当我在大量生产数据上运行相同类型的代码时,我遇到了运行时问题。Spark作业在我的集群上运行了大约12小时,但失败了。

只要看一眼下面的代码, Explode 每一行似乎效率很低,只是把它 coalesce 回去。在给定的 test DataSet 中,第四行的三个值在 array“值”1中,三个值在 array“值”2中,将 Explode 成3*3或9个分解行。

所以,在一个更大的 DataSet 中,一行有五个这样的 array列,每列有十个值,会分解成10^5个分解行?

查看提供的spark函数,没有现成的函数可以满足我的需要。我可以提供一个用户定义的函数。这有什么速度上的缺点吗?

val sparkSession = SparkSession.builder.
  master("local")
  .appName("merge list test")
  .getOrCreate()

val schema = StructType(
StructField("category", IntegerType) ::
StructField("array_value_1", ArrayType(StringType)) ::
StructField("array_value_2", ArrayType(StringType)) ::
Nil)

val rows = List(
Row(1, List("a", "b"), List("u", "v")),
Row(1, List("b", "c"), List("v", "w")),
Row(2, List("c", "d"), List("w")),
Row(2, List("c", "d", "e"), List("x", "y", "z"))
)

val df = sparkSession.createDataFrame(rows.asJava, schema)

val dfExploded = df.
withColumn("scalar_1", explode(col("array_value_1"))).
withColumn("scalar_2", explode(col("array_value_2")))

// This will output 19. 22 + 22 + 21 + 33 = 19
logger.info(s"dfExploded.count()=${dfExploded.count()}")

val dfOutput = dfExploded.groupBy("category").agg(
collect_set("scalar_1").alias("combined_values_2"),
collect_set("scalar_2").alias("combined_values_2"))

dfOutput.show()

1 答案

0

Explode 的效率可能很低,但从 root本上说,您试图实现的操作非常昂贵。实际上,它只是另一个GroupByKey,您在这里无法做太多的 job来改善它。由于使用spark>2.0,您可以直接收集_list并展平:

import org.apache.spark.sql.functions.{collect_list, udf}

val flatten_distinct = udf(
(xs: Seq[Seq[String]]) => xs.flatten.distinct)

df
.groupBy("category")
.agg(
flatten_distinct(collect_list("array_value_1")),
flatten_distinct(collect_list("array_value_2"))
)

在spark>=2.4中,可以用内置函数的组合替换UDF:

import org.apache.spark.sql.functions.{array_distinct, flatten}

val flatten_distinct = (array_distinct _) compose (flatten _)

也可以使用自定义聚合器,但我怀疑其中任何一个都会产生巨大的影响。

如果集合相对较大,并且您希望有大量重复,则可以尝试将AggregateByKey与可变集合一起使用:

import scala.collection.mutable.{Set => MSet}

val rdd = df
.select($"category", struct($"array_value_1", $"array_value_2"))
.as[(Int, (Seq[String], Seq[String]))]
.rdd

val agg = rdd
.aggregateByKey((MSetString, MSetString))(
{case ((accX, accY), (xs, ys)) => (accX ++= xs, accY ++ ys)},
{case ((accX1, accY1), (accX2, accY2)) => (accX1 ++= accX2, accY1 ++ accY2)}
)
.mapValues { case (xs, ys) => (xs.toArray, ys.toArray) }
.toDF


我来回答