After a Spark Dataset mapGroups operation, the value type is binary even though the function returns a String



Environment:

Spark version: 2.3.0
Run Mode: Local
Java version: Java 8

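The Spark application tries to do the following:

1) Transform the input data into a Dataset[GenericRecord]

2) Group by the key property of the GenericRecord

3) After grouping, use mapGroups to iterate over the values of each group and produce a result as a String

4) Write the result to a text file as strings.

The write to the text file fails. Spark infers that the Dataset produced in step 3 has a binary column rather than a string column, even though the mapGroups function returns a String.

Is there a way to cast the column data type, or to let Spark know that it is actually a string column and not a binary one?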


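    import org.apache.avro.generic.GenericRecord
    import org.apache.spark.sql.{Encoder, Encoders}
    import scala.collection.JavaConversions
    import scala.reflect.ClassTag

    val dslSourcePath = args(0)
    val filePath = args(1)
    val targetPath = args(2)
    val df = spark.read.textFile(filePath)

    // Generic Kryo encoder: applies to every type A, not only GenericRecord
    implicit def kryoEncoder[A](implicit ct: ClassTag[A]): Encoder[A] = Encoders.kryo[A](ct)

    val mapResult = df.flatMap(abc => {
      // Placeholder from the original post: a Java library call that returns
      // a list of Avro GenericRecord (details not given)
      JavaConversions.asScalaBuffer(<list of Avro GenericRecord from a Java library>).seq
    })

    // Group by the record's key, then emit "key,count" for each group
    val groupResult = mapResult.groupByKey(result => String.valueOf(result.get("key")))
      .mapGroups((key, valueList) => {
        val result = StringBuilder.newBuilder.append(key).append(",").append(valueList.size)
        result.toString()
      })

    groupResult.printSchema()

    groupResult.write.text(targetPath + "-result-" + System.currentTimeMillis())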

printSchema shows that the column is binary:

root
 |-- value: binary (nullable = true)

Spark throws an error saying that it cannot write a binary column as text:

Exception in thread "main" org.apache.spark.sql.AnalysisException: Text data source supports only a string column, but you have binary.;
    at org.apache.spark.sql.execution.datasources.text.TextFileFormat.verifySchema(TextFileFormat.scala:55)
    at org.apache.spark.sql.execution.datasources.text.TextFileFormat.prepareWrite(TextFileFormat.scala:78)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:140)
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:154)
    at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:104)
    at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:102)
    at org.apache.spark.sql.execution.command.DataWritingCommandExec.doExecute(commands.scala:122)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
    at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80)
    at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80)
    at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:654)
    at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:654)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77)
    at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:654)
    at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:273)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:267)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:225)
    at org.apache.spark.sql.DataFrameWriter.text(DataFrameWriter.scala:595)

1 Answer



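As @user10938362 said, the cause is that the following code encodes all data as bytes. Because the implicit is generic in A, it is also picked up for the String returned by mapGroups, so the result column is binary: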

implicit def kryoEncoder[A](implicit ct: ClassTag[A]): Encoder[A] = Encoders.kryo[A](ct)
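
To see the effect in isolation, here is a minimal sketch that compares the schema of a Kryo encoder for String with the schema of the built-in String encoder. The object name EncoderSchemaCheck is only illustrative; no SparkSession is needed because an Encoder exposes its schema directly.

```scala
import org.apache.spark.sql.{Encoder, Encoders}
import org.apache.spark.sql.types.{BinaryType, StringType}

// Illustrative name, not part of the original post
object EncoderSchemaCheck {
  // A Kryo encoder stores the whole object as one serialized byte array
  val kryoString: Encoder[String] = Encoders.kryo[String]

  // The built-in encoder stores a String as a string column
  val plainString: Encoder[String] = Encoders.STRING

  def main(args: Array[String]): Unit = {
    // Each schema has a single column named "value"
    println(kryoString.schema.head.dataType == BinaryType)  // prints true
    println(plainString.schema.head.dataType == StringType) // prints true
  }
}
```

This matches the printSchema output in the question: a Dataset built with the Kryo encoder has a single binary column, whatever the Scala type of its elements.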

Replacing it with the following enables this encoding only for GenericRecord:

implicit def kryoEncoder: Encoder[GenericRecord] = Encoders.kryo
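
As a rough sketch of how the whole pipeline could look with the narrowed encoder, the example below passes Encoders.STRING explicitly to groupByKey and mapGroups, so the result does not depend on which implicits are in scope. The object name, the record and countByKey helpers, and the one-field Avro schema are assumptions made for illustration; the question's Java library call is replaced by hand-built records.

```scala
import org.apache.avro.SchemaBuilder
import org.apache.avro.generic.{GenericData, GenericRecord}
import org.apache.spark.sql.{Dataset, Encoder, Encoders, SparkSession}

// Illustrative names throughout; only the encoder definition comes from the answer
object ScopedKryoEncoderSketch {
  // Kryo is used for GenericRecord only; no generic Encoder[A] is defined
  implicit val recordEncoder: Encoder[GenericRecord] = Encoders.kryo[GenericRecord]

  // Hypothetical helper: builds a record with a single "key" field
  def record(key: String): GenericRecord = {
    val schema = SchemaBuilder.record("Rec").fields().requiredString("key").endRecord()
    val r = new GenericData.Record(schema)
    r.put("key", key)
    r
  }

  // Groups by key and emits "key,count"; both encoders are passed explicitly
  def countByKey(records: Dataset[GenericRecord]): Dataset[String] =
    records
      .groupByKey(r => String.valueOf(r.get("key")))(Encoders.STRING)
      .mapGroups((key, values) => s"$key,${values.size}")(Encoders.STRING)

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("sketch").getOrCreate()
    val records = spark.createDataset(Seq(record("a"), record("a"), record("b")))
    val result = countByKey(records)
    result.printSchema() // the value column should now be reported as string
    spark.stop()
  }
}
```

With a Dataset[String] backed by the string encoder, write.text should accept the result, since the text data source only requires a single string column.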
