如何在Flink中建立一个1小时的回放流缓冲区?


0

我想动态保存最近1小时事件的缓冲区。这个缓冲区应该给我一个重放函数,这样就可以对最后一小时的数据执行 query。

我试图使用windowapi,但似乎Flink没有给我一个向前移动的固定宽度的时间窗口。

1 答案


0

我有自己的问题的解决办法,但我想保留这个问题,以防你有更好的解决办法。因为我的的确违反了函数式编程的一些良好实践。

我的方法如下。

            val keyedEventStream: KeyedStream[E]

        // create a stream of [hourly window as a set of events]
        val eventWindowStream = keyedEventStream.timeWindow(Time.minutes(60), Time.milliseconds(50)).fold(scala.collection.Set[E]())((set: scala.collection.Set[E], event: E) => set + event)


        // This is the hourly buffer my process logic will use
        var workWindow = scala.collection.Set[E]()
        // update the workspace window with the stream of hourly window.
        eventWindowStream.map((set: scala.collection.Set[W]) => workWindow = set)

如您所见,last map的唯一目的是更新变量workWindow,这实际上是内联函数的一个副作用。。。


我来回答

写文章

提问题

面试题