有没有在Flink中使用直方图累加器的例子


0

我偶然发现了Flink层次结构中的Histogram类,但没有“这里是如何使用这种”的文档。我想做些类似的事情:

dataStream
    .countWindowAll(100)
    .fold(new Histogram(), (histogram,data) -> {histogram.add(data.getValue()); return histogram;})
    .flatmap((h, out) -> h.getLocalValue().navigableKeySet.iterator().forEachRemaining(key -> out.collect(key.toString()+","+h.get(key).toString()))
    .print()

但遗憾的是,直方图不能通过Flink序列化。也许有一个“这是你如何使用这个”或者有另一种方法通过flink直方图。

我显然做错了什么。

1 答案


0

Flink的累加器并不打算用作数据流或 DataSet 的数据类型。

相反,它们是通过RuntimeContext注册的,可以从RichFunction.getRuntimeContext(). 这通常在aRichFunction的open()方法中完成:

class MyFunc extends RichFlatMapFunction[Int, Int] {

val hist: Histogram = new Histogram()

override def open(conf: Configuration): Unit = {
getRuntimeContext.addAccumulator("myHist", hist)
}

override def flatMap(value: Int, out: Collector[Int]): Unit = {
hist.add(value)
}
}

累加器的所有并行实例都会定期发送到JobManager(主进程)并进行 coalesce 。可以从流返回的JobExecutionResult访问它们的值ExecutionEnvironment.execute().

我认为你的用例不能被Flink的累加器解决。您应该创建一个自定义直方图数据类型。


我来回答

写文章

提问题

面试题