scala-重新运行后Flink状态为空(重新初始化)


0

我尝试 join两个流,第一个是在MapValueState中 persist化:

 env.setStateBackend(new RocksDBStateBackend(..)
 env.enableCheckpointing(1000)
 ...

val productDescriptionStream: KeyedStream[ProductDescription, String] = env.addSource(..)
.keyBy(_.id)

val productStockStream: KeyedStream[ProductStock, String] = env.addSource(..)
.keyBy(_.id)

  productDescriptionStream
  .connect(productStockStream)
  .process(ProductProcessor())
  .setParallelism(1)

env.execute("Product aggregator")

产品加工商

case class ProductProcessor() extends CoProcessFunction[ProductDescription, ProductStock, Product]{
private[this] lazy val stateDescriptor: MapStateDescriptor[String, ProductDescription] =
new MapStateDescriptor[String, ProductDescription](
  "productDescription",
  createTypeInformation[String],
  createTypeInformation[ProductDescription]
)
private[this] lazy val states: MapState[String, ProductDescription] = getRuntimeContext.getMapState(stateDescriptor)

override def processElement1(value: ProductDescription,
ctx: CoProcessFunction[ProductDescription, ProductStock, Product]#Context,out: Collector[Product]
): Unit = {
states.put(value.id, value)
}}

override def processElement2(value: ProductStock,
ctx: CoProcessFunction[ProductDescription, ProductStock, Product]#Context, out: Collector[Product]
): Unit = {
if (states.contains(value.id)) {
val product =Product(
id = value.id,
description = Some(states.get(value.id).description),
stock = Some(value.stock),
updatedAt = value.updatedAt)
out.collect(product )
}}

1 答案


0

checkpoints由Flink创建,用于从故障中恢复,而不是在手动关闭后恢复。当 job被取消时,默认行为是Flink删除 checkpoints。既然 job不能再失败,就不需要恢复了。

您有几种选择:

(1) 配置 checkpoints以在取消 job时保留 checkpoints:

CheckpointConfig config = env.getCheckpointConfig();
config.enableExternalizedCheckpoints(
  CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

然后,当您重新启动 job时,您需要指示您希望它从特定 checkpoints重新启动:

flink run -s <checkpoint-path> ...

否则,无论何时启动 job,它都将以空状态后端开始。

(2) 不要取消 job,请使用stop with savepoint:

flink stop [-p targetDirectory] [-d] <jobID>

之后你需要再次使用flink run-s。。。从保存点恢复。

使用一个保存点停止操作比依赖于最近有一个 checkpoints可以返回到更干净的方法。

(3) 或者您可以使用ververicaplatformcommunityedition,它将抽象级别提高到不必自己管理这些细节的程度。


我来回答

写文章

提问题

面试题