本文介绍Flink任务流转过程中涉及的图,知道这些可以更好的了解Flink的运行流程。

flink-graph

如上图,Flink中有4种图:StreamGraphJobGraphExecutionGraphPhysicalGraph,分别处于不同的阶段,承担不同的职责。

StreamGraph

StreamGraph其实就是把我们的代码逻辑以拓扑图的形式组织了一下,其实现类的描述如下:

// StreamGraph.java
/**
 * Class representing the streaming topology. It contains all the information
 * necessary to build the jobgraph for the execution.
 */
@Internal
public class StreamGraph implements Pipeline {
...
}

StreamGraph可以查看,方法是在我们的execute()代码之前加入System.out.println(env.getExecutionPlan());,这样会输出一个JSON,将这个JSON粘贴到这里就可以查看StreamGraph。以官方的SocketWordCount为例,后面还会多次用到这个例子,所以为了完整性,我把代码贴到这里:

/**
 * Implements a streaming windowed version of the "WordCount" program.
 *
 * <p>This program connects to a server socket and reads strings from the socket.
 * The easiest way to try this out is to open a text server (at port 12345)
 * using the <i>netcat</i> tool via
 * <pre>
 * nc -l 12345 on Linux or nc -l -p 12345 on Windows
 * </pre>
 * and run this example with the hostname and the port as arguments.
 */
@SuppressWarnings("serial")
public class SocketWindowWordCount {

    public static void main(String[] args) throws Exception {

        // the host and the port to connect to
        final String hostname;
        final int port;
        try {
            final ParameterTool params = ParameterTool.fromArgs(args);
            hostname = params.has("hostname") ? params.get("hostname") : "localhost";
            port = params.getInt("port");
        } catch (Exception e) {
            System.err.println("No port specified. Please run 'SocketWindowWordCount " +
                "--hostname <hostname> --port <port>', where hostname (localhost by default) " +
                "and port is the address of the text server");
            System.err.println("To start a simple text server, run 'netcat -l <port>' and " +
                "type the input text into the command line");
            return;
        }

        // get the execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // get input data by connecting to the socket
        DataStream<String> text = env.socketTextStream(hostname, port, "\n");

        // parse the data, group it, window it, and aggregate the counts
        DataStream<WordWithCount> windowCounts = text

                .flatMap(new FlatMapFunction<String, WordWithCount>() {
                    @Override
                    public void flatMap(String value, Collector<WordWithCount> out) {
                        for (String word : value.split("\\s")) {
                            out.collect(new WordWithCount(word, 1L));
                        }
                    }
                })

                .keyBy(value -> value.word)
                .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))

                .reduce(new ReduceFunction<WordWithCount>() {
                    @Override
                    public WordWithCount reduce(WordWithCount a, WordWithCount b) {
                        return new WordWithCount(a.word, a.count + b.count);
                    }
                });

        // print the results with a single thread, rather than in parallel
        windowCounts.print().setParallelism(1);

        env.execute("Socket Window WordCount");
    }

    // ------------------------------------------------------------------------

    /**
     * Data type for words with count.
     */
    public static class WordWithCount {

        public String word;
        public long count;

        public WordWithCount() {}

        public WordWithCount(String word, long count) {
            this.word = word;
            this.count = count;
        }

        @Override
        public String toString() {
            return word + " : " + count;
        }
    }
}

这个代码是一个非常经典的流处理过程:Source(socketTextStream) --> Map(flatMap) --> Window(keyByWindow) --> Sink(print),这个过程包含了4个Operator(即括号中的部分)。它的StreamGraph长下面这样:

默认并行度设置为1(env.setParallelism(1);)时:

SocketWordCountStreamGraph-p1

默认并行度设置为4时:

SocketWordCountStreamGraph-p4

注:

  1. 不知道为什么有些地方展示不全,Chrome、Safari、Firefox都试了,还是不行,不过只是遮挡了并行度,也能看出来,不是很影响。
  2. 这里分别展示了两个不同的并行度,是为了后面的对比用。

程序真正运行时也会先转化为StreamGraph,然后进一步转化为JobGraph。

JobGraph

JobGraph和StreamGraph一样,也是一个有向无环图(DAG),这个图是由operator、operator间的输入输出关系、中间数据连接而成。JobGraph也被称为Logical Graph或者Dataflow Graph(题外话,StreamGraph当然也是一个逻辑图,但StreamGraph这个概念其实是代码级的,并不对外体现,所以可以看到代码里面类上面加了@Internal,并且用户文档中从来没有提及这个概念,而JobGraph则是对外暴露的)。

官网描述如下:

A logical graph is a directed graph where the nodes are Operators and the edges define input/output-relationships of the operators and correspond to data streams or data sets.

代码注释说明如下:

// JobGraph.java
/**
 * The JobGraph represents a Flink dataflow program, at the low level that the JobManager accepts.
 * All programs from higher level APIs are transformed into JobGraphs.
 *
 * <p>The JobGraph is a graph of vertices and intermediate results that are connected together to
 * form a DAG. Note that iterations (feedback edges) are currently not encoded inside the JobGraph
 * but inside certain special vertices that establish the feedback channel amongst themselves.
 *
 * <p>The JobGraph defines the job-wide configuration settings, while each vertex and intermediate result
 * define the characteristics of the concrete operation and intermediate data.
 */
public class JobGraph implements Serializable {
...
}

StreamGraph到JobGraph的转化也是在Client端进行的,主要工作做优化。其中非常重要的一个优化就是Operator Chain

  • 什么是Operator Chain?就是将一些独立的小Operator合并为一个大的Operator,比如将上面的SourceFlatMap合并在一起。
  • 合并(chain)的条件是什么?其实条件还蛮多的,但简单理解就是如果两个算子的数据是直接Forward的,那就可以合并。反过来,如果两个Operator之间有shuffle(比如keyBy)、rebalance(比如并行度不一样)之类的操作,或者一个Operator有多个上游,那就不能合并。实际使用的时候,我们一般尽量让整个系统里面的算子并行度一致即可,这样能够合并的一般都会合并。
  • 合并的好处是什么?Flink运行的时候,一个任务是在一个线程里面运行的,将小的Operator合并成大的Operator,也就相当于将小任务合并成了大任务,这样他们就会在一个线程里面执行,避免了网络IO和序列化操作,同时也减少了线程数。这个细节可参考我后面的文章:Flink快速了解(5)——Job && Task && Subtask && SlotSharing
  • Operator Chain特性默认是开启的,用户可以在配置或者代码里面关掉这个特性(我目前还没有碰到实际运行的时候需要关闭这个优化项的需求场景...)。

JobGraph也可以查看,方法是在Flink的Web UI提交页面上面(仍以SocketWordCount为例):

inspect-job-graph

注意:如果应用有必填参数一定要填,否则会报错。

默认并行度设置为1时的JobGraph:

jobgraph-p1

可以看到默认并行度设置为1的时候,优化器把source和flatmap这两个operator串接(chain)成一个大的子任务了,把后面的window和sink两个operator串接(chain)成一个大的子任务了,这样到时候source和flatmap就在一个线程里面执行,window和sink在一个线程里面运行。

默认并行度设置为4时的JobGraph:

jobgraph-p4

如果我们把默认并行度设置为4,那图就变成了上面这样,可以看到相比于并行度为1的图,没有任何合并。主要原因是:

  1. 代码里的source是socketTextStream,这是一个不能并行的operator,所以并行度永远是1(强制设置为非1运行会报错);
  2. flatmap和window代码中没有设置并行度,所以都使用了默认并行度4,但因为中间是hash操作,所以也无法合并;
  3. 最后一个sink代码里面显式的设置了并行度为1(见代码),和它的直接上游并行度不一致,所以也无法chain。

很显然,这样的运行就不是最高效的,所以在并行度的控制上要稍微注意一下,尽量让能够合并的operator chain在一起。

Client向JobManager(Dispatcher模块)提交Job时,实质提交的就是JobGraph该Job依赖的jar包

ExecutionGraph

JobManager接收到Client端提交的JobGraph及其依赖Jar之后就要开始调度运行该任务了,但JobGraph还是一个逻辑上的图,需要再进一步转化为并行化可调度的执行图。这个动作是JobManager(其中的JobMaster组件)做的。

这一层已经更加具体化了,代码说明比较长,但对于使用者基本无需关注,为了完整性,这里也附一下,有兴趣的自行查看:

/**
 * The execution graph is the central data structure that coordinates the distributed
 * execution of a data flow. It keeps representations of each parallel task, each
 * intermediate stream, and the communication between them.
 *
 * <p>The execution graph consists of the following constructs:
 * <ul>
 *     <li>The {@link ExecutionJobVertex} represents one vertex from the JobGraph (usually one operation like
 *         "map" or "join") during execution. It holds the aggregated state of all parallel subtasks.
 *         The ExecutionJobVertex is identified inside the graph by the {@link JobVertexID}, which it takes
 *         from the JobGraph's corresponding JobVertex.</li>
 *     <li>The {@link ExecutionVertex} represents one parallel subtask. For each ExecutionJobVertex, there are
 *         as many ExecutionVertices as the parallelism. The ExecutionVertex is identified by
 *         the ExecutionJobVertex and the index of the parallel subtask</li>
 *     <li>The {@link Execution} is one attempt to execute a ExecutionVertex. There may be multiple Executions
 *         for the ExecutionVertex, in case of a failure, or in the case where some data needs to be recomputed
 *         because it is no longer available when requested by later operations. An Execution is always
 *         identified by an {@link ExecutionAttemptID}. All messages between the JobManager and the TaskManager
 *         about deployment of tasks and updates in the task status always use the ExecutionAttemptID to
 *         address the message receiver.</li>
 * </ul>
 *
 * <h2>Global and local failover</h2>
 *
 * <p>The Execution Graph has two failover modes: <i>global failover</i> and <i>local failover</i>.
 *
 * <p>A <b>global failover</b> aborts the task executions for all vertices and restarts whole
 * data flow graph from the last completed checkpoint. Global failover is considered the
 * "fallback strategy" that is used when a local failover is unsuccessful, or when a issue is
 * found in the state of the ExecutionGraph that could mark it as inconsistent (caused by a bug).
 *
 * <p>A <b>local failover</b> is triggered when an individual vertex execution (a task) fails.
 * The local failover is coordinated by the {@link FailoverStrategy}. A local failover typically
 * attempts to restart as little as possible, but as much as necessary.
 *
 * <p>Between local- and global failover, the global failover always takes precedence, because it
 * is the core mechanism that the ExecutionGraph relies on to bring back consistency. The
 * guard that, the ExecutionGraph maintains a <i>global modification version</i>, which is incremented
 * with every global failover (and other global actions, like job cancellation, or terminal
 * failure). Local failover is always scoped by the modification version that the execution graph
 * had when the failover was triggered. If a new global modification version is reached during
 * local failover (meaning there is a concurrent global failover), the failover strategy has to
 * yield before the global failover.
 */
public class ExecutionGraph implements AccessExecutionGraph {
...
}

PhysicalGraph

物理图其实是任务调度到TaskManager上面真正执行的一种“运行图”,它没有具体对应的类。官方说明如下:

A physical graph is the result of translating a Logical Graph for execution in a distributed runtime. The nodes are Tasks and the edges indicate input/output-relationships or partitions of data streams or data sets.

这四个图越往下,越具体,也越难理解,但实际中对于使用而言,一般只需要知道有这几个环节转换即可,细节无需太关注。下面附一张Jark大神博客的图,更多细节可见参考部分给的他的文章:

graph-流转

总结

其实图这个概念并非Flink原创,Storm、Spark里面也有类似的概念,Flink也是站在巨人的肩膀上而已。从StreamGraph迭代到最终的PhysicalGraph,由抽象到具体,抽象的给人看,具体的给框架看,有点类似于代码编译时的预处理、编译、汇编、链接的过程。

参考: