Flink: DataSet 和数据流API在一个程序中。有可能吗?


0

我想首先使用datasetapi操作静态数据,然后使用datastreamapi来运行 streaming job。如果我在IDE上编写代码,它会完美地 job 。但是当我尝试在 localflinkjobmanager(allparallelishm1)上运行时,流代码永远不会执行!

例如,以下代码不起作用:

val parallelism = 1

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(parallelism)

val envStatic = ExecutionEnvironment.getExecutionEnvironment
envStatic.setParallelism(parallelism)

val myStaticData = envStatic.fromCollection(1 to 10)
val myVal: Int = myStaticData.reduce(_ + _).collect().head

val theStream = env.fromElements(1).iterate( iteretion => {
val result = iteretion.map(x => x + myVal)
(result, result)
})
theStream.print()
env.execute("static and streaming together")

我该怎么做才能让它 job ?

日志:执行日志对于上述程序

执行计划:计划

1 答案


0

如果您的Flink job包含多个子 job,例如由count、collect或print触发,则无法通过web界面提交 job。web界面只支持一个Flink job。


我来回答

写文章

提问题

面试题