为什么flink停止我的流应用程序?


0

我的代码使用readTextFile读取日志文件,当我在Flink(/opt/Flink-1.0.3/bin/Flink run-m yarn cluster-yn2/home/Flink/Flink-json-0.1.jar)中运行jar时,它成功地处理了里面的行并停止了我的应用程序,而不是等待新行。

val env = StreamExecutionEnvironment.getExecutionEnvironment
val stream = env.readTextFile("hdfs:///test/ignicion.io")

提前谢谢你

1 答案


0

你在找

StreamExecutionEnvironment.readFileStream(String filePath, long intervalMillis, WatchType watchType) 

对于WatchType,您有以下选项

    只有新的文件,

来自

StreamExecutionEnvironment.readTextFile(String filePath, String charsetName)

将在读取所有文件后完成。我认为,它主要是为了在开发过程中进行 local test。


我来回答

写文章

提问题

面试题