apachekafka-Flink数据流 transformation和 公开到REST端点


0

我有springboot应用程序并与apacheflink集成。我想从Kafka系统读取数据,并将它们 公开到REST端点。

下面是我的简单数据,

    @GetMapping("/details/{personName}")
    public String getPersonDetails() throws Exception {

    StreamExecutionEnvironment env =  LocalStreamEnvironment.getExecutionEnvironment();
    Properties properties = new Properties();

    properties.setProperty("bootstrap.servers", "localhost:9092");
    properties.setProperty("group.id", "group_id");


    FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>("test-topic-1",
            new SimpleStringSchema(), properties);
    consumer.setStartFromEarliest();
    DataStream<String> stream = env.addSource(consumer);

    stream.map(new MapFunction<String, String>() {
        private static final long serialVersionUID = 1L;
        @Override
        public String map(String value) throws Exception {
            logger.info(value);
            return value;
        }
    }).print();
    env.execute();

    return "hello world";

}

我的问题是,

    我的Kafka返回如下字符串值,

"id":"1","PersonName":"John","address":"Bristol","weight":"34", "country":"UK","timeStamp":"2020-08-08T10:25:42"}
{"id":"2","PersonName":"Mann","address":"Bristol","weight":"88", "country":"UK","timeStamp":"2020-08-08T10:25:42"}
{"id":"3","PersonName":"Chris","address":"Leeds","weight":"12", "country":"UK","timeStamp":"2020-08-08T10:25:42"}
{"id":"4","PersonName":"John","address":"Bristol","weight":"44", "country":"UK","timeStamp":"2020-08-08T10:25:42"}
{"id":"5","PersonName":"John","address":"NewPort","weight":"26", "country":"UK","timeStamp":"2020-08-08T10:25:42"}
{"id":"6","PersonName":"Mann","address":"Bristol","weight":"89", "country":"UK","timeStamp":"2020-08-08T10:25:42"}

如何通过应用过滤器 transformation为JSON返回。例如,如果REST调用的输入是“John”,我希望对它们进行分组并求和权重值,然后返回为JSON(只有名称和权重)。

    第二个问题,

NFR:

我在 Kafka 有大量的数据,所以想扩展和快速处理。

1 答案


0

听起来你可能需要花更多的时间来查看Flink文档。但总而言之。。。

    添加一个MapFunction,该函数将字符串解析为JSON,提取名称和权重,并将其输出为Tuple2或某些自定义Java类。

我来回答

写文章

提问题

面试题