python - How to evaluate the performance (accuracy) of a model in a Spark pipeline with linear regression



Trying to run a Spark pipeline with linear regression; I am able to execute the model, and I am looking for the model's efficiency and other metrics, for which I need the model summary. I found some Python examples, which I have commented below for reference.
    import org.apache.spark.ml.feature.VectorAssembler
    import spark.implicits._
    import org.apache.spark.sql
    import org.apache.spark.sql.functions._
    import org.apache.spark.sql.types.DecimalType
    import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
    import org.apache.spark.ml.regression.LinearRegression
    import org.apache.spark.ml.feature.OneHotEncoderEstimator
    import org.apache.spark.ml.{Pipeline, PipelineModel}
    import org.apache.spark.mllib.evaluation.RegressionMetrics // needed for the RMSE / R2 block below

    val splitDF: Array[Dataset[Row]] = inputDF.randomSplit(Array(0.5, 0.5))
    val trainingDF = splitDF(0)
    val testingDF = splitDF(1)


    val encoder = new OneHotEncoderEstimator()
      .setInputCols(Array("_LookUpID"))
      .setOutputCols(Array("_LookUpID_Encoded"))

    val requiredFeatures = Array("_LookUpID_Encoded","VALUE1")
    val assembler = new VectorAssembler()
      .setInputCols(requiredFeatures)
      .setOutputCol("features")


    val lr = new LinearRegression()
      .setMaxIter(10)
      .setRegParam(0.3)
      .setElasticNetParam(0.8)
      .setFeaturesCol("features")
      .setLabelCol("VALUE2")

    // Fit the model
    val pipeline = new Pipeline()
      .setStages(Array(encoder, assembler, lr))

    // Fit the pipeline to training documents.
    val lrModel = pipeline.fit(trainingDF)

    val predictions = lrModel.transform(testingDF)
    println("*** Predictions ***")
    predictions.printSchema()  

    predictions.select("VALUE_DATE", "_LookUpID", "_CD", "VALUE1", "VALUE2", "prediction").show(100)

    val rm = new RegressionMetrics(predictions.rdd.map(x => (x(4).asInstanceOf[Double], x(5).asInstanceOf[Double])))
    println("sqrt(MSE): " + Math.sqrt(rm.meanSquaredError))
    println("R Squared: " + rm.r2)
    println("Explained Variance: " + rm.explainedVariance + "

")

Partitioned ingestion

    // JDBC read of the Oracle query, split into 6 parallel partitions on _LookUpID
    // between lowerNumber and upperNumber
    def getDataFrame(sql: String, lowerNumber: Int, upperNumber: Int): DataFrame = {
      val inputDF: DataFrame =
        spark.read.format(source = "jdbc")
          .option("url", "jdbc:oracle:thin:@//url")
          .option("user", "user")
          .option("password", "password")
          .option("driver", "oracle.jdbc.OracleDriver")
          .option("dbtable", s"($sql)")
          .option("partitionColumn", "_LookUpID")
          .option("numPartitions", "6")
          .option("lowerBound", lowerNumber)
          .option("upperBound", upperNumber)
          .load()
      inputDF
    }
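
For reference, a usage sketch of the function above; the SQL text and the ID bounds are made-up placeholders, and the bounds should cover the real range of _LookUpID so the 6 JDBC partitions come out roughly even:

    // Hypothetical query and bounds, purely for illustration
    val sql = "SELECT _LookUpID, _CD, VALUE_DATE, VALUE1, VALUE2 FROM SOME_TABLE"
    val inputDF = getDataFrame(sql, lowerNumber = 1, upperNumber = 600000)

    // Each of the 6 partitions pulls its own _LookUpID range in parallel
    println("JDBC partitions: " + inputDF.rdd.getNumPartitions)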
The pipeline runs out of memory (java.lang.OutOfMemoryError: Java heap space at …) if I feed it a DataSet of a million rows (the job is fine at 100K), even though the job is allocated 32GB of memory. I tried .cache() on inputDF with no luck. Is it because the lookup IDs are one-hot encoded, and what else can I do?

Thanks

1 Answer



Updated the question with RegressionMetrics to get metrics such as RMSE and R-squared.

Partitioned the DataSet and increased the driver's heap memory, which has resolved the memory issue for now. Will keep monitoring.
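
A sketch of the "increased the driver's heap" part; the 16g figure and the class/jar names are illustrative assumptions, not values from the post (the partitioning itself is the JDBC-partitioned getDataFrame read shown in the question):

    // Illustrative only: the driver heap is fixed when the driver JVM launches,
    // so it is raised at submit time rather than from application code:
    //
    //   spark-submit --driver-memory 16g \
    //     --class com.example.LrPipeline lr-pipeline.jar   // hypothetical names
    //
    // Inside the job, confirm what the driver actually received (uses the existing `spark` session):
    println("spark.driver.memory = " + spark.conf.get("spark.driver.memory", "1g (default)"))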

