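Start with a scenario that comes up often in practice: a stream processing job has to look up an external dimension table for every event (say, HBase; caching is left out of the picture here) and join the result onto the event. How would you do that in Flink? The typical approach is to add a map/flatmap operator and run the lookup inside it. Functionally this works, but the lookup easily becomes the bottleneck of the whole job, especially when the external query is slow. Fortunately, Flink ships an async I/O operator that handles this case well.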
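Async I/O is an important feature that Alibaba contributed to Flink. It was officially released in Flink 1.2, and the corresponding proposal is FLIP-12: Asynchronous I/O Design and Implementation. It solves the same problem as every other async I/O or I/O multiplexing technique: I/O tends to be slow, and issuing it asynchronously raises throughput. Flink's Async I/O is simply async I/O applied to stream processing; there is nothing special about the principle. Here is the figure from the official documentation: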
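The left side shows synchronous I/O, where most of the time is spent waiting; the right side shows asynchronous I/O, and the improvement is obvious. Raising the job's parallelism also relieves the synchronous I/O problem to some degree, much like the thread-per-connection model of early network programming, but it only goes so far and extra parallelism is expensive. The reasoning is familiar, so let's move on to how to use it.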
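Recall async I/O in network programming (meaning I/O multiplexing here): it only works if the kernel supports select or poll/epoll. Flink's async I/O is similar: the client used to access the external system has to support asynchronous requests. If it does not, you can simulate asynchronous requests with a thread pool; this is less efficient, but usually still better than synchronous I/O. At the code level there are three steps:
Step 1: Implement the AsyncFunction interface, whose job is to dispatch requests. Flink provides the abstract class RichAsyncFunction, and extending it is usually all you need.
Step 2: In AsyncFunction#asyncInvoke(...), register a callback that obtains the result of the asynchronous call and passes it to the ResultFuture.
Step 3: Apply the asynchronous operation to a stream.
Below is the example from the official documentation: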
// This example implements the asynchronous request and callback with Futures that have the
// interface of Java 8's futures (which is the same one followed by Flink's Future)
/**
* An implementation of the 'AsyncFunction' that sends requests and sets the callback.
* Step 1: implement the `AsyncFunction` interface
*/
class AsyncDatabaseRequest extends RichAsyncFunction<String, Tuple2<String, String>> {
/** The database specific client that can issue concurrent requests with callbacks */
private transient DatabaseClient client;
@Override
public void open(Configuration parameters) throws Exception {
client = new DatabaseClient(host, port, credentials);
}
@Override
public void close() throws Exception {
client.close();
}
@Override
public void asyncInvoke(String key, final ResultFuture<Tuple2<String, String>> resultFuture) throws Exception {
// issue the asynchronous request, receive a future for result
final Future<String> result = client.query(key);
// set the callback to be executed once the request by the client is complete
// the callback simply forwards the result to the result future
// Step 2: implement a callback
CompletableFuture.supplyAsync(new Supplier<String>() {
@Override
public String get() {
try {
// obtain the result of the async call
return result.get();
} catch (InterruptedException | ExecutionException e) {
// Normally handled explicitly.
return null;
}
}
}).thenAccept( (String dbResult) -> {
// and pass it to the `ResultFuture`
resultFuture.complete(Collections.singleton(new Tuple2<>(key, dbResult)));
});
}
}
// create the original stream
DataStream<String> stream = ...;
// apply the async I/O transformation
// Step 3: apply the async operation to a stream
DataStream<Tuple2<String, String>> resultStream =
AsyncDataStream.unorderedWait(stream, new AsyncDatabaseRequest(), 1000, TimeUnit.MILLISECONDS, 100);
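The code executes as follows. First, for every event in the stream, Flink calls asyncInvoke. Inside this method we issue the asynchronous request to the external system, such as client.query(key) in the code above, and get back an asynchronous handle (a Future, for example). Then we attach a callback to that handle, such as CompletableFuture.supplyAsync(...).thenAccept(...). The callback obtains the result of the asynchronous task and hands it to the ResultFuture. This step is critical and comes up again later.
To keep async requests from piling up without bound, or from hanging forever, async facilities generally offer a timeout and a cap on concurrent requests, and Flink's async I/O is no exception. In the last line of the code above, the arguments to unorderedWait include 1000, the timeout of an async request (in milliseconds, per the TimeUnit argument), and 100, the maximum number of in-flight requests, called the capacity. What happens on timeout? Look at the AsyncFunction interface: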
public interface AsyncFunction<IN, OUT> extends Function, Serializable {
/**
* Trigger async operation for each stream input.
*
* @param input element coming from an upstream task
* @param resultFuture to be completed with the result data
* @exception Exception in case of a user code error. An exception will make the task fail and
* trigger fail-over process.
*/
void asyncInvoke(IN input, ResultFuture<OUT> resultFuture) throws Exception;
/**
* {@link AsyncFunction#asyncInvoke} timeout occurred. By default, the result future is
* exceptionally completed with a timeout exception.
*
* @param input element coming from an upstream task
* @param resultFuture to be completed with the result data
*/
default void timeout(IN input, ResultFuture<OUT> resultFuture) throws Exception {
resultFuture.completeExceptionally(
new TimeoutException("Async function call has timed out."));
}
}
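Besides asyncInvoke, the interface defines a timeout method with a default implementation. When an async request times out, Flink calls this method; the default completes the ResultFuture exceptionally, which fails the whole job. In most cases you will want to override it. And what happens when the number of in-flight requests reaches the capacity? The operator blocks, which produces back pressure.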
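The effect of overriding timeout with a fallback value can be pictured without Flink. The sketch below is plain Java (9 or later), not Flink API: `lookupWithFallback` and the sample strings are hypothetical, and `CompletableFuture#completeOnTimeout` merely stands in for a timeout override that completes the ResultFuture with a default instead of failing the job.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

// Plain-Java analogy (not Flink code): bound an async lookup with a timeout and
// substitute a default value instead of failing. Requires Java 9+.
class TimeoutFallbackSketch {

    // Hypothetical helper: returns the lookup result, or `fallback` if the
    // lookup has not completed within `timeoutMillis`.
    static String lookupWithFallback(CompletableFuture<String> lookup,
                                     long timeoutMillis,
                                     String fallback) {
        return lookup
                .completeOnTimeout(fallback, timeoutMillis, TimeUnit.MILLISECONDS)
                .join();
    }

    public static void main(String[] args) {
        // A lookup that already has its answer.
        CompletableFuture<String> fast = CompletableFuture.completedFuture("row-for-e0");
        // A lookup that never answers, like a hung request.
        CompletableFuture<String> hung = new CompletableFuture<>();

        System.out.println(lookupWithFallback(fast, 50, "e0: time out")); // row-for-e0
        System.out.println(lookupWithFallback(hung, 50, "e1: time out")); // e1: time out
    }
}
```

Whether a fallback value or a job failure is the right reaction depends on the use case; a fallback keeps the pipeline running but lets incomplete records flow downstream.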
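Another point to watch is event ordering. Because requests are asynchronous, their completion times are unpredictable. Flink's async I/O can either preserve the original event order or not. Internally it relies on a queue: in ordered mode, results that arrive early for events further back in the queue are buffered until every earlier event has been emitted, which inevitably adds some latency and resource usage. Unordered mode gives lower latency and higher throughput. Note, however, that with event time the output is not globally unordered: events may only be reordered between two watermarks, so the watermark contract still holds overall. Use AsyncDataStream.unorderedWait(...) for unordered output and AsyncDataStream.orderedWait(...) for ordered output.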
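To make the latency cost of ordered mode concrete, here is a small deterministic model in plain Java. It is not Flink code: it assumes every request is issued at time 0, uses made-up element names and latencies, and only computes when each result could leave the operator in each mode.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Deterministic model (not Flink code) of when results can leave the operator.
// Assumption: every request is issued at time 0 and finishes after its latency.
class EmitOrderSketch {

    // Unordered mode: a result is emitted as soon as its request completes,
    // so the output is sorted by latency (ties keep their input order).
    static List<String> unordered(String[] names, int[] latencies) {
        List<Integer> idx = new ArrayList<>();
        for (int i = 0; i < names.length; i++) {
            idx.add(i);
        }
        idx.sort(Comparator.comparingInt(i -> latencies[i]));
        List<String> out = new ArrayList<>();
        for (int i : idx) {
            out.add(names[i] + "@" + latencies[i]);
        }
        return out;
    }

    // Ordered mode: a result waits until every earlier element has been emitted,
    // so its emit time is the maximum latency seen so far.
    static List<String> ordered(String[] names, int[] latencies) {
        List<String> out = new ArrayList<>();
        int emitTime = 0;
        for (int i = 0; i < names.length; i++) {
            emitTime = Math.max(emitTime, latencies[i]);
            out.add(names[i] + "@" + emitTime);
        }
        return out;
    }

    public static void main(String[] args) {
        String[] names = {"e0", "e1", "e2", "e3"};
        int[] latencies = {30, 10, 20, 5};
        System.out.println(unordered(names, latencies)); // [e3@5, e1@10, e2@20, e0@30]
        System.out.println(ordered(names, latencies));   // [e0@30, e1@30, e2@30, e3@30]
    }
}
```

In this model a single slow request at the head delays everything behind it in ordered mode, while unordered mode lets the fast results through immediately.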
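This part gives two examples of how to use Flink async I/O.
The first example covers three scenarios:
Async I/O simulated with a thread pool, unordered output
Async I/O simulated with a thread pool, ordered output
Synchronous I/O performed inside a flatmap-style operator (a ProcessFunction in the code below)
To bring out the advantage of async I/O, the code does not really access a database; it uses a sleep to simulate a slow I/O operation.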
public void update(Tuple2<String, Integer> tuple2) throws SQLException {
// To simulate slow I/O, a sleep replaces the real database operation
//ps.setLong(1, balance);
//ps.setLong(2, 1);
//ps.execute();
try {
Thread.sleep(tuple2.f1);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
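Note: although no real query is issued, the code is written as if for a real deployment and only the statement execution is replaced by a sleep, so at runtime it still opens a database connection. If you want to run the code yourself, change the database address in the code. Also, the three scenarios share the same data source, and the sleep time is defined in the data source, so their I/O cost is identical: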
static List<Tuple2<String, Integer>> dataset() {
List<Tuple2<String, Integer>> dataset = new ArrayList<>(10);
for (int i = 0; i < 10; i++) {
// f0: element name  f1: sleep time in ms, simulating the I/O latency of this element
dataset.add(new Tuple2<>("e" + i, i % 3 + 1));
}
return dataset;
}
Here is the complete code:
public class ThreadPoolAsyncDemoFakeQuery {
public static void main(String[] args) throws Exception {
// --------------------------------- Async IO + Unordered -------------------------
StreamExecutionEnvironment envAsyncUnordered = StreamExecutionEnvironment.getExecutionEnvironment();
envAsyncUnordered.setParallelism(1);
envAsyncUnordered.setBufferTimeout(0);
DataStream<Tuple2<String, Integer>> source = envAsyncUnordered.fromCollection(dataset());
DataStream<String> result = AsyncDataStream.unorderedWait(source, new ThreadPoolAsyncMysqlRequest(),
10, TimeUnit.SECONDS, 10);
result.print();
System.out.println("Async + UnorderedWait:");
envAsyncUnordered.execute("unorderedWait");
// --------------------------------- Async IO + Ordered -------------------------
StreamExecutionEnvironment envAsyncOrdered = StreamExecutionEnvironment.getExecutionEnvironment();
envAsyncOrdered.setParallelism(1);
envAsyncOrdered.setBufferTimeout(0);
AsyncDataStream.orderedWait(envAsyncOrdered.fromCollection(dataset()),
new ThreadPoolAsyncMysqlRequest(), 10, TimeUnit.SECONDS, 10)
.print();
System.out.println("Async + OrderedWait");
envAsyncOrdered.execute("orderedWait");
// --------------------------------- Sync IO -------------------------
StreamExecutionEnvironment envSync = StreamExecutionEnvironment.getExecutionEnvironment();
envSync.setParallelism(1);
envSync.setBufferTimeout(0);
envSync.fromCollection(dataset())
.process(new ProcessFunction<Tuple2<String, Integer>, String>() {
private transient MysqlClient client;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
client = new MysqlClient();
}
@Override
public void processElement(Tuple2<String, Integer> value, Context ctx, Collector<String> out) throws Exception {
try {
client.update(value);
out.collect(value + ": " + System.currentTimeMillis());
} catch (Exception e) {
e.printStackTrace();
}
}
}).print();
System.out.println("Sync IO:");
envSync.execute("Sync IO");
}
static List<Tuple2<String, Integer>> dataset() {
List<Tuple2<String, Integer>> dataset = new ArrayList<>(10);
for (int i = 0; i < 10; i++) {
// f0: element name  f1: sleep time in ms
dataset.add(new Tuple2<>("e" + i, i % 3 + 1));
}
return dataset;
}
static class ThreadPoolAsyncMysqlRequest extends RichAsyncFunction<Tuple2<String, Integer>, String> {
private transient ExecutorService executor;
private transient MysqlClient client;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
executor = Executors.newFixedThreadPool(30);
client = new MysqlClient();
}
@Override
public void asyncInvoke(Tuple2<String, Integer> input, ResultFuture<String> resultFuture) {
executor.submit(() -> {
long current = System.currentTimeMillis();
String output = input + ":" + current;
try {
client.update(input);
resultFuture.complete(Collections.singleton(output));
} catch (SQLException e) {
e.printStackTrace();
resultFuture.complete(Collections.singleton(input + ": " + e.getMessage()));
}
});
}
@Override
public void timeout(Tuple2<String, Integer> input, ResultFuture<String> resultFuture) throws Exception {
System.out.printf("%s timeout\n", input);
resultFuture.complete(Collections.singleton(input + ": time out"));
}
@Override
public void close() throws Exception {
// stop the worker threads so they do not outlive the task
executor.shutdown();
client.close();
super.close();
}
}
static class MysqlClient {
static final String JDBC_DRIVER = "com.mysql.jdbc.Driver";
static final String DB_URL = "jdbc:mysql://10.9.1.18:3306/test?useSSL=false";
private Connection conn;
private PreparedStatement ps;
public MysqlClient() throws Exception {
Class.forName(JDBC_DRIVER);
conn = DriverManager.getConnection(DB_URL, "root", "root123.");
ps = conn.prepareStatement("UPDATE account SET balance = ? WHERE id = ?;");
}
public void update(Tuple2<String, Integer> tuple2) throws SQLException {
// To simulate slow I/O, a sleep replaces the real database operation
//ps.setLong(1, balance);
//ps.setLong(2, 1);
//ps.execute();
try {
Thread.sleep(tuple2.f1);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public void close() {
try {
if (conn != null) {
conn.close();
}
if (ps != null) {
ps.close();
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
The output is:
Async + UnorderedWait:
(e0,1):1617109474293
(e3,1):1617109474294
(e1,2):1617109474293
(e4,2):1617109474294
(e2,3):1617109474293
(e6,1):1617109474295
(e9,1):1617109474297
(e7,2):1617109474296
(e5,3):1617109474295
(e8,3):1617109474297
Async + OrderedWait
(e0,1):1617109474891
(e1,2):1617109474891
(e2,3):1617109474891
(e3,1):1617109474891
(e4,2):1617109474892
(e5,3):1617109474892
(e6,1):1617109474893
(e7,2):1617109474893
(e8,3):1617109474893
(e9,1):1617109474893
Sync IO:
(e0,1): 1617109475257
(e1,2): 1617109475260
(e2,3): 1617109475264
(e3,1): 1617109475265
(e4,2): 1617109475268
(e5,3): 1617109475272
(e6,1): 1617109475274
(e7,2): 1617109475277
(e8,3): 1617109475281
(e9,1): 1617109475283
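A few observations on the output. Across multiple runs the numbers are not very stable, but the async variants always finish far faster than the sync one. In the run above, the ten sync timestamps span about 26 ms, while each async run spans under 5 ms (the async timestamps are taken when a task starts and the sync ones after the sleep, so the figures are only roughly comparable). Because the data set is small and the latencies are fairly uniform, the advantage of unordered mode is not obvious, and it is sometimes even slower than ordered mode. This is not a rigorous benchmark, but it does show the clear advantage of async over sync.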
The second example uses a real asynchronous client to run real queries: jasync-sql, for asynchronous access to MySQL. The complete code:
public class AsyncDemo {
public static void main(String[] args) throws Exception {
final long repeat = 100L;
// --------------------------------- Async IO + Unordered -------------------------
StreamExecutionEnvironment envAsyncUnordered = StreamExecutionEnvironment.getExecutionEnvironment();
envAsyncUnordered.setParallelism(1);
envAsyncUnordered.setBufferTimeout(0);
DataStream<Long> source = envAsyncUnordered.fromSequence(1, repeat);
DataStream<String> result = AsyncDataStream.unorderedWait(source, new AsyncMysqlRequest(),
10, TimeUnit.SECONDS, 10);
result.print();
System.out.println("Async + UnorderedWait:");
envAsyncUnordered.execute("unorderedWait");
}
static class AsyncMysqlRequest extends RichAsyncFunction<Long, String> {
Connection conn;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
conn = MySQLConnectionBuilder.createConnectionPool("jdbc:mysql://{your-ip}:3306/test?user=root&password={your-password}");
conn.connect().get();
}
@Override
public void asyncInvoke(Long input, ResultFuture<String> resultFuture) throws Exception {
List<Long> balance = new ArrayList<>(1);
balance.add(input);
CompletableFuture<QueryResult> future =
conn.sendPreparedStatement("SELECT * FROM account WHERE balance = ?;", balance);
// on success
future.thenAccept(queryResult -> {
resultFuture.complete(Collections.singleton(balance + ":" + System.currentTimeMillis()));
});
// on failure
future.exceptionally(e -> {
e.printStackTrace();
resultFuture.complete(Collections.singleton(balance + e.getMessage() + ":" + System.currentTimeMillis()));
return null;
});
}
@Override
public void timeout(Long input, ResultFuture<String> resultFuture) throws Exception {
System.out.printf("%d timeout\n", input);
resultFuture.complete(Collections.singleton(input + ": time out"));
}
}
}
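There are two things to watch out for when using async I/O.
First, do not perform slow operations inside AsyncFunction#asyncInvoke(...), such as synchronously waiting for the result of the async request (that belongs in the callback). Each partition of a stream has a single AsyncFunction instance, and that instance calls asyncInvoke for its events one after another. Slow work inside it badly undermines the asynchrony, and synchronously waiting for the result degrades it to plain synchronous I/O.
Second, once the async request finishes, always call ResultFuture#complete or ResultFuture#completeExceptionally to hand the outcome to the ResultFuture. Otherwise the request stays in the queue (at least until its timeout fires), and once the queue is full the whole pipeline stalls.
What follows is a brief source-level look at how Async I/O is implemented, based on Flink 1.12.1, the latest version at the time of writing. It borrows several figures from Jark's blog (see the references at the end).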
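As the figure shows, Flink Async I/O is essentially an asynchronous producer-consumer model built on a queue. The queue is the StreamElementQueue; the producer is the main stream, which puts data into the queue through AsyncDataStream.xxxWait; the consumer is the Emitter in the figure. The call flow: when an event arrives, it is first put into the queue, and the async logic defined in AsyncFunction#asyncInvoke(...) is invoked. When the async logic finishes, the event's queue entry is marked as completed, and the Emitter sends its result downstream. The central class is AsyncWaitOperator. I trimmed the code down to the parts that show these steps; see the comments in the code.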
@Internal
public class AsyncWaitOperator<IN, OUT>
extends AbstractUdfStreamOperator<OUT, AsyncFunction<IN, OUT>>
implements OneInputStreamOperator<IN, OUT>, BoundedOneInput {
/** Capacity of the stream element queue. */
private final int capacity;
/** Output mode for this operator. */
private final AsyncDataStream.OutputMode outputMode;
/** Timeout for the async collectors. */
private final long timeout;
/** Queue, into which to store the currently in-flight stream elements. */
// the queue
private transient StreamElementQueue<OUT> queue;
@Override
public void setup(
StreamTask<?, ?> containingTask,
StreamConfig config,
Output<StreamRecord<OUT>> output) {
super.setup(containingTask, config, output);
switch (outputMode) {
case ORDERED:
queue = new OrderedStreamElementQueue<>(capacity);
break;
case UNORDERED:
queue = new UnorderedStreamElementQueue<>(capacity);
break;
default:
throw new IllegalStateException("Unknown async mode: " + outputMode + '.');
}
this.timestampedCollector = new TimestampedCollector<>(output);
}
@Override
public void processElement(StreamRecord<IN> element) throws Exception {
// add element first to the queue
// put the event into the queue
final ResultFuture<OUT> entry = addToWorkQueue(element);
final ResultHandler resultHandler = new ResultHandler(element, entry);
// register a timeout for the entry if timeout is configured
// register the timeout handling for the async callback
if (timeout > 0L) {
final long timeoutTimestamp =
timeout + getProcessingTimeService().getCurrentProcessingTime();
final ScheduledFuture<?> timeoutTimer =
getProcessingTimeService()
.registerTimer(
timeoutTimestamp,
timestamp ->
userFunction.timeout(
element.getValue(), resultHandler));
resultHandler.setTimeoutTimer(timeoutTimer);
}
// invoke the async logic
userFunction.asyncInvoke(element.getValue(), resultHandler);
}
/**
* Add the given stream element to the operator's stream element queue. This operation blocks
* until the element has been added.
*
* <p>Between two insertion attempts, this method yields the execution to the mailbox, such that
* events as well as asynchronous results can be processed.
*
* @param streamElement to add to the operator's queue
* @throws InterruptedException if the current thread has been interrupted while yielding to
* mailbox
* @return a handle that allows to set the result of the async computation for the given
* element.
*/
private ResultFuture<OUT> addToWorkQueue(StreamElement streamElement)
throws InterruptedException {
Optional<ResultFuture<OUT>> queueEntry;
while (!(queueEntry = queue.tryPut(streamElement)).isPresent()) {
mailboxExecutor.yield();
}
return queueEntry.get();
}
/**
* Outputs one completed element. Watermarks are always completed if it's their turn to be
* processed.
*
* <p>This method will be called from {@link #processWatermark(Watermark)} and from a mail
* processing the result of an async function call.
*/
private void outputCompletedElement() {
if (queue.hasCompletedElements()) {
// emit only one element to not block the mailbox thread unnecessarily
queue.emitCompletedElement(timestampedCollector);
}
}
}
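As you can see, processElement first puts the event into the queue, then registers the timeout timer, and finally invokes the async logic. addToWorkQueue uses a while loop to insert into the queue: while the queue is full, the insert keeps retrying (yielding to the mailbox between attempts) and processElement does not return. This is the behavior mentioned earlier: once the number of in-flight requests reaches the capacity, the stream blocks and back pressure builds up. outputCompletedElement then sends completed entries downstream.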
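As mentioned, Flink Async I/O offers ordered and unordered guarantees, and these are implemented by different queues. The code above uses OrderedStreamElementQueue for ordered mode and UnorderedStreamElementQueue for unordered mode; the queue size is the capacity, the maximum number of in-flight requests. Both are implementations of the StreamElementQueue interface from the figure. Here is the interface: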
/** Interface for stream element queues for the {@link AsyncWaitOperator}. */
@Internal
public interface StreamElementQueue<OUT> {
/**
* Tries to put the given element in the queue. This operation succeeds if the queue has
* capacity left and fails if the queue is full.
*
* <p>This method returns a handle to the inserted element that allows to set the result of the
* computation.
*
* @param streamElement the element to be inserted.
* @return A handle to the element if successful or {@link Optional#empty()} otherwise.
*/
Optional<ResultFuture<OUT>> tryPut(StreamElement streamElement);
/**
* Emits one completed element from the head of this queue into the given output.
*
* <p>Will not emit any element if no element has been completed (check {@link
* #hasCompletedElements()} before entering any critical section).
*
* @param output the output into which to emit
*/
void emitCompletedElement(TimestampedCollector<OUT> output);
/**
* Checks if there is at least one completed head element.
*
* @return True if there is a completed head element.
*/
boolean hasCompletedElements();
/**
* Returns the collection of {@link StreamElement} currently contained in this queue for
* checkpointing.
*
* <p>This includes all non-emitted, completed and non-completed elements.
*
* @return List of currently contained {@link StreamElement}.
*/
List<StreamElement> values();
/**
* True if the queue is empty; otherwise false.
*
* @return True if the queue is empty; otherwise false.
*/
boolean isEmpty();
/**
* Return the size of the queue.
*
* @return The number of elements contained in this queue.
*/
int size();
}
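The interface defines six methods:
tryPut: inserts an element into the queue
emitCompletedElement: emits a completed async request downstream (the Emitter in the figure)
hasCompletedElements: checks whether any async request has completed
values: the elements currently in the queue
isEmpty: whether the queue is empty
size: the number of elements in the queue
Now let's look at how the ordered and unordered queues implement this interface.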
The ordered implementation is simple: first in, first out. OrderedStreamElementQueue is short, so here it is in full:
@Internal
public final class OrderedStreamElementQueue<OUT> implements StreamElementQueue<OUT> {
private static final Logger LOG = LoggerFactory.getLogger(OrderedStreamElementQueue.class);
/** Capacity of this queue. */
private final int capacity;
/** Queue for the inserted StreamElementQueueEntries. */
private final Queue<StreamElementQueueEntry<OUT>> queue;
public OrderedStreamElementQueue(int capacity) {
Preconditions.checkArgument(capacity > 0, "The capacity must be larger than 0.");
this.capacity = capacity;
this.queue = new ArrayDeque<>(capacity);
}
@Override
public boolean hasCompletedElements() {
return !queue.isEmpty() && queue.peek().isDone();
}
@Override
public void emitCompletedElement(TimestampedCollector<OUT> output) {
if (hasCompletedElements()) {
final StreamElementQueueEntry<OUT> head = queue.poll();
head.emitResult(output);
}
}
@Override
public List<StreamElement> values() {
List<StreamElement> list = new ArrayList<>(this.queue.size());
for (StreamElementQueueEntry e : queue) {
list.add(e.getInputElement());
}
return list;
}
@Override
public boolean isEmpty() {
return queue.isEmpty();
}
@Override
public int size() {
return queue.size();
}
@Override
public Optional<ResultFuture<OUT>> tryPut(StreamElement streamElement) {
if (queue.size() < capacity) {
StreamElementQueueEntry<OUT> queueEntry = createEntry(streamElement);
queue.add(queueEntry);
LOG.debug(
"Put element into ordered stream element queue. New filling degree "
+ "({}/{}).",
queue.size(),
capacity);
return Optional.of(queueEntry);
} else {
LOG.debug(
"Failed to put element into ordered stream element queue because it "
+ "was full ({}/{}).",
queue.size(),
capacity);
return Optional.empty();
}
}
private StreamElementQueueEntry<OUT> createEntry(StreamElement streamElement) {
if (streamElement.isRecord()) {
return new StreamRecordQueueEntry<>((StreamRecord<?>) streamElement);
}
if (streamElement.isWatermark()) {
return new WatermarkQueueEntry<>((Watermark) streamElement);
}
throw new UnsupportedOperationException("Cannot enqueue " + streamElement);
}
}
It uses an ArrayDeque as the queue. Events enter in order and leave through queue.poll(), so order is preserved naturally. Before dequeuing, it checks whether the head request has completed, via hasCompletedElements:
@Override
public boolean hasCompletedElements() {
return !queue.isEmpty() && queue.peek().isDone();
}
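The code calls isDone on the head element of the queue (peek). Queue elements are of the interface type StreamElementQueueEntry, which extends ResultFuture; the implementation used for records is StreamRecordQueueEntry. Here is the relevant code: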
class StreamRecordQueueEntry<OUT> implements StreamElementQueueEntry<OUT> {
@Nonnull private final StreamRecord<?> inputRecord;
private Collection<OUT> completedElements;
StreamRecordQueueEntry(StreamRecord<?> inputRecord) {
this.inputRecord = Preconditions.checkNotNull(inputRecord);
}
@Override
public boolean isDone() {
return completedElements != null;
}
@Nonnull
@Override
public StreamRecord<?> getInputElement() {
return inputRecord;
}
@Override
public void emitResult(TimestampedCollector<OUT> output) {
output.setTimestamp(inputRecord);
for (OUT r : completedElements) {
output.collect(r);
}
}
@Override
public void complete(Collection<OUT> result) {
this.completedElements = Preconditions.checkNotNull(result);
}
}
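isDone checks whether completedElements is null, and completedElements is assigned in complete. That is why the callback defined in AsyncFunction#asyncInvoke(...) must call ResultFuture#complete. If it does not, the entry is never marked as completed, and once the queue fills up the data flow of the whole job stalls. Calling ResultFuture#completeExceptionally on failure is fine too; the AsyncWaitOperator.ResultHandler class deals with that path as well.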
Finally, a figure from Jark's blog:
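The unordered queue is more involved. With processing time it is globally unordered, which is simple. With event time, as noted earlier, it is not globally unordered: events may be reordered within the interval between two adjacent watermarks (the code calls this a Segment), but the intervals themselves must stay in order, so the watermark contract still holds overall. To implement this, the unordered queue introduces Segments, each representing the interval between adjacent watermarks: unordered within a Segment, ordered across Segments. Here is the code: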
@Internal
public final class UnorderedStreamElementQueue<OUT> implements StreamElementQueue<OUT> {
private static final Logger LOG = LoggerFactory.getLogger(UnorderedStreamElementQueue.class);
/** Capacity of this queue. */
private final int capacity;
/** Queue of queue entries segmented by watermarks. */
private final Deque<Segment<OUT>> segments;
private int numberOfEntries;
public UnorderedStreamElementQueue(int capacity) {
Preconditions.checkArgument(capacity > 0, "The capacity must be larger than 0.");
this.capacity = capacity;
// most likely scenario are 4 segments <elements, watermark, elements, watermark>
this.segments = new ArrayDeque<>(4);
this.numberOfEntries = 0;
}
@Override
public Optional<ResultFuture<OUT>> tryPut(StreamElement streamElement) {
if (size() < capacity) {
StreamElementQueueEntry<OUT> queueEntry;
if (streamElement.isRecord()) {
queueEntry = addRecord((StreamRecord<?>) streamElement);
} else if (streamElement.isWatermark()) {
queueEntry = addWatermark((Watermark) streamElement);
} else {
throw new UnsupportedOperationException("Cannot enqueue " + streamElement);
}
numberOfEntries++;
LOG.debug(
"Put element into unordered stream element queue. New filling degree "
+ "({}/{}).",
size(),
capacity);
return Optional.of(queueEntry);
} else {
LOG.debug(
"Failed to put element into unordered stream element queue because it "
+ "was full ({}/{}).",
size(),
capacity);
return Optional.empty();
}
}
private StreamElementQueueEntry<OUT> addRecord(StreamRecord<?> record) {
// ensure that there is at least one segment
Segment<OUT> lastSegment;
if (segments.isEmpty()) {
lastSegment = addSegment(capacity);
} else {
lastSegment = segments.getLast();
}
// entry is bound to segment to notify it easily upon completion
StreamElementQueueEntry<OUT> queueEntry =
new SegmentedStreamRecordQueueEntry<>(record, lastSegment);
lastSegment.add(queueEntry);
return queueEntry;
}
private Segment<OUT> addSegment(int capacity) {
Segment newSegment = new Segment(capacity);
segments.addLast(newSegment);
return newSegment;
}
private StreamElementQueueEntry<OUT> addWatermark(Watermark watermark) {
Segment<OUT> watermarkSegment;
if (!segments.isEmpty() && segments.getLast().isEmpty()) {
// reuse already existing segment if possible (completely drained) or the new segment
// added at the end of
// this method for two succeeding watermarks
watermarkSegment = segments.getLast();
} else {
watermarkSegment = addSegment(1);
}
StreamElementQueueEntry<OUT> watermarkEntry = new WatermarkQueueEntry<>(watermark);
watermarkSegment.add(watermarkEntry);
// add a new segment for actual elements
addSegment(capacity);
return watermarkEntry;
}
@Override
public boolean hasCompletedElements() {
return !this.segments.isEmpty() && this.segments.getFirst().hasCompleted();
}
@Override
public void emitCompletedElement(TimestampedCollector<OUT> output) {
if (segments.isEmpty()) {
return;
}
final Segment currentSegment = segments.getFirst();
numberOfEntries -= currentSegment.emitCompleted(output);
// remove any segment if there are further segments, if not leave it as an optimization even
// if empty
if (segments.size() > 1 && currentSegment.isEmpty()) {
segments.pop();
}
}
@Override
public List<StreamElement> values() {
List<StreamElement> list = new ArrayList<>();
for (Segment s : segments) {
s.addPendingElements(list);
}
return list;
}
@Override
public boolean isEmpty() {
return numberOfEntries == 0;
}
@Override
public int size() {
return numberOfEntries;
}
/** An entry that notifies the respective segment upon completion. */
static class SegmentedStreamRecordQueueEntry<OUT> extends StreamRecordQueueEntry<OUT> {
private final Segment<OUT> segment;
SegmentedStreamRecordQueueEntry(StreamRecord<?> inputRecord, Segment<OUT> segment) {
super(inputRecord);
this.segment = segment;
}
@Override
public void complete(Collection<OUT> result) {
super.complete(result);
segment.completed(this);
}
}
/**
* A segment is a collection of queue entries that can be completed in arbitrary order.
*
* <p>All elements from one segment must be emitted before any element of the next segment is
* emitted.
*/
static class Segment<OUT> {
/** Unfinished input elements. */
private final Set<StreamElementQueueEntry<OUT>> incompleteElements;
/** Undrained finished elements. */
private final Queue<StreamElementQueueEntry<OUT>> completedElements;
Segment(int initialCapacity) {
incompleteElements = new HashSet<>(initialCapacity);
completedElements = new ArrayDeque<>(initialCapacity);
}
/** Signals that an entry finished computation. */
void completed(StreamElementQueueEntry<OUT> elementQueueEntry) {
// adding only to completed queue if not completed before
// there may be a real result coming after a timeout result, which is updated in the
// queue entry but
// the entry is not re-added to the complete queue
if (incompleteElements.remove(elementQueueEntry)) {
completedElements.add(elementQueueEntry);
}
}
/**
* True if there are no incomplete elements and all complete elements have been consumed.
*/
boolean isEmpty() {
return incompleteElements.isEmpty() && completedElements.isEmpty();
}
/**
* True if there is at least one completed elements, such that {@link
* #emitCompleted(TimestampedCollector)} will actually output an element.
*/
boolean hasCompleted() {
return !completedElements.isEmpty();
}
/**
* Adds the segmentd input elements for checkpointing including completed but not yet
* emitted elements.
*/
void addPendingElements(List<StreamElement> results) {
for (StreamElementQueueEntry<OUT> element : completedElements) {
results.add(element.getInputElement());
}
for (StreamElementQueueEntry<OUT> element : incompleteElements) {
results.add(element.getInputElement());
}
}
/**
* Pops one completed elements into the given output. Because an input element may produce
* an arbitrary number of output elements, there is no correlation between the size of the
* collection and the popped elements.
*
* @return the number of popped input elements.
*/
int emitCompleted(TimestampedCollector<OUT> output) {
final StreamElementQueueEntry<OUT> completedEntry = completedElements.poll();
if (completedEntry == null) {
return 0;
}
completedEntry.emitResult(output);
return 1;
}
/**
* Adds the given entry to this segment. If the element is completed (watermark), it is
* directly moved into the completed queue.
*/
void add(StreamElementQueueEntry<OUT> queueEntry) {
if (queueEntry.isDone()) {
completedElements.add(queueEntry);
} else {
incompleteElements.add(queueEntry);
}
}
}
}
The queue-related parts are:
private final Deque<Segment<OUT>> segments;
// most likely scenario are 4 segments <elements, watermark, elements, watermark>
this.segments = new ArrayDeque<>(4);
/** Unfinished input elements. */
private final Set<StreamElementQueueEntry<OUT>> incompleteElements;
/** Undrained finished elements. */
private final Queue<StreamElementQueueEntry<OUT>> completedElements;
Segment(int initialCapacity) {
incompleteElements = new HashSet<>(initialCapacity);
completedElements = new ArrayDeque<>(initialCapacity);
}
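The outer structure is still an ArrayDeque, but its elements are Segments. Each Segment holds two collections: incompleteElements for unfinished requests and completedElements for finished ones. A Segment is unordered internally, which is why incompleteElements is a Set; when a request completes, its entry moves to completedElements. When a regular event arrives, addRecord puts it into the incompleteElements of the last Segment. When a watermark arrives, addWatermark places it in a Segment of its own and then calls addSegment to open a new Segment for the events that follow. With processing time there are never any watermarks, so the whole stream is one big Segment, which means globally unordered.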
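The segment rule can be tried out with a small model. This is a plain-Java sketch, not Flink's implementation: `MiniSegmentQueue` and its methods are hypothetical, elements are identified by name only, and `drain` emits everything currently eligible at once, whereas the real queue emits one element per call.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Simplified model of watermark-delimited segments (not Flink's implementation).
class MiniSegmentQueue {

    // Entries of one segment: they may complete, and be emitted, in any order.
    static final class Segment {
        final Set<String> incomplete = new LinkedHashSet<>();
        final Deque<String> completed = new ArrayDeque<>();

        boolean isEmpty() {
            return incomplete.isEmpty() && completed.isEmpty();
        }
    }

    private final Deque<Segment> segments = new ArrayDeque<>();

    MiniSegmentQueue() {
        segments.add(new Segment());
    }

    // A record joins the last (newest) segment as an unfinished entry.
    void addRecord(String name) {
        segments.getLast().incomplete.add(name);
    }

    // A watermark gets a segment of its own, then a fresh segment is opened.
    void addWatermark(String name) {
        Segment wm = segments.getLast().isEmpty() ? segments.getLast() : newSegment();
        wm.completed.add(name); // watermarks need no async work, so they are done at once
        newSegment();
    }

    private Segment newSegment() {
        Segment s = new Segment();
        segments.addLast(s);
        return s;
    }

    // Marks a record as finished inside whichever segment holds it.
    void complete(String name) {
        for (Segment s : segments) {
            if (s.incomplete.remove(name)) {
                s.completed.add(name);
                return;
            }
        }
    }

    // Emits everything that may leave now: only the first segment may emit, and
    // the next one becomes eligible once the first is fully drained.
    List<String> drain() {
        List<String> out = new ArrayList<>();
        while (true) {
            Segment head = segments.getFirst();
            while (!head.completed.isEmpty()) {
                out.add(head.completed.poll());
            }
            if (segments.size() > 1 && head.isEmpty()) {
                segments.removeFirst();
            } else {
                return out;
            }
        }
    }

    public static void main(String[] args) {
        MiniSegmentQueue q = new MiniSegmentQueue();
        q.addRecord("a");
        q.addRecord("b");
        q.addWatermark("W1");
        q.addRecord("c");

        q.complete("c");               // belongs to the segment after W1
        q.complete("b");
        System.out.println(q.drain()); // [b]
        q.complete("a");
        System.out.println(q.drain()); // [a, W1, c]
    }
}
```

Here `b` overtakes `a` because both sit in the same segment, while `c` has to wait behind the watermark even though it finished first.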
Finally, a figure from Jark's blog:
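Flink Async I/O is a feature Alibaba contributed to the community out of its own production experience, so there is plenty of real demand for it. Joining a stream against a dimension table is indeed very common in practice, and async I/O is the sharp tool for such cases. When using it, pay particular attention to the usage pitfalls described above.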
References:
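Single-stream operators are mostly simple, so only a brief overview is given here.
The reduce operator: used on a KeyedStream, it processes the elements of the stream in a "rolling" fashion. For example, this rolling sum: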
public class ReduceDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
// produce 1,2,3,4,5
DataStream<Long> source = env.fromSequence(1, 5);
source.keyBy(value -> 1) // all data goes to a single partition
.reduce(new ReduceFunction<Long>() {
@Override
public Long reduce(Long value1, Long value2) throws Exception {
// value1 holds the result of the previous iteration
System.out.printf("value1: %d, value2: %d\n", value1, value2);
return value1 + value2;
}
}).print();
env.execute();
}
}
The output is:
value1: 1, value2: 2
3
value1: 3, value2: 3
6
value1: 6, value2: 4
10
value1: 10, value2: 5
15
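Aggregation operators: used on a KeyedStream, they include sum, min, minBy, max, and maxBy. These operators are slated for deprecation in DataStream/DataSet (see FLIP-134). The focus here is the difference between min and minBy (max and maxBy are analogous). Straight to the code: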
public class AggregationDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment envMin = StreamExecutionEnvironment.getExecutionEnvironment();
StreamExecutionEnvironment envMinBy = StreamExecutionEnvironment.getExecutionEnvironment();
envMin.setParallelism(1);
envMinBy.setParallelism(1);
List<Tuple3<Integer, Integer, Integer>> data = new ArrayList<>();
data.add(new Tuple3<>(0, 2, 4));
data.add(new Tuple3<>(0, 4, 5));
data.add(new Tuple3<>(0, 3, 3));
data.add(new Tuple3<>(0, 1, 2));
data.add(new Tuple3<>(1, 2, 4));
data.add(new Tuple3<>(1, 5, 1));
data.add(new Tuple3<>(1, 1, 0));
data.add(new Tuple3<>(1, 2, 2));
System.out.println("Min:");
DataStream<Tuple3<Integer, Integer, Integer>> sourceMin = envMin.fromCollection(data);
sourceMin.keyBy(tuple3 -> tuple3.f0).min(1).print();
envMin.execute();
System.out.println("\nMinBy:");
DataStream<Tuple3<Integer, Integer, Integer>> sourceMinBy = envMinBy.fromCollection(data);
sourceMinBy.keyBy(tuple3 -> tuple3.f0).minBy(1).print();
envMinBy.execute();
}
}
The output:
Min:
(0,2,4)
(0,2,4)
(0,2,4)
(0,1,4)
(1,2,4)
(1,2,4)
(1,1,4)
(1,1,4)
MinBy:
(0,2,4)
(0,2,4)
(0,2,4)
(0,1,2)
(1,2,4)
(1,2,4)
(1,1,0)
(1,1,0)
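As you can see, min only tracks the minimum of the compared field, while the other fields keep the values of the first element; minBy keeps the whole element that has the minimum value in that field.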
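The difference can be restated as two small rolling functions. The sketch below is plain Java, not Flink code: tuples are modeled as `int[]`, the class and method names are made up, and the logic is written to reproduce the output shown above for key 0 rather than copied from Flink's aggregator.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java model of the rolling min / minBy semantics (not Flink code).
class MinVsMinBySketch {

    // Rolling min on `field`: the first element is kept as the base and only the
    // compared field is overwritten when a smaller value arrives.
    static List<int[]> rollingMin(List<int[]> input, int field) {
        List<int[]> out = new ArrayList<>();
        int[] current = null;
        for (int[] element : input) {
            if (current == null) {
                current = element.clone();
            } else if (element[field] < current[field]) {
                current = current.clone(); // copy so earlier outputs stay unchanged
                current[field] = element[field];
            }
            out.add(current);
        }
        return out;
    }

    // Rolling minBy on `field`: the whole element with the smallest value is kept.
    static List<int[]> rollingMinBy(List<int[]> input, int field) {
        List<int[]> out = new ArrayList<>();
        int[] current = null;
        for (int[] element : input) {
            if (current == null || element[field] < current[field]) {
                current = element.clone();
            }
            out.add(current);
        }
        return out;
    }

    // Formats the tuples on one line for printing.
    static String render(List<int[]> tuples) {
        StringBuilder sb = new StringBuilder();
        for (int[] t : tuples) {
            sb.append(Arrays.toString(t)).append(' ');
        }
        return sb.toString().trim();
    }

    public static void main(String[] args) {
        // The four key-0 tuples from the example above.
        List<int[]> data = Arrays.asList(
                new int[] {0, 2, 4}, new int[] {0, 4, 5},
                new int[] {0, 3, 3}, new int[] {0, 1, 2});
        System.out.println(render(rollingMin(data, 1)));   // [0, 2, 4] [0, 2, 4] [0, 2, 4] [0, 1, 4]
        System.out.println(render(rollingMinBy(data, 1))); // [0, 2, 4] [0, 2, 4] [0, 2, 4] [0, 1, 2]
    }
}
```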
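Next come the slightly more complex multi-stream operators.
Multi-stream operators differ mainly in the following three respects:
union merges multiple streams of the same type into a new stream. For example, unioning a stream with itself doubles its elements: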
public class UnionDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataStream<Long> source = env.fromSequence(0, 5);
source.print();
DataStream<Long> unionStream = source.union(source);
unionStream.print();
env.execute();
}
}
Output:
# source
0
1
2
3
4
5
# unionStream
0
0
1
1
2
2
3
3
4
4
5
5
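join operates on exactly two streams matched by key, and the types of the two streams may differ. Its semantics match an inner join in a database: within one data set, elements with the same key are inner-joined. The Flink DataStream API has two kinds of join:
Window Join: the data set is obtained from a window
Interval Join: the data set is obtained from a time interval
Window Join works together with windows and is further divided into three kinds by window type (tumbling, sliding, and session). The Window Join syntax is: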
stream.join(otherStream)
.where(<KeySelector>)
.equalTo(<KeySelector>)
.window(<WindowAssigner>)
.apply(<JoinFunction>)
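The window type only affects the data set that each window produces; the join itself works exactly the same way. So I'll use the simplest and most intuitive one, Tumbling Window Join, as the example (the figure below is from the official site):
In the figure, 4 windows are produced over time (the horizontal axis). The green elements at the top are one stream (greenStream), the orange ones below are the other stream (orangeStream), and the data at the bottom is what the join of the 2 streams produces in each window. I wrote some code that simulates this example: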
public class WindowJoinDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataStream<Tuple2<String, Integer>> greenStream = env.addSource(new GreenSource());
DataStream<Tuple2<String, Integer>> orangeStream = env.addSource(new OrangeSource());
orangeStream.join(greenStream)
.where(orangeStreamTuple2 -> orangeStreamTuple2.f0)
.equalTo(greenStreamTuple2 -> greenStreamTuple2.f0)
.window(TumblingEventTimeWindows.of(Time.milliseconds(1000)))
.apply((JoinFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String>)
(orangeStreamTuple2, greenStreamTuple2) -> orangeStreamTuple2.f1 + "," + greenStreamTuple2.f1)
.print();
env.execute();
}
static class GreenSource extends RichSourceFunction<Tuple2<String, Integer>> {
@Override
public void run(SourceContext<Tuple2<String, Integer>> ctx) throws Exception {
// window: [0, 1000)
ctx.collectWithTimestamp(new Tuple2<>("key", 0), 0);
ctx.collectWithTimestamp(new Tuple2<>("key", 1), 0);
ctx.emitWatermark(new Watermark(1000));
// window: [1000, 2000)
ctx.collectWithTimestamp(new Tuple2<>("key", 3), 1500);
ctx.emitWatermark(new Watermark(2000));
// window: [2000, 3000)
ctx.collectWithTimestamp(new Tuple2<>("key", 4), 2500);
ctx.emitWatermark(new Watermark(3000));
// window: [3000, 4000)
ctx.collectWithTimestamp(new Tuple2<>("another_key", 4), 3500);
}
@Override
public void cancel() {
}
}
static class OrangeSource extends RichSourceFunction<Tuple2<String, Integer>> {
@Override
public void run(SourceContext<Tuple2<String, Integer>> ctx) throws Exception {
// window: [0, 1000)
ctx.collectWithTimestamp(new Tuple2<>("key", 0), 0);
ctx.collectWithTimestamp(new Tuple2<>("key", 1), 0);
// window: [1000, 2000)
ctx.collectWithTimestamp(new Tuple2<>("key", 2), 1500);
ctx.collectWithTimestamp(new Tuple2<>("key", 3), 1500);
// window: [2000, 3000)
ctx.collectWithTimestamp(new Tuple2<>("key", 4), 2500);
ctx.collectWithTimestamp(new Tuple2<>("key", 5), 2500);
// window: [3000, 4000)
ctx.collectWithTimestamp(new Tuple2<>("key", 6), 3500);
ctx.collectWithTimestamp(new Tuple2<>("key", 7), 3500);
}
@Override
public void cancel() {
}
}
}
Running the code gives:
0,0
0,1
1,0
1,1
2,3
3,3
4,4
5,4
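The result above can be reproduced without Flink by bucketing both sides by (window start, key) and emitting the cross product of each bucket. The following is only a sketch of the semantics under that assumption; Event, windowStart and tumblingInnerJoin are hypothetical names, and watermarks and lateness are ignored entirely.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of a tumbling-window inner join; this is NOT Flink code.
class WindowJoinModel {

    static final class Event {
        final String key;
        final int value;
        final long timestamp;

        Event(String key, int value, long timestamp) {
            this.key = key;
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    // Start of the tumbling window that a non-negative timestamp falls into.
    static long windowStart(long timestamp, long size) {
        return timestamp - (timestamp % size);
    }

    static List<String> tumblingInnerJoin(List<Event> left, List<Event> right, long size) {
        // Index the right side by "windowStart|key".
        Map<String, List<Event>> rightBuckets = new HashMap<>();
        for (Event r : right) {
            String bucket = windowStart(r.timestamp, size) + "|" + r.key;
            rightBuckets.computeIfAbsent(bucket, b -> new ArrayList<>()).add(r);
        }
        // Every left element pairs with every right element in the same bucket.
        List<String> out = new ArrayList<>();
        for (Event l : left) {
            List<Event> matches = rightBuckets.get(windowStart(l.timestamp, size) + "|" + l.key);
            if (matches == null) {
                continue; // inner join: unmatched elements produce nothing
            }
            for (Event r : matches) {
                out.add(l.value + "," + r.value);
            }
        }
        return out;
    }

    // Same elements as OrangeSource (left) and GreenSource (right) above.
    static List<String> demo() {
        List<Event> orange = Arrays.asList(
                new Event("key", 0, 0), new Event("key", 1, 0),
                new Event("key", 2, 1500), new Event("key", 3, 1500),
                new Event("key", 4, 2500), new Event("key", 5, 2500),
                new Event("key", 6, 3500), new Event("key", 7, 3500));
        List<Event> green = Arrays.asList(
                new Event("key", 0, 0), new Event("key", 1, 0),
                new Event("key", 3, 1500), new Event("key", 4, 2500),
                new Event("another_key", 4, 3500));
        return tumblingInnerJoin(orange, green, 1000);
    }

    public static void main(String[] args) {
        demo().forEach(System.out::println);
    }
}
```

The last window yields nothing because the green element there has a different key, which is exactly the inner-join behavior.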
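Interval Join is also very simple (assume A join B): for each element a, define the time interval [a.timestamp + lowerBound, a.timestamp + upperBound]; an element b is joined with a when b.timestamp ∈ [a.timestamp + lowerBound; a.timestamp + upperBound]. Interval Join currently supports Event Time only. Its syntax is as follows: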
stream.keyBy(<KeySelector>)
.intervalJoin(otherStream.keyBy(<KeySelector>))
.between(<lowerBoundTime>, <upperBoundTime>)
.process (<ProcessJoinFunction>);
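The interval condition itself is just a bounds check on timestamps. As a rough illustration, assuming all elements share one key and using made-up timestamps (not the ones from the figure), it can be modeled in plain Java like this; IntervalJoinModel and intervalJoin are hypothetical names, not Flink API.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model of the interval-join condition; this is NOT Flink code.
// All elements are assumed to share the same key, and each element is just its timestamp.
class IntervalJoinModel {

    // Pairs (a, b) such that a + lowerBound <= b <= a + upperBound (both bounds inclusive).
    static List<String> intervalJoin(long[] a, long[] b, long lowerBound, long upperBound) {
        List<String> out = new ArrayList<>();
        for (long ta : a) {
            for (long tb : b) {
                if (tb >= ta + lowerBound && tb <= ta + upperBound) {
                    out.add(ta + "," + tb);
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // Hypothetical timestamps: A = {2, 5}, B = {0, 1, 6}, interval [a - 2, a + 1].
        System.out.println(intervalJoin(new long[]{2, 5}, new long[]{0, 1, 6}, -2, 1));
    }
}
```

With these inputs, a = 2 matches b = 0 and b = 1, and a = 5 matches b = 6.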
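Here I borrow another figure from the official site:
In the figure above, orangeStream interval-joins greenStream with the time interval orangeElem.ts - 2 <= greenElem.ts <= orangeElem.ts + 1; at the bottom are the results the join produces in each interval. I wrote some code that simulates the example above: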
public class IntervalJoinDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataStream<Tuple2<String, Integer>> greenStream = env.addSource(new GreenSource());
DataStream<Tuple2<String, Integer>> orangeStream = env.addSource(new OrangeSource());
orangeStream.keyBy(orangeStreamTuple2 -> orangeStreamTuple2.f0)
.intervalJoin(greenStream.keyBy(greenStreamTuple2 -> greenStreamTuple2.f0))
.between(Time.milliseconds(-2), Time.milliseconds(1))
.process(new ProcessJoinFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String>() {
@Override
public void processElement(Tuple2<String, Integer> orangeTuple2, Tuple2<String, Integer> greenTuple2, Context ctx, Collector<String> out) throws Exception {
out.collect(orangeTuple2.f1 + "," + greenTuple2.f1);
}
}).print();
env.execute();
}
static class GreenSource extends RichSourceFunction<Tuple2<String, Integer>> {
@Override
public void run(SourceContext<Tuple2<String, Integer>> ctx) throws Exception {
// window: [0, 1)
ctx.collectWithTimestamp(new Tuple2<>("key", 0), 0);
// window: [1, 2)
ctx.collectWithTimestamp(new Tuple2<>("key", 1), 1);
// window: [6, 7)
ctx.collectWithTimestamp(new Tuple2<>("key", 6), 6);
// window: [7, 8)
ctx.collectWithTimestamp(new Tuple2<>("key", 7), 7);
}
@Override
public void cancel() {
}
}
static class OrangeSource extends RichSourceFunction<Tuple2<String, Integer>> {
@Override
public void run(SourceContext<Tuple2<String, Integer>> ctx) throws Exception {
ctx.emitWatermark(new Watermark(0));
ctx.collectWithTimestamp(new Tuple2<>("key", 0), 0);
ctx.emitWatermark(new Watermark(2));
ctx.collectWithTimestamp(new Tuple2<>("key", 2), 2);
ctx.emitWatermark(new Watermark(3));
ctx.collectWithTimestamp(new Tuple2<>("key", 3), 3);
ctx.emitWatermark(new Watermark(4));
ctx.collectWithTimestamp(new Tuple2<>("key", 4), 4);
ctx.emitWatermark(new Watermark(5));
ctx.collectWithTimestamp(new Tuple2<>("key", 5), 5);
ctx.emitWatermark(new Watermark(7));
ctx.collectWithTimestamp(new Tuple2<>("key", 7), 7);
}
@Override
public void cancel() {
}
}
}
Output:
0,0
0,1
2,0
2,1
5,6
7,6
7,7
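In short, join has exactly the same semantics as a database inner join: it performs an inner join on the elements with the same key within a data set, and that data set can be produced either by a window or by defining a time interval.
As mentioned above, join can only perform an inner join. What if you need other kinds of join? coGroup is what you want. Its syntax is as follows: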
dataStream.coGroup(otherStream)
.where(<KeySelector>)
.equalTo(<KeySelector>)
.window(<WindowAssigner>)
.apply (new CoGroupFunction () {...});
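Compared with join, coGroup is the more general implementation: in CoGroupFunction it exposes the elements of the 2 streams in a window to the developer as 2 iterables, and the developer writes whatever logic is needed. So it can implement not only other types of join but also more general operations. However, I saw a Flink PMC member mention on Stack Overflow that join can use either a sort-based or a hash-based execution strategy while coGroup can only be sort-based, so join is generally somewhat more efficient than coGroup. Where join meets the requirement, prefer join. Here is a usage example: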
public class CoGroupDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataStream<Tuple2<String, Integer>> greenStream = env.addSource(new WindowJoinDemo.GreenSource());
DataStream<Tuple2<String, Integer>> orangeStream = env.addSource(new WindowJoinDemo.OrangeSource());
orangeStream.coGroup(greenStream)
.where(orangeStreamTuple2 -> orangeStreamTuple2.f0)
.equalTo(greenStreamTuple2 -> greenStreamTuple2.f0)
.window(TumblingEventTimeWindows.of(Time.milliseconds(1000)))
.apply(new CoGroupFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String>() {
@Override
public void coGroup(Iterable<Tuple2<String, Integer>> orange, Iterable<Tuple2<String, Integer>> green, Collector<String> out) throws Exception {
StringBuffer sb = new StringBuffer();
sb.append("window:{ orange stream elements: [ ");
orange.forEach(tuple2 -> sb.append(tuple2.f1).append(" "));
sb.append("], green stream elements: [ ");
green.forEach(tuple2 -> sb.append(tuple2.f1).append(" "));
sb.append("]");
out.collect(sb.toString());
}
})
.print();
env.execute();
}
}
Output:
window:{ orange stream elements: [ 0 1 ], green stream elements: [ 0 1 ]
window:{ orange stream elements: [ 2 3 ], green stream elements: [ 3 ]
window:{ orange stream elements: [ 4 5 ], green stream elements: [ 4 ]
window:{ orange stream elements: [ ], green stream elements: [ 4 ]
window:{ orange stream elements: [ 6 7 ], green stream elements: [ ]
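Since both sides of a window arrive as iterables, a left outer join is only a few lines of logic. The sketch below shows that logic in plain Java, outside of any CoGroupFunction; the method name leftOuterJoin and the choice to render a missing right side as "null" are my own assumptions for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Plain-Java model of the logic one might put inside a CoGroupFunction; this is NOT Flink code.
class LeftOuterJoinModel {

    // Left outer join of the two sides of ONE window (and one key).
    static List<String> leftOuterJoin(Iterable<Integer> left, Iterable<Integer> right) {
        List<String> out = new ArrayList<>();
        boolean rightIsEmpty = !right.iterator().hasNext();
        for (Integer l : left) {
            if (rightIsEmpty) {
                out.add(l + ",null");      // keep the left element even without a match
                continue;
            }
            for (Integer r : right) {
                out.add(l + "," + r);      // same pairs an inner join would emit
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // Second window of the output above: orange [2 3], green [3].
        System.out.println(leftOuterJoin(Arrays.asList(2, 3), Arrays.asList(3)));
        // Last window of the output above: orange [6 7], green [].
        System.out.println(leftOuterJoin(Arrays.asList(6, 7), Collections.<Integer>emptyList()));
    }
}
```

A right or full outer join follows the same pattern with the roles of the two iterables swapped or combined.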
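connect connects 2 streams, which may be of different types, and then processes the elements of each stream with its own function. connect can be used on its own or together with the Broadcast mechanism. Its main use case is connecting a main stream (the data stream) with an auxiliary stream (such as configuration or rules), so that the auxiliary stream dynamically controls the behavior of the main stream. Below are 2 code examples with exactly the same functionality: the auxiliary stream changes which multiples the main stream outputs. One uses connect alone; the other uses it together with broadcast.
connect on its own: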
public class ConnectDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
// main data stream
DataStream<Long> dataStream = env.addSource(new SourceFunction<Long>() {
@Override
public void run(SourceContext<Long> ctx) throws Exception {
long i = 0;
while (true) {
ctx.collect(i++);
Thread.sleep(500);
}
}
@Override
public void cancel() {
}
});
// rule stream
DataStream<String> ruleStream = env.addSource(new SourceFunction<String>() {
@Override
public void run(SourceContext<String> ctx) throws Exception {
ctx.collect("one");
Thread.sleep(5000);
ctx.collect("two");
Thread.sleep(5000);
ctx.collect("three");
Thread.sleep(Long.MAX_VALUE);
}
@Override
public void cancel() {
}
});
dataStream.connect(ruleStream)
.flatMap(new CoFlatMapFunction<Long, String, Object>() {
String rule;
@Override
public void flatMap1(Long value, Collector<Object> out) throws Exception {
if ("one".equalsIgnoreCase(rule)) {
out.collect(value);
} else if ("two".equalsIgnoreCase(rule) && (value % 2 == 0)) {
out.collect(value);
} else if ("three".equalsIgnoreCase(rule) && (value % 3 == 0)) {
out.collect(value);
}
}
@Override
public void flatMap2(String value, Collector<Object> out) throws Exception {
System.out.printf("update rule, old rule = %s, new rule = %s\n", rule, value);
rule = value;
}
}).print();
env.execute();
}
}
Output:
update rule, old rule = null, new rule = one
1
2
3
4
5
6
7
8
9
update rule, old rule = one, new rule = two
10
12
14
16
18
update rule, old rule = two, new rule = three
21
24
27
30
33
36
...
Together with broadcast:
public class BroadcastDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
// main data stream
DataStream<Long> dataStream = env.addSource(new SourceFunction<Long>() {
@Override
public void run(SourceContext<Long> ctx) throws Exception {
long i = 0;
while (true) {
ctx.collect(i++);
Thread.sleep(500);
}
}
@Override
public void cancel() {
}
});
// rule stream
DataStream<String> ruleStream = env.addSource(new SourceFunction<String>() {
@Override
public void run(SourceContext<String> ctx) throws Exception {
ctx.collect("one");
Thread.sleep(5000);
ctx.collect("two");
Thread.sleep(5000);
ctx.collect("three");
Thread.sleep(Long.MAX_VALUE);
}
@Override
public void cancel() {
}
});
MapStateDescriptor<String, String> ruleStateDescriptor = new MapStateDescriptor<>(
"RulesBroadcastState", BasicTypeInfo.STRING_TYPE_INFO,
TypeInformation.of(new TypeHint<String>() {
}));
BroadcastStream<String> ruleBroadcastStream = ruleStream.broadcast(ruleStateDescriptor);
dataStream.connect(ruleBroadcastStream)
.process(new BroadcastProcessFunction<Long, String, Long>() {
String rule;
@Override
public void processElement(Long value, ReadOnlyContext ctx, Collector<Long> out) throws Exception {
if ("one".equalsIgnoreCase(rule)) {
out.collect(value);
} else if ("two".equalsIgnoreCase(rule) && (value % 2 == 0)) {
out.collect(value);
} else if ("three".equalsIgnoreCase(rule) && (value % 3 == 0)) {
out.collect(value);
}
}
@Override
public void processBroadcastElement(String value, Context ctx, Collector<Long> out) throws Exception {
System.out.printf("update rule, old rule = %s, new rule = %s\n", rule, value);
rule = value;
}
}).print();
env.execute();
}
}
The output is exactly the same as that of the standalone connect version, so I won't attach the results again.
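Both variants boil down to the same pattern: one function holds the latest rule and consults it for every data element. A rough plain-Java model of that pattern, with the two streams flattened into one hypothetical interleaved event list (a String is a rule update, a Long is a data element), might look like this. It is not Flink code, and in a real job the interleaving of the two inputs is not deterministic.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java model of "a rule stream controls a data stream"; this is NOT Flink code.
class ConnectModel {

    static List<Long> process(List<Object> events) {
        String rule = null;                 // plays the role of the `rule` field in the demos
        List<Long> out = new ArrayList<>();
        for (Object event : events) {
            if (event instanceof String) {  // second input: update the rule
                rule = (String) event;
                continue;
            }
            long value = (Long) event;      // first input: filter by the current rule
            if ("one".equals(rule)
                    || ("two".equals(rule) && value % 2 == 0)
                    || ("three".equals(rule) && value % 3 == 0)) {
                out.add(value);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Object> events = Arrays.<Object>asList(0L, "one", 1L, 2L, "two", 3L, 4L, "three", 5L, 6L);
        System.out.println(process(events));
    }
}
```

Note that the element 0 arrives before any rule and is dropped, which in this model is simply the consequence of the rule still being null.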
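Iterate does what its name says: iteration, somewhat like a state machine. With the Iterate operator we can compute on the elements of a stream iteratively until they meet some condition, which is fairly common in self-learning scenarios. I loosely classify it as a multi-stream operator here because it has to process the "new stream" made up of elements that are sent back for rework. The code below implements a very simple function: it applies a minus-one operation (minusOne) to the original stream and then checks whether the element's value is greater than 0; if so, the element is fed back and decremented again, until it is no longer greater than 0, at which point it leaves the iteration and enters the downstream operation (a plain print here).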
public class IterateDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
List<Tuple2<String, Integer>> tuple2s = new ArrayList<>(4);
tuple2s.add(new Tuple2<>("Jim", 3));
tuple2s.add(new Tuple2<>("John", 2));
tuple2s.add(new Tuple2<>("Lily", 1));
tuple2s.add(new Tuple2<>("Lucy", 4));
// source
DataStream<Tuple2<String, Integer>> source = env.fromCollection(tuple2s);
source.print();
// obtain an IterativeStream
IterativeStream<Tuple2<String, Integer>> iteration = source.iterate();
// the regular computation
DataStream<Tuple2<String, Integer>> minusOne = iteration.map(new MapFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>() {
@Override
public Tuple2<String, Integer> map(Tuple2<String, Integer> value) throws Exception {
System.out.printf("[minusOne] %s: %d\n", value.f0, value.f1);
return new Tuple2<>(value.f0, value.f1 - 1);
}
});
// select the elements that need another iteration; they form a stream
DataStream<Tuple2<String, Integer>> stillGreaterThanZero = minusOne.filter(new FilterFunction<Tuple2<String, Integer>>() {
@Override
public boolean filter(Tuple2<String, Integer> value) throws Exception {
boolean greaterThanZero = value.f1 > 0;
if (greaterThanZero) {
System.out.printf("%s is still greater than zero(now: %d), back to minusOne\n", value.f0, value.f1);
}
return greaterThanZero;
}
});
// pass the feedback stream to closeWith
iteration.closeWith(stillGreaterThanZero);
minusOne.filter(tuple2 -> tuple2.f1 <= 0).print();
env.execute();
}
}
Output:
(Jim,3)
(John,2)
(Lily,1)
(Lucy,4)
[minusOne] Jim: 3
Jim is still greater than zero(now: 2), back to minusOne
[minusOne] John: 2
John is still greater than zero(now: 1), back to minusOne
[minusOne] Lily: 1
(Lily,0)
[minusOne] Lucy: 4
Lucy is still greater than zero(now: 3), back to minusOne
[minusOne] Jim: 2
Jim is still greater than zero(now: 1), back to minusOne
[minusOne] John: 1
(John,0)
[minusOne] Lucy: 3
Lucy is still greater than zero(now: 2), back to minusOne
[minusOne] Jim: 1
(Jim,0)
[minusOne] Lucy: 2
Lucy is still greater than zero(now: 1), back to minusOne
[minusOne] Lucy: 1
(Lucy,0)
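One way to picture the feedback loop is as a FIFO queue: elements that still need work go to the back of the queue, the rest are emitted. The sketch below is only that mental model in plain Java (IterateModel and run are hypothetical names); a real Flink job gives no such ordering guarantee between fresh and fed-back elements.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Plain-Java queue model of the iterate/closeWith feedback loop; this is NOT Flink code.
class IterateModel {

    static final class Item {
        final String name;
        final int value;

        Item(String name, int value) {
            this.name = name;
            this.value = value;
        }
    }

    static List<String> run(String[] names, int[] values) {
        Deque<Item> pending = new ArrayDeque<>();
        for (int i = 0; i < names.length; i++) {
            pending.addLast(new Item(names[i], values[i]));
        }
        List<String> out = new ArrayList<>();
        while (!pending.isEmpty()) {
            Item item = pending.pollFirst();
            int next = item.value - 1;                       // the minusOne step
            if (next > 0) {
                pending.addLast(new Item(item.name, next));  // feedback edge (closeWith)
            } else {
                out.add("(" + item.name + "," + next + ")"); // leaves the iteration
            }
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(run(new String[]{"Jim", "John", "Lily", "Lucy"}, new int[]{3, 2, 1, 4}));
    }
}
```

For this input the queue model emits the elements in the same order as the run shown above: Lily, John, Jim, Lucy.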
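A typical Flink streaming program is usually composed of the operators introduced above. As you can see, none of them is complicated to use, so writing a streaming program is not hard. What is hard is writing a correct, robust, and efficient one, and tuning it in a targeted way; doing that well requires understanding how stream processing runs underneath and its key design decisions.
The next article introduces another very important operator: Flink Async I/O.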
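Note: the word "task" in this article is a generic concept and does not refer to Flink's Task.
Job is the easiest to understand. A Job is a large unit of work that can be submitted independently; you can think of each execute or executeAsync call as producing one Job. Work is submitted to the JobManager in units of Jobs, though a single piece of code may contain multiple Jobs. SocketWordCount, for example, is one Job.
As for Task and Subtask, see the figure I drew below:
The figure is explained as follows:
That should be very clear. To summarize the conclusions:
With these concepts understood, the number of threads an application will roughly create (ignoring the framework's own threads) can be estimated from the JobGraph: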
$$ \sum_{i=1}^{n} \mathrm{operator}_i \times \mathrm{parallelism}_i $$
That is, sum up the product of every Operator and its parallelism.
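As a quick worked example of this estimate, the sketch below sums the parallelism of each vertex of a JobGraph. The parallelism values are invented for illustration, and treating each JobGraph vertex as one term of the sum is my reading of the formula, not something taken from Flink's code.

```java
// Back-of-the-envelope thread estimate; the numbers are hypothetical.
class ThreadEstimate {

    // One entry per JobGraph vertex: its parallelism.
    static int estimateThreads(int... parallelismPerVertex) {
        int total = 0;
        for (int parallelism : parallelismPerVertex) {
            total += parallelism;   // each parallel instance is assumed to run in its own thread
        }
        return total;
    }

    public static void main(String[] args) {
        // Hypothetical job: source (1), flatMap (4), window (4), sink (1).
        System.out.println(estimateThreads(1, 4, 4, 1));
    }
}
```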
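That's not all; let's also talk about Slot Sharing.
The architecture section explained that the TaskManager does the actual work. On startup it registers its resources with the ResourceManager as Slots; the JobMaster then requests Slots from the ResourceManager and schedules its optimized tasks onto them. Throughout this process the Subtask is the basic unit of scheduling and the Slot is the basic unit of resource allocation. Note that Slots currently isolate memory only, not CPU.
To use resources efficiently, Flink by default allows Subtasks of different Tasks in the same Job to run in one Slot. This is Slot Sharing. Note the key conditions in that description:
Now look at two figures from the official docs:
6 Slots, 5 Subtasks, parallelism 2:
Here there are fewer Subtasks than Slots, so each Subtask has a Slot to itself and there is no Slot Sharing. Change the parallelism to 6:
Now there are more Subtasks than Slots, so Slot Sharing kicks in: one Slot holds multiple Subtasks, and the leftmost Slot in particular runs a complete pipeline. Besides improving resource utilization, Slot Sharing also simplifies the relationship between parallelism and Slots: the number of Slots a Job needs equals the parallelism of its most parallel Task: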
$$ \text{Slots required by the Job}=\max(parallelism_{task_1}, parallelism_{task_2}, \ldots, parallelism_{task_n}) $$
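The same rule as a tiny calculation, assuming default slot sharing where all Tasks of the Job are in one sharing group. The parallelism values are hypothetical and SlotEstimate is just an illustrative name.

```java
// Slot estimate under default slot sharing; the numbers are hypothetical.
class SlotEstimate {

    // Required slots = the highest parallelism among the Job's Tasks.
    static int requiredSlots(int... taskParallelism) {
        int max = 0;
        for (int parallelism : taskParallelism) {
            max = Math.max(max, parallelism);
        }
        return max;
    }

    public static void main(String[] args) {
        // Hypothetical job with three Tasks of parallelism 6, 6 and 1.
        System.out.println(requiredSlots(6, 6, 1));
    }
}
```

Thirteen Subtasks in total, yet only 6 Slots are needed, because each Slot can hold one Subtask of every Task.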
With this textbook knowledge in hand, you can estimate resources much better.
Reference:
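I looked at this JIRA, and someone has in fact already opened a PR (#14283), though it hasn't been merged yet. The PR's code is very simple; take a look if you're interested. I merged it into my local 1.12 branch, then added the 3 new classes and the 3 modified class files to the officially released 1.12 package and tested it: volume mounts do work. Below I record the process. If you're interested you can compile it yourself, or download the one I built (download here, password: hi52. Why does sharing require a password now...).
This PR adds volume mount support for the JobManager and TaskManager in the Flink Native Kubernetes deployment mode, supporting three types: emptydir (default), hostpath, and pvc. The code makes the usage clear: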
// KubernetesConfigOptions
public static final ConfigOption<String> JOBMANAGER_VOLUME_MOUNT =
key("kubernetes.jobmanager.volumemount")
.stringType()
.noDefaultValue()
.withDescription("Volume (pvc, emptydir, hostpath) mount information for the Job manager. " +
"Value can contain several commas-separated volume mounts. Each mount is defined by several : separated " +
"parameters - name used for mount, mounting path and volume specific parameters");
public static final ConfigOption<String> TASKMANAGER_VOLUME_MOUNT =
key("kubernetes.taskmanager.vlumemount")
.stringType()
.noDefaultValue()
.withDescription("Volume (pvc, emptydir, hostpath) mount information for the Task manager. " +
"Value can contain several commas-separated volume mounts. Each mount is defined by several : separated " +
"parameters - name used for mount, mounting path and volume specific parameters");
Usage can also be seen in the unit test file:
// VolumeMountDecoratorTest
@Override
protected void setupFlinkConfig() {
super.setupFlinkConfig();
this.flinkConfig.setString(KubernetesConfigOptions.JOBMANAGER_VOLUME_MOUNT.key(),
VolumeMountDecorator.KUBERNETES_VOLUMES_PVC + ":pvc-mount1:/opt/pvcclaim/tes1/:testclaim1:false,"
+ VolumeMountDecorator.KUBERNETES_VOLUMES_PVC + ":pvc-mount2::testclaim:false:path1->/opt/pvcclaim/test/path1;path2->/opt/pvcclaim/test/path2,"
+ VolumeMountDecorator.KUBERNETES_VOLUMES_EMPTYDIR + ":edir-mount:/emptydirclaim:" + VolumeMountDecorator.EMPTYDIRDEFAULT + ","
+ VolumeMountDecorator.KUBERNETES_VOLUMES_HOSTPATH + ":hp-mount:/var/local/hp:/var/local/hp:DirectoryOrCreate");
}
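Using emptydir and hostpath is very simple, so I'll skip them. There are two ways to use pvc:
-Dkubernetes.jobmanager.volumemount=pvc:<volume name, pick your own>:<mount path>:<pvc name>:<false|true>. The last field, false or true, indicates whether to mount read-only.
-Dkubernetes.jobmanager.volumemount=pvc:<volume name, pick your own>::<pvc name>:<false|true>:<subPath>-><mountPath>.
Next, let's use this PR to implement NFS-based Flink Kubernetes HA.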
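Replace the flink-dist_2.11-1.12.0.jar in the lib directory of the official package with the newly built flink-dist_2.11-1.12.0.jar (if you can't be bothered to compile it yourself, just download the one I built above; I added and replaced the few class files touched by the PR on top of the official package, so the change is very small). Note that you replace the jar in the Flink distribution you submit jobs from, not the one inside the container.
$ kubectl get pvc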
NAME STATUS VOLUME CAPACITY ACCESS MODES STORAGECLASS AGE
flink-ha-pvc Bound pvc-46537a5b-2adc-442e-ae59-52af4c681f2c 500Mi RWX nfs-storage 16h
Submit a job as an Application cluster (for the images involved, see the previous article):
$ ./bin/flink run-application \
--target kubernetes-application \
-Dkubernetes.cluster-id=flink-application-cluster \
-Dhigh-availability=org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory \
-Dhigh-availability.storageDir=file:///opt/flink/flink-ha \
-Dkubernetes.jobmanager.volumemount=pvc:jobmanager-ha:/opt/flink/flink-ha:flink-ha-pvc:false \
-Dkubernetes.container.image=top-speed-windowing:1.12.0 \
-Dkubernetes.rest-service.exposed.type=NodePort \
local:///opt/flink/usrlib/TopSpeedWindowing.jar
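The volumemount value in the command above uses the first pvc form. To show how its colon-separated fields line up, here is a hypothetical plain-Java parser. It is not the PR's implementation; the class and field names are invented, and it handles only the five-field form pvc:<volume name>:<mount path>:<pvc name>:<false|true>.

```java
// Hypothetical parser for the five-field pvc form; NOT the code from the PR.
class PvcMountSpec {
    final String volumeName;
    final String mountPath;
    final String claimName;
    final boolean readOnly;

    PvcMountSpec(String volumeName, String mountPath, String claimName, boolean readOnly) {
        this.volumeName = volumeName;
        this.mountPath = mountPath;
        this.claimName = claimName;
        this.readOnly = readOnly;
    }

    static PvcMountSpec parse(String spec) {
        String[] fields = spec.split(":", -1);   // -1 keeps empty fields instead of dropping them
        if (fields.length != 5 || !"pvc".equals(fields[0])) {
            throw new IllegalArgumentException("expected pvc:<name>:<path>:<claim>:<false|true>, got: " + spec);
        }
        return new PvcMountSpec(fields[1], fields[2], fields[3], Boolean.parseBoolean(fields[4]));
    }

    public static void main(String[] args) {
        PvcMountSpec spec = parse("pvc:jobmanager-ha:/opt/flink/flink-ha:flink-ha-pvc:false");
        System.out.println(spec.volumeName + " -> " + spec.mountPath
                + " (claim " + spec.claimName + ", readOnly=" + spec.readOnly + ")");
    }
}
```

Here the volume is named jobmanager-ha, it is mounted at /opt/flink/flink-ha (the same path used for high-availability.storageDir), and it is backed by the flink-ha-pvc claim in read-write mode.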
Check:
$ kubectl get pod
NAME READY STATUS RESTARTS AGE
flink-application-cluster-9589dbf58-hm7xj 1/1 Running 0 76s
flink-application-cluster-taskmanager-1-1 1/1 Running 0 33s
$ kubectl describe pod flink-application-cluster-9589dbf58-hm7xj
Name: flink-application-cluster-9589dbf58-hm7xj
Namespace: default
Priority: 0
Node: 10.9.1.18/10.9.1.18
Start Time: Thu, 24 Dec 2020 10:26:07 +0800
Labels: app=flink-application-cluster
component=jobmanager
pod-template-hash=9589dbf58
type=flink-native-kubernetes
Annotations: <none>
Status: Running
IP: 172.20.0.165
IPs:
IP: 172.20.0.165
Controlled By: ReplicaSet/flink-application-cluster-9589dbf58
Containers:
flink-job-manager:
Container ID: docker://454fd2a6d3a913ce738f2e007f35e61d5068bfd9ad38d76bf900dbf1aaf9b70f
Image: top-speed-windowing:1.12.0
Image ID: docker://sha256:66d4aa5b13fc7c2ccce21685543fc2d079aac695d3480d9d27dbef2fc50ce875
Ports: 8081/TCP, 6123/TCP, 6124/TCP
Host Ports: 0/TCP, 0/TCP, 0/TCP
Command:
/docker-entrypoint.sh
Args:
native-k8s
$JAVA_HOME/bin/java -classpath $FLINK_CLASSPATH -Xmx1073741824 -Xms1073741824 -XX:MaxMetaspaceSize=268435456 -Dlog.file=/opt/flink/log/jobmanager.log -Dlogback.configurationFile=file:/opt/flink/conf/logback-console.xml -Dlog4j.configuration=file:/opt/flink/conf/log4j-console.properties -Dlog4j.configurationFile=file:/opt/flink/conf/log4j-console.properties org.apache.flink.kubernetes.entrypoint.KubernetesApplicationClusterEntrypoint -D jobmanager.memory.off-heap.size=134217728b -D jobmanager.memory.jvm-overhead.min=201326592b -D jobmanager.memory.jvm-metaspace.size=268435456b -D jobmanager.memory.heap.size=1073741824b -D jobmanager.memory.jvm-overhead.max=201326592b
State: Running
Started: Thu, 24 Dec 2020 10:26:11 +0800
Ready: True
Restart Count: 0
Limits:
cpu: 1
memory: 1600Mi
Requests:
cpu: 1
memory: 1600Mi
Environment:
_POD_IP_ADDRESS: (v1:status.podIP)
Mounts:
/opt/flink/conf from flink-config-volume (rw)
/opt/flink/flink-ha from jobmanager-ha (rw)
/var/run/secrets/kubernetes.io/serviceaccount from default-token-pzw5h (ro)
Conditions:
Type Status
Initialized True
Ready True
ContainersReady True
PodScheduled True
Volumes:
flink-config-volume:
Type: ConfigMap (a volume populated by a ConfigMap)
Name: flink-config-flink-application-cluster
Optional: false
jobmanager-ha:
Type: PersistentVolumeClaim (a reference to a PersistentVolumeClaim in the same namespace)
ClaimName: flink-ha-pvc
ReadOnly: false
default-token-pzw5h:
Type: Secret (a volume populated by a Secret)
SecretName: default-token-pzw5h
Optional: false
QoS Class: Guaranteed
Node-Selectors: <none>
Tolerations: node.kubernetes.io/not-ready:NoExecute for 300s
node.kubernetes.io/unreachable:NoExecute for 300s
Events:
Type Reason Age From Message
---- ------ ---- ---- -------
Normal Scheduled <unknown> default-scheduler Successfully assigned default/flink-application-cluster-9589dbf58-hm7xj to 10.9.1.18
Warning FailedMount 80s (x2 over 81s) kubelet, 10.9.1.18 MountVolume.SetUp failed for volume "flink-config-volume" : configmap "flink-config-flink-application-cluster" not found
Normal Pulled 78s kubelet, 10.9.1.18 Container image "top-speed-windowing:1.12.0" already present on machine
Normal Created 78s kubelet, 10.9.1.18 Created container flink-job-manager
Normal Started 77s kubelet, 10.9.1.18 Started container flink-job-manager
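As you can see, the volume is mounted correctly.
Whether this can be used in production mainly comes down, I think, to how reliable this PR is and how it will be maintained and upgraded later. From both angles I think it's fine. The PR is small and simple; in essence it only adds a few configuration options, barely touches existing code, and the new options are all optional, so the code is almost fully under control. What you should perhaps care about more is whether this PR will eventually be merged upstream. Personally I think not necessarily: volume mounting will almost certainly be supported, but not necessarily with this PR's code. Even if different code is used, for users it means at most swapping a jar and adjusting the job submission command.
Also, and I think most importantly, this change only affects job submission, and within that only the creation of containers. In other words, the impact is limited to the Kubernetes-related parts and does not touch any functionality of the running Flink. So when using this PR, remember to replace only the jar in the installation package on the host, and not the jar that actually runs inside the container.
That said, if you want to stick strictly to official releases, you can still deploy Flink on Kubernetes the non-Native way as in earlier versions. I still prefer the Native approach, though, because it is simpler.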
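Of course, these features still have some minor shortcomings (though the flaws don't outweigh the merits), which the tests below will point out.
Flink 1.12 has been out for a while, but the official images still haven't been pushed. If you are using the latest 1.12.0, be careful when following the official docs: they don't specify an image name or tag, so what you pull is actually still a 1.11.x image, and an image built from it will most likely fail when run with Flink 1.12.
So for this test I built the images myself.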
git clone https://github.com/apache/flink-docker
# Pick your own Scala and JDK version here
cd flink-docker/1.12/scala_2.11-java8-debian
docker build --tag flink:1.12.0-scala_2.11 .
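Tips: the default naming convention for Flink images is flink:<FLINK_VERSION>-scala_<SCALA_VERSION>. You can of course use a custom name and specify it at startup with the kubernetes.container.image parameter.
Later, when testing high availability, I use HDFS. The default image doesn't include the HDFS dependencies, so you need to add them yourself (download from Maven). If you use oss, s3, swift, or the like, this isn't needed; they are already included and only need to be configured at startup. Here is the Dockerfile: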
FROM flink:1.12.0-scala_2.11
COPY flink-shaded-hadoop-2-uber-2.8.3-10.0.jar $FLINK_HOME/lib/
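Run: docker build --tag flink-haddop:1.12.0-scala_2.11 .
Flink Application Cluster will also be demonstrated below. In this mode the application has to be baked into the Flink image, which is also very simple: copy the application jar to $FLINK_HOME/usrlib. If you need customized configuration files, you can add them here as well. I'll use Flink's bundled TopSpeedWindowing as the example; its jar is in the examples/streaming directory of the Flink distribution. Here is the Dockerfile: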
FROM flink-haddop:1.12.0-scala_2.11
RUN mkdir -p $FLINK_HOME/usrlib
COPY TopSpeedWindowing.jar $FLINK_HOME/usrlib/TopSpeedWindowing.jar
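Run: docker build --tag top-speed-windowing:1.12.0 .
That completes all the images used in this article.
First download and extract Flink 1.12.0 (steps omitted); all commands below are run from the extracted Flink directory.
As described in the previous article, a Flink Session Cluster means deploying a cluster first and then submitting jobs to it. So let's create a cluster first: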
./bin/kubernetes-session.sh \
-Dkubernetes.container.image=flink:1.12.0-scala_2.11 \
-Dkubernetes.rest-service.exposed.type=NodePort \
-Dtaskmanager.numberOfTaskSlots=2 \
-Dkubernetes.cluster-id=flink-session-cluster
# Output:
2020-12-21 22:55:35,527 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: jobmanager.rpc.address, localhost
2020-12-21 22:55:35,537 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: jobmanager.rpc.port, 6123
2020-12-21 22:55:35,537 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: jobmanager.memory.process.size, 1600m
2020-12-21 22:55:35,537 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: taskmanager.memory.process.size, 1728m
2020-12-21 22:55:35,538 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: taskmanager.numberOfTaskSlots, 1
2020-12-21 22:55:35,538 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: parallelism.default, 1
2020-12-21 22:55:35,540 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: jobmanager.execution.failover-strategy, region
2020-12-21 22:55:35,773 INFO org.apache.flink.client.deployment.DefaultClusterClientServiceLoader [] - Could not load factory due to missing dependencies.
2020-12-21 22:55:38,128 INFO org.apache.flink.runtime.util.config.memory.ProcessMemoryUtils [] - The derived from fraction jvm overhead memory (160.000mb (167772162 bytes)) is less than its min value 192.000mb (201326592 bytes), min value will be used instead
2020-12-21 22:55:38,148 INFO org.apache.flink.runtime.util.config.memory.ProcessMemoryUtils [] - The derived from fraction jvm overhead memory (172.800mb (181193935 bytes)) is less than its min value 192.000mb (201326592 bytes), min value will be used instead
2020-12-21 22:55:38,163 INFO org.apache.flink.kubernetes.utils.KubernetesUtils [] - Kubernetes deployment requires a fixed port. Configuration blob.server.port will be set to 6124
2020-12-21 22:55:38,163 INFO org.apache.flink.kubernetes.utils.KubernetesUtils [] - Kubernetes deployment requires a fixed port. Configuration taskmanager.rpc.port will be set to 6122
2020-12-21 22:55:38,259 INFO org.apache.flink.runtime.util.config.memory.ProcessMemoryUtils [] - The derived from fraction jvm overhead memory (160.000mb (167772162 bytes)) is less than its min value 192.000mb (201326592 bytes), min value will be used instead
2020-12-21 22:55:44,134 INFO org.apache.flink.kubernetes.KubernetesClusterDescriptor [] - Create flink session cluster flink-session-cluster successfully, JobManager Web Interface: http://10.9.1.18:38566
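That's the cluster deployed. The Native Kubernetes deployment mode relies on Kubernetes for resource management and allocation, so at this point the cluster has only a JobManager. TaskManagers are created dynamically later when there are jobs. The last line of the log prints the Web UI address, where you can check the cluster status.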
$ kubectl get pod,svc,cm,deploy
NAME READY STATUS RESTARTS AGE
pod/flink-session-cluster-665766d9d5-8jxmb 1/1 Running 0 13s
NAME TYPE CLUSTER-IP EXTERNAL-IP PORT(S) AGE
service/flink-session-cluster ClusterIP None <none> 6123/TCP,6124/TCP 12s
service/flink-session-cluster-rest NodePort 10.68.245.4 <none> 8081:38566/TCP 12s
service/kubernetes ClusterIP 10.68.0.1 <none> 443/TCP 38d
NAME DATA AGE
configmap/flink-config-flink-session-cluster 3 12s
NAME READY UP-TO-DATE AVAILABLE AGE
deployment.apps/flink-session-cluster 1/1 1 1 13s
With the cluster deployed, submit a job:
./bin/flink run -p 4 \
--target kubernetes-session \
-Dkubernetes.cluster-id=flink-session-cluster \
./examples/streaming/TopSpeedWindowing.jar
# Check the dynamically created TaskManagers
kubectl get pod
NAME READY STATUS RESTARTS AGE
flink-session-cluster-665766d9d5-8jxmb 1/1 Running 0 2m17s
flink-session-cluster-taskmanager-1-1 1/1 Running 0 33s
flink-session-cluster-taskmanager-1-2 1/1 Running 0 33s
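Then cancel the job in the web UI. The cluster still exists, because the lifecycle of a Session Cluster is independent of its Jobs. The TaskManagers, however, are reclaimed after being idle for a while (resourcemanager.taskmanager-timeout, 30 seconds by default).
Finally, delete the test cluster: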
kubectl delete deploy/flink-session-cluster
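As described in the previous article, a Flink Application Cluster is a dedicated cluster that an application creates for itself. An application may contain multiple Jobs, and the cluster's lifecycle is tied to the Job.
# Create the cluster && start the application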
./bin/flink run-application \
--target kubernetes-application \
-Dkubernetes.cluster-id=flink-application-cluster \
-Dkubernetes.container.image=top-speed-windowing:1.12.0 \
-Dkubernetes.rest-service.exposed.type=NodePort \
local:///opt/flink/usrlib/TopSpeedWindowing.jar
# Check
kubectl get pod,svc,cm,deploy
NAME READY STATUS RESTARTS AGE
pod/flink-application-cluster-7fc5ccd899-9p7ns 1/1 Running 0 115s
pod/flink-application-cluster-taskmanager-1-1 1/1 Running 0 54s
NAME TYPE CLUSTER-IP EXTERNAL-IP PORT(S) AGE
service/flink-application-cluster ClusterIP None <none> 6123/TCP,6124/TCP 114s
service/flink-application-cluster-rest NodePort 10.68.147.132 <none> 8081:39463/TCP 113s
service/kubernetes ClusterIP 10.68.0.1 <none> 443/TCP 38d
NAME DATA AGE
configmap/flink-config-flink-application-cluster 3 113s
NAME READY UP-TO-DATE AVAILABLE AGE
deployment.apps/flink-application-cluster 1/1 1 1 115s
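Compared with the Session Cluster above, cluster creation and job submission are integrated: a single command takes care of everything, which is very convenient. If you now cancel the job in the Web UI, you'll find that the whole cluster is gone, which matches the statement that an Application Cluster's lifecycle is tied to its Job.
Job Cluster mode is not supported at the moment. Flink Job Cluster is of limited use anyway: it sits between Session Cluster and Application Cluster, and in general you just pick one of those two as needed.
Flink high availability mainly addresses the JobManager being a single point of failure. Previously there was only a ZooKeeper-based solution; 1.12.0 adds one based on Kubernetes ConfigMaps (only for Flink deployed on Kubernetes). The corresponding proposal is FLIP-144: Native Kubernetes HA for Flink; take a look if you're interested in the design details. This native HA solution is also very simple to use: just add two more configuration options on top of the above: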
# HA service: zookeeper when using ZK; for Kubernetes, use the class below
high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
# ZK is no longer required, but highly available storage still is
high-availability.storageDir: hdfs:///flink/recovery
The HA storage I use here is HDFS. Now create the two kinds of cluster:
# Flink Session Cluster
./bin/kubernetes-session.sh \
-Dkubernetes.rest-service.exposed.type=NodePort \
-Dkubernetes.container.image=flink-haddop:1.12.0-scala_2.11 \
-Dhigh-availability=org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory \
-Dhigh-availability.storageDir=hdfs://<namenode-ip>:<port>/flink/flink-ha \
-Dkubernetes.cluster-id=flink-session-cluster
# Flink Application Cluster
./bin/flink run-application \
--target kubernetes-application \
-Dkubernetes.cluster-id=flink-application-cluster \
-Dhigh-availability=org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory \
-Dhigh-availability.storageDir=hdfs://<namenode-ip>:<port>/flink/flink-ha \
-Dkubernetes.container.image=top-speed-windowing:1.12.0 \
-Dkubernetes.rest-service.exposed.type=NodePort \
local:///opt/flink/usrlib/TopSpeedWindowing.jar
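The Native HA solution really is very "naive": it is trivially simple to use, and there is not much to explain.
Finally, a few questions about HA.
1. Why does the JobManager Pod started by the commands above have only 1 replica? Answer: Besides the convenient way shown above, using Flink's own kubernetes-session.sh or flink command, many people write their own deployment manifests or use the official ones (especially back when Native Kubernetes was immature), and some set the JobManager replica count to 2 or more when deploying HA. On Kubernetes this is unnecessary. As mentioned, the HA Flink provides means that when the leader JobManager dies, a standby quickly takes over and recovers the submitted, running jobs from shared storage. On Kubernetes, if the JobManager Pod dies, a new JobManager Pod is created right away. In other words, the standby JobManager is not created in advance but produced on demand. This also avoids keeping an idle standby JobManager around wasting resources (note: a standby JobManager is a pure backup; it serves no job requests and provides no load balancing).
2. In Flink's HA mode, jobs restart if the leader JobManager dies. Does that still count as HA? Will data be lost? Answer: This question applies equally to ZooKeeper-based HA. Flink's HA mode relies on two parts. One is ZK or Kubernetes ConfigMaps, mainly responsible for leader election of the JobManager, service discovery, and storing a small amount of metadata (such as job status and the paths of job-related binaries on shared storage). The other is shared storage (such as the HDFS above), mainly responsible for storing job-related binaries such as the Job Graph and its dependencies. With these two parts, when the leader JobManager dies a new JobManager appears and can restore the previously running jobs from this information. Note that this is recovery: after the leader fails, the jobs fail too, but in HA mode Flink ensures they are recovered quickly. Does this mechanism count as HA? Let's look at the definition of HA:
High availability (HA) is one of the factors that must be considered in distributed system architecture design. It usually means reducing, by design, the time during which a system cannot provide service. If a system can always provide service, its availability is 100%. If for every 100 time units of operation there is 1 time unit in which it cannot provide service, its availability is 99%. Online systems and systems running critical tasks are usually required to reach five nines (99.999%, about 5 minutes 15 seconds of downtime per year).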
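By this definition the mechanism above certainly counts as HA; it just doesn't provide 100% availability. As for losing data, that has little to do with HA. Preventing data loss is the job of fault tolerance in distributed systems, for which Flink provides two mechanisms, Checkpoint and Savepoint. That is, if Checkpointing is enabled, no data is lost whether or not HA is on; conversely, if it isn't, data may be lost whether or not HA is on.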
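The "5 minutes 15 seconds" figure in the definition is easy to check: one non-leap year has 31,536,000 seconds, and five nines leaves 1/100,000 of that as downtime. A small sketch of the arithmetic (DowntimeModel is just an illustrative name; fractions of a second are truncated):

```java
// Worked arithmetic for "N nines" availability, assuming a 365-day year.
class DowntimeModel {

    static final long SECONDS_PER_YEAR = 365L * 24 * 60 * 60;   // 31,536,000

    // Allowed downtime per year in whole seconds for an availability of N nines,
    // e.g. nines = 2 means 99%, nines = 5 means 99.999%.
    static long downtimeSecondsPerYear(int nines) {
        long divisor = 1;
        for (int i = 0; i < nines; i++) {
            divisor *= 10;
        }
        return SECONDS_PER_YEAR / divisor;   // integer division drops the fractional second
    }

    public static void main(String[] args) {
        long seconds = downtimeSecondsPerYear(5);
        System.out.println((seconds / 60) + " min " + (seconds % 60) + " s");
    }
}
```

For comparison, 99% availability by the same calculation allows 315,360 seconds, which is 3.65 days per year.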
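As you can see, these two new features make Flink much simpler to use on Kubernetes. After trying them out, though, I personally think some work remains. For example, when creating a cluster with kubernetes-session.sh or flink, many configuration options are already supported (see here) for customization, but they're still not enough. I initially wanted to test HA with NFS, but found there is no way to mount a PVC when the cluster is created this way. HDFS, OSS, and S3 can be used directly without mounting, but NFS cannot, so I ended up deploying an HDFS for the test. The community is already working on these gaps, though; see FLINK-15649: Support mounting volumes and FLINK-15656: Support user-specified pod templates. I wrote a separate article on this, "Flink Native Kubernetes Supports Volume Mount", if you're interested.
References:
As shown in the figure above, Flink has 4 kinds of graphs: StreamGraph, JobGraph, ExecutionGraph, and PhysicalGraph. They belong to different stages and carry different responsibilities.
StreamGraph simply organizes our code logic as a topology graph. Its implementation class is described as follows: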
// StreamGraph.java
/**
* Class representing the streaming topology. It contains all the information
* necessary to build the jobgraph for the execution.
*/
@Internal
public class StreamGraph implements Pipeline {
...
}
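You can view the StreamGraph by adding System.out.println(env.getExecutionPlan()); before the execute() call in your code. This prints a JSON document; paste that JSON here to view the StreamGraph. I'll use the official SocketWordCount as the example. It will be used several more times later, so for completeness I paste the code here: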
/**
* Implements a streaming windowed version of the "WordCount" program.
*
* <p>This program connects to a server socket and reads strings from the socket.
* The easiest way to try this out is to open a text server (at port 12345)
* using the <i>netcat</i> tool via
* <pre>
* nc -l 12345 on Linux or nc -l -p 12345 on Windows
* </pre>
* and run this example with the hostname and the port as arguments.
*/
@SuppressWarnings("serial")
public class SocketWindowWordCount {
public static void main(String[] args) throws Exception {
// the host and the port to connect to
final String hostname;
final int port;
try {
final ParameterTool params = ParameterTool.fromArgs(args);
hostname = params.has("hostname") ? params.get("hostname") : "localhost";
port = params.getInt("port");
} catch (Exception e) {
System.err.println("No port specified. Please run 'SocketWindowWordCount " +
"--hostname <hostname> --port <port>', where hostname (localhost by default) " +
"and port is the address of the text server");
System.err.println("To start a simple text server, run 'netcat -l <port>' and " +
"type the input text into the command line");
return;
}
// get the execution environment
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// get input data by connecting to the socket
DataStream<String> text = env.socketTextStream(hostname, port, "\n");
// parse the data, group it, window it, and aggregate the counts
DataStream<WordWithCount> windowCounts = text
.flatMap(new FlatMapFunction<String, WordWithCount>() {
@Override
public void flatMap(String value, Collector<WordWithCount> out) {
for (String word : value.split("\\s")) {
out.collect(new WordWithCount(word, 1L));
}
}
})
.keyBy(value -> value.word)
.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
.reduce(new ReduceFunction<WordWithCount>() {
@Override
public WordWithCount reduce(WordWithCount a, WordWithCount b) {
return new WordWithCount(a.word, a.count + b.count);
}
});
// print the results with a single thread, rather than in parallel
windowCounts.print().setParallelism(1);
env.execute("Socket Window WordCount");
}
// ------------------------------------------------------------------------
/**
* Data type for words with count.
*/
public static class WordWithCount {
public String word;
public long count;
public WordWithCount() {}
public WordWithCount(String word, long count) {
this.word = word;
this.count = count;
}
@Override
public String toString() {
return word + " : " + count;
}
}
}
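This code is a classic stream-processing pipeline: Source (socketTextStream) --> Map (flatMap) --> Window (keyBy, window) --> Sink (print). It contains four operators (the parts in parentheses). Its StreamGraph looks like this:
With the default parallelism set to 1 (env.setParallelism(1);):
With the default parallelism set to 4:
Note:
When the program actually runs, it is likewise first converted into a StreamGraph and then further into a JobGraph.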
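Like the StreamGraph, the JobGraph is a directed acyclic graph (DAG), made up of operators, the input/output relationships between operators, and intermediate data. The JobGraph is also called the Logical Graph or Dataflow Graph. (As an aside, the StreamGraph is of course a logical graph too, but it is a code-level concept that is not exposed externally: the class is annotated with @Internal in the code and the user documentation never mentions it, whereas the JobGraph is exposed.)
The official website describes it as follows: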
A logical graph is a directed graph where the nodes are Operators and the edges define input/output-relationships of the operators and correspond to data streams or data sets.
The code comment says:
// JobGraph.java
/**
* The JobGraph represents a Flink dataflow program, at the low level that the JobManager accepts.
* All programs from higher level APIs are transformed into JobGraphs.
*
* <p>The JobGraph is a graph of vertices and intermediate results that are connected together to
* form a DAG. Note that iterations (feedback edges) are currently not encoded inside the JobGraph
* but inside certain special vertices that establish the feedback channel amongst themselves.
*
* <p>The JobGraph defines the job-wide configuration settings, while each vertex and intermediate result
* define the characteristics of the concrete operation and intermediate data.
*/
public class JobGraph implements Serializable {
...
}
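The conversion from StreamGraph to JobGraph also happens on the Client side, and its main job is optimization. One very important optimization is Operator Chain:
Source and FlatMap are merged together.
You can also inspect the JobGraph, on the submit page of the Flink Web UI (again using SocketWordCount as the example):
Note: if the application has required arguments, be sure to fill them in, otherwise an error is reported.
JobGraph with the default parallelism set to 1:
With the default parallelism set to 1, the optimizer chains the source and flatmap operators into one larger subtask, and chains the following window and sink operators into another. As a result, source and flatmap run in one thread, and window and sink run in another.
JobGraph with the default parallelism set to 4: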
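With the default parallelism set to 4, the graph becomes the one above; compared with the parallelism-1 graph, nothing is merged. The main reason is:
socketTextStream is an operator that cannot run in parallel, so its parallelism is always 1 (forcing any other value fails with an error);
Clearly this is not the most efficient way to run, so pay some attention to how parallelism is set and try to let operators that can be merged chain together.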
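To make the two JobGraphs above easier to reason about, here is a small stand-alone sketch of the chaining decision. It is a simplification under stated assumptions: it only checks two conditions (equal parallelism and a forward, non-repartitioning input edge), while Flink's real chaining logic checks more than that. The class ChainSketch and its Op type are hypothetical names, not Flink classes.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Simplified model of operator chaining for a linear pipeline. */
final class ChainSketch {
    static final class Op {
        final String name;
        final int parallelism;
        final boolean forwardInput; // false when the input is repartitioned, e.g. after keyBy

        Op(String name, int parallelism, boolean forwardInput) {
            this.name = name;
            this.parallelism = parallelism;
            this.forwardInput = forwardInput;
        }
    }

    /** An operator joins the previous chain only if parallelism matches and its input edge is forward. */
    static List<List<String>> chain(List<Op> pipeline) {
        List<List<String>> chains = new ArrayList<>();
        Op previous = null;
        for (Op op : pipeline) {
            boolean chainable = previous != null
                    && op.forwardInput
                    && op.parallelism == previous.parallelism;
            if (!chainable) {
                chains.add(new ArrayList<>());
            }
            chains.get(chains.size() - 1).add(op.name);
            previous = op;
        }
        return chains;
    }

    /** The SocketWindowWordCount pipeline for a given default parallelism. */
    static List<Op> wordCount(int defaultParallelism) {
        return Arrays.asList(
                new Op("source", 1, true),                    // socketTextStream is non-parallel
                new Op("flatMap", defaultParallelism, true),
                new Op("window", defaultParallelism, false),  // keyBy repartitions by key
                new Op("sink", 1, true));                     // print().setParallelism(1)
    }

    public static void main(String[] args) {
        System.out.println(chain(wordCount(1)));
        System.out.println(chain(wordCount(4)));
    }
}
```

With a default parallelism of 1 the sketch groups source with flatMap and window with sink; with 4, every operator ends up in its own chain, which matches the two graphs described above.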
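When the Client submits a job to the JobManager (the Dispatcher module), what it actually submits is the JobGraph and the jar files the job depends on.
Once the JobManager has received the JobGraph and its dependent jars from the Client, it has to schedule and run the job. The JobGraph is still a logical graph, however, and must be further converted into a parallelized, schedulable execution graph. The JobManager (specifically its JobMaster component) performs this step.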
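This layer is more concrete. The code documentation is long and users rarely need to care about it, but it is included here for completeness for anyone interested: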
/**
* The execution graph is the central data structure that coordinates the distributed
* execution of a data flow. It keeps representations of each parallel task, each
* intermediate stream, and the communication between them.
*
* <p>The execution graph consists of the following constructs:
* <ul>
* <li>The {@link ExecutionJobVertex} represents one vertex from the JobGraph (usually one operation like
* "map" or "join") during execution. It holds the aggregated state of all parallel subtasks.
* The ExecutionJobVertex is identified inside the graph by the {@link JobVertexID}, which it takes
* from the JobGraph's corresponding JobVertex.</li>
* <li>The {@link ExecutionVertex} represents one parallel subtask. For each ExecutionJobVertex, there are
* as many ExecutionVertices as the parallelism. The ExecutionVertex is identified by
* the ExecutionJobVertex and the index of the parallel subtask</li>
* <li>The {@link Execution} is one attempt to execute a ExecutionVertex. There may be multiple Executions
* for the ExecutionVertex, in case of a failure, or in the case where some data needs to be recomputed
* because it is no longer available when requested by later operations. An Execution is always
* identified by an {@link ExecutionAttemptID}. All messages between the JobManager and the TaskManager
* about deployment of tasks and updates in the task status always use the ExecutionAttemptID to
* address the message receiver.</li>
* </ul>
*
* <h2>Global and local failover</h2>
*
* <p>The Execution Graph has two failover modes: <i>global failover</i> and <i>local failover</i>.
*
* <p>A <b>global failover</b> aborts the task executions for all vertices and restarts whole
* data flow graph from the last completed checkpoint. Global failover is considered the
* "fallback strategy" that is used when a local failover is unsuccessful, or when a issue is
* found in the state of the ExecutionGraph that could mark it as inconsistent (caused by a bug).
*
* <p>A <b>local failover</b> is triggered when an individual vertex execution (a task) fails.
* The local failover is coordinated by the {@link FailoverStrategy}. A local failover typically
* attempts to restart as little as possible, but as much as necessary.
*
* <p>Between local- and global failover, the global failover always takes precedence, because it
* is the core mechanism that the ExecutionGraph relies on to bring back consistency. The
* guard that, the ExecutionGraph maintains a <i>global modification version</i>, which is incremented
* with every global failover (and other global actions, like job cancellation, or terminal
* failure). Local failover is always scoped by the modification version that the execution graph
* had when the failover was triggered. If a new global modification version is reached during
* local failover (meaning there is a concurrent global failover), the failover strategy has to
* yield before the global failover.
*/
public class ExecutionGraph implements AccessExecutionGraph {
...
}
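The Javadoc above says that each ExecutionJobVertex has as many ExecutionVertices as its parallelism. A minimal sketch of that expansion, using plain strings instead of Flink's classes (SubtaskExpansion is a hypothetical name, and the "(i/n)" label format is only meant to resemble how subtasks are usually displayed):

```java
import java.util.ArrayList;
import java.util.List;

/** Expands logical job vertices into parallel subtasks, one per (vertex, subtask index). */
final class SubtaskExpansion {
    static List<String> expand(String[] vertexNames, int[] parallelism) {
        if (vertexNames.length != parallelism.length) {
            throw new IllegalArgumentException("one parallelism value per vertex is required");
        }
        List<String> subtasks = new ArrayList<>();
        for (int v = 0; v < vertexNames.length; v++) {
            for (int i = 1; i <= parallelism[v]; i++) {
                subtasks.add(vertexNames[v] + " (" + i + "/" + parallelism[v] + ")");
            }
        }
        return subtasks;
    }

    public static void main(String[] args) {
        // The unchained WordCount job with default parallelism 4: 1 + 4 + 4 + 1 subtasks.
        String[] names = {"Source", "FlatMap", "Window", "Sink"};
        int[] parallelism = {1, 4, 4, 1};
        for (String subtask : expand(names, parallelism)) {
            System.out.println(subtask);
        }
    }
}
```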
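The physical graph is the "runtime graph" of the job as it actually executes after being scheduled onto TaskManagers; no specific class corresponds to it. The official description is: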
A physical graph is the result of translating a Logical Graph for execution in a distributed runtime. The nodes are Tasks and the edges indicate input/output-relationships or partitions of data streams or data sets.
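The further down these four graphs go, the more concrete and the harder to understand they become. For practical use it is usually enough to know that these conversion stages exist; the details rarely matter. Below is a figure from Jark's blog; see his articles in the references for more detail:
The graph concept is not original to Flink. Storm and Spark have similar concepts, and Flink simply stands on the shoulders of giants. Going from the StreamGraph to the final PhysicalGraph moves from abstract to concrete: the abstract form is for people, the concrete form is for the framework, much like the preprocess, compile, assemble, and link stages of compiling code.
References: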
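]]>The three deployment modes differ mainly in three respects:
whether the main() method runs on the Client side or on the cluster side
Each mode is described below.
In this mode a Flink cluster is already running (at least the JobManager), and we submit jobs to it; all jobs run on this one cluster. The typical case is an ordinary cluster statically deployed in Standalone mode. In this case:
the main() method runs on the Client side.
This mode was formerly also called "Flink Cluster in session mode".
In this mode each Job dynamically creates a dedicated cluster of its own. In this case:
the main() method runs on the Client side.
This mode was formerly also called "Flink Cluster in per-job mode".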
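An Application is a program that contains one or more jobs, that is, one with one or more execute or executeAsync calls. In this mode an Application dynamically creates a dedicated cluster of its own, shared by all jobs inside the Application. It clearly sits between Session Cluster and Job Cluster: different Applications are fully isolated from each other, as in a Job Cluster, while the jobs inside one Application are not isolated, as in a Session Cluster. In this case:
the main() method runs on the cluster side.
This mode was formerly also called "Flink Cluster in application mode".
There isn't much to compare, really; the pros and cons of each mode are simple and obvious. If a comparison is needed, the main points are resource isolation, where the main() method runs, and whether the cluster is created dynamically.
In Application mode the main() method runs in the JobManager on the cluster side, whereas in the other two modes it runs on the Client side. For large or complex applications this makes a real difference: the cluster side usually has ample resources and can balance load, while running main() on the Client side can become a bottleneck, especially when several people share the Client.
Flink also supports some third-party cluster management frameworks; when they are used, cluster resource management is handed over to them. Currently supported:
A table makes this clearer: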
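| Cluster manager | Session Cluster | Job Cluster | Application Cluster |
|---|---|---|---|
| Standalone (including on Docker and on K8s) | Supported | Not supported | Supported |
| Native Kubernetes | Supported | Not supported | Supported |
| YARN | Supported | Supported | Supported |
| Mesos | Supported | Supported | Not supported |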
References:
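]]>As the figure above shows, Flink, a distributed framework that unifies stream and batch computing, also follows a typical master-worker architecture: the JobManager is the master and the TaskManagers are the workers. "JobManager" is actually an umbrella term; internally it is split by function into three major modules:
Note:
So the submission flow of a Flink job consists of the six steps shown in the figure:
The Client (for example flink run or the Web UI) submits the job to the Dispatcher;
Flink currently supports Standalone, YARN, Mesos, and Kubernetes for cluster management. Depending on the deployment mode, the components inside the JobManager described above may exist in slightly different forms.
This part is implemented in the flink-runtime module, and most names in the code map directly to the terms used above. It is the core of Flink and is elegantly designed; if you want to dig into the source to understand how Flink works internally, the runtime module is the natural place to start.
Finally, a word about the Client. It is not part of the Flink runtime, but it is the window through which jobs are submitted; any "tool" that can submit jobs to the cluster can be called a Client. So far the main Clients are:
Scala Shell (bin/start-scala-shell.sh)
SQL Client (bin/sql-client.sh)
Command-line client (bin/flink)
Python Shell (bin/pyflink-shell.sh)
The above is based on Flink 1.12.0, the latest version at the time of writing. For more details, see: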
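]]>Recently, while solving some problems in a project, I carefully reread parts of the official documentation and felt I should write this up, mainly for these reasons:
For these two reasons I decided to write it up anyway, as a way to deepen my own understanding and keep notes. I'll try to keep it easy to follow (which may turn into rambling...) and comprehensive (I prefer to call it "closing the loop of understanding"). I thought about the title for a long time and settled on "A Quick Look" (sounds shallow, doesn't it?). For completeness, I'll add some recommended articles and reference links for readers who like to dig deeper.
OK, all of this is really just giving myself an out in advance. What I mean is: if it's badly written, please don't flame me; if you really can't help it, flame gently! Corrections, opinions, and suggestions are of course very welcome. And article series often die halfway, so don't be surprised if this one does!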
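]]>What is a Window, and why do we need it? Stream processing is generally event-driven (Spark uses micro-batches): each incoming event triggers an operator to compute. Typical examples are map, flatmap, and filter, all of which are stateless. Sometimes stream processing needs stateful computation, for example counting visitors and buyers per minute in an e-commerce scenario. Such computation has to buffer data (that is, it is stateful) and is done with a Window. This is the scenario Window addresses: supporting stateful computation in stream processing. A Window is itself an operator, just a stateful one.
Since the official documentation is very thorough, I won't reinvent the wheel. I'll describe windows from another angle instead: the window lifecycle, that is, how a window moves from creation to final destruction. The official documentation isn't very explicit about some of these details, and this article fills them in. If you have no notion of windows yet, read the official documentation first and come back once you have the basics.
To make the explanation easier, I slightly modified the example from the article "Flink Watermark Details" by adding an event type to the raw events. There are three types: pv (page view), cart (add to cart), and buy (purchase). The raw events now look like this (note that they are still out of order):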
event in source:
{"action":"pv","id":"event1","timestamp":"2020-05-24T12:00:00.000+08:00"}
{"action":"cart","id":"event2","timestamp":"2020-05-24T12:00:01.000+08:00"}
{"action":"pv","id":"event4","timestamp":"2020-05-24T12:00:04.000+08:00"}
{"action":"pv","id":"event5","timestamp":"2020-05-24T12:00:05.000+08:00"}
{"action":"buy","id":"event7","timestamp":"2020-05-24T12:00:07.000+08:00"}
{"action":"buy","id":"event3","timestamp":"2020-05-24T12:00:03.000+08:00"}
{"action":"cart","id":"event6","timestamp":"2020-05-24T12:00:06.000+08:00"}
{"action":"buy","id":"event8","timestamp":"2020-05-24T12:00:08.000+08:00"}
{"action":"pv","id":"event9","timestamp":"2020-05-24T12:00:09.000+08:00"}
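What we want to do next is count each kind of action in every 5 seconds. The data here only falls into two 5-second windows, 12:00:00.000-12:00:05.000 and 12:00:05.000-12:00:10.000, so we'll use these two windows for the analysis.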
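How an event ends up in one of these two windows is simple modular arithmetic. The sketch below mirrors, as far as I know, the formula used by Flink's TimeWindow.getWindowStartWithOffset in the versions I have read; treat that correspondence as an assumption. TumblingWindows is a hypothetical stand-alone class, not part of Flink.

```java
import java.time.Instant;
import java.time.OffsetDateTime;

/** Computes the tumbling window an event-time timestamp falls into. */
final class TumblingWindows {
    /** Start (inclusive) of the window containing the timestamp; all values are epoch milliseconds. */
    static long windowStart(long timestampMs, long offsetMs, long sizeMs) {
        return timestampMs - (timestampMs - offsetMs + sizeMs) % sizeMs;
    }

    /** Largest timestamp that still belongs to the window, i.e. end (exclusive) minus 1 ms. */
    static long maxTimestamp(long windowStartMs, long sizeMs) {
        return windowStartMs + sizeMs - 1;
    }

    public static void main(String[] args) {
        long size = 5_000L;
        // event3 from the example data
        long ts = OffsetDateTime.parse("2020-05-24T12:00:03+08:00").toInstant().toEpochMilli();
        long start = windowStart(ts, 0L, size);
        System.out.println("window start (UTC): " + Instant.ofEpochMilli(start));
        System.out.println("window end   (UTC): " + Instant.ofEpochMilli(start + size));
        System.out.println("maxTimestamp (UTC): " + Instant.ofEpochMilli(maxTimestamp(start, size)));
    }
}
```

The timestamps print in UTC, so 12:00:00+08:00 appears as 04:00:00Z.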
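Here is a screenshot from the official documentation:
Windows fall into two broad categories: Keyed Windows and Non-Keyed Windows. You can picture the underlying window as a container: a Keyed Window is a Map and a Non-Keyed Window is a List. Keyed Windows are used most in practice, for two main reasons:
That said, compared with Keyed Windows, Non-Keyed Windows only lack the keyBy step; the rest of the flow is identical, so the difference matters little for learning how windows work. The example in the previous article actually used Non-Keyed Windows, so this article mainly uses Keyed Windows.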
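The Map-versus-List picture can be made concrete with plain collections. This is only a mental model with hypothetical names (WindowContainers), not how Flink stores window contents internally. It uses the three events that reach the first window in time in the example run.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Mental model: a non-keyed window is one list, a keyed window is one list per key. */
final class WindowContainers {
    /** Each event is {action, id}; a non-keyed window buffers everything together. */
    static List<String> nonKeyed(String[][] events) {
        List<String> buffer = new ArrayList<>();
        for (String[] event : events) {
            buffer.add(event[1]);
        }
        return buffer;
    }

    /** A keyed window keeps a separate buffer per key (here: the action). */
    static Map<String, List<String>> keyed(String[][] events) {
        Map<String, List<String>> buffers = new LinkedHashMap<>();
        for (String[] event : events) {
            buffers.computeIfAbsent(event[0], key -> new ArrayList<>()).add(event[1]);
        }
        return buffers;
    }

    public static void main(String[] args) {
        String[][] firstWindow = {{"pv", "event1"}, {"cart", "event2"}, {"pv", "event4"}};
        System.out.println("non-keyed: " + nonKeyed(firstWindow));
        System.out.println("keyed:     " + keyed(firstWindow));
    }
}
```

In the keyed case the same time range holds two independent buffers (pv and cart), which is why a keyed job produces one result per key for each 5-second range.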
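A window basically consists of the following parts:
You can also extend the WindowAssigner class to implement a custom window. The built-in windows are all simple, so they are not covered here. One to note is global windows, which is not a time-based window; we will come back to it later.
Below is a flow diagram I drew:
Take the window for 12:00:00.000-12:00:05.000 as an example. As events keep streaming in, they first pass through keyBy, which produces several windows (key=pv and key=buy in the figure), and then flow through the trigger, which decides whether an event is buffered or triggers the window computation. If buffered, the event is simply placed in the window. If computation is triggered and a before evictor is defined, all data buffered in the window is handed to the evictor, and after the evictor finishes the data goes to the defined window function. If an after evictor is also defined after the window function, the data is then handed to the after evictor, and finally sent downstream.
There are two points to note about this flow: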
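As mentioned above, the trigger decides whether the window just buffers data or fires the computation. Let's see how. In Flink, the abstract class Trigger defines several methods, of which the following are the more important:
Of these, onElement, onEventTime, and onProcessingTime determine the window's behavior. Each returns a TriggerResult, an enum that currently has these values:
The trigger is crucial to a window's lifecycle. Flink defines a default trigger for every built-in window, and we can also define our own.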
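To show what a trigger's return value does to a window, here is a stand-alone sketch. The four constant names follow Flink's TriggerResult enum as I know it (CONTINUE, FIRE, PURGE, FIRE_AND_PURGE); everything else, including the TriggerDecision and SketchWindow names, is hypothetical and greatly simplified.

```java
import java.util.ArrayList;
import java.util.List;

/** Mirrors the two flags behind a trigger decision: emit a result and/or clear the buffer. */
enum TriggerDecision {
    CONTINUE(false, false),
    FIRE(true, false),
    PURGE(false, true),
    FIRE_AND_PURGE(true, true);

    final boolean fire;
    final boolean purge;

    TriggerDecision(boolean fire, boolean purge) {
        this.fire = fire;
        this.purge = purge;
    }
}

/** A toy window that buffers elements and reacts to trigger decisions. */
final class SketchWindow {
    private final List<String> buffer = new ArrayList<>();
    private final List<List<String>> emitted = new ArrayList<>();

    void add(String element) {
        buffer.add(element);
    }

    void apply(TriggerDecision decision) {
        if (decision.fire) {
            emitted.add(new ArrayList<>(buffer)); // hand a snapshot to the window function
        }
        if (decision.purge) {
            buffer.clear();                       // drop the buffered elements
        }
    }

    List<String> buffered() {
        return buffer;
    }

    List<List<String>> emitted() {
        return emitted;
    }

    public static void main(String[] args) {
        SketchWindow window = new SketchWindow();
        window.add("event1");
        window.add("event2");
        window.apply(TriggerDecision.FIRE);           // emits, keeps the elements
        window.apply(TriggerDecision.FIRE_AND_PURGE); // emits again, then clears
        System.out.println(window.emitted() + " / remaining: " + window.buffered());
    }
}
```

The difference between FIRE and FIRE_AND_PURGE is exactly why a window that allows late data can emit several results for the same time range.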
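Later in this article, when answering the question left open in "Flink Watermark Details", we will analyze the built-in EventTimeTrigger and revisit the points above alongside the code. Next, one more important topic.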
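Now that we have a fairly detailed picture of windows, let's discuss late data. By default, late data is simply dropped. Take the 12:00:00.000-12:00:05.000 window: it is created when the first event in that range arrives, then by default (as defined in EventTimeTrigger) fires when the watermark passes the time 1 millisecond before 12:00:05 (that is, 12:00:04.999), and is removed after the watermark passes 12:00:05.000. But what happens if we use allowedLateness to allow 1 second of lateness?
As an earlier article explained, allowing lateness actually extends the window's lifecycle: allowing 1 second turns the lifecycle into 12:00:00.000-12:00:06.000. The flow described above for the no-lateness case stays largely the same: the window is still created when the first event in range arrives and still fires when the watermark passes 12:00:04.999, but it is only removed once the watermark passes 12:00:06.000. So if data belonging to the 12:00:00.000-12:00:05.000 window is late (it did not arrive before the watermark reached 12:00:05.000) but arrives before the watermark reaches 12:00:06.000, it triggers the window computation again, once per event. Note that late data arriving within the allowed lateness still belongs to its original window, not the next one. Also, every firing sends a result downstream, so multiple firings send multiple results. Allowing late data therefore, as the earlier article said, hurts timeliness and increases resource consumption, and it can also produce duplicate results that downstream must handle correctly.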
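The three possible fates of an element (buffered, late but still accepted, dropped) can be written as one small function. This sketch follows my understanding of Flink's behavior, where a window is cleaned up at window.maxTimestamp() + allowedLateness; under that assumption the exact boundaries are 12:00:04.999 and 12:00:05.999, one millisecond earlier than the rounded values used in the prose. LatenessCheck is a hypothetical name.

```java
/** Classifies an element by comparing its window against the current watermark. */
final class LatenessCheck {
    enum Outcome { BUFFERED, LATE_FIRING, DROPPED }

    /**
     * @param windowEndMs       exclusive end of the element's window
     * @param allowedLatenessMs extra time the window is kept after its end
     * @param watermarkMs       current watermark when the element arrives
     */
    static Outcome classify(long windowEndMs, long allowedLatenessMs, long watermarkMs) {
        long maxTimestamp = windowEndMs - 1;
        long cleanupTime = maxTimestamp + allowedLatenessMs;
        if (cleanupTime <= watermarkMs) {
            return Outcome.DROPPED;      // window already removed
        }
        if (maxTimestamp <= watermarkMs) {
            return Outcome.LATE_FIRING;  // window still alive: element is added and fires again
        }
        return Outcome.BUFFERED;         // on time: wait for the watermark
    }

    public static void main(String[] args) {
        // Times are milliseconds relative to 12:00:00.000; the window is [0, 5000).
        System.out.println(classify(5_000, 0, 7_000));     // event3 in the demo: watermark is already at 7 s
        System.out.println(classify(5_000, 1_000, 5_500)); // 1 s lateness, watermark at 5.5 s
        System.out.println(classify(5_000, 1_000, 4_000)); // watermark has not reached the window end
    }
}
```

Note that in the demo later in this article event3 would still be dropped even with 1 second of allowed lateness, because the watermark has already advanced to 12:00:07 by the time it arrives.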
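A final note on the window lifecycle: when a time-based window's lifecycle ends, Flink removes it. Built-in Global Windows, however, are not time windows; Flink does not remove them, and users must handle that themselves.
For time-based windows, how many windows are produced within one defined time range? The answer: 1 to N, it depends. On what?
Flink windows are backed by State, which currently supports three backends:
Whichever state backend you use, you only need to configure it on env; it is transparent to windows and to State usage. One reason the DataFlow model can unify stream and batch is that it supports very large state. Since state data can ultimately be stored on a distributed file system such as HDFS, in theory whatever an offline computation like MapReduce can handle, Flink can handle too, with a more flexible mechanism.
Enough theory; let's look at a concrete example (source file here). The scenario is still counting each kind of action every 5 seconds: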
package com.niyanchun.window;
import com.alibaba.fastjson.JSONObject;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.joda.time.DateTime;
import java.text.Format;
import java.text.SimpleDateFormat;
import java.util.Arrays;
import java.util.List;
/**
* A window demo.
*
* @author NiYanchun
**/
public class WindowDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.addSource(new CustomSource())
.keyBy(f -> f.getString("action"))
.timeWindow(Time.seconds(5))
.process(new CustomProcessFunction())
.print();
env.execute();
}
public static class CustomSource extends RichSourceFunction<JSONObject> {
@Override
public void run(SourceContext<JSONObject> ctx) throws Exception {
System.out.println("event in source:");
getOrderedEvents().forEach(e -> {
System.out.println(e);
long timestampInMills = ((DateTime) e.get("timestamp")).getMillis();
ctx.collectWithTimestamp(e, timestampInMills);
ctx.emitWatermark(new Watermark(timestampInMills));
});
System.out.println();
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
@Override
public void cancel() {
}
}
/**
* generate out of order events
*
* @return List<JSONObject>
*/
private static List<JSONObject> getOrderedEvents() {
// 2020-05-24 12:00:00
JSONObject event1 = new JSONObject().fluentPut("id", "event1").fluentPut("action", "pv")
.fluentPut("timestamp", new DateTime(2020, 5, 24, 12, 0, 0));
// 2020-05-24 12:00:01
JSONObject event2 = new JSONObject().fluentPut("id", "event2").fluentPut("action", "cart")
.fluentPut("timestamp", new DateTime(2020, 5, 24, 12, 0, 1));
// 2020-05-24 12:00:03
JSONObject event3 = new JSONObject().fluentPut("id", "event3").fluentPut("action", "buy")
.fluentPut("timestamp", new DateTime(2020, 5, 24, 12, 0, 3));
// 2020-05-24 12:00:04
JSONObject event4 = new JSONObject().fluentPut("id", "event4").fluentPut("action", "pv")
.fluentPut("timestamp", new DateTime(2020, 5, 24, 12, 0, 4));
// 2020-05-24 12:00:05
JSONObject event5 = new JSONObject().fluentPut("id", "event5").fluentPut("action", "pv")
.fluentPut("timestamp", new DateTime(2020, 5, 24, 12, 0, 5));
// 2020-05-24 12:00:06
JSONObject event6 = new JSONObject().fluentPut("id", "event6").fluentPut("action", "cart")
.fluentPut("timestamp", new DateTime(2020, 5, 24, 12, 0, 6));
// 2020-05-24 12:00:07
JSONObject event7 = new JSONObject().fluentPut("id", "event7").fluentPut("action", "buy")
.fluentPut("timestamp", new DateTime(2020, 5, 24, 12, 0, 7));
// 2020-05-24 12:00:08
JSONObject event8 = new JSONObject().fluentPut("id", "event8").fluentPut("action", "buy")
.fluentPut("timestamp", new DateTime(2020, 5, 24, 12, 0, 8));
// 2020-05-24 12:00:09
JSONObject event9 = new JSONObject().fluentPut("id", "event9").fluentPut("action", "pv")
.fluentPut("timestamp", new DateTime(2020, 5, 24, 12, 0, 9));
// Shuffle the events to simulate out-of-order arrival in practice
// True generation order (by timestamp): event1, event2, event3, event4, event5, event6, event7, event8, event9
// Order after shuffling: event1, event2, event4, event5, event7, event3, event6, event8, event9
return Arrays.asList(event1, event2, event4, event5, event7, event3, event6, event8, event9);
}
public static class CustomProcessFunction extends ProcessWindowFunction<JSONObject, Object, String, TimeWindow> {
@Override
public void process(String s, Context context, Iterable<JSONObject> elements, Collector<Object> out) throws Exception {
TimeWindow window = context.window();
Format sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
int count = 0;
for (JSONObject ignored : elements) {
count++;
}
System.out.println(sdf.format(window.getStart()) + "-" + sdf.format(window.getEnd()) + ": " + count + " " + s);
}
}
public static class CustomEventTimeTrigger extends Trigger<Object, TimeWindow> {
private static final long serialVersionUID = 1L;
private CustomEventTimeTrigger() {
}
@Override
public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
Format sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
System.out.println("onElement -- event: " + ((JSONObject) element).getString("id") +
"; window.maxTimestamp():" + sdf.format(window.maxTimestamp()) +
"; ctx.getCurrentWatermark():" + sdf.format(ctx.getCurrentWatermark()));
if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {
// if the watermark is already past the window fire immediately
return TriggerResult.FIRE;
} else {
ctx.registerEventTimeTimer(window.maxTimestamp());
return TriggerResult.CONTINUE;
}
}
@Override
public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
Format sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
System.out.println("onEventTime-- window.maxTimestamp():" + sdf.format(window.maxTimestamp()) +
"; time:" + sdf.format(time));
return time == window.maxTimestamp() ?
TriggerResult.FIRE :
TriggerResult.CONTINUE;
}
@Override
public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
return TriggerResult.CONTINUE;
}
@Override
public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
ctx.deleteEventTimeTimer(window.maxTimestamp());
}
@Override
public boolean canMerge() {
return true;
}
@Override
public void onMerge(TimeWindow window,
OnMergeContext ctx) {
// only register a timer if the watermark is not yet past the end of the merged window
// this is in line with the logic in onElement(). If the watermark is past the end of
// the window onElement() will fire and setting a timer here would fire the window twice.
long windowMaxTimestamp = window.maxTimestamp();
if (windowMaxTimestamp > ctx.getCurrentWatermark()) {
ctx.registerEventTimeTimer(windowMaxTimestamp);
}
}
@Override
public String toString() {
return "EventTimeTrigger()";
}
/**
* Creates an event-time trigger that fires once the watermark passes the end of the window.
*
* <p>Once the trigger fires all elements are discarded. Elements that arrive late immediately
* trigger window evaluation with just this one element.
*/
public static CustomEventTimeTrigger create() {
return new CustomEventTimeTrigger();
}
}
public static class CustomAggregation implements AggregateFunction<JSONObject, Object, Object> {
@Override
public Object createAccumulator() {
return null;
}
@Override
public Object add(JSONObject value, Object accumulator) {
return null;
}
@Override
public Object getResult(Object accumulator) {
return null;
}
@Override
public Object merge(Object a, Object b) {
return null;
}
}
}
I won't walk through the code; it is similar to the previous article's, except that timeWindowAll (Non-Keyed Windows) is replaced with timeWindow (Keyed Windows), hence the extra keyBy step. The output is:
event in source:
{"action":"pv","id":"event1","timestamp":"2020-05-24T12:00:00.000+08:00"}
{"action":"cart","id":"event2","timestamp":"2020-05-24T12:00:01.000+08:00"}
{"action":"pv","id":"event4","timestamp":"2020-05-24T12:00:04.000+08:00"}
{"action":"pv","id":"event5","timestamp":"2020-05-24T12:00:05.000+08:00"}
{"action":"buy","id":"event7","timestamp":"2020-05-24T12:00:07.000+08:00"}
{"action":"buy","id":"event3","timestamp":"2020-05-24T12:00:03.000+08:00"}
{"action":"cart","id":"event6","timestamp":"2020-05-24T12:00:06.000+08:00"}
{"action":"buy","id":"event8","timestamp":"2020-05-24T12:00:08.000+08:00"}
{"action":"pv","id":"event9","timestamp":"2020-05-24T12:00:09.000+08:00"}
2020-05-24 12:00:00-2020-05-24 12:00:05: 1 cart
2020-05-24 12:00:00-2020-05-24 12:00:05: 2 pv
2020-05-24 12:00:05-2020-05-24 12:00:10: 2 pv
2020-05-24 12:00:05-2020-05-24 12:00:10: 2 buy
2020-05-24 12:00:05-2020-05-24 12:00:10: 1 cart
Process finished with exit code 0
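The first part is the out-of-order events emitted by the Source; the second is the result of the window computation. A brief analysis: event3 arrived out of order and was dropped, so the 12:00:00-12:00:05 window received only three events, event1, event2, and event4: two pv events and one add-to-cart event. As described earlier, keyBy yields two windows here, so 12:00:00-12:00:05 is printed twice, once for cart and once for pv (the last column is the window's key). The 12:00:05-12:00:10 range works the same way.
That about wraps up the theory, which mainly filled in details that I feel the official documentation doesn't expand on. The next part uses Flink's built-in EventTimeTrigger to explain the trigger in detail, and also answers a question left open in "Flink Watermark Details": why are some out-of-order events dropped while others are not?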
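In "Flink Watermark Details" we used an Event Time based Tumbling Window, whose default trigger is EventTimeTrigger. It is built into Flink; to add some print statements I made a complete copy and defined my own trigger, CustomEventTimeTrigger. The full program appeared in the earlier article; for convenience only the trigger code is shown here (full code here):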
public static class CustomEventTimeTrigger extends Trigger<Object, TimeWindow> {
private static final long serialVersionUID = 1L;
private CustomEventTimeTrigger() {
}
@Override
public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
Format sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
System.out.println("onElement -- event: " + ((JSONObject) element).getString("id") +
"; window.maxTimestamp():" + sdf.format(window.maxTimestamp()) +
"; ctx.getCurrentWatermark():" + sdf.format(ctx.getCurrentWatermark()));
if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {
// if the watermark is already past the window fire immediately
return TriggerResult.FIRE;
} else {
ctx.registerEventTimeTimer(window.maxTimestamp());
return TriggerResult.CONTINUE;
}
}
@Override
public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
Format sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
System.out.println("onEventTime-- window.maxTimestamp():" + sdf.format(window.maxTimestamp()) +
"; time:" + sdf.format(time));
return time == window.maxTimestamp() ?
TriggerResult.FIRE :
TriggerResult.CONTINUE;
}
@Override
public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
return TriggerResult.CONTINUE;
}
@Override
public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
ctx.deleteEventTimeTimer(window.maxTimestamp());
}
@Override
public boolean canMerge() {
return true;
}
@Override
public void onMerge(TimeWindow window,
OnMergeContext ctx) {
// only register a timer if the watermark is not yet past the end of the merged window
// this is in line with the logic in onElement(). If the watermark is past the end of
// the window onElement() will fire and setting a timer here would fire the window twice.
long windowMaxTimestamp = window.maxTimestamp();
if (windowMaxTimestamp > ctx.getCurrentWatermark()) {
ctx.registerEventTimeTimer(windowMaxTimestamp);
}
}
@Override
public String toString() {
return "EventTimeTrigger()";
}
/**
* Creates an event-time trigger that fires once the watermark passes the end of the window.
*
* <p>Once the trigger fires all elements are discarded. Elements that arrive late immediately
* trigger window evaluation with just this one element.
*/
public static CustomEventTimeTrigger create() {
return new CustomEventTimeTrigger();
}
}
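The code above only adds some output to the original EventTimeTrigger; the logic is unchanged. The trigger mainly implements two methods, onElement and onEventTime, so with Event Time, window firing is driven mainly by these two. Let's look at the program output first: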
event in source:
{"id":"event1","timestamp":"2020-05-24T12:00:00.000+08:00"}
{"id":"event2","timestamp":"2020-05-24T12:00:01.000+08:00"}
{"id":"event4","timestamp":"2020-05-24T12:00:04.000+08:00"}
{"id":"event5","timestamp":"2020-05-24T12:00:05.000+08:00"}
{"id":"event7","timestamp":"2020-05-24T12:00:07.000+08:00"}
{"id":"event3","timestamp":"2020-05-24T12:00:03.000+08:00"}
{"id":"event6","timestamp":"2020-05-24T12:00:06.000+08:00"}
{"id":"event8","timestamp":"2020-05-24T12:00:08.000+08:00"}
{"id":"event9","timestamp":"2020-05-24T12:00:09.000+08:00"}
onElement -- event: event1; window.maxTimestamp():2020-05-24 12:00:04.999; ctx.getCurrentWatermark():292269055-12-03 00:47:04.192
onElement -- event: event2; window.maxTimestamp():2020-05-24 12:00:04.999; ctx.getCurrentWatermark():2020-05-24 12:00:00.000
onElement -- event: event4; window.maxTimestamp():2020-05-24 12:00:04.999; ctx.getCurrentWatermark():2020-05-24 12:00:01.000
onElement -- event: event5; window.maxTimestamp():2020-05-24 12:00:09.999; ctx.getCurrentWatermark():2020-05-24 12:00:04.000
onEventTime-- window.maxTimestamp():2020-05-24 12:00:04.999; time:2020-05-24 12:00:04.999
window{2020-05-24 12:00:00 - 2020-05-24 12:00:05}
event1
event2
event4
Total:3
onElement -- event: event7; window.maxTimestamp():2020-05-24 12:00:09.999; ctx.getCurrentWatermark():2020-05-24 12:00:05.000
onElement -- event: event6; window.maxTimestamp():2020-05-24 12:00:09.999; ctx.getCurrentWatermark():2020-05-24 12:00:07.000
onElement -- event: event8; window.maxTimestamp():2020-05-24 12:00:09.999; ctx.getCurrentWatermark():2020-05-24 12:00:07.000
onElement -- event: event9; window.maxTimestamp():2020-05-24 12:00:09.999; ctx.getCurrentWatermark():2020-05-24 12:00:08.000
onEventTime-- window.maxTimestamp():2020-05-24 12:00:09.999; time:2020-05-24 12:00:09.999
window{2020-05-24 12:00:05 - 2020-05-24 12:00:10}
event5
event7
event6
event8
event9
Total:5
Process finished with exit code 0
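A few times need explaining:
Each window covers the left-closed, right-open interval [start_time, end_time), with times expressed as milliseconds since 1970-01-01T00:00:00Z. window.maxTimestamp() here is end_time minus 1 (because the interval is left-closed, right-open).
long timestamp is the timestamp of the element that arrived (that is, of T element), not the wall-clock time of its arrival.
long time is the firing time of the timer mentioned earlier. Its value equals window.maxTimestamp(); in other words, both values mark the end of the time window's lifecycle.
The output above shows this as well. Combined with the code, it leads to the conclusion stated in the earlier article: a window only cares whether the data belonging to it arrives while the watermark is still inside [start_time, end_time). If it does, the data is not late and counts as "in order". What is guaranteed is therefore ordering between the current window and the next one, not the ordering of events inside a single window; indeed, Flink gives no guarantee about the order of events within a window (the original wording: Flink provides no guarantees about the order of the elements within a window.). That is, even if event1 arrives before event2 within a window, there is no guarantee that event1 comes before event2 when they are passed to the window function (such as process).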
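The output above can be replayed without Flink to see why event3 is dropped while event6, which is also out of order, is kept. The following is a deliberately simplified simulation under my own assumptions: no allowed lateness, a watermark equal to the largest timestamp seen so far (as in the demo source), a window that fires and is removed as soon as the watermark reaches its maxTimestamp, and a final flush at end of input. EventTimeWindowSimulation is a hypothetical class; timestamps are milliseconds relative to 12:00:00.000.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Replays events through 5-second tumbling event-time windows and logs fires and drops. */
final class EventTimeWindowSimulation {
    static final long SIZE = 5_000L;

    static List<String> run(String[] ids, long[] timestamps) {
        List<String> log = new ArrayList<>();
        TreeMap<Long, List<String>> windows = new TreeMap<>(); // window start -> buffered ids
        long watermark = Long.MIN_VALUE;
        for (int i = 0; i < ids.length; i++) {
            long start = timestamps[i] - timestamps[i] % SIZE;
            long maxTimestamp = start + SIZE - 1;
            if (maxTimestamp <= watermark) {
                log.add("drop " + ids[i]); // its window is already gone
            } else {
                windows.computeIfAbsent(start, key -> new ArrayList<>()).add(ids[i]);
            }
            watermark = Math.max(watermark, timestamps[i]); // watermarks never move backwards
            fireReady(windows, watermark, log);
        }
        fireReady(windows, Long.MAX_VALUE, log); // end of input flushes the remaining windows
        return log;
    }

    private static void fireReady(TreeMap<Long, List<String>> windows, long watermark, List<String> log) {
        while (!windows.isEmpty() && windows.firstKey() + SIZE - 1 <= watermark) {
            Map.Entry<Long, List<String>> window = windows.pollFirstEntry();
            log.add("fire [" + window.getKey() + "," + (window.getKey() + SIZE) + ") " + window.getValue());
        }
    }

    public static void main(String[] args) {
        String[] ids = {"event1", "event2", "event4", "event5", "event7", "event3", "event6", "event8", "event9"};
        long[] ts = {0, 1_000, 4_000, 5_000, 7_000, 3_000, 6_000, 8_000, 9_000};
        for (String line : run(ids, ts)) {
            System.out.println(line);
        }
    }
}
```

event3 belongs to [0, 5000), which was already fired and removed when the watermark reached 5000, so it is dropped. event6 belongs to [5000, 10000), which is still open when it arrives, so it is kept even though event7 arrived before it.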
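That concludes this article. Only by understanding how windows work can you properly understand what problem the watermark solves. But remember: the Watermark only matters when you use Event Time based windows.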
]]>