基于第三个事件 chaining接Flink中的两个事件


0

如果两个事件流到Flink中,是否可以使用后面第三个事件中的信息(使用datastreamapi或CEP)将它们逻辑地 join起来?E、 g.下面例子中的第三个事件是否可以 root据其右_id和left_id chaining接前两个事件?

ID: AAAA
ID: BBBB
ID: ZZZZ, right_id: AAAA, left_id: BBBB 

1 答案


0

这是一个非常基本的CEP用例。代码看起来像这样。。。

// data stream creation 
DataStream<Event> myStream = ... 

// cep pattern definition
Pattern<Event, ?> myPattern = Pattern.<Event>begin("first_event")
.followedBy("second_event")
.followedBy("match_event");

// cep pattern stream: apply pattern to stream
PatternStream<Event> myPatternStream = CEP.pattern(myStream, myPattern);

// create new data stream from pattern matches
DataStream<CEPEvent> myCEPEvent = myPatternStream.flatSelect(
(Map<String, Event> pattern, Collector<CEPEvent> out) -> {

            // load potential event sequence matches
            Event first_event = pattern.get("first_event");
            Event second_event = pattern.get("second_event");
            Event match_event = pattern.get("match_event");

            // test event sequences 
            if (match_event.right_id.equals(first_event.ID)
                &amp;&amp; match_event.left_id.equals(second_event.ID)
            ){out.collect(new CEPEvent("successful cep hit"));}
        }
    );


我来回答

写文章

提问题

面试题