Serializing Kafka messages with the Confluent Schema Registry under Flink 1.9.1



Is it possible to publish messages to Kafka serialized the way Confluent's KafkaAvroSerializer does? I'm using Flink 1.9.1. I've seen that there is some ongoing development on a newer version of flink-avro (1.11.0), but I'm stuck on this version.

I want to use the newly introduced KafkaSerializationSchema to serialize messages against the Confluent Schema Registry and write them to Kafka.

Here I have a class that converts objects of type T to Avro, but I want to use Confluent serialization.

import java.io.ByteArrayOutputStream;

import org.apache.avro.Schema;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.reflect.ReflectDatumWriter;
import org.apache.avro.specific.SpecificRecordBase;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class KafkaMessageSerialization<T extends SpecificRecordBase> implements KafkaSerializationSchema<T> {

    public static final Logger LOG = LoggerFactory.getLogger(KafkaMessageSerialization.class);

    private final String topic;

    public KafkaMessageSerialization(String topic) {
        this.topic = topic;
    }

    @Override
    public ProducerRecord<byte[], byte[]> serialize(T event, Long timestamp) {
        final ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
        final Schema schema = event.getSchema();
        final DatumWriter<T> writer = new ReflectDatumWriter<>(schema);
        final BinaryEncoder binEncoder = EncoderFactory.get().binaryEncoder(outputStream, null);

        try {
            writer.write(event, binEncoder);
            binEncoder.flush();
        } catch (final Exception e) {
            LOG.error("serialization error", e);
            throw new RuntimeException(e);
        }

        return new ProducerRecord<>(topic, outputStream.toByteArray());
    }
}

Usage is quite convenient: .addSink(new FlinkKafkaProducer<>(SINK_TOPIC, new KafkaMessageSerialization<>(SINK_TOPIC), producerProps, Semantic.AT_LEAST_ONCE))

1 Answer



I was in the same situation, and based on your solution I wrote the following class. I've tested it with Flink 1.10.1.

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;

import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaMetadata;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
import org.apache.avro.Schema;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.reflect.ReflectDatumWriter;
import org.apache.avro.specific.SpecificRecordBase;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ConfluentAvroMessageSerialization<T extends SpecificRecordBase> implements KafkaSerializationSchema<T> {

    public static final Logger LOG = LoggerFactory.getLogger(ConfluentAvroMessageSerialization.class);

    private final String topic;
    private final int schemaId;
    private final int magicByte;

    public ConfluentAvroMessageSerialization(String topic, String schemaRegistryUrl) throws IOException, RestClientException {
        magicByte = 0;
        this.topic = topic;

        // Look up the latest registered schema ID for the subject "<topic>-value"
        SchemaRegistryClient schemaRegistry = new CachedSchemaRegistryClient(schemaRegistryUrl, 1000);
        SchemaMetadata schemaMetadata = schemaRegistry.getLatestSchemaMetadata(topic + "-value");
        schemaId = schemaMetadata.getId();

        LOG.info("Confluent Schema ID {} for topic {} found", schemaId, topic);
    }

    @Override
    public ProducerRecord<byte[], byte[]> serialize(T event, Long timestamp) {
        final ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
        final Schema schema = event.getSchema();
        final DatumWriter<T> writer = new ReflectDatumWriter<>(schema);
        final BinaryEncoder binEncoder = EncoderFactory.get().binaryEncoder(outputStream, null);

        try {
            byte[] schemaIdBytes = ByteBuffer.allocate(4).putInt(schemaId).array();
            outputStream.write(magicByte); // Confluent magic byte
            outputStream.write(schemaIdBytes); // Confluent schema ID (4-byte format)
            writer.write(event, binEncoder); // Avro data
            binEncoder.flush();
        } catch (final Exception e) {
            LOG.error("Schema Registry serialization error", e);
            throw new RuntimeException(e);
        }

        return new ProducerRecord<>(topic, outputStream.toByteArray());
    }
}

Confluent has a proper wire format with a magic byte and a 4-byte schema ID. For more information check https://docs.confluent.io/current/schema-registry/serdes-develop/index.html#wire-format
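To illustrate just the framing (independent of Flink, Kafka, and Avro), here is a minimal, framework-free sketch of writing and reading that header; the schema ID 42 and the payload bytes are made up for illustration:

```java
import java.nio.ByteBuffer;

public class WireFormatDemo {

    static final byte MAGIC_BYTE = 0x0;

    // Prepend the Confluent header (magic byte + big-endian 4-byte schema ID) to an Avro payload.
    static byte[] frame(int schemaId, byte[] avroPayload) {
        ByteBuffer buf = ByteBuffer.allocate(1 + 4 + avroPayload.length);
        buf.put(MAGIC_BYTE);   // byte 0: magic byte 0x00
        buf.putInt(schemaId);  // bytes 1-4: schema ID, big-endian (ByteBuffer default)
        buf.put(avroPayload);  // bytes 5+: Avro binary data
        return buf.array();
    }

    // Read the schema ID back out of a framed message.
    static int schemaIdOf(byte[] framed) {
        ByteBuffer buf = ByteBuffer.wrap(framed);
        if (buf.get() != MAGIC_BYTE) {
            throw new IllegalArgumentException("unknown magic byte");
        }
        return buf.getInt();
    }

    public static void main(String[] args) {
        byte[] payload = {1, 2, 3}; // stand-in for Avro-encoded data
        byte[] framed = frame(42, payload);
        System.out.println(framed.length);      // 8 (5-byte header + 3-byte payload)
        System.out.println(schemaIdOf(framed)); // 42
    }
}
```

This is the same layout the serialize() method above produces with ByteBuffer.allocate(4).putInt(schemaId); a Confluent KafkaAvroDeserializer on the consumer side uses the extracted ID to fetch the schema from the registry.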

