Flink AvroParquetWriter in a RollingSink



I am running into a problem when trying to set an AvroParquetWriter as the writer of a RollingSink.

    Flink version: 1.1.3

Error:

[...]
12/14/2016 11:19:34 Source: Custom Source -> Sink: Unnamed(8/8) switched to CANCELED
INFO  JobManager - Status of job af0880ede809e0d699eb69eb385ca204 (Flink Streaming Job) changed to FAILED.
java.lang.RuntimeException: Could not forward element to next operator
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:376)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:358)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:346)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:329)
    at org.apache.flink.streaming.api.operators.StreamSource$NonTimestampContext.collect(StreamSource.java:161)
    at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecord(AbstractFetcher.java:225)
    at org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.run(Kafka09Fetcher.java:253)
    at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.hadoop.fs.FileAlreadyExistsException: File already exists: /home/user/data/file
    at org.apache.hadoop.fs.RawLocalFileSystem.create(RawLocalFileSystem.java:264)
    at org.apache.hadoop.fs.RawLocalFileSystem.create(RawLocalFileSystem.java:257)
    at org.apache.hadoop.fs.ChecksumFileSystem$ChecksumFSOutputSummer.<init>(ChecksumFileSystem.java:386)
    at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:447)
    at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:426)
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:906)
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:887)
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:784)
    at org.apache.parquet.hadoop.ParquetFileWriter.<init>(ParquetFileWriter.java:223)
    at org.apache.parquet.hadoop.ParquetWriter.<init>(ParquetWriter.java:266)
    at org.apache.parquet.hadoop.ParquetWriter.<init>(ParquetWriter.java:217)
    at org.apache.parquet.hadoop.ParquetWriter.<init>(ParquetWriter.java:183)
    at org.apache.parquet.hadoop.ParquetWriter.<init>(ParquetWriter.java:153)
    at org.apache.parquet.hadoop.ParquetWriter.<init>(ParquetWriter.java:119)
    at org.apache.parquet.hadoop.ParquetWriter.<init>(ParquetWriter.java:92)
    at org.apache.parquet.hadoop.ParquetWriter.<init>(ParquetWriter.java:66)
    at org.apache.parquet.avro.AvroParquetWriter.<init>(AvroParquetWriter.java:54)
    at fr.test.SpecificParquetWriter.open(SpecificParquetWriter.java:28) // line in code => writer = new AvroParquetWriter(new Path("/home/user/data/file"), schema, compressionCodecName, blockSize, pageSize);
    at org.apache.flink.streaming.connectors.fs.RollingSink.openNewPartFile(RollingSink.java:451)
    at org.apache.flink.streaming.connectors.fs.RollingSink.invoke(RollingSink.java:371)
    at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:39)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:373)
    ... 7 more
INFO  JobClientActor - 12/14/2016 11:19:34  Job execution switched to status FAILED.
12/14/2016 11:19:34 Job execution switched to status FAILED.
INFO  JobClientActor - Terminate JobClientActor.
[...]

Main:

RollingSink<String> sink = new RollingSink<String>("/home/user/data");
sink.setBucketer(new DateTimeBucketer("yyyy/MM/dd"));
sink.setWriter(new SpecificParquetWriter());
stream.addSink(sink);

SpecificParquetWriter:

import java.io.IOException;

import org.apache.avro.Schema;
import org.apache.flink.streaming.connectors.fs.StreamWriterBase;
import org.apache.flink.streaming.connectors.fs.Writer;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.avro.AvroParquetWriter;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;

public class SpecificParquetWriter<V> extends StreamWriterBase<V> {

    private transient AvroParquetWriter writer;

    private CompressionCodecName compressionCodecName = CompressionCodecName.SNAPPY;
    private int blockSize = ParquetWriter.DEFAULT_BLOCK_SIZE;
    private int pageSize = ParquetWriter.DEFAULT_PAGE_SIZE;

    public static final String USER_SCHEMA = "{"
            + "\"type\":\"record\","
            + "\"name\":\"myrecord\","
            + "\"fields\":["
            + "  { \"name\":\"str1\", \"type\":\"string\" },"
            + "  { \"name\":\"str2\", \"type\":\"string\" },"
            + "  { \"name\":\"int1\", \"type\":\"int\" }"
            + "]}";

    public SpecificParquetWriter() {
    }

    @Override
    // workaround: the path is hard-coded here instead of using the one the sink passes in
    public void open(FileSystem fs, Path path) throws IOException {
        super.open(fs, path);
        Schema schema = new Schema.Parser().parse(USER_SCHEMA);

        writer = new AvroParquetWriter(new Path("/home/user/data/file"), schema, compressionCodecName, blockSize, pageSize);
    }

    @Override
    public void write(Object element) throws IOException {
        if (writer != null)
            writer.write(element);
    }

    @Override
    public Writer duplicate() {
        return new SpecificParquetWriter();
    }
}

I am not sure whether I am going about this the right way...

Is there a simpler way to do this?

1 Answer



The base class used by RollingSink (and BucketingSink) is Writer, and that is the problem: these sinks only accept writers that work on the OutputStream the sink opens for them, not writers that create and hold an output of their own, for example:

writer = new AvroKeyValueWriter(keySchema, valueSchema, compressionCodec, streamObject);

whereas AvroParquetWriter or ParquetWriter only accepts a file path:

writer = AvroParquetWriter.builder(new Path("filePath"))
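
To make the contrast concrete, here is a minimal sketch of the kind of writer these sinks do accept: one that appends records to the stream the sink opens for it, in this case as a plain Avro container file rather than Parquet. The class name GenericAvroStreamWriter is made up for illustration; it assumes StreamWriterBase exposes the sink-managed stream via getStream(), the same way Flink's built-in StringWriter does.

import java.io.IOException;

import org.apache.avro.Schema;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.streaming.connectors.fs.StreamWriterBase;
import org.apache.flink.streaming.connectors.fs.Writer;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Illustrative only: a writer that appends Avro records to the stream the sink gives it.
public class GenericAvroStreamWriter<T extends GenericRecord> extends StreamWriterBase<T> {

    private final String schemaString;
    private transient DataFileWriter<GenericRecord> dataFileWriter;

    public GenericAvroStreamWriter(String schemaString) {
        this.schemaString = schemaString;
    }

    @Override
    public void open(FileSystem fs, Path path) throws IOException {
        // The sink creates and owns the part file; we only receive its stream.
        super.open(fs, path);
        Schema schema = new Schema.Parser().parse(schemaString);
        dataFileWriter = new DataFileWriter<GenericRecord>(new GenericDatumWriter<GenericRecord>(schema));
        dataFileWriter.create(schema, getStream());
    }

    @Override
    public void write(T element) throws IOException {
        dataFileWriter.append(element);
    }

    // Simplified: a production writer would also sync the Avro writer in flush()
    // so that the sink's valid-length bookkeeping stays accurate.

    @Override
    public void close() throws IOException {
        if (dataFileWriter != null) {
            dataFileWriter.close(); // also closes the underlying stream
        } else {
            super.close();
        }
    }

    @Override
    public Writer<T> duplicate() {
        return new GenericAvroStreamWriter<T>(schemaString);
    }
}

AvroParquetWriter cannot be plugged in this way, because (in the Parquet versions current at the time) it has no constructor or builder that takes an already-open OutputStream; it always creates the file itself from a Path, which is exactly what collides with the part files the sink manages and produces the FileAlreadyExistsException in the question.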

I dug into ParquetWriter and realized that what we are trying to do does not really make sense: Flink, like Storm, is an event-at-a-time processing system and cannot write Parquet one record at a time, whereas Spark Streaming can, because its jobs work on the micro-batch principle.

With Storm's Trident we can still write Parquet files, but with Flink we cannot, until Flink introduces something like micro-batches.

So for this type of use case, Spark Streaming is the better choice.

If you want to stick with Flink, you can do it as a batch job instead.
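
For completeness, here is a rough sketch of that batch route, wrapping parquet-avro's Hadoop output format with Flink's Hadoop compatibility layer. It reuses USER_SCHEMA from the question; the output path, the toy input, and the class name ParquetBatchJob are made up, and it assumes flink-hadoop-compatibility plus a parquet-avro version with a generic AvroParquetOutputFormat (1.8.x) are on the classpath.

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.parquet.avro.AvroParquetOutputFormat;
import org.apache.parquet.hadoop.ParquetOutputFormat;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;

// Illustrative batch job: write GenericRecords as Parquet via the Hadoop output format.
public class ParquetBatchJob {

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // The Hadoop Job object carries the Parquet/Avro configuration.
        Job job = Job.getInstance();
        Schema schema = new Schema.Parser().parse(SpecificParquetWriter.USER_SCHEMA);
        AvroParquetOutputFormat.setSchema(job, schema);
        ParquetOutputFormat.setCompression(job, CompressionCodecName.SNAPPY);
        FileOutputFormat.setOutputPath(job, new Path("/home/user/data/parquet-out"));

        HadoopOutputFormat<Void, GenericRecord> parquetFormat =
                new HadoopOutputFormat<>(new AvroParquetOutputFormat<GenericRecord>(), job);

        // Toy input; in practice this would come from files, a database, etc.
        DataSet<Tuple2<Void, GenericRecord>> records = env
                .fromElements("a,b,1", "c,d,2")
                .map(new MapFunction<String, Tuple2<Void, GenericRecord>>() {
                    @Override
                    public Tuple2<Void, GenericRecord> map(String line) throws Exception {
                        String[] fields = line.split(",");
                        // Schema re-parsed here to keep the function serializable; fine for a sketch.
                        GenericRecord record = new GenericData.Record(
                                new Schema.Parser().parse(SpecificParquetWriter.USER_SCHEMA));
                        record.put("str1", fields[0]);
                        record.put("str2", fields[1]);
                        record.put("int1", Integer.parseInt(fields[2]));
                        return new Tuple2<Void, GenericRecord>(null, record);
                    }
                });

        records.output(parquetFormat);
        env.execute("Write Parquet with the Flink batch API");
    }
}

The Tuple2 key is ignored by Parquet; it is only there because Flink's HadoopOutputFormat expects key/value pairs.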

