java——如何 root据状态变化事件,以分布式方式计算flink状态下的“客户机”数量?我需要有状态的对象


0

我正在用java开发poc项目,使用kafka->flink->elastic search。

在kafka上,将产生不可预测的事件数,从0到数千个事件/秒,例如特定主题。

{"gid":"abcd-8910-2ca4227527f9", "state":"stateA", "timestamp:1465566255, "other unusefull info":"..."} 

Flink将消耗此事件,并应在 Elasticsearch中每秒搜索每个状态下的事件数,例如:

{"stateA":54, "stateB":100, ... "stateJ":34}

我有10个 state:【创造。。。,删除]平均生命周期为15分钟。状态每秒可以改变两次。理论上可以增加新的状态。

为了让水流每秒都下沉,我想用 Flink的时间窗https://flink.apache.org/news/2015/12/04/introduction-windows.html

问题是我需要有状态的对象,其中包含guid->previous state和stateX->count的信息,以便能够在新事件发生时增加/减少计数。

我找到了一份关于有状态蒸汽处理的文件草稿https://cwiki.apache.org/conference/display/FLINK/Stateful+Stream+Processing

我是flink和流处理的新手,我还没有深入研究flink有状态的流处理。在第一阶段,我考虑使用静态对象来实现这一点,但是当几个flink实例要启动时,这种方法就行不通了。

我想问你:

    你觉得这种方法怎么样?

另外,我也很欣赏窗口状态流解决方案(或其他解决方案)的一些代码片段。

谢谢,

1 答案


0

下面这些怎么样?

它使用15分钟的窗口,之后窗口状态将被清除。它还使用一个自定义触发器,该触发器每秒对窗口求值。就窗口操作而言,有一个ReduceFunction只保留每个guid的最新状态,还有一个WindowFunction发出一个(state,1)元组。然后按这个状态 key,求和。我想这会给你你想要的结果。

val env = StreamExecutionEnvironment.getExecutionEnvironment()
val stream = env.addSource(new FlinkKafkaProducer(...))

val results = stream
.keyBy(_.guid)
.timeWindow(Time.minutes(15))
.trigger(ProcessingTimeTriggerWithPeriodicFirings(1000))
.apply(
(e1, e2) => e2,
(k, w, i, c: Collector[(String, Long)]) => {
if (i.head != null) c.collect((i.head.state, 1))
}
)
.keyBy(0)
.timeWindow(Time.seconds(1))
.sum(1)
.addSink(new ElasticsearchSink<>(...))

env.execute("Count States")

周期性 spark的处理流量计定义如下:

object ProcessingTimeTriggerWithPeriodicFirings {
  def apply(intervalMs: Long) = {
    new ProcessingTimeTriggerWithPeriodicFirings(intervalMs)
  }
}

class ProcessingTimeTriggerWithPeriodicFirings(intervalMs: Long)
extends Trigger[Event, TimeWindow] {

private val startTimeDesc =
new ValueStateDescriptor[Long]("start-time", classOf[Long], 0L)

override def onElement(element: Event, timestamp: Long, window: TimeWindow, ctx: TriggerContext): TriggerResult = {
val startTime = ctx.getPartitionedState(startTimeDesc)
if (startTime.value == 0) {
startTime.update(window.getStart)
ctx.registerProcessingTimeTimer(window.getEnd)
ctx.registerProcessingTimeTimer(System.currentTimeMillis() + intervalMs)
}
TriggerResult.CONTINUE
}

override def onProcessingTime(time: Long, window: TimeWindow, ctx: TriggerContext): TriggerResult = {
if (time == window.getEnd) {
TriggerResult.PURGE
}
else {
ctx.registerProcessingTimeTimer(time + intervalMs)
TriggerResult.FIRE
}
}

override def onEventTime(time: Long, window: TimeWindow, ctx: TriggerContext): TriggerResult = {
TriggerResult.CONTINUE
}
}


我来回答

写文章

提问题

面试题