java-Apache Flink从Kafka读取Avro byte[]


0

在回顾这些例子时,我看到了很多这样的例子:

FlinkKafkaConsumer08<Event> kafkaConsumer = new FlinkKafkaConsumer08<>("myavrotopic", avroSchema, properties);

我看到这里的人已经知道这个模式了。

在将字节[]读入通用记录之前,我不知道模式

有人能给我指一个FlinkKafkaConsumer08,它从字节[]读入 map过滤器,这样我就可以删除一些前导位,然后将这个字节[]加载到一个通用记录中吗?

2 答案


0

我也在做类似的事情(我使用的是09消费者)

在自定义反序列化程序的主代码传递中:

FlinkKafkaConsumer09<Object> kafkaConsumer = new FlinkKafkaConsumer09<>(
                parameterTool.getRequired("topic"), new MyDeserializationSchema<>(),
                parameterTool.getProperties());

自定义反序列化 schema读取字节,计算出 schema和/或从 schema注册表检索它,反序列化为GenericRecord并返回GenericRecord对象。

public class MyDeserializationSchema<T> implements DeserializationSchema<T> {


private final Class&lt;T&gt; avrotype = (Class&lt;T&gt;) org.apache.avro.generic.GenericRecord.class;

@Override
public T deserialize(byte[] arg0) throws IOException {
    //do your stuff here, strip off your bytes
    //deserialize and create your GenericRecord 
    return (T) (myavroevent);
}

@Override
public boolean isEndOfStream(T nextElement) {
    return false;
}

@Override
public TypeInformation&lt;T&gt; getProducedType() {
    return TypeExtractor.getForClass(avrotype);
}

}


0

如果您使用Confluent的schema registry,我相信一个首选的解决方案是使用Confluent提供的Avro serde。这样,我们只需调用deserialize()就可以在场景后面自动解析要使用的Avro模式的最新版本,而不需要字节操作。

归 root结底是这样的(scala中的示例代码,java解决方案非常类似):

import io.confluent.kafka.serializers.KafkaAvroDeserializer

...

val valueDeserializer = new KafkaAvroDeserializer()
valueDeserializer.configure(
Map(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG -> schemaRegistryUrl).asJava,
false)

...

override def deserialize(messageKey: Array[Byte], message: Array[Byte],
topic: String, partition: Int, offset: Long): KafkaKV = {

val key = keyDeserializer.deserialize(topic, messageKey).asInstanceOf[GenericRecord]
val value = valueDeserializer.deserialize(topic, message).asInstanceOf[GenericRecord]

KafkaKV(key, value)
}

...

此方法要求消息生产者也与模式注册表集成,并在那里发布模式。使用Confluent的KafkaAvroSerializer,可以用与上述非常相似的方式来完成

我在这里发布了一个详细的 explain:如何将Flink与Confluent的schema registry集成起来


我来回答

写文章

提问题

面试题