python-如何将CSV作为流 table source加载到PyFlink中?


0

我试图设置一个简单的游乐场环境来使用flinkpython表API。我最终要写的 job 将从 Kafka 或肯尼斯的长队中得到满足,但这让我很难去思考(和 test)。

我可以很高兴地从CSV加载并在批处理模式下处理它。但我不能让它在流模式下 job 。我如何在一个流式执行环境中做类似的事情(主要是为了让我可以在windows上玩)。

我知道我需要让系统使用EventTime(因为ProcTime会同时进入),但是我找不到任何方法来设置它。原则上,我应该能够将CSV的一列设置为事件时间,但是在文档中不清楚如何做到这一点(或者如果可能的话)。

为了让批执行 test运行,我使用了下面的代码,它从输入.csv并输出到输出.csv.

from pyflink.dataset import ExecutionEnvironment
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import (
    TableConfig,
    DataTypes,
    BatchTableEnvironment,
    StreamTableEnvironment,
)
from pyflink.table.descriptors import Schema, Csv, OldCsv, FileSystem
from pathlib import Path

exec_env = ExecutionEnvironment.get_execution_environment()
exec_env.set_parallelism(1)
t_config = TableConfig()
t_env = BatchTableEnvironment.create(exec_env, t_config)
root = Path(file).parent.resolve()
out_path = root / "output.csv"

try:
out_path.unlink()
except:
pass

from pyflink.table.window import Tumble

(
t_env.connect(FileSystem().path(str(root / "input.csv")))
.with_format(Csv())
.with_schema(
Schema().field("time", DataTypes.TIMESTAMP(3)).field("word", DataTypes.STRING())
)
.create_temporary_table("mySource")
)

(
t_env.connect(FileSystem().path(str(out_path)))
.with_format(Csv())
.with_schema(
Schema().field("word", DataTypes.STRING()).field("count", DataTypes.BIGINT())
)
.create_temporary_table("mySink")
)

(
t_env.from_path("mySource")
.group_by("word")
.select("word, count(1) as count")
.filter("count > 1")
.insert_into("mySink")
)

t_env.execute("tutorial_job")

以及输入.csv是

2000-01-01 00:00:00.000000000,james
2000-01-01 00:00:00.000000000,james
2002-01-01 00:00:00.000000000,steve

所以我的问题是如何设置它,使它从同一个CSV读取,但使用第一列作为事件时间,并允许我编写如下代码:

(
    t_env.from_path("mySource")
    .window(Tumble.over("10.minutes").on("time").alias("w"))
    .group_by("w, word")
    .select("w, word, count(1) as count")
    .filter("count > 1")
    .insert_into("mySink")
)

任何帮助都将不胜感激,我无法从文件中找到答案。我使用的是python3.7和flink1.11.1。

2 答案


0

如果使用描述符API,则可以通过 schema指定一个字段为事件时间字段:

.with_schema(  # declare the schema of the table
             Schema()
             .field("rowtime", DataTypes.TIMESTAMP())
             .rowtime(
                Rowtime()
                .timestamps_from_field("time")
                .watermarks_periodic_bounded(60000))
             .field("a", DataTypes.STRING())
             .field("b", DataTypes.STRING())
             .field("c", DataTypes.STRING())
         )

但我还是建议你使用DDL,一方面它更容易使用,另一方面现有的描述符API存在一些缺陷,社区正在讨论重构描述符API


0

你试过使用 watermark策略吗?如前所述,您需要使用 watermark策略来使用事件时间。对于pyflink的例子,我个人认为用ddl格式声明它比较容易。


我来回答

写文章

提问题

面试题