如何在Apache Flink的StreamTableEnvironment中实现timeWindow()?


0

所有人,

我以前用过时间窗口(时间.秒())函数的数据流来自kafka主题。

我想用SQL进行x次窗口聚合,然后将其发送到另一个kafka主题

数据来源:

    val stream = senv
      .addSource(new FlinkKafkaConsumer[String]("flink", new SimpleStringSchema(), properties))

以前的聚合示例:

    val windowCounts = stream.keyBy("x").timeWindow(Time.seconds(5), Time.seconds(5))

当前数据表:

    val tableA = tableEnv.fromDataStream(parsed, 'user, 'product, 'amount)

在这一部分中,应该有一个 query,它每X次进行一次聚合

    val result = tableEnv.sqlQuery(
          s"SELECT * FROM $tableA WHERE amount > 2".stripMargin)

或多或少,我的聚合将计数(y)除以(x分区)

1 答案


0

Ververica对flinksql的培训将帮助您实现这一点。在关于使用SQL query动态表的一节中,在中包含了一些练习/示例,这些练习/示例仅涉及此类 query。

您必须为每个事件建立计时信息的来源,可以是处理时间,也可以是事件时间,之后 query对应于stream.keyBy(“x”)。时间窗口(时间.秒(5) 你说,时间.秒(5) )会是这样的:

SELECT
  x,
  TUMBLE_END(timestamp, INTERVAL '5' SECOND) AS t,
  COUNT(*) AS cnt
FROM Events
GROUP BY
  x, TUMBLE(timestamp, INTERVAL '5' SECOND);

有关如何使用时间属性的详细信息,请参阅时间属性简介。

有关使用flinksql窗口的更多详细文档,请参阅组窗口上的文档。


我来回答

写文章

提问题

面试题