How to use the first line of a text file as the header and skip the second line in Spark Scala



I am trying to figure out how to use the first line of a text file as the header and skip the second line. So far I have tried:

scala> val file = spark.sparkContext.textFile("/home/webwerks/Desktop/UseCase-03-March/Temp/temp.out")  
file: org.apache.spark.rdd.RDD[String] = /home/webwerks/Desktop/UseCase-03-March/Temp/temp.out MapPartitionsRDD[40] at textFile at <console>:23

scala> val clean = file.flatMap(x=>x.split(" ")).filter(x=> !(x.contains("-")))
clean: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[42] at filter at <console>:25

scala> val df=clean.toDF()
df: org.apache.spark.sql.DataFrame = [value: string]

scala> df.show
+--------------------+
| value|
+--------------------+
|time task...|
|03:27:51.199 FCPH...|
|03:27:51.199 PORT...|
|03:27:51.200 PORT...|
|03:27:51.200 PORT...|
|03:27:59.377 PORT...|
|03:27:59.377 PORT...|
|03:27:59.377 FCPH...|
|03:27:59.377 FCPH...|
|03:28:00.468 PORT...|
|03:28:00.468 PORT...|
|03:28:00.469 FCPH...|
|03:28:00.469 FCPH...|
|03:28:01.197 FCPH...|
|03:28:01.197 FCPH...|
|03:28:01.197 PORT...|
|03:28:01.198 PORT...|
|03:28:09.380 PORT...|
|03:28:09.380 PORT...|
|03:28:09.380 FCPH...|
+--------------------+

Here I want the first line to be the header, and the data should be split on tabs.

The data looks like this:

time         task       event   port cmd  args
--------------------------------------------------------------------------------------
03:27:51.199 FCPH       seq      13   28  00300000,00000000,00000591,00020182,00000000
03:27:51.199 PORT       Rx       11    0  c0fffffd,00fffffd,0ed10335,00000001
03:27:51.200 PORT       Tx       13   40  02fffffd,00fffffd,0ed3ffff,14000000
03:27:51.200 PORT       Rx       13    0  c0fffffd,00fffffd,0ed329ae,00000001
03:27:59.377 PORT       Rx       15   40  02fffffd,00fffffd,0336ffff,14000000
03:27:59.377 PORT       Tx       15    0  c0fffffd,00fffffd,03360ed2,00000001
03:27:59.377 FCPH       read     15   40  02fffffd,00fffffd,d0000000,00000000,03360ed2
03:27:59.377 FCPH       seq      15   28  22380000,03360ed2,0000052b,0000001c,00000000
03:28:00.468 PORT       Rx       13   40  02fffffd,00fffffd,29afffff,14000000
03:28:00.468 PORT       Tx       13    0  c0fffffd,00fffffd,29af0ed5,00000001

1 Answer


        scala> // Spark >= 2.0: spark.read.textFile returns a Dataset[String]
        scala> val ds = spark.read.textFile("data.txt")
        // or, on the RDD API:
        // val ds = spark.sparkContext.textFile("data.txt")

        scala> // take the header line and split it on runs of whitespace to get the column names
        scala> val schemaArr = ds.filter(x => x.contains("time")).collect.mkString.split("\\s+").toList

        scala> // drop the header and the dashed separator line, then split each row into its 6 columns
        scala> val df = ds.filter(x => !x.contains("time") && !x.startsWith("-"))
                          .map(x => {
                                val cols = x.split("\\s+")
                                (cols(0), cols(1), cols(2), cols(3), cols(4), cols(5))
                               }).toDF(schemaArr: _*)

        scala> df.show(false)
        +------------+----+-----+----+---+--------------------------------------------+
        |time        |task|event|port|cmd|args                                        |
        +------------+----+-----+----+---+--------------------------------------------+
        |03:27:51.199|FCPH|seq  |13  |28 |00300000,00000000,00000591,00020182,00000000|
        |03:27:51.199|PORT|Rx   |11  | 0 |c0fffffd,00fffffd,0ed10335,00000001         |
        |03:27:51.200|PORT|Tx   |13  |40 |02fffffd,00fffffd,0ed3ffff,14000000         |
        |03:27:51.200|PORT|Rx   |13  | 0 |c0fffffd,00fffffd,0ed329ae,00000001         |
        |03:27:59.377|PORT|Rx   |15  |40 |02fffffd,00fffffd,0336ffff,14000000         |
        |03:27:59.377|PORT|Tx   |15  | 0 |c0fffffd,00fffffd,03360ed2,00000001         |
        |03:27:59.377|FCPH|read |15  |40 |02fffffd,00fffffd,d0000000,00000000,03360ed2|
        |03:27:59.377|FCPH|seq  |15  |28 |22380000,03360ed2,0000052b,0000001c,00000000|
        |03:28:00.468|PORT|Rx   |13  |40 |02fffffd,00fffffd,29afffff,14000000         |
        |03:28:00.468|PORT|Tx   |13  | 0 |c0fffffd,00fffffd,29af0ed5,00000001         |
        +------------+----+-----+----+---+--------------------------------------------+

Please try an approach like the one above; if you want a schema, apply a custom schema to it.
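
For example, here is a minimal sketch of that custom-schema suggestion: map each line into a case class so that port and cmd come out as integers rather than strings. The case class name Event and the chosen field types are assumptions for illustration, not part of the original answer.

        // Hypothetical sketch: a case class supplies column names and types,
        // so port and cmd are parsed as Int instead of staying strings.
        case class Event(time: String, task: String, event: String,
                         port: Int, cmd: Int, args: String)

        val typedDf = ds.filter(x => !x.contains("time") && !x.startsWith("-"))
                        .map { x =>
                          val c = x.split("\\s+")
                          Event(c(0), c(1), c(2), c(3).toInt, c(4).toInt, c(5))
                        }
                        .toDF()   // schema now comes from the case class fields

Alternatively, spark.createDataFrame with an explicit StructType gives the same result when a case class is not convenient.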

