如何从Flink runner上的Google数据流(Apache Beam)向Kafka发送消息


0

我试着写一个概念证明,从 Kafka 那里获取信息,用光束在 Flink上变换,然后把结果推到另一个 Kafka 主题上。

我使用了kafkawindowedwordcount示例作为起点,这是我想做的第一部分,但是它输出到文本文件,而不是Kafka。FlinkKafkaProducer08看起来很有希望,但我不知道如何将它插入 pipeline 。我在想,它会被一个无限 chaining接槽,或一些这样的包装,但这似乎并不存在。

对我要做的事有什么建议或想法吗?

我正在运行最新的孵化器beam(截至昨晚从Github),集群模式下的flink1.0.0和Kafka 0.9.0.1,所有这些都在Google计算引擎上(Debian Jessie)。

2 答案


0

2016年,Apache Beam/Dataflow中添加了用于写入Kafka的Sink transformation。有关用法示例,请参见apachebeam中KafkaIO的JavaDoc。


0

Beam中当前没有无界Sink类。大多数无边界 sink都是使用ParDo实现的。

您可能希望查看 Kafka 约 join器。这是一个Kafka读取器,在所有Beam runner中 job ,并实现并行读取、 checkpoints和其他无限源api。这个pull请求在TopHashtags示例 pipeline 中还包含一个原始 sink,通过在ParDo中向Kafka写入:

class KafkaWriter extends DoFn<String, Void> {

private final String topic;
private final Map<String, Object> config;
private transient KafkaProducer<String, String> producer = null;

public KafkaWriter(Options options) {
this.topic = options.getOutputTopic();
this.config = ImmutableMap.<String, Object>of(
"bootstrap.servers", options.getBootstrapServers(),
"key.serializer", StringSerializer.class.getName(),
"value.serializer", StringSerializer.class.getName());
}

@Override
public void startBundle(Context c) throws Exception {
if (producer == null) { // in Beam, startBundle might be called multiple times.
producer = new KafkaProducer<String, String>(config);
}
}

@Override
public void finishBundle(Context c) throws Exception {
producer.close();
}

@Override
public void processElement(ProcessContext ctx) throws Exception {
producer.send(new ProducerRecord<String, String>(topic, ctx.element()));
}
}

当然,我们也希望在KafkaIO中添加sink支持。它实际上和上面的KafkaWriter一样,但使用起来要简单得多。


我来回答

写文章

提问题

面试题