在多核处理器上 local运行apache flink


0

我在eclipse中运行flink,Maven已经从其中获取了必要的jar。我的机器有一个八核的处理器,我要写的流应用程序从它的输入读取行,并计算一些统计数据。

当我在我的机器上运行这个程序时,我希望flink使用CPU的所有 core以及线程代码。然而,当我观看 cores时,我看到只有一个 cores被使用。我做了很多尝试,并在下面的代码中留下了我的最后一次尝试,即设置环境的并行性。我也试着把它单独设置为流等等。

public class SemSeMi {


public static void main(String[] args) throws Exception {
    System.out.println("Starting Main!");

    System.out.println(org.apache.flink.core.fs.local.LocalFileSystem
            .getLocalFileSystem().getWorkingDirectory());

    StreamExecutionEnvironment env = StreamExecutionEnvironment
            .getExecutionEnvironment();

    env.setParallelism(8);

    env.socketTextStream("localhost", 9999).flatMap(new SplitterX());

    env.execute("Something");       
}

public static class SplitterX implements
        FlatMapFunction<String, Tuple2<String, Integer>> {
    @Override
    public void flatMap(String sentence,
            Collector<Tuple2<String, Integer>> out) throws Exception {
        // Do Nothing!

    }
}

}

我用netcat向程序输入数据:

 nc -lk 9999 < fileName

问题是如何使程序在 local扩展并使用所有可用的 cores?

1 答案


0

您不必显式地指定并行度。以默认设置运行的 job将自动将并行度设置为可用 core数。

在您的情况下,源代码将以1的并行度运行,因为从套接字读取的数据无法分发。但是,对于flatMap操作,系统将实例化8个实例。如果您打开日志记录,那么您也将看到它。现在,输入数据以循环方式分发到flatMap任务。每个flatMap任务都由一个单独的线程执行。

我怀疑,为什么您只看到单个 core上的负载,是因为SplitterX不做任何 job 。尝试以下代码,计算每个字符串中的字符数,然后将结果打印到控制台:

public static void main(String[] args) throws Exception {
    System.out.println("Starting Main!");

System.out.println(org.apache.flink.core.fs.local.LocalFileSystem
    .getLocalFileSystem().getWorkingDirectory());

StreamExecutionEnvironment env = StreamExecutionEnvironment
    .getExecutionEnvironment();

env.socketTextStream("localhost", 9999).flatMap(new SplitterX()).print();

env.execute("Something");

}

public static class SplitterX implements
FlatMapFunction<String, Tuple2<String, Integer>> {
@Override
public void flatMap(String sentence,
Collector<Tuple2<String, Integer>> out) throws Exception {
out.collect(Tuple2.of(sentence, sentence.length()));

}

}

每行开头的数字告诉您哪个任务打印了结果。


我来回答

写文章

提问题

面试题