




// StreamGraph.java
 * Class representing the streaming topology. It contains all the information
 * necessary to build the jobgraph for the execution.
public class StreamGraph implements Pipeline {


 * Implements a streaming windowed version of the "WordCount" program.
 * <p>This program connects to a server socket and reads strings from the socket.
 * The easiest way to try this out is to open a text server (at port 12345)
 * using the <i>netcat</i> tool via
 * <pre>
 * nc -l 12345 on Linux or nc -l -p 12345 on Windows
 * </pre>
 * and run this example with the hostname and the port as arguments.
public class SocketWindowWordCount {

    public static void main(String[] args) throws Exception {

        // the host and the port to connect to
        final String hostname;
        final int port;
        try {
            final ParameterTool params = ParameterTool.fromArgs(args);
            hostname = params.has("hostname") ? params.get("hostname") : "localhost";
            port = params.getInt("port");
        } catch (Exception e) {
            System.err.println("No port specified. Please run 'SocketWindowWordCount " +
                "--hostname <hostname> --port <port>', where hostname (localhost by default) " +
                "and port is the address of the text server");
            System.err.println("To start a simple text server, run 'netcat -l <port>' and " +
                "type the input text into the command line");

        // get the execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // get input data by connecting to the socket
        DataStream<String> text = env.socketTextStream(hostname, port, "\n");

        // parse the data, group it, window it, and aggregate the counts
        DataStream<WordWithCount> windowCounts = text

                .flatMap(new FlatMapFunction<String, WordWithCount>() {
                    public void flatMap(String value, Collector<WordWithCount> out) {
                        for (String word : value.split("\\s")) {
                            out.collect(new WordWithCount(word, 1L));

                .keyBy(value -> value.word)

                .reduce(new ReduceFunction<WordWithCount>() {
                    public WordWithCount reduce(WordWithCount a, WordWithCount b) {
                        return new WordWithCount(a.word, a.count + b.count);

        // print the results with a single thread, rather than in parallel

        env.execute("Socket Window WordCount");

    // ------------------------------------------------------------------------

     * Data type for words with count.
    public static class WordWithCount {

        public String word;
        public long count;

        public WordWithCount() {}

        public WordWithCount(String word, long count) {
            this.word = word;
            this.count = count;

        public String toString() {
            return word + " : " + count;

这个代码是一个非常经典的流处理过程:Source(socketTextStream) --> Map(flatMap) --> Window(keyByWindow) --> Sink(print),这个过程包含了4个Operator(即括号中的部分)。它的StreamGraph长下面这样:






  1. 不知道为什么有些地方展示不全,Chrome、Safari、Firefox都试了,还是不行,不过只是遮挡了并行度,也能看出来,不是很影响。
  2. 这里分别展示了两个不同的并行度,是为了后面的对比用。



JobGraph和StreamGraph一样,也是一个有向无环图(DAG),这个图是由operator、operator间的输入输出关系、中间数据连接而成。JobGraph也被称为Logical Graph或者Dataflow Graph(题外话,StreamGraph当然也是一个逻辑图,但StreamGraph这个概念其实是代码级的,并不对外体现,所以可以看到代码里面类上面加了@Internal,并且用户文档中从来没有提及这个概念,而JobGraph则是对外暴露的)。


A logical graph is a directed graph where the nodes are Operators and the edges define input/output-relationships of the operators and correspond to data streams or data sets.


// JobGraph.java
 * The JobGraph represents a Flink dataflow program, at the low level that the JobManager accepts.
 * All programs from higher level APIs are transformed into JobGraphs.
 * <p>The JobGraph is a graph of vertices and intermediate results that are connected together to
 * form a DAG. Note that iterations (feedback edges) are currently not encoded inside the JobGraph
 * but inside certain special vertices that establish the feedback channel amongst themselves.
 * <p>The JobGraph defines the job-wide configuration settings, while each vertex and intermediate result
 * define the characteristics of the concrete operation and intermediate data.
public class JobGraph implements Serializable {

StreamGraph到JobGraph的转化也是在Client端进行的,主要工作做优化。其中非常重要的一个优化就是Operator Chain

  • 什么是Operator Chain?就是将一些独立的小Operator合并为一个大的Operator,比如将上面的SourceFlatMap合并在一起。
  • 合并(chain)的条件是什么?其实条件还蛮多的,但简单理解就是如果两个算子的数据是直接Forward的,那就可以合并。反过来,如果两个Operator之间有shuffle(比如keyBy)、rebalance(比如并行度不一样)之类的操作,或者一个Operator有多个上游,那就不能合并。实际使用的时候,我们一般尽量让整个系统里面的算子并行度一致即可,这样能够合并的一般都会合并。
  • 合并的好处是什么?Flink运行的时候,一个任务是在一个线程里面运行的,将小的Operator合并成大的Operator,也就相当于将小任务合并成了大任务,这样他们就会在一个线程里面执行,避免了网络IO和序列化操作,同时也减少了线程数。这个细节可参考我后面的文章:Flink快速了解(5)——Job && Task && Subtask && SlotSharing
  • Operator Chain特性默认是开启的,用户可以在配置或者代码里面关掉这个特性(我目前还没有碰到实际运行的时候需要关闭这个优化项的需求场景...)。

JobGraph也可以查看,方法是在Flink的Web UI提交页面上面(仍以SocketWordCount为例):









  1. 代码里的source是socketTextStream,这是一个不能并行的operator,所以并行度永远是1(强制设置为非1运行会报错);
  2. flatmap和window代码中没有设置并行度,所以都使用了默认并行度4,但因为中间是hash操作,所以也无法合并;
  3. 最后一个sink代码里面显式的设置了并行度为1(见代码),和它的直接上游并行度不一致,所以也无法chain。

很显然,这样的运行就不是最高效的,所以在并行度的控制上要稍微注意一下,尽量让能够合并的operator chain在一起。





 * The execution graph is the central data structure that coordinates the distributed
 * execution of a data flow. It keeps representations of each parallel task, each
 * intermediate stream, and the communication between them.
 * <p>The execution graph consists of the following constructs:
 * <ul>
 *     <li>The {@link ExecutionJobVertex} represents one vertex from the JobGraph (usually one operation like
 *         "map" or "join") during execution. It holds the aggregated state of all parallel subtasks.
 *         The ExecutionJobVertex is identified inside the graph by the {@link JobVertexID}, which it takes
 *         from the JobGraph's corresponding JobVertex.</li>
 *     <li>The {@link ExecutionVertex} represents one parallel subtask. For each ExecutionJobVertex, there are
 *         as many ExecutionVertices as the parallelism. The ExecutionVertex is identified by
 *         the ExecutionJobVertex and the index of the parallel subtask</li>
 *     <li>The {@link Execution} is one attempt to execute a ExecutionVertex. There may be multiple Executions
 *         for the ExecutionVertex, in case of a failure, or in the case where some data needs to be recomputed
 *         because it is no longer available when requested by later operations. An Execution is always
 *         identified by an {@link ExecutionAttemptID}. All messages between the JobManager and the TaskManager
 *         about deployment of tasks and updates in the task status always use the ExecutionAttemptID to
 *         address the message receiver.</li>
 * </ul>
 * <h2>Global and local failover</h2>
 * <p>The Execution Graph has two failover modes: <i>global failover</i> and <i>local failover</i>.
 * <p>A <b>global failover</b> aborts the task executions for all vertices and restarts whole
 * data flow graph from the last completed checkpoint. Global failover is considered the
 * "fallback strategy" that is used when a local failover is unsuccessful, or when a issue is
 * found in the state of the ExecutionGraph that could mark it as inconsistent (caused by a bug).
 * <p>A <b>local failover</b> is triggered when an individual vertex execution (a task) fails.
 * The local failover is coordinated by the {@link FailoverStrategy}. A local failover typically
 * attempts to restart as little as possible, but as much as necessary.
 * <p>Between local- and global failover, the global failover always takes precedence, because it
 * is the core mechanism that the ExecutionGraph relies on to bring back consistency. The
 * guard that, the ExecutionGraph maintains a <i>global modification version</i>, which is incremented
 * with every global failover (and other global actions, like job cancellation, or terminal
 * failure). Local failover is always scoped by the modification version that the execution graph
 * had when the failover was triggered. If a new global modification version is reached during
 * local failover (meaning there is a concurrent global failover), the failover strategy has to
 * yield before the global failover.
public class ExecutionGraph implements AccessExecutionGraph {



A physical graph is the result of translating a Logical Graph for execution in a distributed runtime. The nodes are Tasks and the edges indicate input/output-relationships or partitions of data streams or data sets.




