scala-保存到分区 parquet file时实现并发性


0

使用PartitionBy将 dataframe 写入 Parquet时:

df.write.partitionBy("col1","col2","col3").parquet(path)

我希望每个正在编写的分区都是由一个单独的任务独立完成的,并且与分配给当前Spark作业的 worker数量相当。

但是,在写入 Parquet时,实际上一次只运行一个 worker/任务。一个 worker正在循环通过每个分区,并连续地写出.parquet文件。为什么会出现这种情况——在spark.write.parquet操作中是否有强制并发的方法?

以下不是我想看到的(应该是700%+…)

在另一篇文章中,我也尝试在前面添加 repartition。

spark parquet分区:大量文件

df.repartition("col1","col2","col3").write.partitionBy("col1","col2","col3").parquet(path)

不幸的是,这没有效果:仍然只有一个 worker。

注意:我使用 local[8]以 local模式运行,并且看到其他Spark操作使用多达8个并发 workers运行,占用多达750%的CPU。

1 答案

0

简而言之,从一个任务写入多个输出文件并不是并行的,但是假设您有多个任务(多个输入拆分),每个任务都将在一个 job者上获得自己的 core。

写出分区数据的目的并不是要使你的写操作并行化。Spark已经通过同时写出多个任务来做到这一点了。其目标只是优化将来的读取操作,您只需要保存数据的一个分区。

在spark中写入分区的逻辑设计为在将前一阶段的所有记录写入目标时只读取一次。我认为设计选择的一部分也是为了防止

编辑:Spark 2.x方法

在spark 2.x中,它按照分区 key对每个任务中的记录进行排序,然后迭代这些记录,每次写入一个文件句柄。我假设他们这样做是为了确保如果分区 key中有很多不同的值,他们永远不会打开大量的文件句柄。

以下是排序,供参考:

https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileformatwriter.scala l121

向下滚动一点,您会看到它调用write(iter.next())在每一行中循环。

下面是实际的写入操作(一次写入一个文件/分区 key):

https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileformatwriter.scala l121

在那里,您可以看到它一次只保存一个打开的文件句柄。

编辑:Spark 1.x方法

spark 1.x对给定任务的作用是循环访问所有记录,每当遇到以前从未见过的新输出分区时,打开一个新的文件句柄。然后它立即将记录写入该文件句柄并转到下一个句柄。这意味着在处理单个任务的任何给定时间,最多可以为该任务打开n个文件句柄,其中n是最大输出分区数。为了更清楚地说明这一点,下面是一些python psuedo代码,以展示其一般思想:

# To write out records in a single InputSplit/Task
handles = {}
for row in input_split:
    partition_path = determine_output_path(row, partition_keys)
    if partition_path not in handles:
        handles[partition_path] = open(partition_path, 'w')

handles[partition_path].write(row)

对于上述记录的书写策略有一个警告。在spark 1.x中,设置spark.sql.sources.maxconcurrentwrites会对每个任务可以打开的掩码文件句柄设置一个上限。在达到这一点之后,spark将改为按分区 key对数据进行排序,这样它就可以迭代记录,一次写出一个文件。


我来回答