java-Flink:Flink是否支持抽象运算符,它可以处理具有公共字段的不同数据流?


0

假设我们有多个数据流,它们共享一些共同的特性。

例如,我们有一个教师流和一个学生流,他们都有一个年龄字段。如果我想从实时流中找出最年长的学生或老师,我可以实现如下运算符。

public MaxiumAgeFunc extends RichMapFunction<Student,Integer> {
    int maxAge;

@Override
public void flatMap(Student s, Collector&lt;Integer&gt; collector) throws Exception {
    if(s.age &gt; maxAge){
        maxAge = s.age;
    }
    collector.collect(maxAge);
}

}

为了找出最年长的老师,我们需要实现一个类似的运算符,如下所示

public MaxiumAgeFunc extends RichMapFunction<Teacher,Integer> {
    int maxAge;

@Override
public void flatMap(Teacher t, Collector&lt;Integer&gt; collector) throws Exception {
    if(t.age &gt; maxAge){
        maxAge = t.age;
    }
    collector.collect(maxAge);
}

}

但实际上这两个操作符有共同的过程逻辑,所以我的想法是定义一个父类,比如People。

public class People{
    public Integer age;
}

这样,学生和教师就可以被定义为自己的子类,并保留各自的领域。

public class Student extends People {
    public Integer grade;  // student grade
    ...
}
public class Student extends People {
    public Integer subject;  // the subject that teacher teaches
    ...
}

在这种情况下,我可以定义如下运算符。

public MaxiumAgeFunc extends RichMapFunction<People,Integer> {
    int maxAge;

@Override
public void flatMap(People p, Collector&lt;Integer&gt; collector) throws Exception {
    if(t.age &gt; maxAge){
        maxAge = p.age;
    }
    collector.collect(maxAge);
}

}

但是当我试图使用这个操作符来实现Flink执行拓扑时,由于数据类型不匹配,它将无法 job 。

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Student> studentStream = env.addSource(...);
DataStream<Teacher> teacherStream = env.addSource(...);

studentStream.map(new MaxiumAgeFunc()).print();
teacherStream.map(new MaxiumAgeFunc()).print();

这就是我的问题,有没有可能为具有公共字段的输入流生成一个抽象运算符?

1 答案


0

这与其说是一个回避问题,不如说是一个Java问题:

您要做的是使MaxiumAgeFunc参数化如下

public MaxiumAgeFunc<T extends People> extends RichMapFunction<T, Integer> {
    int maxAge;

@Override
public void flatMap(T p, Collector&lt;Integer&gt; collector) throws Exception {
    if(t.age &gt; maxAge){
        maxAge = p.age;
    }
    collector.collect(maxAge);
}

}

然后像这样使用它

studentStream.map(new MaxiumAgeFunc<>()).print();
teacherStream.map(new MaxiumAgeFunc<>()).print();

编辑:

顺便说一句,您的函数不能处理 checkpoints(因此在从 checkpoints恢复时会产生错误的结果),我宁愿使用全局窗口上的聚合函数。

students
    .windowAll(GlobalWindows.create())
    .aggregate(new AggregateFunction<People, Integer, Integer>() {
        @Override
        public Integer createAccumulator() {
            return -1;
        }

    @Override
    public Integer add(People value, Integer accumulator) {
        return Math.max(value.age, accumulator);
    }

    @Override
    public Integer getResult(Integer accumulator) {
        return accumulator;
    }

    @Override
    public Integer merge(Integer a, Integer b) {
        return Math.max(a, b);
    }
});


我来回答

写文章

提问题

面试题