你能用泛型类型实现Flink的AggregateFunction吗?


0

我的目标是为Flink1.10中的流处理模块提供一个接口。 pipeline 中包含AggregateFunction等运算符。所有运算符都有泛型类型,但问题在于AggregateFunction中,该函数无法确定输出类型。

注意:实际的 pipeline 有一个slidingEventTimeWindow赋值器和一个与AggregateFunction一起传递的WindowFunction,但是使用下面的代码可以更容易地再现错误。

这是一个复制错误的简单 test用例:

    @Test
    public void aggregateFunction_genericType() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

    DataStream<Tuple2<String,Integer>> source = env.fromElements(Tuple2.of("0",1), Tuple2.of("0",2), Tuple2.of("0",3));

    ConfigAPI cfg = new ConfigAPI();

    source
            .keyBy(k -> k.f0)
            .countWindow(5, 1)
            .aggregate(new GenericAggregateFunc<>(cfg))
            .print();


    env.execute();
}

如您所见,配置类作为参数传递给自定义aggregateFunction。这是用户将实现的。

    public static class ConfigAPI implements BaseConfigAPI<Tuple2<String, Integer>, Tuple2<String,Integer>> {
        @Override
        public Tuple2<String, Integer> createAcc() {
            return new Tuple2<>("0", 0);
        }

    @Override
    public Tuple2&lt;String, Integer&gt; addAccumulators(Tuple2&lt;String, Integer&gt; in, Tuple2&lt;String, Integer&gt; acc) {
        acc.f1 += in.f1;
        return acc;
    }
}

提供的接口是:

    public interface BaseConfigAPI<In, Acc> {
        Acc createAcc();
        Acc addAccumulators(In in, Acc acc);
        // other methods to override
    }

GenericAggregate函数:

    public static class GenericAggregateFunc<In, Acc> implements AggregateFunction<In, Acc, Acc> {

    private BaseConfigAPI&lt;In, Acc&gt; cfg;
    GenericAggregateFunc(BaseConfigAPI&lt;In, Acc&gt; cfg) {
        this.cfg = cfg;
    }
    @Override
    public Acc createAccumulator() {
        return cfg.createAcc();
    }
    @Override
    public Acc add(In in, Acc acc) {
        return cfg.addAccumulators(in, acc);
    }
    @Override
    public Acc getResult(Acc acc) {
        return acc;
    }
    @Override
    public Acc merge(Acc acc, Acc acc1) {
        return null;
    }
}

输出日志:

org.apache.flink.api.common.functions.InvalidTypesException: 
Type of TypeVariable 'Acc' in 'class misc.SlidingWindow$GenericAggregateFunc' could not be determined. This is most likely a type erasure problem. 
The type extraction currently supports types with generic variables only in cases where all variables in the return type can be deduced from the input type(s). 
Otherwise the type has to be specified explicitly using type information.

解决方案1(不起作用):

.退货(类型.TUPLE(类型.STRING, 类型.INT))在.aggregate(…)之后,但没有成功。

解决方案2( job ):

这看起来不太优雅,而且与界面的其他部分也不太一致。这个问题还有其他解决办法吗?

编辑:感谢@dedupper为您提供的时间和见解,我想我找到了一个解决方案。

解决方案3( job ):我创建了一个新接口,它以以下方式扩展BaseConfigAPI和AggregateFunction:

public interface MergedConfigAPI<In, Acc, Out> extends BaseConfigAPI, AggregateFunction<In, Acc, Out> {}

public interface BaseConfigAPI extends Serializable {
//These will be implemented directly from AggregateFunction interface
//Acc createAcc();
//Acc addAccumulators(In in, Acc acc);

//other methods to override

}

现在用户必须只实现MergedConfigAPI并将其作为参数传递给.aggregate(…)函数。

更新:我 root据框架 test了@dedupper的第三个解决方案,但它也不起作用。似乎异常是由Acc抛出的,而不是Out类型。仔细研究.aggregate运算符的内部结构,我发现有一个重载的聚合方法需要另外两个参数。一个TypeInformation累加器类型和一个TypeInformationreturnType。

这就是在没有任何代码重构的情况下出现的最简单的解决方案。

解决方案4( job ):

 @Test
 public void aggregateFunction_genericType() throws Exception {
                ...

            .aggregate(
                    new GenericAggregateFunc&lt;&gt;(cfg), 
                    Types.TUPLE(Types.STRING, Types.INT),
                    Types.TUPLE(Types.STRING, Types.INT))
            ...
}

注意:从Flink1.10.1开始,聚合方法用@publicEvolution进行了注释。

1 答案


0
·你能用泛型类型实现Flink的AggregateFunction吗?“

对。你可以。就像你自己做的那样。您的错误是由于您如何使用它(如“使用 site泛型”)而不是您如何实现它的结果。

·········这个问题还有其他解决办法吗?。。。“

我建议以下三个候选解决方案按简单性的升序排列…

...
source
       .keyBy(k -> k.f0)
       .countWindow(5, 1)
       .aggregate(new GenericAggregateFunc< Tuple2<String, Integer>, Tuple2<String, Integer> >(cfg)) /* filling in the diamond will aid type inference */
       .print();
...

上面的方法最简单,因为您不必重构原来的GenericAgregateFunc;只需在菱形中填充要实例化泛型类的特定类型参数。

还有另一个稍微不那么简单的解决方案…

public static class GenericAggregateFunc implements AggregateFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple2<String, Integer>> {

private BaseConfigAPI&lt;Tuple2&lt;String, Integer&gt;, Tuple2&lt;String, Integer&gt;&gt; cfg;
GenericAggregateFunc(BaseConfigAPI&lt;Tuple2&lt;String, Integer&gt;, Tuple2&lt;String, Integer&gt;&gt; cfg) {
    this.cfg = cfg;
}
@Override
public Tuple2&lt;String, Integer&gt; createAccumulator() {
    return cfg.createAcc();
}
@Override
public Tuple2&lt;String, Integer&gt; add(Tuple2&lt;String, Integer&gt; in, Tuple2&lt;String, Integer&gt; acc) {
    return cfg.addAccumulators(in, acc);
}
@Override
public Tuple2&lt;String, Integer&gt; getResult(Tuple2&lt;String, Integer&gt; acc) {
    return acc;
}
@Override
public Tuple2&lt;String, Integer&gt; merge(Tuple2&lt;String, Integer&gt; acc, Tuple2&lt;String, Integer&gt; acc1) {
    return null;
}

}

虽然这一个涉及到一个小的重构,但在我看来,它比第一个提出的解决方案更简化了整个应用程序。

Flink已经为您处理了“复杂”的泛型多态性。要想插件到Flink,只需使用您想要实例化它的特定类型参数来实例化它们内置的泛型AggregateFunction。在您的例子中,那些类型参数是Tuple2类型。

所以你仍然在第二个解决方案中“使用泛型”,但是你用了一种更简单的方法。

另一个更接近原始实现的选项,但是有几个小的重构…

public static class GenericAggregateFunc<In, Acc, Out> implements AggregateFunction<In, Acc, Out> {

...
@Override
public Out getResult(Acc acc) {
    return ...;
}
...

}

另外,要强制用户的配置实现与您的函数兼容的接口的前提条件…

public interface BaseConfigAPI< In, Acc, Out >{ ... }

在我的实验中,我确认了向BaseConfigAPI添加Out类型参数也可以使其兼容。

我心里确实有一个更复杂的替代方案。但是,既然简单总是更好,我将把更复杂的解决方案留给其他人提出。


我来回答

写文章

提问题

面试题