Flink InvalidTypesException:无法确定“class”中TypeVariable“K”的类型


0

Flink0.10.0最近刚刚发布。我有一些代码需要从0.9.1迁移过来。但出现以下错误:

网址:apache.flink.api.common.functions函数.InvalidTypesException:类“”中TypeVariable“”K“”的类型fi.aalto.dmg公司.frame.FlinkPairWorkloadOperator文件“无法确定。这很可能是类型擦除问题。类型提取当前仅在返回类型中的所有变量都可以从输入类型中推导出来的情况下支持具有泛型变量的类型。

代码如下:

 public class FlinkPairWorkloadOperator<K,V> implements PairWorkloadOperator<K,V> {

private DataStream&lt;Tuple2&lt;K, V&gt;&gt; dataStream;

public FlinkPairWorkloadOperator(DataStream&lt;Tuple2&lt;K, V&gt;&gt; dataStream1) {
    this.dataStream = dataStream1;
}



public FlinkGroupedWorkloadOperator&lt;K, V&gt; groupByKey() {
    KeyedStream&lt;Tuple2&lt;K, V&gt;, K&gt; keyedStream = this.dataStream.keyBy(new KeySelector&lt;Tuple2&lt;K, V&gt;, K&gt;() {
        @Override
        public K getKey(Tuple2&lt;K, V&gt; value) throws Exception {
            return value._1();
        }
    });
    return new FlinkGroupedWorkloadOperator&lt;&gt;(keyedStream);
}

}

为了理解InvalidTypesException是如何发生的,我有另一个例子,它也抛出了这个异常,我对此一无所知。在这个演示中,程序与扇形图2,但不是flink Tuple2。

public class StreamingWordCount {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    DataStream&lt;String&gt; counts = env
        .socketTextStream("localhost", 9999)
        .flatMap(new Splitter());

    DataStream&lt;Tuple2&lt;String, Integer&gt;&gt; pairs = mapToPair(counts, mapToStringIntegerPair);
    pairs.print();
    env.execute("Socket Stream WordCount");
}

public static class Splitter implements FlatMapFunction&lt;String, String&gt; {
    @Override
    public void flatMap(String sentence, Collector&lt;String&gt; out) throws Exception {
        for (String word: sentence.split(" ")) {
            out.collect(word);
        }
    }
}

public static  &lt;K,V,T&gt; DataStream&lt;Tuple2&lt;K,V&gt;&gt; mapToPair(DataStream&lt;T&gt; dataStream , final MapPairFunction&lt;T, K, V&gt; fun){
    return dataStream.map(new MapFunction&lt;T, Tuple2&lt;K, V&gt;&gt;() {
        @Override
        public Tuple2&lt;K, V&gt; map(T t) throws Exception {
            return fun.mapPair(t);
        }
    });
}

public interface MapPairFunction<T, K, V> extends Serializable {
Tuple2<K,V> mapPair(T t);
}

public static MapPairFunction<String, String, Integer> mapToStringIntegerPair = new MapPairFunction<String, String, Integer>() {
public Tuple2<String, Integer> mapPair(String s) {
return new Tuple2<String, Integer>(s, 1);
}
};
}

1 答案


0

问题是你用的是扇形图2而不是网址:apache.flink.api.java.tuple.Tuple2与Flink的Java API结合使用。javaapi的TypeExtractor不理解Scala元组。因此,它无法提取类型变量K的类型。

如果你使用网址:apache.flink.api.java.tuple.Tuple2,则TypeExtractor将能够解析类型变量。


我来回答

写文章

提问题

面试题