本文介绍Flink任务流转过程中涉及的图,知道这些可以更好的了解Flink的运行流程。
如上图,Flink中有4种图:StreamGraph、JobGraph、ExecutionGraph、PhysicalGraph,分别处于不同的阶段,承担不同的职责。
StreamGraph
StreamGraph其实就是把我们的代码逻辑以拓扑图的形式组织了一下,其实现类的描述如下:
// StreamGraph.java
/**
* Class representing the streaming topology. It contains all the information
* necessary to build the jobgraph for the execution.
*/
@Internal
public class StreamGraph implements Pipeline {
...
}
StreamGraph可以查看,方法是在我们的execute()
代码之前加入System.out.println(env.getExecutionPlan());
,这样会输出一个JSON,将这个JSON粘贴到这里就可以查看StreamGraph。以官方的SocketWordCount为例,后面还会多次用到这个例子,所以为了完整性,我把代码贴到这里:
/**
* Implements a streaming windowed version of the "WordCount" program.
*
* <p>This program connects to a server socket and reads strings from the socket.
* The easiest way to try this out is to open a text server (at port 12345)
* using the <i>netcat</i> tool via
* <pre>
* nc -l 12345 on Linux or nc -l -p 12345 on Windows
* </pre>
* and run this example with the hostname and the port as arguments.
*/
@SuppressWarnings("serial")
public class SocketWindowWordCount {
public static void main(String[] args) throws Exception {
// the host and the port to connect to
final String hostname;
final int port;
try {
final ParameterTool params = ParameterTool.fromArgs(args);
hostname = params.has("hostname") ? params.get("hostname") : "localhost";
port = params.getInt("port");
} catch (Exception e) {
System.err.println("No port specified. Please run 'SocketWindowWordCount " +
"--hostname <hostname> --port <port>', where hostname (localhost by default) " +
"and port is the address of the text server");
System.err.println("To start a simple text server, run 'netcat -l <port>' and " +
"type the input text into the command line");
return;
}
// get the execution environment
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// get input data by connecting to the socket
DataStream<String> text = env.socketTextStream(hostname, port, "\n");
// parse the data, group it, window it, and aggregate the counts
DataStream<WordWithCount> windowCounts = text
.flatMap(new FlatMapFunction<String, WordWithCount>() {
@Override
public void flatMap(String value, Collector<WordWithCount> out) {
for (String word : value.split("\\s")) {
out.collect(new WordWithCount(word, 1L));
}
}
})
.keyBy(value -> value.word)
.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
.reduce(new ReduceFunction<WordWithCount>() {
@Override
public WordWithCount reduce(WordWithCount a, WordWithCount b) {
return new WordWithCount(a.word, a.count + b.count);
}
});
// print the results with a single thread, rather than in parallel
windowCounts.print().setParallelism(1);
env.execute("Socket Window WordCount");
}
// ------------------------------------------------------------------------
/**
* Data type for words with count.
*/
public static class WordWithCount {
public String word;
public long count;
public WordWithCount() {}
public WordWithCount(String word, long count) {
this.word = word;
this.count = count;
}
@Override
public String toString() {
return word + " : " + count;
}
}
}
这个代码是一个非常经典的流处理过程:Source(socketTextStream
) --> Map(flatMap
) --> Window(keyByWindow
) --> Sink(print
),这个过程包含了4个Operator(即括号中的部分)。它的StreamGraph长下面这样:
默认并行度设置为1(env.setParallelism(1);
)时:
默认并行度设置为4时:
注:
- 不知道为什么有些地方展示不全,Chrome、Safari、Firefox都试了,还是不行,不过只是遮挡了并行度,也能看出来,不是很影响。
- 这里分别展示了两个不同的并行度,是为了后面的对比用。
程序真正运行时也会先转化为StreamGraph,然后进一步转化为JobGraph。
JobGraph
JobGraph和StreamGraph一样,也是一个有向无环图(DAG),这个图是由operator、operator间的输入输出关系、中间数据连接而成。JobGraph也被称为Logical Graph或者Dataflow Graph(题外话,StreamGraph当然也是一个逻辑图,但StreamGraph这个概念其实是代码级的,并不对外体现,所以可以看到代码里面类上面加了@Internal
,并且用户文档中从来没有提及这个概念,而JobGraph则是对外暴露的)。
官网描述如下:
A logical graph is a directed graph where the nodes are Operators and the edges define input/output-relationships of the operators and correspond to data streams or data sets.
代码注释说明如下:
// JobGraph.java
/**
* The JobGraph represents a Flink dataflow program, at the low level that the JobManager accepts.
* All programs from higher level APIs are transformed into JobGraphs.
*
* <p>The JobGraph is a graph of vertices and intermediate results that are connected together to
* form a DAG. Note that iterations (feedback edges) are currently not encoded inside the JobGraph
* but inside certain special vertices that establish the feedback channel amongst themselves.
*
* <p>The JobGraph defines the job-wide configuration settings, while each vertex and intermediate result
* define the characteristics of the concrete operation and intermediate data.
*/
public class JobGraph implements Serializable {
...
}
StreamGraph到JobGraph的转化也是在Client端进行的,主要工作做优化。其中非常重要的一个优化就是Operator Chain:
- 什么是Operator Chain?就是将一些独立的小Operator合并为一个大的Operator,比如将上面的
Source
和FlatMap
合并在一起。 - 合并(chain)的条件是什么?其实条件还蛮多的,但简单理解就是如果两个算子的数据是直接Forward的,那就可以合并。反过来,如果两个Operator之间有shuffle(比如keyBy)、rebalance(比如并行度不一样)之类的操作,或者一个Operator有多个上游,那就不能合并。实际使用的时候,我们一般尽量让整个系统里面的算子并行度一致即可,这样能够合并的一般都会合并。
- 合并的好处是什么?Flink运行的时候,一个任务是在一个线程里面运行的,将小的Operator合并成大的Operator,也就相当于将小任务合并成了大任务,这样他们就会在一个线程里面执行,避免了网络IO和序列化操作,同时也减少了线程数。这个细节可参考我后面的文章:Flink快速了解(5)——Job && Task && Subtask && SlotSharing。
- Operator Chain特性默认是开启的,用户可以在配置或者代码里面关掉这个特性(我目前还没有碰到实际运行的时候需要关闭这个优化项的需求场景...)。
JobGraph也可以查看,方法是在Flink的Web UI提交页面上面(仍以SocketWordCount为例):
注意:如果应用有必填参数一定要填,否则会报错。
默认并行度设置为1时的JobGraph:
可以看到默认并行度设置为1的时候,优化器把source和flatmap这两个operator串接(chain)成一个大的子任务了,把后面的window和sink两个operator串接(chain)成一个大的子任务了,这样到时候source和flatmap就在一个线程里面执行,window和sink在一个线程里面运行。
默认并行度设置为4时的JobGraph:
如果我们把默认并行度设置为4,那图就变成了上面这样,可以看到相比于并行度为1的图,没有任何合并。主要原因是:
- 代码里的source是
socketTextStream
,这是一个不能并行的operator,所以并行度永远是1(强制设置为非1运行会报错); - flatmap和window代码中没有设置并行度,所以都使用了默认并行度4,但因为中间是hash操作,所以也无法合并;
- 最后一个sink代码里面显式的设置了并行度为1(见代码),和它的直接上游并行度不一致,所以也无法chain。
很显然,这样的运行就不是最高效的,所以在并行度的控制上要稍微注意一下,尽量让能够合并的operator chain在一起。
Client向JobManager(Dispatcher模块)提交Job时,实质提交的就是JobGraph和该Job依赖的jar包。
ExecutionGraph
JobManager接收到Client端提交的JobGraph及其依赖Jar之后就要开始调度运行该任务了,但JobGraph还是一个逻辑上的图,需要再进一步转化为并行化、可调度的执行图。这个动作是JobManager(其中的JobMaster组件)做的。
这一层已经更加具体化了,代码说明比较长,但对于使用者基本无需关注,为了完整性,这里也附一下,有兴趣的自行查看:
/**
* The execution graph is the central data structure that coordinates the distributed
* execution of a data flow. It keeps representations of each parallel task, each
* intermediate stream, and the communication between them.
*
* <p>The execution graph consists of the following constructs:
* <ul>
* <li>The {@link ExecutionJobVertex} represents one vertex from the JobGraph (usually one operation like
* "map" or "join") during execution. It holds the aggregated state of all parallel subtasks.
* The ExecutionJobVertex is identified inside the graph by the {@link JobVertexID}, which it takes
* from the JobGraph's corresponding JobVertex.</li>
* <li>The {@link ExecutionVertex} represents one parallel subtask. For each ExecutionJobVertex, there are
* as many ExecutionVertices as the parallelism. The ExecutionVertex is identified by
* the ExecutionJobVertex and the index of the parallel subtask</li>
* <li>The {@link Execution} is one attempt to execute a ExecutionVertex. There may be multiple Executions
* for the ExecutionVertex, in case of a failure, or in the case where some data needs to be recomputed
* because it is no longer available when requested by later operations. An Execution is always
* identified by an {@link ExecutionAttemptID}. All messages between the JobManager and the TaskManager
* about deployment of tasks and updates in the task status always use the ExecutionAttemptID to
* address the message receiver.</li>
* </ul>
*
* <h2>Global and local failover</h2>
*
* <p>The Execution Graph has two failover modes: <i>global failover</i> and <i>local failover</i>.
*
* <p>A <b>global failover</b> aborts the task executions for all vertices and restarts whole
* data flow graph from the last completed checkpoint. Global failover is considered the
* "fallback strategy" that is used when a local failover is unsuccessful, or when a issue is
* found in the state of the ExecutionGraph that could mark it as inconsistent (caused by a bug).
*
* <p>A <b>local failover</b> is triggered when an individual vertex execution (a task) fails.
* The local failover is coordinated by the {@link FailoverStrategy}. A local failover typically
* attempts to restart as little as possible, but as much as necessary.
*
* <p>Between local- and global failover, the global failover always takes precedence, because it
* is the core mechanism that the ExecutionGraph relies on to bring back consistency. The
* guard that, the ExecutionGraph maintains a <i>global modification version</i>, which is incremented
* with every global failover (and other global actions, like job cancellation, or terminal
* failure). Local failover is always scoped by the modification version that the execution graph
* had when the failover was triggered. If a new global modification version is reached during
* local failover (meaning there is a concurrent global failover), the failover strategy has to
* yield before the global failover.
*/
public class ExecutionGraph implements AccessExecutionGraph {
...
}
PhysicalGraph
物理图其实是任务调度到TaskManager上面真正执行的一种“运行图”,它没有具体对应的类。官方说明如下:
A physical graph is the result of translating a Logical Graph for execution in a distributed runtime. The nodes are Tasks and the edges indicate input/output-relationships or partitions of data streams or data sets.
这四个图越往下,越具体,也越难理解,但实际中对于使用而言,一般只需要知道有这几个环节转换即可,细节无需太关注。下面附一张Jark大神博客的图,更多细节可见参考部分给的他的文章:
总结
其实图这个概念并非Flink原创,Storm、Spark里面也有类似的概念,Flink也是站在巨人的肩膀上而已。从StreamGraph迭代到最终的PhysicalGraph,由抽象到具体,抽象的给人看,具体的给框架看,有点类似于代码编译时的预处理、编译、汇编、链接的过程。
参考:
您好,请教个问题
那么是否,StreamGraph和JobGraph的生成,也是由Client端-->后移到了集群端生成?
自问自答:
Job的提交过程: