从List<String>filePaths转发文件


0

我们有一个DB表中的文件路径列表,在创建时它的时间戳是打开的。试图弄清楚如何使用db中的filepath列表仅将那些文件从nfs转发到kafka sink。

现在我正在使用自定义版本的ContinuousFileMonitoringFunction,它的 root文件夹将包含DB将显示的所有文件。这个操作非常慢,因为文件夹太大,没有多少TB的数据,所以要在文件夹中收集更新文件的信息。

Table orders = tableEnv.from("Customers");
Table result = orders.where($("b").isEqual("****"));

DataSet<String> ds = result.toDataSet();

ds拥有所有应该发送到kafka的文件路径。

以下是我计划实施的想法。但是考虑到flink并行性、flink库支持等,有没有更好的有效方法呢?

public class FileContentMap extends RichFlatMapFunction<String, String> {



@Override
public void flatMap(String input, Collector&lt;String&gt; out) throws Exception {



    // get the file path
    String filePath = input;

    String fileContent = readFile(input);

out.collect(fileCOntent);


}

@Override
public void open(Configuration config) {

}

}

DataSet<String> contectDataSet = ds.map(new FileCOntentMap());

contectDataSet.addSink(kafkaProducer);

1 答案


0

我觉得你的方法不错。也许更有效的方法是创建一个richpallelsourcefunction,在open()方法中,调用DB以获取已更新的文件列表,并在内存中构建特定源子任务的文件列表(类似于文件路径.hashCode()%numSubTasks==mySubTask)应发出要处理的通过你的文件内容 map。


我来回答

写文章

提问题

面试题