Hadoop-使用pyspark/spark对大型分布式 DataSet 进行采样


0

我在 HDFSS中有一个文件,它分布在集群中的各个节点上。

我想从这个文件中随机抽取10行。

在pyspark shell中,我使用以下方法将文件读取到RDD中:

>>> textFile = sc.textFile("/user/data/myfiles/*")

然后我想简单地取一个样本…Spark的一个很酷的地方是有takesample这样的命令,不幸的是,我认为我做了一些错误的事情,因为以下操作需要很长时间:

>>> textFile.takeSample(False, 10, 12345)

因此,我尝试在每个节点上创建一个分区,然后指示每个节点使用以下命令对该分区进行采样:

>>> textFile.partitionBy(4).mapPartitions(lambda blockOfLines: blockOfLines.takeSample(False, 10, 1234)).first()

但这会产生一个错误值错误:要解包的值太多:

org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/opt/cloudera/parcels/CDH-5.0.2-1.cdh5.0.2.p0.13/lib/spark/python/pyspark/worker.py", line 77, in main
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/opt/cloudera/parcels/CDH-5.0.2-1.cdh5.0.2.p0.13/lib/spark/python/pyspark/serializers.py", line 117, in dump_stream
    for obj in iterator:
  File "/opt/cloudera/parcels/CDH-5.0.2-1.cdh5.0.2.p0.13/lib/spark/python/pyspark/rdd.py", line 821, in add_shuffle_key
    for (k, v) in iterator:
ValueError: too many values to unpack

如何使用spark或pyspark从大型分布式 DataSet 中抽取10行样本?

2 答案

0

尝试使用textfile.sample(false,fraction,seed)代替。takesample通常会非常慢,因为它在RDD上调用count()。它需要这样做,因为否则它不会均匀地从每个分区中获取,基本上它使用计数和您请求的样本大小来计算分数并在内部调用样本。样本很快,因为它只使用一个随机的布尔生成器,返回时间的真分数百分比,因此不需要调用Count。

此外,我不认为这发生在您身上,但是如果返回的样本大小不够大,它会再次调用sample,这显然会减慢速度。既然您应该了解数据的大小,那么我建议您调用sample,然后将样本缩减到自己的大小,因为您对数据的了解比spark要多。


0

使用样本而不是takesample似乎使事情变得相当快:

textFile.sample(False, .0001, 12345)

问题在于,除非您对 DataSet 中的行数有大致的了解,否则很难知道要选择的正确分数。


我来回答