NYC's Blog - Flink http://niyanchun.com/tag/flink/ zh-CN Sat, 03 Apr 2021 11:23:00 +0800 Sat, 03 Apr 2021 11:23:00 +0800 Flink快速了解(7)——Async I/O http://niyanchun.com/flink-quick-learning-7-async-io.html http://niyanchun.com/flink-quick-learning-7-async-io.html Sat, 03 Apr 2021 11:23:00 +0800 NYC 上篇介绍了常见的算子,本文介绍另外一个重要的算子:Async I/O,即异步IO。它是流中频繁访问外部数据的利器,特别是当访问比较耗时的时候。

产生背景

先考虑一个实际中挺常见的场景:一个流处理程序中对于每个事件都要查一次外部的维表(比如HBase,这里暂不考虑缓存机制)做关联,那在Flink中如何实现呢?典型的做法就是增加一个map/flatmap,在里面做一下查询关联。这样功能没问题,但这个查询很容易会变成系统的瓶颈,特别是当外部查询比较耗时的时候。好在Flink里面有一个异步IO算子,可以很好的解决这个问题。

异步IO是阿里巴巴贡献给Flink非常重要的一个特性,在Flink 1.2版本中正式发布,对应的提案是FLIP-12: Asynchronous I/O Design and Implementation。这个特性解决的问题和所有其它的异步IO、IO多路复用技术是一致的:IO往往比较耗时,通过异步IO可以提高系统的吞吐量。这个Async I/O特性只不过是流处理场景里面的异步IO而已,原理没有什么特殊之处,看下官方的一个图:

async-io

左侧是同步IO,可以看到大部分时间用来等待了;右侧是异步IO,提升效果很明显。当然,通过提高任务的并行度也能一定程度的缓解同步IO的问题,这种方式有点类似于网络编程早期的per-connection-per-thread模型,但这种模式不够彻底,而且提高并行度的代价比较高。道理都懂,就不再赘述了,下面看怎么用。

如何使用

使用概述

回想一下网络编程中的异步IO(这里指的是IO多路复用技术),必须要内核支持select、poll/epoll才可以。Flink的异步IO也类似,需要访问外部数据的客户端支持异步请求才可以。如果不支持的话,也可以通过线程池技术模拟异步请求,当然效果上会差一些,但一般还是比同步IO强的。具体到编码层面,分3个步骤:

  1. 实现AsyncFunction接口,这个接口的作用是分发请求。Flink内置了一个实现类RichAsyncFunction,一般我们继承这个类即可。
  2. AsyncFunction#asyncInvoke(...)中实现一个回调函数,在回调函数中获取异步执行的结果,并且传递给ResultFuture
  3. 将异步操作应用到某个流上面。

下面是官方给的一段示例代码:

// This example implements the asynchronous request and callback with Futures that have the
// interface of Java 8's futures (which is the same one followed by Flink's Future)

/**
 * An implementation of the 'AsyncFunction' that sends requests and sets the callback.
 * 第1步:实现`AsyncFunction`接口
 */
class AsyncDatabaseRequest extends RichAsyncFunction<String, Tuple2<String, String>> {

    /** The database specific client that can issue concurrent requests with callbacks */
    private transient DatabaseClient client;

    @Override
    public void open(Configuration parameters) throws Exception {
        client = new DatabaseClient(host, post, credentials);
    }

    @Override
    public void close() throws Exception {
        client.close();
    }

    @Override
    public void asyncInvoke(String key, final ResultFuture<Tuple2<String, String>> resultFuture) throws Exception {

        // issue the asynchronous request, receive a future for result
        final Future<String> result = client.query(key);

        // set the callback to be executed once the request by the client is complete
        // the callback simply forwards the result to the result future
        // 第2步:实现一个回调函数
        CompletableFuture.supplyAsync(new Supplier<String>() {
            
            @Override
            public String get() {
                try {
                    // 获取异步执行的结果
                    return result.get();
                } catch (InterruptedException | ExecutionException e) {
                    // Normally handled explicitly.
                    return null;
                }
            }
        }).thenAccept( (String dbResult) -> {
            // 并且传递给`ResultFuture`
            resultFuture.complete(Collections.singleton(new Tuple2<>(key, dbResult)));
        });
    }
}

// create the original stream
DataStream<String> stream = ...;

// apply the async I/O transformation
// 第3步:将异步操作应用到某个流上面。
DataStream<Tuple2<String, String>> resultStream =
    AsyncDataStream.unorderedWait(stream, new AsyncDatabaseRequest(), 1000, TimeUnit.MILLISECONDS, 100);

代码的执行逻辑是这样的:

  • 流里面的每一个事件都会调用asyncInvoke,然后我们在这个方法里面实现访问外部数据的异步请求,比如上面代码中的client.query(key),然后得到一个异步对象(比如Future对象)。
  • 给这个异步对象定义一个回调函数,比如上面代码中的CompletableFuture.supplyAsync(...).thenAccept(...),然后在该回调函数获取异步任务的执行结果,并且交给ResultFuture,这一步非常重要,待会还会再提到。

为了防止异步IO无限堆积或者某些请求挂死,一般都会提供超时设置和最高的异步并发数,Flink的异步IO也不例外。上面代码中最后一行的unorderedWait参数中的1000就是异步任务的超时时间,100就是同时允许的并发请求数,称为Capacity。那超时后如何处理呢?看下AsyncFunction接口:

public interface AsyncFunction<IN, OUT> extends Function, Serializable {

    /**
     * Trigger async operation for each stream input.
     *
     * @param input element coming from an upstream task
     * @param resultFuture to be completed with the result data
     * @exception Exception in case of a user code error. An exception will make the task fail and
     *     trigger fail-over process.
     */
    void asyncInvoke(IN input, ResultFuture<OUT> resultFuture) throws Exception;

    /**
     * {@link AsyncFunction#asyncInvoke} timeout occurred. By default, the result future is
     * exceptionally completed with a timeout exception.
     *
     * @param input element coming from an upstream task
     * @param resultFuture to be completed with the result data
     */
    default void timeout(IN input, ResultFuture<OUT> resultFuture) throws Exception {
        resultFuture.completeExceptionally(
                new TimeoutException("Async function call has timed out."));
    }
}

除了定义了一个asyncInvoke方法,还定义了一个timeout,并且提供了一个默认实现。当异步任务超时后,就会回调该方法,默认是抛出异常,即整个任务失败。一般我们需要重新实现该方法。那如果并发数达到了限制会怎么样?答案是会阻塞,产生反压。

另外需要注意的一个地方就是事件的有序性。因为是异步请求,所以返回时间是不确定的,Flink的异步IO既支持保证事件的原始顺序,也可以不保证。异步IO的内部实现是依靠队列的,当需要保证有序的时候,实际是将先得到结果但排在后面的事件缓存起来,所以必然会增加一些延迟和资源占用。而无序可以获得更好的实时性和吞吐,但需要注意的是当使用Event Time的时候,这个无序并不是全局无序,而是在两个watermark之间的事件无序,整体还是要遵从watermark机制的。无序使用AsyncDataStream.unorderedWait(...) ,有序使用AsyncDataStream.orderedWait(...)

同步模式&线程池模式&异步模式对比demo

这部分主要是给了两个示例,演示如何使用Flink异步IO。

第一个例子中包含了3种场景:

  • flatmap中实现外部访问的同步IO
  • 使用线程池实现的异步IO,且不保证顺序
  • 使用线程池实现的异步IO,且保证顺序

其中,为了体现异步IO的优势,并没有真正访问数据库,而是使用了一个sleep操作,模拟比较耗时的IO操作。

public void update(Tuple2<String, Integer> tuple2) throws SQLException {
    // 为了模拟耗时IO,这里使用sleep替换真正的数据库操作
    //ps.setLong(1, balance);
    //ps.setLong(2, 1);
    //ps.execute();
    try {
    Thread.sleep(tuple2.f1);
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
}

注意:虽然没有真正访问数据库,但整个代码都是按照模拟真实场景写的,只是把里面执行数据库操作的换成了sleep,所以运行时还是会连接数据库、创建连接池。如果要自己运行代码,请修改代码中的数据库连接地址。另外,3个场景使用的数据源是同一个,而sleep的时间也是在数据源中定义好的,所以它们的IO耗时是相同的:

static List<Tuple2<String, Integer>> dataset() {
List<Tuple2<String, Integer>> dataset = new ArrayList<>(10);
for (int i = 0; i < 10; i++) {
    // f0: 元素名称  f1: sleep的时间,模拟该元素需要的IO耗时
    dataset.add(new Tuple2<>("e" + i, i % 3 + 1));
}
return dataset;
}

下面是完整代码:

public class ThreadPoolAsyncDemoFakeQuery {

  public static void main(String[] args) throws Exception {
    // --------------------------------- Async IO + Unordered -------------------------
    StreamExecutionEnvironment envAsyncUnordered = StreamExecutionEnvironment.getExecutionEnvironment();
    envAsyncUnordered.setParallelism(1);
    envAsyncUnordered.setBufferTimeout(0);

    DataStream<Tuple2<String, Integer>> source = envAsyncUnordered.fromCollection(dataset());
    DataStream<String> result = AsyncDataStream.unorderedWait(source, new ThreadPoolAsyncMysqlRequest(),
        10, TimeUnit.SECONDS, 10);
    result.print();

    System.out.println("Async + UnorderedWait:");
    envAsyncUnordered.execute("unorderedWait");

    // --------------------------------- Async IO + Ordered -------------------------
    StreamExecutionEnvironment envAsyncOrdered = StreamExecutionEnvironment.getExecutionEnvironment();
    envAsyncOrdered.setParallelism(1);
    envAsyncOrdered.setBufferTimeout(0);

    AsyncDataStream.orderedWait(envAsyncOrdered.fromCollection(dataset()),
        new ThreadPoolAsyncMysqlRequest(), 10, TimeUnit.SECONDS, 10)
        .print();
    System.out.println("Async + OrderedWait");
    envAsyncOrdered.execute("orderedWait");

    // --------------------------------- Sync IO -------------------------
    StreamExecutionEnvironment envSync = StreamExecutionEnvironment.getExecutionEnvironment();
    envSync.setParallelism(1);
    envSync.setBufferTimeout(0);

    envSync.fromCollection(dataset())
        .process(new ProcessFunction<Tuple2<String, Integer>, String>() {
          private transient MysqlClient client;

          @Override
          public void open(Configuration parameters) throws Exception {
            super.open(parameters);
            client = new MysqlClient();
          }

          @Override
          public void processElement(Tuple2<String, Integer> value, Context ctx, Collector<String> out) throws Exception {
            try {
              client.update(value);
              out.collect(value + ": " + System.currentTimeMillis());
            } catch (Exception e) {
              e.printStackTrace();
            }
          }
        }).print();
    System.out.println("Sync IO:");
    envSync.execute("Sync IO");
  }

  static List<Tuple2<String, Integer>> dataset() {
    List<Tuple2<String, Integer>> dataset = new ArrayList<>(10);
    for (int i = 0; i < 10; i++) {
      // f0: 元素名称  f1: sleep的时间
      dataset.add(new Tuple2<>("e" + i, i % 3 + 1));
    }
    return dataset;
  }

  static class ThreadPoolAsyncMysqlRequest extends RichAsyncFunction<Tuple2<String, Integer>, String> {
    private transient ExecutorService executor;
    private transient MysqlClient client;

    @Override
    public void open(Configuration parameters) throws Exception {
      super.open(parameters);
      executor = Executors.newFixedThreadPool(30);
      client = new MysqlClient();
    }

    @Override
    public void asyncInvoke(Tuple2<String, Integer> input, ResultFuture<String> resultFuture) {
      executor.submit(() -> {
        long current = System.currentTimeMillis();
        String output = input + ":" + current;

        try {
          client.update(input);
          resultFuture.complete(Collections.singleton(output));
        } catch (SQLException e) {
          e.printStackTrace();
          resultFuture.complete(Collections.singleton(input + ": " + e.getMessage()));
        }
      });
    }

    @Override
    public void timeout(Tuple2<String, Integer> input, ResultFuture<String> resultFuture) throws Exception {
      System.out.printf("%s timeout\n", input);
      resultFuture.complete(Collections.singleton(input + ": time out"));
    }

    @Override
    public void close() throws Exception {
      client.close();
      super.close();
    }
  }

  static class MysqlClient {
    static final String JDBC_DRIVER = "com.mysql.jdbc.Driver";
    static final String DB_URL = "jdbc:mysql://10.9.1.18:3306/test?useSSL=false";

    private Connection conn;
    private PreparedStatement ps;

    public MysqlClient() throws Exception {
      Class.forName(JDBC_DRIVER);
      conn = DriverManager.getConnection(DB_URL, "root", "root123.");
      ps = conn.prepareStatement("UPDATE account SET balance = ? WHERE id = ?;");
    }

    public void update(Tuple2<String, Integer> tuple2) throws SQLException {
      // 为了模拟耗时IO,这里使用sleep替换真正的数据库操作
      //ps.setLong(1, balance);
      //ps.setLong(2, 1);
      //ps.execute();
      try {
        Thread.sleep(tuple2.f1);
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
    }

    public void close() {
      try {
        if (conn != null) {
          conn.close();
        }
        if (ps != null) {
          ps.close();
        }
      } catch (Exception e) {
        e.printStackTrace();
      }
    }
  }
}

代码输出如下:

Async + UnorderedWait:
(e0,1):1617109474293
(e3,1):1617109474294
(e1,2):1617109474293
(e4,2):1617109474294
(e2,3):1617109474293
(e6,1):1617109474295
(e9,1):1617109474297
(e7,2):1617109474296
(e5,3):1617109474295
(e8,3):1617109474297
Async + OrderedWait
(e0,1):1617109474891
(e1,2):1617109474891
(e2,3):1617109474891
(e3,1):1617109474891
(e4,2):1617109474892
(e5,3):1617109474892
(e6,1):1617109474893
(e7,2):1617109474893
(e8,3):1617109474893
(e9,1):1617109474893
Sync IO:
(e0,1): 1617109475257
(e1,2): 1617109475260
(e2,3): 1617109475264
(e3,1): 1617109475265
(e4,2): 1617109475268
(e5,3): 1617109475272
(e6,1): 1617109475274
(e7,2): 1617109475277
(e8,3): 1617109475281
(e9,1): 1617109475283

可以看到:

  • Async + UnorderedWait:耗时4ms,且输出的事件是乱序的
  • Async + OrderedWait:耗时2ms,输出事件是保持原有顺序的
  • Sync IO:耗时26ms,输出的事件也是保持原有顺序的

该程序运行多次,输出不太稳定,但Async方式都是远远小于Sync的。因为数据量比较小,而且耗时都比较平均,所以无序的优势不明显,有时甚至还会比有序高。这个例子并不是一个严格的性能测试,但却可以用来体现Async相比于Sync的明显优势。

第二个例子则是使用真实的异步请求客户端执行真正的操作。这里使用了jasync-sql用于异步访问MySQL。完整代码如下:

public class AsyncDemo {

  public static void main(String[] args) throws Exception {
    final long repeat = 100L;

    // --------------------------------- Async IO + Unordered -------------------------
    StreamExecutionEnvironment envAsyncUnordered = StreamExecutionEnvironment.getExecutionEnvironment();
    envAsyncUnordered.setParallelism(1);
    envAsyncUnordered.setBufferTimeout(0);

    DataStream<Long> source = envAsyncUnordered.fromSequence(1, repeat);
    DataStream<String> result = AsyncDataStream.unorderedWait(source, new AsyncMysqlRequest(),
        10, TimeUnit.SECONDS, 10);
    result.print();

    System.out.println("Async + UnorderedWait:");
    envAsyncUnordered.execute("unorderedWait");
  }


  static class AsyncMysqlRequest extends RichAsyncFunction<Long, String> {
    Connection conn;

    @Override
    public void open(Configuration parameters) throws Exception {
      super.open(parameters);
      conn = MySQLConnectionBuilder.createConnectionPool("jdbc:mysql://{your-ip}:3306/test?user=root&password={your-password}");
      conn.connect().get();
    }

    @Override
    public void asyncInvoke(Long input, ResultFuture<String> resultFuture) throws Exception {
      List<Long> balance = new ArrayList<>(1);
      balance.add(input);
      CompletableFuture<QueryResult> future =
          conn.sendPreparedStatement("SELECT * FROM account WHERE balance = ?;", balance);
      // 如果执行成功
      future.thenAccept(queryResult -> {
        resultFuture.complete(Collections.singleton(balance + ":" + System.currentTimeMillis()));
      });
      // 如果执行异常
      future.exceptionally(e -> {
        e.printStackTrace();
        resultFuture.complete(Collections.singleton(balance + e.getMessage() + ":" + System.currentTimeMillis()));
        return null;
      });
    }

    @Override
    public void timeout(Long input, ResultFuture<String> resultFuture) throws Exception {
      System.out.printf("%d timeout\n", input);
      resultFuture.complete(Collections.singleton(input + ": time out"));
    }
  }
}

使用注意点

  1. 不要在AsyncFunction#asyncInvoke(...)内部执行比较耗时的操作,比如同步等待异步请求的结果(应该放到回调中执行)。因为一个流中每个Partition只有一个AsyncFunction实例,一个实例里面的数据是顺序调用asyncInvoke的,如果在里面执行耗时操作,那异步效果将大打折扣,如果同步等待异步的结果,那其实就退化成同步IO了。
  2. 异步请求超时回调默认是抛出异常,这样会导致整个Flink Job退出。这一般不是我们想要的,所以大多数时候都需要覆写timeout方法。
  3. 在自定义的回调函数里面一定要使用ResultFuture#completeResultFuture#completeExceptionally将执行结果传递给ResultFuture,否则异步请求会一直堆积在队列里面。当队列满了以后,整个任务流就卡主了。
  4. Flink异步IO也是支持Checkpoint的,所以故障后可以恢复,提供Exactly-Once语义保证。

原理浅析

AsyncWaitOperator

这里结合目前最新的Flink 1.12.1版本从源码角度简单分析一下Async I/O的实现,这里引用了Jark博客中的几张图(见文末引用部分)。

async-internal

如图,Flink Async I/O实质是通过队列的方式实现了一个异步的生产者-消费者模型。其中队列就是StreamElementQueue;生产者就是就是我们的主流,通过AsyncDataStream.xxxWait将数据放入队列;消费者就是图中的Emitter。调用流程就是流中的一个事件到了以后,先放入队列中,同时调用通过AsyncFunction#asyncInvoke(...)定义的异步逻辑。当异步逻辑执行完成后,就会将队列中该事件标记为已完成,然后Emitter将该事件的结果发送到下游继续执行。实现Async I/O最主要的类是AsyncWaitOperator,我把代码做了精简,保留了体现上面步骤的部分代码,具体见代码中的注释。

@Internal
public class AsyncWaitOperator<IN, OUT>
        extends AbstractUdfStreamOperator<OUT, AsyncFunction<IN, OUT>>
        implements OneInputStreamOperator<IN, OUT>, BoundedOneInput {

    /** Capacity of the stream element queue. */
    private final int capacity;

    /** Output mode for this operator. */
    private final AsyncDataStream.OutputMode outputMode;

    /** Timeout for the async collectors. */
    private final long timeout;

    /** Queue, into which to store the currently in-flight stream elements. */
    // 队列
    private transient StreamElementQueue<OUT> queue;

  
    @Override
    public void setup(
            StreamTask<?, ?> containingTask,
            StreamConfig config,
            Output<StreamRecord<OUT>> output) {
        super.setup(containingTask, config, output);

        switch (outputMode) {
            case ORDERED:
                queue = new OrderedStreamElementQueue<>(capacity);
                break;
            case UNORDERED:
                queue = new UnorderedStreamElementQueue<>(capacity);
                break;
            default:
                throw new IllegalStateException("Unknown async mode: " + outputMode + '.');
        }

        this.timestampedCollector = new TimestampedCollector<>(output);
    }


    @Override
    public void processElement(StreamRecord<IN> element) throws Exception {
        // add element first to the queue
        // 将事件放入队列
        final ResultFuture<OUT> entry = addToWorkQueue(element);

        final ResultHandler resultHandler = new ResultHandler(element, entry);

        // register a timeout for the entry if timeout is configured
        // 设置异步回调的超时处理
        if (timeout > 0L) {
            final long timeoutTimestamp =
                    timeout + getProcessingTimeService().getCurrentProcessingTime();

            final ScheduledFuture<?> timeoutTimer =
                    getProcessingTimeService()
                            .registerTimer(
                                    timeoutTimestamp,
                                    timestamp ->
                                            userFunction.timeout(
                                                    element.getValue(), resultHandler));

            resultHandler.setTimeoutTimer(timeoutTimer);
        }

        // 调用异步逻辑
        userFunction.asyncInvoke(element.getValue(), resultHandler);
    }


    /**
     * Add the given stream element to the operator's stream element queue. This operation blocks
     * until the element has been added.
     *
     * <p>Between two insertion attempts, this method yields the execution to the mailbox, such that
     * events as well as asynchronous results can be processed.
     *
     * @param streamElement to add to the operator's queue
     * @throws InterruptedException if the current thread has been interrupted while yielding to
     *     mailbox
     * @return a handle that allows to set the result of the async computation for the given
     *     element.
     */
    private ResultFuture<OUT> addToWorkQueue(StreamElement streamElement)
            throws InterruptedException {

        Optional<ResultFuture<OUT>> queueEntry;
        while (!(queueEntry = queue.tryPut(streamElement)).isPresent()) {
            mailboxExecutor.yield();
        }

        return queueEntry.get();
    }


    /**
     * Outputs one completed element. Watermarks are always completed if it's their turn to be
     * processed.
     *
     * <p>This method will be called from {@link #processWatermark(Watermark)} and from a mail
     * processing the result of an async function call.
     */
    private void outputCompletedElement() {
        if (queue.hasCompletedElements()) {
            // emit only one element to not block the mailbox thread unnecessarily
            queue.emitCompletedElement(timestampedCollector);
        }
    }

}

可以看到processElement方法中先将事件放入队列,然后注册超时定时器,最后再调用异步处理逻辑。其中addToWorkQueue中使用了一个while循环往队列里面放入事件,也就是如果队列满了,这个插入操作会一直阻塞在这里。也就是前文提到的:如果并发的异步请求数达到了Capacity的值,流就会阻塞产生反压。然后,在outputCompletedElement方法中实现了将完成的异步请求事件发送到下游的操作。

前文提到了Flink Async I/O提供了有序和无序两种保证,这部分功能是通过不同的队列实现的。上面的代码中也能看到有序的时候使用的是OrderedStreamElementQueue,无序的时候使用的是UnorderedStreamElementQueue,队列的大小就是最大并发的异步请求数Capacity。这两个队列都是图中StreamElementQueue接口的具体实现,先看下这个接口的定义:

/** Interface for stream element queues for the {@link AsyncWaitOperator}. */
@Internal
public interface StreamElementQueue<OUT> {

    /**
     * Tries to put the given element in the queue. This operation succeeds if the queue has
     * capacity left and fails if the queue is full.
     *
     * <p>This method returns a handle to the inserted element that allows to set the result of the
     * computation.
     *
     * @param streamElement the element to be inserted.
     * @return A handle to the element if successful or {@link Optional#empty()} otherwise.
     */
    Optional<ResultFuture<OUT>> tryPut(StreamElement streamElement);

    /**
     * Emits one completed element from the head of this queue into the given output.
     *
     * <p>Will not emit any element if no element has been completed (check {@link
     * #hasCompletedElements()} before entering any critical section).
     *
     * @param output the output into which to emit
     */
    void emitCompletedElement(TimestampedCollector<OUT> output);

    /**
     * Checks if there is at least one completed head element.
     *
     * @return True if there is a completed head element.
     */
    boolean hasCompletedElements();

    /**
     * Returns the collection of {@link StreamElement} currently contained in this queue for
     * checkpointing.
     *
     * <p>This includes all non-emitted, completed and non-completed elements.
     *
     * @return List of currently contained {@link StreamElement}.
     */
    List<StreamElement> values();

    /**
     * True if the queue is empty; otherwise false.
     *
     * @return True if the queue is empty; otherwise false.
     */
    boolean isEmpty();

    /**
     * Return the size of the queue.
     *
     * @return The number of elements contained in this queue.
     */
    int size();
}

接口定义了5个方法:

  • tryPut:向队列中插入数据;
  • emitCompletedElement:向下游发送已经完成的异步请求,即图上中的Emitter;
  • hasCompletedElements:判断是否有已经完成的异步请求
  • values:当前队列中的事件
  • isEmpty:队列是否为空
  • size:队列大小

下面分别看下有序和无序队列是如何实现这个接口的。

有序队列OrderedStreamElementQueue

有序实现比较简单,先进先出就行了。所以OrderedStreamElementQueue的代码量不多,我就全贴这里了:

@Internal
public final class OrderedStreamElementQueue<OUT> implements StreamElementQueue<OUT> {

    private static final Logger LOG = LoggerFactory.getLogger(OrderedStreamElementQueue.class);

    /** Capacity of this queue. */
    private final int capacity;

    /** Queue for the inserted StreamElementQueueEntries. */
    private final Queue<StreamElementQueueEntry<OUT>> queue;

    public OrderedStreamElementQueue(int capacity) {
        Preconditions.checkArgument(capacity > 0, "The capacity must be larger than 0.");

        this.capacity = capacity;
        this.queue = new ArrayDeque<>(capacity);
    }

    @Override
    public boolean hasCompletedElements() {
        return !queue.isEmpty() && queue.peek().isDone();
    }

    @Override
    public void emitCompletedElement(TimestampedCollector<OUT> output) {
        if (hasCompletedElements()) {
            final StreamElementQueueEntry<OUT> head = queue.poll();
            head.emitResult(output);
        }
    }

    @Override
    public List<StreamElement> values() {
        List<StreamElement> list = new ArrayList<>(this.queue.size());
        for (StreamElementQueueEntry e : queue) {
            list.add(e.getInputElement());
        }
        return list;
    }

    @Override
    public boolean isEmpty() {
        return queue.isEmpty();
    }

    @Override
    public int size() {
        return queue.size();
    }

    @Override
    public Optional<ResultFuture<OUT>> tryPut(StreamElement streamElement) {
        if (queue.size() < capacity) {
            StreamElementQueueEntry<OUT> queueEntry = createEntry(streamElement);

            queue.add(queueEntry);

            LOG.debug(
                    "Put element into ordered stream element queue. New filling degree "
                            + "({}/{}).",
                    queue.size(),
                    capacity);

            return Optional.of(queueEntry);
        } else {
            LOG.debug(
                    "Failed to put element into ordered stream element queue because it "
                            + "was full ({}/{}).",
                    queue.size(),
                    capacity);

            return Optional.empty();
        }
    }

    private StreamElementQueueEntry<OUT> createEntry(StreamElement streamElement) {
        if (streamElement.isRecord()) {
            return new StreamRecordQueueEntry<>((StreamRecord<?>) streamElement);
        }
        if (streamElement.isWatermark()) {
            return new WatermarkQueueEntry<>((Watermark) streamElement);
        }
        throw new UnsupportedOperationException("Cannot enqueue " + streamElement);
    }
}

里面就使用了一个ArrayDeque实现了队列,事件按顺序进入队列,出队列调用的是queue.poll(),所以顺序自然是保证的。在出队列的时候会先判断是否有已经完成的异步请求,即hasCompletedElements,看下它的实现:

    @Override
    public boolean hasCompletedElements() {
        return !queue.isEmpty() && queue.peek().isDone();
    }

代码里面调用queue中头部元素(peek)的isDone方法。队列里面的元素是StreamElementQueueEntry接口类型,该接口继承自ResultFuture接口,且对应的实现类是StreamRecordQueueEntry,我们看下里面相关的实现代码:

class StreamRecordQueueEntry<OUT> implements StreamElementQueueEntry<OUT> {
    @Nonnull private final StreamRecord<?> inputRecord;

    private Collection<OUT> completedElements;

    StreamRecordQueueEntry(StreamRecord<?> inputRecord) {
        this.inputRecord = Preconditions.checkNotNull(inputRecord);
    }

    @Override
    public boolean isDone() {
        return completedElements != null;
    }

    @Nonnull
    @Override
    public StreamRecord<?> getInputElement() {
        return inputRecord;
    }

    @Override
    public void emitResult(TimestampedCollector<OUT> output) {
        output.setTimestamp(inputRecord);
        for (OUT r : completedElements) {
            output.collect(r);
        }
    }

    @Override
    public void complete(Collection<OUT> result) {
        this.completedElements = Preconditions.checkNotNull(result);
    }
}

可以看到isDone方法的逻辑是判断completedElements是否为null,而completedElements的赋值就是在complete方法中的。这就是为什么我们必须在AsyncFunction#asyncInvoke(...)中定义的回调函数中调用ResultFuture#complete的原因。如果不这样,队列中的异步事件就无法被标记为已完成,当队列满了以后,整个系统的数据流就卡主了。当然异常时调用ResultFuture#completeExceptionally也是OK的,这个在AsyncWaitOperator.ResultHandler类中也有调用。

最后附一张来自Jark博客的图:

ordered

无序队列UnorderedStreamElementQueue

无序队列队列相对复杂一些。如果是使用Processing Time,那就是全局无序,也比较简单;但当使用Event Time的时候,如前文所述,就不是全局无序了,在相邻的两个watermark产生的区间内(代码实现中称为一个Segment)事件可以无序;但不同的区间还是要有序,即整体还是必须遵从Watermark的限制。所以为了实现这种机制,无序队列引入了Segment,一个Segment表示相邻watermark产生的一个区间。也就是Segment内无序,Segment之间有序。我们看下代码实现。

@Internal
public final class UnorderedStreamElementQueue<OUT> implements StreamElementQueue<OUT> {

    private static final Logger LOG = LoggerFactory.getLogger(UnorderedStreamElementQueue.class);

    /** Capacity of this queue. */
    private final int capacity;

    /** Queue of queue entries segmented by watermarks. */
    private final Deque<Segment<OUT>> segments;

    private int numberOfEntries;

    public UnorderedStreamElementQueue(int capacity) {
        Preconditions.checkArgument(capacity > 0, "The capacity must be larger than 0.");

        this.capacity = capacity;
        // most likely scenario are 4 segments <elements, watermark, elements, watermark>
        this.segments = new ArrayDeque<>(4);
        this.numberOfEntries = 0;
    }

    @Override
    public Optional<ResultFuture<OUT>> tryPut(StreamElement streamElement) {
        if (size() < capacity) {
            StreamElementQueueEntry<OUT> queueEntry;
            if (streamElement.isRecord()) {
                queueEntry = addRecord((StreamRecord<?>) streamElement);
            } else if (streamElement.isWatermark()) {
                queueEntry = addWatermark((Watermark) streamElement);
            } else {
                throw new UnsupportedOperationException("Cannot enqueue " + streamElement);
            }

            numberOfEntries++;

            LOG.debug(
                    "Put element into unordered stream element queue. New filling degree "
                            + "({}/{}).",
                    size(),
                    capacity);

            return Optional.of(queueEntry);
        } else {
            LOG.debug(
                    "Failed to put element into unordered stream element queue because it "
                            + "was full ({}/{}).",
                    size(),
                    capacity);

            return Optional.empty();
        }
    }

    private StreamElementQueueEntry<OUT> addRecord(StreamRecord<?> record) {
        // ensure that there is at least one segment
        Segment<OUT> lastSegment;
        if (segments.isEmpty()) {
            lastSegment = addSegment(capacity);
        } else {
            lastSegment = segments.getLast();
        }

        // entry is bound to segment to notify it easily upon completion
        StreamElementQueueEntry<OUT> queueEntry =
                new SegmentedStreamRecordQueueEntry<>(record, lastSegment);
        lastSegment.add(queueEntry);
        return queueEntry;
    }

    private Segment<OUT> addSegment(int capacity) {
        Segment newSegment = new Segment(capacity);
        segments.addLast(newSegment);
        return newSegment;
    }

    private StreamElementQueueEntry<OUT> addWatermark(Watermark watermark) {
        Segment<OUT> watermarkSegment;
        if (!segments.isEmpty() && segments.getLast().isEmpty()) {
            // reuse already existing segment if possible (completely drained) or the new segment
            // added at the end of
            // this method for two succeeding watermarks
            watermarkSegment = segments.getLast();
        } else {
            watermarkSegment = addSegment(1);
        }

        StreamElementQueueEntry<OUT> watermarkEntry = new WatermarkQueueEntry<>(watermark);
        watermarkSegment.add(watermarkEntry);

        // add a new segment for actual elements
        addSegment(capacity);
        return watermarkEntry;
    }

    @Override
    public boolean hasCompletedElements() {
        return !this.segments.isEmpty() && this.segments.getFirst().hasCompleted();
    }

    @Override
    public void emitCompletedElement(TimestampedCollector<OUT> output) {
        if (segments.isEmpty()) {
            return;
        }
        final Segment currentSegment = segments.getFirst();
        numberOfEntries -= currentSegment.emitCompleted(output);

        // remove any segment if there are further segments, if not leave it as an optimization even
        // if empty
        if (segments.size() > 1 && currentSegment.isEmpty()) {
            segments.pop();
        }
    }

    @Override
    public List<StreamElement> values() {
        List<StreamElement> list = new ArrayList<>();
        for (Segment s : segments) {
            s.addPendingElements(list);
        }
        return list;
    }

    @Override
    public boolean isEmpty() {
        return numberOfEntries == 0;
    }

    @Override
    public int size() {
        return numberOfEntries;
    }

    /** An entry that notifies the respective segment upon completion. */
    static class SegmentedStreamRecordQueueEntry<OUT> extends StreamRecordQueueEntry<OUT> {
        private final Segment<OUT> segment;

        SegmentedStreamRecordQueueEntry(StreamRecord<?> inputRecord, Segment<OUT> segment) {
            super(inputRecord);
            this.segment = segment;
        }

        @Override
        public void complete(Collection<OUT> result) {
            super.complete(result);
            segment.completed(this);
        }
    }

    /**
     * A segment is a collection of queue entries that can be completed in arbitrary order.
     *
     * <p>All elements from one segment must be emitted before any element of the next segment is
     * emitted.
     */
    static class Segment<OUT> {
        /** Unfinished input elements. */
        private final Set<StreamElementQueueEntry<OUT>> incompleteElements;

        /** Undrained finished elements. */
        private final Queue<StreamElementQueueEntry<OUT>> completedElements;

        Segment(int initialCapacity) {
            incompleteElements = new HashSet<>(initialCapacity);
            completedElements = new ArrayDeque<>(initialCapacity);
        }

        /** Signals that an entry finished computation. */
        void completed(StreamElementQueueEntry<OUT> elementQueueEntry) {
            // adding only to completed queue if not completed before
            // there may be a real result coming after a timeout result, which is updated in the
            // queue entry but
            // the entry is not re-added to the complete queue
            if (incompleteElements.remove(elementQueueEntry)) {
                completedElements.add(elementQueueEntry);
            }
        }

        /**
         * True if there are no incomplete elements and all complete elements have been consumed.
         */
        boolean isEmpty() {
            return incompleteElements.isEmpty() && completedElements.isEmpty();
        }

        /**
         * True if there is at least one completed elements, such that {@link
         * #emitCompleted(TimestampedCollector)} will actually output an element.
         */
        boolean hasCompleted() {
            return !completedElements.isEmpty();
        }

        /**
         * Adds the segmentd input elements for checkpointing including completed but not yet
         * emitted elements.
         */
        void addPendingElements(List<StreamElement> results) {
            for (StreamElementQueueEntry<OUT> element : completedElements) {
                results.add(element.getInputElement());
            }
            for (StreamElementQueueEntry<OUT> element : incompleteElements) {
                results.add(element.getInputElement());
            }
        }

        /**
         * Pops one completed elements into the given output. Because an input element may produce
         * an arbitrary number of output elements, there is no correlation between the size of the
         * collection and the popped elements.
         *
         * @return the number of popped input elements.
         */
        int emitCompleted(TimestampedCollector<OUT> output) {
            final StreamElementQueueEntry<OUT> completedEntry = completedElements.poll();
            if (completedEntry == null) {
                return 0;
            }
            completedEntry.emitResult(output);
            return 1;
        }

        /**
         * Adds the given entry to this segment. If the element is completed (watermark), it is
         * directly moved into the completed queue.
         */
        void add(StreamElementQueueEntry<OUT> queueEntry) {
            if (queueEntry.isDone()) {
                completedElements.add(queueEntry);
            } else {
                incompleteElements.add(queueEntry);
            }
        }
    }
}

队列部分的代码如下:

private final Deque<Segment<OUT>> segments;
// most likely scenario are 4 segments <elements, watermark, elements, watermark>
this.segments = new ArrayDeque<>(4);


/** Unfinished input elements. */
private final Set<StreamElementQueueEntry<OUT>> incompleteElements;

/** Undrained finished elements. */
private final Queue<StreamElementQueueEntry<OUT>> completedElements;

Segment(int initialCapacity) {
    incompleteElements = new HashSet<>(initialCapacity);
    completedElements = new ArrayDeque<>(initialCapacity);
}

可以看到整体还是一个ArrayDeque,但队列内部是一个Segment;Segment内部又分成了2个队列:未完成请求队列incompleteElements和已完成请求的队列completedElements。Segment内部是无序的,所以incompleteElements使用的数据类型是Set。当某个请求完成后,就转移到completedElements中。此时,如果来的是普通事件,则调用addRecord将事件放入对应Segment的incompleteElements队列中;如果来的是watermark,则调用addSegment创建一个新的Segment。如果使用Processing Time,那就永远没有watermark,也就是全局就是一个大的Segment,也就是全局无序。

最后附一张Jark博客的图:

unordered

总结

Flink Asycn I/O是阿里结合自身实践场景贡献给社区的一个特性,所以自然是有很多实际需求的。流中关联维表的操作在具体业务中也的确很常见,异步IO就是应对这些场景的利剑。在具体使用时要特别注意本文“使用注意点”一节的几个点。

References:

]]>
0 http://niyanchun.com/flink-quick-learning-7-async-io.html#comments http://niyanchun.com/feed/tag/flink/
Flink快速了解(6)——常用算子(Operator) http://niyanchun.com/flink-quick-learning-6-operators.html http://niyanchun.com/flink-quick-learning-6-operators.html Sun, 28 Mar 2021 21:39:00 +0800 NYC Flink的Stream Job就是由一些算子构成的(Source和Sink实质也是特殊的算子而已),本文介绍常见的DataStream算子(Operator)。我用一种不太科学的方式将这些算子分成了2类,并起了一个不太严谨的名字:

  • 单流算子:这类算子一般在一个流上面使用;
  • 多流算子:这类算子往往操作多个流。

单流算子

单流算子大都比较简单,粗略介绍。

  1. map/flatmap:使用最多的算子,map是输入一个元素,输出一个元素;flatmap是输入一个元素,输出0个或多个元素。
  2. filter:过滤,条件为真就继续往下传,为假就过滤掉了。
  3. keyBy:按照一个keySelector定义的方式进行哈希分区,会将一个流分成多个Partition,经过keyBy的流变成KeyedStream。一般和其它算子一起使用。
  4. reduce算子:在KeyedStream上面使用,“滚动式”的操作流中的元素。比如下面这个滚动式相加的例子:

    public class ReduceDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
    
        // produce 1,2,3,4,5
        DataStream<Long> source = env.fromSequence(1, 5);
    
        source.keyBy(value -> 1)    // 所有数据在一个Partition
            .reduce(new ReduceFunction<Long>() {
            @Override
            public Long reduce(Long value1, Long value2) throws Exception {
                // value1 保存上次迭代的结果值
                System.out.printf("value1: %d, value2: %d\n", value1, value2);
                return value1 + value2;
            }
            }).print();
    
        env.execute();
    }
    }

    输出如下:

    value1: 1, value2: 2
    3
    value1: 3, value2: 3
    6
    value1: 6, value2: 4
    10
    value1: 10, value2: 5
    15
  5. Aggregation算子:在KeyedStream上面使用,包括summinminBymaxmaxBy。这些算子在DataStream/DataSet中要被废弃了(见FLIP-134),这里主要介绍一下min/minBy的区别(max/maxBy类似),直接看代码吧:

     public class AggregationDemo {
         public static void main(String[] args) throws Exception {
             StreamExecutionEnvironment envMin = StreamExecutionEnvironment.getExecutionEnvironment();
             StreamExecutionEnvironment envMinBy = StreamExecutionEnvironment.getExecutionEnvironment();
             envMin.setParallelism(1);
             envMinBy.setParallelism(1);
    
             List<Tuple3<Integer, Integer, Integer>> data = new ArrayList<>();
             data.add(new Tuple3<>(0, 2, 4));
             data.add(new Tuple3<>(0, 4, 5));
             data.add(new Tuple3<>(0, 3, 3));
             data.add(new Tuple3<>(0, 1, 2));
             data.add(new Tuple3<>(1, 2, 4));
             data.add(new Tuple3<>(1, 5, 1));
             data.add(new Tuple3<>(1, 1, 0));
             data.add(new Tuple3<>(1, 2, 2));
    
             System.out.println("Min:");
             DataStream<Tuple3<Integer, Integer, Integer>> sourceMin = envMin.fromCollection(data);
             sourceMin.keyBy(tuple3 -> tuple3.f0).min(1).print();
    
             envMin.execute();
    
             System.out.println("\nMinBy:");
             DataStream<Tuple3<Integer, Integer, Integer>> sourceMinBy = envMinBy.fromCollection(data);
             sourceMinBy.keyBy(tuple3 -> tuple3.f0).minBy(1).print();
    
             envMinBy.execute();
         }
     }

    输出结果:

     Min:
     (0,2,4)
     (0,2,4)
     (0,2,4)
     (0,1,4)
     (1,2,4)
     (1,2,4)
     (1,1,4)
     (1,1,4)
    
     MinBy:
     (0,2,4)
     (0,2,4)
     (0,2,4)
     (0,1,2)
     (1,2,4)
     (1,2,4)
     (1,1,0)
     (1,1,0)

    可以看到min只取了元素中用于排序的那个key,元素其它字段还是第一个元素的;而minBy则是保留了完整的key最小的那个元素(好拗口...)。

  6. window类算子:这个就不说了,前面的多篇文章已经介绍过了。

下面看稍微复杂一些的多流算子。

多流算子

多流算子的差异点主要体现在以下3个方面:

  • 能同时处理的流的个数
  • 是否可以处理不同类型的流
  • 如何处理

union

union用于将多个同类型的流合并成一个新的流。比如一个流与自身union则会将元素翻倍:

public class UnionDemo {
  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);

    DataStream<Long> source = env.fromSequence(0, 5);
    source.print();

    DataStream<Long> unionStream = source.union(source);
    unionStream.print();

    env.execute();
  }
}

输出:

# source 
0
1
2
3
4
5
# unionStream
0
0
1
1
2
2
3
3
4
4
5
5

join

join只能操作2个keyedStream流,但这2个流的类型可以不一样,它对数据的操作相当于数据库里面的inner join:对一个数据集中相同key的数据执行inner join。在Flink DataStream里面有2种类型的流:

  • Window Join:通过窗口获取一个数据集

    • Tumbling Window Join
    • Sliding Window Join
    • Session Window Join
  • Interval Join:通过定义一个时间段获取一个数据集

Window Join就是配合窗口使用,然后又根据窗口的类型细分成了3种。Window Join的语法如下:

stream.join(otherStream)
    .where(<KeySelector>)
    .equalTo(<KeySelector>)
    .window(<WindowAssigner>)
    .apply(<JoinFunction>)

不同的窗口类型只是影响窗口产生的数据集,但join的方式是一模一样的,这里就以最简单最容易理解的Tumbling Window Join为例介绍(下面的图来自官网):

Tumbling-Window-Join

图中随着时间(横轴)产生了4个窗口,上面的绿色代表一个流(greenStream),下面的桔色(orangeStream)是另外一条流,下面的数据就是每个窗口中2个流join产生的数据。我写了一个模拟这个例子的代码:

public class WindowJoinDemo {

  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);

    DataStream<Tuple2<String, Integer>> greenStream = env.addSource(new GreenSource());
    DataStream<Tuple2<String, Integer>> orangeStream = env.addSource(new OrangeSource());

    orangeStream.join(greenStream)
        .where(orangeStreamTuple2 -> orangeStreamTuple2.f0)
        .equalTo(greenStreamTuple2 -> greenStreamTuple2.f0)
        .window(TumblingEventTimeWindows.of(Time.milliseconds(1000)))
        .apply((JoinFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String>)
            (orangeStreamTuple2, greenStreamTuple2) -> orangeStreamTuple2.f1 + "," + greenStreamTuple2.f1)
        .print();

    env.execute();
  }

  static class GreenSource extends RichSourceFunction<Tuple2<String, Integer>> {

    @Override
    public void run(SourceContext<Tuple2<String, Integer>> ctx) throws Exception {
      // window: [0, 1000)
      ctx.collectWithTimestamp(new Tuple2<>("key", 0), 0);
      ctx.collectWithTimestamp(new Tuple2<>("key", 1), 0);

      ctx.emitWatermark(new Watermark(1000));
      // window: [1000, 2000)
      ctx.collectWithTimestamp(new Tuple2<>("key", 3), 1500);

      ctx.emitWatermark(new Watermark(2000));
      // window: [2000, 3000)
      ctx.collectWithTimestamp(new Tuple2<>("key", 4), 2500);

      ctx.emitWatermark(new Watermark(3000));
      // window: [3000, 4000)
      ctx.collectWithTimestamp(new Tuple2<>("another_key", 4), 3500);
    }

    @Override
    public void cancel() {
    }
  }

  static class OrangeSource extends RichSourceFunction<Tuple2<String, Integer>> {

    @Override
    public void run(SourceContext<Tuple2<String, Integer>> ctx) throws Exception {
      // window: [0, 1000)
      ctx.collectWithTimestamp(new Tuple2<>("key", 0), 0);
      ctx.collectWithTimestamp(new Tuple2<>("key", 1), 0);
      // window: [1000, 2000)
      ctx.collectWithTimestamp(new Tuple2<>("key", 2), 1500);
      ctx.collectWithTimestamp(new Tuple2<>("key", 3), 1500);
      // window: [2000, 3000)
      ctx.collectWithTimestamp(new Tuple2<>("key", 4), 2500);
      ctx.collectWithTimestamp(new Tuple2<>("key", 5), 2500);
      // window: [3000, 4000)
      ctx.collectWithTimestamp(new Tuple2<>("key", 6), 3500);
      ctx.collectWithTimestamp(new Tuple2<>("key", 7), 3500);
      ;
    }

    @Override
    public void cancel() {
    }
  }
}

代码运行如下:

0,0
0,1
1,0
1,1
2,3
3,3
4,4
5,4

Interval Join也非常简单(假设是A join B),定义一个时间段[a.timestamp + lowerBound, a.timestamp + upperBound],然后落在这个时间区间的元素都符合b.timestamp ∈ [a.timestamp + lowerBound; a.timestamp + upperBound]。IntervalJoin目前只支持Event Time。 其语法如下:

stream.keyBy(<KeySelector>)
    .intervalJoin(otherStream.keyBy(<KeySelector>))
    .between(<lowerBoundTime>, <upperBoundTime>)
    .process (<ProcessJoinFunction>);

这里再借用官方的一个图:

Interval-Join

上面的图中orangeStream interval-join greenStream,然后时间区间是[orangeElem.ts - 2 <= greenElem.ts <= orangeElem.ts + 1],下面是每个interval中通过join产生的结果。我写了一个模拟上面示例的代码:

public class IntervalJoinDemo {

  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);

    DataStream<Tuple2<String, Integer>> greenStream = env.addSource(new GreenSource());
    DataStream<Tuple2<String, Integer>> orangeStream = env.addSource(new OrangeSource());

    orangeStream.keyBy(orangeStreamTuple2 -> orangeStreamTuple2.f0)
        .intervalJoin(greenStream.keyBy(greenStreamTuple2 -> greenStreamTuple2.f0))
        .between(Time.milliseconds(-2), Time.milliseconds(1))
        .process(new ProcessJoinFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String>() {
          @Override
          public void processElement(Tuple2<String, Integer> orangeTuple2, Tuple2<String, Integer> greenTuple2, Context ctx, Collector<String> out) throws Exception {
            out.collect(orangeTuple2.f1 + "," + greenTuple2.f1);
          }
        }).print();

    env.execute();
  }

  static class GreenSource extends RichSourceFunction<Tuple2<String, Integer>> {

    @Override
    public void run(SourceContext<Tuple2<String, Integer>> ctx) throws Exception {
      // window: [0, 1)
      ctx.collectWithTimestamp(new Tuple2<>("key", 0), 0);
      // window: [1, 2)
      ctx.collectWithTimestamp(new Tuple2<>("key", 1), 1);
      // window: [6, 7)
      ctx.collectWithTimestamp(new Tuple2<>("key", 6), 6);
      // window: [7, 8)
      ctx.collectWithTimestamp(new Tuple2<>("key", 7), 7);
    }

    @Override
    public void cancel() {
    }
  }

  static class OrangeSource extends RichSourceFunction<Tuple2<String, Integer>> {

    @Override
    public void run(SourceContext<Tuple2<String, Integer>> ctx) throws Exception {

      ctx.emitWatermark(new Watermark(0));
      ctx.collectWithTimestamp(new Tuple2<>("key", 0), 0);
      ctx.emitWatermark(new Watermark(2));
      ctx.collectWithTimestamp(new Tuple2<>("key", 2), 2);
      ctx.emitWatermark(new Watermark(3));
      ctx.collectWithTimestamp(new Tuple2<>("key", 3), 3);
      ctx.emitWatermark(new Watermark(4));
      ctx.collectWithTimestamp(new Tuple2<>("key", 4), 4);
      ctx.emitWatermark(new Watermark(5));
      ctx.collectWithTimestamp(new Tuple2<>("key", 5), 5);
      ctx.emitWatermark(new Watermark(7));
      ctx.collectWithTimestamp(new Tuple2<>("key", 7), 7);
    }

    @Override
    public void cancel() {
    }
  }
}

输出如下:

0,0
0,1
2,0
2,1
5,6
7,6
7,7

总的来说,join其实和数据库的inner join语义是完全一样的,在一组数据集中相同key的元素上面执行inner join,这个数据集可以通过窗口产生,也可以通过定义一个时间段产生。

cogroup

前面说了join只能执行inner join。如果你需要其它join怎么办?cogroup就是你想要的,其语法如下:

dataStream.coGroup(otherStream)
    .where(<KeySelector>)
    .equalTo(<KeySelector>)
    .window(<WindowAssigner>
    .apply (new CoGroupFunction () {...});

相比于join,congroup是更一般的实现,会在CoGroupFunction中将窗口中2个流的元素以2个迭代器的方式暴露给开发者,让开发者按照自己的意图自行编写操作逻辑,所以它不仅可以实现其它类型的join,还可以实现更一般的操作。不过在Stack Overflow下上面看到Flink PMC成员提到join的执行策略可以使用基于排序或者哈希,而cogroup只能基于排序,因此join一般会比cogroup高效一些,所以join能满足的场景,就尽量优先用join吧。下面看个使用的例子:

public class CoGroupDemo {

  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);

    DataStream<Tuple2<String, Integer>> greenStream = env.addSource(new WindowJoinDemo.GreenSource());
    DataStream<Tuple2<String, Integer>> orangeStream = env.addSource(new WindowJoinDemo.OrangeSource());

    orangeStream.coGroup(greenStream)
        .where(orangeStreamTuple2 -> orangeStreamTuple2.f0)
        .equalTo(greenStreamTuple2 -> greenStreamTuple2.f0)
        .window(TumblingEventTimeWindows.of(Time.milliseconds(1000)))
        .apply(new CoGroupFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String>() {
          @Override
          public void coGroup(Iterable<Tuple2<String, Integer>> orange, Iterable<Tuple2<String, Integer>> green, Collector<String> out) throws Exception {
            StringBuffer sb = new StringBuffer();
            sb.append("window:{ orange stream elements: [ ");
            orange.forEach(tuple2 -> sb.append(tuple2.f1).append(" "));
            sb.append("], green stream elements: [ ");
            green.forEach(tuple2 -> sb.append(tuple2.f1).append(" "));
            sb.append("]");
            out.collect(sb.toString());
          }
        })
        .print();

    env.execute();
  }
}

输出如下:

window:{ orange stream elements: [ 0 1 ], green stream elements: [ 0 1 ]
window:{ orange stream elements: [ 2 3 ], green stream elements: [ 3 ]
window:{ orange stream elements: [ 4 5 ], green stream elements: [ 4 ]
window:{ orange stream elements: [ ], green stream elements: [ 4 ]
window:{ orange stream elements: [ 6 7 ], green stream elements: [ ]

connect

connect用于连接2个流,这2个流可以是不同的类型,然后通过2个算子对流中的元素进行不同的处理。connect可以单独使用,也可以配合Broadcast机制一起使用。connect的主要使用场景是一个主流(数据流)和一个辅助流(比如配置、规则)连接,通过辅助流动态的控制主流的行为。下面给出2段代码示例,功能都是完全一样的:通过辅助流来改变主流输出某个数的倍数。一个用单独的connect实现,一个配合broadcast实现。

单独的connect:

public class ConnectDemo {
  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);

    // 主数据流
    DataStream<Long> dataStream = env.addSource(new SourceFunction<Long>() {
      @Override
      public void run(SourceContext<Long> ctx) throws Exception {
        long i = 0;
        while (true) {
          ctx.collect(i++);
          Thread.sleep(500);
        }
      }

      @Override
      public void cancel() {
      }
    });

    // 规则数据流
    DataStream<String> ruleStream = env.addSource(new SourceFunction<String>() {
      @Override
      public void run(SourceContext<String> ctx) throws Exception {
        ctx.collect("one");
        Thread.sleep(5000);
        ctx.collect("two");
        Thread.sleep(5000);
        ctx.collect("three");

        Thread.sleep(Long.MAX_VALUE);
      }

      @Override
      public void cancel() {
      }
    });

    dataStream.connect(ruleStream)
        .flatMap(new CoFlatMapFunction<Long, String, Object>() {
          String rule;

          @Override
          public void flatMap1(Long value, Collector<Object> out) throws Exception {
            if ("one".equalsIgnoreCase(rule)) {
              out.collect(value);
            } else if ("two".equalsIgnoreCase(rule) && (value % 2 == 0)) {
              out.collect(value);
            } else if ("three".equalsIgnoreCase(rule) && (value % 3 == 0)) {
              out.collect(value);
            }
          }

          @Override
          public void flatMap2(String value, Collector<Object> out) throws Exception {
            System.out.printf("update rule, old rule = %s, new rule = %s\n", rule, value);
            rule = value;
          }
        }).print();

    env.execute();
  }
}

输出如下:

update rule, old rule = null, new rule = one
1
2
3
4
5
6
7
8
9
update rule, old rule = one, new rule = two
10
12
14
16
18
update rule, old rule = two, new rule = three
21
24
27
30
33
36
...

配合broadcast实现:

public class BroadcastDemo {
  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);

    // 主数据流
    DataStream<Long> dataStream = env.addSource(new SourceFunction<Long>() {
      @Override
      public void run(SourceContext<Long> ctx) throws Exception {
        long i = 0;
        while (true) {
          ctx.collect(i++);
          Thread.sleep(500);
        }
      }

      @Override
      public void cancel() {
      }
    });

    // 规则数据流
    DataStream<String> ruleStream = env.addSource(new SourceFunction<String>() {
      @Override
      public void run(SourceContext<String> ctx) throws Exception {
        ctx.collect("one");
        Thread.sleep(5000);
        ctx.collect("two");
        Thread.sleep(5000);
        ctx.collect("three");

        Thread.sleep(Long.MAX_VALUE);
      }

      @Override
      public void cancel() {
      }
    });
    MapStateDescriptor<String, String> ruleStateDescriptor = new MapStateDescriptor<>(
        "RulesBroadcastState", BasicTypeInfo.STRING_TYPE_INFO,
        TypeInformation.of(new TypeHint<String>() {
        }));
    BroadcastStream<String> ruleBroadcastStream = ruleStream.broadcast(ruleStateDescriptor);

    dataStream.connect(ruleBroadcastStream)
        .process(new BroadcastProcessFunction<Long, String, Long>() {
          String rule;

          @Override
          public void processElement(Long value, ReadOnlyContext ctx, Collector<Long> out) throws Exception {
            if ("one".equalsIgnoreCase(rule)) {
              out.collect(value);
            } else if ("two".equalsIgnoreCase(rule) && (value % 2 == 0)) {
              out.collect(value);
            } else if ("three".equalsIgnoreCase(rule) && (value % 3 == 0)) {
              out.collect(value);
            }
          }

          @Override
          public void processBroadcastElement(String value, Context ctx, Collector<Long> out) throws Exception {
            System.out.printf("update rule, old rule = %s, new rule = %s\n", rule, value);
            rule = value;
          }
        }).print();

    env.execute();
  }
}

输出与单独connect的输出一模一样,这里就不再附运行结果了。

Iterate

Iterate功能如其名,就是迭代,有点类似于状态机。我们可以通过Iterate算子实现流里面元素的迭代计算,直到它符合某个条件,这在一些自学习的场景中比较常见。这里不太严谨的将它也归类为多流算子,是因为它需要处理“回炉改造”的元素构成的“新流”。下面的代码实现了一个非常简单的功能:在一个原始流上面执行减一操作(minusOne),减完之后检查元素的值是否大于0;如果大于0继续迭代减一,直到其小于0,退出迭代,进入后续操作(这里直接print了)。

public class IterateDemo {
  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);

    List<Tuple2<String, Integer>> tuple2s = new ArrayList<>(4);
    tuple2s.add(new Tuple2<>("Jim", 3));
    tuple2s.add(new Tuple2<>("John", 2));
    tuple2s.add(new Tuple2<>("Lily", 1));
    tuple2s.add(new Tuple2<>("Lucy", 4));

    // source
    DataStream<Tuple2<String, Integer>> source = env.fromCollection(tuple2s);
    source.print();
    // 得到一个IterativeStream
    IterativeStream<Tuple2<String, Integer>> iteration = source.iterate();
    // 执行常规计算
    DataStream<Tuple2<String, Integer>> minusOne = iteration.map(new MapFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>() {
      @Override
      public Tuple2<String, Integer> map(Tuple2<String, Integer> value) throws Exception {
        System.out.printf("[minusOne] %s: %d\n", value.f0, value.f1);
        return new Tuple2<>(value.f0, value.f1 - 1);
      }
    });
    // 找出需要迭代的元素形成一个流
    DataStream<Tuple2<String, Integer>> stillGreaterThanZero = minusOne.filter(new FilterFunction<Tuple2<String, Integer>>() {
      @Override
      public boolean filter(Tuple2<String, Integer> value) throws Exception {
        boolean greaterThanZero = value.f1 > 0;
        if (greaterThanZero) {
          System.out.printf("%s is still greater than zero(now: %d), back to minusOne\n", value.f0, value.f1);
        }
        return greaterThanZero;
      }
    });
    // 将需要迭代的流传递给closeWith方法
    iteration.closeWith(stillGreaterThanZero);

    minusOne.filter(tuple2 -> tuple2.f1 <= 0).print();

    env.execute();
  }
}

输出如下:

(Jim,3)
(John,2)
(Lily,1)
(Lucy,4)
[minusOne] Jim: 3
Jim is still greater than zero(now: 2), back to minusOne
[minusOne] John: 2
John is still greater than zero(now: 1), back to minusOne
[minusOne] Lily: 1
(Lily,0)
[minusOne] Lucy: 4
Lucy is still greater than zero(now: 3), back to minusOne
[minusOne] Jim: 2
Jim is still greater than zero(now: 1), back to minusOne
[minusOne] John: 1
(John,0)
[minusOne] Lucy: 3
Lucy is still greater than zero(now: 2), back to minusOne
[minusOne] Jim: 1
(Jim,0)
[minusOne] Lucy: 2
Lucy is still greater than zero(now: 1), back to minusOne
[minusOne] Lucy: 1
(Lucy,0)

总结

一个典型的Flink流处理程序往往就是由上面介绍的这些算子组成的,可以看到各个算子的使用都不算复杂,所以要编写一个流处理程序也并不困难。难的是如何编写一个正确、稳健、高效的程序,以及如何有针对性的调优,而要做好这些,就需要理解流处理背后的运行原理和关键设计。

下篇文章介绍另外一个非常重要的算子:Flink Async I/O。

]]>
2 http://niyanchun.com/flink-quick-learning-6-operators.html#comments http://niyanchun.com/feed/tag/flink/
Flink快速了解(5)——Job&amp;&amp;Task&amp;&amp;Subtask&amp;&amp;SlotSharing http://niyanchun.com/flink-quick-learning-job-task-slotsharing.html http://niyanchun.com/flink-quick-learning-job-task-slotsharing.html Sun, 27 Dec 2020 11:20:00 +0800 NYC 本文讲一些比较八股的概念性东西,不是特别实用,但对于理解任务的运行非常有帮助。先做个自我检测:你知道Flink的Job指什么吗?Task呢?Subtask呢?这些和JVM Thread的对应关系是什么?你能估计出你的应用跑起来以后大概会产生多少个Thread吗?你知道你的应用需要多少个Slot吗?OK,如果你都清楚了,那Ctrl/Command+w吧...。如果还有些疑问,可以继续往下看。

注意:本文的“任务”一词是个通用概念,不代表Task。

Job && Task && Subtask

Job最容易理解,一个Job代表一个可以独立提交的大任务,可以认为一个execute或者executeAsync就产生一个Job,我们向JobManager提交任务的时候就是以Job为单位的,只不过一份代码里面可以包含多个Job。比如SocketWordCount就是一个Job。

至于Task和Subtask看下面我画的图:

task-chain

图说明如下:

  • 图中每个圆代表一个Operator,每个虚线圆角框代表一个Task,每个虚线方框代表一个Subtask,其中的p表示并行度。
  • 最上面是StreamGraph,是没有经过任何优化的时候,可以看到包含4个Operator/Task:Task A1、Task A2、Task B、Task C。
  • StreamGraph经过Operator Chain之后,Task A1和Task A2两个Task合并成了一个新的Task A(同时也可以认为合并产生了一个新的Operator),得到了中间的JobGraph。
  • 然后以并行度为2(需要2个slot)执行的时候,Task A产生了2个Subtask,分别占用了Thread #1和Thread #2两个线程;Task B产生了2个Subtask,分别占用了Thread #3和Thread #3两个线程;Task C产生了1个Subtask,占用了Thread5.

应该非常清楚了,下面总结一下结论:

  1. Task是逻辑概念,一个Operator就代表一个Task(多个Operator被chain之后产生的新Operator算一个Operator);
  2. 真正运行的时候,Task会按照并行度分成多个Subtask,Subtask是执行/调度的基本单元
  3. 每个Subtask需要一个线程来执行

好了,理解了这些概念,评估一个应用大概会产生多少个线程的时候(不考虑一些框架自身的线程)根据JobGraph就可以大概计算出来了:

$$ \sum_{i=1}^n operator_i * parallelism_i $$

即累加所有Operator和它的并行度的乘积。

还没完,再讲一下Slot Sharing。

Slot Sharing

架构部分讲了TaskManager是真正干活的,启动的时候会将自己的资源以Slot的方式注册到ResourceManager,然后JobMaster从ResourceManager处申请到Slot资源之后将自己优化过后的任务调度到这些Slot上面去运行,在整个过程中Subtask是调度的基本单元,Slot则是资源分配的基本单元。需要注意的是目前Slot只隔离内存,不隔离CPU。

为了高效的使用资源,Flink默认允许同一个Job中不同Task的Subtask运行在一个Slot中,这就是SlotSharing。注意一下描述中的几个关键条件:

  1. 必须是同一个Job。这个很好理解,slot是给Job分配的资源,目的就是隔离各个Job,如果跨Job共享,但隔离就失效了;
  2. 必须是不同Task的Subtask。这样是为了更好的资源均衡和利用。一个计算流中(pipeline),每个Subtask的资源消耗肯定是不一样的,如果都均分slot,那必然有些资源利用率高,有些低。限制不同Task的Subtask共享可以尽量让资源占用高的和资源占用低的放一起,而不是把多个高的或多个低的放一起。比如一个计算流中,source和sink一般都是IO操作,特别是source,一般都是网络读,相比于中间的计算Operator,资源消耗并不大。
  3. 默认是允许sharing的,也就是你也可以关闭这个特性。

下面看下官方的两个图:

6个Slot,5个Subtask,并行度为2:

slot-sharing-2

此时Subtask少于Slot个数,所以每个Subtask独占一个Slot,没有SlotSharing。把并行度改为6:

slot-sharing-6

此时,Subtask的个数多于Slot了,所以出现了SlotSharing,一个Slot中分配了多个Subtask,特别是最左边的Slot中跑了一个完整的Pipeline。SlotSharing除了提高了资源利用率,还简化了并行度和Slot之间的关系——一个Job运行需要的Slot个数就是其中并行度最高的那个Task的并行度:

$$ Job需要的Slot个数=\max(parallelism_{task_1}, parallelism_{task_2}, ..., parallelism_{task_n}) $$

掌握了这些八股知识,就能更好的评估资源了。

Reference:

]]>
0 http://niyanchun.com/flink-quick-learning-job-task-slotsharing.html#comments http://niyanchun.com/feed/tag/flink/
Flink Native Kubernetes支持Volume Mount http://niyanchun.com/flink-native-k8s-volume-mount.html http://niyanchun.com/flink-native-k8s-volume-mount.html Wed, 23 Dec 2020 22:02:00 +0800 NYC 在之前的文章 Flink快速了解(4)——NativeKubernetes&HA 中讲到 Native Kubernetes在Flink 1.12版本中已经成为一个正式特性,使用起来也的确非常的简单、方便,但文末提到我碰到的一个问题:无法挂载volume。其实目前Flink Native Kubernetes这种方式提供的容器自定义能力还非常有限。从代码看,是通过一个个配置去支持的(见KubernetesConfigOptions.java),但k8s的Pod定义选项太多了,通过这种方式去支持,会一直疲于奔命,而且还要不断的和k8s版本关联。所以,目前社区有一个JIRA FLINK-15656: Support user-specified pod templates,计划直接支持用户自定义pod template,但目前好像还没有明确的版本计划。另外,考虑到pod挂载volume是一个更加普遍化的高需求,所以还有一个单独的JIRA FLINK-15649: Support mounting volumes,不过目前也没有明确的版本计划。

我看了一下这个JIRA,其实已经有人提了PR(#14283)了,不过还没有被合进去。这个PR的代码非常简单,有兴趣的可以看下,我把这个代码合到我本地的1.12分支,然后把新增的3个class和修改的3个class文件加到官方1.12发布的包中测试了一下,是可以实现volume mount的。下面记录一下过程,有兴趣的可以自行编译,或者直接下载我编译好的(点此下载,密码: hi52。怎么现在分享还必须设置密码了...)。

使用说明

这个PR增加的功能是给Flink Native Kubernetes部署模式下的JobManager和TaskManager增加volume mount的功能,支持 emptydir(默认)、hostpath、pvc三种。使用方式代码里面也写清楚了:

// KubernetesConfigOptions
public static final ConfigOption<String> JOBMANAGER_VOLUME_MOUNT =
    key("kubernetes.jobmanager.volumemount")
        .stringType()
        .noDefaultValue()
        .withDescription("Volume (pvc, emptydir, hostpath) mount information for the Job manager. " +
            "Value can contain several commas-separated volume mounts. Each mount is defined by several : separated " +
            "parameters - name used for mount, mounting path and volume specific parameters");

public static final ConfigOption<String> TASKMANAGER_VOLUME_MOUNT =
    key("kubernetes.taskmanager.vlumemount")
        .stringType()
        .noDefaultValue()
        .withDescription("Volume (pvc, emptydir, hostpath) mount information for the Task manager. " +
            "Value can contain several commas-separated volume mounts. Each mount is defined by several : separated " +
            "parameters - name used for mount, mounting path and volume specific parameters");

也可以从单元测试文件看使用方法:

// VolumeMountDecoratorTest
@Override
protected void setupFlinkConfig() {
    super.setupFlinkConfig();

    this.flinkConfig.setString(KubernetesConfigOptions.JOBMANAGER_VOLUME_MOUNT.key(),
        VolumeMountDecorator.KUBERNETES_VOLUMES_PVC + ":pvc-mount1:/opt/pvcclaim/tes1/:testclaim1:false,"
            + VolumeMountDecorator.KUBERNETES_VOLUMES_PVC + ":pvc-mount2::testclaim:false:path1->/opt/pvcclaim/test/path1;path2->/opt/pvcclaim/test/path2,"
            + VolumeMountDecorator.KUBERNETES_VOLUMES_EMPTYDIR + ":edir-mount:/emptydirclaim:" + VolumeMountDecorator.EMPTYDIRDEFAULT + ","
            + VolumeMountDecorator.KUBERNETES_VOLUMES_HOSTPATH + ":hp-mount:/var/local/hp:/var/local/hp:DirectoryOrCreate");
}

emptydir和hostpath的使用非常简单就不说了。pvc的使用有两种方式:

  • 方式1:不使用subpath,共5个参数。示例:-Dkubernetes.jobmanager.volumemount=pvc:<volume名称,自己起个名字>:<挂载路径>:<pvc名称>:<false|true>。最后一个false或者true表示是否以只读方式挂载。
  • 方式2:使用subpath,共6个参数。示例:-Dkubernetes.jobmanager.volumemount=pvc:<volume名称,自己起个名字>::<pvc名称>:<false|true>:<subPath>-><mountPath>

下面利用这个PR实现基于NFS的Flink Kubernetes HA。

  1. 先用修改过的flink-dist_2.11-1.12.0.jar替换官方包里面lib目录下的flink-dist_2.11-1.12.0.jar(懒得自己编译的,可以直接下载上面我编译好的,我是在官方包的基础上增加和替换了PR涉及的几个class文件,所以改动量非常小),注意是替换你提交任务的flink包的对应jar,不是替换容器里面的。
  2. 准备好一个pvc,这里我使用的是nfs storage-class提供的一个pvc:
$ kubectl get pvc            
NAME           STATUS   VOLUME                                     CAPACITY   ACCESS MODES   STORAGECLASS   AGE
flink-ha-pvc   Bound    pvc-46537a5b-2adc-442e-ae59-52af4c681f2c   500Mi      RWX            nfs-storage    16h
  1. 以Application cluster的方式提交一个任务(涉及的镜像参见之前的文章):

    $ ./bin/flink run-application \
         --target kubernetes-application \
         -Dkubernetes.cluster-id=flink-application-cluster \
         -Dhigh-availability=org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory \
         -Dhigh-availability.storageDir=file:///opt/flink/flink-ha \
         -Dkubernetes.jobmanager.volumemount=pvc:jobmanager-ha:/opt/flink/flink-ha:flink-ha-pvc:false \
         -Dkubernetes.container.image=top-speed-windowing:1.12.0 \
         -Dkubernetes.rest-service.exposed.type=NodePort \
         local:///opt/flink/usrlib/TopSpeedWindowing.jar

检查一下:

$ kubectl get pod
NAME                                        READY   STATUS    RESTARTS   AGE
flink-application-cluster-9589dbf58-hm7xj   1/1     Running   0          76s
flink-application-cluster-taskmanager-1-1   1/1     Running   0          33s


$ kubectl describe pod flink-application-cluster-9589dbf58-hm7xj
Name:         flink-application-cluster-9589dbf58-hm7xj
Namespace:    default
Priority:     0
Node:         10.9.1.18/10.9.1.18
Start Time:   Thu, 24 Dec 2020 10:26:07 +0800
Labels:       app=flink-application-cluster
              component=jobmanager
              pod-template-hash=9589dbf58
              type=flink-native-kubernetes
Annotations:  <none>
Status:       Running
IP:           172.20.0.165
IPs:
  IP:           172.20.0.165
Controlled By:  ReplicaSet/flink-application-cluster-9589dbf58
Containers:
  flink-job-manager:
    Container ID:  docker://454fd2a6d3a913ce738f2e007f35e61d5068bfd9ad38d76bf900dbf1aaf9b70f
    Image:         top-speed-windowing:1.12.0
    Image ID:      docker://sha256:66d4aa5b13fc7c2ccce21685543fc2d079aac695d3480d9d27dbef2fc50ce875
    Ports:         8081/TCP, 6123/TCP, 6124/TCP
    Host Ports:    0/TCP, 0/TCP, 0/TCP
    Command:
      /docker-entrypoint.sh
    Args:
      native-k8s
      $JAVA_HOME/bin/java -classpath $FLINK_CLASSPATH -Xmx1073741824 -Xms1073741824 -XX:MaxMetaspaceSize=268435456 -Dlog.file=/opt/flink/log/jobmanager.log -Dlogback.configurationFile=file:/opt/flink/conf/logback-console.xml -Dlog4j.configuration=file:/opt/flink/conf/log4j-console.properties -Dlog4j.configurationFile=file:/opt/flink/conf/log4j-console.properties org.apache.flink.kubernetes.entrypoint.KubernetesApplicationClusterEntrypoint -D jobmanager.memory.off-heap.size=134217728b -D jobmanager.memory.jvm-overhead.min=201326592b -D jobmanager.memory.jvm-metaspace.size=268435456b -D jobmanager.memory.heap.size=1073741824b -D jobmanager.memory.jvm-overhead.max=201326592b
    State:          Running
      Started:      Thu, 24 Dec 2020 10:26:11 +0800
    Ready:          True
    Restart Count:  0
    Limits:
      cpu:     1
      memory:  1600Mi
    Requests:
      cpu:     1
      memory:  1600Mi
    Environment:
      _POD_IP_ADDRESS:   (v1:status.podIP)
    Mounts:
      /opt/flink/conf from flink-config-volume (rw)
      /opt/flink/flink-ha from jobmanager-ha (rw)
      /var/run/secrets/kubernetes.io/serviceaccount from default-token-pzw5h (ro)
Conditions:
  Type              Status
  Initialized       True 
  Ready             True 
  ContainersReady   True 
  PodScheduled      True 
Volumes:
  flink-config-volume:
    Type:      ConfigMap (a volume populated by a ConfigMap)
    Name:      flink-config-flink-application-cluster
    Optional:  false
  jobmanager-ha:
    Type:       PersistentVolumeClaim (a reference to a PersistentVolumeClaim in the same namespace)
    ClaimName:  flink-ha-pvc
    ReadOnly:   false
  default-token-pzw5h:
    Type:        Secret (a volume populated by a Secret)
    SecretName:  default-token-pzw5h
    Optional:    false
QoS Class:       Guaranteed
Node-Selectors:  <none>
Tolerations:     node.kubernetes.io/not-ready:NoExecute for 300s
                 node.kubernetes.io/unreachable:NoExecute for 300s
Events:
  Type     Reason       Age                From                Message
  ----     ------       ----               ----                -------
  Normal   Scheduled    <unknown>          default-scheduler   Successfully assigned default/flink-application-cluster-9589dbf58-hm7xj to 10.9.1.18
  Warning  FailedMount  80s (x2 over 81s)  kubelet, 10.9.1.18  MountVolume.SetUp failed for volume "flink-config-volume" : configmap "flink-config-flink-application-cluster" not found
  Normal   Pulled       78s                kubelet, 10.9.1.18  Container image "top-speed-windowing:1.12.0" already present on machine
  Normal   Created      78s                kubelet, 10.9.1.18  Created container flink-job-manager
  Normal   Started      77s                kubelet, 10.9.1.18  Started container flink-job-manager

可以看到已经正确挂载了。

这个PR能不能用于生产?

能不能用于生产我觉得主要考虑的就是这个PR的可靠程度和后期维护、升级了。从这两个角度考虑我觉得是没问题的。这个PR代码量少,而且简单,实质只是增加了几项配置而已,对已有代码几乎是没有改动的,新增的配置也都是可选配置项,代码的可控性几乎是百分百的。可能更应该关心的是这个PR后面会不会被合到官方分支吧。我个人觉得不一定吧,volume mount的功能几乎肯定会支持,但未必最终使用这个PR的代码。但用了其它代码,对使用者而言,顶多也就是换个jar包,修改下创建任务的命令而已。

另外我觉得最重要的是这个改动只影响提交任务的过程,就这个过程也只影响创建容器的过程,也就是影响面仅限Kubernetes相关的东西,并没有影响任何Flink运行的功能。所以使用这个PR的时候记得只替换宿主机安装包里面的jar即可,不要替换容器里面真正运行的那个jar。

不过,如果你只想完全用官方的东西,那完全可以像之前版本一样,使用非Native的方式在Kubernetes上面部署Flink,不过我还是喜欢Native的东西,更加简单。

]]>
0 http://niyanchun.com/flink-native-k8s-volume-mount.html#comments http://niyanchun.com/feed/tag/flink/
Flink快速了解(4)——NativeKubernetes&amp;HA http://niyanchun.com/flink-quick-learning-4-native-kubernetes-ha.html http://niyanchun.com/flink-quick-learning-4-native-kubernetes-ha.html Mon, 21 Dec 2020 23:41:00 +0800 NYC Flink的1.12.0版本前段时间发布了,又带来了很多新特性,其中有两个跟容器化相关的特性:

  • Native Kubernetes部署方式由之前的实验性(experimental)变为正式特性,也就是我们可以在生产环境里面放心大胆的使用了;
  • Kubernetes上面Flink的高可用除了ZooKeeper外又多了一种更轻量级的,更Native的基于ConfigMap的方案选择。

当然,这些特性目前在有些细小方面还是存在一些不足(不过瑕不掩瑜),下面的测试中会有所说明。

准备镜像

Flink 1.12已经有一段时间了,但官方的镜像到现在也还没有推上去,如果你用的是最新的1.12.0版本,看官方文档的时候一定要注意,文档里面都没有指定镜像名或者tag,这样拉取到的其实还是1.11.x版本的镜像,然后打出来的镜像在1.12的flink上面运行大概率是会报错的。

所以本次测试是我自己打的镜像。

flink 1.12.0镜像

git clone https://github.com/apache/flink-docker
# 此处可以选择你自己的Scala和JDK版本
cd 1.12/scala_2.11-java8-debian 
docker build  --tag flink:1.12.0-scala_2.11  .
Tips:Flink默认的镜像命名规则为:flink:<FLINK_VERSION>-scala_<SCALA_VERSION>",当然你也可以自定义,然后在启动的时候通过kubernetes.container.image参数指定镜像名称。

flink hadoop镜像

后面测试高可用的时候,我使用了HDFS,默认的镜像里面是不包含HDFS的依赖的,所以需要自己加进去(自行从maven下载)。如果你用的是oss、s3、swift之类的,就不需要了,默认已经包含了,只需要启动的时候配置一下即可。下面是Dockerfile:

FROM flink:1.12.0-scala_2.11
COPY flink-shaded-hadoop-2-uber-2.8.3-10.0.jar $FLINK_HOME/lib/

执行docker build --tag flink-haddop:1.12.0-scala_2.11 .

TopSpeedWindowing镜像

下面还会演示Flink Application Cluster,这种模式下需要把应用打到flink镜像里面,方法也非常简单,把应用的jar拷贝到$FLINK_HOME/usrlib即可,如果你需要定制配置文件的话,也可以加到这里。下面以flink自带的TopSpeedWindowing为例,该应用的jar在flink安装包的examples/streaming目录下。下面是Dockerfile:

FROM flink-haddop:1.12.0-scala_2.11
RUN mkdir -p $FLINK_HOME/usrlib
COPY TopSpeedWindowing.jar $FLINK_HOME/usrlib/TopSpeedWindowing.jar

执行docker build --tag top-speed-windowing:1.12.0 .

这样本文用到的所有镜像就准备完成了。

Flink Native Kubernetes

先下载并解压Flink 1.12.0版本(过程略),下面的命令都是在flink解压后的目录执行的。

Flink Session Cluster

前文介绍过了,Flink Session Cluster就是先部署一个集群,然后往集群上面提交任务。所以我们先创建一个集群:

./bin/kubernetes-session.sh \
    -Dkubernetes.container.image=flink:1.12.0-scala_2.11 \
    -Dkubernetes.rest-service.exposed.type=NodePort \
    -Dtaskmanager.numberOfTaskSlots=2 \
    -Dkubernetes.cluster-id=flink-session-cluster


# 下面是输出:
2020-12-21 22:55:35,527 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: jobmanager.rpc.address, localhost
2020-12-21 22:55:35,537 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: jobmanager.rpc.port, 6123
2020-12-21 22:55:35,537 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: jobmanager.memory.process.size, 1600m
2020-12-21 22:55:35,537 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: taskmanager.memory.process.size, 1728m
2020-12-21 22:55:35,538 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: taskmanager.numberOfTaskSlots, 1
2020-12-21 22:55:35,538 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: parallelism.default, 1
2020-12-21 22:55:35,540 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: jobmanager.execution.failover-strategy, region
2020-12-21 22:55:35,773 INFO  org.apache.flink.client.deployment.DefaultClusterClientServiceLoader [] - Could not load factory due to missing dependencies.
2020-12-21 22:55:38,128 INFO  org.apache.flink.runtime.util.config.memory.ProcessMemoryUtils [] - The derived from fraction jvm overhead memory (160.000mb (167772162 bytes)) is less than its min value 192.000mb (201326592 bytes), min value will be used instead
2020-12-21 22:55:38,148 INFO  org.apache.flink.runtime.util.config.memory.ProcessMemoryUtils [] - The derived from fraction jvm overhead memory (172.800mb (181193935 bytes)) is less than its min value 192.000mb (201326592 bytes), min value will be used instead
2020-12-21 22:55:38,163 INFO  org.apache.flink.kubernetes.utils.KubernetesUtils            [] - Kubernetes deployment requires a fixed port. Configuration blob.server.port will be set to 6124
2020-12-21 22:55:38,163 INFO  org.apache.flink.kubernetes.utils.KubernetesUtils            [] - Kubernetes deployment requires a fixed port. Configuration taskmanager.rpc.port will be set to 6122
2020-12-21 22:55:38,259 INFO  org.apache.flink.runtime.util.config.memory.ProcessMemoryUtils [] - The derived from fraction jvm overhead memory (160.000mb (167772162 bytes)) is less than its min value 192.000mb (201326592 bytes), min value will be used instead
2020-12-21 22:55:44,134 INFO  org.apache.flink.kubernetes.KubernetesClusterDescriptor      [] - Create flink session cluster flink-session-cluster successfully, JobManager Web Interface: http://10.9.1.18:38566

这样一个集群就部署好了,Native Kubernetes的部署方式使用了Kubernetes的资源管理和分配能力,所以此时集群只有Jobmanager。TaskManager会在后面有任务时动态创建出来。输出日志的最后一行打印了Web UI地址,可以检查集群状态。

$ kubectl get pod,svc,cm,deploy
NAME                                         READY   STATUS    RESTARTS   AGE
pod/flink-session-cluster-665766d9d5-8jxmb   1/1     Running   0          13s

NAME                                 TYPE        CLUSTER-IP    EXTERNAL-IP   PORT(S)             AGE
service/flink-session-cluster        ClusterIP   None          <none>        6123/TCP,6124/TCP   12s
service/flink-session-cluster-rest   NodePort    10.68.245.4   <none>        8081:38566/TCP      12s
service/kubernetes                   ClusterIP   10.68.0.1     <none>        443/TCP             38d

NAME                                           DATA   AGE
configmap/flink-config-flink-session-cluster   3      12s

NAME                                    READY   UP-TO-DATE   AVAILABLE   AGE
deployment.apps/flink-session-cluster   1/1     1            1           13s

集群部署好了,提交一个任务:

./bin/flink run -p 4 \
    --target kubernetes-session \
    -Dkubernetes.cluster-id=flink-session-cluster \
    ./examples/streaming/TopSpeedWindowing.jar 

# 查看一下动态创建的TaskManager
kubectl get pod
NAME                                     READY   STATUS    RESTARTS   AGE
flink-session-cluster-665766d9d5-8jxmb   1/1     Running   0          2m17s
flink-session-cluster-taskmanager-1-1    1/1     Running   0          33s
flink-session-cluster-taskmanager-1-2    1/1     Running   0          33s

然后在web上面删除任务,可以看到集群依然是存在的,因为Session Cluster的生命周期是独立于Job的。但TaskManager在空闲一段时间(resourcemanager.taskmanager-timeout,默认30秒)后会被回收。

最后删除测试集群:

kubectl delete deploy/flink-session-cluster

Flink Application Cluster

前文介绍过,Flink Application Cluster就是应用创建自己专属的集群,一个应用可以包含多个Job,集群生命周期和Job同步。

# 创建集群&&启动应用
./bin/flink run-application \
    --target kubernetes-application \
    -Dkubernetes.cluster-id=flink-application-cluster \
    -Dkubernetes.container.image=top-speed-windowing:1.12.0 \
    -Dkubernetes.rest-service.exposed.type=NodePort \
    local:///opt/flink/usrlib/TopSpeedWindowing.jar

# 查看
kubectl get pod,svc,cm,deploy
NAME                                             READY   STATUS    RESTARTS   AGE
pod/flink-application-cluster-7fc5ccd899-9p7ns   1/1     Running   0          115s
pod/flink-application-cluster-taskmanager-1-1    1/1     Running   0          54s

NAME                                     TYPE        CLUSTER-IP      EXTERNAL-IP   PORT(S)             AGE
service/flink-application-cluster        ClusterIP   None            <none>        6123/TCP,6124/TCP   114s
service/flink-application-cluster-rest   NodePort    10.68.147.132   <none>        8081:39463/TCP      113s
service/kubernetes                       ClusterIP   10.68.0.1       <none>        443/TCP             38d

NAME                                               DATA   AGE
configmap/flink-config-flink-application-cluster   3      113s

NAME                                        READY   UP-TO-DATE   AVAILABLE   AGE
deployment.apps/flink-application-cluster   1/1     1            1           115s

和之前的Session Cluster相比,集群创建和任务提交集成在了一起,一条命令搞定了所有的事情,非常的方便。此时,如果你在Web上取消任务,你会发现整个集群都没了,符合Job Cluster生命周期与Job同步的说法。

Flink Job Cluster

目前不支持。Flink Job Cluster其实比较鸡肋,介于Session Cluster和Application Cluster之间,一般根据需要选则后面这两个即可。

基于Kubernetes的高可用

Flink的高可用主要解决JobManager的单点故障问题,之前只有一种基于Zookeeper的方案,1.12.0版本中增加了一个基于Kubernetes ConfigMap的方案(仅用于使用Kubernetes部署Flink的场景),该特性对应有一个FLIP-144: Native Kubernetes HA for Flink,对设计细节有兴趣的可以看下。这个原生的高可用方案使用也非常简单,在上的基础上再增加两个配置项即可:

# HA服务,zk的时候是zookeeper,Kubernetes的时候填下面这个类
high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
# 虽然不再依赖zk了,但仍然需要高可用存储
high-availability.storageDir: hdfs:///flink/recovery

这里我使用的高可用存储是HDFS,下面分别创建两个集群:

# Flink Session Cluster
./bin/kubernetes-session.sh \
    -Dkubernetes.rest-service.exposed.type=NodePort \
    -Dkubernetes.container.image=flink-haddop:1.12.0-scala_2.11 \
    -Dhigh-availability=org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory \
    -Dhigh-availability.storageDir=hdfs://<namenode-ip>:<port>/flink/flink-ha \
    -Dkubernetes.cluster-id=flink-session-cluster

# Flink Application Cluster
./bin/flink run-application \
    --target kubernetes-application \
    -Dkubernetes.container.image=flink-haddop:1.12.0-scala_2.11 \
    -Dkubernetes.cluster-id=flink-application-cluster \
    -Dhigh-availability=org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory \
    -Dhigh-availability.storageDir=hdfs://<namenode-ip>:<port>/flink/flink-ha \
    -Dkubernetes.container.image=top-speed-windowing:1.12.0 \
    -Dkubernetes.rest-service.exposed.type=NodePort \
    local:///opt/flink/usrlib/TopSpeedWindowing.jar

Native的HA方案真的是非常naive,使用非常简单,没什么需要讲的。

最后解释一些关于HA的问题。

1, 为什么上述命令启动的JobManager Pod只有1个副本?答:除了用上面Flink自己的kubernetes-session.sh或者flink命令这种便捷方式容器化外,也有很多人自己写部署文件或者使用官方提供的部署文件(特别是Native Kubernetes还不成熟的时候),有些人部署HA的时候将JobManager的副本数设置为2,甚至更多。其实在Kubernetes上面,这是没有必要的。如前所述,Flink提供的HA是主JobManager挂掉后,从快速接替主的职责,从共享存储上面恢复已经提交运行的任务。对于Kubernetes,如果JobManager的Pod挂掉了,马上会有一个新的JobManager Pod被创建出来。也就是这个standby的JobManager不是事先就创建好的,而是在需要的时候动态产生的。这样也不用平时空跑一个standby的JobManager浪费资源(注意:standby的JobManager是纯备,不对外提供任务服务,也没有什么负载均衡的功能)。

2, Flink的HA模式下,如果主JobManager挂掉了,任务会重启,这还算HA吗?会丢数据吗?答:这个问题也同时适用于基于Zookeeper的HA。Flink的HA模式依赖两部分:一部分是ZK或者Kubernetes的ConfigMap,这部分主要职责是负责选举主JobManager、服务发现、少量元数据存储(比如任务的运行状态、任务相关的二进制文件在共享存储上的存储路径等);另外一部分是共享存储(比如上面的HDFS),该部分的主要职责是存储任务相关的二进制文件,比如Job Graph及其依赖等。有了这两部分,当主JobManager挂掉后,就会有新的JobManager产生,它可以依据这些信息重新恢复之前运行的任务。注意,这里是恢复。也就是主节点故障后,任务会故障,但HA模式下,Flink会保证任务快速被恢复。那这种机制算HA吗?当然要看下HA的定义:

高可用(High Availability,HA)是分布式系统架构设计中必须考虑的因素之一,它通常是指,通过设计减少系统不能提供服务的时间。假设系统一直能够提供服务,我们说系统的可用性是100%。如果系统每运行100个时间单位,会有1个时间单位无法提供服务,我们说系统的可用性是99%。在线系统和执行关键任务的系统通常要求其可用性要达到5个9标准(99.999%,年故障时间为5分15秒)。

按此定义来说,上述机制当然算HA,只是它提供的不是100%的可用性而已。另外,丢不丢数据这个其实跟HA关系不大,解决丢数据的问题是分布式中容错(Fault Tolerance)的职责,Flink提供了Checkpoint和Savepoint两种机制。也就是如果你开启了Checkpoint,那有没有HA都可以保证不丢数据;反之,没开启的话,有没有HA都可能会丢数据。

总结

可以看到,这两个新特性让Flink在Kubernetes上的使用变得简单了很多。当然,试用了一下,我个人觉得后面还是有一些工作需要完善,比如现在通过kubernetes-session.sh或者flink创建集群时,虽然已经支持很多配置项(见这里)来满足一些自定义,但还是不够。比如我刚开始想用nfs来测试高可用,但发现通过上面的方式创建集群的话,没有方式可以挂载PVC,像HDFS、OSS、S3这些不需要挂载就可以直接使用,但NFS不行,所以后面就又部署了一个HDFS进行测试。不过目前社区已经在改进这些不足了,见FLINK-15649: Support mounting volumesFLINK-15656: Support user-specified pod templates. 这部分我单独写了一篇文章Flink Native Kubernetes支持Volume Mount,有兴趣的可以查看。

参考:

]]>
4 http://niyanchun.com/flink-quick-learning-4-native-kubernetes-ha.html#comments http://niyanchun.com/feed/tag/flink/
Flink快速了解(3)——4种Graph http://niyanchun.com/flink-quick-learning-graph.html http://niyanchun.com/flink-quick-learning-graph.html Sun, 20 Dec 2020 09:50:00 +0800 NYC 本文介绍Flink任务流转过程中涉及的图,知道这些可以更好的了解Flink的运行流程。

flink-graph

如上图,Flink中有4种图:StreamGraphJobGraphExecutionGraphPhysicalGraph,分别处于不同的阶段,承担不同的职责。

StreamGraph

StreamGraph其实就是把我们的代码逻辑以拓扑图的形式组织了一下,其实现类的描述如下:

// StreamGraph.java
/**
 * Class representing the streaming topology. It contains all the information
 * necessary to build the jobgraph for the execution.
 */
@Internal
public class StreamGraph implements Pipeline {
...
}

StreamGraph可以查看,方法是在我们的execute()代码之前加入System.out.println(env.getExecutionPlan());,这样会输出一个JSON,将这个JSON粘贴到这里就可以查看StreamGraph。以官方的SocketWordCount为例,后面还会多次用到这个例子,所以为了完整性,我把代码贴到这里:

/**
 * Implements a streaming windowed version of the "WordCount" program.
 *
 * <p>This program connects to a server socket and reads strings from the socket.
 * The easiest way to try this out is to open a text server (at port 12345)
 * using the <i>netcat</i> tool via
 * <pre>
 * nc -l 12345 on Linux or nc -l -p 12345 on Windows
 * </pre>
 * and run this example with the hostname and the port as arguments.
 */
@SuppressWarnings("serial")
public class SocketWindowWordCount {

    public static void main(String[] args) throws Exception {

        // the host and the port to connect to
        final String hostname;
        final int port;
        try {
            final ParameterTool params = ParameterTool.fromArgs(args);
            hostname = params.has("hostname") ? params.get("hostname") : "localhost";
            port = params.getInt("port");
        } catch (Exception e) {
            System.err.println("No port specified. Please run 'SocketWindowWordCount " +
                "--hostname <hostname> --port <port>', where hostname (localhost by default) " +
                "and port is the address of the text server");
            System.err.println("To start a simple text server, run 'netcat -l <port>' and " +
                "type the input text into the command line");
            return;
        }

        // get the execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // get input data by connecting to the socket
        DataStream<String> text = env.socketTextStream(hostname, port, "\n");

        // parse the data, group it, window it, and aggregate the counts
        DataStream<WordWithCount> windowCounts = text

                .flatMap(new FlatMapFunction<String, WordWithCount>() {
                    @Override
                    public void flatMap(String value, Collector<WordWithCount> out) {
                        for (String word : value.split("\\s")) {
                            out.collect(new WordWithCount(word, 1L));
                        }
                    }
                })

                .keyBy(value -> value.word)
                .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))

                .reduce(new ReduceFunction<WordWithCount>() {
                    @Override
                    public WordWithCount reduce(WordWithCount a, WordWithCount b) {
                        return new WordWithCount(a.word, a.count + b.count);
                    }
                });

        // print the results with a single thread, rather than in parallel
        windowCounts.print().setParallelism(1);

        env.execute("Socket Window WordCount");
    }

    // ------------------------------------------------------------------------

    /**
     * Data type for words with count.
     */
    public static class WordWithCount {

        public String word;
        public long count;

        public WordWithCount() {}

        public WordWithCount(String word, long count) {
            this.word = word;
            this.count = count;
        }

        @Override
        public String toString() {
            return word + " : " + count;
        }
    }
}

这个代码是一个非常经典的流处理过程:Source(socketTextStream) --> Map(flatMap) --> Window(keyByWindow) --> Sink(print),这个过程包含了4个Operator(即括号中的部分)。它的StreamGraph长下面这样:

默认并行度设置为1(env.setParallelism(1);)时:

SocketWordCountStreamGraph-p1

默认并行度设置为4时:

SocketWordCountStreamGraph-p4

注:

  1. 不知道为什么有些地方展示不全,Chrome、Safari、Firefox都试了,还是不行,不过只是遮挡了并行度,也能看出来,不是很影响。
  2. 这里分别展示了两个不同的并行度,是为了后面的对比用。

程序真正运行时也会先转化为StreamGraph,然后进一步转化为JobGraph。

JobGraph

JobGraph和StreamGraph一样,也是一个有向无环图(DAG),这个图是由operator、operator间的输入输出关系、中间数据连接而成。JobGraph也被称为Logical Graph或者Dataflow Graph(题外话,StreamGraph当然也是一个逻辑图,但StreamGraph这个概念其实是代码级的,并不对外体现,所以可以看到代码里面类上面加了@Internal,并且用户文档中从来没有提及这个概念,而JobGraph则是对外暴露的)。

官网描述如下:

A logical graph is a directed graph where the nodes are Operators and the edges define input/output-relationships of the operators and correspond to data streams or data sets.

代码注释说明如下:

// JobGraph.java
/**
 * The JobGraph represents a Flink dataflow program, at the low level that the JobManager accepts.
 * All programs from higher level APIs are transformed into JobGraphs.
 *
 * <p>The JobGraph is a graph of vertices and intermediate results that are connected together to
 * form a DAG. Note that iterations (feedback edges) are currently not encoded inside the JobGraph
 * but inside certain special vertices that establish the feedback channel amongst themselves.
 *
 * <p>The JobGraph defines the job-wide configuration settings, while each vertex and intermediate result
 * define the characteristics of the concrete operation and intermediate data.
 */
public class JobGraph implements Serializable {
...
}

StreamGraph到JobGraph的转化也是在Client端进行的,主要工作做优化。其中非常重要的一个优化就是Operator Chain

  • 什么是Operator Chain?就是将一些独立的小Operator合并为一个大的Operator,比如将上面的SourceFlatMap合并在一起。
  • 合并(chain)的条件是什么?其实条件还蛮多的,但简单理解就是如果两个算子的数据是直接Forward的,那就可以合并。反过来,如果两个Operator之间有shuffle(比如keyBy)、rebalance(比如并行度不一样)之类的操作,或者一个Operator有多个上游,那就不能合并。实际使用的时候,我们一般尽量让整个系统里面的算子并行度一致即可,这样能够合并的一般都会合并。
  • 合并的好处是什么?Flink运行的时候,一个任务是在一个线程里面运行的,将小的Operator合并成大的Operator,也就相当于将小任务合并成了大任务,这样他们就会在一个线程里面执行,避免了网络IO和序列化操作,同时也减少了线程数。这个细节可参考我后面的文章:Flink快速了解(5)——Job && Task && Subtask && SlotSharing
  • Operator Chain特性默认是开启的,用户可以在配置或者代码里面关掉这个特性(我目前还没有碰到实际运行的时候需要关闭这个优化项的需求场景...)。

JobGraph也可以查看,方法是在Flink的Web UI提交页面上面(仍以SocketWordCount为例):

inspect-job-graph

注意:如果应用有必填参数一定要填,否则会报错。

默认并行度设置为1时的JobGraph:

jobgraph-p1

可以看到默认并行度设置为1的时候,优化器把source和flatmap这两个operator串接(chain)成一个大的子任务了,把后面的window和sink两个operator串接(chain)成一个大的子任务了,这样到时候source和flatmap就在一个线程里面执行,window和sink在一个线程里面运行。

默认并行度设置为4时的JobGraph:

jobgraph-p4

如果我们把默认并行度设置为4,那图就变成了上面这样,可以看到相比于并行度为1的图,没有任何合并。主要原因是:

  1. 代码里的source是socketTextStream,这是一个不能并行的operator,所以并行度永远是1(强制设置为非1运行会报错);
  2. flatmap和window代码中没有设置并行度,所以都使用了默认并行度4,但因为中间是hash操作,所以也无法合并;
  3. 最后一个sink代码里面显式的设置了并行度为1(见代码),和它的直接上游并行度不一致,所以也无法chain。

很显然,这样的运行就不是最高效的,所以在并行度的控制上要稍微注意一下,尽量让能够合并的operator chain在一起。

Client向JobManager(Dispatcher模块)提交Job时,实质提交的就是JobGraph该Job依赖的jar包

ExecutionGraph

JobManager接收到Client端提交的JobGraph及其依赖Jar之后就要开始调度运行该任务了,但JobGraph还是一个逻辑上的图,需要再进一步转化为并行化可调度的执行图。这个动作是JobManager(其中的JobMaster组件)做的。

这一层已经更加具体化了,代码说明比较长,但对于使用者基本无需关注,为了完整性,这里也附一下,有兴趣的自行查看:

/**
 * The execution graph is the central data structure that coordinates the distributed
 * execution of a data flow. It keeps representations of each parallel task, each
 * intermediate stream, and the communication between them.
 *
 * <p>The execution graph consists of the following constructs:
 * <ul>
 *     <li>The {@link ExecutionJobVertex} represents one vertex from the JobGraph (usually one operation like
 *         "map" or "join") during execution. It holds the aggregated state of all parallel subtasks.
 *         The ExecutionJobVertex is identified inside the graph by the {@link JobVertexID}, which it takes
 *         from the JobGraph's corresponding JobVertex.</li>
 *     <li>The {@link ExecutionVertex} represents one parallel subtask. For each ExecutionJobVertex, there are
 *         as many ExecutionVertices as the parallelism. The ExecutionVertex is identified by
 *         the ExecutionJobVertex and the index of the parallel subtask</li>
 *     <li>The {@link Execution} is one attempt to execute a ExecutionVertex. There may be multiple Executions
 *         for the ExecutionVertex, in case of a failure, or in the case where some data needs to be recomputed
 *         because it is no longer available when requested by later operations. An Execution is always
 *         identified by an {@link ExecutionAttemptID}. All messages between the JobManager and the TaskManager
 *         about deployment of tasks and updates in the task status always use the ExecutionAttemptID to
 *         address the message receiver.</li>
 * </ul>
 *
 * <h2>Global and local failover</h2>
 *
 * <p>The Execution Graph has two failover modes: <i>global failover</i> and <i>local failover</i>.
 *
 * <p>A <b>global failover</b> aborts the task executions for all vertices and restarts whole
 * data flow graph from the last completed checkpoint. Global failover is considered the
 * "fallback strategy" that is used when a local failover is unsuccessful, or when a issue is
 * found in the state of the ExecutionGraph that could mark it as inconsistent (caused by a bug).
 *
 * <p>A <b>local failover</b> is triggered when an individual vertex execution (a task) fails.
 * The local failover is coordinated by the {@link FailoverStrategy}. A local failover typically
 * attempts to restart as little as possible, but as much as necessary.
 *
 * <p>Between local- and global failover, the global failover always takes precedence, because it
 * is the core mechanism that the ExecutionGraph relies on to bring back consistency. The
 * guard that, the ExecutionGraph maintains a <i>global modification version</i>, which is incremented
 * with every global failover (and other global actions, like job cancellation, or terminal
 * failure). Local failover is always scoped by the modification version that the execution graph
 * had when the failover was triggered. If a new global modification version is reached during
 * local failover (meaning there is a concurrent global failover), the failover strategy has to
 * yield before the global failover.
 */
public class ExecutionGraph implements AccessExecutionGraph {
...
}

PhysicalGraph

物理图其实是任务调度到TaskManager上面真正执行的一种“运行图”,它没有具体对应的类。官方说明如下:

A physical graph is the result of translating a Logical Graph for execution in a distributed runtime. The nodes are Tasks and the edges indicate input/output-relationships or partitions of data streams or data sets.

这四个图越往下,越具体,也越难理解,但实际中对于使用而言,一般只需要知道有这几个环节转换即可,细节无需太关注。下面附一张Jark大神博客的图,更多细节可见参考部分给的他的文章:

graph-流转

总结

其实图这个概念并非Flink原创,Storm、Spark里面也有类似的概念,Flink也是站在巨人的肩膀上而已。从StreamGraph迭代到最终的PhysicalGraph,由抽象到具体,抽象的给人看,具体的给框架看,有点类似于代码编译时的预处理、编译、汇编、链接的过程。

参考:

]]>
2 http://niyanchun.com/flink-quick-learning-graph.html#comments http://niyanchun.com/feed/tag/flink/
Flink快速了解(2)——3种部署模式 http://niyanchun.com/flink-quick-learning-deployment-mode.html http://niyanchun.com/flink-quick-learning-deployment-mode.html Sat, 19 Dec 2020 22:47:00 +0800 NYC 截至1.12.0版本,Flink有3种集群部署/运行模式:

  • Flink Session Cluster
  • Flink Job Cluster
  • Flink Application Cluster

三种运行模式主要区别在3个方面:

  1. 集群的生命周期
  2. 集群的资源隔离
  3. main()方法在Client侧执行还是在集群侧执行

下面分别介绍一下。

Flink Session Cluster

该模式就是先有一个已经在运行的Flink集群(至少有JobManager),然后我们把任务提交上去,所有的任务都运行在这一个集群上面,典型的场景就是Standalone模式静态部署的普通集群。此时:

  • 集群生命周期:独立于任务,任务的开始、结束等不影响集群的生命周期。
  • 集群的资源隔离:所有任务都运行在一个集群上面,所以隔离性差。Flink的Slot仅能隔离内存,并不能隔离CPU资源。而且一个任务如果把TaskManager搞挂了,那上面的其它任务也会受牵连。
  • main()方法在Client侧执行。

该模式以前也称为"Flink Cluster in session mode".

Flink Job Cluster

该模式就是每个Job动态创建一个属于自己专有的集群,此时:

  • 集群生命周期:与任务生命周期同步,随任务运行而创建,随任务结束而消亡。
  • 集群的资源隔离:任务独占集群,隔离性最好。
  • main()方法在Client侧执行。

该模式以前也称为"Flink Cluster in per-job mode".

Flink Application Cluster

一个Application指包含一个或多个任务(Job)的程序,也就是包含多个executeexecuteAsync。该模式下,一个Application动态创建一个属于自己专有的集群,Application内的所有任务共享该集群,很显然这是一种介于Session Cluster和Job Cluster之间的模式:不同Application之间是完全隔离的,类似Job Cluster;但一个Application内的任务是不隔离的,类似于Session Cluster。此时:

  • 集群生命周期:与Application生命周期同步,随Application运行而创建,随Application结束而消亡。
  • 集群的资源隔离:Application之间隔离,Application内的所有任务共享集群,隔离性一般。
  • main()方法在集群侧执行。

该模式以前也称为"Flink Cluster in application mode".

三种模式对比

其实也没啥对比的,各自的优缺点非常简单明显。要对比的话,主要的对比点就是资源隔离、main()方法的执行位置、集群是否是动态创建三个方面。

  • 就资源隔离性而言,Flink Job Cluster、Flink Application Cluster、Flink Session Cluster隔离性依次降低。
  • Flink Application Cluster的 main()方法是在集群侧的JobManager中执行的,其它两种模式是在Client端执行的。这个对于一些比较大型或复杂的应用来说区别还是挺大的,毕竟集群侧的资源一般是比较充足的,而且可以负载均衡。Client测去执行main()方法可能会是一个瓶颈,特别是有多个人共享这个Client的时候。
  • 集群动态创建这个不是所有模式都支持的,一般只有依赖Kubernetes、YARN之类的模式才可以。动态创建的好处就是动态扩展会比较好,特别是横向的扩展。但弊端是每次提交任务都要先创建一个集群,对于那些执行时间短、频次高的任务可能就不是特别合适。

常见集群管理框架对三种模式的支持

Flink也支持一些第三方的集群管理框架,当使用这些框架时,集群的资源管理都会交给这些框架。目前支持:

  • Standalone:即不使用第三方集群管理框架,Flink自己管理集群。此时支持的运行模式包括:Session Cluster(Session Mode)、Application Cluster(Application Moe)。当容器化部署时(比如在Docker、K8s上面),也只支持这2种模式,不支持Job Cluster(Per-job mode)。
  • Native Kubernetes:即使用K8s作为集群管理框架,Flink 1.12版本中已经正式可用,可参见我的这篇文章:Flink快速了解(4)——NativeKubernetes&HA. 需要注意该方式和在k8s上面部署Standalone集群是不一样的:Native Kubernetes是深度集成,将集群资源管理交给了k8s;而Standalone on K8s只是容器化部署而已,集群管理还是完全由Flink自己做的。该模式也不支持Job Cluster(Per-job mode),其它2种都支持。
  • YARN:Hadoop生态最常用的资管管理、任务调度框架,功能很强大,一般在Hadoop生态部署Flink的,都会使用YARN管理Flink集群。Flink的3种运行模式在YARN上面都支持,且一般生产环境比较推荐Job Cluster(Per-job Mode)和Application Cluster(Application Mode)。
  • Mesos:一个“古老”、强大且被广泛使用的集群管理器,与Flink集成时,不支持Application Cluster(Application Moe),其它2种都支持。

表格看着更清楚:

-Session ClusterJob ClusterApplication Cluster
Standalone(包括on Docker,on K8s)支持不支持支持
Native Kubernetes支持不支持支持
YARN支持支持支持
Mesos支持支持不支持

参考:

]]>
9 http://niyanchun.com/flink-quick-learning-deployment-mode.html#comments http://niyanchun.com/feed/tag/flink/
Flink快速了解(1)——架构 http://niyanchun.com/flink-quick-learning-1-architecture.html http://niyanchun.com/flink-quick-learning-1-architecture.html Sat, 19 Dec 2020 19:51:00 +0800 NYC 了解一个系统当然是先从整体的架构开始(难道不应该是quick start吗?):

Flink Architecture

如上图,Flink这个分布式流批统一计算框架也是典型的主从架构,JobManager是主,TaskManager是从。JobManager其实是一个统称,其内部根据功能拆分成了3个大模块:

  • ResourceManager:如其名,就是做资源管理的。Flink里面资源是以TaskManager提供的Slot形式存在的,所以其实就是管理Slot的。TaskManager启动后会向ResourceManager报告自己的slot情况,并且通过心跳和通知机制定期更新。之所以把这个模块单独出来,是因为资源管理框架已经很多了(但功能、实现、使用又有差异),比如YARN、Mesos、Kubernetes。所以单独出来以后,方便分别实现支持不同框架的ResourceManager 。另外,ResourceManager在设计上:(1)是无状态的,因为它的数据都是别人主动报告给它的,所以重启后可以重新获取。不过有的实现可能会有一点状态。(2)故障后不影响已经在运行的任务。ResourceManager一个集群只有一个**。
  • JobMaster:JobMaster主要负责Job的调度运行、Checkpoint/Savepoint的触发、故障恢复等。每个Job都有一个自己的JobMaster
  • Dispatcher:Dispatcher的主要任务是接收Client提交上来的任务(这里的任务就是Job),然后为该任务创建JobMaster,之后将任务交给JobMaster去调度管理。所以Dispatcher自身并不会运行用户的代码,而且它提供的功能基本都是以http服务的形式向外暴露的,所以长期看,社区想把这个模块演变成一个Gateway,这样对于一些安全要求高的场景,这个模块就可以作为代理放在防火墙之外或对外网暴露(当然需要加认证),目前Flink WebUI的功能也是在该组件里面的。不考虑standby的情况下,Dispatcher一个集群只有一个

注意

  1. 这里JobManager和JobMaster可能容易混淆,在早期版本,很多地方(包括官方文档)把JobMaster也称为JobManager,所以如果看一些旧的文档,一定要注意说的JobManager是指广义的JobManager(即包含ResourceManager、JobMaster、Dispatcher)还是狭义的JobManager(即只指JobMaster)。甚至有些时期的文档把广义的JobManager称为JobMaster,把JobMaster称为JobManager
  2. 注意区分“任务”这个词指的是Job还是组成Job的Task,本文中的任务基本都指的是Job。

所以,一个flink任务提交流程就是图中所示的6步:

  1. 用户通过Client(比如命令行 flink run和Web UI)提交任务到Dispatcher;
  2. Dispatcher为该任务创建JobMaster;
  3. JobMaster向ResourceManager申请资源(即Slot);
  4. 此时如果没有资源,并且ResourceManager有动态创建TaskManager的能力(有的部署方式有,有的部署方式没有,后面介绍),ResourceManager就会创建TaskManager;没有动态创建TaskManager能力的部署方式,则需要先部署好TaskManager;
  5. TaskManager创建好之后,向ResourceManager注册自己的资源;
  6. ResourceManager分配资源后,该资源的所有者TaskManager就会向JobMaster提供资源。

Flink集群管理目前支持Standalone、YARN、Mesos、Kubernetes几种方式,不同的部署模式下,上面JobManager里面的组件的存在形式可能会有一些差异。

这部分的实现是在flink-runtime这个模块里面的,而且代码里面的命名和上面讲的大多是能直接对应上的,这部分是Flink的核心,设计也比较优雅,如果想深入源码了解Flink的内部运行机制,这个runtime模块自然是首选。

最后说一下这个Client,它虽然不是Flink运行期的一部分,但却是提交任务的窗口,凡是可以向集群提交任务的“工具”都可以称之为Client。截止目前,主要有如下Client:

  • Scala Shell(bin/start-scala-shell.sh
  • SQL Client(bin/sql-client.sh
  • CommandLine(bin/flink
  • RESTful
  • Web
  • Python Shell(bin/pyflink-shell.sh

以上内容基于目前最新的Flink 1.12.0版本。更多细节可参考:

]]>
1 http://niyanchun.com/flink-quick-learning-1-architecture.html#comments http://niyanchun.com/feed/tag/flink/
Flink快速了解(0)——开篇说明 http://niyanchun.com/flink-quick-learning-0.html http://niyanchun.com/flink-quick-learning-0.html Fri, 18 Dec 2020 22:57:00 +0800 NYC 一直想系统的写一系列Flink的文章,但由于以下原因一直搁置:

  • 凡是想写的东西网上都能找到,而且不乏有深度有质量的好文;
  • 我想写的很多东西都是基于自己对官方文档及一些高手的文章的多次阅读、实验,以及自己项目中的实践总结,所以理论上是官方文档部分内容的一些总结或者一些实战笔记;
  • 一直没有全面研究过源码,感觉不够深入,还没有达到写一系列的水平;
  • 没时间、懒、拖延症(last but not the least

最近因解决项目中的一些问题,再次好好看了下部分官方文档,觉得还是要写一下,主要有这么几个原因:

  • 看别人的终归是别人的,看的时候你以为你懂了,但让你自己系统全面的讲一遍或者写出来,才发现还是别人的,或者自己理解的还是不透彻、不全面(最主要原因)。
  • 网上的文章(包括官网)虽然多,但绝大多数存在两方面问题:要么不够系统,要么太过深入。不够系统对于解决某个具体的问题,学习某个点一般不是问题,但对于系统了解整个框架,就显得乏力了。太过深入主要指的是深入源码级的剖析,这个倒不是说不好,只是绝大多数人都是使用者,我认为没有必要一定去做源码级研究学习,因为研究源码是一个成本很高的事情,我一般推荐碰到问题必须通过源码解决的时候再去看源码,而且最好是已经对系统有个理论性的认识了。当然,并不是不建议大家看源码,挑几个优秀的框架深入学习一下源代码及其设计对于我们的提升还是非常大的。只是这个一般比较花费时间,往往跟不上实际的需求,所以不建议每学习一个新系统就去看它的源码。

基于上面两个原因,我决定还是写一下,就当是加强自己的理解和做笔记了。我会尽量保证易懂(可能会变成啰嗦...)和全面(我更喜欢叫“理解的闭环”),所以标题也想了很久,觉得就叫“快速了解”吧(看着就很肤浅是不是?)。但为了保证完整性,我会加一些我推荐的文章及参考链接,供喜欢深入研究的人学习。

好了,说这么多,就是提前给自己留个台阶,其实就是想说:写的不好,请勿喷;实在忍不住的话,请轻喷!当然,错误、意见、建议还是非常欢迎不吝赐教的!当然,系列文章也经常会中道崩殂,不要有任何意外!

]]>
0 http://niyanchun.com/flink-quick-learning-0.html#comments http://niyanchun.com/feed/tag/flink/
Flink的窗口介绍 http://niyanchun.com/flink-window-introduction.html http://niyanchun.com/flink-window-introduction.html Sat, 30 May 2020 07:51:00 +0800 NYC Flink的Watermark细节介绍一文中提到了Watermark其实主要就是解决Event Time + Window中的数据完整性问题的,本文作为那篇文章的补充,再介绍一下Window这个概念。关于这部分,我觉得官方文档已经介绍的非常详细了,如果你是Flink使用者,强烈建议好好读几遍。我这里就主要概括性的介绍一下,作为前面文章的补充,同时解决前文遗留的一个问题。

What & Why

什么是Window?为什么需要Window?流处理里面一般都是事件驱动的(Spark是微批),即每个事件来就会触发算子(Operator)进行计算,典型的比如map、flatmap、filter等,这些都是无状态的计算。有些时候需要在流处理里面进行有状态的计算,比如电商场景分析1分钟的访问人数、购买人数各是多少等,这些计算要缓存数据(即是有状态的计算),需要通过Window来做,这就是Window要解决的场景:支持流处理中有状态的计算。另外,Window本身也是一个算子,只不过是一个有状态的算子。

鉴于官网的文档非常详尽,我不重复造轮子了,我从另外一个角度来介绍一个窗口:窗口的生命周期,即从窗口的创建到最终销毁是如何流转的。这部分一些细节官方文档说的不是特别明确,我通过这篇文章做一下补充。如果你对窗口还没有任何概念,建议先阅读官方文档,对窗口有一些基础了解之后,再来阅读本文。

为了方便讲解,我把Flink的Watermark细节介绍一文中构造的例子稍微进行了一点改造,在原始事件中加入了事件类型,共有三种:pv(浏览)、cart(加购物车)、buy(购买),这样产生的原始事件就是下面这样了(注意,事件依旧是乱序的):

event in source:
{"action":"pv","id":"event1","timestamp":"2020-05-24T12:00:00.000+08:00"}
{"action":"cart","id":"event2","timestamp":"2020-05-24T12:00:01.000+08:00"}
{"action":"pv","id":"event4","timestamp":"2020-05-24T12:00:04.000+08:00"}
{"action":"pv","id":"event5","timestamp":"2020-05-24T12:00:05.000+08:00"}
{"action":"buy","id":"event7","timestamp":"2020-05-24T12:00:07.000+08:00"}
{"action":"buy","id":"event3","timestamp":"2020-05-24T12:00:03.000+08:00"}
{"action":"cart","id":"event6","timestamp":"2020-05-24T12:00:06.000+08:00"}
{"action":"buy","id":"event8","timestamp":"2020-05-24T12:00:08.000+08:00"}
{"action":"pv","id":"event9","timestamp":"2020-05-24T12:00:09.000+08:00"}

然后我们要做的事情就是分析每5秒钟各种行为的个数。这里的数据只有12:00:00.000-12:00:05.00012:00:05.000-12:00:10.000两个5秒钟窗口的数据,我们也就用这两个窗口来分析。

Window流转

这里我先截了一张官方文档的图:

tm0c24.png

可以看到Window大的分为两类:Keyed WindowsNon-Keyed Windows。你可以把Window底层想象成一个容器,Keyed Windows就是一个Map,Non-Keyed Windows就是一个List。实际中用的最多的就是Keyed Windows了,主要有两个原因:

  1. 业务上,数据处理一般都是要哈希分区(即上面的keyBy)的。比如用户行为分析,一般会根据用户ID去哈希或者根据行为类型哈希。
  2. Non-Keyed Windows不能并行计算,也就是只能由一个线程去计算,在flink里面这个算子的parallelism只能是1。

不过,Non-Keyed Windows相比于Keyed Windows,仅是少了一个keyBy的操作,后面的流程逻辑是完全一样的,所以对于学习Window的机制,影响不是很大。而且上一篇文章中的示例其实就是Non-Keyed Windows,所以这篇文章主要以Keyed Windows为例进行介绍。

可以看到窗口基本上包含以下几个部分:

  • keyBy:即哈希,只有Keyed Windows才有,它决定如何产生Window。比如上篇文章中我们没有使用keyBy,对于tumbling windows,一个时间范围只会产生一个Window(12:00:00.000-12:00:05.000一个,12:00:05.000-12:00:10.000一个)了。如果我们使用了keyBy,那一个时间范围内有几个不同的key,就会有几个Window。
  • Window Assigner:这里主要定义Window的类型,Flink内置了一些常用的window:tumbling windowssliding windowssession windowsglobal windows,我们也可以通过继承WindowAssigner类来实现自定义的窗口。内置的几个windows都很简单,这里就不展开说了。需要注意的是global windows,它不是一个基于时间的window,后面我们会再次提到。
  • Trigger:即触发器,它的作用是决定窗口如何触发。前面说了窗口就是一个“容器”,用来缓存事件,所谓窗口触发就是什么时候开始对缓存的数据进行计算,计算规则通过下面的process/reduce/aggregate/fold/apply来定义。
  • Evictor:驱逐,待会介绍。
  • AllowedLateness:以前文章已经见过了,使用Event Time时才有意义,指定允许数据迟到的时间,默认为0,即不允许迟到。
  • SideOutputLateData:以前文章已经见过了,使用Event Time时才有意义,将迟到的数据旁路到单独的一个流。默认不旁路,迟到数据直接丢掉。
  • 窗口处理函数:process/reduce/aggregate/fold/apply,前面提到了,定义如何计算窗口内的数据。
  • GetSideOutput:用于旁路一个流,这个不是window特有的。

下面我画了一个流转图:

window-flow.png

12:00:00.000-12:00:05.000这个时间段的窗口为例,源源不断的事件流来了以后,先经过keyBy生成若干个window(图中有key=pv和key=buy两个window),然后再流经trigger,判断事件是先缓存还是触发窗口计算。如果缓存,则直接先放在窗口里面。如果是触发计算,且定义了before evictor,则把窗口缓存的所有数据交给evictor,evictor处理完之后交给定义的window function。如果window function之后还定义了after evictor,则数据再交给after evictor,最后继续发往下游。

这个流程有2个注意点:

  • window function有两大类:一类是process/apply,一类是reduce/aggregate/fold. process(apply已废弃)是数据先全部缓存在窗口中,等到触发的时候,将所有数据一次交给process方法;reduce/aggregate/fold则是来一个事件就计算一次。比如累加1-100整数的操作,process类的是先缓存,等100个数全到了,一次交给process做计算;而reduce/aggregate/fold类的则是来一个就计算一个,然后只保留计算结果。显然后者更高效一些,且不用缓存太多的数据。实践中,两类都能满足需求的时候,推荐使用后者。
  • 可以在trigger触发后,在将缓存数据交给window function之前这个之间定义一个evictor(图中的before evictor),来对数据做一些处理,也可以在window function计算完发往下游之前定义一个evictor(图中的after evictor),两个都是可选的。但需要注意的是,一旦定义了evictor,就只能使用process,不能使用reduce/aggregate/fold了,因为evictor需要一次拿到窗口内缓存的全部数据。

触发器Trigger

上面讲了,trigger可以决定窗口直接缓存数据,还是触发计算,我们看下具体是如何做到的。Flink中,Trigger抽象类定义了一些方法,其中以下几个是比较重要的:

  • onElement:每个事件来都会调用一次该方法。
  • onEventTime:当根据触发器上下文设定的Event Time的定时器触发时,就会调用该方法(有点拗口,原话是:Called when an event-time timer that was set using the trigger context fires)。比如对于12:00:00.000-12:00:05.000这个窗口,基于Event Time的默认触发器的实现是当事件时间等于12:00:05.000前1毫秒时(即12:00:04.999),定时器就会触发。后面我们会看这部分代码。
  • onProcessingTime:当根据触发器上下文设定的Processing Time的定时器时触发时,就会调用该方法。
  • onMerge:当两个窗口需要合并时,就会调用该方法。这个只有某些窗口才会使用到。
  • clear:当需要清除窗口内容的时候,就会调用该方法。

其中onElement、onEventTime、onProcessingTime决定窗口行为,它们的返回值都是一个TriggerResult,这是一个枚举类型,目前有这么几个枚举值:

  • CONTINUE:什么也不做,仅把事件加到当前窗口中。
  • FIRE:表示触发窗口,即开始在窗口上执行定义的window function。计算完之后并不会清除窗口内的数据。
  • PURGE:清除窗口内的所有数据(包含元数据),但不会触发窗口计算。这个操作相当于删除了当前窗口
  • FIRE_AND_PURGE:触发窗口计算,计算完之后删除窗口。

Trigger对于窗口的生命周期至关重要,Flink给每个内置窗口都定义了默认的触发器,我们也可以自定义自己的触发器。

文章后面我们在解答Flink的Watermark细节介绍一文中遗留的问题时会分析内置的EventTimeTrigger,到时候再结合代码看一下上面介绍的这部分内容。接下来我们再介绍一个比较重要的知识点。

迟到数据的处理

现在已经对窗口有了一个比较细致的了解,我们再来讨论下关于迟到数据的处理。默认迟到数据是会被直接丢弃的。以12:00:00.000-12:00:05.000这个窗口为例,它会在12:00:00.000-12:00:05.000这个区间的第一个事件到来时被创建,然后默认(在EventTimeTrigger中定义)在watermark值超过12:00:05前1毫秒的时间(即12:00:04.999)触发计算,并在watermark值超过12:00:05.000之后将窗口移除。但如果我们通过allowedLateness设置允许1秒钟的延迟,会发生什么?

以前文章说过,允许延迟实际是延长了窗口的生命周期,允许1秒延迟相当于把窗口的生命周期变成了12:00:00.000-12:00:06.000。但上面介绍的不允许延迟的情况下的流程基本不变,依旧会按照上面的逻辑在时间范围内第一个事件到来时创建窗口,在watermark值超过12:00:05前1毫秒的时间(即12:00:04.999)触发计算,但只有当watermark值超过12:00:06.000才会移除窗口。也就是如果属于12:00:00.000-12:00:05.000窗口的数据迟到了(即在12:00:05.000之前没能够来 ),但要是能够在12:00:06.000之前来,就会重新触发一次窗口计算,且来一个事件就会触发一次窗口计算。这里可以看到,允许数据迟到之后,迟到范围内到的数据依旧属于原来的窗口,而不是下一个窗口。另外就是每次触发窗口计算都会往下游发送一个数据,这样多次触发,就会往下游发送多次数据。所以允许迟到数据一方面如前文所说会影响处理的实时性,增加资源消耗,另一方面也可能会产生重复数据,需要下游能够正确处理这种情况。

最后关于窗口的生命周期需要注意的是,基于时间的窗口生命周期完结之后,Flink会负责移除,但像内置的Global Windows不属于时间窗口,Flink不会去移除这种窗口,需要用户自己实现。

窗口的个数

基于时间的窗口,一个定义的时间范围内会产生多少个窗口?答案是:1到N个,看情况(it depends~~)。看哪些情况呢:

  1. 看窗口类型。Flink内置了好几种窗口,有些窗口可能会重叠(即overlap),一个事件可能会重复分到多个窗口,也就是一个事件可能会触发创建或归属于多个窗口,典型的比如sliding windows,而Session Windows更是和gap有关系,还可能是动态gap。所以可能会产生多个窗口实例。
  2. 如果是Keyed Windows,还要看key的个数。有多个少不同的key,就会有多少个窗口。

窗口数据缓存在哪里

Flink的Window底层是State,目前State支持三种后端:

  • MemoryStateBackend:默认的后端。很简单,就是把数据缓存在内存里面,这个显然只适用于平时的开发调试或者生产上状态比较小的场景。不过现在有个FIP-50,是阿里提出的优化,提供内存不够用时再写磁盘的功能,我之前专门有篇文章分析过:Flink FLIP-50: Spill-able Heap Keyed State Backend。最初计划随着1.9版本发布,后来推迟到1.10。结果1.10也没加进去。现在1.11马上也要发布了,这个功能也没做进去,但单独提供了一个包,如果有需要的话,可以从这里下载
  • FsStateBackend:状态数据存储在TaskManager的内存里面,Checkpoint的时候会快照到文件系统上(支持HDFS等远程文件系统),适用于存储大一些的状态。
  • RocksDBStateBackend:状态数据存储在TaskManager节点的RockDB上(一个C++写的嵌入式KV数据库),Checkpoint的时候会快照到文件系统上(支持HDFS等远程文件系统)。可见整个状态都是在磁盘上存储的,所以它可以支撑超大量级的状态。

不管需用哪种状态,只需要在env那里配置一下,对于窗口以及State的使用都是透明的。DataFlow模型能做到流批统一的一个原因就是它能支撑超大量级的状态。可以看到状态数据最终可以存储在HDFS等分布式文件系统上,这样理论上MapReduce这种离线计算能搞定的,Flink都能做,且机制更加灵活。

一个完整代码示例

讲了这么多理论,看个具体的例子吧(源文件见这里),场景还是上面说的分析每5秒钟各种行为的个数:

package com.niyanchun.window;

import com.alibaba.fastjson.JSONObject;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.joda.time.DateTime;

import java.text.Format;
import java.text.SimpleDateFormat;
import java.util.Arrays;
import java.util.List;

/**
 * A window demo.
 *
 * @author NiYanchun
 **/
public class WindowDemo {

  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

    env.addSource(new CustomSource())
        .keyBy(f -> f.getString("action"))
        .timeWindow(Time.seconds(5))
        .process(new CustomProcessFunction())
        .print();

    env.execute();
  }


  public static class CustomSource extends RichSourceFunction<JSONObject> {

    @Override
    public void run(SourceContext<JSONObject> ctx) throws Exception {
      System.out.println("event in source:");
      getOrderedEvents().forEach(e -> {
        System.out.println(e);
        long timestampInMills = ((DateTime) e.get("timestamp")).getMillis();
        ctx.collectWithTimestamp(e, timestampInMills);
        ctx.emitWatermark(new Watermark(timestampInMills));
      });
      System.out.println();

      try {
        Thread.sleep(5000);
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
    }

    @Override
    public void cancel() {

    }
  }


  /**
   * generate out of order events
   *
   * @return List<JSONObject>
   */
  private static List<JSONObject> getOrderedEvents() {
    // 2020-05-24 12:00:00
    JSONObject event1 = new JSONObject().fluentPut("id", "event1").fluentPut("action", "pv")
        .fluentPut("timestamp", new DateTime(2020, 5, 24, 12, 0, 0));
    // 2020-05-24 12:00:01
    JSONObject event2 = new JSONObject().fluentPut("id", "event2").fluentPut("action", "cart")
        .fluentPut("timestamp", new DateTime(2020, 5, 24, 12, 0, 1));
    // 2020-05-24 12:00:03
    JSONObject event3 = new JSONObject().fluentPut("id", "event3").fluentPut("action", "buy")
        .fluentPut("timestamp", new DateTime(2020, 5, 24, 12, 0, 3));
    // 2020-05-24 12:00:04
    JSONObject event4 = new JSONObject().fluentPut("id", "event4").fluentPut("action", "pv")
        .fluentPut("timestamp", new DateTime(2020, 5, 24, 12, 0, 4));
    // 2020-05-24 12:00:05
    JSONObject event5 = new JSONObject().fluentPut("id", "event5").fluentPut("action", "pv")
        .fluentPut("timestamp", new DateTime(2020, 5, 24, 12, 0, 5));
    // 2020-05-24 12:00:06
    JSONObject event6 = new JSONObject().fluentPut("id", "event6").fluentPut("action", "cart")
        .fluentPut("timestamp", new DateTime(2020, 5, 24, 12, 0, 6));
    // 2020-05-24 12:00:07
    JSONObject event7 = new JSONObject().fluentPut("id", "event7").fluentPut("action", "buy")
        .fluentPut("timestamp", new DateTime(2020, 5, 24, 12, 0, 7));
    // 2020-05-24 12:00:08
    JSONObject event8 = new JSONObject().fluentPut("id", "event8").fluentPut("action", "buy")
        .fluentPut("timestamp", new DateTime(2020, 5, 24, 12, 0, 8));
    // 2020-05-24 12:00:09
    JSONObject event9 = new JSONObject().fluentPut("id", "event9").fluentPut("action", "pv")
        .fluentPut("timestamp", new DateTime(2020, 5, 24, 12, 0, 9));

    // 这里把消息打乱,模拟实际中的消息乱序
    // 真实的消息产生顺序是(根据时间戳):event1, event2, event3, event4, event5, event6, event7, event8, event9
    // 打乱之后的消息顺序是:event1, event2, event4, event3, event5, event7, event6, event8, event9
    return Arrays.asList(event1, event2, event4, event5, event7, event3, event6, event8, event9);
  }

  public static class CustomProcessFunction extends ProcessWindowFunction<JSONObject, Object, String, TimeWindow> {
    @Override
    public void process(String s, Context context, Iterable<JSONObject> elements, Collector<Object> out) throws Exception {
      TimeWindow window = context.window();
      Format sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
      int count = 0;
      for (JSONObject ignored : elements) {
        count++;
      }
      System.out.println(sdf.format(window.getStart()) + "-" + sdf.format(window.getEnd()) + ": " + count + " " + s);
    }
  }

  public static class CustomEventTimeTrigger extends Trigger<Object, TimeWindow> {
    private static final long serialVersionUID = 1L;

    private CustomEventTimeTrigger() {
    }

    @Override
    public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
      Format sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");

      System.out.println("onElement -- event: " + ((JSONObject) element).getString("id") +
          "; window.maxTimestamp():" + sdf.format(window.maxTimestamp()) +
          "; ctx.getCurrentWatermark():" + sdf.format(ctx.getCurrentWatermark()));
      if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {
        // if the watermark is already past the window fire immediately
        return TriggerResult.FIRE;
      } else {
        ctx.registerEventTimeTimer(window.maxTimestamp());
        return TriggerResult.CONTINUE;
      }
    }

    @Override
    public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
      Format sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
      System.out.println("onEventTime-- window.maxTimestamp():" + sdf.format(window.maxTimestamp()) +
          "; time:" + sdf.format(time));
      return time == window.maxTimestamp() ?
          TriggerResult.FIRE :
          TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
      return TriggerResult.CONTINUE;
    }

    @Override
    public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
      ctx.deleteEventTimeTimer(window.maxTimestamp());
    }

    @Override
    public boolean canMerge() {
      return true;
    }

    @Override
    public void onMerge(TimeWindow window,
                        OnMergeContext ctx) {
      // only register a timer if the watermark is not yet past the end of the merged window
      // this is in line with the logic in onElement(). If the watermark is past the end of
      // the window onElement() will fire and setting a timer here would fire the window twice.
      long windowMaxTimestamp = window.maxTimestamp();
      if (windowMaxTimestamp > ctx.getCurrentWatermark()) {
        ctx.registerEventTimeTimer(windowMaxTimestamp);
      }
    }

    @Override
    public String toString() {
      return "EventTimeTrigger()";
    }

    /**
     * Creates an event-time trigger that fires once the watermark passes the end of the window.
     *
     * <p>Once the trigger fires all elements are discarded. Elements that arrive late immediately
     * trigger window evaluation with just this one element.
     */
    public static CustomEventTimeTrigger create() {
      return new CustomEventTimeTrigger();
    }
  }

  public static class CustomAggregation implements AggregateFunction<JSONObject, Object, Object> {

    @Override
    public Object createAccumulator() {
      return null;
    }

    @Override
    public Object add(JSONObject value, Object accumulator) {
      return null;
    }

    @Override
    public Object getResult(Object accumulator) {
      return null;
    }

    @Override
    public Object merge(Object a, Object b) {
      return null;
    }
  }
}

代码内容不说了,和上篇文章的类似,只是把原来基于Non-Keyed Windows的timeWindowAll换成了基于Keyed Windows的timeWindow,所以多了一个keyBy的操作。代码执行结果如下:

event in source:
{"action":"pv","id":"event1","timestamp":"2020-05-24T12:00:00.000+08:00"}
{"action":"cart","id":"event2","timestamp":"2020-05-24T12:00:01.000+08:00"}
{"action":"pv","id":"event4","timestamp":"2020-05-24T12:00:04.000+08:00"}
{"action":"pv","id":"event5","timestamp":"2020-05-24T12:00:05.000+08:00"}
{"action":"buy","id":"event7","timestamp":"2020-05-24T12:00:07.000+08:00"}
{"action":"buy","id":"event3","timestamp":"2020-05-24T12:00:03.000+08:00"}
{"action":"cart","id":"event6","timestamp":"2020-05-24T12:00:06.000+08:00"}
{"action":"buy","id":"event8","timestamp":"2020-05-24T12:00:08.000+08:00"}
{"action":"pv","id":"event9","timestamp":"2020-05-24T12:00:09.000+08:00"}

2020-05-24 12:00:00-2020-05-24 12:00:05: 1 cart
2020-05-24 12:00:00-2020-05-24 12:00:05: 2 pv
2020-05-24 12:00:05-2020-05-24 12:00:10: 2 pv
2020-05-24 12:00:05-2020-05-24 12:00:10: 2 buy
2020-05-24 12:00:05-2020-05-24 12:00:10: 1 cart

Process finished with exit code 0

前面是Source那里发出的乱序事件,后面是窗口计算的结果。简要分析一下:event3乱序了,被丢掉了,所以12:00:00-12:00:05这个window只收到event1、event2、event4三个时间,2个pv事件,1个加购物车事件,根据前面介绍的keyBy操作会产生2个窗口,所以打印了两个12:00:00-12:00:05,分别对应于cart和pv(最后面列就是window的key)。12:00:05-12:00:10这个时间段类似。

至此,理论篇也差不多了,这部分主要是对我认为官方文档没有太展开的一些细节部分进行了补充讲解。下一部分是以Flink内置的EventTimeTrigger触发器实现来介绍一下触发器的细节,顺便回答一下Flink的Watermark细节介绍文章中遗留的一个问题:为什么有的乱序丢了,有的没丢?

为什么有的乱序丢了,有的没丢?

Flink的Watermark细节介绍一文中我们使用的是基于Event Time的Tumbling Window,它使用的默认触发器是EventTimeTrigger,这个是Flink内置的,为了加一些打印语句,我完全Copy了一份,重新定义了一个自己的触发器CustomEventTimeTrigger,完整程序代码前面文章已经有了,这里为了方便看触发器,仅贴一下触发器部分的代码(完整代码见这里):

public static class CustomEventTimeTrigger extends Trigger<Object, TimeWindow> {
  private static final long serialVersionUID = 1L;

  private CustomEventTimeTrigger() {
  }

  @Override
  public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
    Format sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");

    System.out.println("onElement -- event: " + ((JSONObject) element).getString("id") +
        "; window.maxTimestamp():" + sdf.format(window.maxTimestamp()) +
        "; ctx.getCurrentWatermark():" + sdf.format(ctx.getCurrentWatermark()));
    if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {
      // if the watermark is already past the window fire immediately
      return TriggerResult.FIRE;
    } else {
      ctx.registerEventTimeTimer(window.maxTimestamp());
      return TriggerResult.CONTINUE;
    }
  }

  @Override
  public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
    Format sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
    System.out.println("onEventTime-- window.maxTimestamp():" + sdf.format(window.maxTimestamp()) +
        "; time:" + sdf.format(time));
    return time == window.maxTimestamp() ?
        TriggerResult.FIRE :
        TriggerResult.CONTINUE;
  }

  @Override
  public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
    return TriggerResult.CONTINUE;
  }

  @Override
  public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
    ctx.deleteEventTimeTimer(window.maxTimestamp());
  }

  @Override
  public boolean canMerge() {
    return true;
  }

  @Override
  public void onMerge(TimeWindow window,
                      OnMergeContext ctx) {
    // only register a timer if the watermark is not yet past the end of the merged window
    // this is in line with the logic in onElement(). If the watermark is past the end of
    // the window onElement() will fire and setting a timer here would fire the window twice.
    long windowMaxTimestamp = window.maxTimestamp();
    if (windowMaxTimestamp > ctx.getCurrentWatermark()) {
      ctx.registerEventTimeTimer(windowMaxTimestamp);
    }
  }

  @Override
  public String toString() {
    return "EventTimeTrigger()";
  }

  /**
   * Creates an event-time trigger that fires once the watermark passes the end of the window.
   *
   * <p>Once the trigger fires all elements are discarded. Elements that arrive late immediately
   * trigger window evaluation with just this one element.
   */
  public static CustomEventTimeTrigger create() {
    return new CustomEventTimeTrigger();
  }
}

上面代码仅是在原来的EventTimeTrigger上面加了一些输出,逻辑完全没有变更。可以看到这个触发器主要实现了onElementonEventTime两个方法,也就是在使用Event Time的时候,窗口触发主要通过这两个方法实现。我们先看下程序输出:

event in source:
{"id":"event1","timestamp":"2020-05-24T12:00:00.000+08:00"}
{"id":"event2","timestamp":"2020-05-24T12:00:01.000+08:00"}
{"id":"event4","timestamp":"2020-05-24T12:00:04.000+08:00"}
{"id":"event5","timestamp":"2020-05-24T12:00:05.000+08:00"}
{"id":"event7","timestamp":"2020-05-24T12:00:07.000+08:00"}
{"id":"event3","timestamp":"2020-05-24T12:00:03.000+08:00"}
{"id":"event6","timestamp":"2020-05-24T12:00:06.000+08:00"}
{"id":"event8","timestamp":"2020-05-24T12:00:08.000+08:00"}
{"id":"event9","timestamp":"2020-05-24T12:00:09.000+08:00"}

onElement -- event: event1; window.maxTimestamp():2020-05-24 12:00:04.999; ctx.getCurrentWatermark():292269055-12-03 00:47:04.192
onElement -- event: event2; window.maxTimestamp():2020-05-24 12:00:04.999; ctx.getCurrentWatermark():2020-05-24 12:00:00.000
onElement -- event: event4; window.maxTimestamp():2020-05-24 12:00:04.999; ctx.getCurrentWatermark():2020-05-24 12:00:01.000
onElement -- event: event5; window.maxTimestamp():2020-05-24 12:00:09.999; ctx.getCurrentWatermark():2020-05-24 12:00:04.000
onEventTime-- window.maxTimestamp():2020-05-24 12:00:04.999; time:2020-05-24 12:00:04.999

window{2020-05-24 12:00:00 - 2020-05-24 12:00:05}
event1
event2
event4
Total:3
onElement -- event: event7; window.maxTimestamp():2020-05-24 12:00:09.999; ctx.getCurrentWatermark():2020-05-24 12:00:05.000
onElement -- event: event6; window.maxTimestamp():2020-05-24 12:00:09.999; ctx.getCurrentWatermark():2020-05-24 12:00:07.000
onElement -- event: event8; window.maxTimestamp():2020-05-24 12:00:09.999; ctx.getCurrentWatermark():2020-05-24 12:00:07.000
onElement -- event: event9; window.maxTimestamp():2020-05-24 12:00:09.999; ctx.getCurrentWatermark():2020-05-24 12:00:08.000
onEventTime-- window.maxTimestamp():2020-05-24 12:00:09.999; time:2020-05-24 12:00:09.999

window{2020-05-24 12:00:05 - 2020-05-24 12:00:10}
event5
event7
event6
event8
event9
Total:5

Process finished with exit code 0

这里解释几个时间:

  • 之前说过一个时间窗口的窗口范围是[start_time, end_time),时间格式都是从1970-01-01T00:00:00Z至今的毫秒数。而这里的window.maxTimestamp()就是这个end_time减1(因为是左闭右开的).
  • onElement方法参数中的long timestamp是指当前事件(即T element)到达的时间。
  • onEventTime方法参数中的long time就是前文说的定时器触发的时间,实际值和window.maxTimestamp()一样,也就是这两个值是时间窗口生命周期结束的时间值。

根据上面的输出也可以看出来,然后结合这里代码实现就得出了前文给出的结论:对于窗口而言,它只看属于这个窗口的数据是否在[start_time, end_time)这个时间范围内来,如果来了,就没迟到,就是“有序的”,所以它保证的是当前时间范围窗口和下一个时间范围窗口的有序,而不是一个时间范围窗口内时间的有序性,而且Flink的确并不保证一个窗口内的事件顺序(原文是:Flink provides no guarantees about the order of the elements within a window.)。也就是一个窗口内event1是比event2早到,但传给窗口计算函数(process那种)的时候,并不保证event1一定在event2之前。

本文到这里也就结束了,只有理解了窗口的机制,才能更好的理解watermark是为了解决什么问题。但请记住,只有当使用基于Event Time的窗口时,Watermark才有意义。

]]>
0 http://niyanchun.com/flink-window-introduction.html#comments http://niyanchun.com/feed/tag/flink/