The previous post covered the common operators; this one introduces another important operator: Async I/O, i.e. asynchronous I/O. It is a powerful tool for streams that frequently access external data, especially when those accesses are slow.

Background

Consider a fairly common real-world scenario: a streaming job needs to look up an external dimension table (say, HBase; let's ignore caching for now) for every event it processes. How do you implement this in Flink? The typical approach is to add a map/flatMap that performs the lookup. That works, but the lookup easily becomes the system's bottleneck, especially when the external call is slow. Fortunately, Flink ships with an async I/O operator that solves this problem nicely.
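For concreteness, here is a minimal sketch of that map-based lookup, assuming a hypothetical blocking HBaseClient with a get(key) method; the per-element blocking call on the task thread is exactly what makes it a bottleneck:

class SyncDimensionLookup extends RichMapFunction<String, Tuple2<String, String>> {

    private transient HBaseClient client; // hypothetical blocking client

    @Override
    public void open(Configuration parameters) throws Exception {
        client = new HBaseClient(); // connection setup omitted
    }

    @Override
    public Tuple2<String, String> map(String key) throws Exception {
        // Blocking lookup: the task thread stalls for the whole round trip
        return new Tuple2<>(key, client.get(key));
    }

    @Override
    public void close() throws Exception {
        client.close();
    }
}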

Async I/O is an important feature that Alibaba contributed to Flink. It was officially released in Flink 1.2, under the proposal FLIP-12: Asynchronous I/O Design and Implementation. The problem it solves is the same one addressed by all other async I/O and I/O multiplexing techniques: I/O tends to be slow, and doing it asynchronously improves throughput. Flink's Async I/O is simply asynchronous I/O applied to stream processing; there is nothing special about the underlying principle. Here is a figure from the official documentation:

[Figure: synchronous vs. asynchronous I/O (from the Flink documentation)]

On the left is synchronous I/O: most of the time is spent waiting. On the right is asynchronous I/O: the improvement is obvious. Of course, raising the task's parallelism also mitigates the synchronous I/O problem to some extent, much like the per-connection-per-thread model from the early days of network programming, but that approach is less thorough and scaling up parallelism is expensive. The idea is straightforward, so let's move on to how to use it.

How to Use It

Usage Overview

Recall asynchronous I/O in network programming (meaning I/O multiplexing): it requires kernel support for select and poll/epoll. Flink's async I/O is similar: the client used to access the external system must support asynchronous requests. If it does not, you can simulate asynchronous requests with a thread pool; the result is not as good, but it usually still beats synchronous I/O. At the code level, there are three steps:

  1. Implement the AsyncFunction interface, which is responsible for dispatching requests. Flink ships a built-in implementation, RichAsyncFunction, and extending that class is usually all we need.
  2. Inside AsyncFunction#asyncInvoke(...), implement a callback that obtains the result of the asynchronous call and hands it to the ResultFuture.
  3. Apply the async operation to a stream.

Here is the example from the official documentation:

// This example implements the asynchronous request and callback with Futures that have the
// interface of Java 8's futures (which is the same one followed by Flink's Future)

/**
 * An implementation of the 'AsyncFunction' that sends requests and sets the callback.
 * Step 1: implement the `AsyncFunction` interface
 */
class AsyncDatabaseRequest extends RichAsyncFunction<String, Tuple2<String, String>> {

    /** The database specific client that can issue concurrent requests with callbacks */
    private transient DatabaseClient client;

    @Override
    public void open(Configuration parameters) throws Exception {
        client = new DatabaseClient(host, port, credentials);
    }

    @Override
    public void close() throws Exception {
        client.close();
    }

    @Override
    public void asyncInvoke(String key, final ResultFuture<Tuple2<String, String>> resultFuture) throws Exception {

        // issue the asynchronous request, receive a future for result
        final Future<String> result = client.query(key);

        // set the callback to be executed once the request by the client is complete
        // the callback simply forwards the result to the result future
        // Step 2: implement a callback
        CompletableFuture.supplyAsync(new Supplier<String>() {
            
            @Override
            public String get() {
                try {
                    // obtain the result of the asynchronous call
                    return result.get();
                } catch (InterruptedException | ExecutionException e) {
                    // Normally handled explicitly.
                    return null;
                }
            }
        }).thenAccept( (String dbResult) -> {
            // and hand it to the `ResultFuture`
            resultFuture.complete(Collections.singleton(new Tuple2<>(key, dbResult)));
        });
    }
}

// create the original stream
DataStream<String> stream = ...;

// apply the async I/O transformation
// Step 3: apply the async operation to a stream
DataStream<Tuple2<String, String>> resultStream =
    AsyncDataStream.unorderedWait(stream, new AsyncDatabaseRequest(), 1000, TimeUnit.MILLISECONDS, 100);

The execution logic of this code is as follows:

  • For every event in the stream, asyncInvoke is called; inside it we issue the asynchronous request to the external system, e.g. client.query(key) in the code above, and get back an asynchronous handle (e.g. a Future).
  • We attach a callback to that handle, e.g. CompletableFuture.supplyAsync(...).thenAccept(...) in the code above; inside the callback we fetch the result of the asynchronous task and hand it to the ResultFuture. This step is crucial, and we will come back to it later.

To keep asynchronous requests from piling up without bound, and to guard against requests that hang, async I/O facilities generally provide a timeout and a cap on concurrency; Flink's async I/O is no exception. In the unorderedWait call on the last line above, the 1000 is the timeout for an asynchronous task, and the 100 is the number of concurrent requests allowed, known as the Capacity. So how are timeouts handled? Look at the AsyncFunction interface:

public interface AsyncFunction<IN, OUT> extends Function, Serializable {

    /**
     * Trigger async operation for each stream input.
     *
     * @param input element coming from an upstream task
     * @param resultFuture to be completed with the result data
     * @exception Exception in case of a user code error. An exception will make the task fail and
     *     trigger fail-over process.
     */
    void asyncInvoke(IN input, ResultFuture<OUT> resultFuture) throws Exception;

    /**
     * {@link AsyncFunction#asyncInvoke} timeout occurred. By default, the result future is
     * exceptionally completed with a timeout exception.
     *
     * @param input element coming from an upstream task
     * @param resultFuture to be completed with the result data
     */
    default void timeout(IN input, ResultFuture<OUT> resultFuture) throws Exception {
        resultFuture.completeExceptionally(
                new TimeoutException("Async function call has timed out."));
    }
}

Besides the asyncInvoke method, the interface defines a timeout method with a default implementation. When an asynchronous task times out, this method is called back; the default throws an exception, failing the whole job. That is rarely desirable, so we usually need to override it; a sketch follows below. And what happens when the number of in-flight requests reaches the limit? The operator blocks, producing backpressure.
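Here is a minimal sketch of such an override, assuming a String output type: instead of failing the job, it completes the future with a tagged fallback record so the stream keeps flowing (the demo later in this post does the same):

@Override
public void timeout(String input, ResultFuture<String> resultFuture) throws Exception {
    // Degrade gracefully instead of triggering fail-over: emit a fallback
    // record. Completing exceptionally here would fail the job, as by default.
    resultFuture.complete(Collections.singleton(input + ": timed out, fallback applied"));
}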

Another thing to watch is event ordering. Since requests are asynchronous, their completion times are unpredictable; Flink's async I/O can either preserve the original event order or not. Internally, async I/O is built on a queue; to preserve order, results that complete early but belong to later events are buffered, which inevitably adds some latency and resource usage. Unordered mode gives better latency and throughput, but note that with event time, "unordered" is not globally unordered: events may only be reordered between two watermarks, and the watermark semantics are still respected overall. Use AsyncDataStream.unorderedWait(...) for unordered mode and AsyncDataStream.orderedWait(...) for ordered mode, as shown side by side below.
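The two entry points take exactly the same arguments and differ only in the ordering guarantee; this snippet reuses the AsyncDatabaseRequest from the official example above:

// unordered: better latency and throughput; with event time, reordering is
// confined to the span between two watermarks
DataStream<Tuple2<String, String>> unordered = AsyncDataStream.unorderedWait(
    stream, new AsyncDatabaseRequest(), 1000, TimeUnit.MILLISECONDS, 100);

// ordered: original event order preserved, at the cost of buffering results
// that complete early
DataStream<Tuple2<String, String>> ordered = AsyncDataStream.orderedWait(
    stream, new AsyncDatabaseRequest(), 1000, TimeUnit.MILLISECONDS, 100);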

Demo: Comparing Sync, Thread-Pool, and Async Modes

This part presents two examples that demonstrate how to use Flink's async I/O.

The first example covers three scenarios:

  • synchronous external I/O implemented in an ordinary operator (a ProcessFunction in the code below)
  • async I/O simulated with a thread pool, without preserving order
  • async I/O simulated with a thread pool, preserving order

To highlight async I/O's advantage, the code does not actually query a database; a sleep stands in for the slow I/O operation:

public void update(Tuple2<String, Integer> tuple2) throws SQLException {
    // To simulate slow I/O, sleep instead of executing the real statements
    //ps.setLong(1, balance);
    //ps.setLong(2, 1);
    //ps.execute();
    try {
        Thread.sleep(tuple2.f1);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}

Note: although the database is never really queried, the code is written exactly as in a real scenario; only the database statements are swapped for a sleep, so at runtime it still connects to the database and creates a connection pool. If you want to run the code yourself, change the database connection URL. Also, all three scenarios share the same data source, and the sleep durations are defined in that source, so their I/O cost is identical:

static List<Tuple2<String, Integer>> dataset() {
    List<Tuple2<String, Integer>> dataset = new ArrayList<>(10);
    for (int i = 0; i < 10; i++) {
        // f0: element name  f1: sleep duration, simulating the element's I/O cost
        dataset.add(new Tuple2<>("e" + i, i % 3 + 1));
    }
    return dataset;
}

Here is the complete code:

public class ThreadPoolAsyncDemoFakeQuery {

  public static void main(String[] args) throws Exception {
    // --------------------------------- Async IO + Unordered -------------------------
    StreamExecutionEnvironment envAsyncUnordered = StreamExecutionEnvironment.getExecutionEnvironment();
    envAsyncUnordered.setParallelism(1);
    envAsyncUnordered.setBufferTimeout(0);

    DataStream<Tuple2<String, Integer>> source = envAsyncUnordered.fromCollection(dataset());
    DataStream<String> result = AsyncDataStream.unorderedWait(source, new ThreadPoolAsyncMysqlRequest(),
        10, TimeUnit.SECONDS, 10);
    result.print();

    System.out.println("Async + UnorderedWait:");
    envAsyncUnordered.execute("unorderedWait");

    // --------------------------------- Async IO + Ordered -------------------------
    StreamExecutionEnvironment envAsyncOrdered = StreamExecutionEnvironment.getExecutionEnvironment();
    envAsyncOrdered.setParallelism(1);
    envAsyncOrdered.setBufferTimeout(0);

    AsyncDataStream.orderedWait(envAsyncOrdered.fromCollection(dataset()),
        new ThreadPoolAsyncMysqlRequest(), 10, TimeUnit.SECONDS, 10)
        .print();
    System.out.println("Async + OrderedWait");
    envAsyncOrdered.execute("orderedWait");

    // --------------------------------- Sync IO -------------------------
    StreamExecutionEnvironment envSync = StreamExecutionEnvironment.getExecutionEnvironment();
    envSync.setParallelism(1);
    envSync.setBufferTimeout(0);

    envSync.fromCollection(dataset())
        .process(new ProcessFunction<Tuple2<String, Integer>, String>() {
          private transient MysqlClient client;

          @Override
          public void open(Configuration parameters) throws Exception {
            super.open(parameters);
            client = new MysqlClient();
          }

          @Override
          public void processElement(Tuple2<String, Integer> value, Context ctx, Collector<String> out) throws Exception {
            try {
              client.update(value);
              out.collect(value + ": " + System.currentTimeMillis());
            } catch (Exception e) {
              e.printStackTrace();
            }
          }
        }).print();
    System.out.println("Sync IO:");
    envSync.execute("Sync IO");
  }

  static List<Tuple2<String, Integer>> dataset() {
    List<Tuple2<String, Integer>> dataset = new ArrayList<>(10);
    for (int i = 0; i < 10; i++) {
      // f0: element name  f1: sleep duration
      dataset.add(new Tuple2<>("e" + i, i % 3 + 1));
    }
    return dataset;
  }

  static class ThreadPoolAsyncMysqlRequest extends RichAsyncFunction<Tuple2<String, Integer>, String> {
    private transient ExecutorService executor;
    private transient MysqlClient client;

    @Override
    public void open(Configuration parameters) throws Exception {
      super.open(parameters);
      executor = Executors.newFixedThreadPool(30);
      client = new MysqlClient();
    }

    @Override
    public void asyncInvoke(Tuple2<String, Integer> input, ResultFuture<String> resultFuture) {
      executor.submit(() -> {
        long current = System.currentTimeMillis();
        String output = input + ":" + current;

        try {
          client.update(input);
          resultFuture.complete(Collections.singleton(output));
        } catch (SQLException e) {
          e.printStackTrace();
          resultFuture.complete(Collections.singleton(input + ": " + e.getMessage()));
        }
      });
    }

    @Override
    public void timeout(Tuple2<String, Integer> input, ResultFuture<String> resultFuture) throws Exception {
      System.out.printf("%s timeout\n", input);
      resultFuture.complete(Collections.singleton(input + ": time out"));
    }

    @Override
    public void close() throws Exception {
      client.close();
      super.close();
    }
  }

  static class MysqlClient {
    static final String JDBC_DRIVER = "com.mysql.jdbc.Driver";
    static final String DB_URL = "jdbc:mysql://10.9.1.18:3306/test?useSSL=false";

    private Connection conn;
    private PreparedStatement ps;

    public MysqlClient() throws Exception {
      Class.forName(JDBC_DRIVER);
      conn = DriverManager.getConnection(DB_URL, "root", "root123.");
      ps = conn.prepareStatement("UPDATE account SET balance = ? WHERE id = ?;");
    }

    public void update(Tuple2<String, Integer> tuple2) throws SQLException {
      // To simulate slow I/O, sleep instead of executing the real statements
      //ps.setLong(1, balance);
      //ps.setLong(2, 1);
      //ps.execute();
      try {
        Thread.sleep(tuple2.f1);
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
    }

    public void close() {
      try {
        if (conn != null) {
          conn.close();
        }
        if (ps != null) {
          ps.close();
        }
      } catch (Exception e) {
        e.printStackTrace();
      }
    }
  }
}

The output:

Async + UnorderedWait:
(e0,1):1617109474293
(e3,1):1617109474294
(e1,2):1617109474293
(e4,2):1617109474294
(e2,3):1617109474293
(e6,1):1617109474295
(e9,1):1617109474297
(e7,2):1617109474296
(e5,3):1617109474295
(e8,3):1617109474297
Async + OrderedWait
(e0,1):1617109474891
(e1,2):1617109474891
(e2,3):1617109474891
(e3,1):1617109474891
(e4,2):1617109474892
(e5,3):1617109474892
(e6,1):1617109474893
(e7,2):1617109474893
(e8,3):1617109474893
(e9,1):1617109474893
Sync IO:
(e0,1): 1617109475257
(e1,2): 1617109475260
(e2,3): 1617109475264
(e3,1): 1617109475265
(e4,2): 1617109475268
(e5,3): 1617109475272
(e6,1): 1617109475274
(e7,2): 1617109475277
(e8,3): 1617109475281
(e9,1): 1617109475283

We can see:

  • Async + UnorderedWait: 4 ms in total, with events emitted out of order
  • Async + OrderedWait: 2 ms in total, with the original order preserved
  • Sync IO: 26 ms in total, also preserving the original order

Across repeated runs the numbers are not very stable, but the async variants always beat the sync one by a wide margin. Because the dataset is small and the per-element cost fairly uniform, unordered mode shows no clear edge here and occasionally even loses to ordered mode. This is not a rigorous benchmark, but it does demonstrate the clear advantage of async over sync.

The second example performs real operations through a genuinely asynchronous client: jasync-sql, which provides asynchronous access to MySQL. The complete code:

public class AsyncDemo {

  public static void main(String[] args) throws Exception {
    final long repeat = 100L;

    // --------------------------------- Async IO + Unordered -------------------------
    StreamExecutionEnvironment envAsyncUnordered = StreamExecutionEnvironment.getExecutionEnvironment();
    envAsyncUnordered.setParallelism(1);
    envAsyncUnordered.setBufferTimeout(0);

    DataStream<Long> source = envAsyncUnordered.fromSequence(1, repeat);
    DataStream<String> result = AsyncDataStream.unorderedWait(source, new AsyncMysqlRequest(),
        10, TimeUnit.SECONDS, 10);
    result.print();

    System.out.println("Async + UnorderedWait:");
    envAsyncUnordered.execute("unorderedWait");
  }


  static class AsyncMysqlRequest extends RichAsyncFunction<Long, String> {
    Connection conn;

    @Override
    public void open(Configuration parameters) throws Exception {
      super.open(parameters);
      conn = MySQLConnectionBuilder.createConnectionPool("jdbc:mysql://{your-ip}:3306/test?user=root&password={your-password}");
      conn.connect().get();
    }

    @Override
    public void asyncInvoke(Long input, ResultFuture<String> resultFuture) throws Exception {
      List<Long> balance = new ArrayList<>(1);
      balance.add(input);
      CompletableFuture<QueryResult> future =
          conn.sendPreparedStatement("SELECT * FROM account WHERE balance = ?;", balance);
      // on success
      future.thenAccept(queryResult -> {
        resultFuture.complete(Collections.singleton(balance + ":" + System.currentTimeMillis()));
      });
      // on failure
      future.exceptionally(e -> {
        e.printStackTrace();
        resultFuture.complete(Collections.singleton(balance + e.getMessage() + ":" + System.currentTimeMillis()));
        return null;
      });
    }

    @Override
    public void timeout(Long input, ResultFuture<String> resultFuture) throws Exception {
      System.out.printf("%d timeout\n", input);
      resultFuture.complete(Collections.singleton(input + ": time out"));
    }
  }
}

Usage Notes

  1. Do not perform expensive work inside AsyncFunction#asyncInvoke(...), such as synchronously waiting for the asynchronous request's result (that belongs in the callback). Each partition of a stream has a single AsyncFunction instance, and the elements of a partition are fed to asyncInvoke sequentially, so expensive work there largely defeats the async design, and synchronously waiting degrades it to synchronous I/O outright (see the sketch after this list).
  2. The default timeout callback throws an exception, which brings down the whole Flink job. That is rarely what we want, so most of the time we need to override the timeout method.
  3. In your callback, always hand the result to the ResultFuture via ResultFuture#complete or ResultFuture#completeExceptionally; otherwise the request sits in the queue forever, and once the queue fills up the whole pipeline stalls.
  4. Flink's async I/O supports checkpointing, so it can recover from failures with an exactly-once guarantee.
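To illustrate point 1, here is a sketch of the anti-pattern next to the correct pattern, assuming a client whose query(key) returns a CompletableFuture<String>:

// WRONG: blocking inside asyncInvoke. Elements are fed to asyncInvoke
// sequentially, so this degrades the operator back to synchronous I/O.
@Override
public void asyncInvoke(String key, ResultFuture<String> resultFuture) throws Exception {
    String result = client.query(key).get(); // blocks the operator thread!
    resultFuture.complete(Collections.singleton(result));
}

// RIGHT: return immediately and hand the result over inside the callback.
@Override
public void asyncInvoke(String key, ResultFuture<String> resultFuture) throws Exception {
    client.query(key).whenComplete((result, error) -> {
        if (error != null) {
            resultFuture.completeExceptionally(error);
        } else {
            resultFuture.complete(Collections.singleton(result));
        }
    });
}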

A Brief Look at the Internals

AsyncWaitOperator

Here we walk through the Async I/O implementation based on the source of Flink 1.12.1, the latest release at the time of writing. Several figures are borrowed from Jark's blog (see the references at the end).

[Figure: internal structure of the async I/O operator (from Jark's blog)]

As the figure shows, Flink Async I/O is essentially an asynchronous producer-consumer model built on a queue. The queue is the StreamElementQueue; the producer is our main stream, which puts elements into the queue via AsyncDataStream.xxxWait; the consumer is the Emitter in the figure. The flow: when an event arrives, it is first put into the queue, and the asynchronous logic defined via AsyncFunction#asyncInvoke(...) is invoked. When that logic finishes, the event's queue entry is marked complete, and the Emitter sends the event's result downstream. The central class is AsyncWaitOperator; I have trimmed the code down to the parts that carry out the steps above, with inline comments:

@Internal
public class AsyncWaitOperator<IN, OUT>
        extends AbstractUdfStreamOperator<OUT, AsyncFunction<IN, OUT>>
        implements OneInputStreamOperator<IN, OUT>, BoundedOneInput {

    /** Capacity of the stream element queue. */
    private final int capacity;

    /** Output mode for this operator. */
    private final AsyncDataStream.OutputMode outputMode;

    /** Timeout for the async collectors. */
    private final long timeout;

    /** Queue, into which to store the currently in-flight stream elements. */
    // the queue
    private transient StreamElementQueue<OUT> queue;

  
    @Override
    public void setup(
            StreamTask<?, ?> containingTask,
            StreamConfig config,
            Output<StreamRecord<OUT>> output) {
        super.setup(containingTask, config, output);

        switch (outputMode) {
            case ORDERED:
                queue = new OrderedStreamElementQueue<>(capacity);
                break;
            case UNORDERED:
                queue = new UnorderedStreamElementQueue<>(capacity);
                break;
            default:
                throw new IllegalStateException("Unknown async mode: " + outputMode + '.');
        }

        this.timestampedCollector = new TimestampedCollector<>(output);
    }


    @Override
    public void processElement(StreamRecord<IN> element) throws Exception {
        // add element first to the queue
        // put the element into the queue
        final ResultFuture<OUT> entry = addToWorkQueue(element);

        final ResultHandler resultHandler = new ResultHandler(element, entry);

        // register a timeout for the entry if timeout is configured
        // register timeout handling for the async callback
        if (timeout > 0L) {
            final long timeoutTimestamp =
                    timeout + getProcessingTimeService().getCurrentProcessingTime();

            final ScheduledFuture<?> timeoutTimer =
                    getProcessingTimeService()
                            .registerTimer(
                                    timeoutTimestamp,
                                    timestamp ->
                                            userFunction.timeout(
                                                    element.getValue(), resultHandler));

            resultHandler.setTimeoutTimer(timeoutTimer);
        }

        // invoke the asynchronous logic
        userFunction.asyncInvoke(element.getValue(), resultHandler);
    }


    /**
     * Add the given stream element to the operator's stream element queue. This operation blocks
     * until the element has been added.
     *
     * <p>Between two insertion attempts, this method yields the execution to the mailbox, such that
     * events as well as asynchronous results can be processed.
     *
     * @param streamElement to add to the operator's queue
     * @throws InterruptedException if the current thread has been interrupted while yielding to
     *     mailbox
     * @return a handle that allows to set the result of the async computation for the given
     *     element.
     */
    private ResultFuture<OUT> addToWorkQueue(StreamElement streamElement)
            throws InterruptedException {

        Optional<ResultFuture<OUT>> queueEntry;
        while (!(queueEntry = queue.tryPut(streamElement)).isPresent()) {
            mailboxExecutor.yield();
        }

        return queueEntry.get();
    }


    /**
     * Outputs one completed element. Watermarks are always completed if it's their turn to be
     * processed.
     *
     * <p>This method will be called from {@link #processWatermark(Watermark)} and from a mail
     * processing the result of an async function call.
     */
    private void outputCompletedElement() {
        if (queue.hasCompletedElements()) {
            // emit only one element to not block the mailbox thread unnecessarily
            queue.emitCompletedElement(timestampedCollector);
        }
    }

}

Notice that processElement first puts the event into the queue, then registers the timeout timer, and finally invokes the asynchronous logic. In addToWorkQueue, a while loop keeps retrying the insertion, so when the queue is full the operation effectively blocks there (yielding to the mailbox between attempts). This is the behavior mentioned earlier: once the number of concurrent asynchronous requests reaches Capacity, the stream blocks and produces backpressure. The outputCompletedElement method is what emits completed asynchronous requests downstream.

As mentioned earlier, Flink Async I/O offers both an ordered and an unordered guarantee, implemented by two different queues. The code above shows that ordered mode uses OrderedStreamElementQueue and unordered mode uses UnorderedStreamElementQueue, with the queue size equal to Capacity, the maximum number of concurrent asynchronous requests. Both are implementations of the StreamElementQueue interface from the figure; here is its definition:

/** Interface for stream element queues for the {@link AsyncWaitOperator}. */
@Internal
public interface StreamElementQueue<OUT> {

    /**
     * Tries to put the given element in the queue. This operation succeeds if the queue has
     * capacity left and fails if the queue is full.
     *
     * <p>This method returns a handle to the inserted element that allows to set the result of the
     * computation.
     *
     * @param streamElement the element to be inserted.
     * @return A handle to the element if successful or {@link Optional#empty()} otherwise.
     */
    Optional<ResultFuture<OUT>> tryPut(StreamElement streamElement);

    /**
     * Emits one completed element from the head of this queue into the given output.
     *
     * <p>Will not emit any element if no element has been completed (check {@link
     * #hasCompletedElements()} before entering any critical section).
     *
     * @param output the output into which to emit
     */
    void emitCompletedElement(TimestampedCollector<OUT> output);

    /**
     * Checks if there is at least one completed head element.
     *
     * @return True if there is a completed head element.
     */
    boolean hasCompletedElements();

    /**
     * Returns the collection of {@link StreamElement} currently contained in this queue for
     * checkpointing.
     *
     * <p>This includes all non-emitted, completed and non-completed elements.
     *
     * @return List of currently contained {@link StreamElement}.
     */
    List<StreamElement> values();

    /**
     * True if the queue is empty; otherwise false.
     *
     * @return True if the queue is empty; otherwise false.
     */
    boolean isEmpty();

    /**
     * Return the size of the queue.
     *
     * @return The number of elements contained in this queue.
     */
    int size();
}

The interface defines six methods:

  • tryPut: inserts an element into the queue;
  • emitCompletedElement: emits one completed asynchronous request downstream, i.e. the Emitter in the figure;
  • hasCompletedElements: whether any asynchronous request has completed;
  • values: the elements currently held in the queue;
  • isEmpty: whether the queue is empty;
  • size: the size of the queue.

Let's see how the ordered and unordered queues implement this interface.

The Ordered Queue: OrderedStreamElementQueue

The ordered implementation is straightforward: first in, first out. OrderedStreamElementQueue is short, so here it is in full:

@Internal
public final class OrderedStreamElementQueue<OUT> implements StreamElementQueue<OUT> {

    private static final Logger LOG = LoggerFactory.getLogger(OrderedStreamElementQueue.class);

    /** Capacity of this queue. */
    private final int capacity;

    /** Queue for the inserted StreamElementQueueEntries. */
    private final Queue<StreamElementQueueEntry<OUT>> queue;

    public OrderedStreamElementQueue(int capacity) {
        Preconditions.checkArgument(capacity > 0, "The capacity must be larger than 0.");

        this.capacity = capacity;
        this.queue = new ArrayDeque<>(capacity);
    }

    @Override
    public boolean hasCompletedElements() {
        return !queue.isEmpty() && queue.peek().isDone();
    }

    @Override
    public void emitCompletedElement(TimestampedCollector<OUT> output) {
        if (hasCompletedElements()) {
            final StreamElementQueueEntry<OUT> head = queue.poll();
            head.emitResult(output);
        }
    }

    @Override
    public List<StreamElement> values() {
        List<StreamElement> list = new ArrayList<>(this.queue.size());
        for (StreamElementQueueEntry e : queue) {
            list.add(e.getInputElement());
        }
        return list;
    }

    @Override
    public boolean isEmpty() {
        return queue.isEmpty();
    }

    @Override
    public int size() {
        return queue.size();
    }

    @Override
    public Optional<ResultFuture<OUT>> tryPut(StreamElement streamElement) {
        if (queue.size() < capacity) {
            StreamElementQueueEntry<OUT> queueEntry = createEntry(streamElement);

            queue.add(queueEntry);

            LOG.debug(
                    "Put element into ordered stream element queue. New filling degree "
                            + "({}/{}).",
                    queue.size(),
                    capacity);

            return Optional.of(queueEntry);
        } else {
            LOG.debug(
                    "Failed to put element into ordered stream element queue because it "
                            + "was full ({}/{}).",
                    queue.size(),
                    capacity);

            return Optional.empty();
        }
    }

    private StreamElementQueueEntry<OUT> createEntry(StreamElement streamElement) {
        if (streamElement.isRecord()) {
            return new StreamRecordQueueEntry<>((StreamRecord<?>) streamElement);
        }
        if (streamElement.isWatermark()) {
            return new WatermarkQueueEntry<>((Watermark) streamElement);
        }
        throw new UnsupportedOperationException("Cannot enqueue " + streamElement);
    }
}

It is backed by a single ArrayDeque: events enter in arrival order and leave via queue.poll(), so ordering is preserved naturally. Before dequeuing, it checks whether the head request has completed via hasCompletedElements; here is its implementation:

    @Override
    public boolean hasCompletedElements() {
        return !queue.isEmpty() && queue.peek().isDone();
    }

It calls isDone on the head element of the queue (peek). The queue holds entries of type StreamElementQueueEntry, an interface that extends ResultFuture and whose record implementation is StreamRecordQueueEntry; here is the relevant code:

class StreamRecordQueueEntry<OUT> implements StreamElementQueueEntry<OUT> {
    @Nonnull private final StreamRecord<?> inputRecord;

    private Collection<OUT> completedElements;

    StreamRecordQueueEntry(StreamRecord<?> inputRecord) {
        this.inputRecord = Preconditions.checkNotNull(inputRecord);
    }

    @Override
    public boolean isDone() {
        return completedElements != null;
    }

    @Nonnull
    @Override
    public StreamRecord<?> getInputElement() {
        return inputRecord;
    }

    @Override
    public void emitResult(TimestampedCollector<OUT> output) {
        output.setTimestamp(inputRecord);
        for (OUT r : completedElements) {
            output.collect(r);
        }
    }

    @Override
    public void complete(Collection<OUT> result) {
        this.completedElements = Preconditions.checkNotNull(result);
    }
}

isDone simply checks whether completedElements is non-null, and completedElements is assigned in the complete method. This is precisely why we must call ResultFuture#complete inside the callback defined in AsyncFunction#asyncInvoke(...): otherwise the queued asynchronous event can never be marked complete, and once the queue fills up the whole data flow stalls. Calling ResultFuture#completeExceptionally on error works too; AsyncWaitOperator.ResultHandler calls it as well.

Finally, a figure from Jark's blog:

[Figure: the ordered queue (from Jark's blog)]

The Unordered Queue: UnorderedStreamElementQueue

The unordered queue is somewhat more involved. With processing time the queue is globally unordered, which stays simple; but with event time, as described above, disorder is not global: events may only be reordered within the interval between two adjacent watermarks (called a Segment in the implementation), while the intervals themselves must remain ordered, i.e. the watermark semantics still hold overall. To implement this, the unordered queue introduces the Segment: one Segment represents the interval between adjacent watermarks, unordered within a Segment and ordered across Segments. Here is the code:

@Internal
public final class UnorderedStreamElementQueue<OUT> implements StreamElementQueue<OUT> {

    private static final Logger LOG = LoggerFactory.getLogger(UnorderedStreamElementQueue.class);

    /** Capacity of this queue. */
    private final int capacity;

    /** Queue of queue entries segmented by watermarks. */
    private final Deque<Segment<OUT>> segments;

    private int numberOfEntries;

    public UnorderedStreamElementQueue(int capacity) {
        Preconditions.checkArgument(capacity > 0, "The capacity must be larger than 0.");

        this.capacity = capacity;
        // most likely scenario are 4 segments <elements, watermark, elements, watermark>
        this.segments = new ArrayDeque<>(4);
        this.numberOfEntries = 0;
    }

    @Override
    public Optional<ResultFuture<OUT>> tryPut(StreamElement streamElement) {
        if (size() < capacity) {
            StreamElementQueueEntry<OUT> queueEntry;
            if (streamElement.isRecord()) {
                queueEntry = addRecord((StreamRecord<?>) streamElement);
            } else if (streamElement.isWatermark()) {
                queueEntry = addWatermark((Watermark) streamElement);
            } else {
                throw new UnsupportedOperationException("Cannot enqueue " + streamElement);
            }

            numberOfEntries++;

            LOG.debug(
                    "Put element into unordered stream element queue. New filling degree "
                            + "({}/{}).",
                    size(),
                    capacity);

            return Optional.of(queueEntry);
        } else {
            LOG.debug(
                    "Failed to put element into unordered stream element queue because it "
                            + "was full ({}/{}).",
                    size(),
                    capacity);

            return Optional.empty();
        }
    }

    private StreamElementQueueEntry<OUT> addRecord(StreamRecord<?> record) {
        // ensure that there is at least one segment
        Segment<OUT> lastSegment;
        if (segments.isEmpty()) {
            lastSegment = addSegment(capacity);
        } else {
            lastSegment = segments.getLast();
        }

        // entry is bound to segment to notify it easily upon completion
        StreamElementQueueEntry<OUT> queueEntry =
                new SegmentedStreamRecordQueueEntry<>(record, lastSegment);
        lastSegment.add(queueEntry);
        return queueEntry;
    }

    private Segment<OUT> addSegment(int capacity) {
        Segment newSegment = new Segment(capacity);
        segments.addLast(newSegment);
        return newSegment;
    }

    private StreamElementQueueEntry<OUT> addWatermark(Watermark watermark) {
        Segment<OUT> watermarkSegment;
        if (!segments.isEmpty() && segments.getLast().isEmpty()) {
            // reuse already existing segment if possible (completely drained) or the new segment
            // added at the end of
            // this method for two succeeding watermarks
            watermarkSegment = segments.getLast();
        } else {
            watermarkSegment = addSegment(1);
        }

        StreamElementQueueEntry<OUT> watermarkEntry = new WatermarkQueueEntry<>(watermark);
        watermarkSegment.add(watermarkEntry);

        // add a new segment for actual elements
        addSegment(capacity);
        return watermarkEntry;
    }

    @Override
    public boolean hasCompletedElements() {
        return !this.segments.isEmpty() && this.segments.getFirst().hasCompleted();
    }

    @Override
    public void emitCompletedElement(TimestampedCollector<OUT> output) {
        if (segments.isEmpty()) {
            return;
        }
        final Segment currentSegment = segments.getFirst();
        numberOfEntries -= currentSegment.emitCompleted(output);

        // remove any segment if there are further segments, if not leave it as an optimization even
        // if empty
        if (segments.size() > 1 && currentSegment.isEmpty()) {
            segments.pop();
        }
    }

    @Override
    public List<StreamElement> values() {
        List<StreamElement> list = new ArrayList<>();
        for (Segment s : segments) {
            s.addPendingElements(list);
        }
        return list;
    }

    @Override
    public boolean isEmpty() {
        return numberOfEntries == 0;
    }

    @Override
    public int size() {
        return numberOfEntries;
    }

    /** An entry that notifies the respective segment upon completion. */
    static class SegmentedStreamRecordQueueEntry<OUT> extends StreamRecordQueueEntry<OUT> {
        private final Segment<OUT> segment;

        SegmentedStreamRecordQueueEntry(StreamRecord<?> inputRecord, Segment<OUT> segment) {
            super(inputRecord);
            this.segment = segment;
        }

        @Override
        public void complete(Collection<OUT> result) {
            super.complete(result);
            segment.completed(this);
        }
    }

    /**
     * A segment is a collection of queue entries that can be completed in arbitrary order.
     *
     * <p>All elements from one segment must be emitted before any element of the next segment is
     * emitted.
     */
    static class Segment<OUT> {
        /** Unfinished input elements. */
        private final Set<StreamElementQueueEntry<OUT>> incompleteElements;

        /** Undrained finished elements. */
        private final Queue<StreamElementQueueEntry<OUT>> completedElements;

        Segment(int initialCapacity) {
            incompleteElements = new HashSet<>(initialCapacity);
            completedElements = new ArrayDeque<>(initialCapacity);
        }

        /** Signals that an entry finished computation. */
        void completed(StreamElementQueueEntry<OUT> elementQueueEntry) {
            // adding only to completed queue if not completed before
            // there may be a real result coming after a timeout result, which is updated in the
            // queue entry but
            // the entry is not re-added to the complete queue
            if (incompleteElements.remove(elementQueueEntry)) {
                completedElements.add(elementQueueEntry);
            }
        }

        /**
         * True if there are no incomplete elements and all complete elements have been consumed.
         */
        boolean isEmpty() {
            return incompleteElements.isEmpty() && completedElements.isEmpty();
        }

        /**
         * True if there is at least one completed elements, such that {@link
         * #emitCompleted(TimestampedCollector)} will actually output an element.
         */
        boolean hasCompleted() {
            return !completedElements.isEmpty();
        }

        /**
         * Adds the segmentd input elements for checkpointing including completed but not yet
         * emitted elements.
         */
        void addPendingElements(List<StreamElement> results) {
            for (StreamElementQueueEntry<OUT> element : completedElements) {
                results.add(element.getInputElement());
            }
            for (StreamElementQueueEntry<OUT> element : incompleteElements) {
                results.add(element.getInputElement());
            }
        }

        /**
         * Pops one completed elements into the given output. Because an input element may produce
         * an arbitrary number of output elements, there is no correlation between the size of the
         * collection and the popped elements.
         *
         * @return the number of popped input elements.
         */
        int emitCompleted(TimestampedCollector<OUT> output) {
            final StreamElementQueueEntry<OUT> completedEntry = completedElements.poll();
            if (completedEntry == null) {
                return 0;
            }
            completedEntry.emitResult(output);
            return 1;
        }

        /**
         * Adds the given entry to this segment. If the element is completed (watermark), it is
         * directly moved into the completed queue.
         */
        void add(StreamElementQueueEntry<OUT> queueEntry) {
            if (queueEntry.isDone()) {
                completedElements.add(queueEntry);
            } else {
                incompleteElements.add(queueEntry);
            }
        }
    }
}

The queue-related pieces boil down to:

private final Deque<Segment<OUT>> segments;
// most likely scenario are 4 segments <elements, watermark, elements, watermark>
this.segments = new ArrayDeque<>(4);


/** Unfinished input elements. */
private final Set<StreamElementQueueEntry<OUT>> incompleteElements;

/** Undrained finished elements. */
private final Queue<StreamElementQueueEntry<OUT>> completedElements;

Segment(int initialCapacity) {
    incompleteElements = new HashSet<>(initialCapacity);
    completedElements = new ArrayDeque<>(initialCapacity);
}

Overall it is still an ArrayDeque, but its entries are Segments, and each Segment internally keeps two collections: incompleteElements for pending requests and completedElements for finished ones. Order within a Segment does not matter, which is why incompleteElements is a Set; when a request finishes, its entry moves to completedElements. On arrival, an ordinary event goes through addRecord into the current Segment's incompleteElements, whereas a watermark goes through addWatermark, which creates a new Segment. Under processing time there are never any watermarks, so the whole queue is one big Segment, i.e. globally unordered.
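To make the segment mechanics concrete, here is a hypothetical trace of a short input sequence through the addRecord/addWatermark logic above:

// Arrival order: e1, e2, W1 (watermark), e3, e4
//
// addRecord(e1), addRecord(e2)  -> segments: [ {e1, e2} ]
// addWatermark(W1)              -> segments: [ {e1, e2}, {W1}, {} ]
//                                  (a capacity-1 segment for the watermark,
//                                   plus a fresh segment for later records)
// addRecord(e3), addRecord(e4)  -> segments: [ {e1, e2}, {W1}, {e3, e4} ]
//
// e1 and e2 may be emitted in whichever order they complete, but both must be
// emitted before W1, and W1 before e3 and e4.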

And one more figure from Jark's blog:

[Figure: the unordered queue (from Jark's blog)]

Summary

Flink Async I/O is a feature Alibaba contributed to the community out of its own production practice, so there is no shortage of real demand for it. Joining a stream against dimension tables is indeed very common in real business logic, and async I/O is the sharp tool for exactly those scenarios. When using it, pay particular attention to the points in the "Usage Notes" section above.

References: