如何存储状态并在另一个流中使用?


0

我有一个Flink的用例,我需要从一个文件中读取信息,存储每一行,然后使用这个状态来过滤另一个流。

我现在使用connect操作符和richcoplatmapfunction完成了所有这些 job ,但感觉过于复杂。另外,我担心flatMap2可能会在从文件加载所有状态之前开始执行:

fileStream
    .connect(partRecordStream.keyBy((KeySelector<PartRecord, String>) partRecord -> partRecord.getPartId()))
    .keyBy((KeySelector<String, String>) partId -> partId, (KeySelector<PartRecord, String>) partRecord -> partRecord.getPartId())
    .flatMap(new RichCoFlatMapFunction<String, PartRecord, PartRecord>() {
        private transient ValueState<String> storedPartId;
        @Override
        public void flatMap1(String partId, Collector<PartRecord> out) throws Exception {
            // store state
            storedPartId.update(partId);
        }

    @Override
    public void flatMap2(PartRecord record, Collector&lt;PartRecord&gt; out) throws Exception {
        if (record.getPartId().equals(storedPartId.value())) {
            out.collect(record);
        } else {
            // do nothing
        }
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        ValueStateDescriptor&lt;String&gt; descriptor =
                new ValueStateDescriptor&lt;&gt;(
                        "partId", // the state name
                        TypeInformation.of(new TypeHint&lt;String&gt;() {}),
                        null);
        storedPartId = getRuntimeContext().getState(descriptor);
    }
});

有没有更好的方法(如Flink1.1.3)来完成这种加载状态模式,然后在后续的流中使用它?

1 答案


0

您对CoFlatMapFunction的关注是正确的。flatMap1和flatMap2的调用顺序无法控制,取决于数据到达的顺序。因此,flatMap2可能在flatMap1读取所有数据之前被调用。

在Flink 1.1.3中,在开始处理流之前读取所有数据的唯一方法是使用RichFlatMapFunction的open()方法中的数据,即必须手动读取和解析文件。

这基本上是一种广播 join策略,也就是说,运营商的每个并行实例都会这样做。缺点是文件的数据将被复制。这样做的好处是您不必对“主”流进行无序处理(不需要使用keyBy())。


我来回答

写文章

提问题

面试题