r-使用spark mllib scala API按组运行3000+个随机林模型


0

我正在尝试使用spark scala api在一个大型模型输入csv文件上按组(school_id,超过3000)构建随机森林模型。每个组包含大约3000-4000条记录。我拥有的资源是20-30个AWS M3.2倍的大型实例。

在R中,我可以按组构造模型并将它们保存到这样的列表中-

library(dplyr);library(randomForest);
    Rf_model <- train %>% group_by(School_ID) %>% 
                do(school= randomForest(formula=Rf_formula, data=., importance = TRUE))

列表可以存储在某个地方,当我需要使用它们时,我可以调用它们,如下所示-

save(Rf_model.school,file=paste0(Modelpath,"Rf_model.dat"))
load(file=paste0(Modelpath,"Rf_model.dat"))
pred <-  predict(Rf_model.school$school[school_index][[1]], newdata=test)

我想知道如何在Spark中做到这一点,是否需要首先按组分割数据,以及在必要时如何有效地进行分割。

我可以 root据下面的代码按学校ID拆分文件,但似乎它为每个迭代创建了一个单独的作业子集,完成这些作业需要很长时间。有办法一次就完成吗?

model_input.cache()

val schools = model_input.select("School_ID").distinct.collect.flatMap(_.toSeq)
val bySchoolArray = schools.map(School_ID => model_input.where($"School_ID" <=> School_ID))

for( i <- 0 to programs.length - 1 ){
bySchoolArray(i).
write.format("com.databricks.spark.csv").
option("header", "true").
save("model_input_bySchool/model_input_"+ schools(i))
}

资料来源:

编辑:8/24/2015

基本上,我创建了一个新的变量“label”,并将类存储在double中。然后,我使用VectorAssembler函 array合我的所有特性,并按如下方式 transformation输入数据-

val assembler = new VectorAssembler().
  setInputCols(Array("COL1", "COL2", "COL3")).
  setOutputCol("features")

val model_input = assembler.transform(model_input_raw).
select("SCHOOL_ID", "label", "features")

部分错误消息(如果需要完整的日志消息,请通知我)-

scala.matchError:StringType(属于类

这是在将所有变量 transformation为数字类型之后解决的。

编辑:2015年8月25日

val indexer = new StringIndexer().
  setInputCol("label_orig").
  setOutputCol("label")

任何建议或指示都是有帮助的,可以随时提出任何问题。谢谢!

1 答案

0

因为你已经为每一所学校建立了独立的 dataframe,所以这里没有太多 job要做。因为您是 DataFrame ,所以我假设您要使用ml.classification.randomforestClassifier。如果是这样,您可以尝试这样的操作:

    提取管道逻辑。 root据您的要求调整RandomForestClassifier参数和变压器

编辑

root据官方文件:

VectorAssembler接受以下输入列类型:所有数值类型、布尔类型和矢量类型。

由于错误表明您的列是一个字符串,您应该首先对其进行 transformation,例如使用StringIndexer。


我来回答