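If your key space is larger than your available storage, filtering out duplicates on an infinite stream will eventually fail: to filter out duplicate keys, you have to store every key you have already seen somewhere. It is therefore better to define a time window after which you can purge the current set of seen keys.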
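To make the time-window idea concrete, here is a minimal plain-Java sketch (not Flink code) in which each key is remembered only for a fixed retention period after it was first seen. `ExpiringDeduplicator` is a hypothetical name, and the sketch assumes timestamps arrive in non-decreasing order, so the oldest entries can always be purged from the front of a `LinkedHashMap`.

```java
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper (not a Flink API): remembers each key only for
// retentionMillis after it was first seen, so memory is bounded by the
// number of distinct keys that arrive within one retention period.
class ExpiringDeduplicator<K> {
    private final long retentionMillis;
    // Insertion order == first-seen order, assuming non-decreasing timestamps.
    private final LinkedHashMap<K, Long> firstSeen = new LinkedHashMap<>();

    ExpiringDeduplicator(long retentionMillis) {
        this.retentionMillis = retentionMillis;
    }

    /** Returns true if the key should be emitted (not seen within the retention period). */
    boolean offer(K key, long timestampMillis) {
        purge(timestampMillis);
        if (firstSeen.containsKey(key)) {
            return false; // duplicate within the retention period
        }
        firstSeen.put(key, timestampMillis);
        return true;
    }

    int size() {
        return firstSeen.size();
    }

    // Drop the oldest entries whose retention period has elapsed.
    private void purge(long now) {
        Iterator<Map.Entry<K, Long>> it = firstSeen.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<K, Long> entry = it.next();
            if (now - entry.getValue() >= retentionMillis) {
                it.remove();
            } else {
                break; // all remaining entries are newer
            }
        }
    }

    public static void main(String[] args) {
        ExpiringDeduplicator<String> dedup = new ExpiringDeduplicator<>(1000);
        System.out.println(dedup.offer("foo", 0));    // true
        System.out.println(dedup.offer("foo", 500));  // false: still remembered
        System.out.println(dedup.offer("foo", 1500)); // true: "foo" was purged
    }
}
```

With a retention of 1000 ms, memory is bounded by the number of distinct keys arriving within one second; the trade-off is that a duplicate arriving after the retention period passes through again.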
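If you are aware of this problem but still want to try it, you can apply a stateful flatMap operation after the keyBy call. The stateful mapper uses Flink's state abstraction to store whether it has already seen an element with this key. That way, you also benefit from Flink's fault-tolerance mechanism, because your state is checkpointed automatically.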
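The following plain-Java emulation (not Flink code) sketches what the keyed stateful flatMap does: every distinct key, here the pair of fields 0 and 1 as in keyBy(0, 1), gets its own "seen" flag, much as Flink scopes a ValueState to the current key. `KeyedSeenFilter` and the `Event` class standing in for `Tuple3<String, Date, String>` are hypothetical, and a `List<Event>` stands in for the `Collector`.

```java
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java emulation (not Flink code) of keyBy(0, 1) followed by a
// stateful flatMap: one "seen" flag per composite key (f0, f1).
class KeyedSeenFilter {
    // Hypothetical stand-in for Tuple3<String, Date, String>.
    static final class Event {
        final String f0;
        final Date f1;
        final String f2;

        Event(String f0, Date f1, String f2) {
            this.f0 = f0;
            this.f1 = f1;
            this.f2 = f2;
        }

        @Override
        public String toString() {
            return "(" + f0 + "," + f1.getTime() + "," + f2 + ")";
        }
    }

    // An absent entry means "not seen yet", like the state's default value false.
    private final Map<List<Object>, Boolean> seen = new HashMap<>();

    void flatMap(Event value, List<Event> out) {
        List<Object> key = List.of(value.f0, value.f1); // composite key of fields 0 and 1
        if (!seen.getOrDefault(key, false)) {
            out.add(value);      // first element with this key: emit it
            seen.put(key, true); // suppress later elements with the same key
        }
    }

    public static void main(String[] args) {
        KeyedSeenFilter filter = new KeyedSeenFilter();
        List<Event> out = new ArrayList<>();
        filter.flatMap(new Event("foo", new Date(1000), "bar"), out);
        filter.flatMap(new Event("foo", new Date(1000), "foobar"), out);
        System.out.println(out); // [(foo,1000,bar)]
    }
}
```

Fed the two tuples from the example program below, it keeps only the first one, because both share the key ("foo", 1000).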
A Flink program doing the job could look like this:
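import java.util.Date;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    DataStream<Tuple3<String, Date, String>> input = env.fromElements(Tuple3.of("foo", new Date(1000), "bar"), Tuple3.of("foo", new Date(1000), "foobar"));
    // fields 0 and 1 together form the key, so both tuples count as duplicates
    input.keyBy(0, 1).flatMap(new DuplicateFilter()).print();
    env.execute("Test");
}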
The implementation of DuplicateFilter depends on your Flink version.
Implementation for version >= 1.0
public static class DuplicateFilter extends RichFlatMapFunction<Tuple3<String, Date, String>, Tuple3<String, Date, String>> {
    static final ValueStateDescriptor<Boolean> descriptor = new ValueStateDescriptor<>("seen", Boolean.class, false);
    private ValueState<Boolean> operatorState;

    @Override
    public void open(Configuration configuration) {
        operatorState = this.getRuntimeContext().getState(descriptor);
    }

    @Override
    public void flatMap(Tuple3<String, Date, String> value, Collector<Tuple3<String, Date, String>> out) throws Exception {
        if (!operatorState.value()) {
            // we haven't seen the element yet
            out.collect(value);
            // set operator state to true so that we don't emit elements with this key again
            operatorState.update(true);
        }
    }
}
Implementation for version 0.10
public static class DuplicateFilter extends RichFlatMapFunction<Tuple3<String, Date, String>, Tuple3<String, Date, String>> {
    private OperatorState<Boolean> operatorState;

    @Override
    public void open(Configuration configuration) {
        operatorState = this.getRuntimeContext().getKeyValueState("seen", Boolean.class, false);
    }

    @Override
    public void flatMap(Tuple3<String, Date, String> value, Collector<Tuple3<String, Date, String>> out) throws Exception {
        if (!operatorState.value()) {
            // we haven't seen the element yet
            out.collect(value);
            // set operator state to true so that we don't emit elements with this key again
            operatorState.update(true);
        }
    }
}
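Update: using a tumbling time window
Instead of remembering keys forever, you can deduplicate per tumbling window. The following emits one element per key and one-second window. The element is emitted when the window fires, not when it arrives, and each window's contents are dropped after it has been processed, so the state stays bounded.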
input.keyBy(0, 1).timeWindow(Time.seconds(1)).apply(new WindowFunction<Iterable<Tuple3<String, Date, String>>, Tuple3<String, Date, String>, Tuple, TimeWindow>() {
    @Override
    public void apply(Tuple tuple, TimeWindow window, Iterable<Tuple3<String, Date, String>> input, Collector<Tuple3<String, Date, String>> out) throws Exception {
        // emit only the first element of this key's window and drop the rest
        out.collect(input.iterator().next());
    }
});
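For intuition, here is a plain-Java emulation (not Flink code) of the tumbling-window variant: a key passes at most once per window, and the set of keys is cleared when a new window starts, which is what keeps the state bounded. `TumblingWindowDedup` is a hypothetical name; the sketch assumes non-negative, non-decreasing millisecond timestamps and, unlike the Flink version, emits the first element immediately instead of when the window fires.

```java
import java.util.HashSet;
import java.util.Set;

// Plain-Java emulation (not Flink code): at most one element per key
// and tumbling window [start, start + size).
class TumblingWindowDedup<K> {
    private final long windowSizeMillis;
    private long currentWindowStart = Long.MIN_VALUE;
    // Keys already emitted in the current window; cleared when the window changes.
    private final Set<K> keysInCurrentWindow = new HashSet<>();

    TumblingWindowDedup(long windowSizeMillis) {
        this.windowSizeMillis = windowSizeMillis;
    }

    // Start of the tumbling window containing ts (assumes ts >= 0).
    long windowStart(long ts) {
        return ts - (ts % windowSizeMillis);
    }

    /** Returns true if this is the first element with this key in its window. */
    boolean offer(K key, long ts) {
        long start = windowStart(ts);
        if (start != currentWindowStart) {
            // A new window began (timestamps assumed non-decreasing), so the
            // previous window is complete and its keys can be forgotten.
            keysInCurrentWindow.clear();
            currentWindowStart = start;
        }
        return keysInCurrentWindow.add(key);
    }

    public static void main(String[] args) {
        TumblingWindowDedup<String> dedup = new TumblingWindowDedup<>(1000);
        System.out.println(dedup.offer("foo", 100));  // true
        System.out.println(dedup.offer("foo", 900));  // false: same window [0, 1000)
        System.out.println(dedup.offer("foo", 1000)); // true: new window [1000, 2000)
    }
}
```

The design trades exactness for bounded memory: duplicates that straddle a window boundary are both emitted.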