scala-有人在Flink中有一个泛型ProcessFunction的例子吗?


0

我所说的“泛型”是指能够接受任何类型的对象作为输入,并将同一对象作为输出返回。

假设函数的任务是将每个元素序列化为json,并将其作为副输出写入。

class MyProcessFunction() extends ProcessFunction[? , ?] {

def processElement(element: ?, ctx: ProcessFunction[?, ?]#Context, out: Collector[?]): Unit = ??? 

... 

}

我能以这样一种方式定义它,使它可以被不同类型的输入使用吗?

1 答案


0

可以通过使类泛型来实现这一点。所以,你会有类似的东西:

class MyProcessFunction[T] extends ProcessFunction[T, T] {
  override def processElement(value: T, ctx: ProcessFunction[T, T]#Context, out: Collector[T]): Unit = ???
}

这样,在创建函数实例时,就可以确定类型。


我来回答

写文章

提问题

面试题