scala-Flink错误:通过字段位置指定 key只对元组数据类型有效


0

我使用的是Flink的scalaapi。我对reports=DataStream[Tuple15]进行了一些 transformation(Tuple15是一个Scala元组,所有字段都是Int)。问题在这里:

reports
  .filter(_._1 == 0) // some filter
  .map( x => (x._3, x._4, x._5, x._7, x._8))
      (TypeInformation.of(classOf[(Int,Int,Int,Int,Int)])) // keep only 5 fields as a Tuple5
  .keyBy(2,3,4) // the error is in apply, but I think related to this somehow
  .timeWindow(Time.minutes(5), Time.minutes(1))
  // the line under is line 107, where the error is
  .apply( (tup, timeWindow, iterable, collector: Collector[(Int, Int, Int, Float)]) => {
       ... 
  })

错误说明:

InvalidProgramException: Specifying keys via field positions is only valid for 
tuple data types. Type: GenericType<scala.Tuple5>

整个错误跟踪(我将指向错误的行标记为107行,对应于上面代码中的apply方法):

Exception in thread "main" org.apache.flink.api.common.InvalidProgramException: Specifying keys via field positions is only valid for tuple data types. Type: GenericType<scala.Tuple5>
    at org.apache.flink.api.common.operators.Keys$ExpressionKeys.<init>(Keys.java:217)
    at org.apache.flink.api.common.operators.Keys$ExpressionKeys.<init>(Keys.java:208)
    at org.apache.flink.streaming.api.datastream.DataStream.keyBy(DataStream.java:256)
    at org.apache.flink.streaming.api.scala.DataStream.keyBy(DataStream.scala:289)
here -> at du.tu_berlin.dima.bdapro.flink.linearroad.houcros.LinearRoad$.latestAverageVelocity(LinearRoad.scala:107)
    at du.tu_berlin.dima.bdapro.flink.linearroad.houcros.LinearRoad$.main(LinearRoad.scala:46)
    at du.tu_berlin.dima.bdapro.flink.linearroad.houcros.LinearRoad.main(LinearRoad.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144)

但这对我来说没有意义。我用的是元组类型,不是吗?或者GenericType<…>是怎么回事?

我该怎么修理 map 才能让钥匙 job 呢?

4 答案


0

原因是TypeInformation属于javaapi,因此不知道Scala元组。因此,generas不能与字段一起使用。

如果要手动生成Scala元组类型信息,必须使用createTypeInformation方法,该方法包含在org.apache.flink网站.api.scala/org网站.apache.flink.streaming.api. Scala包对象。

但是如果导入package对象,则不需要手动指定类型信息,因为TypeInformation是map操作的上下文绑定,createTypeInformation是隐式函数。

下面的代码片段显示了处理typeinformation的惯用方法。

import org.apache.flink.streaming.api.scala._

reports
.filter(.1 == 0) // some filter
.map( x => (x._3, x._4, x._5, x._7, x._8))
.keyBy(2,3,4) // the error is in apply, but I think related to this somehow
.timeWindow(Time.minutes(5), Time.minutes(1))
// the line under is line 107, where the error is
.apply( (tup, timeWindow, iterable, collector: Collector[(Int, Int, Int, Float)]) => {
...
})


0

我也遇到了同样的问题,并能按如下方式解决:

使用Flink API中的Tuple2类,即[importorg.apache.flink网站.api.java.tuple.Tuple15]而不是scala.Tuple15

请查看导入部分并更正。

这里我使用了flinkjavaapi。对于Scala,导入org.apache.flink网站.api. Scala.u包装

[ Apache· Flink]


0

好吧,在花了很多时间之后,我只是简单地删除了字体信息。所以,改变这个:

.map( x => (x._3, x._4, x._5, x._7, x._8))(TypeInformation.of(classOf[(Int,Int,Int,Int,Int)]))

为此:

.map( x => (x._3, x._4, x._5, x._7, x._8))

尽管如此,我认为这个解决方案是一种黑客攻击,因为我仍然收到来自Flink的警告(好吧,信息日志):

00:22:18,662 INFO org.apache.flink.api.java.typeutils.TypeExtractor - class scala.Tuple15 is not a valid POJO type
00:22:19,254 INFO org.apache.flink.api.java.typeutils.TypeExtractor - class scala.Tuple4 is not a valid POJO type

所以,如果有更普遍的答案,我很乐意接受。在那之前,这对我有效。

更新

我以前试过,但没用。我刚刚意识到,多亏了@Till的答案,它现在起作用了。所以,正如我所说的,你也需要进口org.apache.flink网站.流式处理.api.scala.createTypeInformation或org.apache.flink网站.api.scala.createTypeInformation(不是两个都是!)。


0

AggregateOperator只支持Flink元组。如果您面临这个问题,那么首先请检查您的进口是吗scala.Tuple2那就错了。应该是的org.apache.flink网站.api.java.tuple.元组2


我来回答

写文章

提问题

面试题