Apache Flink union operator giving wrong response


I am applying the union operator to two DataStreams of generic record type.

package com.gslab.com.dataSets;
import java.io.File;
import java.util.ArrayList;
import java.util.List;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericData.Record;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class FlinkBroadcast {
public static void main(String[] args) throws Exception {

    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(2);

    List<String> controlMessageList = new ArrayList<String>();
    controlMessageList.add("controlMessage1");
    controlMessageList.add("controlMessage2");

    List<String> dataMessageList = new ArrayList<String>();
    dataMessageList.add("Person1");
    dataMessageList.add("Person2");
    dataMessageList.add("Person3");
    dataMessageList.add("Person4");

    DataStream<String> controlMessageStream  = env.fromCollection(controlMessageList);
    DataStream<String> dataMessageStream  = env.fromCollection(dataMessageList);

    DataStream<GenericRecord> controlMessageGenericRecordStream = controlMessageStream.map(new MapFunction<String, GenericRecord>() {
        @Override
        public GenericRecord map(String value) throws Exception {
             Record gr = new GenericData.Record(new Schema.Parser().parse(new File("src/main/resources/controlMessageSchema.avsc")));
             gr.put("TYPE", value);
             return gr;
        }
    });

    DataStream<GenericRecord> dataMessageGenericRecordStream = dataMessageStream.map(new MapFunction<String, GenericRecord>() {
        @Override
        public GenericRecord map(String value) throws Exception {
             Record gr = new GenericData.Record(new Schema.Parser().parse(new File("src/main/resources/dataMessageSchema.avsc")));
             gr.put("FIRSTNAME", value);
             gr.put("LASTNAME", value+": lastname");
             return gr;
        }
    });

    //Displaying Generic records
    dataMessageGenericRecordStream.map(new MapFunction<GenericRecord, GenericRecord>() {
        @Override
        public GenericRecord map(GenericRecord value) throws Exception {
            System.out.println("data before union: "+ value);
            return value;
        }
    });

    controlMessageGenericRecordStream.broadcast().union(dataMessageGenericRecordStream).map(new MapFunction<GenericRecord, GenericRecord>() {
        @Override
        public GenericRecord map(GenericRecord value) throws Exception {
            System.out.println("data after union: " + value);
            return value;
        }
    });
    env.execute("stream");
}

}

Output:

05/09/2016 13:02:13 Map(2/2) switched to FINISHED 
data after union: {"TYPE": "controlMessage1"}
data before union: {"FIRSTNAME": "Person2", "LASTNAME": "Person2: lastname"}
data after union: {"TYPE": "controlMessage1"}
data before union: {"FIRSTNAME": "Person1", "LASTNAME": "Person1: lastname"}
data after union: {"TYPE": "controlMessage2"}
data after union: {"TYPE": "controlMessage2"}
data after union: {"FIRSTNAME": "Person1", "LASTNAME": "Person1"}
data before union: {"FIRSTNAME": "Person4", "LASTNAME": "Person4: lastname"}
data before union: {"FIRSTNAME": "Person3", "LASTNAME": "Person3: lastname"}
data after union: {"FIRSTNAME": "Person2", "LASTNAME": "Person2"}
data after union: {"FIRSTNAME": "Person3", "LASTNAME": "Person3"}
05/09/2016 13:02:13 Map -> Map(2/2) switched to FINISHED 
data after union: {"FIRSTNAME": "Person4", "LASTNAME": "Person4"}
05/09/2016 13:02:13 Map -> Map(1/2) switched to FINISHED 
05/09/2016 13:02:13 Map(1/2) switched to FINISHED 
05/09/2016 13:02:13 Map(2/2) switched to FINISHED 
05/09/2016 13:02:13 Job execution switched to status FINISHED.

As you can see, the records coming from dataMessageGenericRecordStream are incorrect after the union: all of the field values get replaced by the value of the first field.

2 Answers

I spent several days investigating another issue (but still involving GenericRecord) and found the root cause and a solution.

Root cause: inside Apache Avro's Schema class, the Field member "position" is declared transient, so it is not serialized by Kryo and therefore gets initialized to position "0" when the record is deserialized in the Flink pipeline.

The Avro JIRA issue AVRO-1476 describes this and specifically mentions Kryo serialization.
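
To check whether the Avro version on your classpath is affected, a small reflection sketch like the one below can print the transient modifier. This assumes the member is indeed named "position" as described above; getDeclaredField will throw NoSuchFieldException if your Avro version names it differently.

import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import org.apache.avro.Schema;

public class AvroPositionCheck {
    public static void main(String[] args) throws Exception {
        // Look up the 'position' member of Schema.Field and report whether it is transient.
        Field position = Schema.Field.class.getDeclaredField("position");
        System.out.println("Schema.Field.position is transient: "
                + Modifier.isTransient(position.getModifiers()));
    }
}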

This was fixed in Avro 1.7.7.

Solution: Flink must use Avro 1.7.7 (or later). I have verified the fix on my local machine by replacing the Avro classes inside flink-dist_2.11-1.1.3.jar, and it corrected my issue.
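
As a quick sanity check of which Avro version a job actually picks up at runtime (for example, after swapping the Avro classes in flink-dist), something like the sketch below can be run on the same classpath. The version string comes from the Avro jar's manifest and may be unavailable in some setups.

import org.apache.avro.Schema;

public class AvroVersionCheck {
    public static void main(String[] args) {
        // Reads the version recorded in the Avro jar's manifest, if present.
        Package avroPackage = Schema.class.getPackage();
        System.out.println("Avro on classpath: "
                + (avroPackage == null ? "unknown" : avroPackage.getImplementationVersion()));
    }
}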

I have updated the JIRA issue for this: https://issues.apache.org/jira/browse/FLINK-5039

There is now a PR for it: https://github.com/apache/flink/pull/2953

I hope it will be included in the Flink 1.1.4 and 1.2.0 releases.

I ran into a similar issue with the DataSet API. I was reading some Avro files as GenericRecords and saw the same strange behavior. The workaround I used: instead of reading them as GenericRecords, read them as specific records (e.g. MyAvroObject) and then use a map to transform/cast them to GenericRecord.

I wrote some code to test your use case with the DataSet API, and it works with the above workaround:

import java.util.ArrayList;
import java.util.List;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;

public static void maintest(String[] args) throws Exception {
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(2);

    List<String> queryList1 = new ArrayList<String>();
    queryList1.add("query1");
    queryList1.add("query2");

    List<String> queryList2 = new ArrayList<String>();
    queryList2.add("QUERY1");
    queryList2.add("QUERY2");
    queryList2.add("QUERY3");
    queryList2.add("QUERY4");

    DataSet<String> dataset1 = env.fromCollection(queryList1);
    DataSet<String> dataset2 = env.fromCollection(queryList2);

    // Build each record through the Avro-generated specific class, then cast it to GenericRecord.
    DataSet<GenericRecord> genericDS1 = dataset1.map(new MapFunction<String, GenericRecord>() {
        @Override
        public GenericRecord map(String value) throws Exception {
            Query query = Query.newBuilder().setQuery(value).build();
            return (GenericRecord) query;
        }
    });

    DataSet<GenericRecord> genericDS2 = dataset2.map(new MapFunction<String, GenericRecord>() {
        @Override
        public GenericRecord map(String value) throws Exception {
            SearchEngineQuery searchEngineQuery = SearchEngineQuery.newBuilder().setSeQuery(value).build();
            return (GenericRecord) searchEngineQuery;
        }
    });

    genericDS2.map(new MapFunction<GenericRecord, GenericRecord>() {
        @Override
        public GenericRecord map(GenericRecord value) throws Exception {
            System.out.println("DEBUG: data before union: " + value);
            return value;
        }
    });

    genericDS1.union(genericDS2).map(new MapFunction<GenericRecord, GenericRecord>() {
        @Override
        public GenericRecord map(GenericRecord value) throws Exception {
            System.out.println("DEBUG: data after union: " + value);
            return value;
        }
    }).print();
}

where Query and SearchEngineQuery are my Avro objects (analogous to your control message list and data message list).

Output:

{"query": "query1"}
{"se_query": "QUERY1"}
{"se_query": "QUERY3"}
{"query": "query2"}
{"se_query": "QUERY2"}
{"se_query": "QUERY4"}
