提供意外输出的Flink自定义触发器


0

我想创建一个触发器,它第一次在20秒内被触发,之后每隔5秒触发一次。我使用了GlobalWindows和一个自定义触发器

val windowedStream = valueStream
                          .keyBy(0)
                          .window(GlobalWindows.create())
                          .trigger(TradeTrigger.of())

以下是TradeTrigger中的代码:

@PublicEvolving
public class TradeTrigger<W extends Window> extends Trigger<Object, W> {

private static final long serialVersionUID = 1L;

static boolean flag=false;
static long ctime = System.currentTimeMillis();

private TradeTrigger() {
}

@Override
public TriggerResult onElement(
        Object arg0,
        long arg1,
        W arg2,
        org.apache.flink.streaming.api.windowing.triggers.Trigger.TriggerContext arg3)
        throws Exception {
    // TODO Auto-generated method stub

    if(flag == false){
        if((System.currentTimeMillis()-ctime) &gt;= 20000){
           flag = true;
           ctime = System.currentTimeMillis();
           return TriggerResult.FIRE;
        }
        return TriggerResult.CONTINUE;
    } else {
        if((System.currentTimeMillis()-ctime) &gt;= 5000){
            ctime = System.currentTimeMillis();
            return TriggerResult.FIRE;
        }
        return TriggerResult.CONTINUE;
    }

}

@Override
public TriggerResult onEventTime(
        long arg0,
        W arg1,
        org.apache.flink.streaming.api.windowing.triggers.Trigger.TriggerContext arg2)
        throws Exception {
    // TODO Auto-generated method stub
    return TriggerResult.CONTINUE;
}

@Override
public TriggerResult onProcessingTime(
        long arg0,
        W arg1,
        org.apache.flink.streaming.api.windowing.triggers.Trigger.TriggerContext arg2)
        throws Exception {
    // TODO Auto-generated method stub
    return TriggerResult.CONTINUE;
}


public static &lt;W extends Window&gt; TradeTrigger&lt;W&gt; of() {
    return new TradeTrigger&lt;&gt;();
}

}

所以基本上,当flag为false时,即第一次,触发器应该在20秒后被触发,并将flag设置为true。从下一次开始,它应该每5秒发射一次。

我面临的问题是,每次触发触发器时,输出中只有一条消息。也就是说,我在20秒后收到一条消息,每5秒收到一条消息。

如果我使用.timeWindow(时间。秒(5) )并创建一个5秒的时间窗口,每5秒输出20条消息。

2 答案


0

触发器实现存在一些问题:

    永远不要将函数的状态存储在静态变量中。Flink没有在jvm中隔离用户进程。相反,它对每个TaskManager使用一个JVM并启动多个线程。因此,静态布尔标志在触发器的多个实例中共享。您应该存储flag Flink的ValueState接口,该接口可以从TriggerContext访问。Flink会注意检查您的状态,并在发生故障时恢复它。

0

在费边和 Flink邮件列表的帮助下 job 。


我来回答

写文章

提问题

面试题