Java- Flink: DataSet .count()是瓶颈-如何计算并行?


0

我正在学习使用Flink的Map Reduce,并且有一个关于如何有效地计算 DataSet 中的元素的问题。到目前为止,我得到的是:

DataSet<MyClass> ds = ...;
long num = ds.count();

在我的火石日志里写着

2016年3月12日19:47:27 DataSink(count())(1/1)切换为运行

所以只有一个CPU被使用(我有四个和其他命令,比如reduce-use-all)。

我认为count()在内部收集来自所有四个CPU的 DataSet 并按顺序计数,而不是让每个CPU计算其部分,然后求和。是真的吗?

如果是,如何利用我所有的CPU?首先将我的 DataSet map到一个包含原始值作为第一项、长值1作为第二项的2元组,然后使用SUM函数对其进行聚合,好吗?

例如, DataSet 将 map到DataSet>,其中Long始终为1。所以当我对所有的项求和时,元组的第二个值的和就是正确的计数值。

对 DataSet 中的项进行计数的最佳实践是什么?

当做

2 答案


0

DataSet#count()是一个非并行操作,因此只能使用单个线程。

您将逐个 key进行计数以获得并行化,并在 key计数上应用最后的和,以获得总计数,以加快计算速度。


0

这是个好办法吗?

DataSet<Tuple1<Long>> x = ds.map(new MapFunction<MyClass, Tuple1<Long>>() { 
    @Override public Tuple1<Long> map(MyClass t) throws Exception { 
        return new Tuple1<Long>(1L); 
    } 
}).groupBy(0).sum(0);

Long c = x.collect().iterator().next().f0;


我来回答

写文章

提问题

面试题