简而言之,我想从一开始就重新运行一个关于 Kafka 数据的Flink pipeline 。
Flink0.10.2, Kafka 0.8.2。
我在 Kafka 有一个tweets主题,保留2小时,在Flink有一个 pipeline ,每10秒有5分钟的滑动窗口来计算tweets。
如果我中断 pipeline 并重新运行它,我希望它重新 读取旧的tweet,从而发出5分钟的tweet。
我两个都试过了自动偏移复位=最小/最早,并更改组id,但没有成功。我还尝试手动更改 Kafka 中的偏移量,如下所述:https://metabroadcast.com/blog/reseting-kafka-offsets
然后我假设这个问题可能与Flink的 checkpoints有关,但是我不知道/找不到关于如何重置这个问题的信息。
有人能分享一些 job 代码吗?谢谢,E。