scala-从Flink的一个kafka话题开始


0

我如何确保我总是从一个 Kafka 的话题开始就消费 Flink?

作为Flink 1.0.2的一部分,Kafka 0.9.x消费者似乎不再是Kafka而是Flink来控制偏移:

Flink在内部快照偏移量,作为其

这就是我所做的,但是我的Flink程序总是从它停止的地方开始,并且不会像配置指示的那样返回开始:

val props = new Properties()
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("group.id", "myflinkservice")
props.setProperty("auto.offset.reset", "earliest")

val incomingData = env.addSource(
new FlinkKafkaConsumer09[IncomingDataRecord](
"my.topic.name",
new IncomingDataSchema,
props
)
)

2 答案


0

使用:

consumer.setStartFromEarliest();

0

我想你可以通过指定一个随机的组.id:

val props = new Properties()
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("group.id", s"myflinkservice_${UUID.randomUUID}")
props.setProperty("auto.offset.reset", "smallest") // "smallest", not "earliest"

自动偏移复位只有在ZooKeeper中没有初始偏移时才有效。


我来回答

写文章

提问题

面试题