如何在Flink中获取和修改jobgraph?


0

我想修改jobGraph,比如getVertices()、ColocationGroup(),我尝试了以下操作:

    val s = StreamExecutionEnvironment.getExecutionEnvironment
    val p = new Param()
    s.addSource(new MySource(p))
      .map(new MyMap(p))
      .addSink(new MySink(p))
    val g = s.getStreamGraph.getJobGraph()
    val v = g.getVertices()
    s.executeAsync()

但在调试模式下,顶点不是我的MySource操作符,我很困惑getStreamGraph可以接受JobID, job在开始之前怎么会有id…,所以我想也许我应该在执行之后得到它,然后我将代码改为如下:,

    val s = StreamExecutionEnvironment.getExecutionEnvironment
    val p = new Param()
    s.addSource(new MySource(p))
      .map(new MyMap(p))
      .addSink(new MySink(p))
    s.executeAsync()
    val g = s.getStreamGraph.getJobGraph()
    val v = g.getVertices()

但是代码被卡在s中了。getStreamGraph.getJobGraph()

如何得到 job 图呢?。。。

1 答案


0

无法修改 job图。各种api构造JobGraph,JobGraph随后到达JobManager,JobManager将其 transformation为执行图,然后在任务管理器提供的任务槽中调度和运行。没有API允许您在 job的拓扑结构建立之后修改它。(这并不是不可想象的,有一天它会得到支持,但现在不可能了。)

如果你只是想看看它的表现,System.out.println(环境获取执行计划())很有趣。

如果您希望从静态DAG中获得更多的动态性,那么有状态函数API要灵活得多。


我来回答

写文章

提问题

面试题