Flink如何清理按 key状态?


0

当我想到按某个东西 key入的行为时,我通常会想到一个类比,即将与该 key匹配的所有事件都扔到同一个桶中。正如您可以想象的那样,当Flink应用程序开始处理大量数据时,您选择的key-by开始变得非常重要,因为您希望确保能够很好地清理状态。这就引出了我的问题, Flink到底是如何清理这些“水桶”的?如果bucket是空的(所有MapStates和ValueStates都是空的),Flink是否会关闭 key空间的那个区域并删除bucket?

例子:

传入数据格式:{userId,computerId,amountOfTimeLoggedOn}

密钥:UserId/ComputerId

当前密钥空间:

    爱丽丝,电脑10:里面有两个事件。两个事件都以状态存储。

Flink会来把Bob,Computer11从密钥空间中移除,还是因为它曾经有过一个事件而永远活下去?

2 答案


0

Flink不会为没有任何用户值关联的状态 key存储任何数据,至少在现有的状态后端:Heap(内存中)或RocksDB中。

在Flink中,密钥空间是虚拟的,Flink不假设哪些具体的密钥可能存在。每个 key或 key的子集没有任何预先分配的存储桶。只有当用户应用程序为某个密钥写入一些值时,它才会占用存储空间。

一般的想法是,具有相同密钥的所有记录都在同一台机器上处理(有点像您所说的在同一个桶中)。某个密钥的 local状态也始终保持在同一台计算机上(如果存储的话)。但这与 checkpoints无关。

例如,如果在某个时间点为[Bob,Computer 11]写入了某个值,然后又被删除,Flink将使用该 key将其完全删除。


0

简短回答

它借助flinkstate的生存时间(TTL)特性和Java垃圾收集器(GC)进行清理。GC将取回已分配给该特性的任何引用。

冗长的回答

您的问题可分为3个子问题:

我会尽量简短。

Flink是如何 root据密钥对数据进行分区的?

对于 key控流上的运算符,Flink借助一致的哈希算法对 key上的数据进行分区。它创建最大的bucket并行数。为每个操作员实例分配一个或多个这些bucket。每当一个数据要被发送到下游时, key就被分配给其中一个桶,并因此被发送到相关的操作员实例。这里没有存储 key,因为范围是用数学方法计算的。因此,任何时候都不会清除区域或删除bucket。您可以创建任何类型的密钥。它不会在 key空间或范围方面影响内存。

Flink如何使用密钥存储状态?

所有操作员实例都有一个实例级状态存储。该存储定义了该操作员实例的状态上下文,它可以存储多个命名状态存储,例如“count”、“sum”、“some name”等。这些命名状态存储是 key值存储,可以 root据数据的 key存储值。

当我们用操作符open()函数中的状态描述符初始化状态时,就会创建这些KV存储。i、 e.getRuntimeContext().getValueState()。

这些千伏存储器只在需要存储数据的状态下存储数据。(就像HashMap.put(k,v))。因此,除非调用状态更新方法(如update、add、put),否则不会存储任何 key或值。

所以,

    如果Flink没有看到密钥,则不会为该密钥存储任何内容。

Flink如何清理国家的钥匙?

Flink不会删除状态,除非用户需要或由用户手动完成。如前所述,Flink具有 state的TTL特性。此TTL将标记状态到期,并在调用清理策略时将其删除。这些清理策略因后端类型和清理时间而异。对于堆状态后端,它将从状态表中删除条目,即删除对该条目的任何引用。这个未被引用的条目所占用的内存将由javagc清理。对于RocksDB State后端,它只调用RocksDB的本机删除方法。


我来回答

写文章

提问题

面试题