使用fromElements函数创建数据流时出错
以下是费用-
原因:java.io.IOException异常:未能从中反序列化元素
使用fromElements函数创建数据流时出错
以下是费用-
原因:java.io.IOException异常:未能从中反序列化元素
为什么要处理InputStreamReader元组?我想这里有些误会。泛型类型指定要处理的数据的类型。例如
DataStream<Integer> intStream = env.fromElements(1, 2, 3, 4, 5);
生成具有5个整数元组的有限数据流。我假设您实际上想要使用InputStreamReader来生成实际的元组。
如果您想通过HttpURLConnection进行读取,您可以实现您自己的SourceFunction(或RichSourceFunction),如下所示(替换为您想要使用的实际数据类型--同时考虑Flinks Tuple0到Tuple25类型):
env.addSource(new SourceFunction<OUT> {
private volatile boolean isRunning = true;
@Override
public void run(SourceContext<OUT> ctx) {
InputStreamReader isr = null;
try {
URL url = new URL("ex.in/res");
HttpURLConnection httpconn = (HttpURLConnection) url.openConnection();
if (httpconn.getResponseCode() != 200)
throw new RuntimeException("Failed : HTTP error code : " + httpconn.getResponseCode());
isr = new InputStreamReader((httpconn.getInputStream()));
} catch (Exception e) {
// clean up; log error
return;
}
while(isRunning) {
OUT tuple = ... // get data from isr
ctx.collect(tuple);
}
}
@Override
public void cancel() {
this.isRunning = false;
}
});
无法使用fromElements创建数据流