从FasterXML读取值时发生Flink Collector问题


0

我有Kafka值作为字符串,POJO如下所示,

{"name":"John","timeStamp":"2020-08-11T13:31:31"}
class Person{

private String name;    

private LocalDateTime timeStamp; 

}

这个时间戳以来自Kafka的字符串形式出现,并将它们 transformation为LocalDateTime。

当我独立运行程序时objectMapper.readValue(价值,人.类)使用来自FasterXML的必需库,它可以正常 job 。它正在转变。

当我从Flink框架中读到以下内容时,

 stream.flatMap(new FlatMapFunction<String, Person>() {
            public void flatMap(String value, Collector<Person> out) {
                try {
                    out.collect(objectMapper.readValue(value, Person.class));
                } catch (JsonProcessingException e) {
                    e.printStackTrace();
                }
            }
        }).print();
        env.execute();

我得到了下面的问题,

Exception in thread "main" org.apache.flink.api.common.InvalidProgramException: com.fasterxml.jackson.databind.json.JsonMapper@1b7cc17c is not serializable. The object probably contains or references non serializable fields.

消息显示Person对象不可序列化,我已经为Person类实现了serializable,但运气不好。还有,在下面试过了,运气也不好。

@JsonSerialize(using = LocalDateTimeSerializer.class)
private LocalDateTime timeStamp; 

更新:

看起来API有问题,我在下面的 chaining接中看到,

https://issues.apache.org/jira/browse/FLINK-12113

1 答案


0

这个异常声明JsonMapper实例是不可序列化的—如果我没搞错的话,它已经从2.1版开始被设置为可序列化的。另外,Person类也应该是可序列化的。

所以,在您的例子中,我建议您要么切换到jackson databind版本>=2.1,要么将JsonMapper设为静态字段。

如果是Person类,只需实现可序列化接口:

class Person implements Serializable {

private String name;    

private LocalDateTime timeStamp; 

}


我来回答

写文章

提问题

面试题