Flink streaming事件时间窗口排序


0

我在理解事件时间窗口的语义时遇到了一些困难。下面的程序生成一些具有时间戳的元组,这些元组用作事件时间,并执行简单的窗口聚合。我希望输出与输入的顺序相同,但输出的顺序不同。为什么输出相对于事件时间是无序的?

import java.util.concurrent.TimeUnit
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.scala._

object WindowExample extends App {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
env.getConfig.enableTimestamps()
env.setParallelism(1)

val start = 1449597577379L
val tuples = (1 to 10).map(t => (start + t * 1000, t))

env.fromCollection(tuples)
  .assignAscendingTimestamps(_._1)
  .timeWindowAll(Time.of(1, TimeUnit.SECONDS))
  .sum(1)
  .print()

env.execute()

}

输入:

 (1449597578379,1)
 (1449597579379,2)
 (1449597580379,3)
 (1449597581379,4)
 (1449597582379,5)
 (1449597583379,6)
 (1449597584379,7)
 (1449597585379,8)
 (1449597586379,9)
 (1449597587379,10)

结果:

[info] (1449597579379,2)
[info] (1449597581379,4)
[info] (1449597583379,6)
[info] (1449597585379,8)
[info] (1449597587379,10)
[info] (1449597578379,1)
[info] (1449597580379,3)
[info] (1449597582379,5)
[info] (1449597584379,7)
[info] (1449597586379,9)

1 答案


0

这种行为的原因是在Flink中元素的顺序(相对于时间戳)没有被考虑在内。只有 watermark的正确性及其与元素时间戳的关系对于考虑时间的操作才是重要的,因为在基于时间的操作中, watermark通常会触发计算。

在您的示例中,窗口操作符将源中的所有元素存储在内部窗口缓冲区中。然后,源代码发出一个 watermark,表示将来不会有时间戳更小的元素到达。这又告诉窗口操作符处理结束时间戳低于 watermark的所有窗口(对于所有窗口都是如此)。因此,它发出所有窗口(任意顺序),然后它自己发出 watermark。从这个操作的下游操作本身将接收元素,并且一旦接收到 watermark就可以进行处理。

默认情况下,从源发射 watermark的间隔为200毫秒。由于源发射的元素数量较少,所有这些元素都在第一个 watermark发射之前发射。在一个真实的用例中,如果 watermark发射间隔比窗口大小小得多,那么您将得到按时间戳顺序发射的窗口的预期行为。例如,如果每500毫秒有1小时的窗口和 watermark。


我来回答

写文章

提问题

面试题