本文讲一些比较八股的概念性东西,不是特别实用,但对于理解任务的运行非常有帮助。先做个自我检测:你知道Flink的Job指什么吗?Task呢?Subtask呢?这些和JVM Thread的对应关系是什么?你能估计出你的应用跑起来以后大概会产生多少个Thread吗?你知道你的应用需要多少个Slot吗?OK,如果你都清楚了,那Ctrl/Command+w吧...。如果还有些疑问,可以继续往下看。

注意:本文的“任务”一词是个通用概念,不代表Task。

Job && Task && Subtask

Job最容易理解,一个Job代表一个可以独立提交的大任务,可以认为一个execute或者executeAsync就产生一个Job,我们向JobManager提交任务的时候就是以Job为单位的,只不过一份代码里面可以包含多个Job。比如SocketWordCount就是一个Job。

至于Task和Subtask看下面我画的图:

task-chain

图说明如下:

  • 图中每个圆代表一个Operator,每个虚线圆角框代表一个Task,每个虚线方框代表一个Subtask,其中的p表示并行度。
  • 最上面是StreamGraph,是没有经过任何优化的时候,可以看到包含4个Operator/Task:Task A1、Task A2、Task B、Task C。
  • StreamGraph经过Operator Chain之后,Task A1和Task A2两个Task合并成了一个新的Task A(同时也可以认为合并产生了一个新的Operator),得到了中间的JobGraph。
  • 然后以并行度为2(需要2个slot)执行的时候,Task A产生了2个Subtask,分别占用了Thread #1和Thread #2两个线程;Task B产生了2个Subtask,分别占用了Thread #3和Thread #3两个线程;Task C产生了1个Subtask,占用了Thread5.

应该非常清楚了,下面总结一下结论:

  1. Task是逻辑概念,一个Operator就代表一个Task(多个Operator被chain之后产生的新Operator算一个Operator);
  2. 真正运行的时候,Task会按照并行度分成多个Subtask,Subtask是执行/调度的基本单元
  3. 每个Subtask需要一个线程来执行

好了,理解了这些概念,评估一个应用大概会产生多少个线程的时候(不考虑一些框架自身的线程)根据JobGraph就可以大概计算出来了:

$$ \sum_{i=1}^n operator_i * parallelism_i $$

即累加所有Operator和它的并行度的乘积。

还没完,再讲一下Slot Sharing。

Slot Sharing

架构部分讲了TaskManager是真正干活的,启动的时候会将自己的资源以Slot的方式注册到ResourceManager,然后JobMaster从ResourceManager处申请到Slot资源之后将自己优化过后的任务调度到这些Slot上面去运行,在整个过程中Subtask是调度的基本单元,Slot则是资源分配的基本单元。需要注意的是目前Slot只隔离内存,不隔离CPU。

为了高效的使用资源,Flink默认允许同一个Job中不同Task的Subtask运行在一个Slot中,这就是SlotSharing。注意一下描述中的几个关键条件:

  1. 必须是同一个Job。这个很好理解,slot是给Job分配的资源,目的就是隔离各个Job,如果跨Job共享,但隔离就失效了;
  2. 必须是不同Task的Subtask。这样是为了更好的资源均衡和利用。一个计算流中(pipeline),每个Subtask的资源消耗肯定是不一样的,如果都均分slot,那必然有些资源利用率高,有些低。限制不同Task的Subtask共享可以尽量让资源占用高的和资源占用低的放一起,而不是把多个高的或多个低的放一起。比如一个计算流中,source和sink一般都是IO操作,特别是source,一般都是网络读,相比于中间的计算Operator,资源消耗并不大。
  3. 默认是允许sharing的,也就是你也可以关闭这个特性。

下面看下官方的两个图:

6个Slot,5个Subtask,并行度为2:

slot-sharing-2

此时Subtask少于Slot个数,所以每个Subtask独占一个Slot,没有SlotSharing。把并行度改为6:

slot-sharing-6

此时,Subtask的个数多于Slot了,所以出现了SlotSharing,一个Slot中分配了多个Subtask,特别是最左边的Slot中跑了一个完整的Pipeline。SlotSharing除了提高了资源利用率,还简化了并行度和Slot之间的关系——一个Job运行需要的Slot个数就是其中并行度最高的那个Task的并行度:

$$ Job需要的Slot个数=\max(parallelism_{task_1}, parallelism_{task_2}, ..., parallelism_{task_n}) $$

掌握了这些八股知识,就能更好的评估资源了。

Reference: