java—有什么方法可以确保所有的 checkpoints侦听器都能在Flink on job cancel with savepoint时得到 checkpoints完成的通知?


0

我使用flink1.9和restapi/jobs/:jobid/savepoints来触发保存点并取消 job(优雅地停止 job以便稍后从savepoint运行)。

我在源函数中使用两阶段提交,所以我的源代码实现了CheckpointedFunction和CheckpointListener接口。在snapshotState()方法调用时,我快照内部状态,在notifyCheckpointComplete()时,我将 checkpoints状态发送给第三方系统。

从源代码中我可以看到,只有snapshotState()部分在CheckpointCoordinator中是同步的-

// send the messages to the tasks that trigger their checkpoint
                for (Execution execution: executions) {
                    if (props.isSynchronous()) {
                        execution.triggerSynchronousSavepoint(checkpointID, timestamp, checkpointOptions, advanceToEndOfTime);
                    } else {
                        execution.triggerCheckpoint(checkpointID, timestamp, checkpointOptions);
                    }
                }

checkpoints确认和完成通知在AsyncCheckpointRunnable中是异步的。

也就是说,当cancel job设置为true的savepoint被触发时,在获取快照之后,一些任务管理器会在 job取消之前继续接收完成通知并执行notifyCheckpointComplete(),而有些则没有。

问题是,是否有一种方法可以取消带有savepoint的 job,从而保证在 job取消之前,所有任务管理器都会调用notifyCheckpointComplete(),还是目前还没有办法做到这一点?

2 答案


0

使用stop with savepoint[1][2]不会解决问题吗?

[1]https://ci.apache.org/projects/flink/flink docs stable/monitoring/rest_api.htmljobs- jobID停止


0

我已经有一段时间没看Flink1.9了,所以请谨慎地接受我的回答。

我猜你的消息来源取消得太早了。所以notifyCheckpointComplete实际上被发送到所有任务,但是一些sourcefunction已经退出了运行,相应的任务也被清理了。

Afaik,如果您在收到最后一个notifyCheckpointComplete之前忽略取消和中断,那么您所描述的应该是可能的。

class YourSource implements SourceFunction<Object>, CheckpointListener, CheckpointedFunction {
    private volatile boolean canceled = false;
    private volatile boolean pendingCheckpoint = false;

@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception {
    pendingCheckpoint = true;
    // start two-phase commit
}

@Override
public void initializeState(FunctionInitializationContext context) throws Exception {

}

@Override
public void notifyCheckpointComplete(long checkpointId) throws Exception {
    // finish two-phase commit
    pendingCheckpoint = false;
}

@Override
public void run(SourceContext&lt;Object&gt; ctx) throws Exception {
    while (!canceled) {
        // do normal source stuff
    }
    // keep the task running after cancellation
    while (pendingCheckpoint) {
        try {
            Thread.sleep(1);
        } catch (InterruptedException e) {
            // ignore interruptions until two-phase commit is done
        }
    }
}

@Override
public void cancel() {
    canceled = true;
}

}


我来回答

写文章

提问题

面试题