Java - Flink streaming program runs correctly in processing time but produces no results in event time


Update

That did not solve the problem.

I think the problem lies in another part of my code, so first, some more background.

The program consumes a JSON stream of mixed message types from a single Kafka queue. The program initially transforms it into a stream of type ObjectNode. That stream is then split into roughly 10 separate streams using .split(). These streams are mapped to streams of POJOs.

The POJO streams are assigned timestamps before being added to a window (one window per POJO-typed stream), keyed by a custom function, and then summed and averaged, before being sent back to another Kafka queue.

Extended code example

public class flinkkafka {

public static void main(String[] args) throws Exception {
//create object mapper to allow object to JSON transform
final ObjectMapper mapper = new ObjectMapper();
final String OUTPUT_QUEUE = "test";
//setup streaming environment
StreamExecutionEnvironment env =
StreamExecutionEnvironment
.getExecutionEnvironment();

//set streaming environment variables from command line
ParameterTool parameterTool = ParameterTool.fromArgs(args);

//set time characteristic to EventTime
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

//set watermark polling interval
env.getConfig().setAutoWatermarkInterval(1000L);

//Enable checkpoints to allow for graceful recovery
env.enableCheckpointing(1000);

//set parallelism
env.setParallelism(1);

//create an initial data stream of mixed messages
DataStream<ObjectNode> messageStream = env.addSource
        (new FlinkKafkaConsumer09<>(
                parameterTool.getRequired("topic"), 
                new JSONDeserializationSchema(),
                parameterTool.getProperties())) 
                  .assignTimestampsAndWatermarks(new
                  BoundedOutOfOrdernessTimestampExtractor<ObjectNode>
                  (Time.seconds(10)){
                    private static final long serialVersionUID = 1L;

                    @Override
                    public long extractTimestamp(ObjectNode value) {
                        DateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss", Locale.ENGLISH);
                        long tmp = 0L;
                        try {
                            tmp = 
                           format.parse(value.get("EventReceivedTime")
                                .asText()).getTime();
                        } catch (ParseException e) {
                            e.printStackTrace();
                        }
                        System.out.println("Assigning timestamp " + 
                           tmp);
                        return tmp;
                    }

                });

//split stream by message type
SplitStream<ObjectNode> split = messageStream.split(new  
           OutputSelector<ObjectNode>(){
    private static final long serialVersionUID = 1L;

    @Override
    public Iterable<String> select(ObjectNode value){
        List<String> output = new ArrayList<String>();
        switch (value.get("name").asText()){
        case "one":
            switch (value.get("info").asText()){
            case "two":
                output.add("two");
                System.out.println("Sending message to two stream");
                break;
            case "three":
                output.add("three");
                System.out.println("Sending message to three stream");
                break;
            case "four":
                output.add("four");
                System.out.println("Sending message to four stream");
                break;
            case "five":
                output.add("five");
                System.out.println("Sending message to five stream");
                break;
            case "six":
                output.add("six");
                System.out.println("Sending message to six stream");
                break;
            default:
                break;
            }
            break;
        case "seven":
            output.add("seven");
            System.out.println("Sending message to seven stream");
            break;
        case "eight":
            output.add("eight");
            System.out.println("Sending message to eight stream");
            break;
        case "nine":
            output.add("nine");
            System.out.println("Sending message to nine stream");
            break;
        case "ten":
            switch (value.get("info").asText()){
            case "eleven":
                output.add("eleven");
                System.out.println("Sending message to eleven stream");
                break;
            case "twelve":
                output.add("twelve");
                System.out.println("Sending message to twelve stream");
                break;
            default:
                break;
            }
            break;
        default:
            output.add("failed");
            break;
        }
        return output;
    }
});

//assign splits to new data streams
DataStream<ObjectNode> two = split.select("two");
//assigning more splits to streams

//convert ObjectNodes to POJO 

DataStream<Two> twoStream = two.map(new MapFunction<ObjectNode, Two>(){
    private static final long serialVersionUID = 1L;

    @Override
    public Two map(ObjectNode value) throws Exception {
        Two stream = new Two();
        stream.Time = value.get("Time").asText();
        stream.value = value.get("value").asLong();
        return stream;
    }
});

DataStream<String> keyedTwo = twoStream
        .keyBy("name")
        .timeWindow(Time.minutes(5))
        .apply(new twoSum())
        .map(new MapFunction<Two, String>(){
            private static final long serialVersionUID = 1L;
            @Override
            public String map(Two value) throws Exception {
                return mapper.writeValueAsString(value);
            }
        });
keyedTwo.addSink(new FlinkKafkaProducer09<String>
     (parameterTool.getRequired("bootstrap.servers"),
             OUTPUT_QUEUE, new SimpleStringSchema()));

env.execute();
}
}

I am trying to use Flink to aggregate a Kafka queue and push the data stream back to Kafka. The aggregation uses a 5-minute event-time window; the program compiles and runs, but the collected data never leaves the window to be passed to the aggregation function, so no messages are ever delivered to Kafka. However, if I comment out the eventTime characteristic, the program runs and produces results. I have no idea where I am going wrong.

Event time code

StreamExecutionEnvironment env = 
    StreamExecutionEnvironment.getExecutionEnvironment();

ParameterTool parameterTool = ParameterTool.fromArgs(args);

env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

env.enableCheckpointing(1000);

DataStream<FrontEnd> frontEndStream = frontEnd.map(new
MapFunction<ObjectNode, FrontEnd>(){

    private static final long serialVersionUID = 1L;

    @Override
    public FrontEnd map(ObjectNode value) throws Exception {
    FrontEnd front = new FrontEnd();
    front.eventTime = value.get("EventReceivedTime").asText();
    return front;
    }
}).assignTimestampsAndWatermarks(new
    BoundedOutOfOrdernessTimestampExtractor<FrontEnd>(Time.seconds(10)){
        private static final long serialVersionUID = 1L;
        @Override
        public long extractTimestamp(FrontEnd value) {
            DateFormat format = new SimpleDateFormat("yyyy-MM-ddHH:mm:ss", Locale.ENGLISH);
            long tmp = 0L;
            try {
            tmp = format.parse(value.eventTime).getTime();
        } catch (ParseException e) {
            e.printStackTrace();
        }
        return tmp;
    }

});

DataStream<String> keyedFrontEnd = frontEndStream
    .keyBy("name")
    .timeWindow(Time.minutes(5))
    .apply(new FrontEndSum())
    .map(new MapFunction<FrontEnd, String>(){
            private static final long serialVersionUID = 1L;
            @Override
            public String map(FrontEnd value) throws Exception {
                return mapper.writeValueAsString(value);
            }
        });

keyedFrontEnd.addSink(new FlinkKafkaProducer09<String>
(parameterTool.getRequired("bootstrap.servers"), OUTPUT_QUEUE, new
SimpleStringSchema()));

env.execute();
}

}

I have tried attaching the timestamp extractor to the incoming stream, and I have tried attaching it to each POJO stream. Again, this code runs with processing time and produces the expected results: a stream of JSON strings with the expected aggregations. However, as soon as event time is enabled, the windows never produce a result.

2 Answers


BoundedOutOfOrdernessTimestampExtractor implements the AssignerWithPeriodicWatermarks interface, which means that Flink periodically queries it for the current watermark.

You have to configure the polling interval via the ExecutionConfig:

env.getConfig().setAutoWatermarkInterval(1000L); // poll watermark every second


My first inclination is always to suspect timezone issues.

What is the timezone of the "EventReceivedTime" field in the Kafka payload?

SimpleDateFormat will parse in the local JVM timezone:

DateFormat format = new SimpleDateFormat("yyyy-MM-ddHH:mm:ss",Locale.ENGLISH);

You can add

format.setTimeZone(TimeZone.getTimeZone("GMT"));

to parse the string as GMT, for example, if that is what the text represents. You should make sure the timezones/offsets of all dates, watermarks, etc. match, and compare them in UTC/epoch time (which is what you get after extracting the long from the parsed Date).
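To see the effect outside of Flink: the same wall-clock string produces different epoch millisecond values depending on the zone set on the SimpleDateFormat. A minimal sketch (the class name, method, and date string below are made up for illustration):

```java
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Locale;
import java.util.TimeZone;

public class TimezoneDemo {

    // Parse the same timestamp string in two zones and return the
    // difference between the resulting epoch-millisecond values.
    static long epochDifference(String text) throws Exception {
        DateFormat gmt = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss", Locale.ENGLISH);
        gmt.setTimeZone(TimeZone.getTimeZone("GMT"));

        DateFormat plusTwo = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss", Locale.ENGLISH);
        plusTwo.setTimeZone(TimeZone.getTimeZone("GMT+02:00"));

        return gmt.parse(text).getTime() - plusTwo.parse(text).getTime();
    }

    public static void main(String[] args) throws Exception {
        // "12:00 GMT" is a later instant than "12:00 GMT+02:00"
        // (which is 10:00 UTC), so the gap is 2 * 3600 * 1000 ms.
        System.out.println(epochDifference("2017-01-01 12:00:00")); // 7200000
    }
}
```

If the extractor's timestamps are offset like this from what you expect, every window fires at a correspondingly shifted event time, which can look like windows never firing at all.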

