Flink的Watermark细节介绍一文中提到了Watermark其实主要就是解决Event Time + Window中的数据完整性问题的,本文作为那篇文章的补充,再介绍一下Window这个概念。关于这部分,我觉得官方文档已经介绍的非常详细了,如果你是Flink使用者,强烈建议好好读几遍。我这里就主要概括性的介绍一下,作为前面文章的补充,同时解决前文遗留的一个问题。

What & Why

什么是Window?为什么需要Window?流处理里面一般都是事件驱动的(Spark是微批),即每个事件来就会触发算子(Operator)进行计算,典型的比如map、flatmap、filter等,这些都是无状态的计算。有些时候需要在流处理里面进行有状态的计算,比如电商场景分析1分钟的访问人数、购买人数各是多少等,这些计算要缓存数据(即是有状态的计算),需要通过Window来做,这就是Window要解决的场景:支持流处理中有状态的计算。另外,Window本身也是一个算子,只不过是一个有状态的算子。

鉴于官网的文档非常详尽,我不重复造轮子了,我从另外一个角度来介绍一个窗口:窗口的生命周期,即从窗口的创建到最终销毁是如何流转的。这部分一些细节官方文档说的不是特别明确,我通过这篇文章做一下补充。如果你对窗口还没有任何概念,建议先阅读官方文档,对窗口有一些基础了解之后,再来阅读本文。

为了方便讲解,我把Flink的Watermark细节介绍一文中构造的例子稍微进行了一点改造,在原始事件中加入了事件类型,共有三种:pv(浏览)、cart(加购物车)、buy(购买),这样产生的原始事件就是下面这样了(注意,事件依旧是乱序的):

event in source:
{"action":"pv","id":"event1","timestamp":"2020-05-24T12:00:00.000+08:00"}
{"action":"cart","id":"event2","timestamp":"2020-05-24T12:00:01.000+08:00"}
{"action":"pv","id":"event4","timestamp":"2020-05-24T12:00:04.000+08:00"}
{"action":"pv","id":"event5","timestamp":"2020-05-24T12:00:05.000+08:00"}
{"action":"buy","id":"event7","timestamp":"2020-05-24T12:00:07.000+08:00"}
{"action":"buy","id":"event3","timestamp":"2020-05-24T12:00:03.000+08:00"}
{"action":"cart","id":"event6","timestamp":"2020-05-24T12:00:06.000+08:00"}
{"action":"buy","id":"event8","timestamp":"2020-05-24T12:00:08.000+08:00"}
{"action":"pv","id":"event9","timestamp":"2020-05-24T12:00:09.000+08:00"}

然后我们要做的事情就是分析每5秒钟各种行为的个数。这里的数据只有12:00:00.000-12:00:05.00012:00:05.000-12:00:10.000两个5秒钟窗口的数据,我们也就用这两个窗口来分析。

Window流转

这里我先截了一张官方文档的图:

tm0c24.png

可以看到Window大的分为两类:Keyed WindowsNon-Keyed Windows。你可以把Window底层想象成一个容器,Keyed Windows就是一个Map,Non-Keyed Windows就是一个List。实际中用的最多的就是Keyed Windows了,主要有两个原因:

  1. 业务上,数据处理一般都是要哈希分区(即上面的keyBy)的。比如用户行为分析,一般会根据用户ID去哈希或者根据行为类型哈希。
  2. Non-Keyed Windows不能并行计算,也就是只能由一个线程去计算,在flink里面这个算子的parallelism只能是1。

不过,Non-Keyed Windows相比于Keyed Windows,仅是少了一个keyBy的操作,后面的流程逻辑是完全一样的,所以对于学习Window的机制,影响不是很大。而且上一篇文章中的示例其实就是Non-Keyed Windows,所以这篇文章主要以Keyed Windows为例进行介绍。

可以看到窗口基本上包含以下几个部分:

  • keyBy:即哈希,只有Keyed Windows才有,它决定如何产生Window。比如上篇文章中我们没有使用keyBy,对于tumbling windows,一个时间范围只会产生一个Window(12:00:00.000-12:00:05.000一个,12:00:05.000-12:00:10.000一个)了。如果我们使用了keyBy,那一个时间范围内有几个不同的key,就会有几个Window。
  • Window Assigner:这里主要定义Window的类型,Flink内置了一些常用的window:tumbling windowssliding windowssession windowsglobal windows,我们也可以通过继承WindowAssigner类来实现自定义的窗口。内置的几个windows都很简单,这里就不展开说了。需要注意的是global windows,它不是一个基于时间的window,后面我们会再次提到。
  • Trigger:即触发器,它的作用是决定窗口如何触发。前面说了窗口就是一个“容器”,用来缓存事件,所谓窗口触发就是什么时候开始对缓存的数据进行计算,计算规则通过下面的process/reduce/aggregate/fold/apply来定义。
  • Evictor:驱逐,待会介绍。
  • AllowedLateness:以前文章已经见过了,使用Event Time时才有意义,指定允许数据迟到的时间,默认为0,即不允许迟到。
  • SideOutputLateData:以前文章已经见过了,使用Event Time时才有意义,将迟到的数据旁路到单独的一个流。默认不旁路,迟到数据直接丢掉。
  • 窗口处理函数:process/reduce/aggregate/fold/apply,前面提到了,定义如何计算窗口内的数据。
  • GetSideOutput:用于旁路一个流,这个不是window特有的。

下面我画了一个流转图:

window-flow.png

12:00:00.000-12:00:05.000这个时间段的窗口为例,源源不断的事件流来了以后,先经过keyBy生成若干个window(图中有key=pv和key=buy两个window),然后再流经trigger,判断事件是先缓存还是触发窗口计算。如果缓存,则直接先放在窗口里面。如果是触发计算,且定义了before evictor,则把窗口缓存的所有数据交给evictor,evictor处理完之后交给定义的window function。如果window function之后还定义了after evictor,则数据再交给after evictor,最后继续发往下游。

这个流程有2个注意点:

  • window function有两大类:一类是process/apply,一类是reduce/aggregate/fold. process(apply已废弃)是数据先全部缓存在窗口中,等到触发的时候,将所有数据一次交给process方法;reduce/aggregate/fold则是来一个事件就计算一次。比如累加1-100整数的操作,process类的是先缓存,等100个数全到了,一次交给process做计算;而reduce/aggregate/fold类的则是来一个就计算一个,然后只保留计算结果。显然后者更高效一些,且不用缓存太多的数据。实践中,两类都能满足需求的时候,推荐使用后者。
  • 可以在trigger触发后,在将缓存数据交给window function之前这个之间定义一个evictor(图中的before evictor),来对数据做一些处理,也可以在window function计算完发往下游之前定义一个evictor(图中的after evictor),两个都是可选的。但需要注意的是,一旦定义了evictor,就只能使用process,不能使用reduce/aggregate/fold了,因为evictor需要一次拿到窗口内缓存的全部数据。

触发器Trigger

上面讲了,trigger可以决定窗口直接缓存数据,还是触发计算,我们看下具体是如何做到的。Flink中,Trigger抽象类定义了一些方法,其中以下几个是比较重要的:

  • onElement:每个事件来都会调用一次该方法。
  • onEventTime:当根据触发器上下文设定的Event Time的定时器触发时,就会调用该方法(有点拗口,原话是:Called when an event-time timer that was set using the trigger context fires)。比如对于12:00:00.000-12:00:05.000这个窗口,基于Event Time的默认触发器的实现是当事件时间等于12:00:05.000前1毫秒时(即12:00:04.999),定时器就会触发。后面我们会看这部分代码。
  • onProcessingTime:当根据触发器上下文设定的Processing Time的定时器时触发时,就会调用该方法。
  • onMerge:当两个窗口需要合并时,就会调用该方法。这个只有某些窗口才会使用到。
  • clear:当需要清除窗口内容的时候,就会调用该方法。

其中onElement、onEventTime、onProcessingTime决定窗口行为,它们的返回值都是一个TriggerResult,这是一个枚举类型,目前有这么几个枚举值:

  • CONTINUE:什么也不做,仅把事件加到当前窗口中。
  • FIRE:表示触发窗口,即开始在窗口上执行定义的window function。计算完之后并不会清除窗口内的数据。
  • PURGE:清除窗口内的所有数据(包含元数据),但不会触发窗口计算。这个操作相当于删除了当前窗口
  • FIRE_AND_PURGE:触发窗口计算,计算完之后删除窗口。

Trigger对于窗口的生命周期至关重要,Flink给每个内置窗口都定义了默认的触发器,我们也可以自定义自己的触发器。

文章后面我们在解答Flink的Watermark细节介绍一文中遗留的问题时会分析内置的EventTimeTrigger,到时候再结合代码看一下上面介绍的这部分内容。接下来我们再介绍一个比较重要的知识点。

迟到数据的处理

现在已经对窗口有了一个比较细致的了解,我们再来讨论下关于迟到数据的处理。默认迟到数据是会被直接丢弃的。以12:00:00.000-12:00:05.000这个窗口为例,它会在12:00:00.000-12:00:05.000这个区间的第一个事件到来时被创建,然后默认(在EventTimeTrigger中定义)在watermark值超过12:00:05前1毫秒的时间(即12:00:04.999)触发计算,并在watermark值超过12:00:05.000之后将窗口移除。但如果我们通过allowedLateness设置允许1秒钟的延迟,会发生什么?

以前文章说过,允许延迟实际是延长了窗口的生命周期,允许1秒延迟相当于把窗口的生命周期变成了12:00:00.000-12:00:06.000。但上面介绍的不允许延迟的情况下的流程基本不变,依旧会按照上面的逻辑在时间范围内第一个事件到来时创建窗口,在watermark值超过12:00:05前1毫秒的时间(即12:00:04.999)触发计算,但只有当watermark值超过12:00:06.000才会移除窗口。也就是如果属于12:00:00.000-12:00:05.000窗口的数据迟到了(即在12:00:05.000之前没能够来 ),但要是能够在12:00:06.000之前来,就会重新触发一次窗口计算,且来一个事件就会触发一次窗口计算。这里可以看到,允许数据迟到之后,迟到范围内到的数据依旧属于原来的窗口,而不是下一个窗口。另外就是每次触发窗口计算都会往下游发送一个数据,这样多次触发,就会往下游发送多次数据。所以允许迟到数据一方面如前文所说会影响处理的实时性,增加资源消耗,另一方面也可能会产生重复数据,需要下游能够正确处理这种情况。

最后关于窗口的生命周期需要注意的是,基于时间的窗口生命周期完结之后,Flink会负责移除,但像内置的Global Windows不属于时间窗口,Flink不会去移除这种窗口,需要用户自己实现。

窗口的个数

基于时间的窗口,一个定义的时间范围内会产生多少个窗口?答案是:1到N个,看情况(it depends~~)。看哪些情况呢:

  1. 看窗口类型。Flink内置了好几种窗口,有些窗口可能会重叠(即overlap),一个事件可能会重复分到多个窗口,也就是一个事件可能会触发创建或归属于多个窗口,典型的比如sliding windows,而Session Windows更是和gap有关系,还可能是动态gap。所以可能会产生多个窗口实例。
  2. 如果是Keyed Windows,还要看key的个数。有多个少不同的key,就会有多少个窗口。

窗口数据缓存在哪里

Flink的Window底层是State,目前State支持三种后端:

  • MemoryStateBackend:默认的后端。很简单,就是把数据缓存在内存里面,这个显然只适用于平时的开发调试或者生产上状态比较小的场景。不过现在有个FIP-50,是阿里提出的优化,提供内存不够用时再写磁盘的功能,我之前专门有篇文章分析过:Flink FLIP-50: Spill-able Heap Keyed State Backend。最初计划随着1.9版本发布,后来推迟到1.10。结果1.10也没加进去。现在1.11马上也要发布了,这个功能也没做进去,但单独提供了一个包,如果有需要的话,可以从这里下载
  • FsStateBackend:状态数据存储在TaskManager的内存里面,Checkpoint的时候会快照到文件系统上(支持HDFS等远程文件系统),适用于存储大一些的状态。
  • RocksDBStateBackend:状态数据存储在TaskManager节点的RockDB上(一个C++写的嵌入式KV数据库),Checkpoint的时候会快照到文件系统上(支持HDFS等远程文件系统)。可见整个状态都是在磁盘上存储的,所以它可以支撑超大量级的状态。

不管需用哪种状态,只需要在env那里配置一下,对于窗口以及State的使用都是透明的。DataFlow模型能做到流批统一的一个原因就是它能支撑超大量级的状态。可以看到状态数据最终可以存储在HDFS等分布式文件系统上,这样理论上MapReduce这种离线计算能搞定的,Flink都能做,且机制更加灵活。

一个完整代码示例

讲了这么多理论,看个具体的例子吧(源文件见这里),场景还是上面说的分析每5秒钟各种行为的个数:

package com.niyanchun.window;

import com.alibaba.fastjson.JSONObject;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.joda.time.DateTime;

import java.text.Format;
import java.text.SimpleDateFormat;
import java.util.Arrays;
import java.util.List;

/**
 * A window demo.
 *
 * @author NiYanchun
 **/
public class WindowDemo {

  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

    env.addSource(new CustomSource())
        .keyBy(f -> f.getString("action"))
        .timeWindow(Time.seconds(5))
        .process(new CustomProcessFunction())
        .print();

    env.execute();
  }


  public static class CustomSource extends RichSourceFunction<JSONObject> {

    @Override
    public void run(SourceContext<JSONObject> ctx) throws Exception {
      System.out.println("event in source:");
      getOrderedEvents().forEach(e -> {
        System.out.println(e);
        long timestampInMills = ((DateTime) e.get("timestamp")).getMillis();
        ctx.collectWithTimestamp(e, timestampInMills);
        ctx.emitWatermark(new Watermark(timestampInMills));
      });
      System.out.println();

      try {
        Thread.sleep(5000);
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
    }

    @Override
    public void cancel() {

    }
  }


  /**
   * generate out of order events
   *
   * @return List<JSONObject>
   */
  private static List<JSONObject> getOrderedEvents() {
    // 2020-05-24 12:00:00
    JSONObject event1 = new JSONObject().fluentPut("id", "event1").fluentPut("action", "pv")
        .fluentPut("timestamp", new DateTime(2020, 5, 24, 12, 0, 0));
    // 2020-05-24 12:00:01
    JSONObject event2 = new JSONObject().fluentPut("id", "event2").fluentPut("action", "cart")
        .fluentPut("timestamp", new DateTime(2020, 5, 24, 12, 0, 1));
    // 2020-05-24 12:00:03
    JSONObject event3 = new JSONObject().fluentPut("id", "event3").fluentPut("action", "buy")
        .fluentPut("timestamp", new DateTime(2020, 5, 24, 12, 0, 3));
    // 2020-05-24 12:00:04
    JSONObject event4 = new JSONObject().fluentPut("id", "event4").fluentPut("action", "pv")
        .fluentPut("timestamp", new DateTime(2020, 5, 24, 12, 0, 4));
    // 2020-05-24 12:00:05
    JSONObject event5 = new JSONObject().fluentPut("id", "event5").fluentPut("action", "pv")
        .fluentPut("timestamp", new DateTime(2020, 5, 24, 12, 0, 5));
    // 2020-05-24 12:00:06
    JSONObject event6 = new JSONObject().fluentPut("id", "event6").fluentPut("action", "cart")
        .fluentPut("timestamp", new DateTime(2020, 5, 24, 12, 0, 6));
    // 2020-05-24 12:00:07
    JSONObject event7 = new JSONObject().fluentPut("id", "event7").fluentPut("action", "buy")
        .fluentPut("timestamp", new DateTime(2020, 5, 24, 12, 0, 7));
    // 2020-05-24 12:00:08
    JSONObject event8 = new JSONObject().fluentPut("id", "event8").fluentPut("action", "buy")
        .fluentPut("timestamp", new DateTime(2020, 5, 24, 12, 0, 8));
    // 2020-05-24 12:00:09
    JSONObject event9 = new JSONObject().fluentPut("id", "event9").fluentPut("action", "pv")
        .fluentPut("timestamp", new DateTime(2020, 5, 24, 12, 0, 9));

    // 这里把消息打乱,模拟实际中的消息乱序
    // 真实的消息产生顺序是(根据时间戳):event1, event2, event3, event4, event5, event6, event7, event8, event9
    // 打乱之后的消息顺序是:event1, event2, event4, event3, event5, event7, event6, event8, event9
    return Arrays.asList(event1, event2, event4, event5, event7, event3, event6, event8, event9);
  }

  public static class CustomProcessFunction extends ProcessWindowFunction<JSONObject, Object, String, TimeWindow> {
    @Override
    public void process(String s, Context context, Iterable<JSONObject> elements, Collector<Object> out) throws Exception {
      TimeWindow window = context.window();
      Format sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
      int count = 0;
      for (JSONObject ignored : elements) {
        count++;
      }
      System.out.println(sdf.format(window.getStart()) + "-" + sdf.format(window.getEnd()) + ": " + count + " " + s);
    }
  }

  public static class CustomEventTimeTrigger extends Trigger<Object, TimeWindow> {
    private static final long serialVersionUID = 1L;

    private CustomEventTimeTrigger() {
    }

    @Override
    public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
      Format sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");

      System.out.println("onElement -- event: " + ((JSONObject) element).getString("id") +
          "; window.maxTimestamp():" + sdf.format(window.maxTimestamp()) +
          "; ctx.getCurrentWatermark():" + sdf.format(ctx.getCurrentWatermark()));
      if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {
        // if the watermark is already past the window fire immediately
        return TriggerResult.FIRE;
      } else {
        ctx.registerEventTimeTimer(window.maxTimestamp());
        return TriggerResult.CONTINUE;
      }
    }

    @Override
    public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
      Format sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
      System.out.println("onEventTime-- window.maxTimestamp():" + sdf.format(window.maxTimestamp()) +
          "; time:" + sdf.format(time));
      return time == window.maxTimestamp() ?
          TriggerResult.FIRE :
          TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
      return TriggerResult.CONTINUE;
    }

    @Override
    public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
      ctx.deleteEventTimeTimer(window.maxTimestamp());
    }

    @Override
    public boolean canMerge() {
      return true;
    }

    @Override
    public void onMerge(TimeWindow window,
                        OnMergeContext ctx) {
      // only register a timer if the watermark is not yet past the end of the merged window
      // this is in line with the logic in onElement(). If the watermark is past the end of
      // the window onElement() will fire and setting a timer here would fire the window twice.
      long windowMaxTimestamp = window.maxTimestamp();
      if (windowMaxTimestamp > ctx.getCurrentWatermark()) {
        ctx.registerEventTimeTimer(windowMaxTimestamp);
      }
    }

    @Override
    public String toString() {
      return "EventTimeTrigger()";
    }

    /**
     * Creates an event-time trigger that fires once the watermark passes the end of the window.
     *
     * <p>Once the trigger fires all elements are discarded. Elements that arrive late immediately
     * trigger window evaluation with just this one element.
     */
    public static CustomEventTimeTrigger create() {
      return new CustomEventTimeTrigger();
    }
  }

  public static class CustomAggregation implements AggregateFunction<JSONObject, Object, Object> {

    @Override
    public Object createAccumulator() {
      return null;
    }

    @Override
    public Object add(JSONObject value, Object accumulator) {
      return null;
    }

    @Override
    public Object getResult(Object accumulator) {
      return null;
    }

    @Override
    public Object merge(Object a, Object b) {
      return null;
    }
  }
}

代码内容不说了,和上篇文章的类似,只是把原来基于Non-Keyed Windows的timeWindowAll换成了基于Keyed Windows的timeWindow,所以多了一个keyBy的操作。代码执行结果如下:

event in source:
{"action":"pv","id":"event1","timestamp":"2020-05-24T12:00:00.000+08:00"}
{"action":"cart","id":"event2","timestamp":"2020-05-24T12:00:01.000+08:00"}
{"action":"pv","id":"event4","timestamp":"2020-05-24T12:00:04.000+08:00"}
{"action":"pv","id":"event5","timestamp":"2020-05-24T12:00:05.000+08:00"}
{"action":"buy","id":"event7","timestamp":"2020-05-24T12:00:07.000+08:00"}
{"action":"buy","id":"event3","timestamp":"2020-05-24T12:00:03.000+08:00"}
{"action":"cart","id":"event6","timestamp":"2020-05-24T12:00:06.000+08:00"}
{"action":"buy","id":"event8","timestamp":"2020-05-24T12:00:08.000+08:00"}
{"action":"pv","id":"event9","timestamp":"2020-05-24T12:00:09.000+08:00"}

2020-05-24 12:00:00-2020-05-24 12:00:05: 1 cart
2020-05-24 12:00:00-2020-05-24 12:00:05: 2 pv
2020-05-24 12:00:05-2020-05-24 12:00:10: 2 pv
2020-05-24 12:00:05-2020-05-24 12:00:10: 2 buy
2020-05-24 12:00:05-2020-05-24 12:00:10: 1 cart

Process finished with exit code 0

前面是Source那里发出的乱序事件,后面是窗口计算的结果。简要分析一下:event3乱序了,被丢掉了,所以12:00:00-12:00:05这个window只收到event1、event2、event4三个时间,2个pv事件,1个加购物车事件,根据前面介绍的keyBy操作会产生2个窗口,所以打印了两个12:00:00-12:00:05,分别对应于cart和pv(最后面列就是window的key)。12:00:05-12:00:10这个时间段类似。

至此,理论篇也差不多了,这部分主要是对我认为官方文档没有太展开的一些细节部分进行了补充讲解。下一部分是以Flink内置的EventTimeTrigger触发器实现来介绍一下触发器的细节,顺便回答一下Flink的Watermark细节介绍文章中遗留的一个问题:为什么有的乱序丢了,有的没丢?

为什么有的乱序丢了,有的没丢?

Flink的Watermark细节介绍一文中我们使用的是基于Event Time的Tumbling Window,它使用的默认触发器是EventTimeTrigger,这个是Flink内置的,为了加一些打印语句,我完全Copy了一份,重新定义了一个自己的触发器CustomEventTimeTrigger,完整程序代码前面文章已经有了,这里为了方便看触发器,仅贴一下触发器部分的代码(完整代码见这里):

public static class CustomEventTimeTrigger extends Trigger<Object, TimeWindow> {
  private static final long serialVersionUID = 1L;

  private CustomEventTimeTrigger() {
  }

  @Override
  public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
    Format sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");

    System.out.println("onElement -- event: " + ((JSONObject) element).getString("id") +
        "; window.maxTimestamp():" + sdf.format(window.maxTimestamp()) +
        "; ctx.getCurrentWatermark():" + sdf.format(ctx.getCurrentWatermark()));
    if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {
      // if the watermark is already past the window fire immediately
      return TriggerResult.FIRE;
    } else {
      ctx.registerEventTimeTimer(window.maxTimestamp());
      return TriggerResult.CONTINUE;
    }
  }

  @Override
  public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
    Format sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
    System.out.println("onEventTime-- window.maxTimestamp():" + sdf.format(window.maxTimestamp()) +
        "; time:" + sdf.format(time));
    return time == window.maxTimestamp() ?
        TriggerResult.FIRE :
        TriggerResult.CONTINUE;
  }

  @Override
  public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
    return TriggerResult.CONTINUE;
  }

  @Override
  public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
    ctx.deleteEventTimeTimer(window.maxTimestamp());
  }

  @Override
  public boolean canMerge() {
    return true;
  }

  @Override
  public void onMerge(TimeWindow window,
                      OnMergeContext ctx) {
    // only register a timer if the watermark is not yet past the end of the merged window
    // this is in line with the logic in onElement(). If the watermark is past the end of
    // the window onElement() will fire and setting a timer here would fire the window twice.
    long windowMaxTimestamp = window.maxTimestamp();
    if (windowMaxTimestamp > ctx.getCurrentWatermark()) {
      ctx.registerEventTimeTimer(windowMaxTimestamp);
    }
  }

  @Override
  public String toString() {
    return "EventTimeTrigger()";
  }

  /**
   * Creates an event-time trigger that fires once the watermark passes the end of the window.
   *
   * <p>Once the trigger fires all elements are discarded. Elements that arrive late immediately
   * trigger window evaluation with just this one element.
   */
  public static CustomEventTimeTrigger create() {
    return new CustomEventTimeTrigger();
  }
}

上面代码仅是在原来的EventTimeTrigger上面加了一些输出,逻辑完全没有变更。可以看到这个触发器主要实现了onElementonEventTime两个方法,也就是在使用Event Time的时候,窗口触发主要通过这两个方法实现。我们先看下程序输出:

event in source:
{"id":"event1","timestamp":"2020-05-24T12:00:00.000+08:00"}
{"id":"event2","timestamp":"2020-05-24T12:00:01.000+08:00"}
{"id":"event4","timestamp":"2020-05-24T12:00:04.000+08:00"}
{"id":"event5","timestamp":"2020-05-24T12:00:05.000+08:00"}
{"id":"event7","timestamp":"2020-05-24T12:00:07.000+08:00"}
{"id":"event3","timestamp":"2020-05-24T12:00:03.000+08:00"}
{"id":"event6","timestamp":"2020-05-24T12:00:06.000+08:00"}
{"id":"event8","timestamp":"2020-05-24T12:00:08.000+08:00"}
{"id":"event9","timestamp":"2020-05-24T12:00:09.000+08:00"}

onElement -- event: event1; window.maxTimestamp():2020-05-24 12:00:04.999; ctx.getCurrentWatermark():292269055-12-03 00:47:04.192
onElement -- event: event2; window.maxTimestamp():2020-05-24 12:00:04.999; ctx.getCurrentWatermark():2020-05-24 12:00:00.000
onElement -- event: event4; window.maxTimestamp():2020-05-24 12:00:04.999; ctx.getCurrentWatermark():2020-05-24 12:00:01.000
onElement -- event: event5; window.maxTimestamp():2020-05-24 12:00:09.999; ctx.getCurrentWatermark():2020-05-24 12:00:04.000
onEventTime-- window.maxTimestamp():2020-05-24 12:00:04.999; time:2020-05-24 12:00:04.999

window{2020-05-24 12:00:00 - 2020-05-24 12:00:05}
event1
event2
event4
Total:3
onElement -- event: event7; window.maxTimestamp():2020-05-24 12:00:09.999; ctx.getCurrentWatermark():2020-05-24 12:00:05.000
onElement -- event: event6; window.maxTimestamp():2020-05-24 12:00:09.999; ctx.getCurrentWatermark():2020-05-24 12:00:07.000
onElement -- event: event8; window.maxTimestamp():2020-05-24 12:00:09.999; ctx.getCurrentWatermark():2020-05-24 12:00:07.000
onElement -- event: event9; window.maxTimestamp():2020-05-24 12:00:09.999; ctx.getCurrentWatermark():2020-05-24 12:00:08.000
onEventTime-- window.maxTimestamp():2020-05-24 12:00:09.999; time:2020-05-24 12:00:09.999

window{2020-05-24 12:00:05 - 2020-05-24 12:00:10}
event5
event7
event6
event8
event9
Total:5

Process finished with exit code 0

这里解释几个时间:

  • 之前说过一个时间窗口的窗口范围是[start_time, end_time),时间格式都是从1970-01-01T00:00:00Z至今的毫秒数。而这里的window.maxTimestamp()就是这个end_time减1(因为是左闭右开的).
  • onElement方法参数中的long timestamp是指当前事件(即T element)到达的时间。
  • onEventTime方法参数中的long time就是前文说的定时器触发的时间,实际值和window.maxTimestamp()一样,也就是这两个值是时间窗口生命周期结束的时间值。

根据上面的输出也可以看出来,然后结合这里代码实现就得出了前文给出的结论:对于窗口而言,它只看属于这个窗口的数据是否在[start_time, end_time)这个时间范围内来,如果来了,就没迟到,就是“有序的”,所以它保证的是当前时间范围窗口和下一个时间范围窗口的有序,而不是一个时间范围窗口内时间的有序性,而且Flink的确并不保证一个窗口内的事件顺序(原文是:Flink provides no guarantees about the order of the elements within a window.)。也就是一个窗口内event1是比event2早到,但传给窗口计算函数(process那种)的时候,并不保证event1一定在event2之前。

本文到这里也就结束了,只有理解了窗口的机制,才能更好的理解watermark是为了解决什么问题。但请记住,只有当使用基于Event Time的窗口时,Watermark才有意义。

文章目录