Apache Flink 1.0.0: event-time-related migration problem



I tried to migrate some simple jobs to Flink 1.0.0, but they fail with the following exception:

java.lang.RuntimeException: Record has Long.MIN_VALUE timestamp (= no timestamp marker). Is the time characteristic set to 'ProcessingTime', or did you forget to call 'DataStream.assignTimestampsAndWatermarks(...)'?

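The code consists of two separate jobs connected through a Kafka topic. One job is a simple message generator. The other is a simple message consumer that uses timeWindowAll to compute the per-minute message arrival rate.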

Similar code ran without any problems on 0.10.2, but now it looks as if the system misinterprets some event timestamps as Long.MIN_VALUE, which makes the job fail.

So the question is: am I doing something wrong, or is this a bug that will be fixed in a future release?

Main job:

import java.util.Map;
import java.util.Properties;
import java.util.concurrent.TimeUnit;

import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09;
import org.apache.flink.util.Collector;

public class Test1_0_0 {
    // Maximum out-of-orderness: the watermark trails the largest event timestamp seen by this much
    static final Time maxTimeLag = Time.of(3, TimeUnit.SECONDS);

    public static void main(String[] args) throws Exception {
        // Set up the execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment
                .getExecutionEnvironment();
        // Use event time
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.setBufferTimeout(1);
        // Properties initialization
        Properties properties = new Properties();

        properties.setProperty("bootstrap.servers", "localhost:9092");
        properties.setProperty("zookeeper.connect", "localhost:2181");
        properties.setProperty("group.id", "test");

        // Override the default properties with those provided on the command line
        ParameterTool parameterTool = ParameterTool.fromArgs(args);
        for (Map.Entry<String, String> e : parameterTool.toMap().entrySet()) {
            properties.setProperty(e.getKey(), e.getValue());
        }
        //properties.setProperty("auto.offset.reset", "smallest");
        System.out.println("Properties: " + properties);
        DataStream<Message> stream = env
                .addSource(new FlinkKafkaConsumer09<Message>("test", new MessageSDSchema(), properties))
                .filter(message -> message != null);
        // The call to rebalance() causes data to be re-partitioned so that all machines receive messages
        // (for example, when the number of Kafka partitions is fewer than the number of Flink parallel instances).
        stream.rebalance()
                .assignTimestampsAndWatermarks(new MessageTimestampExtractor(maxTimeLag));
        // Counts messages
        stream.timeWindowAll(Time.minutes(1)).apply(new AllWindowFunction<Message, String, TimeWindow>() {
            @Override
            public void apply(TimeWindow timeWindow, Iterable<Message> values, Collector<String> collector) throws Exception {
                Integer count = 0;
                if (values.iterator().hasNext()) {
                    for (Message value : values) {
                        count++;
                    }
                    collector.collect("Arrived last minute: " + count);
                }
            }
        }).print();
        // Execute the program
        env.execute("Messages Counting");
    }
}
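As a rough illustration of what timeWindowAll(Time.minutes(1)) followed by the counting AllWindowFunction computes, the plain-Java sketch below groups event timestamps into one-minute tumbling windows and counts each window. It assumes windows are aligned to the epoch (start = timestamp - timestamp % size), which is our understanding of how Flink's tumbling event-time windows behave by default, and it deliberately ignores watermarks and triggering. `TumblingCountSketch` is a hypothetical name, not a Flink class.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical plain-Java model (no Flink dependency) of timeWindowAll(Time.minutes(1))
// plus a counting window function. Watermarks and window firing are not modelled.
final class TumblingCountSketch {
    static final long WINDOW_SIZE_MS = 60_000L; // corresponds to Time.minutes(1)

    // Start of the epoch-aligned tumbling window containing the timestamp (assumes timestamp >= 0).
    static long windowStart(long timestamp, long size) {
        return timestamp - (timestamp % size);
    }

    // Counts timestamps per window; the TreeMap keeps windows ordered by start time.
    static Map<Long, Integer> countPerWindow(List<Long> timestamps, long size) {
        Map<Long, Integer> counts = new TreeMap<>();
        for (long ts : timestamps) {
            counts.merge(windowStart(ts, size), 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        List<Long> timestamps = Arrays.asList(0L, 59_999L, 60_000L, 125_000L);
        countPerWindow(timestamps, WINDOW_SIZE_MS).forEach((start, count) ->
                System.out.println("Window [" + start + ", " + (start + WINDOW_SIZE_MS)
                        + "): Arrived last minute: " + count));
    }
}
```

Here 0 and 59999 land in the first window, 60000 opens the second one, and 125000 falls into the window starting at 120000.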

Timestamp extractor:

import java.io.Serializable;

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.time.Time;

public class MessageTimestampExtractor implements AssignerWithPeriodicWatermarks<Message>, Serializable {

    private static final long serialVersionUID = 7526472295622776147L;
    // Maximum lag (in ms) of the watermark behind the largest event timestamp seen so far
    long maxTimeLag = 0L;
    // Largest event timestamp seen so far
    long currentWatermarkTimestamp = 0L;

    public MessageTimestampExtractor() {
    }

    public MessageTimestampExtractor(Time maxTimeLag) {
        this.maxTimeLag = maxTimeLag.toMilliseconds();
    }

    /**
     * Assigns a timestamp to an element, in milliseconds since the Epoch.
     *
     * <p>The method is passed the previously assigned timestamp of the element.
     * That previous timestamp may have been assigned by a previous assigner or
     * by ingestion time. If the element did not carry a timestamp before, this value is
     * {@code Long.MIN_VALUE}.
     *
     * @param message The element that the timestamp will be assigned to.
     * @param previousElementTimestamp The previous internal timestamp of the element,
     *                                 or a negative value if no timestamp has been assigned yet.
     * @return The new timestamp.
     */
    @Override
    public long extractTimestamp(Message message, long previousElementTimestamp) {
        long timestamp = message.getTimestamp();
        currentWatermarkTimestamp = Math.max(timestamp, currentWatermarkTimestamp);
        return timestamp;
    }

    /**
     * Returns the current watermark. This method is periodically called by the
     * system to retrieve the current watermark. The method may return null to
     * indicate that no new Watermark is available.
     *
     * <p>The returned watermark will be emitted only if it is non-null and larger
     * than the previously emitted watermark. If the current watermark is still
     * identical to the previous one, no progress in event time has happened since
     * the previous call to this method.
     *
     * <p>If this method returns a value that is smaller than the previously returned watermark,
     * then the implementation does not properly handle the event stream timestamps.
     * In that case, the returned watermark will not be emitted (to preserve the contract of
     * ascending watermarks), and the violation will be logged and registered in the metrics.
     *
     * <p>The interval in which this method is called and Watermarks are generated
     * depends on {@link ExecutionConfig#getAutoWatermarkInterval()}.
     *
     * @see org.apache.flink.streaming.api.watermark.Watermark
     * @see ExecutionConfig#getAutoWatermarkInterval()
     */
    @Override
    public Watermark getCurrentWatermark() {
        if (currentWatermarkTimestamp <= 0) {
            return new Watermark(Long.MIN_VALUE);
        }
        return new Watermark(currentWatermarkTimestamp - maxTimeLag);
    }

    public long getMaxTimeLag() {
        return maxTimeLag;
    }

    public void setMaxTimeLag(long maxTimeLag) {
        this.maxTimeLag = maxTimeLag;
    }

}
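The watermark arithmetic in MessageTimestampExtractor can be checked without a Flink cluster. The plain-Java sketch below mirrors extractTimestamp and getCurrentWatermark: the watermark stays at Long.MIN_VALUE until a positive timestamp has been seen, and afterwards trails the largest timestamp by maxTimeLag. The class `BoundedLagWatermarkSketch` is hypothetical and uses bare `long` values instead of Flink's `Watermark` type.

```java
// Hypothetical plain-Java mirror of MessageTimestampExtractor's watermark logic (no Flink types).
final class BoundedLagWatermarkSketch {
    private final long maxTimeLag;      // allowed lag, in milliseconds
    private long maxTimestampSeen = 0L; // same initial value as currentWatermarkTimestamp

    BoundedLagWatermarkSketch(long maxTimeLagMs) {
        this.maxTimeLag = maxTimeLagMs;
    }

    // Mirrors extractTimestamp(): remember the largest timestamp and pass it through unchanged.
    long extractTimestamp(long eventTimestamp) {
        maxTimestampSeen = Math.max(eventTimestamp, maxTimestampSeen);
        return eventTimestamp;
    }

    // Mirrors getCurrentWatermark(): Long.MIN_VALUE until a positive timestamp has been seen.
    long currentWatermark() {
        if (maxTimestampSeen <= 0) {
            return Long.MIN_VALUE;
        }
        return maxTimestampSeen - maxTimeLag;
    }

    public static void main(String[] args) {
        BoundedLagWatermarkSketch w = new BoundedLagWatermarkSketch(3_000L); // Time.of(3, SECONDS)
        System.out.println(w.currentWatermark()); // Long.MIN_VALUE, nothing seen yet
        w.extractTimestamp(10_000L);
        w.extractTimestamp(8_000L); // out-of-order event: the watermark does not move back
        System.out.println(w.currentWatermark()); // 10000 - 3000 = 7000
    }
}
```

With maxTimeLag = 3 s, the one-minute window [0, 60000) can only fire once the watermark reaches 59999, that is, after an event with a timestamp of at least 62999 ms has been seen (assuming, as we understand Flink's event-time trigger, that a window fires when the watermark passes its last millisecond).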

1 Answer



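The problem is that assignTimestampsAndWatermarks returns a new DataStream that uses the timestamp extractor; it does not modify the stream it is called on. You therefore have to apply all subsequent operations to the returned DataStream: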

DataStream<Message> timestampStream = stream.rebalance()
        .assignTimestampsAndWatermarks(new MessageTimestampExtractor(maxTimeLag));
// Counts messages
timestampStream.timeWindowAll(Time.minutes(1)).apply(new AllWindowFunction<Message, String, TimeWindow>() {
    @Override
    public void apply(TimeWindow timeWindow, Iterable<Message> values, Collector<String> collector) throws Exception {
        Integer count = 0;
        if (values.iterator().hasNext()) {
            for (Message value : values) {
                count++;
            }
            collector.collect("Arrived last minute: " + count);
        }
    }
}).print();
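To see why discarding the return value leads to exactly this exception, here is a toy model in plain Java. `ToyStream` and `ToyRecord` are hypothetical classes invented for illustration, not Flink API. They only mimic two properties: assigning timestamps returns a new stream instead of mutating the receiver, and an event-time operation rejects records that still carry the Long.MIN_VALUE "no timestamp" marker.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.ToLongFunction;

// Hypothetical toy classes, NOT the Flink API: they only model why the
// result of assignTimestampsAndWatermarks(...) must be used.
final class ToyRecord {
    final String payload;
    final long timestamp; // Long.MIN_VALUE stands for "no timestamp", as in the exception

    ToyRecord(String payload, long timestamp) {
        this.payload = payload;
        this.timestamp = timestamp;
    }
}

final class ToyStream {
    private final List<ToyRecord> records;

    ToyStream(List<ToyRecord> records) {
        this.records = Collections.unmodifiableList(new ArrayList<>(records));
    }

    // Builds a stream whose records carry no timestamp yet.
    static ToyStream unstamped(String... payloads) {
        List<ToyRecord> list = new ArrayList<>();
        for (String p : payloads) {
            list.add(new ToyRecord(p, Long.MIN_VALUE));
        }
        return new ToyStream(list);
    }

    // Like assignTimestampsAndWatermarks: returns a NEW stream and leaves this one unchanged.
    ToyStream assignTimestamps(ToLongFunction<String> extractor) {
        List<ToyRecord> stamped = new ArrayList<>();
        for (ToyRecord r : records) {
            stamped.add(new ToyRecord(r.payload, extractor.applyAsLong(r.payload)));
        }
        return new ToyStream(stamped);
    }

    // Like an event-time window: fails on any record without a timestamp.
    int countInEventTimeWindow() {
        for (ToyRecord r : records) {
            if (r.timestamp == Long.MIN_VALUE) {
                throw new RuntimeException("Record has Long.MIN_VALUE timestamp (= no timestamp marker).");
            }
        }
        return records.size();
    }

    public static void main(String[] args) {
        ToyStream stream = ToyStream.unstamped("1000", "2000");

        stream.assignTimestamps(Long::parseLong); // result discarded, as in the question
        try {
            stream.countInEventTimeWindow();
        } catch (RuntimeException e) {
            System.out.println("Original stream fails: " + e.getMessage());
        }

        ToyStream timestampStream = stream.assignTimestamps(Long::parseLong); // as in the answer
        System.out.println("Returned stream counts " + timestampStream.countInEventTimeWindow() + " records");
    }
}
```

The design mirrors the fix in the answer: the original `stream` never gains timestamps, so only `timestampStream` can be windowed by event time.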
