java-FlinkKafkaConsumer09反复读取一些消息


0

我写了一个简单的程序来读取 Kafka 的数据并用flink打印出来。下面是代码。

public static void main(String[] args) throws Exception {

Options flinkPipelineOptions = PipelineOptionsFactory.create().as(Options.class);

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Class<?> unmodColl = Class.forName("java.util.Collections$UnmodifiableCollection");
env.getConfig().addDefaultKryoSerializer(unmodColl, UnmodifiableCollectionsSerializer.class);
env.enableCheckpointing(1000, CheckpointingMode.EXACTLY_ONCE);

flinkPipelineOptions.setJobName("MyFlinkTest");
flinkPipelineOptions.setStreaming(true);
flinkPipelineOptions.setCheckpointingInterval(1000L);
flinkPipelineOptions.setNumberOfExecutionRetries(5);
flinkPipelineOptions.setExecutionRetryDelay(3000L);

Properties p = new Properties();
p.setProperty("zookeeper.connect", "localhost:2181");
p.setProperty("bootstrap.servers", "localhost:9092");
p.setProperty("group.id", "test");

FlinkKafkaConsumer09<Notification> kafkaConsumer = new FlinkKafkaConsumer09<>("testFlink",new ProtoDeserializer(),p);

DataStream<Notification> input = env.addSource(kafkaConsumer);

input.rebalance().map(new MapFunction<Notification, String>() {
    @Override
    public String map(Notification value) throws Exception {
        return "Kafka and Flink says: " + value.toString();
    }

}).print();

env.execute();

}

我需要 Flink在 Kafka 准确地处理我的数据一次,我对如何做到这一点几乎没有疑问。

    FlinkKafkaConsumer09何时向kafka提交已处理的偏移量?

请指教。感谢所有的帮助。谢谢。

1 答案


0

本页介绍Flink-Kafka join器的容错保证。

您可以使用Flink的savepoints以仅一次(保存状态)的方式重新启动 job。

您再次看到这些消息的原因是因为Flink提交给Kafka代理/Zookeeper的偏移量与Flink的注册状态不一致。

有点离题:这些台词是用来干什么的?他们不会被传给任何地方。

Options flinkPipelineOptions = PipelineOptionsFactory.create().as(Options.class);
flinkPipelineOptions.setJobName("MyFlinkTest");
flinkPipelineOptions.setStreaming(true);
flinkPipelineOptions.setCheckpointingInterval(1000L);
flinkPipelineOptions.setNumberOfExecutionRetries(5);
flinkPipelineOptions.setExecutionRetryDelay(3000L);

我来回答

写文章

提问题

面试题