Scala - Spark DataFrame column containing a comma-separated list of other columns that need to be updated with values given in another column



I have a use case that I am trying to solve in a Spark DataFrame.

+----+----+----+---------+----+
|col1|col2|col3|     col4|col5|
+----+----+----+---------+----+
|   A|   B|   C|col2,col3| X,Y|
|   P|   Q|   R|     col1|   Z|
|   I|   J|   K|col1,col3| S,T|
+----+----+----+---------+----+

After the transformation, the resulting DataFrame should look like this:

+----+----+----+
|col1|col2|col3|
+----+----+----+
|   A|   X|   Y|
|   Z|   Q|   R|
|   S|   J|   T|
+----+----+----+
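
For reproducibility, a minimal sketch (not part of the original question; the setup and names are assumptions) that builds the sample input DataFrame shown above:

// Hypothetical setup: build the sample input DataFrame.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("update-cols-example").getOrCreate()
import spark.implicits._

val df = Seq(
  ("A", "B", "C", "col2,col3", "X,Y"),
  ("P", "Q", "R", "col1",      "Z"),
  ("I", "J", "K", "col1,col3", "S,T")
).toDF("col1", "col2", "col3", "col4", "col5")

df.show()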

3 Answers



Basically, I create two arrays from col4 and col5, then use map_from_arrays to build a map, then use that map to derive lookup columns for col1, col2 and col3, and finally update the columns with when / otherwise / isNotNull clauses.

(Spark 2.4+)

Data

df.show()

+----+----+----+---------+----+
|col1|col2|col3|     col4|col5|
+----+----+----+---------+----+
|   A|   B|   C|col2,col3| X,Y|
|   P|   Q|   R|     col1|   Z|
|   I|   J|   K|col1,col3| S,T|
+----+----+----+---------+----+

% Scala

import org.apache.spark.sql.functions.{col, map_from_arrays, split, when}
import spark.implicits._   // for the $"colName" syntax

df.withColumn("col6", map_from_arrays(split($"col4", ","), split($"col5", ",")))  // col4 keys -> col5 values
  .drop("col4", "col5")
  .select($"col1", $"col2", $"col3",
          col("col6.col1").alias("col1_"),   // replacement value for col1, if any
          col("col6.col2").alias("col2_"),
          col("col6.col3").alias("col3_"))
  .withColumn("col1", when(col("col1_").isNotNull, col("col1_")).otherwise($"col1"))
  .withColumn("col2", when(col("col2_").isNotNull, col("col2_")).otherwise($"col2"))
  .withColumn("col3", when(col("col3_").isNotNull, col("col3_")).otherwise($"col3"))
  .drop("col1_", "col2_", "col3_")
  .show()

+----+----+----+
|col1|col2|col3|
+----+----+----+
|   A|   X|   Y|
|   Z|   Q|   R|
|   S|   J|   T|
+----+----+----+
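
To see what the intermediate map column holds, here is a hedged sketch that only materializes col6 (reusing df from the question; this snippet is not part of the original answer):

// Hypothetical inspection of the intermediate step: col4 supplies the keys and
// col5 the values, so each row carries its own "column name -> new value" map.
import org.apache.spark.sql.functions.{map_from_arrays, split}
import spark.implicits._

df.withColumn("col6", map_from_arrays(split($"col4", ","), split($"col5", ",")))
  .select("col1", "col2", "col3", "col6")
  .show(false)
// e.g. the first row's map associates col2 with X and col3 with Y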

% Python

from pyspark.sql import functions as F

(df.withColumn("col6", F.map_from_arrays(F.split("col4", ","), F.split("col5", ",")))  # col4 keys -> col5 values
   .drop("col4", "col5")
   .select("col1", "col2", "col3",
           F.col("col6.col1").alias("col1_"),   # replacement value for col1, if any
           F.col("col6.col2").alias("col2_"),
           F.col("col6.col3").alias("col3_"))
   .withColumn("col1", F.when(F.col("col1_").isNotNull(), F.col("col1_")).otherwise(F.col("col1")))
   .withColumn("col2", F.when(F.col("col2_").isNotNull(), F.col("col2_")).otherwise(F.col("col2")))
   .withColumn("col3", F.when(F.col("col3_").isNotNull(), F.col("col3_")).otherwise(F.col("col3")))
   .drop("col1_", "col2_", "col3_")
   .show())

+----+----+----+
|col1|col2|col3|
+----+----+----+
|   A|   X|   Y|
|   Z|   Q|   R|
|   S|   J|   T|
+----+----+----+

Update:

(You can also create a Scala UDF and apply similar logic; hope it helps. A hedged Scala sketch follows after the Python version below.)

% Python

from pyspark.sql import functions as F
from pyspark.sql.functions import udf


@udf("map<string,string>")
def as_dict(x):
    # x is an array of two arrays: [keys, values]
    return dict(zip(*x)) if x else None

(df.withColumn("col6", F.array(F.split("col4", ","), F.split("col5", ",")))
   .drop("col4", "col5")
   .withColumn("col6", as_dict("col6"))
   .select("col1", "col2", "col3",
           F.col("col6.col1").alias("col1_"),
           F.col("col6.col2").alias("col2_"),
           F.col("col6.col3").alias("col3_"))
   .withColumn("col1", F.when(F.col("col1_").isNotNull(), F.col("col1_")).otherwise(F.col("col1")))
   .withColumn("col2", F.when(F.col("col2_").isNotNull(), F.col("col2_")).otherwise(F.col("col2")))
   .withColumn("col3", F.when(F.col("col3_").isNotNull(), F.col("col3_")).otherwise(F.col("col3")))
   .drop("col1_", "col2_", "col3_")
   .show())
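
For completeness, a hedged Scala sketch of the same UDF idea (not from the original answer; the helper name asDict is an assumption):

// Hypothetical Scala equivalent of the Python UDF above: zip the two split arrays
// into a Map[String, String] and use it to overwrite the target columns.
import org.apache.spark.sql.functions.{array, col, split, udf, when}
import spark.implicits._

val asDict = udf { x: Seq[Seq[String]] =>
  if (x == null || x.isEmpty) null else x(0).zip(x(1)).toMap
}

df.withColumn("col6", asDict(array(split($"col4", ","), split($"col5", ","))))
  .withColumn("col1", when(col("col6.col1").isNotNull, col("col6.col1")).otherwise($"col1"))
  .withColumn("col2", when(col("col6.col2").isNotNull, col("col6.col2")).otherwise($"col2"))
  .withColumn("col3", when(col("col6.col3").isNotNull, col("col6.col3")).otherwise($"col3"))
  .drop("col4", "col5", "col6")
  .show()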



Spark 2.4+

This should scale when there are more than just these 3 columns; I have written the code so that it is easy to extend.

import org.apache.spark.sql.functions._
import spark.implicits._

val cols = Seq("col1", "col2", "col3")

// Tag each row with an id, explode the (column name, new value) pairs from col4/col5,
// and pivot them so each referenced column name becomes its own column, renamed with a "2" suffix.
val df1 = df.withColumn("id", monotonically_increasing_id)
val df2 = cols.foldLeft(
  df1.withColumn("col6", explode(arrays_zip(split($"col4", ","), split($"col5", ","))))
     .groupBy("id").pivot($"col6.0").agg(first($"col6.1"))
) { (df, c) => df.withColumnRenamed(c, c + "2") }

// Join back on id and prefer the pivoted value when it exists.
cols.foldLeft(df1.join(df2, "id")) { (df, c) => df.withColumn(c, coalesce(col(c + "2"), col(c))) }
  .select(cols.head, cols.tail: _*)
  .show

The result is:

+----+----+----+
|col1|col2|col3|
+----+----+----+
|   A|   X|   Y|
|   Z|   Q|   R|
|   S|   J|   T|
+----+----+----+
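
To make the foldLeft / pivot step easier to follow, here is a hedged sketch (not part of the original answer; the alias names target_col and new_value are assumptions) that only materializes the exploded key/value pairs, reusing df1 and the imports from the code above:

// Hypothetical inspection of the intermediate step: every entry of col4/col5
// becomes one (target column name, new value) pair per row id.
df1.withColumn("col6", explode(arrays_zip(split($"col4", ","), split($"col5", ","))))
   .select($"id", $"col6.0".as("target_col"), $"col6.1".as("new_value"))
   .show(false)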


This is a problem that can easily be handled with the map function on RDDs:

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StructType, StructField, StringType}

val targetColumns = df.columns.take(3) // we assume that the final df should contain the 3 first columns. If not, feel free to modify this according to your requirements

val updatedRDD = df.rdd.map{ r =>
  val keys    = r.getAs[String]("col4").split(",")
  val values  = r.getAs[String]("col5").split(",")
  val mapping = keys.zip(values).toMap[String, String] // i.e: Map(col2 -> X, col3 -> Y)

  // For each target column, take the replacement value if one was given, otherwise keep the original.
  val updatedValues = targetColumns.map{ c =>
    if (keys.contains(c))
      mapping(c)
    else
      r.getAs[String](c)
  }

  Row(updatedValues: _*)
}

val schema = StructType(targetColumns.map{ c => StructField(c, StringType, true) })
spark.createDataFrame(updatedRDD, schema).show(false)

// +----+----+----+
// |col1|col2|col3|
// +----+----+----+
// |A   |X   |Y   |
// |Z   |Q   |R   |
// |S   |J   |T   |
// +----+----+----+

We build a map with col4 as the keys and col5 as the values, and that map is used to construct the final Row that gets returned. A small standalone check of the zipping is shown below.
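
As a minimal standalone illustration of the zipping (plain Scala, no Spark needed; values taken from the first sample row):

// For the first sample row, col4 = "col2,col3" and col5 = "X,Y",
// so the mapping becomes Map(col2 -> X, col3 -> Y).
val keys    = "col2,col3".split(",")
val values  = "X,Y".split(",")
val mapping = keys.zip(values).toMap
println(mapping)  // Map(col2 -> X, col3 -> Y)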

