如何在apacheflink中实现调节系统


0

我正在构建一个协调系统,通过使用来自两个不同kafka流的数据,并通过 join到一个公共共享Id来协调数据内容。

最初,我使用了间隔 join,但是遇到了一个问题:它只支持内部 join。因此,如果我没有从一个流中获取公共共享Id的数据,那么这个特定的数据就会丢失。

有没有可能的方法来获取两个流的所有数据,然后对匹配的记录进行间隔 join,并将不匹配的数据刷新到一些elasticsearch/hive数据库中?

1 答案


0

使用这个功能的Interval join并不是不可能实现的,但我认为很难开发。

如果您对实现间隔 join没有严格的要求。您可以使用带有滚动窗口或滑动窗口等的Co-Group来实现外部 join。你可以用一个过滤器来区分成功和失败的匹配。然后,您可以对成功的执行 join逻辑,并将失败的逻辑推送到某个 sink。

DataStream<Tuple2<List<Tuple2<String, Integer>>, List<Tuple2<String, Integer>>>> stream = source1.coGroup(source2)
            .where((x) -> x.f0)
            .equalTo((x) -> x.f0)
            .window(TumblingEventTimeWindows.of(Time.milliseconds(3)))
            .apply(new CoGroupFunction<Tuple2<String, Integer>, Tuple2<String, Integer>,
                Tuple2<List<Tuple2<String, Integer>>, List<Tuple2<String, Integer>>>>() {
                @Override
                public void coGroup(Iterable<Tuple2<String, Integer>> first, Iterable<Tuple2<String, Integer>> second, Collector<Tuple2<List<Tuple2<String, Integer>>, List<Tuple2<String, Integer>>>> out) throws Exception {
                    List<Tuple2<String, Integer>> lst = new ArrayList<>();
                    List<Tuple2<String, Integer>> lst2 = new ArrayList<>();
                    first.forEach(lst::add);
                    second.forEach(lst2::add);
                    out.collect(new Tuple2<>(lst, lst2));
                }
            });
        // If any of the list is empty. Key didn't match
        DataStream<Tuple2<List<Tuple2<String, Integer>>, List<Tuple2<String, Integer>>>> misMatch = 
            stream.filter(x -> x.f0.isEmpty() || x.f1.isEmpty());
        misMatch.addSink(new SomeSink());


    DataStream&lt;Tuple2&lt;List&lt;Tuple2&lt;String, Integer&gt;&gt;, List&lt;Tuple2&lt;String, Integer&gt;&gt;&gt;&gt; match = 
        stream.filter(x -&gt; !x.f0.isEmpty() &amp;&amp; !x.f1.isEmpty());
    DataStream&lt;Tuple3&lt;String, String, Integer&gt;&gt; joined = match.flatMap((x, y) -&gt; {
        // your join logic.
        y.collect(new Tuple3&lt;&gt;("your", "joined tuple", 1);
    });

    joined.addSink(new SomeOtherSink());


我来回答

写文章

提问题

面试题