Scala-flink.api.表格.TableException:不支持类型


0

我尝试在scala中使用flinktableapi。编译时间没有错误,但当我在flink集群中运行 job时:flink.api.表格.TableException:不支持类型:

我的maven依赖项:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-scala_2.11</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-scala_2.11</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table_2.11</artifactId>
    <version>1.1-SNAPSHOT</version>
</dependency>

我的进口:

import org.apache.flink.streaming.api.scala._
import org.apache.flink.api.scala.table._
import org.apache.flink.api.table.{Row, Table, TableEnvironment}

我的代码:

// odo[(Long,String,Double,Long)]
val inputTable = odo.toTable(tableEnv,'ts,'ty, 'vl, 'dv)
val resultStream: Table = inputTable.where('ty === "Odometer").select('dv)
resultStream.toDataStream[Row].print

更新:我想可能是关于Flink版本(1.0.3),因为当我做这样的事情时:

val inputTable = odo.toTable(tableEnv, 'ts, 'ty, 'vl, 'dv)
val result = inputTable.select('dv,'vl.sum).where('dv == 111)
result.toDataStream[Row].print()

我还有一个例外:org.apache.flink网站.api.table.TableException异常:当前不支持流表上的聚合。

感谢任何帮助。非常感谢。

1 答案


0

Flink的表API不支持在1.1-SNAPSHOT中包含GenericType的字段。有一个Pull请求实现了这个特性。它很可能包含在Flink1.1版本中。

关于你的第二个例外:这个例外基本上是自我 explain的。到目前为止,您还不能对流进行聚合。然而,StreamSQL正在开发中。


我来回答

写文章

提问题

面试题