Apache· Flink· Kafka 消费者问题


0

我在Kafka中有数据,我想读取Kafka是否发送数据的数据,并对其进行过滤并返回JSON。

        // create execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    Properties properties = new Properties();

    properties.setProperty("bootstrap.servers", "localhost:9092");

    properties.setProperty("group.id", "flink_consumer");


    FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>("test-topic",
            new SimpleStringSchema(), properties);
    consumer.setStartFromLatest();
    //config.setWriteTimestampToKafka(true);

    DataStream<String> stream = env.addSource(consumer);

    stream.map(new MapFunction<String, String>() {
        private static final long serialVersionUID = 1L;
        @Override
        public String map(String value) throws Exception {

            return "Stream Value: " + value;
        }
    }).print();
    env.execute();

案例1:当Kafka生产商将数据发送到Kafka时,我可以在控制台中看到值打印。-很好,很好。

你知道我哪里出错了吗?

{"firsname":"test", "lastname":"topic", "value":"3.45", "location":"UK"}

我想要filter firstname并返回JSON。

我看到在数据流处理过程中有过滤器选项。

1 答案


0

如果你想从第一条消息开始,你应该设置consumer.setStartFromEarly();. 它将从第一条未经确认的信息开始 读取。


我来回答

写文章

提问题

面试题