Spark Scala IDE error when reading a full nested JSON file



I have a complex nested JSON data file, shown below, and I am trying to consume this data and transform it into the case class below:

case class DeviceData (id: Int, device: String)

where id = 0 and

device = "{""device_id"": 0, ""device_type"": ""sensor-ipad"",""battery"":[{""type"": ""electrical""} ,{""type"": ""solar""}], ""ip"": ""68.161.225.1"", ""cca3"": ""USA"", ""cn"": ""United States"", ""temp"": 25, ""signal"": 23, ""battery_level"": 8, ""c02_level"": 917, ""timestamp"" :1475600496 }"

However, I am getting stuck at the very first step: when I read the data and transform it into a simple DataFrame, I get a "corrupt record" error. Please tell me what mistake I am making. I am using Spark version 2.4.5.

export1.json

0,"{""device_id"": 0, ""device_type"": ""sensor-ipad"",""battery"":[{""type"": ""electrical""} ,{""type"": ""solar""}], ""ip"": ""68.161.225.1"", ""cca3"": ""USA"", ""cn"": ""United States"", ""temp"": 25, ""signal"": 23, ""battery_level"": 8, ""c02_level"": 917, ""timestamp"" :1475600496 }"
1,"{""device_id"": 1, ""device_type"": ""sensor-igauge"",""battery"":[{""type"": ""electrical""} ,{""type"": ""solar""}], ""ip"": ""213.161.254.1"", ""cca3"": ""NOR"", ""cn"": ""Norway"", ""temp"": 30, ""signal"": 18, ""battery_level"": 6, ""c02_level"": 1413, ""timestamp"" :1475600498 }"
2,"{""device_id"": 2, ""device_type"": ""sensor-ipad"",""battery"":[{""type"": ""electrical""} ,{""type"": ""solar""}], ""ip"": ""88.36.5.1"", ""cca3"": ""ITA"", ""cn"": ""Italy"", ""temp"": 18, ""signal"": 25, ""battery_level"": 5, ""c02_level"": 1372, ""timestamp"" :1475600500 }"

My Spark code is as follows:

package sparkWCExample.spWCExample

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
import org.apache.spark.sql.Encoders
import org.apache.spark.sql.types._      // include the Spark types to define our schema
import org.apache.spark.sql.functions._  // include the Spark helper functions, e.g. to_timestamp

case class DeviceData (id: Int, device: String)

object DatasetExample {

def main(args: Array[String]) {
println("Start now")
val conf = new SparkConf().setAppName("Spark Scala WordCount Example").setMaster("local[1]")
val spark = SparkSession.builder().config(conf).appName("CsvExample").master("local").getOrCreate()
val sc: SparkContext = spark.sparkContext
import spark.implicits._

val readJSONDF = spark.read.json(sc.wholeTextFiles("C:\\Sankha\\Study\\data\\complex-nested-json\\export1.json").values).toDF()
readJSONDF.show()
}
}

Instead, I get the following output:

+--------------------+
|     _corrupt_record|
+--------------------+
|0,"{""device_id""...|
+--------------------+

1 Answer



sc.wholeTextFiles creates a pair RDD in which the key is the file name and the value is the entire content of that file (see the SparkContext API documentation for details). On top of that, the file is not valid JSON to begin with: each line is an id, a comma, and a CSV-quoted JSON string, which is why spark.read.json puts every record into _corrupt_record.
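
For illustration, a minimal sketch (reusing the sc and the file path from the question) of what wholeTextFiles actually returns:

// wholeTextFiles yields (fileName, entireFileContent) pairs, so
// spark.read.json is handed the whole multi-line file in one value
// rather than one JSON document per line.
val pairs = sc.wholeTextFiles("C:\\Sankha\\Study\\data\\complex-nested-json\\export1.json")
pairs.collect().foreach { case (fileName, content) =>
  println(s"file: $fileName")
  println(s"start of content: ${content.take(60)} ...")
}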

You probably want to use spark.read.text instead and then split each line yourself:

// assumes import spark.implicits._ is in scope for the DeviceData encoder
val df = spark.read.text("export1.json")
  .map(row => {
    val s = row.getAs[String](0)        // the whole line as a single string
    val index = s.indexOf(',')          // split at the first comma only
    DeviceData(s.substring(0, index).toInt, s.substring(index + 1))
  })
df.show

which prints

+---+--------------------+
| id|              device|
+---+--------------------+
|  0|"{""device_id"": ...|
|  1|"{""device_id"": ...|
|  2|"{""device_id"": ...|
+---+--------------------+
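
Note that the device column still contains the doubled quotes ("") from the CSV-style quoting of the source file. As an alternative (a sketch, not part of the original answer), you could let Spark's CSV reader do the splitting and the unescaping by treating the double quote as both the quote and the escape character; this assumes the same spark session, import spark.implicits._ and the DeviceData case class from the question are in scope:

// Sketch: read export1.json with the CSV reader so that "" inside the
// quoted JSON string is collapsed back to a single quote.
val ds = spark.read
  .option("quote", "\"")
  .option("escape", "\"")
  .csv("export1.json")                          // columns _c0, _c1 by default
  .toDF("id", "device")
  .selectExpr("cast(id as int) as id", "device")
  .as[DeviceData]

ds.show(false)

With the quotes unescaped, the device column holds a plain JSON string that could later be parsed with from_json if you need the nested fields.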
