Flink statefun共址功能通信


0

我有一个正常 job 的嵌入式 job,我想部署额外的同位置 job。

代码如下

@AutoService(StatefulFunctionModule.class)
public final class CoLocatedModule implements StatefulFunctionModule {

@Override
public void configure(Map<String, String> globalConfiguration, Binder binder) {
FunctionProvider provider = new FunctionProvider();
binder.bindFunctionProvider( CoLocated.TYPE,provider );

binder.bindEgress(KafkaSpecs.TO_TRANSACTION_SPEC);

}
}

我得到一个错误如下

 The program finished with the following exception:

org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: There are no ingress defined.
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335)
at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)
at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:138)
at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:664)
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:895)
at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968)
at org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:968)
Caused by: java.lang.IllegalStateException: There are no ingress defined.
at org.apache.flink.statefun.flink.core.StatefulFunctionsUniverseValidator.validate(StatefulFunctionsUniverseValidator.java:25)
at org.apache.flink.statefun.flink.core.StatefulFunctionsJob.main(StatefulFunctionsJob.java:66)
at org.apache.flink.statefun.flink.core.StatefulFunctionsJob.main(StatefulFunctionsJob.java:41)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:567)
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:321)

这个错误很明显,需要我定义入口。

chaining接->

新定义的模块将接收来自另一个模块的消息并发送给kafka。

    我是否必须为每个同处 job 定义入口?如果没有,我该怎么做?

1 答案


0

回答内联,仅供参考,你问的任何问题都不是同一地点的具体问题。这些属性适用于包含同一位置和远程混合 job 负载的远程模块和 job。

我是否必须为每个同处 job 定义入口?如果没有,我该怎么做?

是的,每个 job(远程或同位)至少需要一个入口。入口是将来自外部世界的消息消费到statefun应用程序中的通道。想想 Kafka 或动静。如果没有入口, job将不会执行任何操作,因为没有开始处理的初始消息。

对于每个入口,您将绑定1个或多个路由器,这些路由器从入口获取每条消息,并 root据它们的函数类型将它们转发给0个或更多个函数[1]。

我怎样才能找到同一地点的 job 来交流?使用相同的FunctionType就足够了吗?

是的,函数只是使用它们的函数类型相互传递消息。

同一位置的功能是否通过入口/出口进行通信?

不,消息是使用apacheflink运行时在函数之间传递的,它包含一个高度优化的网络堆栈。一旦从入口提取消息,它将不再与该入口交互。如果您感兴趣,您可以在社区撰写的一些博客文章中了解Flink的网络堆栈是如何 job 的,但这并不是在生产中成功使用statefun的必要条件[2]。

[1]https://ci.apache.org/projects/flink/flink-statefun-docs-release-2.0/io-module/index.html outer

[2]https://flink.apache.org/2019/06/05/flink-network-stack.html


我来回答

写文章

提问题

面试题