k表示-从 dataframe 中触发mllib k means,然后再次返回


0

我的目标是使用spark(1.3.1)mllib将kmeans聚类算法应用于非常大的 DataSet 。我使用spark中的hiveContext调用了 HDFSSS中的数据,并最终希望将其放回原来的位置-采用这种格式

    |I.D     |cluster |
    ===================
    |546     |2       |
    |6534    |4       |
    |236     |5       |
    |875     |2       |

我已经运行了以下代码,其中“data”是双精度数的 DataFrame ,以及第一列的ID。

    val parsedData = data.rdd.map(s => Vectors.dense(s.getDouble(1),s.getDouble(2))).cache()
    val clusters = KMeans.train(parsedData, 3, 20)

这成功地运行了,我现在只能将集群 map回它们各自的ID,如前所述的 DataFrame 中。我可以将其 transformation为 dataframe :

    sc.makeRDD(clusters.predict(parsedData).toArray()).toDF()

但这是我所能做到的。这篇文章是在正确的轨道上,我想这篇文章也在问我类似的问题。

我怀疑需要贴标点库。任何评论,答案都会受到赞赏,干杯。

编辑:刚在Spark用户列表中找到这个,看起来很有希望

4 答案

0

我知道你最后想得到 DataFrame 。我看到了两个可能的解决方案。我想说,在它们之间做出选择是出于品味。

从RDD创建列

很容易以RDD的形式获得成对的ID和集群:

val idPointRDD = data.rdd.map(s => (s.getInt(0), Vectors.dense(s.getDouble(1),s.getDouble(2)))).cache()
val clusters = KMeans.train(idPointRDD.map(_._2), 3, 20)
val clustersRDD = clusters.predict(idPointRDD.map(_._2))
val idClusterRDD = idPointRDD.map(_._1).zip(clustersRDD)

然后从中创建 DataFrame

val idCluster = idClusterRDD.toDF("id", "cluster")

它之所以有效,是因为 map不会改变RDD中数据的顺序,这就是为什么您可以使用预测结果来压缩ID。

使用UDF(用户定义函数)

第二种方法是使用clusters.predict方法作为udf:

val bcClusters = sc.broadcast(clusters)
def predict(x: Double, y: Double): Int = {
    bcClusters.value.predict(Vectors.dense(x, y))
}
sqlContext.udf.register("predict", predict _)

现在我们可以使用它向数据添加预测:

val idCluster = data.selectExpr("id", "predict(x, y) as cluster")

请记住,Spark API不允许取消UDF注册。这意味着关闭数据将保存在内存中。

错误/不理想的解决方案

    使用clusters.predict而不进行广播

它在分布式设置中不起作用。编辑:实际上它会 job的,我被使用广播的RDD的Predict的实现搞糊涂了。

    sc.makerdd(clusters.predict(parsedData).toArray().todf())

Toarray收集驱动程序中的所有数据。这意味着在分布式模式下,您将把集群ID复制到一个节点中。


0

我在用PySark做类似的事情。我猜你可以直接把它 transformation成scala,因为没有特定于python的内容。MyPointsWithID是我的RDD,每个点都有一个ID,该点表示为一个值 array。

# Get an RDD of only the vectors representing the points to be clustered
points = myPointsWithID.map(lambda (id, point): point)
clusters = KMeans.train(points, 
                        100, 
                        maxIterations=100, 
                        runs=50,
                        initializationMode='random')

For each point in the original RDD, replace the point with the

ID of the cluster the point belongs to.

clustersBC = sc.broadcast(clusters)
pointClusters = myPointsWithID.map(lambda (id, point): (id, clustersBC.value.predict(point)))


0

root据您的代码,我假设:

    数据是具有三列的 DataFrame (标签:double、x1:double和x2:double)

以下是一些玩具数据遵循假定模式的简单示例应用程序:

import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.clustering.KMeans
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.sql.functions.{col, udf}

case class DataRow(label: Double, x1: Double, x2: Double)
val data = sqlContext.createDataFrame(sc.parallelize(Seq(
DataRow(3, 1, 2),
DataRow(5, 3, 4),
DataRow(7, 5, 6),
DataRow(6, 0, 0)
)))

val parsedData = data.rdd.map(s => Vectors.dense(s.getDouble(1),s.getDouble(2))).cache()
val clusters = KMeans.train(parsedData, 3, 20)
val t = udf { (x1: Double, x2: Double) => clusters.predict(Vectors.dense(x1, x2)) }
val result = data.select(col("label"), t(col("x1"), col("x2")))

重要的部分是最后两行。

    创建一个UDF(用户定义函数),它可以直接应用于 DataFrame 列(在本例中是两列x1和x2)。

0

请告诉我此代码是否适用于您:

import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.clustering._

val rows = data.rdd.map(r => (r.getDouble(1),r.getDouble(2))).cache()
val vectors = rows.map(r => Vectors.dense(r._1, r._2))
val kMeansModel = KMeans.train(vectors, 3, 20)
val predictions = rows.map{r => (r._1, kMeansModel.predict(Vectors.dense(r._1, r._2)))}
val df = predictions.toDF("id", "cluster")
df.show


我来回答