scala-是否可以应用当。否则在groupBy之后agg中的函数?


0

最近一直在尝试对正在计算的聚合值应用默认函数,这样我就不必在之后重新处理它们。据我所见,我犯了以下错误。

原因:java.lang.UnsupportedOperationException:类型的 schemaorg.apache.spark网站.sql.列不支持

从下面的函数。

    val defaultUDF: UserDefinedFunction = udf[Column, Column, Any](defaultFunction)
    def defaultFunction(col: Column, value: Any): Column = {
      when(col.equalTo(value), null).otherwise(col)
    }

并按以下方式应用。

    val initialDF = Seq(
      ("a", "b", 1),
      ("a", "b", null),
      ("a", null, 0)
    ).toDF("field1", "field2", "field3")

initialDF
  .groupBy("field1", "field2")
  .agg(
    defaultUDF(functions.count("field3"), lit(0)).as("counter") // exception thrown here
  )

我是想在这里施魔法还是我可能错过了什么?

1 答案


0

问题在于用户定义函数的实现:

import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions._

def defaultFunction(col: Column, value: Any): Column = {
when(col.equalTo(value), null).otherwise(col)
}

val defaultUDF: UserDefinedFunction = udfColumn, Column, Any
// java.lang.UnsupportedOperationException: Schema for type org.apache.spark.sql.Column is not supported
// at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$schemaFor$1.apply(ScalaReflection.scala:789)
// at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$schemaFor$1.apply(ScalaReflection.scala:724)
// at scala.reflect.internal.tpe.TypeConstraints$UndoLog.undo(TypeConstraints.scala:56)
// at org.apache.spark.sql.catalyst.ScalaReflection$class.cleanUpReflectionObjects(ScalaReflection.scala:906)
// at org.apache.spark.sql.catalyst.ScalaReflection$.cleanUpReflectionObjects(ScalaReflection.scala:46)
// at org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:723)
// at org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:720)
// at org.apache.spark.sql.functions$.udf(functions.scala:3914)
// ... 65 elided

您得到的错误基本上是因为Spark无法将UserDefinedFunction defaultFunction的返回类型(即列) map到Spark数据类型。

在任何情况下,如果函数接受列并返回列,则不需要UserDefinedFunction。对于您的用例,以下代码将起作用:

import org.apache.spark.sql._
import org.apache.spark.sql.functions._

case class Record(field1: String, field2: String, field3: java.lang.Integer)

val df = Seq(
Record("a", "b", 1),
Record("a", "b", null),
Record("a", null, 0)
).toDS

df.show

// +------+------+------+
// |field1|field2|field3|
// +------+------+------+
// | a| b| 1|
// | a| b| null|
// | a| null| 0|
// +------+------+------+

def defaultFunction(col: Column, value: Any): Column = {
when(col.equalTo(value), null).otherwise(col)
}

df
.groupBy("field1", "field2")
.agg(defaultFunction(count("field3"), lit(0)).as("counter"))
.show

// +------+------+-------+
// |field1|field2|counter|
// +------+------+-------+
// | a| b| 1|
// | a| null| 1|
// +------+------+-------+


我来回答

写文章

提问题

面试题