基于输入列的scala-Conditional Spark map()函数


0

我在这里要实现的是发送到Spark SQL map函数有条件地生成的列,这取决于它们是否有null、0或我可能需要的任何其他值。

以这个初始DF为例。

val initialDF = Seq(
  ("a", "b", 1), 
  ("a", "b", null), 
  ("a", null, 0)
).toDF("field1", "field2", "field3")

从最初的 DataFrame 中,我想生成另一个列,它将是一个 map,如下所示。

initialDF.withColumn("thisMap", MY_FUNCTION)

我目前的方法基本上是在方法a flatMap中取Seq[String]Spark SQL方法接收的 key值对,如下所示。

def toMap(columns: String*): Column = {
  map(
    columns.flatMap(column => List(lit(column), col(column))): _*
  )
}

但是之后,过滤就变成了Scala的事情,而且相当混乱。

在处理之后,我想得到的是,对于每一行,下一个 DataFrame 。

val initialDF = Seq(
  ("a", "b", 1, Map("field1" -> "a", "field2" -> "b", "field3" -> 1)),
  ("a", "b", null, Map("field1" -> "a", "field2" -> "b")),
  ("a", null, 0, Map("field1" -> "a"))
)
  .toDF("field1", "field2", "field3", "thisMap")

我想知道这是否可以通过使用Column API来实现,而使用.isNull或.equalTo更直观?

2 答案


0

这是对拉曼努斯答案的一个小改进,上面只循环测向柱一次:

import org.apache.spark.sql._
import org.apache.spark.sql.functions._

case class Record(field1: String, field2: String, field3: java.lang.Integer)

val df = Seq(
Record("a", "b", 1),
Record("a", "b", null),
Record("a", null, 0)
).toDS

df.show

// +------+------+------+
// |field1|field2|field3|
// +------+------+------+
// | a| b| 1|
// | a| b| null|
// | a| null| 0|
// +------+------+------+

df.withColumn("thisMap", map_concat(
df.columns.map { colName =>
when(col(colName).isNull or col(colName) === 0, map())
.otherwise(map(lit(colName), col(colName)))
}: _*
)).show(false)

// +------+------+------+---------------------------------------+
// |field1|field2|field3|thisMap |
// +------+------+------+---------------------------------------+
// |a |b |1 |[field1 -> a, field2 -> b, field3 -> 1]|
// |a |b |null |[field1 -> a, field2 -> b] |
// |a |null |0 |[field1 -> a] |
// +------+------+------+---------------------------------------+


0

更新

我找到了达到预期效果的方法,但有点脏。

val df2 = df.columns.foldLeft(df) { (df, n) => df.withColumn(n + "_map", map(lit(n), col(n))) }
val col_cond = df.columns.map(n => when(not(col(n + "_map").getItem(n).isNull || col(n + "_map").getItem(n) === lit("0")), col(n + "_map")).otherwise(map()))
df2.withColumn("map", map_concat(col_cond: _*))
  .show(false)

原件

下面是我对spark 2.4+中可能使用的函数map_from_ array的尝试。

df.withColumn("array", array(df.columns.map(col): _*))
  .withColumn("map", map_from_arrays(lit(df.columns), $"array")).show(false)

那么,结果是:

+------+------+------+---------+---------------------------------------+
|field1|field2|field3|array    |map                                    |
+------+------+------+---------+---------------------------------------+
|a     |b     |1     |[a, b, 1]|[field1 -> a, field2 -> b, field3 -> 1]|
|a     |b     |null  |[a, b,]  |[field1 -> a, field2 -> b, field3 ->]  |
|a     |null  |0     |[a,, 0]  |[field1 -> a, field2 ->, field3 -> 0]  |
+------+------+------+---------+---------------------------------------+

我来回答

写文章

提问题

面试题