scala-如何 explainrdd.treeagregate


0

我在Apache Spark代码源中遇到了这一行

val (gradientSum, lossSum, miniBatchSize) = data
    .sample(false, miniBatchFraction, 42 + i)
    .treeAggregate((BDV.zeros[Double](n), 0.0, 0L))(
      seqOp = (c, v) => {
        // c: (grad, loss, count), v: (label, features)
        val l = gradient.compute(v._2, v._1, bcWeights.value, Vectors.fromBreeze(c._1))
        (c._1, c._2 + l, c._3 + 1)
      },
      combOp = (c1, c2) => {
        // c: (grad, loss, count)
        (c1._1 += c2._1, c1._2 + c2._2, c1._3 + c2._3)
      }
    )

我读这个有很多问题:

    首先,我在网上找不到任何能准确 explain树聚合 job原理的东西,这些参数的含义是什么。

这句话一定很高级。我不能开始破译这个。

1 答案

0

TreeAgregate是聚合的一个专门实现,它迭代地将combine函数应用于分区的一个子集。这样做是为了防止将所有部分结果返回给驱动程序,在这种情况下,会像经典聚合那样发生单次通过减少。

出于所有实际目的,TreeAgregate遵循与此答案中所 explain的聚合相同的原则: explainPython中的聚合功能,但需要一个额外的参数来指示部分聚合级别的深度。

让我试着 explain一下这里到底发生了什么:

对于聚合,我们需要一个零、一个 coalesce 函数和一个reduce函数。

然后我们可以像这样分析上面的函数。希望这有助于理解:

val Zero: (BDV, Double, Long) = (BDV.zeros[Double](n), 0.0, 0L)
val combinerFunction: ((BDV, Double, Long), (??, ??)) => (BDV, Double, Long)  =  (c, v) => {
        // c: (grad, loss, count), v: (label, features)
        val l = gradient.compute(v._2, v._1, bcWeights.value, Vectors.fromBreeze(c._1))
        (c._1, c._2 + l, c._3 + 1)
val reducerFunction: ((BDV, Double, Long),(BDV, Double, Long)) => (BDV, Double, Long) = (c1, c2) => {
        // c: (grad, loss, count)
        (c1._1 += c2._1, c1._2 + c2._2, c1._3 + c2._3)
      }

然后我们可以用一种更易于理解的形式重写对treeagregate的调用:

val (gradientSum, lossSum, miniBatchSize) = treeAggregate(Zero)(combinerFunction, reducerFunction)

此表单将把生成的元组“提取”到命名值GradientSum、LossSum和MinibatchSize中,以供进一步使用。

请注意,TreeAgregate使用一个附加的参数depth,该参数depth声明为默认值depth=2,因此,由于此特定调用中没有提供该参数depth,因此它将使用该默认值。


我来回答