在前面的流处理随谈一文中已经简单介绍了Watermark,本文主要再结合Flink具体分析一下,作为补充。

理论

如果看完之前的文章,已经完全理解了Watermark,那可以直接跳过这部分,看实战部分。如果还不太理解,我通过几个问题来阐述一下,帮助你理解。要注意的是下面的描述方式和实际实现未必完全一样(有些甚至是我的个人观点),但可以帮助你更好的理解。

What?

Watermark是什么?从不同的维度可以有不同的理解

  1. 从Watermark的计算角度看:可以将Watermark理解为一个函数:$F(P) -> E$,它的输入是当前的系统时间,输出是一个Event Time(一个时间戳),而且输出的这个时间戳是严格单调递增的。这样看,Watermark就是一个函数。
  2. 从Watermark的具体形式来看:可以将Watermark当成一个个时间戳,值就是1中输出的那个时间戳。
  3. 从Watermark流转的角度看:可以将Watermark理解成夹杂在正常流事件中的一个个特殊事件。

这3种描述方式,看似不同,实则一样,只是从不同的角度去看了而已。不管怎么理解,我们必须知道:流处理系统规定,如果某个时刻Watermark的值为T1,那系统就认为凡是早于或等于T1时间的事件都已经收到了。注意,这个就是Watermark所代表的含义,实际因为现实中各种情况,未必能严格做到这样,但目标就是要达到上面规定的这样,或者无限逼近。

Why?

为什么需要Watermark?这个也有很多种描述方式,往大了说就是提供一种理论上解决分布式系统中消息乱序问题(这是分布式系统中一个经典难题)的方案。说小点就是在有状态的流计算中,当我们关注事件的顺序或者完整性时,需要有这么一种机制能实现这个需求。

这里的完整性我举个例子解释一下:比如我们基于事件发生时间统计每5min的用户PV总量,那比如12:00-12:05这个5min的统计该在什么时间点计算呢?假设没有Watermark这个概念,你就永远不知道什么时候12:00-12:05区间的所有事件才全到齐。你不能假定收到12:00-12:05的数据就认为之前的数据已经全部来了,因为数据可能延迟+乱序了。而Watermark就是为了解决这个问题而提出的,当你收到Watermark的值为12:00-12:05的事件时,你就可以认为早于这个时间的数据已经都到了,数据已经完整了,可以进行12:00-12:05这个5min区间的数据计算了。至于如何保证,这个是框架要做的事情(当然一般需要开发者参与)。

Where?

哪里需要Watermark?这里我给一个简单粗暴的结论,当同时满足下面两个条件的时候才会需要Watermark:

  1. 计算中使用了时间相关的算子(time-based operations),其实再明确点,就是使用了Window的时候(注:Flink的Global Window除外,这个Window不是基于时间的)。
  2. 1中使用的时间相关的算子选择使用事件时间,即Event Time(注:如果是Flink的话,也包含Ingestion Time)。

这里再解释一下2。前文我们介绍过有两种时间Event Time和Processing Time(Flink独有的Ingestion Time在Watermark这里可以归结为Event Time,后文不再另行说明),时间相关的算子选择时间时必然是二选一。并不是选择Processing Time的时候就没有Watermark了,只是这个时候Processing Time自身就是一个完美的Watermark(因为时间一去不复返,Processing Time永远是单调递增的),并不需要产生单独的Watermark了。所以在Processing Time里面,你可以认为Watermark没有意义了,所以去掉了,或者认为Processing Time自身就是Watermark都行。

How?

如何使用Watermark?实战部分介绍Flink中的Watermark如何使用。

关于Watermark我给一个我个人的意见或者看法吧:Watermark其实是一个比较好理解的概念和机制,但却很容易给刚接触它的人造成困惑,我觉得一方面是它关联了时间和Window的概念,也就是如果你不了解Event Time和Processing Time的区别,你不了解Window的机制,你就很难理解Watermark,因为它就是为了解决Event Time + Window中的问题而设计的;另一方面是Watermark没有特别好的观测点和跟踪手段,导致你只能一直在外围理解,却始终无法揭开它的面纱,直面它。关于第一点,我的建议是先了解相关的Time和Window的概念,然后从宏观上看问题,想清楚Watermark的What、Why、Where三个问题,基本也就了解了这个概念的本质了。关于第二点,我会在实战部分给出一些观测的方式,虽然很有限,但也能看到一些细节。

实战

场景介绍

为了方便说明,我构造了一个简单的场景,假设有一个设备产生了一组事件,如下:

{"id":"event1","timestamp":"2020-05-24T12:00:00.000+08:00"}
{"id":"event2","timestamp":"2020-05-24T12:00:01.000+08:00"}
{"id":"event3","timestamp":"2020-05-24T12:00:03.000+08:00"}
{"id":"event4","timestamp":"2020-05-24T12:00:04.000+08:00"}
{"id":"event5","timestamp":"2020-05-24T12:00:05.000+08:00"}
{"id":"event6","timestamp":"2020-05-24T12:00:06.000+08:00"}
{"id":"event7","timestamp":"2020-05-24T12:00:07.000+08:00"}
{"id":"event8","timestamp":"2020-05-24T12:00:08.000+08:00"}
{"id":"event9","timestamp":"2020-05-24T12:00:09.000+08:00"}

一共9个事件,id是事件名称,timestamp是设备端事件真实产生的时间。也就是事件真实产生顺序是:

event1, event2, event3, event4, event5, event6, event7, event8, event9

但在传输过程中因为各种现实原因乱序了,到Flink这里的时候,事件顺序变成了:

event1, event2, event4, event5, event7, event3, event6, event8, event9

现在我们要做的事情就是计算每5秒中的事件个数,以此来判断事件高峰期。

说明:

  1. 这个计算是非常有代表性的,比如电商统计每小时的pv就能知道每天用户高峰期发生在哪几个时段,这里为了方便说明问题,把问题简化了,并且为了快速出结果,把时间粒度缩短为5秒钟。
  2. 计算时,要想结果准确,就不能使用Processing Time,这样如果数据从产生到被处理延迟比较大的话,最终计算的结果也会不准确。除非这个延迟可控或者可接受,则可简单的使用Processing Time,否则就必须用Event Time进行计算。

Flink提供的Watermark机制

Flink提供了3种方式来生成Watermark:

  1. 在Source中生成Watermark;
  2. 通过AssignerWithPeriodicWatermarks生成Watermark;
  3. 通过AssignerWithPunctuatedWatermarks生成Watermark;

前面介绍过了Watermark是在使用Event Time的场景下才使用的,所以给事件增加Event Time和生成Watermark是一对操作,一般都是一起使用的。方式1是直接在Flink的最源头Source那里就生成了Event Time和Watermark。方式2和方式3则是流处理中的某一步骤(可以理解为一个特殊点的算子),它的输入是流,输出还是流,只不过经过这个流之后事件就会有Event Timestamp和Watermark了,一般这一步放在Source之后,最晚也要在时间算子之前,也就是Window之前。而且他两的优先级高,如果Source中生成了Watermark,后面又使用了方式2或3,则会覆盖之前的Event Timestamp和Watermark。

下面我们分别介绍每种方式。

Watermark In Source

直接上代码,为了完整性,我把所有代码写在了一个文件里面(源文件点这里):

package com.niyanchun.watermark;

import com.alibaba.fastjson.JSONObject;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.joda.time.DateTime;

import java.text.Format;
import java.text.SimpleDateFormat;
import java.util.Arrays;
import java.util.List;

/**
 * Assign timestamp and watermark at Source Function Demo.
 *
 * @author NiYanchun
 **/
public class AssignAtSourceDemo {

  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

//    env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

    env.addSource(new CustomSource())
        .timeWindowAll(Time.seconds(5))
        .process(new CustomProcessFunction())
        .print();

    env.execute();
  }


  public static class CustomSource extends RichSourceFunction<JSONObject> {

    @Override
    public void run(SourceContext<JSONObject> ctx) throws Exception {
      System.out.println("event in source:");
      getOutOfOrderEvents().forEach(e -> {
        System.out.println(e);
        long timestampInMills = ((DateTime) e.get("timestamp")).getMillis();
        ctx.collectWithTimestamp(e, timestampInMills);
        ctx.emitWatermark(new Watermark(timestampInMills));
      });

      try {
        Thread.sleep(5000);
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
    }

    @Override
    public void cancel() {

    }
  }


  /**
   * generate out of order events
   *
   * @return List<JSONObject>
   */
  private static List<JSONObject> getOutOfOrderEvents() {
    // 2020-05-24 12:00:00
    JSONObject event1 = new JSONObject().fluentPut("id", "event1")
        .fluentPut("timestamp", new DateTime(2020, 5, 24, 12, 0, 0));
    // 2020-05-24 12:00:01
    JSONObject event2 = new JSONObject().fluentPut("id", "event2")
        .fluentPut("timestamp", new DateTime(2020, 5, 24, 12, 0, 1));
    // 2020-05-24 12:00:03
    JSONObject event3 = new JSONObject().fluentPut("id", "event3")
        .fluentPut("timestamp", new DateTime(2020, 5, 24, 12, 0, 3));
    // 2020-05-24 12:00:04
    JSONObject event4 = new JSONObject().fluentPut("id", "event4")
        .fluentPut("timestamp", new DateTime(2020, 5, 24, 12, 0, 4));
    // 2020-05-24 12:00:05
    JSONObject event5 = new JSONObject().fluentPut("id", "event5")
        .fluentPut("timestamp", new DateTime(2020, 5, 24, 12, 0, 5));
    // 2020-05-24 12:00:06
    JSONObject event6 = new JSONObject().fluentPut("id", "event6")
        .fluentPut("timestamp", new DateTime(2020, 5, 24, 12, 0, 6));
    // 2020-05-24 12:00:07
    JSONObject event7 = new JSONObject().fluentPut("id", "event7")
        .fluentPut("timestamp", new DateTime(2020, 5, 24, 12, 0, 7));
    // 2020-05-24 12:00:08
    JSONObject event8 = new JSONObject().fluentPut("id", "event8")
        .fluentPut("timestamp", new DateTime(2020, 5, 24, 12, 0, 8));
    // 2020-05-24 12:00:09
    JSONObject event9 = new JSONObject().fluentPut("id", "event9")
        .fluentPut("timestamp", new DateTime(2020, 5, 24, 12, 0, 9));

    // 这里把消息打乱,模拟实际中的消息乱序
    // 真实的消息产生顺序是(根据时间戳):event1, event2, event3, event4, event5, event6, event7, event8, event9
    // 打乱之后的消息顺序是:event1, event2, event4, event3, event5, event7, event6, event8, event9
    return Arrays.asList(event1, event2, event4, event5, event7, event3, event6, event8, event9);
  }

  public static class CustomProcessFunction extends ProcessAllWindowFunction<JSONObject, Object, TimeWindow> {

    @Override
    public void process(Context context, Iterable<JSONObject> elements, Collector<Object> out) throws Exception {
      TimeWindow window = context.window();
      Format sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
      System.out.println(String.format("\nwindow{%s - %s}", sdf.format(window.getStart()), sdf.format(window.getEnd())));

      int count = 0;
      for (JSONObject element : elements) {
        System.out.println(element.getString("id"));
        count++;
      }
      System.out.println("Total:" + count);
    }
  }
}

这里自定义了一个Source,然后接了一个Window(timeWindowAll),做了一个简单的处理,最终输出。这里需要注意一个点:timeWindowAll底层其实是定义了一个TumblingWindows,至于使用Processing Time(TumblingProcessingTimeWindows),还是Event Time(TumblingEventTimeWindows)则由env.setStreamTimeCharacteristic来确定的,该选项的默认值是TimeCharacteristic.ProcessingTime,即使用Processing Time。

作为演示,修改一下上面代码,先使用Processing Time,看下结果:

event in source:
{"id":"event1","timestamp":"2020-05-24T12:00:00.000+08:00"}
{"id":"event2","timestamp":"2020-05-24T12:00:01.000+08:00"}
{"id":"event4","timestamp":"2020-05-24T12:00:04.000+08:00"}
{"id":"event5","timestamp":"2020-05-24T12:00:05.000+08:00"}
{"id":"event7","timestamp":"2020-05-24T12:00:07.000+08:00"}
{"id":"event3","timestamp":"2020-05-24T12:00:03.000+08:00"}
{"id":"event6","timestamp":"2020-05-24T12:00:06.000+08:00"}
{"id":"event8","timestamp":"2020-05-24T12:00:08.000+08:00"}
{"id":"event9","timestamp":"2020-05-24T12:00:09.000+08:00"}

window{2020-05-24 20:12:30 - 2020-05-24 20:12:35}
event1
event2
event4
event5
event7
event3
event6
event8
event9
Total:9

Process finished with exit code 0

可以看到,只有一个Window,其范围是window{2020-05-24 20:12:30 - 2020-05-24 20:12:35},即我代码运行的时间,显然这样的统计结果是没有意义的,因为它体现不出业务真正的高峰期。后面我们只讨论使用Event Time的情况。

现在重新改为env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);,然后运行:

event in source:
{"id":"event1","timestamp":"2020-05-24T12:00:00.000+08:00"}
{"id":"event2","timestamp":"2020-05-24T12:00:01.000+08:00"}
{"id":"event4","timestamp":"2020-05-24T12:00:04.000+08:00"}
{"id":"event5","timestamp":"2020-05-24T12:00:05.000+08:00"}
{"id":"event7","timestamp":"2020-05-24T12:00:07.000+08:00"}
{"id":"event3","timestamp":"2020-05-24T12:00:03.000+08:00"}
{"id":"event6","timestamp":"2020-05-24T12:00:06.000+08:00"}
{"id":"event8","timestamp":"2020-05-24T12:00:08.000+08:00"}
{"id":"event9","timestamp":"2020-05-24T12:00:09.000+08:00"}

window{2020-05-24 12:00:00 - 2020-05-24 12:00:05}
event1
event2
event4
Total:3

window{2020-05-24 12:00:05 - 2020-05-24 12:00:10}
event5
event7
event6
event8
event9
Total:5

Process finished with exit code 0

我们看下现在的输出,有两个Window:window{2020-05-24 12:00:00 - 2020-05-24 12:00:05}window{2020-05-24 12:00:05 - 2020-05-24 12:00:10},可以看到就是5秒钟一个Window。然后12:00:00-12:00:05这个Window里面包含了3个事件:event1,event2,event4;12:00:05-12:00:10这个Window里面包含了5个事件:event5、event7、event6、event8、event9。

从这个结果看event3丢了,其它数据都在,为什么呢?如果我说因为event3乱序了,排在了后边,你肯定会说event6也排到了event7后边,为什么event6却没有丢呢?要解释清楚这个问题还需要涉及到触发器以及窗口的原理和机制,为了保证行文的连贯性,这里我先直接给出结论:因为窗口默认的触发器实现机制是本该在一个窗口内的数据乱序了以后,只要在这个窗口结束(即被触发)之前来,那是不影响的,不认为是迟到数据,不会被丢掉;但如果这个窗口已经结束了才来,就会被丢掉了。比如event3本应该属于12:00:00-12:00:05这个窗口,当event5这条数据来的时候,这个窗口就就认为数据完整了,于是触发计算,接着就销毁了。等event3来的时候已经是12:00:05-12:00:10窗口了,所以它直接被丢掉了。也就是在时间窗口这里,对于“乱序”的定义不是要求每个到来事件的时间戳都严格升序,而是看属于这个窗口的事件能否在窗口时间范围内来,如果能来,就不算乱序,至于在这个时间范围内来的先后顺序无所谓。这个其实也是合理的。后面我计划单独写一篇介绍触发器和窗口的文章,在那篇文章中来从代码层面分析这个结论。

另外还有两个细节点要注意一下:

  • 当Source是有界数据时,当所有数据发送完毕后,系统会自动发一个值为Long.MAX_VALUE的Watermark,表示数据发送完了。
  • Window是一个左闭右开区间,比如12:00:00的数据属于12:00:00-12:00:05窗口,而12:00:05的数据属于12:00:05-12:00:10窗口。

AssignerWithPeriodicWatermarks && AssignerWithPunctuatedWatermarks

AssignerWithPriodicWatermarksAssignerWithPunctuatedWatermarks其实非常像,哪怕是用法都非常像,他两个的主要区别是Watermark的产生机制或者时机:AssignerWithPriodicWatermarks是根据一个固定的时间周期性的产生Watermark,而AssignerWithPunctuatedWatermarks则是由事件驱动,然后代码自己控制何时以何种方式产生Watermark,比如一个event就产生一个,还是几个event产生一个,或者满足什么条件时产生Watermark等,就是用户可以灵活控制。

看下代码(源文件点这里):

package com.niyanchun.watermark;

import com.alibaba.fastjson.JSONObject;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.joda.time.DateTime;

import javax.annotation.Nullable;
import java.text.Format;
import java.text.SimpleDateFormat;
import java.util.Arrays;
import java.util.List;

/**
 * Assign timestamp and watermark at Source Function Demo.
 *
 * @author NiYanchun
 **/
public class AssignerWatermarksDemo {

  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    env.setParallelism(1);
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

    env.addSource(new CustomSource())
        .assignTimestampsAndWatermarks(new CustomAssignerWithPeriodicWatermarks())
//        .assignTimestampsAndWatermarks(new CustomAssignerWithPunctuatedWatermarks())
        .timeWindowAll(Time.seconds(5))
        .process(new CustomProcessFunction())
        .print();

    env.execute();
  }

  public static class CustomSource extends RichSourceFunction<JSONObject> {

    @Override
    public void run(SourceContext<JSONObject> ctx) throws Exception {
      System.out.println("event in source:");
      getOutOfOrderEvents().forEach(e -> {
        System.out.println(e);
        ctx.collect(e);
      });

      try {
        Thread.sleep(2000);
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
    }

    @Override
    public void cancel() {

    }
  }
  
  /**
   * generate out of order events
   *
   * @return List<JSONObject>
   */
  private static List<JSONObject> getOutOfOrderEvents() {
    // 2020-05-24 12:00:00
    JSONObject event1 = new JSONObject().fluentPut("id", "event1")
        .fluentPut("timestamp", new DateTime(2020, 5, 24, 12, 0, 0));
    // 2020-05-24 12:00:01
    JSONObject event2 = new JSONObject().fluentPut("id", "event2")
        .fluentPut("timestamp", new DateTime(2020, 5, 24, 12, 0, 1));
    // 2020-05-24 12:00:03
    JSONObject event3 = new JSONObject().fluentPut("id", "event3")
        .fluentPut("timestamp", new DateTime(2020, 5, 24, 12, 0, 3));
    // 2020-05-24 12:00:04
    JSONObject event4 = new JSONObject().fluentPut("id", "event4")
        .fluentPut("timestamp", new DateTime(2020, 5, 24, 12, 0, 4));
    // 2020-05-24 12:00:05
    JSONObject event5 = new JSONObject().fluentPut("id", "event5")
        .fluentPut("timestamp", new DateTime(2020, 5, 24, 12, 0, 5));
    // 2020-05-24 12:00:06
    JSONObject event6 = new JSONObject().fluentPut("id", "event6")
        .fluentPut("timestamp", new DateTime(2020, 5, 24, 12, 0, 6));
    // 2020-05-24 12:00:07
    JSONObject event7 = new JSONObject().fluentPut("id", "event7")
        .fluentPut("timestamp", new DateTime(2020, 5, 24, 12, 0, 7));
    // 2020-05-24 12:00:08
    JSONObject event8 = new JSONObject().fluentPut("id", "event8")
        .fluentPut("timestamp", new DateTime(2020, 5, 24, 12, 0, 8));
    // 2020-05-24 12:00:09
    JSONObject event9 = new JSONObject().fluentPut("id", "event9")
        .fluentPut("timestamp", new DateTime(2020, 5, 24, 12, 0, 9));

    // 可以把消息打乱,模拟实际中的消息乱序。
    // 真实的消息产生顺序是(根据时间戳):event1, event2, event3, event4, event5, event6, event7, event8, event9
    // 打乱之后的消息顺序是:event1, event2, event4, event3, event5, event7, event6, event8, event9
    return Arrays.asList(event1, event2, event4, event5, event7, event3, event6, event8, event9);
  }

  public static class CustomProcessFunction extends ProcessAllWindowFunction<JSONObject, Object, TimeWindow> {

    @Override
    public void process(Context context, Iterable<JSONObject> elements, Collector<Object> out) throws Exception {
      TimeWindow window = context.window();
      Format sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
      System.out.println(String.format("\nwindow{%s - %s}", sdf.format(window.getStart()), sdf.format(window.getEnd())));

      int count = 0;
      for (JSONObject element : elements) {
        System.out.println(element.getString("id"));
        count++;
      }
      System.out.println("Total:" + count);
    }
  }

  /**
   * AssignerWithPeriodicWatermarks demo
   */
  public static class CustomAssignerWithPeriodicWatermarks implements AssignerWithPeriodicWatermarks<JSONObject> {

    private long currentTimestamp;

    @Nullable
    @Override
    public Watermark getCurrentWatermark() {
      Format sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
      System.out.println(String.format("invoke getCurrentWatermark at %s and watermark is: %s",
          System.currentTimeMillis(), sdf.format(currentTimestamp)));
      return new Watermark(currentTimestamp);
    }

    @Override
    public long extractTimestamp(JSONObject element, long previousElementTimestamp) {
      long timestamp = ((DateTime) element.get("timestamp")).getMillis();
      Format sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
      System.out.println("invoke extractTimestamp: " + sdf.format(timestamp));
      currentTimestamp = timestamp;
      return timestamp;
    }
  }

  /**
   * AssignerWithPunctuatedWatermarks demo.
   */
  public static class CustomAssignerWithPunctuatedWatermarks implements AssignerWithPunctuatedWatermarks<JSONObject> {

    private long currentTimestamp;

    @Nullable
    @Override
    public Watermark checkAndGetNextWatermark(JSONObject lastElement, long extractedTimestamp) {
      Format sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
      System.out.println(String.format("invoke getCurrentWatermark at %s and watermark is: %s",
          System.currentTimeMillis(), sdf.format(currentTimestamp)));
      return new Watermark(currentTimestamp);
    }

    @Override
    public long extractTimestamp(JSONObject element, long previousElementTimestamp) {
      long timestamp = ((DateTime) element.get("timestamp")).getMillis();
      Format sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
      System.out.println("invoke extractTimestamp: " + sdf.format(timestamp));
      currentTimestamp = timestamp;
      return timestamp;
    }
  }
}

先分别看下AssignerWithPriodicWatermarksAssignerWithPunctuatedWatermarks部分吧:

  /**
   * AssignerWithPeriodicWatermarks demo
   */
  public static class CustomAssignerWithPeriodicWatermarks implements AssignerWithPeriodicWatermarks<JSONObject> {

    private long currentTimestamp;

    @Nullable
    @Override
    public Watermark getCurrentWatermark() {
      // 省略一些逻辑
      return new Watermark(currentTimestamp);
    }

    @Override
    public long extractTimestamp(JSONObject element, long previousElementTimestamp) {
      // 省略一些逻辑
      return timestamp;
    }
  }

  /**
   * AssignerWithPunctuatedWatermarks demo.
   */
  public static class CustomAssignerWithPunctuatedWatermarks implements AssignerWithPunctuatedWatermarks<JSONObject> {

    private long currentTimestamp;

    @Nullable
    @Override
    public Watermark checkAndGetNextWatermark(JSONObject lastElement, long extractedTimestamp) {
      // 省略一些逻辑
      return new Watermark(currentTimestamp);
    }

    @Override
    public long extractTimestamp(JSONObject element, long previousElementTimestamp) {
      // 省略一些逻辑
      return timestamp;
    }
  }

为了突出重点,我删掉了具体实现。可以看到这两个类都有一个extractTimestamp方法,这个方法每个Event都会调用,作用就是给Event赋一个Event Time。另外一个方法稍微有点差异,AssignerWithPeriodicWatermarks的方法叫getCurrentWatermark(),而AssignerWithPunctuatedWatermarks的方法是checkAndGetNextWatermark(JSONObject lastElement, long extractedTimestamp),它们的主要区别是方法的调用机制:

  • getCurrentWatermark()没有参数,它是框架根据用户设置的固定时间周期性的调用。这个固定的时间可以通过以下方式设置:

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
    ExecutionConfig executionConfig = env.getConfig();
    executionConfig.setAutoWatermarkInterval(500);

    上面的代码设置每500毫秒调用一次getCurrentWatermark(),即每500毫秒产生一个Watermark。不显式的设置的话,默认值是0,但实际效果是每200ms调用一次。

  • checkAndGetNextWatermark(JSONObject lastElement, long extractedTimestamp)有两个参数:一个是event,一个是extractTimestamp方法返回的时间戳。这个方法被调用的时间点是:每个事件来了先调用extractTimestamp,然后马上调用checkAndGetNextWatermark。在checkAndGetNextWatermark中你可以通过返回值控制是否产生新的Watermark,如果你不想返回新的Watermark,可以返回null或者一个小于等于上一个Watermark的时间戳,这样就相当于本次不返回Watermark或者返回的Watermark不是递增的被丢弃了,继续使用原来的Watermark。因为Watermark不能为null,且必须单调递增。

AssignerWithPriodicWatermarksAssignerWithPunctuatedWatermarks的区别就这些,最佳实践的话我个人觉得优先考虑AssignerWithPriodicWatermarks,如果不能满足需求,再考虑AssignerWithPunctuatedWatermarks。一方面是前者简单一些,另一方面是一般没有必要每个事件就计算一个Watermark,这样会增加不是很有必要的计算量。

然后我们把最上面完整代码里面的Watermark产生器设置为AssignerWithPriodicWatermarks(打开注释的assignTimestampsAndWatermarks(new CustomAssignerWithPeriodicWatermarks())),执行一下,看下效果:

event in source:
{"id":"event1","timestamp":"2020-05-24T12:00:00.000+08:00"}
invoke extractTimestamp: 2020-05-24 12:00:00
{"id":"event2","timestamp":"2020-05-24T12:00:01.000+08:00"}
invoke getCurrentWatermark at 1590404165438 and watermark is: 2020-05-24 12:00:00
invoke extractTimestamp: 2020-05-24 12:00:01
{"id":"event4","timestamp":"2020-05-24T12:00:04.000+08:00"}
invoke extractTimestamp: 2020-05-24 12:00:04
{"id":"event5","timestamp":"2020-05-24T12:00:05.000+08:00"}
invoke extractTimestamp: 2020-05-24 12:00:05
{"id":"event7","timestamp":"2020-05-24T12:00:07.000+08:00"}
invoke extractTimestamp: 2020-05-24 12:00:07
{"id":"event3","timestamp":"2020-05-24T12:00:03.000+08:00"}
invoke extractTimestamp: 2020-05-24 12:00:03
{"id":"event6","timestamp":"2020-05-24T12:00:06.000+08:00"}
invoke extractTimestamp: 2020-05-24 12:00:06
{"id":"event8","timestamp":"2020-05-24T12:00:08.000+08:00"}
invoke extractTimestamp: 2020-05-24 12:00:08
{"id":"event9","timestamp":"2020-05-24T12:00:09.000+08:00"}
invoke extractTimestamp: 2020-05-24 12:00:09
invoke getCurrentWatermark at 1590404165641 and watermark is: 2020-05-24 12:00:09

window{2020-05-24 12:00:00 - 2020-05-24 12:00:05}
event1
event2
event4
event3
Total:4
invoke getCurrentWatermark at 1590404165843 and watermark is: 2020-05-24 12:00:09
invoke getCurrentWatermark at 1590404166051 and watermark is: 2020-05-24 12:00:09
invoke getCurrentWatermark at 1590404166256 and watermark is: 2020-05-24 12:00:09
invoke getCurrentWatermark at 1590404166459 and watermark is: 2020-05-24 12:00:09
invoke getCurrentWatermark at 1590404166665 and watermark is: 2020-05-24 12:00:09
invoke getCurrentWatermark at 1590404166871 and watermark is: 2020-05-24 12:00:09
invoke getCurrentWatermark at 1590404167075 and watermark is: 2020-05-24 12:00:09
invoke getCurrentWatermark at 1590404167279 and watermark is: 2020-05-24 12:00:09
invoke getCurrentWatermark at 1590404167461 and watermark is: 2020-05-24 12:00:09

window{2020-05-24 12:00:05 - 2020-05-24 12:00:10}
event5
event7
event6
event8
event9
Total:5

Process finished with exit code 0

可以看到每个事件都会调用extractTimestamp,基本是200ms调用一次getCurrentWatermark,而且在Source的数据全部发送完之后,因为我加了Sleep,所以还在调用getCurrentWatermark,这就是上面说的它是固定周期调用的,而不是事件驱动调用。

然后改为AssignerWithPunctuatedWatermarks(打开注释的assignTimestampsAndWatermarks(new CustomAssignerWithPunctuatedWatermarks())),运行一下,输出如下:

event in source:
{"id":"event1","timestamp":"2020-05-24T12:00:00.000+08:00"}
invoke extractTimestamp: 2020-05-24 12:00:00
invoke getCurrentWatermark at 1590404475754 and watermark is: 2020-05-24 12:00:00
{"id":"event2","timestamp":"2020-05-24T12:00:01.000+08:00"}
invoke extractTimestamp: 2020-05-24 12:00:01
invoke getCurrentWatermark at 1590404475756 and watermark is: 2020-05-24 12:00:01
{"id":"event4","timestamp":"2020-05-24T12:00:04.000+08:00"}
invoke extractTimestamp: 2020-05-24 12:00:04
invoke getCurrentWatermark at 1590404475758 and watermark is: 2020-05-24 12:00:04
{"id":"event5","timestamp":"2020-05-24T12:00:05.000+08:00"}
invoke extractTimestamp: 2020-05-24 12:00:05
invoke getCurrentWatermark at 1590404475760 and watermark is: 2020-05-24 12:00:05
{"id":"event7","timestamp":"2020-05-24T12:00:07.000+08:00"}
invoke extractTimestamp: 2020-05-24 12:00:07
invoke getCurrentWatermark at 1590404475763 and watermark is: 2020-05-24 12:00:07
{"id":"event3","timestamp":"2020-05-24T12:00:03.000+08:00"}
invoke extractTimestamp: 2020-05-24 12:00:03
invoke getCurrentWatermark at 1590404475765 and watermark is: 2020-05-24 12:00:03
{"id":"event6","timestamp":"2020-05-24T12:00:06.000+08:00"}
invoke extractTimestamp: 2020-05-24 12:00:06
invoke getCurrentWatermark at 1590404475768 and watermark is: 2020-05-24 12:00:06
{"id":"event8","timestamp":"2020-05-24T12:00:08.000+08:00"}
invoke extractTimestamp: 2020-05-24 12:00:08
invoke getCurrentWatermark at 1590404475770 and watermark is: 2020-05-24 12:00:08
{"id":"event9","timestamp":"2020-05-24T12:00:09.000+08:00"}
invoke extractTimestamp: 2020-05-24 12:00:09
invoke getCurrentWatermark at 1590404475772 and watermark is: 2020-05-24 12:00:09

window{2020-05-24 12:00:00 - 2020-05-24 12:00:05}
event1
event2
event4
Total:3

window{2020-05-24 12:00:05 - 2020-05-24 12:00:10}
event5
event7
event6
event8
event9
Total:5

Process finished with exit code 0

可以看到,getCurrentWatermark是事件驱动调用的,每个事件来了先调用extractTimestamp,然后紧接着就调用checkAndGetNextWatermark。我们这里的实现是每个Event都产生一个新的Watermark。

迟到数据

从上面的部分看到event3因为迟到被默默的丢掉了,现实中数据是重要资产,肯定是不能随便丢弃的。Flink提供了两种解决方案:

  1. 允许一定的延迟。这个延迟可以在两个地方设置:第一种是可以在上面的AssignerWithXXXWatermarks方法里面给计算出的时间戳减去一个时间,这个时间就是你允许延迟的时间。第二种就是在时间窗口那里可以通过allowedLateness来设置一个允许的延迟时间,之前流处理随谈一文中已经介绍过了,这里就不赘述了。但允许一定延迟的方式只能治标,不能治本。我们只能根据实际情况允许一定限度的延迟,但总归是有个限度的,原因主要有两个:1)延迟太高会丧失实时性,如果你的场景对实时性要求比较高,那就无法设置太大的延迟。2)延迟实际是延长了窗口的生命周期,所以资源消耗会增加。
  2. 在Window那里通过sideOutputLateData将迟到的数据以流的形式旁路出去。之前流处理随谈一文中也已经介绍过了。这个是治本的手段,它没有时间的限制,如果有迟到数据,就会发送到这个单独的流里面去,然后可以为这个流单独设置处理方式。

这两种方式有利有弊,各有不同的适用场景,根据自己的业务灵活选择即可。

最后总结一下:只要你理解Watermark的本质,其实它就是一个很简单的东西,使用起来也很简单。只是框架层面要实现会有一定难度,但这个对使用者而言是不可见的。