Getting blank results when computing an average in Spark Scala



I have an input.txt file. The data looks like this:

1   1383260400000   0   0.08136262351125882             
1   1383260400000   39  0.14186425470242922 0.1567870050390246  0.16093793691701822 0.052274848528573205    11.028366381681026
1   1383261000000   0   0.13658782275823106         0.02730046487718618 
1   1383261000000   33                  0.026137424264286602
2241    1383324600000   0   0.16869936142032646             
2241    1383324600000   39  0.820500491400199   0.6518011299798726  1.658248219576473   3.4506242774863045  36.71096470849049
2241    1383324600000   49  0.16295028249496815

Assume the first column is the id and the remaining columns are col1, col2, col3, col4, col5, col6 and col7. I want to find the average of col7 for each id; basically that is the result I want.

Here is the code I have tried so far.

val schema = StructType(Seq(
  StructField("ID", IntegerType, true),
  StructField("col1", DoubleType, true),
  StructField("col2", IntegerType, true),
  StructField("col3", DoubleType, true),
  StructField("col4", DoubleType, true),
  StructField("col5", DoubleType, true),
  StructField("col6", DoubleType, true),
  StructField("col7", DoubleType, true)
))
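
For completeness, this assumes the usual Spark SQL imports, roughly:

import org.apache.spark.sql.types._   // StructType, StructField, IntegerType, DoubleType
import org.apache.spark.sql.Row       // used below when building rows from the split lines
import scala.util.Try                 // used below for the getOrElse fallbacks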

Then I create a DataFrame.

val data = text.map(line => line.split("    ")).map(arr => Row.fromSeq(Seq(
  arr(0).toInt,
  Try(arr(1).asInstanceOf[DoubleType]) getOrElse(0.0),
  Try(arr(2).toInt) getOrElse(0),
  Try(arr(3).toDouble) getOrElse(0.0),
  Try(arr(4).toDouble) getOrElse(0.0),
  Try(arr(5).toDouble) getOrElse(0.0),
  Try(arr(6).toDouble) getOrElse(0.0),
  Try(arr(7).asInstanceOf[DoubleType]) getOrElse(0.0))))

Finally I save the result to a text file.

val res1 = df.groupBy("ID").agg(avg("col7"))

res1.rdd.saveAsTextFile("/stuaverage/spoutput12")

When I run this program, I get several output files with blank results:

[1068,0.0]
[1198,0.0]
[1344,0.0]
[1404,0.0]
[1537,0.0]
[1675,0.0]
[1924,0.0]
[193,0.0]
[211,0.0]
[2200,0.0]
[2225,0.0]
[2663,0.0]
[2888,0.0]
[3152,0.0]
[3235,0.0]

The first column is correct, but for the second column I should be getting a value.

Please help.

3 Answers

0

I suggest you use the sqlContext API to read the file with the schema you defined:

val df = sqlContext.read
  .format("com.databricks.spark.csv")
  .option("delimiter", "    ")
  .schema(schema)
  .load("path to your text file") 

where the schema is

val schema = StructType(Seq(
  StructField("ID", IntegerType, true),
  StructField("col1", DoubleType, true),
  StructField("col2", IntegerType, true),
  StructField("col3", DoubleType, true),
  StructField("col4", DoubleType, true),
  StructField("col5", DoubleType, true),
  StructField("col6", DoubleType, true),
  StructField("col7", DoubleType, true)
))

After that, you just need to apply the avg function to the grouped DataFrame:

import org.apache.spark.sql.functions._
val res1 = df.groupBy("ID").agg(avg("col1"),avg("col2"),avg("col3"),avg("col4"),avg("col5"),avg("col6"),avg("col7"))

Finally, you can save to CSV directly from the DataFrame; there is no need to convert it to an RDD.

  res1.coalesce(1).write.csv("/stuaverage/spoutput12")
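
As a side note, the aggregated columns come out named avg(col1), avg(col2), and so on. If you prefer friendlier headers in the CSV you can alias them; a sketch for just col7 (the output path here is only an example):

val res2 = df.groupBy("ID")
  .agg(avg("col7").alias("avg_col7"))   // rename avg(col7) to avg_col7

res2.coalesce(1).write
  .option("header", "true")             // include the column names as a header row
  .csv("/stuaverage/spoutput12_named")  // example output path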

0

The problem is the way you convert col7: you try to cast it to DoubleType instead of parsing it as a Scala Double (with .toDouble). The cast always throws an exception, so col7 always ends up as 0.0. This works:

val rdd = sc.textFile("input.txt")
  .map(line => line.split("    "))
  .map((arr: Array[String]) => Row(
    arr(0).toInt,
    Try(arr(1).toDouble) getOrElse (0.0),
    Try(arr(2).toInt) getOrElse (0),
    Try(arr(3).toDouble) getOrElse (0.0),
    Try(arr(4).toDouble) getOrElse (0.0),
    Try(arr(5).toDouble) getOrElse (0.0),
    Try(arr(6).toDouble) getOrElse (0.0),
    Try(arr(7).toDouble) getOrElse (0.0)
  ))
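
To finish this approach, you would pair the RDD with the schema from the question via createDataFrame and then run the aggregation; roughly:

// build the DataFrame from the RDD[Row] and the schema defined in the question
val df = sqlContext.createDataFrame(rdd, schema)

import org.apache.spark.sql.functions.avg
val res1 = df.groupBy("ID").agg(avg("col7"))
res1.show()   // the col7 averages should now be non-zero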

0

Try this more concise version (assuming you are running it from the Spark shell). It should work.

val df = spark
  .read
  .option("header","false")
  .option("sep","    ")
  .option("inferSchema","true")
  .csv("...input...")
  .toDF("ID","col1","col2","col3","col4","col5","col6","col7")

val result = df.groupBy("ID").mean("col7")

result
  .write
  .option("header","true")
  .option("sep",";")
  .csv("...output...")

