apache flink-kafka源流上的事件时间窗口


0

Kafka 服务器中有一个主题。在程序中,我们将此主题作为流读取并分配事件时间戳。然后在此流上执行窗口操作。但是这个程序不起作用。调试后,似乎没有执行WindowOperator的processWatermark方法。这是 我的代码。

    DataStream<Tuple2<String, Long>> advertisement = env
            .addSource(new FlinkKafkaConsumer082<String>("advertisement", new SimpleStringSchema(), properties))
            .map(new MapFunction<String, Tuple2<String, Long>>() {
                private static final long serialVersionUID = -6564495005753073342L;

            @Override
            public Tuple2&lt;String, Long&gt; map(String value) throws Exception {
                String[] splits = value.split(" ");
                return new Tuple2&lt;String, Long&gt;(splits[0], Long.parseLong(splits[1]));
            }
        }).assignTimestamps(timestampExtractor);

advertisement
        .keyBy(keySelector)
        .window(TumblingTimeWindows.of(Time.of(10, TimeUnit.SECONDS)))
        .apply(new WindowFunction&lt;Tuple2&lt;String,Long&gt;, Integer, String, TimeWindow&gt;() {
            private static final long serialVersionUID = 5151607280638477891L;
            @Override
            public void apply(String s, TimeWindow window, Iterable&lt;Tuple2&lt;String, Long&gt;&gt; values, Collector&lt;Integer&gt; out) throws Exception {
                out.collect(Iterables.size(values));
            }
        }).print();

为什么会这样?如果我在“assignTimestamps(timestampExtractor)”之前添加“keyBy(keySelector)”,那么程序就可以运行了。有人能 explain一下原因吗?

1 答案


0

您受到Flink:Flink-3121中一个已知错误的影响: watermark转发对于不生成任何数据的源不起作用。

问题是FlinkKafkaConsumer的运行次数(很可能是CPU核数(比如4))比分区(1)多。 Kafka 的消费者中只有一个在发射 watermark,其他消费者则在空转。

窗口操作员并没有意识到这一点,而是等待来自所有消费者的 watermark到达。这就是为什么窗户从不触发。


我来回答

写文章

提问题

面试题