如何在Flink中对WindowedStream执行自定义操作?


0

我想对Flink中的WindowedStream执行一些操作,比如average。

val windowedStream = valueStream
                          .keyBy(0)
                          .timeWindow(Time.minutes(5))
                          .sum(2) //Change this to average?

假设我要求平均值,我怎么能做到呢?

1 答案


0

Flink没有内置函数来计算WindowStream上的平均值。您必须为此实现一个自定义窗口函数。

最有效的方法是实现一个ReduceFunction,它计算要平均的值的计数和和和,然后实现一个接受ReduceFunction结果并计算平均值的WindowFunction。使用ReduceFunction更有效,因为Flink直接将它应用于传入的值。因此,它会动态地聚合值,而不会在窗口中收集这些值。这显著减少了窗口的内存占用。

由于ReduceFunction的输出具有与其输入相同的类型,因此在应用ReduceFunction之前,需要为count添加一个字段。

下面这样的方法可以做到:

val valueStream: DataStream[(String, Double)] = ???

val r: DataStream[(String, Double)] = valueStream
// append a 1L for counting
.map(x => (x._1, x._2, 1l))
// key and window stream
.keyBy(0).timeWindow(Time.minutes(5))
.apply(
// ReduceFunction (compute sum and count)
(x: (String, Double, Long), y: (String, Double, Long)) =>
(x._1, x._2 + y._2, x._3 + y._3),
// WindowFunction
(key, window: TimeWindow, input: Iterable[(String, Double, Long)], out: Collector[(String, Double)]) => {
// get first (and only) value
val x: (String, Double, Long) = input.toIterator.next
// compute average as sum / count
out.collect(x._1, x._2 / x._3)
}
)


我来回答

写文章

提问题

面试题