我正在使用Flink的时间窗口功能来执行一些计算。我正在创建一个5分钟的窗口。不过,我想第一次创建一个小时的窗口。我需要的下一个窗口是5分钟。
在第一个小时内,收集数据并对其执行我的操作。一旦完成,每五分钟执行一次相同的操作。
我发现这可以用触发器实现,但我不确定应该使用哪个触发器以及如何使用。
更新:我不认为触发器有帮助,据我所知,它们只是定义每个窗口的触发时间/计数,而不是第一个窗口何时被触发。
我正在使用Flink的时间窗口功能来执行一些计算。我正在创建一个5分钟的窗口。不过,我想第一次创建一个小时的窗口。我需要的下一个窗口是5分钟。
在第一个小时内,收集数据并对其执行我的操作。一旦完成,每五分钟执行一次相同的操作。
我发现这可以用触发器实现,但我不确定应该使用哪个触发器以及如何使用。
更新:我不认为触发器有帮助,据我所知,它们只是定义每个窗口的触发时间/计数,而不是第一个窗口何时被触发。
实现这一点并不容易。
不管你是否记得一个自定义的窗口已经被激活了。
val stream: DataStream[(String, Int)] = ???
val result = stream
.keyBy(0)
.window(GlobalWindows.create())
.trigger(new YourTrigger())
.apply(new YourWindowFunction())
关于GlobalWindow和触发器的详细信息在Flink窗口文档中。