本文共 1722 字,大约阅读时间需要 5 分钟。
首先来看下程序错误信息:
caused by: akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka://flink/user/taskmanager_0#15608456]] after [10000 ms]. Sender[null] sent message of type "org.apache.flink.runtime.rpc.messages.LocalRpcInvocation".
跟着这问题在 Flink官网 的 Issue 列表里看到了一个类似的问题:https://issues.apache.org/jira/browse/FLINK-9056,看评论差不多就是 TaskManager 的 slot 数量不足的原因,导致 job 提交失败。在 Flink 1.63 中已经修复了变成抛出异常了。
竟然知道了是因为 slot 不足的原因了,那博主就在这里简单介绍一下slot。
parallelism 是并行的意思,在 Flink 里面代表每个任务的并行度,适当的提高并行度可以大大提高 job 的执行效率,比如你的 job 消费 kafka 数据过慢,适当调大可能就消费正常了。
(1):在配置文件中设置flink-conf.yaml
parallelism.default: 1
(2)在代码环境中
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(10);
注意:这样设置的并行度是你整个程序的并行度,那么后面如果你的每个算子不单独设置并行度覆盖的话,那么后面每个算子的并行度就都是这里设置的并行度的值了。
(3)每个算子设置并行度source.map(new XxxMapFunction).setParallelism(5)
如上,就是在算子后面单独的设置并行度,这样的话,就算你前面设置了 env.setParallelism(10) 也是会被覆盖的。
优先级是:算子设置并行度 > env 设置并行度 > 配置文件默认并行度
例如,如果 Task Manager 有四个 slot,那么它将为每个 slot 分配 25% 的内存。 可以在一个 slot 中运行一个或多个线程。 同一 slot 中的线程共享相同的 JVM。 同一 JVM 中的任务共享 TCP 连接和心跳消息。Task Manager 的一个 Slot 代表一个可用线程,该线程具有固定的内存,注意 Slot 只对内存隔离,没有对 CPU 隔离。默认情况下,Flink 允许子任务共享 Slot,即使它们是不同 task 的 subtask,只要它们来自相同的 job。这种共享可以有更好的资源利用率。
接下来再图例说明一下:
上面图片中有两个 Task Manager,每个 Task Manager 有三个 slot,这样我们的算子最大并行度那么就可以达到 6 个,在同一个 slot 里面可以执行 1 至多个子任务。 那么再看上面的图片,source/map/keyby/window/apply 最大可以有 6 个并行度,sink 只用了 1 个并行。每个 Flink TaskManager 在集群中提供 slot。 slot 的数量通常与每个 TaskManager 的可用 CPU 内核数成比例。一般情况下你的 slot 数是你每个 TaskManager 的 cpu 的核数。
转载地址:http://jimzi.baihongyu.com/