java—使用基于计数的窗口 join两个流


0

我是FlinkStreamingAPI的新手,我想完成以下简单(IMO)任务。我有两个流,我想加入他们使用基于计数的窗口。我目前掌握的代码如下:

public class BaselineCategoryEquiJoin {

private static final String recordFile = "some_file.txt";

private static class ParseRecordFunction implements MapFunction<String, Tuple2<String[], MyRecord>> {
public Tuple2<String[], MyRecord> map(String s) throws Exception {
MyRecord myRecord = parse(s);
return new Tuple2<String[], myRecord>(myRecord.attributes, myRecord);
}
}

public static void main(String[] args) throws Exception {
StreamExecutionEnvironment environment = StreamExecutionEnvironment.createLocalEnvironment();
ExecutionConfig config = environment.getConfig();
config.setParallelism(8);
DataStream<Tuple2<String[], MyRecord>> dataStream = environment.readTextFile(recordFile)
.map(new ParseRecordFunction());
DataStream<Tuple2<String[], MyRecord>> dataStream1 = environment.readTextFile(recordFile)
.map(new ParseRecordFunction());
DataStreamSink<Tuple2<String[], String[]>> joinedStream = dataStream1
.join(dataStream)
.where(new KeySelector<Tuple2<String[],MyRecord>, String[]>() {
public String[] getKey(Tuple2<String[], MyRecord> recordTuple2) throws Exception {
return recordTuple2.f0;
}
}).equalTo(new KeySelector<Tuple2<String[], MyRecord>, String[]>() {
public String[] getKey(Tuple2<String[], MyRecord> recordTuple2) throws Exception {
return recordTuple2.f0;
}
}).window(TumblingProcessingTimeWindows.of(Time.seconds(1)))
.apply(new JoinFunction<Tuple2<String[],MyRecord>, Tuple2<String[],MyRecord>, Tuple2<String[], String[]>>() {
public Tuple2<String[], String[]> join(Tuple2<String[], MyRecord> tuple1, Tuple2<String[], MyRecord> tuple2) throws Exception {
return new Tuple2<String[], String[]>(tuple1.f0, tuple1.f0);
}
}).print();
environment.execute();
}
}

我的代码可以正常 job ,但不会产生任何结果。实际上,从未调用apply方法的调用(通过在调试模式下添加断点进行验证)。我认为,前面的主要原因是我的数据没有时间属性。因此,窗口化(通过窗口具体化)没有正确完成。因此,我的问题是如何指出我希望我的加入是基于计数窗口的。例如,我希望 join在每个流中每100个元组具体化。前面的方法在 Flink可行吗?如果是,我应该在代码中做些什么来实现它。

在这一点上,我必须通知您,我试图调用countWindow()方法,但由于某些原因,它不是由Flink的JoinedStreams提供的。

谢谢你

1 答案


0

不支持基于计数的 join。您可以通过使用“事件时间”语义模拟基于计数的窗口,并将唯一的seq id作为时间戳应用于每个记录。因此,“5”的时间窗口实际上是5的计数窗口。


我来回答

写文章

提问题

面试题