在 使用protobuf


0

我用flink读取 Kafka 的数据并将其 transformation成protobuf。我面临的问题是,当我运行java应用程序时,我得到了以下错误。如果我将unknownFields变量名修改为其他名称,它可以 job ,但很难对所有protobuf类进行此更改。

在从kafka读取时,我也尝试直接反序列化,但是我不确定getProducedType()方法应该返回什么类型的TypeInformation。

    public static class ProtoDeserializer implements DeserializationSchema{

@Override
public TypeInformation getProducedType() {
    // TODO Auto-generated method stub
    return PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO;
}

感谢所有的帮助。谢谢。

java.lang.RuntimeException:受保护的字段com.google.protobuf.未知字段集com.google.protobuf.GeneratedMessage.未知字段已包含在类的层次结构中com.google.protobuf.生成消息。请在类层次结构中使用唯一的字段名

代码:

    FlinkKafkaConsumer09<byte[]> kafkaConsumer = new FlinkKafkaConsumer09<>("testArr",new ByteDes(),p);

DataStream&lt;byte[]&gt; input = env.addSource(kafkaConsumer);
DataStream&lt;PBAddress&gt; protoData = input.map(new RichMapFunction&lt;byte[], PBAddress&gt;() {
    @Override
    public PBAddress map(byte[] value) throws Exception {
        PBAddress addr = PBAddress.parseFrom(value);
        return addr;
    }
});

3 答案


0

试试这个:

public class ProtoDeserializer implements DeserializationSchema<PBAddress> {
    @Override
    public TypeInformation<PBAddress> getProducedType() {
        return TypeInformation.of(PBAddress.class);
    }

0

也许你应该试试下面的方法:

env.getConfig().registerTypeWithKryoSerializer(PBAddress. class,ProtobufSerializer.class);

env.getConfig().registerTypeWithKryoSerializer(PBAddress. class,PBAddressSerializer.class);

public class PBAddressSerializer extends Serializer<Message> {
final private Map<Class,Method> hashMap = new HashMap<Class, Method>();

protected Method getParse(Class cls) throws NoSuchMethodException {
Method method = hashMap.get(cls);
if (method == null) {
method = cls.getMethod("parseFrom",new Class[]{byte[].class});
hashMap.put(cls,method);
}

return method;

}

@Override
public void write(Kryo kryo, Output output, Message message) {
byte[] ser = message.toByteArray();
output.writeInt(ser.length,true);
output.writeBytes(ser);

}

@Override
public Message read(Kryo kryo, Input input, Class<Message> pbClass) {
try {
int size = input.readInt(true);
byte[] barr = new byte[size];
input.read(barr);
return (Message) getParse(pbClass).invoke(null,barr);
} catch (Exception e) {
throw new RuntimeException("Could not create " + pbClass, e);
}

}
}


0

https://issues.apache.org/jira/browse/FLINK-11333JIRA票据跟踪的问题是实现对Protobuf类型的一级支持和演化模式。您将看到,在相当长的一段时间之前有一个拉请求,该请求尚未 coalesce 。我认为问题在于,在Protobuf以前通过向Kryo注册的情况下,没有支持处理状态迁移。

同时,Stateful Functions项目(statefun是一个在Flink之上运行的新API)完全基于Protobuf,它包括对Protobuf与Flink一起使用的支持:https://github.com/apache/flink-statefun/tree/master/statefun-flink/statefun-flink-common/src/main/java/org/apache/flink/statefun/flink/common/protobuf。(该包的入口点是ProtobufTypeInformation.java)我建议探索这个包(它不包括statefun特定的东西);但是,它本身也不涉及从Kryo的迁移。


我来回答

写文章

提问题

面试题