java-Flink反序列化Kafka-JSON


0

我试图用flink从kafka主题中读取json消息。

我用的是 Kafka 2.4.1和 Flink1.10

对于我的消费者,我设置了:

import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.util.serialization.JSONKeyValueDeserializationSchema;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;


FlinkKafkaConsumer<ObjectNode> sensorConsumer = new FlinkKafkaConsumer(KAFKA_TOPIC_INPUT,
new JSONKeyValueDeserializationSchema(false), properties);

当我使用SimpleStringSchema时,我将json作为文本获取,这很好,但是使用JSONKeyValueDeserializer我得到:

Caused by: org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'sensor_5': was expecting (JSON String, Number, Array, Object or token 'null', 'true' or 'false')

sensor_5将是主题中的一个 key,我猜我需要添加一些其他东西来从kafka消息值中获取JSON并以某种方式处理该 key,但我不确定?

有什么建议吗?

json结构是:

{"value": 1.0, "timestamp": "2020-05-01 14:00:00.000000"}

并通过

# Python 3
import json
from confluent_kafka import Producer

dict_obj = {"value": 1.0, "timestamp": "2020-05-01 14:00:00.000000"}
producer = Producer({'bootstrap.servers': "kafka:9092"})

producer.produce(topic='sensors-raw', key='sensor_5', value=json.dumps(dict_obj))

2 答案


0

因此,基本上,如果您查看一下JSONKeyValueDeserializationSchema的源代码,可以看到它如下所示:

    if (mapper == null) {
            mapper = new ObjectMapper();
        }
        ObjectNode node = mapper.createObjectNode();
        if (record.key() != null) {
            node.set("key", mapper.readValue(record.key(), JsonNode.class));
        }
        if (record.value() != null) {
            node.set("value", mapper.readValue(record.value(), JsonNode.class));
        }
        if (includeMetadata) {
            node.putObject("metadata")
                .put("offset", record.offset())
                .put("topic", record.topic())
                .put("partition", record.partition());
        }
        return node;

因此,通常模式希望您的密钥是JSON而不是字符串,因此对于sensor_5,它将失败。我认为最好最简单的解决方案是创建自己的以字符串为 key的实现。


0

如果不想在记录中包含 key,则可以实现反序列化 schema而不是KeyedDeserializationSchema。

示例如下:

public class JSONValueDeserializationSchema implements DeserializationSchema<ObjectNode> {

private static final long serialVersionUID = -1L;

private ObjectMapper mapper;

@Override
public ObjectNode deserialize(byte[] message) throws IOException {
    if (mapper == null) {
        mapper = new ObjectMapper();
    }
    ObjectNode node = mapper.createObjectNode();
    if (message != null) {
        node.set("value", mapper.readValue(message, JsonNode.class));
    }
    return node;
}

@Override
public boolean isEndOfStream(ObjectNode nextElement) {
    return false;
}

@Override
public TypeInformation&lt;ObjectNode&gt; getProducedType() {
    return getForClass(ObjectNode.class);
}

}

如果您想在记录中也包括key,那么可以实现KeyedDeserializationSchema,正如dominikwosinski在回答中提到的那样。


我来回答

写文章

提问题

面试题