我在eclipse中运行flink,Maven已经从其中获取了必要的jar。我的机器有一个八核的处理器,我要写的流应用程序从它的输入读取行,并计算一些统计数据。
当我在我的机器上运行这个程序时,我希望flink使用CPU的所有 core以及线程代码。然而,当我观看 cores时,我看到只有一个 cores被使用。我做了很多尝试,并在下面的代码中留下了我的最后一次尝试,即设置环境的并行性。我也试着把它单独设置为流等等。
public class SemSeMi {
public static void main(String[] args) throws Exception {
System.out.println("Starting Main!");
System.out.println(org.apache.flink.core.fs.local.LocalFileSystem
.getLocalFileSystem().getWorkingDirectory());
StreamExecutionEnvironment env = StreamExecutionEnvironment
.getExecutionEnvironment();
env.setParallelism(8);
env.socketTextStream("localhost", 9999).flatMap(new SplitterX());
env.execute("Something");
}
public static class SplitterX implements
FlatMapFunction<String, Tuple2<String, Integer>> {
@Override
public void flatMap(String sentence,
Collector<Tuple2<String, Integer>> out) throws Exception {
// Do Nothing!
}
}
}
我用netcat向程序输入数据:
nc -lk 9999 < fileName
问题是如何使程序在 local扩展并使用所有可用的 cores?