Apache Kafka -FlinkKafkaConsumer082自动偏移复位设置不起作用?


0

我有一个Flink streaming程序,从 Kafka 的一个主题中读取数据。在节目中,自动偏移复位设置为“最小”。在IDE/Intellij IDEA中进行 test时,程序总是可以从主题的开头读取数据。然后我建立了一个flink/kafka集群,并将一些数据生成kafka主题。第一次运行流式处理 job时,它可以从主题的开头读取数据。但之后我停止了流式处理 job并再次运行它,它将不会从主题的开头读取数据。如何使程序始终从主题的开头读取数据?

    Properties properties = new Properties();
    properties.put("bootstrap.servers", kafkaServers);
    properties.put("zookeeper.connect", zkConStr);
    properties.put("group.id", group);
    properties.put("topic", topics);
    properties.put("auto.offset.reset", offset);

DataStream<String> stream = env
        .addSource(new FlinkKafkaConsumer082<String>(topics, new SimpleStringSchema(), properties));

1 答案


0

如果希望始终从头开始读取,则需要在流上下文中禁用 checkpoints。

同时在使用者属性级别禁用它:

enable.auto.commit=false或者auto.commit.enable=假(取决于 Kafka 版本)

另一种方式:


我来回答

写文章

提问题

面试题