java—在调用execute()之后,是否可以在FLINK CEP中添加新的模式?


0

我的代码如下:

StreamExecutionEnvironment env= StreamExecutionEnvironment.getExecutionEnvironment();

DataStream<MyObject> input = env.addSource(new MyCustomSource());

Pattern<MyObject, ?> pattern = Pattern.<MyObject>begin("start");

PatternStream<MyObject> patternStream = CEP.pattern(input, pattern);

... 定义我的模式

DataStream<MyObject> resultStream = patternStream.select(new MyCustomPatternSelectFunction());

resultStream.addSink(new MyCustomSinkFunction(subscriptionCriteria));

try
{
env.execute();
}
catch (Exception exception)
{
log.debug("Error while ", exception);
}

这段代码可以正常 job 并执行我想要的操作,然后得到一个遵循我设置的模式的结果流。

我想知道的是,是否有可能将新的模式应用到我后来添加到环境中的这个源,从而在不调用的情况下获得匹配不同模式的不同结果流环境执行()另一次,因为除了新的结果流之外,我还得到了冗余的旧结果流(即旧模式执行多次)?

1 答案


0

目前,Flink的CEP库不支持开箱即用的动态模式更改。因此,一旦您定义了模式并开始 job ,它将只处理这个定义的模式。

但是,您可以编写自己的运算符来实现TwoInputStreamOperator接口,该接口在一个输入模式定义和另一个输入上接收流记录(类似于CoFlatMap函数)。对于每一个新的模式,您必须在操作符上编译一个新的NFA,并将任何新的传入流元素也提供给这个NFA。这样,你就可以达到你想要的行为。

将来,我们很可能会将这个特性添加到Flink的CEP库中。


我来回答

写文章

提问题

面试题