最近Flink有个新特性FLIP-50提交到社区,并且已经被社区接受了,这个特性名称为:FLIP-50: Spill-able Heap Keyed State Backend。是的,这关于state的一个特性。在流处理中,如果处理流程中涉及状态,那就需要一种机制能够缓存这个状态的数据,比如Window算子就是流式框架中常见的一种包含状态的算子。举个业务上的例子,比如我们要统计5分钟内耗时最长的交易,那一种做法就是缓存5分钟的数据,然后直接对耗时字段求max即可。既然要保留状态(即缓存数据),那就涉及到存储,FLIP-50是对Flink状态存储机制进行增强的一个特性,在看这个特性之前,我们先来简单看一下Flink已经提供的状态后端(State Backend)存储的支持。

目前Flink最新的版本就是前几天发布的1.9.0版本,截止到此版本,Flink给开发者提供了三种存储状态的后端,直接看图(此图来自Flink母公司ververica官网,当然该公司年初已经被阿里收购,公司名字也由原来的Data Artisans改为现在的ververica):

state-backends

对图稍作解释。从后端存储类型粗分的话,Flink目前支持两大类State Backend:

  1. RocksDBStateBackend:这种机制存储状态时采用的是RocksDB,一种嵌入式数据库。因为数据是存储在磁盘上的,所以理论上能存储的状态大小只受限于磁盘容量,这是它的优势。但劣势在于一方面写磁盘就慢(即使有Cache,但也需要定期flush到磁盘),另一方面,状态的实质也是Java对象,所以写入和后面读都要做序列化和反序列化,这也是一笔不小的开销,所以,这种机制的劣势就是性能差。
  2. 基于内存(Java Heap)的状态后端(HeapKeyedStateBackend):FsStateBackendMemoryStateBackend。这两个的当前状态都是存储在(TaskManager的)内存里面的,所以它们能存储的大小受限于TM的Heap限制,必然不能存储太大的状态。

Flink会定期的对state做快照(snapshot),以在故障的时候从快照进行恢复。RocksDBStateBackendFsStateBackend将快照存储在配置的文件系统上面(一般为了高可用,配置为分布式文件系统);而MemoryStateBackend则是将快照数据存储在JobManager的内存(Heap)里面,所以MemoryStateBackend能存储的状态数据就更小了,受限于JobManager的内存。实际中JobManager只有一个(高可用模式下还会有若干个standby),但TaskManger却是可以一直扩展的,所有的TaskManger的状态数据都放在JobManager的内存里面,可想而知,能存储的状态数据非常非常有限。

另外,为了让snapshot的操作不阻塞业务,上述三种State Backend默认都是异步做快照的。而且目前只有RocksDBStateBackend支持增量快照,即增量的状态数据备份。所以,结合能存储的状态数据的大小,以及snapshot的代价,一般业务场景中如果涉及存储的状态比较大,基本上只能选择RocksDBStateBackend了;如果不是非常大,也可以考虑FsStateBackend。但MemoryStateBackend基本只能作为测试使用,至少官方是这么推荐的。

但是,RocksDBStateBackend相比于MemoryStateBackend真的是太慢了,很容易成为整个流处理里面的瓶颈。所以之前我们的业务场景里面,我们默认的策略还是选择了MemoryStateBackend,当然带来的风险就在于如果遇到大的状态,程序随时面临OOM的风险。当时我就在想,如果MemoryStateBackend能做到在内存不够用时将数据写入到磁盘就好了,因为大多数时候其实内存配置大一些是能应付的过来的,但遇到偶尔的流量高峰才可能处理不过来。所以昨天看到了这个特性,真的是非常的兴奋。下面我们来介绍一下这个特性。

这个特性理解起来非常的容易,就是增强了一下之前的MemoryStateBackend:状态依旧存储在内存,但如果出现以下条件任意一个,就将内存中最冷的(coldest)数据导出到磁盘:

  • Heap使用超过预设的阈值;
  • GC的停顿(GC pause)时间超过预设的时间(默认2秒)

除了有导出机制,也有导入机制:随着一些状态的删除或者过期,如果内存的使用低于预设的某个阈值,就将磁盘上面的状态数据重新load到内存里面。具体实现的时候,为了不影响已有的MemoryStateBackend,重新创建了一个State Backend模块:SpillableHeapKeyedStateBackend 。该特性的整体设计如下图:

SpillableHeapKeyedStateBackend

如图,主要包含5个模块:

  • KeyGroupSizeAccountingManager:记录状态中每个组(每个组对应一个key,所以称为key group,后简写为KG)使用的heap和off-heap大小;
  • HeapStatusMonitor:监控内存,根据之前介绍的策略在必要的时候触发将数据导出到磁盘或者从磁盘加载回内存的动作(注意这里只是触发);
  • Spill/LoadManager:如上所述,HeapStatusMonitor只负责触发,而真正导出或者加载的任务由Spill/LoadManager完成;
  • MMapManager:使用mmap技术来提高导入导出的性能,所有的mmap操作由MMapManager管理;
  • SpaceAllocator:负责磁盘上各个KG存储空间的分配。

各个模块职责的细节大家可以参阅文章开头给出的FLIP-50的链接。提交这个特性的作者也做了一个性能测试对比,测试条件是使用word-count这样一个任务,磁盘使用SSD,序列化之后的状态数据约为566MB。不同的State Backend,性能结果如下图:

性能对比

可以看出,只要数据能尽量保存在内存里面,那其性能是远远大于RocksDB的(即使后者的数据也全部写到Cache里面)。这里使用的是SSD,如果是普通机械硬盘的话,差距会更大。

而且,更重要的是这个特性其实已经在阿里巴巴内部的生产环境使用了,而且经历了2018年的双11(Single's Day)的洗礼,现在相当于是迁移到开源的Flink主分支来,所以可靠性方面应该是有保证的。5月底的时候该特性本来计划是随着1.9.0版本中发布的,但不知道为什么现在推到1.10.0了。估计是1.9版本改动的东西实在是太多了,没时间了吧。另外后面功能稳定后,社区也计划将该Backend作为默认的Backend,替代原来的MemoryStateBackend。

当然最后要说明的是,这个特性虽然强大,但终究还是对MemoryStateBackend的一种增强,也就是它的使用场景还是限于状态数据正常不是很大,内存足够容纳,只有在流量高峰或少部分时间可能超过内存空间才需要导出到磁盘的情况。但就这样,也能让很多场景获益不少了,毕竟内存还是快,而且避免了序列化和反序列化的过程。