scala-Spark 2.3 Dataframe partition希望在n个分区的key上对数据进行分区


0

我需要spark(scala) DataFrame 分区方面的帮助。我需要按一个 key列划分成n个分区,所有与同一个 key相关的行应该在同一个分区中(即 key不应该跨分区分布)

注意:我的钥匙可能有数百万个

前任:

等等

如您所见,许多值共享同一个密钥。

提前谢谢

3 答案


0

root据《 spark》一书,权威指南 spark有两个内置的分区器,一个是离散值的hashpartitioner,一个是RangePartitioner。两者都适用于离散值和连续值。

HashPartitioner示例:

import org.apache.spark.HashPartitioner

val rdd = df.rdd // convert DataFrame to low-level RDD
val keyedRDD = rdd.keyBy(...) // define your custom key
keyedRDD.partitionBy(new HashPartitioner(n))

分隔器示例:

import org.apache.spark.Partitioner

class DomainParitioner extends Partitioner {
def numPartitions = n
def getPartition(key: Any): Int = {
// your custome partition logic
}
}

keyedRDD.partitionBy(new DomainPartitioner).map(.1).glom().map(_.toSet.toSeq.length)

在书中还提到,你应该注意 key的倾斜,这意味着一些 key可能比其他 key有更多的值。您希望尽可能多地中断这些 key,以提高并行性并防止在执行过程中出现OutOfMemoryErrors。


0

可以在将 DataFrame 写入基于文件的输出时对其进行分区。类似于:

df.write.partitionBy("colName").format("parquet").save(path-to-file)

这将创建目录结构,如

path
└── to
    └── file
         ├── colName=value1
                       └── data.parquet
         ├── colName=value2
                       └── data.parquet

当您加载数据和过滤器时,谓词将被下推到源文件中,您可以从分区中获得性能优势

这不是你要找的吗?


0

尝试

def repartition(partitionExprs: org.apache.spark.sql.Column*): org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] 

val df = Seq(("aa","vv"),("aa","v1v1"),("a1","v2")).toDF("Key","Value")
 val partionedDf = df.repartition(col("Key"))


我来回答

写文章

提问题

面试题