我写了一个简单的程序来读取 Kafka 的数据并用flink打印出来。下面是代码。
public static void main(String[] args) throws Exception {
Options flinkPipelineOptions = PipelineOptionsFactory.create().as(Options.class);
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Class<?> unmodColl = Class.forName("java.util.Collections$UnmodifiableCollection");
env.getConfig().addDefaultKryoSerializer(unmodColl, UnmodifiableCollectionsSerializer.class);
env.enableCheckpointing(1000, CheckpointingMode.EXACTLY_ONCE);
flinkPipelineOptions.setJobName("MyFlinkTest");
flinkPipelineOptions.setStreaming(true);
flinkPipelineOptions.setCheckpointingInterval(1000L);
flinkPipelineOptions.setNumberOfExecutionRetries(5);
flinkPipelineOptions.setExecutionRetryDelay(3000L);
Properties p = new Properties();
p.setProperty("zookeeper.connect", "localhost:2181");
p.setProperty("bootstrap.servers", "localhost:9092");
p.setProperty("group.id", "test");
FlinkKafkaConsumer09<Notification> kafkaConsumer = new FlinkKafkaConsumer09<>("testFlink",new ProtoDeserializer(),p);
DataStream<Notification> input = env.addSource(kafkaConsumer);
input.rebalance().map(new MapFunction<Notification, String>() {
@Override
public String map(Notification value) throws Exception {
return "Kafka and Flink says: " + value.toString();
}
}).print();
env.execute();
}
我需要 Flink在 Kafka 准确地处理我的数据一次,我对如何做到这一点几乎没有疑问。
- FlinkKafkaConsumer09何时向kafka提交已处理的偏移量?
请指教。感谢所有的帮助。谢谢。