scala - Cannot apply custom functions with apply() to a WindowedStream on Flink


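I have been trying to write custom logic for a window's apply() method. Basically, I want to reduce all the elements in a window and then attach a timestamp to the reduced value. I created a WindowedStream from a DataStream, but when I try to pass my functions to apply(), compilation fails.

The code is as follows: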

class WindowReduceFunction extends ReduceFunction[(Int, String, Int)] {
  override def reduce(a: (Int, String, Int), b: (Int, String, Int)) : (Int, String, Int) = {
    (a._1, a._2, a._3 + b._3)
  }
}

class WindowTimestampAddFunction extends WindowFunction[(Int, String, Int), (Int, String, Int, Long), (Int, String), TimeWindow] {
  override def apply(key : (Int, String), window : Window, in: Iterable[(Int, String, Int)], out: Collector[(Int, String, Int, Long)]) {
    for(row <- in) {
      out.collect((row._1, row._2, row._3, window.maxTimestamp()))
    }
  }
}

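The elements of the data stream have type (Int, String, Int), and the key is (Int, String). Without the apply() call the code compiles and runs without errors, but when I add: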

myWindowedStream.apply(new WindowReduceFunction(), new WindowTimestampAddFunction())

it fails to compile with the following error:

[ERROR]   [R](preAggregator: ((Int, String, Int), (Int, String, Int)) => (Int, String, Int), windowFunction: (org.apache.flink.api.java.tuple.Tuple, org.apache.flink.streaming.api.windowing.windows.TimeWindow, Iterable[(Int, String, Int)], org.apache.flink.util.Collector[R]) => Unit)(implicit evidence$6: org.apache.flink.api.common.typeinfo.TypeInformation[R])org.apache.flink.streaming.api.scala.DataStream[R] <and>
[ERROR]   [R](preAggregator: org.apache.flink.api.common.functions.ReduceFunction[(Int, String, Int)], function: org.apache.flink.streaming.api.scala.function.WindowFunction[(Int, String, Int),R,org.apache.flink.api.java.tuple.Tuple,org.apache.flink.streaming.api.windowing.windows.TimeWindow])(implicit evidence$5: org.apache.flink.api.common.typeinfo.TypeInformation[R])org.apache.flink.streaming.api.scala.DataStream[R]
[ERROR]  cannot be applied to (WindowReduceFunction, WindowTimestampAddFunction)
[ERROR]       .apply(new WindowReduceFunction(), new WindowTimestampAddFunction())
[ERROR]        ^
[ERROR] one error found

1 Answer


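You are keying the stream by index position, as in keyBy(1), or by field expression, as in keyBy("field"). This means the key type of the WindowedStream is Tuple (specifically org.apache.flink.api.java.tuple.Tuple), which is also the key type shown in both overloads listed in the compiler error.

If you change the third generic parameter of the WindowFunction from (Int, String) to Tuple, it should work. Alternatively, you can change the keyBy call to use a lambda function, in which case the WindowedStream gets the correct, specific key type. For example: keyBy(in => (in._1, in._2)).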
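To make the two options concrete, here is a minimal sketch of both, assuming the older Flink Scala DataStream API that the error message in the question comes from (newer releases expose the same pairing as reduce(ReduceFunction, WindowFunction)). The class names TupleKeyTimestampFunction and TypedKeyTimestampFunction, the helper withTypedKey, and the 10-second window are hypothetical and only there for illustration. Note that both versions declare the window parameter as TimeWindow, since it has to match the fourth type parameter of WindowFunction for the override to line up.

```scala
import org.apache.flink.api.common.functions.ReduceFunction
import org.apache.flink.api.java.tuple.Tuple
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

// Same pre-aggregation as in the question: sum the third field.
class WindowReduceFunction extends ReduceFunction[(Int, String, Int)] {
  override def reduce(a: (Int, String, Int), b: (Int, String, Int)): (Int, String, Int) =
    (a._1, a._2, a._3 + b._3)
}

// Option 1: keep keyBy(0, 1) and declare the key type as the generic Java Tuple.
class TupleKeyTimestampFunction
    extends WindowFunction[(Int, String, Int), (Int, String, Int, Long), Tuple, TimeWindow] {
  override def apply(key: Tuple, window: TimeWindow, in: Iterable[(Int, String, Int)],
                     out: Collector[(Int, String, Int, Long)]): Unit =
    for (row <- in) out.collect((row._1, row._2, row._3, window.maxTimestamp()))
}

// Option 2: key with a function, so the key type really is (Int, String).
class TypedKeyTimestampFunction
    extends WindowFunction[(Int, String, Int), (Int, String, Int, Long), (Int, String), TimeWindow] {
  override def apply(key: (Int, String), window: TimeWindow, in: Iterable[(Int, String, Int)],
                     out: Collector[(Int, String, Int, Long)]): Unit =
    for (row <- in) out.collect((row._1, row._2, row._3, window.maxTimestamp()))
}

object WindowExample {
  // Hypothetical helper: the window size is an assumption, not taken from the question.
  def withTypedKey(stream: DataStream[(Int, String, Int)]): DataStream[(Int, String, Int, Long)] =
    stream
      .keyBy(in => (in._1, in._2))   // key type is inferred as (Int, String)
      .timeWindow(Time.seconds(10))
      .apply(new WindowReduceFunction(), new TypedKeyTimestampFunction())
}
```

With option 1 the call site from the question stays as it is and only the WindowFunction changes; with option 2 the key passed to apply() is a typed Scala tuple, which is usually more convenient if the function needs to read the key.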

