apacheflink-从多个已排序分区中获取前n个元素


0

我要读多个文件,计算重复行数,按重复数排序行,取前10个重复行。

lines = env.readTextFile("logs-dir")
tuples = lines.map(line -> Tuple2(line, 1))
aggregate = tuples.groupBy(0).sum(1)
sort = aggregate.sortPartition(1, Order.DESCENDING)
sorted.first(10).writeAsText("domains")

问题是first-n是任意的,从所有分区中随机返回10个第一个元素。

有没有一种方法可以在不降低并行度为1的情况下从所有分区中选择排序后的前n个元素?

1 答案


0

我将使用一个parallel MapPartitionFunction来解决这个问题,它返回每个分区的前10个元素,将结果发送到单个分区,对其进行排序并再次获取前10个元素。这看起来像这样:

lines = env.readTextFile("logs-dir")
tuples = lines.map(line -> Tuple2(line, 1))
aggregate = tuples.groupBy(0).sum(1)

// sort partitions in parallel
sortPart = aggregate.sortPartition(1, Order.DESCENDING)
// take first 10 of each partition
firstPart = sortPart.mapPartition(new First(10))

// sort all in one partition
sortFull = firstPart.sortPartition(1, Order.DESCENDING).parallelism(1)
// take first 10
first10 = sortFull.mapPartition(new First(10))
first10.writeAsText("domains")

MapPartitionFunction首先将非常简单。它只统计要转发多少条记录,并在计数器降至0时从mapPartition()函数返回。


我来回答

写文章

提问题

面试题