A Flink streaming job is composed of operators (sources and sinks are really just special operators too). This article walks through the common DataStream operators. I have grouped them, somewhat unscientifically, into two loosely named categories:

  • Single-stream operators: these generally work on a single stream;
  • Multi-stream operators: these typically work across multiple streams.

Single-Stream Operators

Single-stream operators are mostly straightforward, so I will cover them only briefly.

  1. map/flatMap: the most heavily used operators. map takes one element and emits exactly one element; flatMap takes one element and emits zero or more elements.
  2. filter: filtering; elements for which the predicate is true continue downstream, the rest are dropped. (A short sketch chaining these three operators follows.)
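
    A minimal sketch chaining these three (the variable lines and the input format are my own illustration, not taken from the demos below):

     // assumes a DataStream<String> lines whose elements look like "1 2 3"
     DataStream<Integer> numbers = lines
         .flatMap((String line, Collector<String> out) -> {
           for (String token : line.split(" ")) {
             out.collect(token);             // one line in, several tokens out
           }
         })
         .returns(Types.STRING)              // lambdas erase generics, so declare the type
         .map(Integer::parseInt)             // one element in, one element out
         .returns(Types.INT)
         .filter(v -> v > 0);                // keep only positive numbers
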
  3. keyBy: hash-partitions the stream according to a KeySelector, splitting it into multiple partitions; a stream that has gone through keyBy becomes a KeyedStream. It is usually combined with other operators.
  4. reduce: used on a KeyedStream; it combines the stream's elements in a "rolling" fashion. For example, the rolling sum below:

    public class ReduceDemo {
      public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // produce 1,2,3,4,5
        DataStream<Long> source = env.fromSequence(1, 5);

        source.keyBy(value -> 1)    // route every record to the same partition
            .reduce(new ReduceFunction<Long>() {
              @Override
              public Long reduce(Long value1, Long value2) throws Exception {
                // value1 holds the result of the previous invocation
                System.out.printf("value1: %d, value2: %d\n", value1, value2);
                return value1 + value2;
              }
            }).print();

        env.execute();
      }
    }

    Output:

    value1: 1, value2: 2
    3
    value1: 3, value2: 3
    6
    value1: 6, value2: 4
    10
    value1: 10, value2: 5
    15
  5. Aggregation operators: used on a KeyedStream; they include sum, min, minBy, max, and maxBy. These operators are slated for deprecation in DataStream/DataSet (see FLIP-134), so here I will mainly illustrate the difference between min and minBy (max/maxBy are analogous). Straight to the code:

     public class AggregationDemo {
         public static void main(String[] args) throws Exception {
             StreamExecutionEnvironment envMin = StreamExecutionEnvironment.getExecutionEnvironment();
             StreamExecutionEnvironment envMinBy = StreamExecutionEnvironment.getExecutionEnvironment();
             envMin.setParallelism(1);
             envMinBy.setParallelism(1);
    
             List<Tuple3<Integer, Integer, Integer>> data = new ArrayList<>();
             data.add(new Tuple3<>(0, 2, 4));
             data.add(new Tuple3<>(0, 4, 5));
             data.add(new Tuple3<>(0, 3, 3));
             data.add(new Tuple3<>(0, 1, 2));
             data.add(new Tuple3<>(1, 2, 4));
             data.add(new Tuple3<>(1, 5, 1));
             data.add(new Tuple3<>(1, 1, 0));
             data.add(new Tuple3<>(1, 2, 2));
    
             System.out.println("Min:");
             DataStream<Tuple3<Integer, Integer, Integer>> sourceMin = envMin.fromCollection(data);
             sourceMin.keyBy(tuple3 -> tuple3.f0).min(1).print();
    
             envMin.execute();
    
             System.out.println("\nMinBy:");
             DataStream<Tuple3<Integer, Integer, Integer>> sourceMinBy = envMinBy.fromCollection(data);
             sourceMinBy.keyBy(tuple3 -> tuple3.f0).minBy(1).print();
    
             envMinBy.execute();
         }
     }

    Output:

     Min:
     (0,2,4)
     (0,2,4)
     (0,2,4)
     (0,1,4)
     (1,2,4)
     (1,2,4)
     (1,1,4)
     (1,1,4)
    
     MinBy:
     (0,2,4)
     (0,2,4)
     (0,2,4)
     (0,1,2)
     (1,2,4)
     (1,2,4)
     (1,1,0)
     (1,1,0)

    As you can see, min only keeps the minimum of the field used for comparison; the other fields still come from the first element. minBy, by contrast, keeps the complete element whose comparison field is minimal (admittedly a mouthful...).

  6. Window operators: I will skip these here, since several earlier articles have already covered them.

Now for the slightly more involved multi-stream operators.

Multi-Stream Operators

Multi-stream operators differ from each other mainly in three respects:

  • how many streams they can process at once
  • whether the streams may have different types
  • how the elements are processed

union

union merges several streams of the same type into one new stream. For example, union-ing a stream with itself doubles every element:

public class UnionDemo {
  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);

    DataStream<Long> source = env.fromSequence(0, 5);
    source.print();

    DataStream<Long> unionStream = source.union(source);
    unionStream.print();

    env.execute();
  }
}

Output:

# source 
0
1
2
3
4
5
# unionStream
0
0
1
1
2
2
3
3
4
4
5
5

join

join only operates on two KeyedStreams, though the two streams may have different element types. Its semantics match a database inner join: within a data set, elements sharing the same key are inner-joined. Flink DataStream provides two kinds of joins:

  • Window Join: the data set is the contents of a window

    • Tumbling Window Join
    • Sliding Window Join
    • Session Window Join
  • Interval Join: the data set is defined by a time interval

A Window Join works together with windows and is subdivided by window type into the three variants above. Its syntax is:

stream.join(otherStream)
    .where(<KeySelector>)
    .equalTo(<KeySelector>)
    .window(<WindowAssigner>)
    .apply(<JoinFunction>)

The window type only changes which data set each window produces; the joining itself works identically. So I will take the simplest and most intuitive variant, the Tumbling Window Join, as the example (the figure below comes from the official documentation):

[Figure: Tumbling Window Join (from the Flink documentation)]

In the figure, four windows open as time (the horizontal axis) advances; the green elements on top form one stream (greenStream) and the orange ones below form the other (orangeStream). Underneath is the data produced by joining the two streams in each window. I wrote code simulating this example:

public class WindowJoinDemo {

  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);

    DataStream<Tuple2<String, Integer>> greenStream = env.addSource(new GreenSource());
    DataStream<Tuple2<String, Integer>> orangeStream = env.addSource(new OrangeSource());

    orangeStream.join(greenStream)
        .where(orangeStreamTuple2 -> orangeStreamTuple2.f0)
        .equalTo(greenStreamTuple2 -> greenStreamTuple2.f0)
        .window(TumblingEventTimeWindows.of(Time.milliseconds(1000)))
        .apply((JoinFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String>)
            (orangeStreamTuple2, greenStreamTuple2) -> orangeStreamTuple2.f1 + "," + greenStreamTuple2.f1)
        .print();

    env.execute();
  }

  static class GreenSource extends RichSourceFunction<Tuple2<String, Integer>> {

    @Override
    public void run(SourceContext<Tuple2<String, Integer>> ctx) throws Exception {
      // window: [0, 1000)
      ctx.collectWithTimestamp(new Tuple2<>("key", 0), 0);
      ctx.collectWithTimestamp(new Tuple2<>("key", 1), 0);

      ctx.emitWatermark(new Watermark(1000));
      // window: [1000, 2000)
      ctx.collectWithTimestamp(new Tuple2<>("key", 3), 1500);

      ctx.emitWatermark(new Watermark(2000));
      // window: [2000, 3000)
      ctx.collectWithTimestamp(new Tuple2<>("key", 4), 2500);

      ctx.emitWatermark(new Watermark(3000));
      // window: [3000, 4000)
      ctx.collectWithTimestamp(new Tuple2<>("another_key", 4), 3500);
    }

    @Override
    public void cancel() {
    }
  }

  static class OrangeSource extends RichSourceFunction<Tuple2<String, Integer>> {

    @Override
    public void run(SourceContext<Tuple2<String, Integer>> ctx) throws Exception {
      // window: [0, 1000)
      ctx.collectWithTimestamp(new Tuple2<>("key", 0), 0);
      ctx.collectWithTimestamp(new Tuple2<>("key", 1), 0);
      // window: [1000, 2000)
      ctx.collectWithTimestamp(new Tuple2<>("key", 2), 1500);
      ctx.collectWithTimestamp(new Tuple2<>("key", 3), 1500);
      // window: [2000, 3000)
      ctx.collectWithTimestamp(new Tuple2<>("key", 4), 2500);
      ctx.collectWithTimestamp(new Tuple2<>("key", 5), 2500);
      // window: [3000, 4000)
      ctx.collectWithTimestamp(new Tuple2<>("key", 6), 3500);
      ctx.collectWithTimestamp(new Tuple2<>("key", 7), 3500);
    }

    @Override
    public void cancel() {
    }
  }
}

Running it prints:

0,0
0,1
1,0
1,1
2,3
3,3
4,4
5,4

Interval Join is also quite simple (say A joins B): define a time interval relative to each element a of A, and join a with every element b of B that falls inside it, i.e. b.timestamp ∈ [a.timestamp + lowerBound, a.timestamp + upperBound]. Interval join currently supports event time only. Its syntax is:

stream.keyBy(<KeySelector>)
    .intervalJoin(otherStream.keyBy(<KeySelector>))
    .between(<lowerBoundTime>, <upperBoundTime>)
    .process(<ProcessJoinFunction>);

Here I will borrow another figure from the official documentation:

[Figure: Interval Join (from the Flink documentation)]

In the figure, orangeStream interval-joins greenStream with the interval orangeElem.ts - 2 <= greenElem.ts <= orangeElem.ts + 1; below the streams are the results the join produces in each interval. I wrote code simulating this example:

public class IntervalJoinDemo {

  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);

    DataStream<Tuple2<String, Integer>> greenStream = env.addSource(new GreenSource());
    DataStream<Tuple2<String, Integer>> orangeStream = env.addSource(new OrangeSource());

    orangeStream.keyBy(orangeStreamTuple2 -> orangeStreamTuple2.f0)
        .intervalJoin(greenStream.keyBy(greenStreamTuple2 -> greenStreamTuple2.f0))
        .between(Time.milliseconds(-2), Time.milliseconds(1))
        .process(new ProcessJoinFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String>() {
          @Override
          public void processElement(Tuple2<String, Integer> orangeTuple2, Tuple2<String, Integer> greenTuple2, Context ctx, Collector<String> out) throws Exception {
            out.collect(orangeTuple2.f1 + "," + greenTuple2.f1);
          }
        }).print();

    env.execute();
  }

  static class GreenSource extends RichSourceFunction<Tuple2<String, Integer>> {

    @Override
    public void run(SourceContext<Tuple2<String, Integer>> ctx) throws Exception {
      // interval join has no windows; only the event timestamps matter here
      ctx.collectWithTimestamp(new Tuple2<>("key", 0), 0);  // ts = 0
      ctx.collectWithTimestamp(new Tuple2<>("key", 1), 1);  // ts = 1
      ctx.collectWithTimestamp(new Tuple2<>("key", 6), 6);  // ts = 6
      ctx.collectWithTimestamp(new Tuple2<>("key", 7), 7);  // ts = 7
    }

    @Override
    public void cancel() {
    }
  }

  static class OrangeSource extends RichSourceFunction<Tuple2<String, Integer>> {

    @Override
    public void run(SourceContext<Tuple2<String, Integer>> ctx) throws Exception {

      ctx.emitWatermark(new Watermark(0));
      ctx.collectWithTimestamp(new Tuple2<>("key", 0), 0);
      ctx.emitWatermark(new Watermark(2));
      ctx.collectWithTimestamp(new Tuple2<>("key", 2), 2);
      ctx.emitWatermark(new Watermark(3));
      ctx.collectWithTimestamp(new Tuple2<>("key", 3), 3);
      ctx.emitWatermark(new Watermark(4));
      ctx.collectWithTimestamp(new Tuple2<>("key", 4), 4);
      ctx.emitWatermark(new Watermark(5));
      ctx.collectWithTimestamp(new Tuple2<>("key", 5), 5);
      ctx.emitWatermark(new Watermark(7));
      ctx.collectWithTimestamp(new Tuple2<>("key", 7), 7);
    }

    @Override
    public void cancel() {
    }
  }
}

Output:

0,0
0,1
2,0
2,1
5,6
7,6
7,7

In short, join has exactly the same semantics as a database inner join, executed over same-keyed elements within a data set; that data set can come either from a window or from a defined time interval.

coGroup

As said above, join can only express an inner join. What if you need another join type? coGroup is what you want. Its syntax:

dataStream.coGroup(otherStream)
    .where(<KeySelector>)
    .equalTo(<KeySelector>)
    .window(<WindowAssigner>)
    .apply(new CoGroupFunction() {...});

Compared with join, coGroup is the more general primitive: it hands the window's elements from the two streams to the CoGroupFunction as two iterables and lets you write whatever logic you like over them, so it can implement not only other join flavors but also more general operations. That said, I saw a Flink PMC member mention on Stack Overflow that join can be executed with either a sort-based or a hash-based strategy, whereas coGroup can only be sort-based, so join is generally somewhat more efficient; when join covers your scenario, prefer join. (A left-outer-join sketch follows the example below.) Here is a usage example:

public class CoGroupDemo {

  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);

    DataStream<Tuple2<String, Integer>> greenStream = env.addSource(new WindowJoinDemo.GreenSource());
    DataStream<Tuple2<String, Integer>> orangeStream = env.addSource(new WindowJoinDemo.OrangeSource());

    orangeStream.coGroup(greenStream)
        .where(orangeStreamTuple2 -> orangeStreamTuple2.f0)
        .equalTo(greenStreamTuple2 -> greenStreamTuple2.f0)
        .window(TumblingEventTimeWindows.of(Time.milliseconds(1000)))
        .apply(new CoGroupFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String>() {
          @Override
          public void coGroup(Iterable<Tuple2<String, Integer>> orange, Iterable<Tuple2<String, Integer>> green, Collector<String> out) throws Exception {
            StringBuffer sb = new StringBuffer();
            sb.append("window:{ orange stream elements: [ ");
            orange.forEach(tuple2 -> sb.append(tuple2.f1).append(" "));
            sb.append("], green stream elements: [ ");
            green.forEach(tuple2 -> sb.append(tuple2.f1).append(" "));
            sb.append("]");
            out.collect(sb.toString());
          }
        })
        .print();

    env.execute();
  }
}

Output:

window:{ orange stream elements: [ 0 1 ], green stream elements: [ 0 1 ]
window:{ orange stream elements: [ 2 3 ], green stream elements: [ 3 ]
window:{ orange stream elements: [ 4 5 ], green stream elements: [ 4 ]
window:{ orange stream elements: [ ], green stream elements: [ 4 ]
window:{ orange stream elements: [ 6 7 ], green stream elements: [ ]
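
As a taste of what coGroup enables beyond inner joins, here is a sketch (my own illustration, not from the official docs) of a CoGroupFunction with left-outer-join semantics over the same Tuple2 streams: every orange element is emitted even when the green side of the window is empty.

new CoGroupFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String>() {
  @Override
  public void coGroup(Iterable<Tuple2<String, Integer>> orange,
                      Iterable<Tuple2<String, Integer>> green,
                      Collector<String> out) {
    for (Tuple2<String, Integer> o : orange) {
      boolean matched = false;
      for (Tuple2<String, Integer> g : green) {
        matched = true;
        out.collect(o.f1 + "," + g.f1);   // the inner-join part
      }
      if (!matched) {
        out.collect(o.f1 + ",null");      // keep unmatched left-side elements
      }
    }
  }
}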

connect

connect hooks up two streams, which may have different types, and then applies a separate function to the elements of each. It can be used on its own or together with the broadcast mechanism. Its main use case is connecting a main (data) stream with an auxiliary stream (configuration, rules, and the like) so that the auxiliary stream dynamically controls the main stream's behavior. Below are two code samples with identical functionality: the auxiliary stream changes which multiples the main stream emits. One uses connect alone, the other combines it with broadcast.

connect on its own:

public class ConnectDemo {
  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);

    // main data stream
    DataStream<Long> dataStream = env.addSource(new SourceFunction<Long>() {
      @Override
      public void run(SourceContext<Long> ctx) throws Exception {
        long i = 0;
        while (true) {
          ctx.collect(i++);
          Thread.sleep(500);
        }
      }

      @Override
      public void cancel() {
      }
    });

    // rule stream
    DataStream<String> ruleStream = env.addSource(new SourceFunction<String>() {
      @Override
      public void run(SourceContext<String> ctx) throws Exception {
        ctx.collect("one");
        Thread.sleep(5000);
        ctx.collect("two");
        Thread.sleep(5000);
        ctx.collect("three");

        Thread.sleep(Long.MAX_VALUE);
      }

      @Override
      public void cancel() {
      }
    });

    dataStream.connect(ruleStream)
        .flatMap(new CoFlatMapFunction<Long, String, Object>() {
          String rule;

          @Override
          public void flatMap1(Long value, Collector<Object> out) throws Exception {
            if ("one".equalsIgnoreCase(rule)) {
              out.collect(value);
            } else if ("two".equalsIgnoreCase(rule) && (value % 2 == 0)) {
              out.collect(value);
            } else if ("three".equalsIgnoreCase(rule) && (value % 3 == 0)) {
              out.collect(value);
            }
          }

          @Override
          public void flatMap2(String value, Collector<Object> out) throws Exception {
            System.out.printf("update rule, old rule = %s, new rule = %s\n", rule, value);
            rule = value;
          }
        }).print();

    env.execute();
  }
}

Output:

update rule, old rule = null, new rule = one
1
2
3
4
5
6
7
8
9
update rule, old rule = one, new rule = two
10
12
14
16
18
update rule, old rule = two, new rule = three
21
24
27
30
33
36
...

With broadcast:

public class BroadcastDemo {
  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);

    // main data stream
    DataStream<Long> dataStream = env.addSource(new SourceFunction<Long>() {
      @Override
      public void run(SourceContext<Long> ctx) throws Exception {
        long i = 0;
        while (true) {
          ctx.collect(i++);
          Thread.sleep(500);
        }
      }

      @Override
      public void cancel() {
      }
    });

    // rule stream
    DataStream<String> ruleStream = env.addSource(new SourceFunction<String>() {
      @Override
      public void run(SourceContext<String> ctx) throws Exception {
        ctx.collect("one");
        Thread.sleep(5000);
        ctx.collect("two");
        Thread.sleep(5000);
        ctx.collect("three");

        Thread.sleep(Long.MAX_VALUE);
      }

      @Override
      public void cancel() {
      }
    });
    MapStateDescriptor<String, String> ruleStateDescriptor = new MapStateDescriptor<>(
        "RulesBroadcastState", BasicTypeInfo.STRING_TYPE_INFO,
        TypeInformation.of(new TypeHint<String>() {
        }));
    BroadcastStream<String> ruleBroadcastStream = ruleStream.broadcast(ruleStateDescriptor);

    dataStream.connect(ruleBroadcastStream)
        .process(new BroadcastProcessFunction<Long, String, Long>() {
          String rule;

          @Override
          public void processElement(Long value, ReadOnlyContext ctx, Collector<Long> out) throws Exception {
            if ("one".equalsIgnoreCase(rule)) {
              out.collect(value);
            } else if ("two".equalsIgnoreCase(rule) && (value % 2 == 0)) {
              out.collect(value);
            } else if ("three".equalsIgnoreCase(rule) && (value % 3 == 0)) {
              out.collect(value);
            }
          }

          @Override
          public void processBroadcastElement(String value, Context ctx, Collector<Long> out) throws Exception {
            System.out.printf("update rule, old rule = %s, new rule = %s\n", rule, value);
            rule = value;
          }
        }).print();

    env.execute();
  }
}

The output is exactly the same as in the standalone connect version, so I won't repeat it here.
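
One caveat about the demo: it keeps the rule in an instance field, which works at parallelism 1 but bypasses the broadcast state entirely, so the rule is neither checkpointed nor restored. Below is a sketch of the more idiomatic variant, reusing ruleStateDescriptor and ruleBroadcastStream from the demo above (the state key "rule" is my own choice):

dataStream.connect(ruleBroadcastStream)
    .process(new BroadcastProcessFunction<Long, String, Long>() {
      @Override
      public void processElement(Long value, ReadOnlyContext ctx, Collector<Long> out) throws Exception {
        // read the current rule from the read-only broadcast state
        String rule = ctx.getBroadcastState(ruleStateDescriptor).get("rule");
        if ("one".equalsIgnoreCase(rule)) {
          out.collect(value);
        } else if ("two".equalsIgnoreCase(rule) && (value % 2 == 0)) {
          out.collect(value);
        } else if ("three".equalsIgnoreCase(rule) && (value % 3 == 0)) {
          out.collect(value);
        }
      }

      @Override
      public void processBroadcastElement(String value, Context ctx, Collector<Long> out) throws Exception {
        // store the new rule in the broadcast state so it is checkpointed
        ctx.getBroadcastState(ruleStateDescriptor).put("rule", value);
      }
    }).print();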

Iterate

Iterate does what its name suggests: iteration, somewhat like a state machine. With the Iterate operator, elements of a stream can be computed iteratively until they satisfy some condition, which is common in self-learning scenarios. I (not entirely rigorously) classify it as a multi-stream operator because it must handle the "new stream" formed by the elements sent back for reprocessing. The code below implements something very simple: apply a minus-one operation (minusOne) to the source stream, then check whether the result is still greater than 0; if so, the element goes back into minusOne for another round, and once it is no longer greater than 0 it leaves the iteration and proceeds downstream (here it is simply printed).

public class IterateDemo {
  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);

    List<Tuple2<String, Integer>> tuple2s = new ArrayList<>(4);
    tuple2s.add(new Tuple2<>("Jim", 3));
    tuple2s.add(new Tuple2<>("John", 2));
    tuple2s.add(new Tuple2<>("Lily", 1));
    tuple2s.add(new Tuple2<>("Lucy", 4));

    // source
    DataStream<Tuple2<String, Integer>> source = env.fromCollection(tuple2s);
    source.print();
    // obtain an IterativeStream
    IterativeStream<Tuple2<String, Integer>> iteration = source.iterate();
    // the regular computation step
    DataStream<Tuple2<String, Integer>> minusOne = iteration.map(new MapFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>() {
      @Override
      public Tuple2<String, Integer> map(Tuple2<String, Integer> value) throws Exception {
        System.out.printf("[minusOne] %s: %d\n", value.f0, value.f1);
        return new Tuple2<>(value.f0, value.f1 - 1);
      }
    });
    // elements that still need another round form the feedback stream
    DataStream<Tuple2<String, Integer>> stillGreaterThanZero = minusOne.filter(new FilterFunction<Tuple2<String, Integer>>() {
      @Override
      public boolean filter(Tuple2<String, Integer> value) throws Exception {
        boolean greaterThanZero = value.f1 > 0;
        if (greaterThanZero) {
          System.out.printf("%s is still greater than zero(now: %d), back to minusOne\n", value.f0, value.f1);
        }
        return greaterThanZero;
      }
    });
    // feed the iteration stream back via closeWith
    iteration.closeWith(stillGreaterThanZero);

    minusOne.filter(tuple2 -> tuple2.f1 <= 0).print();

    env.execute();
  }
}

Output:

(Jim,3)
(John,2)
(Lily,1)
(Lucy,4)
[minusOne] Jim: 3
Jim is still greater than zero(now: 2), back to minusOne
[minusOne] John: 2
John is still greater than zero(now: 1), back to minusOne
[minusOne] Lily: 1
(Lily,0)
[minusOne] Lucy: 4
Lucy is still greater than zero(now: 3), back to minusOne
[minusOne] Jim: 2
Jim is still greater than zero(now: 1), back to minusOne
[minusOne] John: 1
(John,0)
[minusOne] Lucy: 3
Lucy is still greater than zero(now: 2), back to minusOne
[minusOne] Jim: 1
(Jim,0)
[minusOne] Lucy: 2
Lucy is still greater than zero(now: 1), back to minusOne
[minusOne] Lucy: 1
(Lucy,0)

Summary

A typical Flink streaming program is built from the operators introduced above, and as you have seen, none of them is hard to use, so writing a streaming program is not difficult. What is difficult is writing a correct, robust, and efficient program, and tuning it effectively; doing that well requires understanding the runtime principles and key design decisions behind stream processing.

The next article covers another very important operator: Flink Async I/O.