Flink+Kafka重置 checkpoints和偏移量


0

简而言之,我想从一开始就重新运行一个关于 Kafka 数据的Flink pipeline 。

Flink0.10.2, Kafka 0.8.2。

我在 Kafka 有一个tweets主题,保留2小时,在Flink有一个 pipeline ,每10秒有5分钟的滑动窗口来计算tweets。

如果我中断 pipeline 并重新运行它,我希望它重新 读取旧的tweet,从而发出5分钟的tweet。

我两个都试过了自动偏移复位=最小/最早,并更改组id,但没有成功。我还尝试手动更改 Kafka 中的偏移量,如下所述:https://metabroadcast.com/blog/reseting-kafka-offsets

然后我假设这个问题可能与Flink的 checkpoints有关,但是我不知道/找不到关于如何重置这个问题的信息。

有人能分享一些 job 代码吗?谢谢,E。

1 答案


0

重新 读取 Kafka 主题中的所有内容,设置新的组id“以及”自动偏移复位到“最早”就足够了。

如果这不起作用,那就有问题了。


我来回答

写文章

提问题

面试题