java-Apache Flink streaming窗口WordCount


0

我有以下代码来计算socketTextStream中的单词。累积字数和时间窗字数都是需要的。程序有一个问题,累积计数总是与窗口计数相同。为什么会出现这个问题? root据加窗计数计算累积计数的正确方法是什么?

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
final HashMap<String, Integer> cumulateCounts = new HashMap<String, Integer>();

final DataStream<Tuple2<String, Integer>> counts = env
.socketTextStream("localhost", 9999)
.flatMap(new Splitter())
.window(Time.of(5, TimeUnit.SECONDS))
.groupBy(0).sum(1)
.flatten();

counts.print();

counts.addSink(new SinkFunction<Tuple2<String, Integer>>() {
@Override
public void invoke(Tuple2<String, Integer> value) throws Exception {
String word = value.f0;
Integer delta_count = value.f1;
Integer count = cumulateCounts.get(word);
if (count == null)
count = 0;
count = count + delta_count;
cumulateCounts.put(word, count);
System.out.println("(" + word + "," + count.toString() + ")");
}
});

1 答案


0

您应该首先分组,并在 key控数据流上应用窗口(您的代码在Flink 0.9.1上 job ,但Flink 0.10.0中的新API对此非常严格):

final DataStream<Tuple2<String, Integer>> counts = env
        .socketTextStream("localhost", 9999)
        .flatMap(new Splitter())
        .groupBy(0)
        .window(Time.of(5, TimeUnit.SECONDS)).sum(1)
        .flatten();

如果在非 key控数据流上应用一个窗口,那么在一台计算机上只有一个单线程窗口操作符(即,没有并行性)来在整个流上构建窗口(在Flink 0.9.1中,这个全局窗口可以通过groupBy()拆分为子窗口——但是在Flink 0.10.0中,这将不再起作用)。为了计算字数,您需要为每个不同的 key值构建一个窗口,即,您首先为每个 key值获取一个子流(通过groupBy()),并在每个子流上应用一个窗口操作符(因此,您可以为每个子流拥有一个自己的窗口操作符实例,允许并行执行)。

对于全局(累计)计数,可以简单地应用groupBy().sum()构造。首先,流被分成子流(每个 key值一个子流)。其次,计算流上的和。因为流没有窗口化,所以计算(累积)和更新每个传入元组的和(更详细地说,和的初始结果值为零,结果更新为每个元组的结果+=元组值). 每次调用sum之后,都会发出新的当前结果。

在代码中,不应使用特殊的sink函数,而应执行以下操作:

counts.groupBy(0).sum(1).print();

我来回答

写文章

提问题

面试题