说到流处理,很多后端工程师,特别是大数据工程师应该都不陌生,Storm、Spark以及如日中天的Flink应该或多或少的听过或者用过。本文从下面几个方面简单聊聊流处理:

  • 流处理发展史
  • 流处理中的一些重要思想和设计
  • 一些流处理框架的简单对比(Storm、Spark、Flink、Google Cloud Dataflow、Kafka Stream)

注:本文说的Spark都指其流式部分,即Spark (Structured) Streaming.

流处理发展史

第一阶段:批处理。大数据处理的最早期是批量计算,代表性的就是Google提出的MapReduce计算模型。但批量计算有个致命的缺点就是延迟很高。随着业务的发展,实时计算的需求越来越迫切,必须有某种方式能够解决批量计算的高延迟。于是第二阶段应运而生(当然批处理不算流处理,这里把它归为第一阶段主要是为了前后延续性)。

第二阶段:Lambda架构。面对批处理的高延迟,Twitter的Nathan Marz提出了Lambda架构,所谓的Lambda架构简单说就是同时运行一套批处理系统和一套流处理系统,批处理系统提供准确性、一致性计算;流处理系统提供实时计算,最终将两套系统的计算结果进行合并。典型的架构图如下:

Lambda架构

图中上面运行了Hadoop,用于批处理;下面是Storm,用于流处理。顺便提一下,Nathan Marz也是Storm的作者。这套系统看似非常完美,但随着使用的增多,也暴露出了很多弊端,最大的问题在于系统的复杂性:

  • 开发复杂性:同一套业务,需要分别为批处理系统和流处理系统各开发一套,且后面还要适时合并,对于开发人员,要同时学习两套系统的开发,或者请两拨人。
  • 运维复杂性:同时维护两套系统。

怎么才能解决这个问题呢?聪明的工程师们自然是会想出办法的。

第三阶段:Spark 1.x。尽管现在看来,Spark 1.x很简陋,但在当时是有划时代意义的,它第一次将流批统一起来,这样用一套系统可以同时实现流处理和批处理,不论是开发还是运维都简单了许多。但这个时候,有个牛逼公司(你猜猜是哪个)的广告业务需要计算用户的点击量等指标,他们对准确性和实时性要求都很高。Lambda架构可以满足要求,但他们觉得太复杂;而Spark 1.x不支持Event Time(后面会说到),而且在准确性方面还不够完善。于是,他们认为是时候重新造轮子了。

第四阶段:Google Cloud DataFlow。没错,刚才说的牛逼公司就是Google,既然已有的系统都不能解决问题(其实当时有一些还不太成熟的流处理系统),那就创造一个,于是Google提出了DataFlow流处理模型(也称Beam Model)。这个模型提出了如何设计一套“完美的”流处理系统,解决目前流处理系统不能解决的问题。具体内容下面会介绍。DataFlow的提出有这么几个重要的影响:

  • 诞生了Google的流处理系统Google Cloud DataFlow
  • Beam的开源
  • Spark 2.x的诞生
  • 对Flink产生了深远影响

为什么现在Flink现在非常流行?可以认为Flink是目前对DataFlow模型实现最完全的开源框架,Spark 2.x虽然也借鉴了DataFlow的思想,但其底层微批(micro-batch)模型和依靠血缘关系进行容错的设计使得其很难实现DataFlow的一些特性。所以现在Spark在把底层完全推到重新按照Flink中的快照模型进行设计,也就是Spark 3.0,但目前还处于snapshot阶段。可以认为Google DataFlow就是现代流处理的基石,而Flink算是目前开源领域最完善的现代流处理引擎。而Beam它自身不是一个流处理框架,而是Google开源的一个统一的编程框架,可以理解为就是一个SDK,这个SDK支持多种语言(比如Java、Python等),然后用这个SDK开发的程序可以在Google DataFlow、Flink、Spark等一些流处理框架上执行。目前也是一个非常火的项目。

最后不得不感慨,Google真的是一个非常厉害的公司,看看现在的大数据领域、容器领域、AI领域,主流的技术都是Google提出,并且已经用了很多年的技术。

DataFlow中的一些重要思想和设计

下面介绍一些现代流处理框架中的一些重要的思想和设计。

状态(State)的支持

在Storm中,如果你想做一些有状态的计算,必须引入第三方的缓存(比如Redis),然后自己实现状态的维护与处理,这个要做到完善和通用化是一件比较复杂的事情。所以早期的流处理主要用于无状态的处理,或者一些简单的状态计算。DataFlow中把状态的支持作为流处理框架的基本功能,对开发者基本是开箱即用,而且这个状态的量级可以很大(TB级甚至更大都是OK的)。不仅如此,还在状态之上设计了Window,相当于对底层的状态操作,Window提供了更丰富的功能和接口。比如基于时间的时间窗口、触发器(Trigger)、迟到数据的处理等,极大的简化了状态的处理。这里用Flink举个例子:

sourceStream.keyBy(t -> t.getString("user_id"))
    .timeWindow(Time.seconds(5))
    .allowedLateness(Time.seconds(2))
    .sideOutputLateData(lateOutputTag)
    .trigger(new Trigger<JSONObject, TimeWindow>() {
        @Override
        public TriggerResult onElement(JSONObject element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
        return null;
        }

        @Override
        public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
        return null;
        }

        @Override
        public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
        return null;
        }

        @Override
        public void clear(TimeWindow window, TriggerContext ctx) throws Exception {

        }
    })
    .process(new ProcessWindowFunction<JSONObject, Object, String, TimeWindow>() {
        @Override
        public void process(String s, Context context, Iterable<JSONObject> elements, Collector<Object> out) throws Exception {
        // 窗口逻辑
        }
    });

为了看得清楚,先把上面的实现部分去掉,保留主干代码:

sourceStream.keyBy(t -> t.getString("user_id"))
    .timeWindow(Time.seconds(5))
    .allowedLateness(Time.seconds(2))
    .sideOutputLateData(lateOutputTag)
    .trigger(可以自定义触发器)
    .process(具体的处理逻辑);

sourceStream是输入流(即Source),可以把它想成是从Kafka读入用户数据。然后上面的代码依次做了如下操作:

  • keyBy:根据用户ID(user_id)做哈希,这样后面每个窗口就只会包含同一个用户的数据了。
  • timeWindow:Flink根据常用的场景,定义了好几种Window(有兴趣的看这里),这里的timeWindow相当于创建了一个5秒的Tumbling Window。Tumbling Window也是最简单的基于时间的窗口,比如上面的5秒的Tumbling Window就是每5秒会计算一次窗口内的数据。有个细节需要注意就是这个必须是“整点”。比如建立第一个窗口的时候是15:03:02,那15:03:05这个窗口就会触发,然后以后就是15:03:10,15:03:15触发。
  • allowedLateness:这个表示允许数据迟到2秒。本来窗口在15:03:05触发计算之后就销毁了,但如果配置了允许迟到2秒,那这个窗口就会多保存2秒,也就是会保存到15:03:07,在这个时间之前如果还有数据到来,就会重新触发一次计算。关于迟到涉及到Event Time,待会介绍Event Time的时候再说。
  • sideOutputLateData:默认迟到的数据会被直接丢弃,实际中数据显然是不能随便丢掉了,可以通过这个方法将迟到的数据旁路出去,然后进行单独的处理。
  • trigger:窗口触发的逻辑。默认对于时间窗口,时间到了就会触发。用户可以根据自己的需求实现自己的触发器。比如每多少个事件就触发,每隔多长时间触发,或者多种条件的组合等。
  • process:窗口触发以后,如何处理窗口中积攒的事件,一般业务逻辑都在这里实现。这里还支持Reduce、Fold、Aggregate等更高效的迭代方法。

上面的这些操作并非全需要指定,一部分有默认值,这里全部列出仅是为了说明。

Event Time

流处理里面有2个比较重要的时间:Processing TimeEvent Time,Flink里面还有一个Ingestion Time,算是介于Processing Time和Event Time之间的一个折中的时间吧。这三个时间概念上非常好理解,Processing Time是数据在流处理引擎中被处理时的系统时间;Event Time就是事件真正产生的时间;Ingestion Time就是数据进入流处理引擎的时间,一般取在Source阶段的系统时间。一个数据从进入流处理系统可能会经过多个阶段的计算,如果是使用Processing Time,则一个事件在不同阶段的计算时间也是不一样的;但如果是Ingestion Time,则事件一进来这个时间就定了,后面都用这个时间。所以Ingestion Time就像是把数据进入流处理引擎的时间作为Event Time了。最后附一个Flink官方文档中的图:

YzPR5d.png

当然,单纯看这个时间概念是没多大意义的,时间这个概念也只有在使用和时间有关的算子(time-based operations)时或者说涉及到基于时间的状态的时候才有意义,对于其它算子(比如map、filter),不论你取哪个时间,都是没有区别的,因为它们的处理根本就不会用到时间。和时间有关的算子主要就是Window了,目前绝大多数窗口都是基于时间维度去定义的,Flink目前定义的几种窗口中只有Global window不是基于时间的。

时间这个问题其实涉及的是分布式系统中如何确保事件有序的问题,这个问题本身就是分布式中的一大难题。比如我要分析用户当天的一个行为,要保证这个计算准确性,那就必须在分析阶段拿到用户当天的所有事件。但这个是有一定难度的,用户当天产生的事件自身肯定是有时间上的先后顺序及节点的,但等计算框架拿到这些数据的时候就未必有序了,你不能认为拿到了23:59:59秒的数据就认为已经拿到了用户当天所有的数据了,因为各种原因部分数据可能延迟了(如果产生事件的是手机App,那手机关机了或者被调整为飞行模式了;又或者有处理延迟等)、乱序了等等。延迟+乱序是现实中很常见的,这个基本上是避免不了了,也就是你无法从技术上杜绝数据不延迟和不乱序。这里说的乱序和延迟都是从时间维度而言的。

如果选用Processing Time,就不存在延迟和乱序的问题了,因为你采用的时间是数据被处理的时间,这个肯定是一直往前走的。也就是当我们采用Processing Time的时候,也就代表我们并不在意事件自身产生的时间和顺序了,这就很简单了,所以Spark 1.x以及早期的流处理系统都支持。但现实往往是我们在意事件产生的时间,很多业务分析必须基于事件产生时间,即Event Time,而非Processing Time。比如你分析业务高峰期,如果不用事件时间,而用处理时间,那分析出来的高峰期很可能是比实际高峰期延后的。所以后来的流处理系统都支持了Event Time,但这件事情要做好还是蛮难的。下面举个例子,这个例子中我们采用Event Time进行计算。

假设事件产生的顺序是:t1, t2, t3, t4, t5, t6, t7, 而等到处理引擎拿到的时候顺序变成了 t1, t3, t4, t5, t6, t2, t7。如果设置窗口大小为5的话,那窗口收到t1, t3, t4, t5的时候就该触发计算了,因为t5已经到了。但从上帝视角我们知道本来属于这个窗口的事件t2还没到,不应该开始计算,否则结果必然不准确。但处理引擎没有上帝视角,它并不知道。为了应对这种迟到的数据,处理引擎只能提供机制,一定程度的缓解这种问题,然后让用户做选择。这里说下Flink中的机制:默认对于迟到数据,会直接丢掉。也就是收到t5时认为后面不会再有t5之前的数据了,如果真的来了,那就丢掉,因为之前的窗口已经销毁了。同时提供了allowedLateness机制(上面示例代码中有),用户可以选择额外多等一些时间,如果在这个时间限内,数据能来,那还可以处理。这个代价就是窗口真正销毁时间变成了“窗口大小+延迟时间”,数据缓存是有代价的,所以这个延迟的时间是不能无限制放大的,否则资源会被耗尽。另外还提供了sideOutputLateData,也就是所有迟到的数据都丢到这个流里面,单独做处理。也就是实际中我们尽量保证窗口大小+延迟时间这个范围能处理绝大部分数据(正常的数据和大部分迟到的数据),对于极少量的迟到很长时间的数据可以输出到单独的流中做处理。

有一个点需要注意的是上面因为数据迟到造成的数据不准确并不是流处理自身的缺陷,批处理也存在这个问题。比如每天凌晨2点处理前一天的所有数据,如果前一天的部分数据因为延迟启动任务的时候还没有来,那也会造成数据的不准确。反而流处理引擎提供了更多更灵活的机制来处理这部分数据。

Watermark

先插播一个容易混淆的概念性问题:

流处理 & 实时处理 & 批处理 & 离线处理

这些概念很容易混淆,最容易搞混的就是把流处理和实时处理等价起来。流处理更多的描述数据是源源不断产生的,即无界数据。但这个处理未必是实时的,特别是对于那些支持状态的现代流处理框架,完全支持天粒度甚至更长时间粒度的计算,很显然,这已经不是实时处理了。而批处理强调的是处理方式,积攒一批数据一起进行处理,以获取比较好的吞吐量。比如Spark Streaming就是这种方式,但因为它的批很小(micro-batch),所以更多的被认为是一种流处理;同时MapReduce也是批处理。像Flink等事件驱动(一个event就会触发计算)的计算框架就不属于批处理了。离线处理是相对于在线而言的,它强调的是数据处理的时效性。这些概念是从不同维度去描述计算方式的,而且往往在一些维度是有重叠的。其实绝大多数时候也没有必要特别学院派,一定要明确的给出各种术语的概念,但至少不能用错。

上面讲Event Time的时候提到了一句“收到t5时认为后面不会再有t5之前的数据了”,可见t5这个时间点非常重要。对于Processing Time,时间本来就是单调递增的,永远不会产生迟到数据,因为每个事件的时间都是取他被处理时候的系统时间。如果取Event Time,这个t5的选择就很关键了,因为一旦确定之后,早于这个时间点的数据都会被认为是迟到数据,默认迟到数据是会被丢弃的。为了解决这个问题,DataFlow提出了Watermark这个概念。

可以将Watermark理解为一个函数:$F(P) -> E$,它的输入是当前的系统时间,输出是一个Event Time(一个时间戳),而且输出的这个时间戳是严格单调递增的。流处理系统规定,如果某个时刻Watermark的值为E1,那系统就认为凡是早于E1的事件都已经收到了。

所以我们可以看到Watermark其实就是一个时间戳,它解决了使用Event Time时数据完整性(completeness)的问题。当然这个解决仅是从设计、机制上解决,具体到实际问题能解决到什么程度还得看具体的Watermark如何计算。而且这个计算没有一个特别好的通用解决方案,它是和具体数据、业务相关的,所以流处理框架一般会内置一些Watermark计算方式,也提供了用户自定义Watermark计算方式的接口,让用户可以结合自己的业务去实现适合自己的Watermark。

最后关于Watermark的点就是如何传播。其实每个算子都可以在自己阶段重新计算Watermark,但一般不会这样做,那样的计算量比较大,很多时候都是source阶段计算好,后面往下传播即可,可以简单理解为在正常的事件中间插入了特殊的Watermark事件往下传播。如果有多个source,并行处理,数据shuffle之后,一个算子就有可能收到多个Watermark,Flink里面采用的策略是取最小的Watermark值作为当前算子的Watermark。

流批统一

介绍了上面这些概念,我们再来谈一下流批统一。现代的流处理框架都宣称做到了流批统一,这个该如何理解?

这个流批统一是相对Lambda架构而言的,前文提到了Lambda架构最大的问题就是复杂性,原因就在于它同时使用了两套系统去实现实时流处理和离线批处理。而基于DataFlow模型的流处理框架它们在设计上就把批处理当成了一种特殊的流处理:批处理就是一种有界的流处理。因为支持State,而且量级可以很大,所以使得流处理里面可以缓存长时间很大量的数据,这样批处理能做的,流处理完全能做。当然实际实现中,每个框架设计可能不一样,比如Spark底层实际是微批处理(micro-batch),流只是微批而已;而像Flink 1.9之前,虽然对开发者暴露的接口是统一的(部分接口在细节上流和批还是有差别的),但底层也完全是两条路,流是流的处理,批是批的处理,1.9及之后把blink合并以后,底层也慢慢的统一起来,终极目标是不光对用户统一,底层也做到统一。但不管哪种实现,至少在软件层次已经统一了,也就是你为了同时实现流处理和批处理,不用再去维护和学习两个框架了,一个Spark或Flink就够了,而且像Flink,流和批的API很多都是一样的(终极目标是全部一致),也就是你开发一个功能的时候,不论是以批的方式处理还是以流的方式处理,代码是一样的。当然现在还无法完全一样,但以后的目标是做成完全一样的。那也是真正实现流批统一(批只是流的一种特殊形式,无需特殊对待)的时候了。

当然,话说回来,任何事情都是有利有弊的。要做到流批统一,那做功能设计的时候考虑的因素就要多一些,特别是在做优化的时候。所以Google虽然有流批统一的Google Cloud DataFlow,但内部一些批处理业务仍然在使用批处理引擎,因为它们已经单独针对批处理做过很多优化了。这可能也是现在Lambda架构仍然有很多的原因,虽然复杂,但业务发展到一定程度,架构自身的复杂性可能已经不是最复杂的了,没有必要花大力气去切换。这又回到了架构设计的问题,脱离业务谈架构就是耍流氓。最好的架构是演进出来的,而不是设计出来的。

容错

容错就是发生故障(可以是任何故障,小到网络抖动导致某次处理超时,大到整个集群物理机集体断电)后还能正确处理或恢复,分布式里面目前是不存在完美的容错方案的,也就是CAP理论至今依旧是成立的,各个分布式组件实现的容错方案仅是做了不同的取舍或者其承诺的容错也是有前提条件的。这部分内容太多也太深了,这里不会展开说,推荐知乎上的一篇文章,有兴趣的可点击查看。本文仅从宏观角度对比一下常见流处理系统的容错方式。总的来说关于数据处理的准确性,一般有这么几种:

  • at-least once:保证至少处理一次,即数据是可能重复的。
  • at-most once:最多处理一次,即不保证所有数据都参与处理了,可能会丢数据
  • exactly once:保证只处理一次

流处理里面的容错只是要实现Exactly Once这种级别。首先故障时的容错需要考虑这么几个问题:

  1. 故障后重算时知道从哪里重算。
  2. 仅从哪里重算往往是不够的,因为同样的数据执行同样的逻辑计算多次的结果未必是一样的,比如你的source每次获取数据都不一定一样;又或者计算逻辑里面使用了当前时间、随机数等行为的,这些还比较明显,能考虑到,但有一些就不那么明显,而且甚至是无法控制的。比如有个计算用到了窗口,那可能第一次计算的时候,某个数据迟到了,但等你重算的时候,这个数据可能已经来了,就不属于迟到数据了,这种不确定性很隐晦,而且不可控制。像这种相同输入相同逻辑执行多次,结果可能不同的计算称为非确定性计算(nondeterministic-compute);相反结果确定的就是确定性计算,对于确定性计算,利用幂等操作往往可以比较简单的实现Exactly-Once Processing。
  3. 承诺的Exactly-Once Processing范围。是端到端的(即从外部Source到流处理引擎,再到输出Sink)全链路还是仅包含流处理引擎内部?目前没有任何一个流处理系统能做到对于任何Source和Sink都做到端到端Exactly-Once Processing,因为和第三方的系统通信有太多的不可控因素。目前通常对于如果一个Source的元素有唯一键(比如Kafka的event的partition+offset就可以唯一标识一个event),且Sink支持事务操作的情况下,一些流处理引擎可以做到端到端的Exactly-Once Processing。

目前Spark、Flink、Google DataFlow、Kafka Streams都号称是支持Exactly-Once的(虽然Storm通过高层API可以实现exactly once的处理,但其底层仅提供at-least once的保证,所以这里就不参与对比了),我们简单对比介绍一下。大的可以分为重算型记录结果两大类:

  • 重算型,就是想办法记录故障前处理的数据的位置以及计算方式,一旦发生故障,从故障前的位置开始重算。缺点前面已经说过了,这种无法应对不确定性计算。而且即使是确定性计算,如果最终数据写到sink的时候不是幂等操作,就可能会产生重复数据。
  • 记录结果型的。就是每次一个event计算完,先将计算结果存储下来(一般存储到高可用存储),然后才发往下一个算子计算,这样,如果发生故障,已经计算过的不用重算,直接取之前的计算结果即可,这种可以应对非确定性计算,因为不会重复计算。

Spark属于重算型的,它会记录计算的血缘关系图,当发生故障的时候,根据血缘关系图重新计算。当计算结果最终成功写到sink之后,就会从血缘图中删掉这部分的关系图,防止关系图无限变大。另外,为了减小故障后重算的计算量,spark也会缓存部分计算结果。但其目的并不是走记录结果型的路线,仅是为了减少计算量而已。所以Spark的Exactly-Once保证是假设所有计算都是确定性计算且是幂等的。

Google DataFlow和Kafka Streams都属于记录结果型的,它们的具体实现虽然不同,但都是先记录计算结果到高可用存储(DataFlow是记录到BigTable,Kafka Stream是记录到Kafka),而且写这个动作要是原子性的,不然就又涉及到写入的时候故障了,你无法确定是写成功还是没成功的问题,再次写入可能会重复(如果不是幂等的情况),也就成个死循环了。所以Google BigTable(对应开源界的HBASE)和Kafka都是支持原子写/事务写的。这里有一个经验主义误区会觉得每个event都写盘性能会很差,但实际并非这样。一方面从算法上会进行很多优化,比如Google DataFlow就利用图优化和Bloom Filters减少了很多不必要的写操作。另一方面,写操作慢一般都指的是随机读写慢,连续读写的速度并不慢,这也是Kafka虽然写磁盘,但依旧很快的一个原因。有评测,SSD的连续写比内存随机写更快(具体出处忘记了,好像也是一个国际acm机构给出的数据)。

Flink则采用一种分布式快照的算法(原型是Chandy-Lamport算法,有做一些改进),简单理解就是它会在正常的数据中放入一些特殊的事件,这个事件称之为barrier,flink保证在任何一个算子那里,只有当某个barrier之前的数据全部接收并处理完了,才会将该barrier分发到下一个算子,这样当最终该barrier之前的所有数据都成功写入到Sink后,那就说明这个barrier之前的所有数据已经成功处理了,认为这个时候就是系统达到了一个全局稳定点,然后做一个快照,并把这个快照存储到高可用存储上。后面故障恢复时,从多个备份的快照中找到最近的一次全局稳定点重新开始计算。Spark现在也在重写底层,使用Chandy-Lamport算法来代替原来的血缘关系图。

各个方案都有各自的优缺点,比如Flink这种全局快照优点是Checkpoint的代价比较小,而且可以纯异步,不影响正常处理;但弊端是恢复比较慢,集群规模大了以后,这个问题会更突出。针对这个问题,它的考虑点是故障的场景毕竟还是少数。容错这部分比较复杂,细节也很多,以后单独来介绍吧。

总结

以前人们普遍认为流处理在正确性和一致性方面存在问题,DataFlow的设计者们认为这是以前流处理引擎设计的缺陷,而非流处理自身的问题,并设计初DataFlow模型解决了这些问题,还将批处理作为流处理的一种特殊形式对待,将流批统一起来。目前商业的Google Cloud DataFlow,开源的Flink,以及Apache Beam的火热发展也在证明DataFlow模型是成功的、正确的,是未来的趋势。

另外,DataFlow还提出了如何将流处理当成Table,然后使用SQL去处理的新思路(当然底层和RDBMS是完全两个东西),这个也是Blink对Flink改动非常多的一个点,好在现在Blink也已经基本都合并到Flink里面了,普通用户也能使用到很多阿里做的优化了。

也许,以后慢慢就只会有流处理引擎了。让我们拭目以待吧!

如果你对流处理有兴趣,强烈安利一本书:《Streaming Systems》.