在Apache Flink中创建数据流时出错


0

使用fromElements函数创建数据流时出错

以下是费用-

原因:java.io.IOException异常:未能从中反序列化元素

2 答案


0

为什么要处理InputStreamReader元组?我想这里有些误会。泛型类型指定要处理的数据的类型。例如

DataStream<Integer> intStream = env.fromElements(1, 2, 3, 4, 5);

生成具有5个整数元组的有限数据流。我假设您实际上想要使用InputStreamReader来生成实际的元组。

如果您想通过HttpURLConnection进行读取,您可以实现您自己的SourceFunction(或RichSourceFunction),如下所示(替换为您想要使用的实际数据类型--同时考虑Flinks Tuple0到Tuple25类型):

env.addSource(new SourceFunction<OUT> {
    private volatile boolean isRunning = true;

@Override
public void run(SourceContext&lt;OUT&gt; ctx) {
    InputStreamReader isr = null;
    try {
        URL url = new URL("ex.in/res");
        HttpURLConnection httpconn = (HttpURLConnection) url.openConnection();
        if (httpconn.getResponseCode() != 200)
            throw new RuntimeException("Failed : HTTP error code : " + httpconn.getResponseCode());
        isr = new InputStreamReader((httpconn.getInputStream()));
    } catch (Exception e) {
        // clean up; log error
        return;
    }

    while(isRunning) {
        OUT tuple = ... // get data from isr
        ctx.collect(tuple);
    }
}

@Override
public void cancel() {
     this.isRunning = false;
}

});


0

无法使用fromElements创建数据流,因为InputStreamReader不可序列化。这是fromElements方法所必需的。此外,在InputStreamReaders上 job 可能没有多大意义。我想最好只是从HttpURLConnection读取数据,然后继续处理这些数据。


我来回答

写文章

提问题

面试题