Flink培训教程中的一个问题:LongRidesSolution.scala公司


0

此函数(ProcessElement)的作用非常明确:

override def processElement(ride: TaxiRide,
                                context: KeyedProcessFunction[Long, TaxiRide, TaxiRide]#Context,
                                out: Collector[TaxiRide]): Unit = {
      val timerService = context.timerService
      if (ride.isStart) {
        // the matching END might have arrived first; don't overwrite it
        if (rideState.value() == null) {
          rideState.update(ride)
        }
      }
      else {
        rideState.update(ride)
      }

  timerService.registerEventTimeTimer(ride.getEventTime + 120 * 60 * 1000)
}

一旦 watermark到达时间戳,定时器就会触发

    override def onTimer(timestamp: Long,
                         ctx: KeyedProcessFunction[Long, TaxiRide, TaxiRide]#OnTimerContext,
                         out: Collector[TaxiRide]): Unit = {
      val savedRide = rideState.value

  if (savedRide != null && savedRide.isStart) {
    out.collect(savedRide)
  }
  rideState.clear()
}

问题是:如果结束记录先到,然后 root据逻辑,它不会更新骑乘状态(相关 key),然后在2小时后触发,那么它不会收集也不会发出记录,但是如果这个记录符合我们的要求呢?==>记录的开始时间发生在2小时之前?我认为应该有更多的逻辑来处理这个问题

1 答案


0

如果结束记录在开始记录之前被处理,那么开始记录可能很晚才到达,当它确实到达时,它提供证据证明这一旅程持续了两个多小时。

然而,这个练习的目的不是找出所有持续超过两个小时的游乐设施,而是实时标记那些应该在现在结束的骑乘活动(因为它们是在两个多小时前开始的),但还没有结束。既然你所问的这些游乐设施已经结束,那么它们是否值得提醒还值得商榷。

你提出了一个有趣的观点,可能应该添加到练习讨论页面。


我来回答

写文章

提问题

面试题