python—为该任务编写自定义累加器参数的正确方法是什么?


0

上下文:在Azure数据库、Python编程语言、Spark环境中 job。

我有一个rdd,并且创建了一个 map操作。

rdd = sc.parallelize(my_collection)
mapper = rdd.map(lambda val: do_something(val))

假设这个 map器中的元素是Foo类型的。我在驱动程序节点上有一个Bar类型的全局对象,并且有一个需要从 job节点(即 map器中的元素)填充的Foo对象的内部集合。

# This is what I want to do
bar_obj = Bar()

def add_to_bar(foo_obj):
global bar_obj
bar_obj.add_foo(foo_obj)

mapper.foreach(add_to_bar)

root据我对RDD编程指南的理解,这将不起作用,因为闭包在Spark中是如何 job的。相反,我应该使用累加器来完成这个任务。

我知道我需要以某种方式对AccumulatorParam进行子类划分,但我不确定这个类是什么样的,以及在这种情况下如何使用它。

这是我的第一张通行证:

class FooAccumulator(AccumulatorParam):
  def zero(self, value):
    return value.bar
  def addInPlace(self, value1, value2):
    # bar is the parent Bar object for the value1 Foo instance
    value1.bar.add_foo(value2)
    return value1

但我不确定如何从这里开始。

我还想补充一点,我试图简单地.collect()从 map程序中收集结果,但结果集会大于驱动程序节点上允许的最大内存(大约4G,当增加到10G时,它会运行,但最终会超时)。

1 答案


0

我不知道到目前为止你有没有试过?我自己发现了这段代码:

    from pyspark import AccumulatorParam

class StringAccumulator(AccumulatorParam):
def zero(self, s):
return s
def addInPlace(self, s1, s2):
return s1 + s2

accumulator = sc.accumulator("", StringAccumulator())

所以也许你可以这样做:

from pyspark import AccumulatorParam

class FooAccumulator(AccumulatorParam):
def zero(self, f):
return []
def addInPlace(self, f1, f2):
f1.extend(f2)
return acc1

accumulator = sc.accumulator("", FooAccumulator())

我想这条线对你也有帮助。


我来回答

写文章

提问题

面试题