Please confirm this is the right way to stream data to Hadoop using Flink



I need some help with Flink. I've put together a simple hello-world style program below. It streams Avro messages from RabbitMQ and persists them to HDFS. I'm hoping someone can review the code; maybe it can help others as well.

Most of the Flink streaming examples I've found send their results to stdout. What I actually want is to save the data to Hadoop. I've read that, in theory, Flink can write to wherever you like, but in practice I haven't found any example that persists data to HDFS. Based on the examples I did find, plus some trial and error, I came up with the code below.

The data source here is RabbitMQ. I use a client application to send "MyAvroObject"s to RabbitMQ. The MyAvroObject.java file (not included) is generated from Avro IDL... it could be any Avro message.

The code below consumes the RabbitMQ messages and saves them to HDFS as Avro files... well, that's the hope.

package com.johanw.flink.stackoverflow;

import java.io.IOException;

import org.apache.avro.io.Decoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.mapred.AvroKey;
import org.apache.avro.mapred.AvroOutputFormat;
import org.apache.avro.mapred.AvroWrapper;
import org.apache.avro.mapreduce.AvroJob;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.hadoop.mapred.HadoopOutputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.FileSinkFunctionByMillis;
import org.apache.flink.streaming.connectors.rabbitmq.RMQSource;
import org.apache.flink.streaming.util.serialization.DeserializationSchema;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Job;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RMQToHadoop {
// Static so the schema can be serialized without dragging in the enclosing class
public static class MyDeserializationSchema implements DeserializationSchema<MyAvroObject> {
    private static final long serialVersionUID = 1L;

    @Override
    public TypeInformation<MyAvroObject> getProducedType() {
        return TypeExtractor.getForClass(MyAvroObject.class);
    }

    @Override
    public MyAvroObject deserialize(byte[] array) throws IOException {
        // Decode the Avro binary payload into the generated MyAvroObject class
        SpecificDatumReader<MyAvroObject> reader = new SpecificDatumReader<MyAvroObject>(MyAvroObject.getClassSchema());
        Decoder decoder = DecoderFactory.get().binaryDecoder(array, null);
        return reader.read(null, decoder);
    }

    @Override
    public boolean isEndOfStream(MyAvroObject arg0) {
        return false;
    }
}

private String hostName;
private String queueName;

public final static String path = "/hdfsroot";

private static Logger logger = LoggerFactory.getLogger(RMQToHadoop.class);

public RMQToHadoop(String hostName, String queueName) {
    super();
    this.hostName = hostName;
    this.queueName = queueName;
}

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

public void run() {
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    logger.info("Running " + RMQToHadoop.class.getName());
    DataStream<MyAvroObject> socketStockStream = env.addSource(new RMQSource<>(hostName, queueName, new MyDeserializationSchema()));
    Job job;
    try {
        job = Job.getInstance();
        AvroJob.setInputKeySchema(job, MyAvroObject.getClassSchema());
    } catch (IOException e1) {
        e1.printStackTrace();
    }

    try {
        JobConf jobConf = new JobConf(Job.getInstance().getConfiguration());
        jobConf.set("avro.output.schema", MyAvroObject.getClassSchema().toString());
        org.apache.avro.mapred.AvroOutputFormat<MyAvroObject> akof = new AvroOutputFormat<MyAvroObject>();
        HadoopOutputFormat<AvroWrapper<MyAvroObject>, NullWritable> hof = new HadoopOutputFormat<AvroWrapper<MyAvroObject>, NullWritable>(akof, jobConf);
        FileSinkFunctionByMillis<Tuple2<AvroWrapper<MyAvroObject>, NullWritable>> fileSinkFunctionByMillis = new FileSinkFunctionByMillis<Tuple2<AvroWrapper<MyAvroObject>, NullWritable>>(hof, 10000L);
        org.apache.hadoop.mapred.FileOutputFormat.setOutputPath(jobConf, new Path(path));

        socketStockStream.map(new MapFunction<MyAvroObject, Tuple2<AvroWrapper<MyAvroObject>, NullWritable>>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Tuple2<AvroWrapper<MyAvroObject>, NullWritable> map(MyAvroObject envelope) throws Exception {
                logger.info("map");
                AvroKey<MyAvroObject> key = new AvroKey<MyAvroObject>(envelope);
                return new Tuple2<AvroWrapper<MyAvroObject>, NullWritable>(key, NullWritable.get());
            }
        }).addSink(fileSinkFunctionByMillis);
        try {
            env.execute();
        } catch (Exception e) {
            logger.error("Error while running " + RMQToHadoop.class + ".", e);
        }
    } catch (IOException e) {
        logger.error("Error while running " + RMQToHadoop.class + ".", e);
    }
}

public static void main(String[] args) throws IOException {
    RMQToHadoop toHadoop = new RMQToHadoop("localhost", "rabbitTestQueue");
    toHadoop.run();
}

}

If you prefer a source other than RabbitMQ, swapping in a different source works just as well, e.g. using a Kafka consumer:

import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer082;

...

DataStreamSource<MyAvroObject> socketStockStream = env.addSource(new FlinkKafkaConsumer082<MyAvroObject>(topic, new MyDeserializationSchema(), sourceProperties));
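
The sourceProperties aren't shown above. As a rough sketch (host names, ports and the group id below are just placeholders, not my actual setup), I believe the 0.8 Kafka consumer expects something along these lines:

Properties sourceProperties = new Properties();
// Kafka broker list and the Zookeeper ensemble the 0.8 consumer uses (placeholders):
sourceProperties.setProperty("bootstrap.servers", "localhost:9092");
sourceProperties.setProperty("zookeeper.connect", "localhost:2181");
// consumer group id (placeholder):
sourceProperties.setProperty("group.id", "flinkRmqToHdfsTest");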

Question:

    Please review. Is this a good way to persist the data to HDFS?

By the way, when I want to write the data back to Kafka instead, I use...

Properties destProperties = new Properties();
destProperties.setProperty("bootstrap.servers", bootstrapServers);
FlinkKafkaProducer<MyAvroObject> kafkaProducer = new FlinkKafkaProducer<MyAvroObject>("MyKafkaTopic", new MySerializationSchema(), destProperties);
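
MySerializationSchema isn't included above either; it's just the mirror image of MyDeserializationSchema. A minimal sketch of how it could look (depending on your Flink version the interface may be SerializationSchema<MyAvroObject, byte[]> instead, with the same serialize method):

import java.io.ByteArrayOutputStream;
import java.io.IOException;

import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.specific.SpecificDatumWriter;
import org.apache.flink.streaming.util.serialization.SerializationSchema;

public class MySerializationSchema implements SerializationSchema<MyAvroObject> {
    private static final long serialVersionUID = 1L;

    @Override
    public byte[] serialize(MyAvroObject element) {
        // Encode the Avro object to bytes, mirroring the binaryDecoder used in MyDeserializationSchema
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
        SpecificDatumWriter<MyAvroObject> writer = new SpecificDatumWriter<MyAvroObject>(MyAvroObject.getClassSchema());
        try {
            writer.write(element, encoder);
            encoder.flush();
        } catch (IOException e) {
            throw new RuntimeException("Could not serialize " + element, e);
        }
        return out.toByteArray();
    }
}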

Many thanks in advance!

1 Answer



I think FileSinkFunctionByMillis can be used, but it means that your streaming program is not fault-tolerant: if your source, your machine, or the writing fails, your program will crash without being able to recover.

I would suggest that you look into using the RollingSink (https://ci.apache.org/projects/flink/flink-docs-release-0.10/api/streaming_guide.html#hadoop-filesystem). It can be used to create Flume-like pipelines that ingest data into HDFS (or other file systems). The rolling sink is a recoverable sink, which means your program would be fault-tolerant, since the Kafka consumer is fault-tolerant as well. You can also specify a custom Writer to write the data in any format you want, for example Avro.
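
Roughly like this (a sketch assuming the flink-connector-filesystem dependency; the output path and bucket format are placeholders, and for actual Avro output you would plug in your own Writer implementation instead of the default toString-based one):

import org.apache.flink.streaming.connectors.fs.DateTimeBucketer;
import org.apache.flink.streaming.connectors.fs.RollingSink;

...

RollingSink<MyAvroObject> sink = new RollingSink<MyAvroObject>("/hdfsroot/avro-out");
// start a new bucket (directory) per minute; the format string is just an example
sink.setBucketer(new DateTimeBucketer("yyyy-MM-dd--HHmm"));
// the default writer writes element.toString() lines; for Avro files, set a custom
// org.apache.flink.streaming.connectors.fs.Writer implementation here, e.g.:
// sink.setWriter(new MyAvroWriter());  // hypothetical custom Avro writer
socketStockStream.addSink(sink);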

