hadoop—寻找一种持续处理写入hdfs的文件的方法


0

我正在寻找一种可以:

    监视hdfs dir以查找新文件,并在它们出现时对其进行处理。

我看了apachespark:它可以读取新添加的文件,并可以处理从它离开的地方继续执行的重新启动。我找不到一种方法让它也处理同一个 job范围内的旧文件(所以只有1和3)。

我看了apacheflink:它处理新旧文件。但是,一旦 job重新启动,它将再次开始处理所有这些 job(1和2)。

这是一个非常常见的用例。我是不是在 spark/火石中遗漏了什么使它成为可能的东西?这里还有别的工具可以用吗?

3 答案


0

我建议您稍微修改一下文件摄取并 coalesce Kafka,这样每次在 HDFSS中放入新文件时,都会在Kafka队列中放入一条消息。然后使用Spark流从队列读取文件名,然后从hdfs和进程读取文件。

检查是一个真正的痛苦,也不能保证你想要什么。 Kafka 与 spark将能够保证恰好一次语义。

水槽有一个SpoolDirSource,你也可以看看。


0

使用Flink streaming,您可以完全按照您的建议处理目录中的文件,当您重新启动时,它将从停止的位置开始处理。它被称为连续文件处理。

您唯一需要做的是1)为您的 job启用 checkpoints,2)启动程序时使用:

    Time period = Time.minutes(10)
    env.readFile(inputFormat, "hdfs:// … /logs",
                 PROCESS_CONTINUOUSLY, 
                 period.toMilliseconds, 
                 FilePathFilter.createDefaultFilter())

这个特性是相当新的,在dev邮件列表中有一个关于如何进一步改进其功能的讨论。

希望这有帮助!


0

最好的方法是维护状态机。维护一个包含所有已处理文件的表或文件。

启动时的应用程序读取文件列表并在set/map中保持相同。任何已处理的新文件/旧文件都可以进行查找和验证。

此外,摄取文件夹还需要维护文件的某些状态。像已经处理过的文件被重命名,一些ext.failed文件被移到failed文件夹,被拒绝到一个被拒绝的文件夹。等

你可以使用spark/flink完成所有这些。。科技不是这里的瓶颈


我来回答

写文章

提问题

面试题