如何在纯java中设置flink并行(IDEA)


0

我使用下面的scala代码来运行我的flink streaming job

val mystream = StreamExecutionEnvironment.getExecutionEnvironment
    mystream.addSource(new mySource(params))
      .map(new myMap(params))
      .addSink(new mySink(params)).setParallelism(1)
    mystream.setParallelism(1)
    mystream.execute("My Streaming")

当我使用flinkrun-p1时,并行度是1(不知道-p是 job 的还是代码 job 的)。当我使用纯java运行时(在思想上我假设它运行在纯java中),并行度通常是5,这表明我的代码不能 job 。如何控制?

正如上面的答案所示,下面的代码也不起作用,还有5的平行性。

val mystream = StreamExecutionEnvironment.getExecutionEnvironment
    mystream.addSource(new mySource(params))
      .map(new myMap(params))
      .addSink(new mySink(params))
    mystream.setParallelism(1)
    mystream.execute("My Streaming")

1 答案


0

您可以在环境上设置默认的并行性。

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
env.addSource(...)

使用.addSink(new mySink(params)).setParallelism(1)重写特定运算符的默认并行度。


我来回答

写文章

提问题

面试题