A Flink streaming job is built from operators (sources and sinks are really just special operators too). This article walks through the common DataStream operators. I have split them, somewhat unscientifically, into two loosely named groups:
- Single-stream operators: these generally work on one stream;
- Multi-stream operators: these operate on multiple streams.
Single-stream operators
Most single-stream operators are simple, so only a brief description:
- map/flatMap: the most frequently used operators. map takes one element and emits one element; flatMap takes one element and emits zero or more elements.
- filter: keeps an element when the predicate is true and drops it when it is false.
- keyBy: hash-partitions the stream by the key extracted by a KeySelector, splitting one stream into multiple partitions; the result is a KeyedStream. It is usually combined with other operators.
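The partitioning idea behind keyBy can be sketched in plain Java. Note that `partitionFor` is a hypothetical helper of mine; Flink actually uses murmur hashing and key groups, so this only illustrates the concept, not the real algorithm:

```java
import java.util.*;
import java.util.function.Function;

public class KeyBySketch {
    // Hypothetical helper mimicking how keyBy assigns an element to a
    // parallel partition: hash the extracted key, then map the hash onto
    // the number of partitions. Same key always lands in the same partition.
    static <T, K> int partitionFor(T element, Function<T, K> keySelector, int parallelism) {
        int hash = Objects.hashCode(keySelector.apply(element));
        return Math.floorMod(hash, parallelism);
    }

    public static void main(String[] args) {
        for (String w : Arrays.asList("apple", "banana", "apple", "cherry")) {
            System.out.println(w + " -> partition " + partitionFor(w, s -> s, 4));
        }
    }
}
```

The key property to observe is determinism: running the selector twice over equal keys always yields the same partition, which is what lets downstream keyed state work.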
reduce: used on a KeyedStream; it applies a "rolling" reduction over the stream's elements. For example, a rolling sum:
public class ReduceDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // produce 1,2,3,4,5
        DataStream<Long> source = env.fromSequence(1, 5);
        source.keyBy(value -> 1) // all elements go to one partition
                .reduce(new ReduceFunction<Long>() {
                    @Override
                    public Long reduce(Long value1, Long value2) throws Exception {
                        // value1 holds the result of the previous iteration
                        System.out.printf("value1: %d, value2: %d\n", value1, value2);
                        return value1 + value2;
                    }
                }).print();
        env.execute();
    }
}
Output:
value1: 1, value2: 2
3
value1: 3, value2: 3
6
value1: 6, value2: 4
10
value1: 10, value2: 5
15
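The rolling semantics (the previous result becomes `value1` for the next call) can be reproduced in plain Java. This is a sketch of the behavior, not the Flink API:

```java
import java.util.*;
import java.util.function.BinaryOperator;

public class RollingReduceSketch {
    // Mimics KeyedStream.reduce for a single key: the first element is
    // emitted as-is; afterwards each incoming element is combined with
    // the previous result, and the new result is emitted downstream.
    static List<Long> rollingReduce(List<Long> input, BinaryOperator<Long> fn) {
        List<Long> out = new ArrayList<>();
        Long acc = null;
        for (Long v : input) {
            acc = (acc == null) ? v : fn.apply(acc, v); // acc plays the role of value1
            out.add(acc);
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(rollingReduce(Arrays.asList(1L, 2L, 3L, 4L, 5L), Long::sum));
        // [1, 3, 6, 10, 15]
    }
}
```

Note that reduce emits every intermediate result, not just the final one; that is why the demo prints 3, 6, 10, 15 as the stream flows.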
Aggregation operators: used on a KeyedStream; they include sum, min, minBy, max, and maxBy. These operators are slated for removal from DataStream/DataSet (see FLIP-134), so here I only cover the difference between min and minBy (max/maxBy is analogous). Straight to the code:
public class AggregationDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment envMin = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamExecutionEnvironment envMinBy = StreamExecutionEnvironment.getExecutionEnvironment();
        envMin.setParallelism(1);
        envMinBy.setParallelism(1);
        List<Tuple3<Integer, Integer, Integer>> data = new ArrayList<>();
        data.add(new Tuple3<>(0, 2, 4));
        data.add(new Tuple3<>(0, 4, 5));
        data.add(new Tuple3<>(0, 3, 3));
        data.add(new Tuple3<>(0, 1, 2));
        data.add(new Tuple3<>(1, 2, 4));
        data.add(new Tuple3<>(1, 5, 1));
        data.add(new Tuple3<>(1, 1, 0));
        data.add(new Tuple3<>(1, 2, 2));
        System.out.println("Min:");
        DataStream<Tuple3<Integer, Integer, Integer>> sourceMin = envMin.fromCollection(data);
        sourceMin.keyBy(tuple3 -> tuple3.f0).min(1).print();
        envMin.execute();
        System.out.println("\nMinBy:");
        DataStream<Tuple3<Integer, Integer, Integer>> sourceMinBy = envMinBy.fromCollection(data);
        sourceMinBy.keyBy(tuple3 -> tuple3.f0).minBy(1).print();
        envMinBy.execute();
    }
}
Output:
Min:
(0,2,4)
(0,2,4)
(0,2,4)
(0,1,4)
(1,2,4)
(1,2,4)
(1,1,4)
(1,1,4)

MinBy:
(0,2,4)
(0,2,4)
(0,2,4)
(0,1,2)
(1,2,4)
(1,2,4)
(1,1,0)
(1,1,0)
As you can see, min only takes over the field being compared; the other fields still come from the first element. minBy, by contrast, keeps the complete element that holds the minimum key (a mouthful, I know...).
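The difference can be pinned down with a small plain-Java simulation of the rolling behavior. `rolling` is my own helper, not Flink API, and it handles a single key with tuples compared on index 1:

```java
import java.util.*;

public class MinVsMinBySketch {
    // Rolling "min" keeps the first element seen and replaces only the
    // compared field with the running minimum; rolling "minBy" replaces
    // the whole element whenever a smaller one arrives.
    static List<int[]> rolling(List<int[]> in, boolean minBy) {
        List<int[]> out = new ArrayList<>();
        int[] acc = null;
        for (int[] t : in) {
            if (acc == null) {
                acc = t.clone();
            } else if (t[1] < acc[1]) {
                acc = minBy ? t.clone() : new int[]{acc[0], t[1], acc[2]};
            }
            out.add(acc.clone());
        }
        return out;
    }

    public static void main(String[] args) {
        List<int[]> data = Arrays.asList(
                new int[]{0, 2, 4}, new int[]{0, 4, 5},
                new int[]{0, 3, 3}, new int[]{0, 1, 2});
        System.out.println(Arrays.toString(rolling(data, false).get(3))); // [0, 1, 4]
        System.out.println(Arrays.toString(rolling(data, true).get(3)));  // [0, 1, 2]
    }
}
```

Running this over the key-0 tuples from the demo reproduces the divergence in the last row: min yields (0,1,4), minBy yields (0,1,2).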
- Window operators: already covered in several earlier articles, so I'll skip them here.
Now for the slightly more involved multi-stream operators.
Multi-stream operators
Multi-stream operators differ mainly in three respects:
- how many streams they can process at once
- whether the streams may have different types
- how the streams are processed
union
union merges several streams of the same type into one new stream. For example, union-ing a stream with itself doubles every element:
public class UnionDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataStream<Long> source = env.fromSequence(0, 5);
source.print();
DataStream<Long> unionStream = source.union(source);
unionStream.print();
env.execute();
}
}
Output:
# source
0
1
2
3
4
5
# unionStream
0
0
1
1
2
2
3
3
4
4
5
5
join
join operates on exactly two KeyedStreams, which may have different element types. It behaves like a database inner join: within a data set, elements sharing the same key are inner-joined. Flink DataStream offers two kinds of join:
- Window Join: the data set is produced by a window
- Tumbling Window Join
- Sliding Window Join
- Session Window Join
- Interval Join: the data set is produced by a defined time interval
A window join works together with a window and is subdivided into the three variants above by window type. Its syntax:
stream.join(otherStream)
.where(<KeySelector>)
.equalTo(<KeySelector>)
.window(<WindowAssigner>)
.apply(<JoinFunction>)
The window type only affects which data set each window produces; the join itself works exactly the same way, so I'll use the simplest and most intuitive variant, the tumbling window join, as the example (the figure below is from the official docs):
As time (the horizontal axis) advances, four windows are produced. The green elements on top form one stream (greenStream), the orange ones below form another (orangeStream), and underneath is the data each window produces by joining the two streams. Here is code that simulates the example:
public class WindowJoinDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataStream<Tuple2<String, Integer>> greenStream = env.addSource(new GreenSource());
DataStream<Tuple2<String, Integer>> orangeStream = env.addSource(new OrangeSource());
orangeStream.join(greenStream)
.where(orangeStreamTuple2 -> orangeStreamTuple2.f0)
.equalTo(greenStreamTuple2 -> greenStreamTuple2.f0)
.window(TumblingEventTimeWindows.of(Time.milliseconds(1000)))
.apply((JoinFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String>)
(orangeStreamTuple2, greenStreamTuple2) -> orangeStreamTuple2.f1 + "," + greenStreamTuple2.f1)
.print();
env.execute();
}
static class GreenSource extends RichSourceFunction<Tuple2<String, Integer>> {
@Override
public void run(SourceContext<Tuple2<String, Integer>> ctx) throws Exception {
// window: [0, 1000)
ctx.collectWithTimestamp(new Tuple2<>("key", 0), 0);
ctx.collectWithTimestamp(new Tuple2<>("key", 1), 0);
ctx.emitWatermark(new Watermark(1000));
// window: [1000, 2000)
ctx.collectWithTimestamp(new Tuple2<>("key", 3), 1500);
ctx.emitWatermark(new Watermark(2000));
// window: [2000, 3000)
ctx.collectWithTimestamp(new Tuple2<>("key", 4), 2500);
ctx.emitWatermark(new Watermark(3000));
// window: [3000, 4000)
ctx.collectWithTimestamp(new Tuple2<>("another_key", 4), 3500);
}
@Override
public void cancel() {
}
}
static class OrangeSource extends RichSourceFunction<Tuple2<String, Integer>> {
@Override
public void run(SourceContext<Tuple2<String, Integer>> ctx) throws Exception {
// window: [0, 1000)
ctx.collectWithTimestamp(new Tuple2<>("key", 0), 0);
ctx.collectWithTimestamp(new Tuple2<>("key", 1), 0);
// window: [1000, 2000)
ctx.collectWithTimestamp(new Tuple2<>("key", 2), 1500);
ctx.collectWithTimestamp(new Tuple2<>("key", 3), 1500);
// window: [2000, 3000)
ctx.collectWithTimestamp(new Tuple2<>("key", 4), 2500);
ctx.collectWithTimestamp(new Tuple2<>("key", 5), 2500);
// window: [3000, 4000)
ctx.collectWithTimestamp(new Tuple2<>("key", 6), 3500);
ctx.collectWithTimestamp(new Tuple2<>("key", 7), 3500);
}
@Override
public void cancel() {
}
}
}
Output:
0,0
0,1
1,0
1,1
2,3
3,3
4,4
5,4
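Stripped of the window-and-watermark machinery, the per-window inner-join semantics can be sketched in plain Java. `windowJoin` and `Elem` are names of mine, not Flink API; an element with timestamp ts belongs to tumbling window ts / size:

```java
import java.util.*;

public class WindowJoinSketch {
    static class Elem {
        final String key; final int value; final long ts;
        Elem(String key, int value, long ts) { this.key = key; this.value = value; this.ts = ts; }
    }

    // Semantic sketch of a tumbling-window inner join: within each
    // window, every left element pairs with every right element that
    // has the same key; elements in different windows never meet.
    static List<String> windowJoin(List<Elem> left, List<Elem> right, long size) {
        List<String> out = new ArrayList<>();
        for (Elem l : left)
            for (Elem r : right)
                if (l.ts / size == r.ts / size && l.key.equals(r.key))
                    out.add(l.value + "," + r.value);
        return out;
    }

    public static void main(String[] args) {
        // Window [1000, 2000) from the demo above
        List<Elem> orange = Arrays.asList(new Elem("key", 2, 1500), new Elem("key", 3, 1500));
        List<Elem> green = Arrays.asList(new Elem("key", 3, 1500), new Elem("key", 4, 2500));
        System.out.println(windowJoin(orange, green, 1000)); // [2,3, 3,3]
    }
}
```

This makes the cross-product-per-window nature explicit: the pair "4,2500" is skipped here because it falls into the next window.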
An interval join is also straightforward. Say A joins B: for each element a of A, a time interval [a.timestamp + lowerBound, a.timestamp + upperBound] is defined, and a is joined with every element b of B whose timestamp falls inside it, i.e. b.timestamp ∈ [a.timestamp + lowerBound, a.timestamp + upperBound]. Interval joins currently only support event time. The syntax:
stream.keyBy(<KeySelector>)
.intervalJoin(otherStream.keyBy(<KeySelector>))
.between(<lowerBoundTime>, <upperBoundTime>)
.process(<ProcessJoinFunction>);
Again borrowing a figure from the official docs:
In the figure, orangeStream interval-joins greenStream with the interval defined by orangeElem.ts - 2 <= greenElem.ts <= orangeElem.ts + 1, and below it are the results each interval's join produces. Here is code that simulates the example:
public class IntervalJoinDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataStream<Tuple2<String, Integer>> greenStream = env.addSource(new GreenSource());
DataStream<Tuple2<String, Integer>> orangeStream = env.addSource(new OrangeSource());
orangeStream.keyBy(orangeStreamTuple2 -> orangeStreamTuple2.f0)
.intervalJoin(greenStream.keyBy(greenStreamTuple2 -> greenStreamTuple2.f0))
.between(Time.milliseconds(-2), Time.milliseconds(1))
.process(new ProcessJoinFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String>() {
@Override
public void processElement(Tuple2<String, Integer> orangeTuple2, Tuple2<String, Integer> greenTuple2, Context ctx, Collector<String> out) throws Exception {
out.collect(orangeTuple2.f1 + "," + greenTuple2.f1);
}
}).print();
env.execute();
}
static class GreenSource extends RichSourceFunction<Tuple2<String, Integer>> {
@Override
public void run(SourceContext<Tuple2<String, Integer>> ctx) throws Exception {
// window: [0, 1)
ctx.collectWithTimestamp(new Tuple2<>("key", 0), 0);
// window: [1, 2)
ctx.collectWithTimestamp(new Tuple2<>("key", 1), 1);
// window: [6, 7)
ctx.collectWithTimestamp(new Tuple2<>("key", 6), 6);
// window: [7, 8)
ctx.collectWithTimestamp(new Tuple2<>("key", 7), 7);
}
@Override
public void cancel() {
}
}
static class OrangeSource extends RichSourceFunction<Tuple2<String, Integer>> {
@Override
public void run(SourceContext<Tuple2<String, Integer>> ctx) throws Exception {
ctx.emitWatermark(new Watermark(0));
ctx.collectWithTimestamp(new Tuple2<>("key", 0), 0);
ctx.emitWatermark(new Watermark(2));
ctx.collectWithTimestamp(new Tuple2<>("key", 2), 2);
ctx.emitWatermark(new Watermark(3));
ctx.collectWithTimestamp(new Tuple2<>("key", 3), 3);
ctx.emitWatermark(new Watermark(4));
ctx.collectWithTimestamp(new Tuple2<>("key", 4), 4);
ctx.emitWatermark(new Watermark(5));
ctx.collectWithTimestamp(new Tuple2<>("key", 5), 5);
ctx.emitWatermark(new Watermark(7));
ctx.collectWithTimestamp(new Tuple2<>("key", 7), 7);
}
@Override
public void cancel() {
}
}
}
Output:
0,0
0,1
2,0
2,1
5,6
7,6
7,7
In short, join has exactly the same semantics as a database inner join: it inner-joins the same-key elements within a data set, and that data set comes either from a window or from a defined time interval.
cogroup
As noted above, join can only perform an inner join. What if you need another join type? cogroup is what you want. Its syntax:
dataStream.coGroup(otherStream)
.where(<KeySelector>)
.equalTo(<KeySelector>)
.window(<WindowAssigner>)
.apply(new CoGroupFunction() {...});
Compared with join, coGroup is the more general primitive: inside the CoGroupFunction, the window's elements from both streams are exposed to you as two iterables, and you write whatever logic you want over them. So it can implement not only other join types but also more general operations. That said, on Stack Overflow a Flink PMC member mentioned that join can be executed with either a sort-based or a hash-based strategy, while cogroup can only use the sort-based one, so join is generally somewhat more efficient; prefer join whenever it covers your use case. Here is a usage example:
public class CoGroupDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataStream<Tuple2<String, Integer>> greenStream = env.addSource(new WindowJoinDemo.GreenSource());
DataStream<Tuple2<String, Integer>> orangeStream = env.addSource(new WindowJoinDemo.OrangeSource());
orangeStream.coGroup(greenStream)
.where(orangeStreamTuple2 -> orangeStreamTuple2.f0)
.equalTo(greenStreamTuple2 -> greenStreamTuple2.f0)
.window(TumblingEventTimeWindows.of(Time.milliseconds(1000)))
.apply(new CoGroupFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String>() {
@Override
public void coGroup(Iterable<Tuple2<String, Integer>> orange, Iterable<Tuple2<String, Integer>> green, Collector<String> out) throws Exception {
StringBuffer sb = new StringBuffer();
sb.append("window:{ orange stream elements: [ ");
orange.forEach(tuple2 -> sb.append(tuple2.f1).append(" "));
sb.append("], green stream elements: [ ");
green.forEach(tuple2 -> sb.append(tuple2.f1).append(" "));
sb.append("]");
out.collect(sb.toString());
}
})
.print();
env.execute();
}
}
Output:
window:{ orange stream elements: [ 0 1 ], green stream elements: [ 0 1 ]
window:{ orange stream elements: [ 2 3 ], green stream elements: [ 3 ]
window:{ orange stream elements: [ 4 5 ], green stream elements: [ 4 ]
window:{ orange stream elements: [ ], green stream elements: [ 4 ]
window:{ orange stream elements: [ 6 7 ], green stream elements: [ ]
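For instance, because the CoGroupFunction receives both sides' elements, a left outer join is easy to express. Here is a plain-Java sketch of the per-window logic; the `leftOuterJoin` helper is hypothetical, not Flink API:

```java
import java.util.*;

public class LeftOuterJoinSketch {
    // Given the two same-key element groups that a CoGroupFunction would
    // receive for one window, emit a left outer join: every left element
    // paired with each right element, or with "null" when the right side
    // is empty (the case an inner join silently drops).
    static List<String> leftOuterJoin(List<Integer> left, List<Integer> right) {
        List<String> out = new ArrayList<>();
        for (Integer l : left) {
            if (right.isEmpty()) {
                out.add(l + ",null");
            } else {
                for (Integer r : right) out.add(l + "," + r);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // The last window of the demo: orange has 6 and 7, green is empty
        System.out.println(leftOuterJoin(Arrays.asList(6, 7), Collections.emptyList()));
        // [6,null, 7,null]
    }
}
```

Compare this with the last line of the coGroup output above: a plain join would have produced nothing for that window, while the coGroup variant still sees the orange elements and can emit them.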
connect
connect links two streams, possibly of different types, and then processes the elements of each stream with its own function. It can be used on its own or together with the broadcast mechanism. Its main use case is connecting a main data stream with an auxiliary stream (e.g. configuration or rules) so that the auxiliary stream dynamically controls the main stream's behavior. Below are two code samples with identical functionality: the auxiliary stream switches which multiples the main stream outputs. One uses connect alone, the other combines it with broadcast.
connect alone:
public class ConnectDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
// 主数据流
DataStream<Long> dataStream = env.addSource(new SourceFunction<Long>() {
@Override
public void run(SourceContext<Long> ctx) throws Exception {
long i = 0;
while (true) {
ctx.collect(i++);
Thread.sleep(500);
}
}
@Override
public void cancel() {
}
});
// 规则数据流
DataStream<String> ruleStream = env.addSource(new SourceFunction<String>() {
@Override
public void run(SourceContext<String> ctx) throws Exception {
ctx.collect("one");
Thread.sleep(5000);
ctx.collect("two");
Thread.sleep(5000);
ctx.collect("three");
Thread.sleep(Long.MAX_VALUE);
}
@Override
public void cancel() {
}
});
dataStream.connect(ruleStream)
.flatMap(new CoFlatMapFunction<Long, String, Object>() {
String rule;
@Override
public void flatMap1(Long value, Collector<Object> out) throws Exception {
if ("one".equalsIgnoreCase(rule)) {
out.collect(value);
} else if ("two".equalsIgnoreCase(rule) && (value % 2 == 0)) {
out.collect(value);
} else if ("three".equalsIgnoreCase(rule) && (value % 3 == 0)) {
out.collect(value);
}
}
@Override
public void flatMap2(String value, Collector<Object> out) throws Exception {
System.out.printf("update rule, old rule = %s, new rule = %s\n", rule, value);
rule = value;
}
}).print();
env.execute();
}
}
Output:
update rule, old rule = null, new rule = one
1
2
3
4
5
6
7
8
9
update rule, old rule = one, new rule = two
10
12
14
16
18
update rule, old rule = two, new rule = three
21
24
27
30
33
36
...
Combined with broadcast:
public class BroadcastDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
// 主数据流
DataStream<Long> dataStream = env.addSource(new SourceFunction<Long>() {
@Override
public void run(SourceContext<Long> ctx) throws Exception {
long i = 0;
while (true) {
ctx.collect(i++);
Thread.sleep(500);
}
}
@Override
public void cancel() {
}
});
// 规则数据流
DataStream<String> ruleStream = env.addSource(new SourceFunction<String>() {
@Override
public void run(SourceContext<String> ctx) throws Exception {
ctx.collect("one");
Thread.sleep(5000);
ctx.collect("two");
Thread.sleep(5000);
ctx.collect("three");
Thread.sleep(Long.MAX_VALUE);
}
@Override
public void cancel() {
}
});
MapStateDescriptor<String, String> ruleStateDescriptor = new MapStateDescriptor<>(
"RulesBroadcastState", BasicTypeInfo.STRING_TYPE_INFO,
TypeInformation.of(new TypeHint<String>() {
}));
BroadcastStream<String> ruleBroadcastStream = ruleStream.broadcast(ruleStateDescriptor);
dataStream.connect(ruleBroadcastStream)
.process(new BroadcastProcessFunction<Long, String, Long>() {
String rule;
@Override
public void processElement(Long value, ReadOnlyContext ctx, Collector<Long> out) throws Exception {
if ("one".equalsIgnoreCase(rule)) {
out.collect(value);
} else if ("two".equalsIgnoreCase(rule) && (value % 2 == 0)) {
out.collect(value);
} else if ("three".equalsIgnoreCase(rule) && (value % 3 == 0)) {
out.collect(value);
}
}
@Override
public void processBroadcastElement(String value, Context ctx, Collector<Long> out) throws Exception {
System.out.printf("update rule, old rule = %s, new rule = %s\n", rule, value);
rule = value;
}
}).print();
env.execute();
}
}
The output is identical to the connect-only version, so I won't repeat it here.
Iterate
Iterate does what its name says: iteration, a bit like a state machine. With the Iterate operator, an element in a stream can be computed over and over until it meets some condition, which is common in self-learning scenarios. I loosely classify it as a multi-stream operator because it has to handle the "new stream" formed by the elements sent back for another round. The code below implements a very simple job: apply a minus-one operation (minusOne) to the source stream, then check whether the element's value is still greater than 0; if so, feed it back for another round of minusOne until it is no longer greater than 0, at which point it leaves the loop and moves on to the subsequent operation (here it is simply printed).
public class IterateDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
List<Tuple2<String, Integer>> tuple2s = new ArrayList<>(4);
tuple2s.add(new Tuple2<>("Jim", 3));
tuple2s.add(new Tuple2<>("John", 2));
tuple2s.add(new Tuple2<>("Lily", 1));
tuple2s.add(new Tuple2<>("Lucy", 4));
// source
DataStream<Tuple2<String, Integer>> source = env.fromCollection(tuple2s);
source.print();
// 得到一个IterativeStream
IterativeStream<Tuple2<String, Integer>> iteration = source.iterate();
// 执行常规计算
DataStream<Tuple2<String, Integer>> minusOne = iteration.map(new MapFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>() {
@Override
public Tuple2<String, Integer> map(Tuple2<String, Integer> value) throws Exception {
System.out.printf("[minusOne] %s: %d\n", value.f0, value.f1);
return new Tuple2<>(value.f0, value.f1 - 1);
}
});
// 找出需要迭代的元素形成一个流
DataStream<Tuple2<String, Integer>> stillGreaterThanZero = minusOne.filter(new FilterFunction<Tuple2<String, Integer>>() {
@Override
public boolean filter(Tuple2<String, Integer> value) throws Exception {
boolean greaterThanZero = value.f1 > 0;
if (greaterThanZero) {
System.out.printf("%s is still greater than zero(now: %d), back to minusOne\n", value.f0, value.f1);
}
return greaterThanZero;
}
});
// 将需要迭代的流传递给closeWith方法
iteration.closeWith(stillGreaterThanZero);
minusOne.filter(tuple2 -> tuple2.f1 <= 0).print();
env.execute();
}
}
Output:
(Jim,3)
(John,2)
(Lily,1)
(Lucy,4)
[minusOne] Jim: 3
Jim is still greater than zero(now: 2), back to minusOne
[minusOne] John: 2
John is still greater than zero(now: 1), back to minusOne
[minusOne] Lily: 1
(Lily,0)
[minusOne] Lucy: 4
Lucy is still greater than zero(now: 3), back to minusOne
[minusOne] Jim: 2
Jim is still greater than zero(now: 1), back to minusOne
[minusOne] John: 1
(John,0)
[minusOne] Lucy: 3
Lucy is still greater than zero(now: 2), back to minusOne
[minusOne] Jim: 1
(Jim,0)
[minusOne] Lucy: 2
Lucy is still greater than zero(now: 1), back to minusOne
[minusOne] Lucy: 1
(Lucy,0)
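The feedback loop can be mimicked with a work queue in plain Java. This is a sketch of the semantics only; Flink actually streams the fed-back elements through the running topology rather than draining a queue:

```java
import java.util.*;

public class IterateSketch {
    // Elements whose value is still greater than zero after minusOne go
    // back into the queue (the closeWith feedback edge); the rest leave
    // the loop and are collected as final results.
    static List<String> run(List<Map.Entry<String, Integer>> input) {
        Deque<Map.Entry<String, Integer>> queue = new ArrayDeque<>(input);
        List<String> done = new ArrayList<>();
        while (!queue.isEmpty()) {
            Map.Entry<String, Integer> e = queue.poll();
            int v = e.getValue() - 1;                          // minusOne
            if (v > 0) {                                       // stillGreaterThanZero: feed back
                queue.add(new AbstractMap.SimpleEntry<>(e.getKey(), v));
            } else {                                           // leaves the iteration
                done.add("(" + e.getKey() + "," + v + ")");
            }
        }
        return done;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Integer>> in = Arrays.asList(
                new AbstractMap.SimpleEntry<>("Jim", 3),
                new AbstractMap.SimpleEntry<>("Lily", 1));
        System.out.println(run(in)); // [(Lily,0), (Jim,0)]
    }
}
```

Just as in the demo output, elements interleave: Lily finishes after one round while Jim keeps cycling, so completion order depends on how many rounds each element needs.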
Summary
A typical Flink stream-processing program is composed of the operators introduced above, and as you've seen, none of them is particularly hard to use, so writing a streaming program is not difficult. What is difficult is writing a correct, robust, and efficient one, and tuning it effectively; doing that well requires understanding the runtime principles and key design decisions behind stream processing.
The next article covers another very important operator: Flink Async I/O.