There is currently no unbounded Sink class in Beam. Most unbounded sinks are implemented using a ParDo.
You may want to take a look at the KafkaIO connector. It is a Kafka reader that works in all Beam runners and implements parallel reading, checkpointing, and the other unbounded-source APIs. That pull request also includes a crude sink in the TopHashtags example pipeline, which writes to Kafka inside a ParDo:
import java.util.Map;

import com.google.common.collect.ImmutableMap;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

class KafkaWriter extends DoFn<String, Void> {

  private final String topic;
  private final Map<String, Object> config;
  private transient KafkaProducer<String, String> producer = null;

  public KafkaWriter(Options options) {
    this.topic = options.getOutputTopic();
    this.config = ImmutableMap.<String, Object>of(
        "bootstrap.servers", options.getBootstrapServers(),
        "key.serializer",    StringSerializer.class.getName(),
        "value.serializer",  StringSerializer.class.getName());
  }

  @Override
  public void startBundle(Context c) throws Exception {
    if (producer == null) { // in Beam, startBundle might be called multiple times.
      producer = new KafkaProducer<String, String>(config);
    }
  }

  @Override
  public void finishBundle(Context c) throws Exception {
    producer.close();
    producer = null; // so a reused DoFn instance re-creates the producer in startBundle.
  }

  @Override
  public void processElement(ProcessContext ctx) throws Exception {
    producer.send(new ProducerRecord<String, String>(topic, ctx.element()));
  }
}
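For context, here is a sketch of how such a writer would be wired into a pipeline as a sink. The upstream source and the `options.getInput()` accessor are assumptions for illustration; only the final `ParDo.of(new KafkaWriter(options))` step comes from the example above:

```java
// Sketch only: using KafkaWriter as the terminal step of a pipeline.
// The TextIO source and options.getInput() are hypothetical placeholders.
Pipeline p = Pipeline.create(options);
p.apply(TextIO.Read.from(options.getInput()))    // hypothetical upstream source
 .apply(ParDo.of(new KafkaWriter(options)));     // write each element to Kafka
p.run();
```

Because the sink is just a ParDo, it inherits the runner's parallelism: each bundle gets its own producer, and elements are sent as they arrive.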
Of course, we also want to add proper sink support to KafkaIO itself. It would effectively do the same thing as the KafkaWriter above, but be much simpler to use.