NYC's Blog - 大数据 http://niyanchun.com/category/bigdata/ 大数据 从数仓到数据湖,再到Data LakeHouse http://niyanchun.com/what-is-lakehouse.html 2022-02-12T11:59:31+08:00 Data LakeHouse是一种新型的湖仓一体架构,该架构旨在用一套系统实现原来的数据仓库(Data Warehouse)加数据湖(Data Lake)的功能。数仓、数据湖、LakeHouse的发展演进过程如下图(图片出自Databricks):本文简单梳理一下整个发展演进的过程。Data WarehouseData Warehouse就是我们平时说的数据仓库(简称数仓),数仓最典型的代表就是MPP数据库。最原始的时候,数据量还不是很大,传统的单体数据库可以支撑平时的分析、决策、报表等需求。但随着后来应用的不断增多,数据量也激增,单体DB已经无法承载,于是便出现了数仓这种新型的架构。​数仓一般包含以下元素:Metadata – a guide to what data was located whereA data model – an abstraction of the data found in the data warehouseData lineage – the tale of the origins and transformations of data in the warehouseSummarization – a description of the algorithmic work designed to create the dataKPIs – where are key performance indicators foundETL – enabled application data to be transformed into corporate data数仓的一些特点:数据格式以结构化数据为主,也能很有限的支持一些类似JSON的半结构化数据数据的使用(分析、BI、报表)方式以SQL为主对机器要求较高,一些商用的MPP数据库甚至是定制的一体机我觉得用一个不太准确但易于理解的描述就是数仓像是一个“分布式的数据库”,因为是分布式的,可以扩展节点,所以可以承载的数据量比以前大了。但它的灵魂依旧还是数据库,所以像传统单体DB中的一些特性在数仓中依旧存在,比如事务、模型(表结构)、隔离性等概念。简言之,数仓主要解决了传统单体DB无法承载越来越多的数据量的问题(当然还有一些其它功能)。但随着技术的发展和业务需求的不断产生,数仓也开始暴露出一些问题:无法存储非结构化的文本、图像、视频、音频等越来越常见的数据格式容量上限不够大SQL这种使用数据的形式太单一,有很多局限,从而导致了一些问题。比如现在有些机器学习算法是需要直接访问数据进行迭代,而不是通过SQL;再比如无法高效的ETL,因为需要通过ODBC先读取数据,而不是直接访问原始数据于是,便出现了数据湖。Data Lake数据湖本质上可以看成是一个接近无限容量,且支持任何格式数据的廉价存储系统。像我们熟知的AWS S3、Azure Data Lake Storage (ADLS)、Google Cloud Storage (GCS)、阿里OSS、腾讯的COS等对象存储系统都可以认为是数据湖。数据湖的特点是:接近无限容量理论上支持任何数据格式存储格式开放,最常见的是Apache Parquet和ORC,这样有两方面的好处,一方面可以避免被某一个厂商绑死;另一方面可以很好的与其它产品形成生态存储成本低,普通的硬件即可本来,数据湖的设计初衷是解决数仓容量和数据格式支持的不足,将所有格式的数据全部存储在数据湖里面,然后使用的时候直接使用湖里面的数据进行分析、查询、计算。但真正使用的时候,大家发现数据湖缺失了一些关键特性,导致湖里的数据无法直接使用。概括来说,主要存在三个方面的问题:安全问题(Security)。数据湖中的数据基本都是以文件的形式存放的,这样就无法提供细粒度的访问控制,比如无法提供文件内容级别的控制,只能针对文件、目录等级别进行控制。性能问题(Performance)。因为数据湖的本质是以文件存储系统,所以没有特别针对数据访问进行优化,一旦数据量多了以后,访问性能比较差,比如列出湖中存储的文件这种非常常用的操作在数据多的时候计算量很大。数据质量问题(Quality)。数据湖缺乏数仓中的一些数据管控(governance)和校验(validation)机制,比如Schema,这样数据的质量就无法得到保障。当然,还有其它一些问题,比如不支持事务、原子写、并发等。结果最终数据湖就变成了数据沼泽(“data swamps”):数据都扔进了数据湖,但无法直接使用。当真正需要使用的时候,还是要读出来放到其它地方(比如数仓)进行使用。但鉴于数仓又存在前面提到的问题,所以企业不得不同时维护一个数仓系统和一个数据湖系统,像极了计算领域的Lambda架构。但是同时维护两套系统的成本和复杂性是很高的,于是又出现了Data LakeHouse。Data LakeHouseData LakeHouse一种湖仓一体的新型架构:可以看到,其实就是把互补的两套架构(数仓和数据湖)融合成了一个架构,这样只用维护一套系统,就可以解决所有问题。概括来说LakeHouse架构的主要特点有:开放(Openness)开放文件格式(Open File Formats):使用开放标准的文件格式,比如Apache Parquet、Apache ORC开放API:提供开放、高效的直接访问数据的API,不和特定引擎、厂商绑定支持多种语言:不再局限于SQL一种,还支持各种各样的第三方库、工具、引擎、框架,比如TensorFlow、Pytorch等机器学习支持更好支持各种格式的数据可以通过R/Python库高效的直接访问数据支持DataFrame API支持数据版本,用于审计、回退或者重新实验低成本下更好的性能和可靠性集成了多种性能优化技术,比如缓存、多维汇聚、索引、压缩支持数据校验和治理,提高数据质量支持事务,保证数据一致性的同时提供更好的并发廉价的存储目前可以算得上是LakeHouse的开源系统有:Apache Hudi(Uber开源)、Apache Iceberg(Netflix开源)、Delta Lake(Databricks开源)。其中Delta Lake的这篇论文算是目前对Data LakeHouse架构的一个“标准定义”:Delta Lake: High-Performance ACID Table Storage over Cloud Object Stores。​Data Warehouse、Lake、LakeHouse对比: Data warehouseData lakeData lakehouseData formatClosed, proprietary formatOpen formatOpen formatTypes of dataStructured data, with limited support for semi-structured dataAll types: Structured data, semi-structured data, textual data, unstructured (raw) dataAll types: Structured data, semi-structured data, textual data, unstructured (raw) dataData accessSQL-only, no direct access to fileOpen APIs for direct access to files with SQL, R, Python and other languagesOpen APIs for direct access to files with SQL, R, Python and other languagesReliabilityHigh quality, reliable data with ACID transactionsLow quality, data swampHigh quality, reliable data with ACID transactionsGovernance and securityFine-grained security and governance for row/columnar level for tablesPoor governance as security needs to be applied to filesFine-grained security and governance for row/columnar level for tablesPerformanceHighLowHighScalabilityScaling becomes exponentially more expensiveScales to hold any amount of data at low cost, regardless of typeScales to hold any amount of data at low cost, regardless of typeUse case supportLimited to BI, SQL applications and decision supportLimited to machine learningOne data architecture for BI, SQL and machine learning最后不管是数仓,还是数据湖,亦或是现在的融合架构LakeHouse,都是为了解决不断发展和产生的业务需求而迭代产生的新架构和解决方案,特别是随着AI的发展,Machine Learning技术已经越来越成熟,慢慢已经成为数据分析的主要组成部分,所以现在新的架构在与AI生态的结合方面考虑的越来越多。目前LakeHouse正在快速发展,提供解决方案和一体化平台的商业公司也在逐渐增多,对于我们这些技术人,能不断见证和学习这些优秀的技术,也算是一件幸事和乐趣。​更多信息可参考下面引用部分的文章。​References:What Is a Lakehouse?Delta Lake: High-Performance ACID Table Storage over Cloud Object StoresFrequently Asked Questions About the Data LakehouseEvolution to the Data LakehouseFrequently Asked Questions About the Data Lakehouse Kafka的消费者分区分配策略 http://niyanchun.com/Kafka-consumer-partition-assignor.html 2022-02-06T11:58:00+08:00 本文是《Kafka的Consumer Group Rebalance》一文的补充部分,主要附加介绍一下Kafka内置的几种分区分配策略。Kafka定义了一个消费者组内分区分配的接口ConsumerPartitionAssignor,该接口里面最核心的是assign方法:package org.apache.kafka.clients.consumer; public interface ConsumerPartitionAssignor { GroupAssignment assign(Cluster metadata, GroupSubscription groupSubscription); // 省略其它 }该方法的两个参数分别是当前集群信息metadata和本组成员订阅的主题信息groupSubscription,根据这两个信息计算分配方案GroupAssignment。目前Kafka已经实现了几种具体的策略:RangeAssignor(2.8版本中的默认策略)RoundRobinAssignorStickyAssignorCooperativeStickyAssignor完整的UML图如下:下面分别介绍。RangeAssignor该分配算法是逐个topic进行分配的(per-topic basis),对于每个topic:将分区按数值排序,将Consumer按member.id(如果用户指定了group.instance.id,则使用该id作为member.id,否则随机生成)进行字典序排列。用分区数除以消费者个数得出每个Consumer应该分配的分区个数N(不能整除时向上取整),然后依次给每个Consumer一次分配N个分区(最后一个可能不足N个)。比如现在组内有2个Consumer C0和C1,订阅了2个topic t0和t1,每个topic有3个分区,即:t0p0, t0p1, t0p2, t1p0, t1p1, t1p2. 假设消费者排序后顺序为C0、C1,先开始分配topic t0,3个分区/2个Consumer等于1.5,向上取整为2,即每个Consumer分配2个分区,于是t0的分配结果为:C0:[t0p0, t0p1]C1:[t0p2]然后再分配topic t1,和上面同理,分配结果如下:C0:[t1p0, t1p1]C1:[t1p2]​所以最终合并后的分配结果为:C0:[t0p0, t0p1, t1p0, t1p1]C1:[t0p2, t1p2]RoundRobinAssignorRound Robin策略和Range的不同之处在于它是将所有topic的分区放在一起进行分配的。具体方式为:先将Consumer按member.id进行排序,将所有分区按数值排序。然后将分区以round robin的方式依次(每次一次)分配给各个Consumer。如果Consumer订阅的topic有差异的话,在分配某个topic的Partition的时候,如果当前Consumer没有订阅该topic,就会跳过该Consumer。举两个例子:例子1:组内Consumer订阅信息相同。假设现在组内有2个Consumer C0和C1,订阅了2个topic t0和t1,每个topic有3个分区,即:t0p0, t0p1, t0p2, t1p0, t1p1, t1p2。将这6个分区以round-robin的方式分配给C0和C1,分配结果为:C0:[t0p0, t0p2, t1p1]C1:[t0p1, t1p0, t1p2]例子2:组内Consumer订阅信息不同。假设有3个Consumer C0、C1、C2;有3个topic t0、t1、t2,3个topic的分区数分别为1、2、3,即所有分区为:t0p0, t1p0, t1p1, t2p0, t2p1, t2p2。其中C0订阅了t0,C1订阅了t0、t1,C2订阅了t0、t1、t2,则分配结果为:C0:[t0p0]C1:[t1p0]C2:[t1p1, t2p0, t2p1, t2p2]StickyAssignorStickyAssignor算法在分配时有2个重要的考虑点:尽量让分区分配的比较均衡当发生重新分配的时候(即Rebalance的时候),在保证1的前提下,尽量保持原有的分配方案不变动看2个例子。例子1:假设有3个Consumer C0、C1、C2;有4个主题t0、t1、t2、t3,每个分区有2个分区,即所有分区为:t0p0, t0p1, t1p0, t1p1, t2p0, t2p1, t3p0, t3p1。现在所有Consumer都订阅了4个主题,则分配结果如下:C0:[t0p0, t1p1, t3p0]C1:[t0p1, t2p0, t3p1]C2:[t1p0, t2p1]这个结果和前面RoundRobinAssignor的分配结果是一样的。但当发生重新分配的时候,就不一样了。假设,现在C1挂掉了,需要重新分配。如果是RoundRobinAssignor,重新分配后的结果如下:C0:[t0p0, t1p0, t2p0, t3p0]C2:[t0p1, t1p1, t2p1, t3p1]但如果使用StickyAssignor的话,重新分配后的结果如下:C0:[t0p0, t1p1, t3p0, t2p0]C2:[t1p0, t2p1, t0p1, t3p1]可以看到,StickyAssignor将C1的分区按照Round Robin的方式分配给了C0和C2,在保证均衡的前提下,最大限度的保留了原有分配方案。例子2:假设有3个Consumer C0、C1、C2;有3个主题t0、t1、t2,分区数依次为1、2、3,即所有分区为:t0p0, t1p0, t1p1, t2p0, t2p1, t2p2。现在C0订阅了t0,C1订阅了t0、t1,C2订阅了t0、t1、t2。如果使用RoundRobin,前面已经展示过一次了,分配结果为:C0:[t0p0]C1:[t1p0]C2:[t1p1, t2p0, t2p1, t2p2]但如果使用StickyAssignor的话,分配结果为:C0:[t0p0]C1:[t1p0, t1p1]C2:[t2p0, t2p1, t2p2]此时,如果C0挂掉,RoundRobin重新分配后的结果为:C1:[t0p0, t1p1]C2:[t1p0, t2p0, t2p1, t2p2]有3个分区分配没有变化,且分配不均匀。但StickyAssignor重新分配的结果为:C1:[t1p0, t1p1, t0p0]C2:[t2p0, t2p1, t2p2]有5个分区分配没有变化,且分配均匀。CooperativeStickyAssignor该策略具体的分配方式和前面的StickyAssignor是一样的,但有一个重要的区别是该策略会使用Cooperative Rebalance,而StickyAssignor使用的则是Eager Rebalance,这两种Rebalance的区别参见我之前的文章,这里不再赘述。对比总结就通用场景而言,进行分区分配的时候,一方面我们比较关注分配的均衡性;另一方面也会比较关注当发生Consumer Group Rebalance的时候,能否最大限度的保持原有的分配。从这两个角度来看:关于均衡:RangeAssignor和RoundRobinAssignor的分配是否均衡,主要取决于组内Consumer订阅的主题情况以及每个主题的分区个数。而StickyAssignor和CooperativeAssignor则不太依赖这个条件,相当于在任何情况下,都能实现一个相对均衡的分配方案。当发生Rebalance的时候最大限度保持原有分配方案:RangeAssignor和RoundRobinAssignor算法自身其实完全没有考虑这点,要实现这个功能点,需要结合static membership特性来实现,即指定group.instance.id,这样相当于确定了Consumer的顺序,只要组内Consumer不变、订阅信息不变,就能有一个稳定的分配结果。而StickyAssignor和CooperativeAssignor则考虑了这点,但有一个注意点就是对于StickyAssignor,虽然会尽量保留原有的分配方案,但因为使用的是Eager Rebalance,所以在Rebalance的时候还是会回收所有分区,而CooperativeAssignor使用的是Cooperative Rebalance,所以只会回收有变化的分区。一般而言,建议新的系统(Kafka 2.4及之后版本)使用CooperativeAssignor。当然,我们也可以实现自己的PartitionAssignor。 Kafka的Consumer Group Rebalance http://niyanchun.com/kafka-consumer-group-rebalance.html 2022-01-27T20:59:26+08:00 什么是Consumer Group Rebalance?Kafka Consumer创建的时候都要指定一个组ID(group id),所有组ID一样的Consumer就组成了一个Consumer Group。对于一个Partition同一时刻只会分配给同一个Group内某一个Consumer,这就是大家熟知的Kafka消费模型。通过这个模型,Kafka的消费者(也就是应用/服务)可以很方便的实现Load Balance、HA和水平扩展。简单说这个模型就相当于现在有M个Partition,N个Consumer,然后把这M个Partition平均分配给N个Consumer,而且分配的时候有个限制条件:一个Partition只能分配给一个Consumer,Consumer Group Rebalance就是在需要的时候去做这个分配工作的,而且它的大原则是尽量保证Partition均衡的分配给组内各个Consumer。那什么时候需要,或者说什么时候会发生Consumer Group Rebalance呢?看前面描述的职责就已经很明确了,当M或者N值发生变化的时候就需要Rebalance了,准确点就是:当Group内的Consumer个数或者Consumer订阅的Partition个数发生变化的时候就需要Rebalance了。下面列举一些常见的场景:Group内有新的Consumer加入,比如应用扩容了、修改了已有应用的并行度(N值发生变化)Group内Consumer个数减少了,比如应用缩容、挂掉、poll超时等(N值发生变化)现有Consumer修改了订阅的Topic,导致Group内的Partition个数变了(M值发生变化)已订阅的Topic修改了Partition的个数(M值发生变化)...上面这些场景有些是主动的,有些是被动的,有些是无法避免的,有些是可以避免的,有些是正常的,有些是代码bug导致的...总之,当发现资源(Partition)有变更,需要重新分配以消除“贫富差距”的时候,就会发生Consumer Group Rebalance了。但是资源的分配不论是在现实世界,还是在分布式的世界中,都是一个难题。下面介绍Kafka是怎么做的。Rebalance介绍实质上,Rebalance是一个抽象、通用的资源分配协议,它不光可以用于Partition这种资源的分配,在Kafka中,有多个地方都有使用:Confluent Schema Registry:使用Rebalance协议来选主Kafka Connect:使用Rebalance协议来给connector分配任务Kafka Stream:使用Rebalance协议来给实例分配Partition和任务网上关于这新老协议的细节讲述已经非常多了,这里就概括性的介绍一下。Rebalance Protocol如下图,Rebalance协议由2部分组成,一部分在Broker端,一部分在Client端:Group Membership Protocol(Broker端):主要负责整体协调Client Embedded Protocol(Client端) :主要负责具体的资源分配这里注意一个细节就是一部分协议是在客户端的,而且用户可以按照约定好的协议进行自定义的实现,比如实现一个自己的资源分配方案,后面就会讲到。下面还是以本文讨论的Consumer Group Rebalance的应用场景(即Partition资源的分配)来描述。对于每一个Consumer Group,都会有一个Coordinator节点(由某个Broker充当)负责这个Group的资源分配,也就是上面的Group Membership协议其实就是由这个Coordinator节点来实际运作的。假设现在新加入了一个Consumer,看下整个Rebalance过程的步骤:该Consumer给Kafka集群发送FindCoordinator请求,找到它所属的Group对应的Coordinator;找到后向Coordinator发送JoinGroup请求。该请求会携带客户端(即该Consumer)的一些用户配置(比如session.timeout.ms、max.poll.interval.ms)和一些元数据(比如订阅了哪些主题等)。收到JoinGroup请求后,Coordinator通过心跳响应(Heartbeat)响应通知组内其它成员要开始Rebalance了。然后其它Consumer像这个新加入的Consumer一样,也发送JoinGroup请求给Coordinator。当Coordinator收到组内所有成员JoinGroup请求以后,会给所有成员发送一个JoinGroup响应。其中给Group Leader(加入组的第一个成员)发送的Response里面包含了成员信息、资源分配策略等元数据,其它成员则是一个空的Response。这个Leader拿到这些信息以后,本地计算出分配结果。所有成员向Coordinator发送SyncGroup请求,Leader的请求中会包含自己计算的分配结果,其它成员则是空请求。Coordinator将Leader的分配结果通过SyncGroup响应发送给各个成员。如果Consumer实现了ConsumerRebalanceListener接口,则会调用该接口的onPartitionsAssignedMethod方法。至此,整个Rebalance过程就结束了,这里再补充一些细节:上面提到了一个心跳的概念:Consumer内部有一个心跳线程定时发送心跳给Coordinator,以让Coordinator知道自己还活着。当需要Rebalance的时候,Coordinator会在心跳响应中通知所有Consumer要进行重平衡了,这就是上面提到的通过心跳通知。上面举的例子是由一个新加入的Consumer触发了Rebalance。很多其它行为也会触发,前面已经列举过常见的场景了。结合现在的流程和心跳知识再细化一下触发场景,比如当有Consumer正常停止的时候,在结束之前会发送LeaveGroup请求给Coordinator;如果是异常停止,Coordinator会通过心跳超时来判断Consumer已经没了。当然实际中,可能Consumer其实正常着,只是因为网络原因心跳超时了,或者Consumer里面没有及时调用poll方法等。前面提到Rebalance协议分为Broker端的“Group Membership Protocol”部分和Client端的“Client Embedded Protocol”部分,上面Group Leader计算分配方案,就属于“Client Embedded Protocol”部分的功能。提这个是因为Client这部分的协议除了默认的一些实现外,用户可以去自定义实现,后面马上要讲到的改进方案Incremental Cooperative Rebalance其实就是在这里实现的。再放一个图(图片来自于引用文章From Eager to Smarter in Apache Kafka Consumer Rebalances,下同):问题分析优化之前肯定要先分析清楚现有的问题,才能有针对性的进行优化。其实从前面的介绍我们已经很清楚,Rebalance要做的事情很简单:将M个资源(Partition/Task/Connector)平均分配给N个成员(Consumer/Instance/Worker),每个资源只能被一个成员拥有。事情本身不难,但难就难在需要在分布式环境中做这个分配工作。分布式环境中在任意时刻,网络可能分区、节点可能故障、还存在竞态条件(race condition),简单说就是分布式环境中无法实现可靠的通信,这让整个问题复杂化了。前面介绍了现在的Rebalance开始的时候回收(revoke)所有成员的资源,然后大家一起参与Rebalance过程,等拿到新的资源分配方案,又重新开始工作。具体应用到Partition的分配,就是所有Consumer在发送JoinGroup请求前需要停止从Partition消费,“上交”自己拥有的Partition。这样当Coordinator收到所有Consumer的JoinGroup请求的时候,所有的Partition就处于未分配状态,此时整个系统达到了一个同步状态(Synchronization barrier):所以,在重新分配之前,先回收所有资源其实是为了在不可靠的分布式环境中简化分配工作。然而,按现在这种方式,在分区被回收到收到新的分配方案之前,所有成员都无法工作,即“Stop The World”(借鉴了GC里面的概念),这也是Rebalance存在的最大的问题。默认Rebalance流程的超时时间为5分钟,也就是最差情况下,“Stop The World”效果可能持续5分钟。所以需要针对这个问题进行优化,思路也有两种:尽量减少Rebalance的发生减少Rebalance中“Stop The World”的影响社区在2.3版本中同时引入了两个优化方案:KIP-345: Static Membership和KIP-429: Kafka Consumer Incremental Rebalance Protocol分别按照上述两种思路进行优化,下面分别介绍。改进点1:Static MembershipStatic Membership主要的优化目标是减少“闪断”场景导致的Rebalance,即解决的思路主要是尽量减少Rebalance的发生,我们看下是如何优化的。在每次Rebalance的时候,Coordinator会随机给每个成员分配一个唯一的ID。然后当有新成员加入的时候,它的ID会是一个空字符串UNKNOWN_MEMBER_ID,这样Coordinator就知道它是新加入的,需要进行Rebalance了。Static Membership方案是给Consumer增加了group.instance.id选项,由用户负责设置以及保证唯一性,这个ID会替换原来由Coordinator每次Rebalance随机生成的ID(随机生成称之为“Dynamic Membership”),并且这个ID信息会加到JoinGroup请求中。那这个ID有什么用呢?举个例子:某一刻Consumer应用因为内存使用过高,被系统OOM Killer干掉了,然后很快又被守护进程或者人为启动起来的。这个时候,如果是以前的情况,Coordinator会认为是有新的Consumer加入,需要进行一轮Rebalance,但如果是Static Membership的情况下,Coordinator通过ID发现这个Consumer之前就有,就不会重新触发整个Rebalance,而是将缓存的之前分配给该Consumer的Partition直接返回给他,这样就一定程度上避免了因为闪断导致的Rebalance。 当然,这里我用了“闪断”,主要是想表达意外挂掉又很快恢复的情况,更具体点:意外挂掉:指被kill、网络闪断等不会主动(或者说没有机会)给Coordinator发送LeaveGroup请求的场景。因为如果主动给Coordinator发送了LeaveGroup请求的话,Coordinator会马上开始一轮Rebalance。很快恢复:指在Coordinator检测到Consumer挂掉之前,就恢复了。具体点说就是在session.timeout.ms或者max.poll.interval.ms时间内就恢复了,否则Coordinator会认为Consumer挂了,开始Rebalance。这里简单提一下这两个配置项。在0.10.0及之前的版本中,心跳是和poll在一个线程里面的,只有session.timeout.ms一个参数。后来进行了优化拆分(KIP-62: Allow consumer to send heartbeats from a background thread),心跳是一个单独的线程,poll是一个线程,session.timeout.ms仍然是心跳的超时时间,而max.poll.interval.ms则是poll线程的超时时间。不管哪一个超时,Coordinator都会认为Consumer挂了,需要Rebalance。如果我们要使用Static Membership特性,需要给Consumer增加group.instance.id设置。同时尽量将上面提到的超时时间设置的长一些。但显然弊端就是Consumer如果真的挂掉且无法恢复的话,Coordinator需要等较长一段时间才能发现,相当于牺牲了一定的可用性。果然没有免费的蛋糕。改进点2:Incremental Cooperative Rebalancing不同于Static Membership,Incremental Cooperative Rebalancing的思路是尽量减少Rebalance中“Stop The World”的时间和范围。那怎么做的呢?有这么几个关键点:所有成员还是会发送JoinGroup请求,但这次发送的时候资源并不会被回收(即不会停止工作),大家只是将自己目前拥有的资源信息加到元数据里面,发送给Coordinator。然后Coordinator把这些信息发送给Group Leader,Leader根据这些信息计算新的分配方案,计算的时候在保证均衡的情况下尽量对现有状态做最小改动(实际由实现的分配算法决定,默认的StickyAssianor策略就是这种),换句话说最核心的就是看哪些资源变更了成员,那就需要从原拥有者那里剔除这个资源,然后加到新的拥有者那里。然后Coordinator会将新的分配方案按照原有的方式通过SyncGroup响应发送给各个成员。各个成员收到新的分配方案以后,会和自己的现状做对比,如果没有变化或者只是新增了资源,则不需要额外做什么。但如果发现有资源被回收,则继续Rebalance的流程,接下来的流程和老版本的协议几乎一样,也需要回收资源,并发送JoinGroup请求,但这里仅回收需要被回收的资源。比如某个ConsumerRebalance之前拥有1、3、5三个分区,Rebalance中重新计算的新方案里面是1、3两个分区,则只回收5。可以看到Incremental Cooperative Rebalancing是将原有的Rebalance流程进行了细化(分成了多轮),延迟了资源回收的时间和范围,改进后的Rebalance流程如下图:那如何使用Incremental Cooperative Rebalancing呢?通过配置项partition.assignment.strategy进行配置,可以配置多个,越靠前优先级越高。前面提到了Rebalance协议分两部分,这里配置的其实就是客户端“Client Embedded Protocol”的实现类。2.8版本中已经支持的有:org.apache.kafka.clients.consumer.RangeAssignor(默认值)org.apache.kafka.clients.consumer.RoundRobinAssignororg.apache.kafka.clients.consumer.StickyAssignororg.apache.kafka.clients.consumer.CooperativeStickyAssignor我们也可以通过实现org.apache.kafka.clients.consumer.ConsumerPartitionAssignor接口来实现自定义的Assignor。如果想使用Incremental Cooperative Rebalancing,就配置最后一个CooperativeStickyAssignor即可。不同Assignor的细节本文就不展开了,另外规划了一篇文章《Kafka的消费者分区分配策略》。更多关于Incremental Cooperative Rebalancing的细节,可以参考本文引用部分的文章:Incremental Cooperative Rebalancing in Apache Kafka: Why Stop the World When You Can Change It?From Eager to Smarter in Apache Kafka Consumer Rebalances总结Kafka中的Rebalance本质上是解决分布式环境中资源分配的一种通用协议,由于分布式环境的复杂性,无法实现一个完美的方案,只能根据具体的场景进行有针对性的优化。比如实际中“闪断”是引起Rebalance的一种很常见且无法避免的原因,所以就有针对性的增加了Static Membership方案。另外Rebalance很严重的一个问题就是会“Stop The World”,然而实际中Rebalance的时候其实往往只需要变更极少量的资源所属权,所以就提出了Incremental Cooperative Rebalance方案,减少了Rebalance过程中“Stop The World”的时间和影响范围。好的架构不是设计出来的,而是进化而来的,Kafka Rebalance优化的脚步仍在继续。另外,尽管现在已经做了诸多优化,效果也比较明显,但Rebalance仍然算是一个代价比较大的操作,实际应用的时候,我们还是要能避免的就避免。References:Apache Kafka Rebalance Protocol, or the magic behind your streams applicationsIncremental Cooperative Rebalancing: Support and PoliciesKIP-415Incremental Cooperative Rebalancing in Apache Kafka: Why Stop the World When You Can Change It?From Eager to Smarter in Apache Kafka Consumer Rebalances Kafka的监听地址配置 http://niyanchun.com/kafka-listener-config.html 2022-01-22T17:32:00+08:00 本文基于Kafka 2.8.有时我们会碰到网络是通畅的,但却连不上Kafka,特别是在多网卡环境或者云环境上很容易出现,这个其实和Kafka的监听配置有关系。本文介绍监听相关的配置,目前监听相关的参数主要有下面几个:listenersadvertised.listenerslistener.security.protocol.mapinter.broker.listener.namesecurity.inter.broker.protocoladvertised.host.name(历史遗留,已废弃,勿使用)advertised.port(历史遗留,已废弃,勿使用)host.name(历史遗留,已废弃,勿使用)其中最重要的就是listeners和advertised.listeners:集群启动时监听listeners配置的地址,并将advertised.listeners配置的地址写到Zookeeper里面,作为集群元数据的一部分。我们可以将客户端(生产者/消费者)连接Kafka集群进行操作的过程分成2步:通过listeners配置的连接信息(ip/host)连接到某个Broker(broker会定期获取并缓存zk中的元数据信息),获取元数据中advertised.listeners配置的地址信息。通过第1步获取的advertised.listeners连接信息和Kafka集群通信(读/写)。所以在存在内外网隔离的虚拟化环境中(比如Docker、公有云),外部客户端经常会出现可以连接到Kafka(第1步),但发送/消费数据时报连接超时(第2步),就是因为listeners配置的是外网地址,而advertised.listeners配置的却是内网地址。那这几个参数该如何配置呢?先看连接信息的配置格式:{listener名字}://{HOST/IP}:{PORT}。HOST/IP、PORT很清楚,主要是这个“listener名字”字段。要理解这个得了解listener.security.protocol.map这个配置项:它的用途是配置listener名字和协议的映射(所以它是一个key-value的map),key是“listener名字”,value是“协议名称”,其默认值是“listener名字”和“协议名称”一样。有点绕,举个例子,比如:PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL,冒号前面是key,即协议名字;后面是value,即协议名称。listener名字我们可以随便起,而协议名称则是固定可枚举的一个范围。所以如果我们自定义了listener名字,那就需要显式的设置其对应的协议名。inter.broker.listener.name和security.inter.broker.protocol都是用于配置Broker之间通信的,前者配置名称(即listener.security.protocol.map中的key),后者配置协议(即listener.security.protocol.map中的value),默认值是PLAINTEXT。这两个配置项同时只能配置一个。为什么一个连接要搞这么复杂呢?主要是为了各种不同的场景需求。下面举一个复杂一点的应用场景进行说明。比如我们在一个公有云上面部署了一个Kafka集群,该环境有一个外网地址external_hostname和一个内网地址internal_hostname;且在内部中是无法获取外网地址的(公有云大多都是这样的)。然后想实现内部客户端访问集群时走内部地址,且不需要加密;而外部客户端访问时则走外部地址,且需要加密。要实现这个需求,可以对集群进行如下配置:listener.security.protocol.map=INTERNAL:PLAINTEXT,EXTERNAL:SSL listeners=INTERNAL://0.0.0.0:19092,EXTERNAL://0.0.0.0:9092 advertised.listeners=INTERNAL://{internal_hostname}:19092,EXTERNAL://{external_hostname}:9092 inter.broker.listener.name=INTERNAL其实更进一步,我们还可以通过可选的control.plane.listener.name参数单独定制集群Controller节点与其他Broker节点的连接,那配置信息就变为:listener.security.protocol.map=INTERNAL:PLAINTEXT,EXTERNAL:SSL,CONTROL:SSL listeners=INTERNAL://0.0.0.0:19092,EXTERNAL://0.0.0.0:9092 advertised.listeners=INTERNAL://{internal_hostname}:19092,EXTERNAL://{external_hostname}:9092,CONTROL://{control_ip}:9094 inter.broker.listener.name=INTERNAL control.plane.listener.name=CONTROL最后给出这些配置项的默认值和一些注意事项:listeners如果不显式的配置,那会监听所有网卡,相当于配置了0.0.0.0。该配置项里面listeners名字和端口都必须是唯一的,不能重复。advertised.listeners如果不配置,默认使用listeners配置的值。如果listeners也没有显式配置,则使用java.net.InetAddress.getCanonicalHostName()获取的IP地址。如果listeners配置的是0.0.0.0,则必须显式的配置advertised.listeners,因为这个配置项必须是一个具体的地址,不允许是0.0.0.0(因为客户端无法根据这个地址连接到Broker)。另外,advertised.listeners中的端口允许重复。对于listeners和advertised.listeners,有多个地址的时候,每一个地址都必须按照{listener名字}://{HOST/IP}:{PORT}格式进行配置,多个地址用英文逗号分隔。如果集群所有节点的hostname在客户端和服务端各节点之间可以正确解析,优先使用hostname,而不是IP。因为代码里面使用了java.net.InetAddress.getCanonicalHostName(),有时使用IP会出现访问不通的情况。总结:listeners地址是用于首次连接的;advertised.listeners的地址是会写到zk里面,客户端通过listeners地址建立连接获取该地址信息,然后通过该地址和集群交互。所以对于客户端,这2个地址必须都是可以访问的才可以。 Kafka的Producer http://niyanchun.com/kafka-producer.html 2022-01-16T17:32:00+08:00 上篇文章介绍了Kafka Consumer,这篇文章讨论Kafka Producer。Kafka Producer流程概述下面是一个简单的Producer实现:public class SimpleProducer { public static void main(String[] args) { Properties config = new Properties(); config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); config.put(ProducerConfig.CLIENT_ID_CONFIG, "test-write"); config.put(ProducerConfig.ACKS_CONFIG, "all"); config.put(ProducerConfig.RETRIES_CONFIG, 0); config.put(ProducerConfig.BATCH_SIZE_CONFIG, 16 * 1024); config.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 32 * 1024 * 1024); config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); Producer<String, String> producer = new KafkaProducer<>(config); for (int i = 0; i < 1000; i++) { ProducerRecord<String, String> record = new ProducerRecord<>("test-write", Integer.toString(i)); producer.send(record, (m, e) -> { if (e != null) { e.printStackTrace(); } else { System.out.printf("Produced record to topic %s partition [%d] @ offset %d%n", m.topic(), m.partition(), m.offset()); } }); } producer.flush(); producer.close(); } }大概流程就是:创建KafkaProducer(KafkaConsumer不是线程安全的,但KafkaProducer是线程安全的);创建ProducerRecord;调用send方法发送record。该方法返回的是一个Future对象,而且也允许指定回调函数。所以要是想同步发送,则调用Future对象的get即可;但一般建议定义回调函数,异步发送;最后close掉对象,释放资源;以上便是从用户代码角度看到的Producer的流程,下面从Kafka内部来看。如下图(来自参考部分第一篇文章):KafkaProducer内部主要由一个Memory Buffer和一个后台IO线程组成,数据写入流程如下:生成ProducerRecord对象,该对象必传的2个参数是topic和message value。调用send发送生成的record。调用指定的序列化函数序列化key和value,生成字节数组以方便在网络上传输。计算分区号。Kafka的分区策略是这样的:如果record中指定了partition字段,则直接使用。如果没有指定,则检查record中是否指定了key,如果有则基于该key计算哈希。如果key也没有指定,则使用一个叫"sticky partition"的策略计算分区号。sticky partition是2.4版本加入的,之前的版本是“round-robin”。改成sticky是因为轮询策略会把一批数据分散到多个batch去,这样batch比较小,批量的效果就不是很好。而sticky实质是个优化版本的轮询,它会等一个batch满了以后,再将record分配给下个batch,具体见KIP-480。将数据加到按partition分组的mem buffer里面的batch里边。加到batch以后,send方法就返回了。后台IO线程从mem buffer里面取一个batch的数据封装成一个request发送给kafka broker。后端broker成功接收以后,就会返回RecordMetadata对象,该对象包含主题、分区号、分区内的offset。如果接收失败,就会返回错误,此时如果配置了重试,就会自动重试若干次。重试之后还失败,就会抛出异常。一些重要配置学习KafkaProducer的重点之一就在于掌握一些重要参数的使用,下面仅是进行简单介绍,完整的课参考官方文档。可靠性相关主要就是acks了,有3个值:ack=0:数据发送后,不需要服务端确认;ack=1:默认值,数据发送后,Leader Partition写成功就返回;ack=all(或者-1),ISR中所有节点的Partition写成功才返回;关于可靠性我之前已经有单独的文章《Kafka的可靠性》介绍过了,这里就不赘述了。重试和超时这部分配置主要影响的是是否会产生重复数据。retries:send失败后的自动重试次数,默认为0. 当大于0的时候,重试可能导致request乱序。max.in.flight.requests.per.connection:一个连接允许同时发送request的个数,超过这个个数会block。一般配合retris使用:当retries设置大于0时,该参数设置为1,仍然可以保证顺序。kafka 1.1版本之前的默认值是1,1.1及之后版本默认值是5.request.timeout.ms:producer等待request响应的最大时间,如果超过这个时间,就会认为发送失败,会开始retry(如果配置了的话)。delivery.timeout.ms:请求发送的上限时间,从send方法返回开始算起。delivery timeout = time delay prior to sending + time to await ack + time for retries.默认值为120000,即2分钟。retries.backoff.ms:重试间隔Batch批量的定时达量配置:batch.size:batch的大小,单位是字节。设置为0表示关闭batch功能。linger.ms: the producer will wait for linger.ms time before sending the record. This allows multiple records to be batched together.batch request发送到broker的条件是:达到batch.size的大小,或者等待时间超过linger.ms。即所谓的定时或者达量。Memory Buffer对Producer内存使用的控制:buffer.memory:用于缓存record的内存总大小,单位是字节。超过以后send会阻塞。max.block.ms:send方法允许阻塞的最长时间,超时就会抛出异常。Compressioncompression.type:压缩类型,默认为none,表示不压缩。可选的值有:none, gzip, snappy, lz4 or zstd.压缩这里有几个注意点:压缩是针对整个batch压缩的,所以batch越大,压缩效率越高。一般来说一定要保证producer和broker端的压缩方法一致,不然会出现producer压缩发送到broker之后,broker又解压然后按自己的压缩算法重新压缩(CPU资源会使用比较多)。一般比较好的配置策略是producer压缩,broker端保持(即不配置压缩)。Kafka 0.11版本引入了两个特殊的Producer:idempotent producer和transactional producer,下面分别介绍。Idempotent Producer解决什么问题幂等producer主要是为了解决下面这种重试导致数据重复的问题:使用幂等producer的方法是设置:enable.idempotence = true(默认是false)。这个配置等价于下面配置项的组合:acks = allretries = Integer.MAX_VALUEmax.in.flight.requests.per.connection = 1 (0.11 >= Kafka < 1.1) OR 5 (Kafka >= 1.1)也就是幂等producer是为了提供“exactly once”语义。如何解决对于每个session,kafka leader broker会给每个producer分配一个唯一的producer id;每次send之前,producer会给每个record分配一个单调递增的sequence number;kafka broker会记录每个partition上面每个producer id的最大sequence number,对于某个producer,如果收到小的sequence number,就会丢掉这个record。但是注意:应用层自己的re-send操作导致的重复幂等producer不能去重,因为这个时候sequence number已经变了只能保证当前session内的exactly once。如何使用以下摘自Kafka 2.8 Kafka Producer Javadoc:To take advantage of the idempotent producer, it is imperative to avoid application level re-sends since these cannot be de-duplicated. As such, if an application enables idempotence, it is recommended to leave the retries config unset, as it will be defaulted to Integer.MAX_VALUE. Additionally, if a send(ProducerRecord) returns an error even with infinite retries (for instance if the message expires in the buffer before being sent), then it is recommended to shut down the producer and check the contents of the last produced message to ensure that it is not duplicated. Finally, the producer can only guarantee idempotence for messages sent within a single session.简单概括就是:应用层(失败时)不要自己重发,内部有自动尝试;如果有错误导致无限重试时,就shutdown程序来定位问题;设置enable.idempotence = true(此时retries会自动设置为Integer.MAX_VALUE,用户不要修改)注意幂等只能在一个session范围内提供幂等性Transactional Producer如其名,transactional producer能提供事务保证,使用方式很简单,给当前Producer设置以下两个参数:设置enable.idempotence = true设置一个transactional.id,这个id在消费同一个partition的producer里面要是唯一的( it should be unique to each producer instance running within a partitioned application)。编码模式如下:Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("transactional.id", "my-transactional-id"); Producer producer = new KafkaProducer<>(props, new StringSerializer(), new StringSerializer()); producer.initTransactions(); try { producer.beginTransaction(); for (int i = 0; i < 100; i++) producer.send(new ProducerRecord<>("my-topic", Integer.toString(i), Integer.toString(i))); producer.commitTransaction(); } catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) { // We can't recover from these exceptions, so our only option is to close the producer and exit. producer.close(); } catch (KafkaException e) { // For all other exceptions, just abort the transaction and try again. producer.abortTransaction(); } producer.close();这种模式大家很熟悉,和数据库的事务使用基本一样,语义也一致,这里就不赘述了。另外,transactional producer是不依赖于session的,也就是即使session断了,甚至是进程重启都不影响。前面需要设置的那个transactional.id就是为了在出现问题时,能够跨session恢复。还有一个注意点就是transactional producer的机制其实还是“预写日志(WAL)”,所以即使失败,数据也可能写入日志了。在DB中,这部分数据client肯定是读不到了,但在kafka里面能不能读到,是由客户端的参数isolation.level决定的,这个配置有2个可选值:read_uncommitted:这是默认值,表明 Consumer 能够读取到 Kafka 写入的任何消息,不论事务型 Producer 提交事务还是终止事务,其写入的消息都可以读取。很显然,如果你用了事务型 Producer,那么对应的 Consumer 就不要使用这个值。read_committed:表明 Consumer 只会读取事务型 Producer 成功提交事务写入的消息。当然了,它也能看到非事务型 Producer 写入的所有消息。Transactional Producer提供的也是exactly once的语义保证。KafkaProducer Class Javadoc其实Kafka Producer的知识有2个非常好的获取方式都已经在Kafka Producer代码里面了,KafkaProducer类的Javadoc非常详细和全面的讲解了Kafka Producer,其onSend方法则可以看到发送的全流程,想深入了解的,可以从这2个途径获得更详细的信息。下面摘抄了kafka 2.8 KafkaProducer类完整的Javadoc,本文很多内容可以从里面找到出处和说明,完整内容如下(在线地址:Kafka 2.8 KafkaProducer Javadoc):A Kafka client that publishes records to the Kafka cluster.The producer is thread safe and sharing a single producer instance across threads will generally be faster than having multiple instances.Here is a simple example of using the producer to send records with strings containing sequential numbers as the key/value pairs. Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("acks", "all"); props.put("retries", 0); props.put("linger.ms", 1); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); ​ Producer producer = new KafkaProducer<>(props); for (int i = 0; i < 100; i++) producer.send(new ProducerRecord ("my-topic", Integer.toString(i), Integer.toString(i))); ​ producer.close();The producer consists of a pool of buffer space that holds records that haven't yet been transmitted to the server as well as a background I/O thread that is responsible for turning these records into requests and transmitting them to the cluster. Failure to close the producer after use will leak these resources.​The send() method is asynchronous. When called it adds the record to a buffer of pending record sends and immediately returns. This allows the producer to batch together individual records for efficiency.​The acks config controls the criteria under which requests are considered complete. The "all" setting we have specified will result in blocking on the full commit of the record, the slowest but most durable setting.If the request fails, the producer can automatically retry, though since we have specified retries as 0 it won't. Enabling retries also opens up the possibility of duplicates (see the documentation on message delivery semantics for details).The producer maintains buffers of unsent records for each partition. These buffers are of a size specified by the batch.size config. Making this larger can result in more batching, but requires more memory (since we will generally have one of these buffers for each active partition).By default a buffer is available to send immediately even if there is additional unused space in the buffer. However if you want to reduce the number of requests you can set linger.ms to something greater than 0. This will instruct the producer to wait up to that number of milliseconds before sending a request in hope that more records will arrive to fill up the same batch. This is analogous to Nagle's algorithm in TCP. For example, in the code snippet above, likely all 100 records would be sent in a single request since we set our linger time to 1 millisecond. However this setting would add 1 millisecond of latency to our request waiting for more records to arrive if we didn't fill up the buffer. Note that records that arrive close together in time will generally batch together even with linger.ms=0 so under heavy load batching will occur regardless of the linger configuration; however setting this to something larger than 0 can lead to fewer, more efficient requests when not under maximal load at the cost of a small amount of latency.The buffer.memory controls the total amount of memory available to the producer for buffering. If records are sent faster than they can be transmitted to the server then this buffer space will be exhausted. When the buffer space is exhausted additional send calls will block. The threshold for time to block is determined by max.block.ms after which it throws a TimeoutException.​The key.serializer and value.serializer instruct how to turn the key and value objects the user provides with their ProducerRecord into bytes. You can use the included org.apache.kafka.common.serialization.ByteArraySerializer or org.apache.kafka.common.serialization.StringSerializer for simple string or byte types.From Kafka 0.11, the KafkaProducer supports two additional modes: the idempotent producer and the transactional producer. The idempotent producer strengthens Kafka's delivery semantics from at least once to exactly once delivery. In particular producer retries will no longer introduce duplicates. The transactional producer allows an application to send messages to multiple partitions (and topics!) atomically.​To enable idempotence, the enable.idempotence configuration must be set to true. If set, the retries config will default to Integer.MAX_VALUE and the acks config will default to all. There are no API changes for the idempotent producer, so existing applications will not need to be modified to take advantage of this feature.​To take advantage of the idempotent producer, it is imperative to avoid application level re-sends since these cannot be de-duplicated. As such, if an application enables idempotence, it is recommended to leave the retries config unset, as it will be defaulted to Integer.MAX_VALUE. Additionally, if a send(ProducerRecord) returns an error even with infinite retries (for instance if the message expires in the buffer before being sent), then it is recommended to shut down the producer and check the contents of the last produced message to ensure that it is not duplicated. Finally, the producer can only guarantee idempotence for messages sent within a single session.​To use the transactional producer and the attendant APIs, you must set the transactional.id configuration property. If the transactional.id is set, idempotence is automatically enabled along with the producer configs which idempotence depends on. Further, topics which are included in transactions should be configured for durability. In particular, the replication.factor should be at least 3, and the min.insync.replicas for these topics should be set to 2. Finally, in order for transactional guarantees to be realized from end-to-end, the consumers must be configured to read only committed messages as well.​The purpose of the transactional.id is to enable transaction recovery across multiple sessions of a single producer instance. It would typically be derived from the shard identifier in a partitioned, stateful, application. As such, it should be unique to each producer instance running within a partitioned application.​All the new transactional APIs are blocking and will throw exceptions on failure. The example below illustrates how the new APIs are meant to be used. It is similar to the example above, except that all 100 messages are part of a single transaction.​Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("transactional.id", "my-transactional-id"); Producer producer = new KafkaProducer<>(props, new StringSerializer(), new StringSerializer()); ​ producer.initTransactions(); ​ try { producer.beginTransaction(); for (int i = 0; i < 100; i++) producer.send(new ProducerRecord<>("my-topic", Integer.toString(i), Integer.toString(i))); producer.commitTransaction(); } catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) { // We can't recover from these exceptions, so our only option is to close the producer and exit. producer.close(); } catch (KafkaException e) { // For all other exceptions, just abort the transaction and try again. producer.abortTransaction(); } producer.close();As is hinted at in the example, there can be only one open transaction per producer. All messages sent between the beginTransaction() and commitTransaction() calls will be part of a single transaction. When the transactional.id is specified, all messages sent by the producer must be part of a transaction.​The transactional producer uses exceptions to communicate error states. In particular, it is not required to specify callbacks for producer.send() or to call .get() on the returned Future: a KafkaException would be thrown if any of the producer.send() or transactional calls hit an irrecoverable error during a transaction. See the send(ProducerRecord) documentation for more details about detecting errors from a transactional send.​By calling producer.abortTransaction() upon receiving a KafkaException we can ensure that any successful writes are marked as aborted, hence keeping the transactional guarantees.​This client can communicate with brokers that are version 0.10.0 or newer. Older or newer brokers may not support certain client features. For instance, the transactional APIs need broker versions 0.11.0 or later. You will receive an UnsupportedVersionException when invoking an API that is not available in the running broker version.总结KafkaConsumer是线程不安全的,所以之前介绍KafkaConsumer的文章重点在offset的控制,特别是多线程消费的时候。但KafkaProducer是线程安全的,所以他的重点和难点不在于多线程,而在于如何保证数据发送时的可靠性、效率、语义保证等。KafkaProducer提供了大量的配置,可以让用户根据自己的场景进行配置。还提供了幂等Producer和事务Producer,这二者目的都是为了提供exactly once语义保证,而且事务Producer还可以提供跨session的事务(当然性能也会比较差)。但是我们需要清楚,这些保证都是有条件的,比如应用层用户自己的重发是无法保证的,当然还有其它情况。所以和上篇文章中提到的一样,如果想实现端到端的exactly once,光靠kafka是不行的,需要全局考虑。参考:Unleash Kafka Producer’s Architecture and Internal Workingskafka 2.8.1 documetation and code极客时间《Kafka核心技术与实践》专栏 Kafka的多线程消费者实现 http://niyanchun.com/kafka-multi-thread-consumer.html 2022-01-09T14:18:00+08:00 Kafka的消费者类KafkaConsumer是非线程安全的,那如何实现多线程的Consumer呢?先了解一下一般Consumer的流程。如上图:通过poll方法从kafka集群拉取数据;处理数据提交offset(如果开启了自动提交enable.auto.commit=true,则每次poll的时候会自动提交上一次poll的offset)如此往复。翻译成代码类似下面这样: Properties properties = new Properties(); properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "test"); ... // 创建consumer实例(非线程安全) KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties); // 订阅主题 consumer.subscribe(Collections.singletonList("test")); while (true) { // 拉取数据 ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(10_000)); // 处理数据 for (ConsumerRecord<String, String> record : records) { ... } // 提交offset consumer.commitSync(); }目前kafka consumer的多线程方案常见的有两种:Thread per consumer model:即每个线程都有自己的consumer实例,然后在一个线程里面完成数据的获取(poll)、处理(process)、offset提交。Multi-threaded consumer model:一个线程(也可能是多个)专门用于获取数据,另外一组线程专门用于处理。这种模型没有统一的标准。下面分别介绍。Thread-Per-Consumer Model这种多线程模型是利用Kafka的topic分多个partition的机制来实现并行:每个线程都有自己的consumer实例,负责消费若干个partition。各个线程之间是完全独立的,不涉及任何线程同步和通信,所以实现起来非常简单,使用也最多,像Flink里面用的就是这种模型。比如下面是2个线程消费5个分区的示例图:​用代码实现起来的思路是:先确定线程数,然后将分区数平均分给这些线程。下面是一个示例代码(完整代码见这里 github):/** * @author NiYanchun **/ public class ThreadPerConsumerModel { public static void main(String[] args) throws InterruptedException { Properties config = new Properties(); config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); config.put(ConsumerConfig.GROUP_ID_CONFIG, "test"); config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true"); config.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "2000"); config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); final String topic = "test"; int partitionNum = getPartitionNum(topic); final int threadNum = 4; int partitionNumPerThread = (partitionNum <= threadNum) ? 1 : partitionNum / threadNum + 1; ExecutorService threadPool = Executors.newFixedThreadPool(partitionNum); List<TopicPartition> topicPartitions = new ArrayList<>(partitionNumPerThread); for (int i = 0; i < partitionNum; i++) { topicPartitions.add(new TopicPartition(topic, i)); if ((i + 1) % partitionNumPerThread == 0) { threadPool.submit(new Task(new ArrayList<>(topicPartitions), config)); topicPartitions.clear(); } } if (!topicPartitions.isEmpty()) { threadPool.submit(new Task(new ArrayList<>(topicPartitions), config)); } threadPool.shutdown(); threadPool.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS); } static class Task implements Runnable { private final List<TopicPartition> topicPartitions; private final Properties config; public Task(List<TopicPartition> topicPartitions, Properties config) { this.topicPartitions = topicPartitions; this.config = config; } @Override public void run() { KafkaConsumer<String, String> consumer = new KafkaConsumer<>(config); System.out.println(topicPartitions); consumer.assign(topicPartitions); try { while (true) { ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(10_000)); for (ConsumerRecord<String, String> record : records) { System.out.println(record); } } } catch (Exception e) { e.printStackTrace(); } finally { consumer.close(); } } } public static int getPartitionNum(String topic) { return 8; } }上面的代码仅是作为原理说明,所以有些处理都简化或者硬编码了,核心逻辑就是将分区数均分给所有线程,这里不再赘述。​这种模型最大的优点是简单,但并非完美。在这种模型里面,获取数据、处理数据都是在一个线程里面的,如果在处理流程需要耗费很长时间,甚至是不可控的时候就会产生问题。会有什么问题?Kafka有一个参数**max.poll.interval.ms**,它表示的是两次poll调用的最大时间间隔。如果超过这个时间间隔,kafka就会认为这个consumer已经挂了,就会触发consumer group rebalance。这个参数默认值是5分钟,也就是如果某次处理超过5分钟,那就可能导致rebalance。Rebalance有什么问题呢?问题很大,对于Rebalance以后再介绍,这里先简单概括一下:引发Consumer Group Rebalance操作主要有3种情况:(1)consumer数量发生变化(2)consumer订阅的主题发生变化(2)主题的分区数发生变化Rebalance是一个比较重的操作,它需要consumer group的所有活着的consumer参与,当consumer比较多的时候,rebalance会很耗时(比如若干小时);而且在没有完成之前,大家都是干不了活的,体现在业务上就是停止处理了。Rebalance其实就是重新划分partition,如果是自己提交offset的话,处理不好,就可能产生重复数据,后面再说。总之,Rebalance操作能避免应该尽可能避免,特别是因为编码不合理产生的应该坚决改掉。为了避免处理时间超过这个最大时间间隔,在仍然使用这种模型的前提下,一般可以通过调整下面的参数一定程度的解决问题:控制每次poll的数据不要太大,即修改max.poll.records参数的值,默认是500,即在给定时间内一次最多poll 500个record。将max.poll.interval.ms的值改大一些。但有些场景通过改上面的2个参数也无法解决,而且可能还挺常见,举个例子,比如消费业务数据,处理后写入到外部存储。如果外部存储挂了,在没有恢复之前一般是不应该继续消费Kafka数据的。此时通过调整max.poll.interval.ms的方法就失效了,因为事先是完全不知道应该设置多少的。​另外一个多线程模型可以解决上面这些问题,但会复杂很多。Multi-Thread Consumer Model先看下模型的图(注意:多线程模型的设计方式没有统一标准,下面这种只是其中一种而已):具体处理流程为:poll线程专门负责拉取数据,然后将数据按partition分组,交给处理线程池,每个线程一次只处理一个分区的数据。数据交给处理线程后,poll继续拉取数据。现在有2个问题:如何像Thread-Per-Consumer那样,保证一个分区里面的数据有序如何提交offset数据有序性要保证partition内数据有序,只要避免多个线程并行处理同一个partition的数据即可。在poll线程给线程池分发数据的时候,已经按partition做了分组,也就是保证了一次拉取的数据中同一个partition的数据只会分配给一个线程。现在只要保证分区数据处理完成之前不再拉取该分区的数据,就可以保证数据的有序了。KafkaConsumer类提供了一个pause和resume方法,参数都是分区信息的集合:public void pause(Collection<TopicPartition> partitions) public void resume(Collection<TopicPartition> partitions)调用pause方法后,后续poll的时候,这些被pause的分区不会再返回任何数据,直到被resume。所以,可以在本次partition的数据处理完成之前,pause该partition。Offset提交很显然,poll线程和处理线程解耦异步以后,就不能使用自动提交offset了,必须手动提交,否则可能offset提交了,但数据其实还没有处理。提交是poll线程做的,而offset的值则是处理线程才知道的,两者之前需要一个信息传递机制。代码实现的时候需要考虑。​这里插一个关于offset提交的话题。我们知道有2种方式提交offset:自动提交:将enable.auto.commit设置为true即可;每次poll的时候会自动提交上一次的offset。手动提交:分为同步提交commitSync和异步提交commitAsync。同步就是阻塞式提交,异步就是非阻塞式。而且二者都支持指定具体partition的offset,也就是可以精细化的自定义offset。这里不展开介绍具体细节,只讨论一个问题:offset提交和消息传递语义(Message Delivery Semantics)的关系。Kafka提供了3种消息传递保证:at most once:最多一次,即不会产生重复数据,但可能会丢数据at least once:至少一次,即可能会产生重复数据,但不会丢数据exactly once:准确的一次,不多也不少首先自动提交offset提供的是at least once的保证,因为他是在poll的时候提交上次数据的offset,也就是如果处理本次poll拉取的数据的时候异常了,导致没有执行到下次poll,那这次这些数据的offset就无法提交了,但数据可能已经处理了一部分了。要实现最多一次也很简单,每次poll完数据,先提交offset,提交成功之后再开始处理即可。而要实现exactly once很难,而且这里的exactly once其实仅指消费,但我们一般想要的是全系统数据链上的exactly once。这个问题比较复杂,要实现真正的端到端exactly once,可以去查查flink、dataflow的设计,需要有很多假设和权衡,这里就不展开了。​重点说明一种情况,考虑这样一个流程:poll数据,处理数据,手动同步提交offset,然后再poll。当然也可能会有变种,就是边处理数据,边提交offset,但实质一样。这种情况会不会产生数据重复?答案是:会。其实不管你用同步提交,还是异步提交,都可能产生数据重复。因为系统随时可能因为其它原因产生Rebalance。所以要真正避免重复数据,正确的方式应该是实现ConsumerRebalanceListener接口,监听Rebalance的发生。当监听到发生Rebalance导致当前consumer的partition发生变化时,同步提交offset。但即使这样,其实也不能严格避免,因为系统还可能在任意阶段挂掉。因此这种流程实际提供的也是at least once的保证。所以这里想要表达的意思是,如果你的系统不能容忍有重复数据(在不丢数据的前提下),光靠kafka是做不到的,这里通过一些编码优化只能减少发生的概率,无法杜绝。终极方案应该是在应用层解决。比如幂等设计、使用数据的时候去重等。代码实现代码实现分为poll线程的实现(PollThread.java)、处理线程的实现(ProcessThread.java)、测试类Main.java。完整的代码见github。下面对关键地方进行说明。先看主流程:// PollThread.java @Override public void run() { try { // 主流程 while (!stopped.get()) { // 消费数据 ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(10_000)); // 按partition分组,分发数据给处理线程池 handleFetchedRecords(records); // 检查正在处理的线程 checkActiveTasks(); // 提交offset commitOffsets(); } } catch (WakeupException we) { if (!stopped.get()) { throw we; } } finally { consumer.close(); } }主流程比较清晰:poll数据按partition分组,分发数据给处理线程池检查正在处理的线程提交offsetpoll数据没什么可说的,看下handleFetchedRecords:// PollThread.java private void handleFetchedRecords(ConsumerRecords<String, String> records) { if (records.count() > 0) { List<TopicPartition> partitionsToPause = new ArrayList<>(); // 按partition分组 records.partitions().forEach(partition -> { List<ConsumerRecord<String, String>> consumerRecords = records.records(partition); // 提交一个分区的数据给处理线程池 ProcessThread processThread = new ProcessThread(consumerRecords); processThreadPool.submit(processThread); // 记录分区与处理线程的关系,方便后面查询处理状态 activeTasks.put(partition, processThread); }); // pause已经在处理的分区,避免同个分区的数据被多个线程同时消费,从而保证分区内数据有序处理 consumer.pause(partitionsToPause); } }该方法里面主要做了下面几件事情:按partition将数据分组,然后每个分组交给线程池去处理记录分区和处理线程的关系,因为后面要查询处理的状态将正在处理的分区pause,即这些分区在后续poll中不会返回数据,直到被resume。原因前面说了,如果继续消费,可能会有多个线程同时处理一个分区来的数据,导致分区内数据的处理顺序无法保证。接下来,先看下处理线程的实现,代码量不大,就全贴上了:@Slf4j public class ProcessThread implements Runnable { private final List<ConsumerRecord<String, String>> records; private final AtomicLong currentOffset = new AtomicLong(); private volatile boolean stopped = false; private volatile boolean started = false; private volatile boolean finished = false; private final CompletableFuture<Long> completion = new CompletableFuture<>(); private final ReentrantLock startStopLock = new ReentrantLock(); public ProcessThread(List<ConsumerRecord<String, String>> records) { this.records = records; } @Override public void run() { startStopLock.lock(); try { if (stopped) { return; } started = true; } finally { startStopLock.unlock(); } for (ConsumerRecord<String, String> record : records) { if (stopped) { break; } // process record here and make sure you catch all exceptions; currentOffset.set(record.offset() + 1); } finished = true; completion.complete(currentOffset.get()); } public long getCurrentOffset() { return currentOffset.get(); } public void stop() { startStopLock.lock(); try { this.stopped = true; if (!started) { finished = true; completion.complete(currentOffset.get()); } } finally { startStopLock.unlock(); } } public long waitForCompletion() { try { return completion.get(); } catch (InterruptedException | ExecutionException e) { return -1; } } public boolean isFinished() { return finished; } }代码逻辑比较简单,很多设计都是在实现“如何优雅停止处理线程”的情况,这里简单说明一下。不考虑进程停止这种情况的话,其实需要停止处理线程的一个主要原因就是可能发生了rebalance,这个分区给其它consumer了。这个时候当前consumer处理线程优雅的停止方式就是完成目前正在处理的那个record,然后提交offset,然后停止。注意是完成当前正在处理的record,而不是完成本次分给该线程的所有records,因为处理完所有的records可能需要的时间会比较长。前面在offset提交的时候已经提到过,手动提交offset的时候,要想处理好rebalance这种情况,需要实现ConsumerRebalanceListener接口。这里PollThread实现了该接口:@Slf4j public class PollThread implements Runnable, ConsumerRebalanceListener { private final KafkaConsumer<String, String> consumer; private final Map<TopicPartition, ProcessThread> activeTasks = new HashMap<>(); private final Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = new HashMap<>(); // 省略其它代码 @Override public void onPartitionsRevoked(Collection<TopicPartition> partitions) { // 1. stop all tasks handling records from revoked partitions Map<TopicPartition, ProcessThread> stoppedTask = new HashMap<>(); for (TopicPartition partition : partitions) { ProcessThread processThread = activeTasks.remove(partition); if (processThread != null) { processThread.stop(); stoppedTask.put(partition, processThread); } } // 2. wait for stopped task to complete processing of current record stoppedTask.forEach((partition, processThread) -> { long offset = processThread.waitForCompletion(); if (offset > 0) { offsetsToCommit.put(partition, new OffsetAndMetadata(offset)); } }); // 3. collect offsets for revoked partitions Map<TopicPartition, OffsetAndMetadata> revokedPartitionOffsets = new HashMap<>(); for (TopicPartition partition : partitions) { OffsetAndMetadata offsetAndMetadata = offsetsToCommit.remove(partition); if (offsetAndMetadata != null) { revokedPartitionOffsets.put(partition, offsetAndMetadata); } } // 4. commit offsets for revoked partitions try { consumer.commitSync(revokedPartitionOffsets); } catch (Exception e) { log.warn("Failed to commit offsets for revoked partitions!", e); } } @Override public void onPartitionsAssigned(Collection<TopicPartition> partitions) { // 如果分区之前没有pause过,那执行resume就不会有什么效果 consumer.resume(partitions); } }可以看到:当检测到有分区分配给当前consumer的时候,就会尝试resume这个分区,因为这个分区可能之前被其它consumer pause过。当检测到有分区要被回收时,执行了下面几个操作:查看activeTasks,看是否有线程正在处理这些分区的数据,有的话调用ProcessThread的stop方法将这些处理线程的stopped标志位设置成true。同时记录找到的这些线程。在ProcessThread的run方法里面每次循环处理record的时候都会检测上一步置位的这个stopped标志位,从而实现完成当前处理的record后就停止的逻辑。然后这一步就是等待这些线程处理结束,拿到处理的offset值,放到待提交offset队列offsetsToCommit里面。从待提交队列里面找到要被回收的分区的offset,放到revokedPartitionOffsets里面。提交revokedPartitionOffsets里面的offset。简单总结的话,当收到有某些partition要从当前consumer回收的消息的时候,就从处理线程里面找到正在处理这些分区的线程,然后通知它们处理完正在处理的那一个record之后就退出吧。当然退出的时候需要返回已处理数据的offset,然后PollThread线程提交这些分区的offset。​然后继续回到主流程,之前讨论到了handleFetchedRecords,主要是按partition分区,并将分区数据分发给处理线程,并将这些partition pause。然后看接下来checkActiveTasks:private void checkActiveTasks() { List<TopicPartition> finishedTasksPartitions = new ArrayList<>(); activeTasks.forEach((partition, processThread) -> { if (processThread.isFinished()) { finishedTasksPartitions.add(partition); } long offset = processThread.getCurrentOffset(); if (offset > 0) { offsetsToCommit.put(partition, new OffsetAndMetadata(offset)); } }); finishedTasksPartitions.forEach(activeTasks::remove); consumer.resume(finishedTasksPartitions); }该方法的主要作用是:找到已经处理完成的partition,并获取对应的offset,放到待提交队列里面。同时resume这些partition。​最后就是commitOffsets了:private void commitOffsets() { try { long currentMillis = System.currentTimeMillis(); if (currentMillis - lastCommitTime > 5000) { if (!offsetsToCommit.isEmpty()) { consumer.commitSync(offsetsToCommit); offsetsToCommit.clear(); } lastCommitTime = currentMillis; } } catch (Exception e) { log.error("Failed to commit offsets!", e); } }这个方法比较简单,就是按一定的时间间隔同步提交待提交队列里面的offset。至此,该模型的整体流程就结束了。其实这个模型还隐式的实现了限流:如果后端线程都在处理了,虽然poll线程还会继续poll,但这些partition因为被pause了,所以不会真正返回数据了。这个功能在将poll和处理解耦,分到不同线程以后是很重要的。如果没有限流且后面处理比较慢的话,如果限制了poll和线程池之间传递records的队列大小,那poll的数据要么丢掉,要么就等待,等待的话又会碰到之前可能会超max.poll.interval.ms的问题。但如果不限制的话,队列就会一直变大,最后OOM。总结从提供的功能来看,两种多线程模型都实现了基本的功能:多线程处理,提高并发提供partition内数据有序保证提供 at least once语义保证但第二种模型可以更好的应对处理流程比较慢的需求场景,之所以要处理这种场景根本原因其实还是kafka的max.poll.interval.ms机制,也就是我们不能无限期的阻塞poll调用。另外,如果不需要提供partition内数据有序的话,可以对模型进行改造,改成纯基于数据拆分的多线程模式。目前的实现其实和Thread-Per-Consumer一样,都是基于partition的拆分、并发,只不过将流程从同步改成了异步而已。​总体而言,两种模型各有利弊,但如果Thread-Per-Consumer模型能满足需求的话,肯定是应该优先使用的,简单明了。毕竟在软件世界,在满足需求的前提下,系统越简单越好。如无必要,勿增实体。​参考:kafka documentation​Multi-Threaded Message Consumption with the Apache Kafka Consumer ES基于磁盘的shard分配机制浅析 http://niyanchun.com/es-disk-based-shard-allocation.html 2021-05-30T21:21:00+08:00 先回顾几个概念:ES的Index是个逻辑概念,实际由若干shard组成,而shard就是Lucene的Index,即真正存储数据的实体。当有数据需要存储的时候,就需要先分配shard。具体来说需要分配shard的场景包括:数据恢复,主分片(primary)、副本分片的分配,再平衡(rebalancing),节点的新增、删除。对于分布式存储系统来说,数据的分布非常重要,ES shard的分配工作由ES的master节点负责。ES提供了多种分配策略的支持,简单来说就是用户可以通过配置定义一些“策略”或者叫“路由规则”,然后ES会在遵守这些策略的前提下,尽可能的让数据均匀分布。比如可以配置机房、机架属性,ES会尽量让主数据和副本数据分配在不同的机房、机架,起到容灾的作用。再比如,可以配置一些策略,让数据不分配到某些节点上面,这在滚动升级或者数据迁移的时候非常有用。不过本文并不会介绍所有这些策略,只聚焦于默认的基于磁盘的分配策略,因为这部分是最常用的。先说一下再平衡。再平衡再平衡这个概念在分布式存储系统里面很常见,几乎是标配。因为各种各样的原因(比如分配策略不够智能、新增节点等),系统各个节点的数据存储可能分布不均,这时候就需要有能够重新让数据均衡的机制,即所谓的再平衡。有些系统的再平衡需要用户手动执行,有些则是自动的。ES就属于后者,它的再平衡是自动的,用户不参与,也几乎不感知。那到底怎样才算平衡(balanced)?ES官方对此的定义是:A cluster is balanced when it has an equal number of shards on each node without having a concentration of shards from any index on any node.简单说就是看每个节点上面的shard个数是否相等,越相近就越平衡。这里注意计数的时候是“无差别”的,即不管是哪个索引的shard,也不管是主分片的shard,还是副本的shard,一视同仁,只算个数。ES后台会有进程专门检查整个集群是否平衡,以及执行再平衡的操作。再平衡也就是将shard数多的迁移到shard数少的节点,让集群尽可能的平衡。关于再平衡,需要注意2个点:平衡的状态是一个范围,而不是一个点。即不是说各个节点的shard数严格相等才算平衡,而是大家的差别在一个可接受的范围内就算平衡。这个范围(也称阈值或权重)是可配置的,用户一般是无需参与的。再平衡是一个尽力而为的动作,它会在遵守各种策略的前提下,尽量让集群趋于平衡。看个简单的例子吧。有一个集群刚开始有2个节点(node42,node43),我们创建一个1 replica、6 primary shard的索引shard_alloc_test:PUT shard_alloc_test { "settings": { "index": { "number_of_shards" : "6", "number_of_replicas": "1" } } }查看一下shard的分配:GET _cat/shards?v index shard prirep state docs store ip node shard_alloc_test 3 r STARTED 0 208b 10.8.4.43 node-43 shard_alloc_test 3 p STARTED 0 208b 10.8.4.42 node-42 shard_alloc_test 2 p STARTED 0 208b 10.8.4.43 node-43 shard_alloc_test 2 r STARTED 0 208b 10.8.4.42 node-42 shard_alloc_test 4 p STARTED 0 208b 10.8.4.43 node-43 shard_alloc_test 4 r STARTED 0 208b 10.8.4.42 node-42 shard_alloc_test 1 r STARTED 0 208b 10.8.4.43 node-43 shard_alloc_test 1 p STARTED 0 208b 10.8.4.42 node-42 shard_alloc_test 5 r STARTED 0 208b 10.8.4.43 node-43 shard_alloc_test 5 p STARTED 0 208b 10.8.4.42 node-42 shard_alloc_test 0 p STARTED 0 208b 10.8.4.43 node-43 shard_alloc_test 0 r STARTED 0 208b 10.8.4.42 node-42可以看到,primary的6个shard和replica的6个shard的分配是非常均衡的:一方面,12个shard均匀分配到了2个节点上面;另一方面,primary shard和replica shard也是均匀交叉的分配到了2个节点上面。此时,我们对集群进行扩容,再增加一台节点:node-41。待节点成功加入集群后,我们看一下shard_alloc_test的shard分配:GET _cat/shards?v index shard prirep state docs store ip node shard_alloc_test 3 p STARTED 0 208b 10.8.4.41 node-41 shard_alloc_test 3 r STARTED 0 208b 10.8.4.43 node-43 shard_alloc_test 2 p STARTED 0 208b 10.8.4.41 node-41 shard_alloc_test 2 r STARTED 0 208b 10.8.4.42 node-42 shard_alloc_test 4 p STARTED 0 208b 10.8.4.41 node-41 shard_alloc_test 4 r STARTED 0 208b 10.8.4.42 node-42 shard_alloc_test 1 r STARTED 0 208b 10.8.4.43 node-43 shard_alloc_test 1 p STARTED 0 208b 10.8.4.42 node-42 shard_alloc_test 5 p STARTED 0 208b 10.8.4.41 node-41 shard_alloc_test 5 r STARTED 0 208b 10.8.4.43 node-43 shard_alloc_test 0 p STARTED 0 208b 10.8.4.43 node-43 shard_alloc_test 0 r STARTED 0 208b 10.8.4.42 node-42 GET _cat/tasks?v action task_id parent_task_id type start_time timestamp running_time ip node indices:data/read/get[s] 0KbNzeuGR1iZ1I1_3fIDVg:192304 - transport 1622513252244 02:07:32 1.6ms 10.8.4.42 node-42 indices:data/read/get jgfZk3snRb-uEUToQgo9pw:1841 - transport 1622513252487 02:07:32 1.4ms 10.8.4.43 node-43可以看到,shard自动的进行了再分配,均匀的分配到了3个节点上面。如果再平衡的时间稍长一点,你还可以通过task接口看到集群间的数据迁移任务。然后我们再缩容,停掉node-41这个节点,数据也会再次自动重新分配:GET _cat/shards?v index shard prirep state docs store ip node shard_alloc_test 3 p STARTED 0 208b 10.8.4.43 node-43 shard_alloc_test 3 r STARTED 0 208b 10.8.4.42 node-42 shard_alloc_test 2 r STARTED 0 208b 10.8.4.43 node-43 shard_alloc_test 2 p STARTED 0 208b 10.8.4.42 node-42 shard_alloc_test 1 r STARTED 0 208b 10.8.4.43 node-43 shard_alloc_test 1 p STARTED 0 208b 10.8.4.42 node-42 shard_alloc_test 4 r STARTED 0 208b 10.8.4.43 node-43 shard_alloc_test 4 p STARTED 0 208b 10.8.4.42 node-42 shard_alloc_test 5 p STARTED 0 208b 10.8.4.43 node-43 shard_alloc_test 5 r STARTED 0 208b 10.8.4.42 node-42 shard_alloc_test 0 p STARTED 0 208b 10.8.4.43 node-43 shard_alloc_test 0 r STARTED 0 208b 10.8.4.42 node-42所以,ES的再平衡功能还是非常好用和易用的,完全自动化。但是细心的同学应该已经意识到一个问题:光靠保证shard个数均衡其实是没法保证数据均衡的,因为有些shard可能很大,存了很多数据;有些shard可能很小,只存了几条数据。的确是这样,所以光靠再平衡还是无法保证数据的均衡的,至少从存储容量的角度来说是不能保证均衡的。所以,ES还有一个默认就开启的基于磁盘容量的shard分配器。基于磁盘容量的shard分配基于磁盘容量的shard分配策略(Disk-based shard allocation)默认就是开启的,其机制也非常简单,主要就是3条非常重要的分水线(watermark):low watermark:默认值是85%。磁盘使用超过这个阈值,就认为“危险”快来了,这个时候就不会往该节点再分配replica shard了,但新创建的索引的primary shard还是可以分配。特别注意必须是新创建的索引(什么是“老的”?比如再平衡时其它节点上已经存在的primary shard就算老的,这部分也是不能够迁移到进入low watermark的节点上来的)。high watermark:默认值是90%。磁盘使用超过这个阈值,就认为“危险”已经来了,这个时候不会再往该节点分配任何shard,即primary shard和replica shard都不会分配。并且会开始尝试将节点上的shard迁移到其它节点上。flood stage watermark:默认值是95%。磁盘使用超过这个阈值,就认为已经病入膏肓了,需要做最后的挽救了,挽救方式也很简单——断臂求生:将有在该节点上分配shard的所有索引设置为只读,不允许再往这些索引写数据,但允许删除索引(index.blocks.read_only_allow_delete)。大概总结一下:当进入low watermark的时候,就放弃新创建的索引的副本分片数据了(即不创建对应的shard),但还是允许创建主分片数据;当进入high watermark的时候,新创建索引的主分片、副本分片全部放弃了,但之前已经创建的索引还是可以正常继续写入数据的;同时尝试将节点上的数据向其它节点迁移;当进入flood stage watermark,完全不允许往该节点上写入数据了,这是最后一道保护。只要在high watermark阶段,数据可以迁移到其它节点,并且迁移的速度比写入的速度快,那就不会进入该阶段。一些相关的配置如下:cluster.routing.allocation.disk.threshold_enabled:是否开启基于磁盘的分配策略,默认为true,表示开启。cluster.info.update.interval:多久检查一次磁盘使用,默认值是30s。cluster.routing.allocation.disk.watermark.low:配置low watermark,默认85%。cluster.routing.allocation.disk.watermark.high:配置high watermark,默认90%。cluster.routing.allocation.disk.watermark.flood_stage:配置flood stage watermark,默认95%。后面3个配置阈值的配置项除了可以使用百分比以外,也可以使用具体的值,比如配置为low watermark为10gb,表示剩余空闲磁盘低于10gb的时候,就认为到low watermark了。但是需要注意,要么3个配置项都配置百分比,要么都配置具体的值,不允许百分比和具体的值混用。另外需要注意:如果一个节点配置了多个磁盘,决策时会采用磁盘使用最高的那个。比如一个节点有2个磁盘,一个磁盘使用是84%,一个使用是86%,那也认为该节点进入low watermark了。最后,如果节点进入flood stage watermark阶段,涉及的索引被设置成read-only以后,如何恢复呢?第一步当然是先通过删数据或增加磁盘/节点等方式让磁盘使用率降到flood stage watermark的阈值以下。然后第二步就是恢复索引状态,取消只读。在7.4.0及以后版本,一旦检测到磁盘使用率低于阈值后,会自动恢复;7.4.0以前的版本,必须手动执行以下命令来取消只读状态:// 恢复单个索引 PUT /索引名称/_settings { "index.blocks.read_only_allow_delete": null } // 恢复所有索引 PUT _settings { "index": { "blocks": {"read_only_allow_delete": null} } } // curl命令 curl -XPUT -H "Content-Type: application/json" http://localhost:9200/_all/_settings -d '{"index.blocks.read_only_allow_delete": null}'下面看一下具体的例子。还是前面node-42和node-43组成的集群,每个节点的磁盘总空间是1GB。小技巧:为了方便验证这个功能,Linux用户可以挂载小容量的内存盘来进行操作。这样既免去了没有多个磁盘的烦恼,而且磁盘大小可以设置的比较小,容易操作。比如我测试的2个节点的数据目录就是挂载的1GB的内存盘。为了方便验证和看的更加清楚,我重新设置几个watermark的阈值,这里不使用百分比,而是用具体的值。PUT _cluster/settings { "transient": { "cluster.routing.allocation.disk.watermark.low": "800mb", "cluster.routing.allocation.disk.watermark.high": "600mb", "cluster.routing.allocation.disk.watermark.flood_stage": "500mb", "cluster.info.update.interval": "10s" } } # 检查下磁盘使用 GET _cat/nodes?v&h=n,dt,du,dup n dt du dup node-43 1gb 328kb 0.03 node-42 1gb 328kb 0.03也就是当空闲磁盘低于800mb时进入low watermark;低于600mb时进入high watermark;低于500mb时进入flood stage;每10秒检查一次。接下来写入一些数据,先让磁盘进入low watermark。// 为了查看方便,使用1个 primary shard,1个副本 PUT shard_alloc_test { "settings": { "index": { "number_of_shards" : "1", "number_of_replicas": "1" } } } // 磁盘低于800mb,进入low watermark GET _cat/nodes?v&h=n,dt,du,dup,da n dt du dup da node-43 1gb 236.1mb 23.06 787.8mb node-42 1gb 236.1mb 23.06 787.8mb // 进入low watermark时的日志 [2021-06-01T10:24:59,795][INFO ][o.e.c.r.a.DiskThresholdMonitor] [node-42] low disk watermark [800mb] exceeded on [jgfZk3snRb-uEUToQgo9pw][node-43][/tmp/es-data/nodes/0] free: 799mb[78%], replicas will not be assigned to this node [2021-06-01T10:24:59,799][INFO ][o.e.c.r.a.DiskThresholdMonitor] [node-42] low disk watermark [800mb] exceeded on [0KbNzeuGR1iZ1I1_3fIDVg][node-42][/tmp/es-data/nodes/0] free: 799mb[78%], replicas will not be assigned to this node可以看到,空闲磁盘小于800MB的时候就进入了low watermark,ES也有对应的日志提示。这个时候副本已经不能分配到这节点上了,我们新建一个索引shard_alloc_test1验证一下。PUT shard_alloc_test1 { "settings": { "index": { "number_of_shards" : "1", "number_of_replicas": "1" } } } GET _cat/shards?v index shard prirep state docs store ip node shard_alloc_test 0 p STARTED 0 3.9mb 10.8.4.43 node-43 shard_alloc_test 0 r STARTED 0 3.9mb 10.8.4.42 node-42 shard_alloc_test1 0 p STARTED 0 208b 10.8.4.43 node-43 shard_alloc_test1 0 r UNASSIGNED 可以看到,shard_alloc_test1的primary shard分配了,但replica shard没有分配,符合预期。再接着往shard_alloc_test写数据,让进入high watermark。GET _cat/nodes?v&h=n,dt,du,dup,da n dt du dup da node-43 1gb 468.2mb 45.73 555.7mb node-42 1gb 468.1mb 45.72 555.8mb [2021-06-01T10:32:59,875][WARN ][o.e.c.r.a.DiskThresholdMonitor] [node-42] high disk watermark [600mb] exceeded on [jgfZk3snRb-uEUToQgo9pw][node-43][/tmp/es-data/nodes/0] free: 555.8mb[54.2%], shards will be relocated away from this node; currently relocating away shards totalling [0] bytes; the node is expected to continue to exceed the high disk watermark when these relocations are complete [2021-06-01T10:32:59,876][WARN ][o.e.c.r.a.DiskThresholdMonitor] [node-42] high disk watermark [600mb] exceeded on [0KbNzeuGR1iZ1I1_3fIDVg][node-42][/tmp/es-data/nodes/0] free: 555.7mb[54.2%], shards will be relocated away from this node; currently relocating away shards totalling [0] bytes; the node is expected to continue to exceed the high disk watermark when these relocations are complete可以看到,磁盘低于600MB的时候,进入high watermark。这个时候应该不会往该节点分配任何shard了(同时因为只有2个节点,且都引入high watermark了,所以也无法将节点上的shard迁移到其它节点),我们创建新索引shard_alloc_test2验证一下。PUT shard_alloc_test2 { "settings": { "index": { "number_of_shards" : "1", "number_of_replicas": "1" } } } GET _cat/shards?v index shard prirep state docs store ip node shard_alloc_test 0 p STARTED 0 17.5mb 10.8.4.43 node-43 shard_alloc_test 0 r STARTED 0 17.5mb 10.8.4.42 node-42 shard_alloc_test2 0 p UNASSIGNED shard_alloc_test2 0 r UNASSIGNED shard_alloc_test1 0 p STARTED 0 208b 10.8.4.43 node-43 shard_alloc_test1 0 r UNASSIGNED 可以看到,主分片、副本分片的shard都没有分配,符合预期。虽然新创建的索引的shard无法分配,但原有的索引还是可以正常写的,我们继续写数据,使磁盘进入flood stage。GET _cat/nodes?v&h=n,dt,du,dup,da n dt du dup da node-43 1gb 535.3mb 52.28 488.6mb node-42 1gb 535.8mb 52.33 488.1mb [2021-06-01T10:42:20,024][WARN ][o.e.c.r.a.DiskThresholdMonitor] [node-42] flood stage disk watermark [500mb] exceeded on [jgfZk3snRb-uEUToQgo9pw][node-43][/tmp/es-data/nodes/0] free: 488.6mb[47.7%], all indices on this node will be marked read-only [2021-06-01T10:42:20,024][WARN ][o.e.c.r.a.DiskThresholdMonitor] [node-42] flood stage disk watermark [500mb] exceeded on [0KbNzeuGR1iZ1I1_3fIDVg][node-42][/tmp/es-data/nodes/0] free: 488.1mb[47.6%], all indices on this node will be marked read-only顺利进入flood stage,索引被设置为read-only。此时,客户端再次写入会收到类似如下错误(这里是JSON格式的日志):{ "error" : { "root_cause" : [ { "type" : "cluster_block_exception", "reason" : "index [shard_alloc_test] blocked by: [TOO_MANY_REQUESTS/12/disk usage exceeded flood-stage watermark, index has read-only-allow-delete block];" } ], "type" : "cluster_block_exception", "reason" : "index [shard_alloc_test] blocked by: [TOO_MANY_REQUESTS/12/disk usage exceeded flood-stage watermark, index has read-only-allow-delete block];" }, "status" : 429 }也就是当你的客户端出现“read-only-allow-delete block”错误日志时,表名ES的磁盘空间已经满了。如果是7.4.0版本之前的ES,除了恢复磁盘空间外,还要手动恢复索引的状态,取消只读。总结shard是ES中非常重要的一个概念,而且大部分时候我们不需要太多的关注shard分配的细节,ES默认就会帮我们处理好。但基本的原理还是要有一些了解,一方面可以让我们事先设计合理的方案;另一方面当出现问题时,也知道问题原因和解决方案。 Flink快速了解(7)——Async I/O http://niyanchun.com/flink-quick-learning-7-async-io.html 2021-04-03T11:23:00+08:00 上篇介绍了常见的算子,本文介绍另外一个重要的算子:Async I/O,即异步IO。它是流中频繁访问外部数据的利器,特别是当访问比较耗时的时候。产生背景先考虑一个实际中挺常见的场景:一个流处理程序中对于每个事件都要查一次外部的维表(比如HBase,这里暂不考虑缓存机制)做关联,那在Flink中如何实现呢?典型的做法就是增加一个map/flatmap,在里面做一下查询关联。这样功能没问题,但这个查询很容易会变成系统的瓶颈,特别是当外部查询比较耗时的时候。好在Flink里面有一个异步IO算子,可以很好的解决这个问题。异步IO是阿里巴巴贡献给Flink非常重要的一个特性,在Flink 1.2版本中正式发布,对应的提案是FLIP-12: Asynchronous I/O Design and Implementation。这个特性解决的问题和所有其它的异步IO、IO多路复用技术是一致的:IO往往比较耗时,通过异步IO可以提高系统的吞吐量。这个Async I/O特性只不过是流处理场景里面的异步IO而已,原理没有什么特殊之处,看下官方的一个图:左侧是同步IO,可以看到大部分时间用来等待了;右侧是异步IO,提升效果很明显。当然,通过提高任务的并行度也能一定程度的缓解同步IO的问题,这种方式有点类似于网络编程早期的per-connection-per-thread模型,但这种模式不够彻底,而且提高并行度的代价比较高。道理都懂,就不再赘述了,下面看怎么用。如何使用使用概述回想一下网络编程中的异步IO(这里指的是IO多路复用技术),必须要内核支持select、poll/epoll才可以。Flink的异步IO也类似,需要访问外部数据的客户端支持异步请求才可以。如果不支持的话,也可以通过线程池技术模拟异步请求,当然效果上会差一些,但一般还是比同步IO强的。具体到编码层面,分3个步骤:实现AsyncFunction接口,这个接口的作用是分发请求。Flink内置了一个实现类RichAsyncFunction,一般我们继承这个类即可。在AsyncFunction#asyncInvoke(...)中实现一个回调函数,在回调函数中获取异步执行的结果,并且传递给ResultFuture。将异步操作应用到某个流上面。下面是官方给的一段示例代码:// This example implements the asynchronous request and callback with Futures that have the // interface of Java 8's futures (which is the same one followed by Flink's Future) /** * An implementation of the 'AsyncFunction' that sends requests and sets the callback. * 第1步:实现`AsyncFunction`接口 */ class AsyncDatabaseRequest extends RichAsyncFunction<String, Tuple2<String, String>> { /** The database specific client that can issue concurrent requests with callbacks */ private transient DatabaseClient client; @Override public void open(Configuration parameters) throws Exception { client = new DatabaseClient(host, post, credentials); } @Override public void close() throws Exception { client.close(); } @Override public void asyncInvoke(String key, final ResultFuture<Tuple2<String, String>> resultFuture) throws Exception { // issue the asynchronous request, receive a future for result final Future<String> result = client.query(key); // set the callback to be executed once the request by the client is complete // the callback simply forwards the result to the result future // 第2步:实现一个回调函数 CompletableFuture.supplyAsync(new Supplier<String>() { @Override public String get() { try { // 获取异步执行的结果 return result.get(); } catch (InterruptedException | ExecutionException e) { // Normally handled explicitly. return null; } } }).thenAccept( (String dbResult) -> { // 并且传递给`ResultFuture` resultFuture.complete(Collections.singleton(new Tuple2<>(key, dbResult))); }); } } // create the original stream DataStream<String> stream = ...; // apply the async I/O transformation // 第3步:将异步操作应用到某个流上面。 DataStream<Tuple2<String, String>> resultStream = AsyncDataStream.unorderedWait(stream, new AsyncDatabaseRequest(), 1000, TimeUnit.MILLISECONDS, 100);代码的执行逻辑是这样的:流里面的每一个事件都会调用asyncInvoke,然后我们在这个方法里面实现访问外部数据的异步请求,比如上面代码中的client.query(key),然后得到一个异步对象(比如Future对象)。给这个异步对象定义一个回调函数,比如上面代码中的CompletableFuture.supplyAsync(...).thenAccept(...),然后在该回调函数获取异步任务的执行结果,并且交给ResultFuture,这一步非常重要,待会还会再提到。为了防止异步IO无限堆积或者某些请求挂死,一般都会提供超时设置和最高的异步并发数,Flink的异步IO也不例外。上面代码中最后一行的unorderedWait参数中的1000就是异步任务的超时时间,100就是同时允许的并发请求数,称为Capacity。那超时后如何处理呢?看下AsyncFunction接口:public interface AsyncFunction<IN, OUT> extends Function, Serializable { /** * Trigger async operation for each stream input. * * @param input element coming from an upstream task * @param resultFuture to be completed with the result data * @exception Exception in case of a user code error. An exception will make the task fail and * trigger fail-over process. */ void asyncInvoke(IN input, ResultFuture<OUT> resultFuture) throws Exception; /** * {@link AsyncFunction#asyncInvoke} timeout occurred. By default, the result future is * exceptionally completed with a timeout exception. * * @param input element coming from an upstream task * @param resultFuture to be completed with the result data */ default void timeout(IN input, ResultFuture<OUT> resultFuture) throws Exception { resultFuture.completeExceptionally( new TimeoutException("Async function call has timed out.")); } }除了定义了一个asyncInvoke方法,还定义了一个timeout,并且提供了一个默认实现。当异步任务超时后,就会回调该方法,默认是抛出异常,即整个任务失败。一般我们需要重新实现该方法。那如果并发数达到了限制会怎么样?答案是会阻塞,产生反压。另外需要注意的一个地方就是事件的有序性。因为是异步请求,所以返回时间是不确定的,Flink的异步IO既支持保证事件的原始顺序,也可以不保证。异步IO的内部实现是依靠队列的,当需要保证有序的时候,实际是将先得到结果但排在后面的事件缓存起来,所以必然会增加一些延迟和资源占用。而无序可以获得更好的实时性和吞吐,但需要注意的是当使用Event Time的时候,这个无序并不是全局无序,而是在两个watermark之间的事件无序,整体还是要遵从watermark机制的。无序使用AsyncDataStream.unorderedWait(...) ,有序使用AsyncDataStream.orderedWait(...)。同步模式&线程池模式&异步模式对比demo这部分主要是给了两个示例,演示如何使用Flink异步IO。第一个例子中包含了3种场景:在flatmap中实现外部访问的同步IO使用线程池实现的异步IO,且不保证顺序使用线程池实现的异步IO,且保证顺序其中,为了体现异步IO的优势,并没有真正访问数据库,而是使用了一个sleep操作,模拟比较耗时的IO操作。public void update(Tuple2<String, Integer> tuple2) throws SQLException { // 为了模拟耗时IO,这里使用sleep替换真正的数据库操作 //ps.setLong(1, balance); //ps.setLong(2, 1); //ps.execute(); try { Thread.sleep(tuple2.f1); } catch (InterruptedException e) { e.printStackTrace(); } }注意:虽然没有真正访问数据库,但整个代码都是按照模拟真实场景写的,只是把里面执行数据库操作的换成了sleep,所以运行时还是会连接数据库、创建连接池。如果要自己运行代码,请修改代码中的数据库连接地址。另外,3个场景使用的数据源是同一个,而sleep的时间也是在数据源中定义好的,所以它们的IO耗时是相同的:static List<Tuple2<String, Integer>> dataset() { List<Tuple2<String, Integer>> dataset = new ArrayList<>(10); for (int i = 0; i < 10; i++) { // f0: 元素名称 f1: sleep的时间,模拟该元素需要的IO耗时 dataset.add(new Tuple2<>("e" + i, i % 3 + 1)); } return dataset; }下面是完整代码:public class ThreadPoolAsyncDemoFakeQuery { public static void main(String[] args) throws Exception { // --------------------------------- Async IO + Unordered ------------------------- StreamExecutionEnvironment envAsyncUnordered = StreamExecutionEnvironment.getExecutionEnvironment(); envAsyncUnordered.setParallelism(1); envAsyncUnordered.setBufferTimeout(0); DataStream<Tuple2<String, Integer>> source = envAsyncUnordered.fromCollection(dataset()); DataStream<String> result = AsyncDataStream.unorderedWait(source, new ThreadPoolAsyncMysqlRequest(), 10, TimeUnit.SECONDS, 10); result.print(); System.out.println("Async + UnorderedWait:"); envAsyncUnordered.execute("unorderedWait"); // --------------------------------- Async IO + Ordered ------------------------- StreamExecutionEnvironment envAsyncOrdered = StreamExecutionEnvironment.getExecutionEnvironment(); envAsyncOrdered.setParallelism(1); envAsyncOrdered.setBufferTimeout(0); AsyncDataStream.orderedWait(envAsyncOrdered.fromCollection(dataset()), new ThreadPoolAsyncMysqlRequest(), 10, TimeUnit.SECONDS, 10) .print(); System.out.println("Async + OrderedWait"); envAsyncOrdered.execute("orderedWait"); // --------------------------------- Sync IO ------------------------- StreamExecutionEnvironment envSync = StreamExecutionEnvironment.getExecutionEnvironment(); envSync.setParallelism(1); envSync.setBufferTimeout(0); envSync.fromCollection(dataset()) .process(new ProcessFunction<Tuple2<String, Integer>, String>() { private transient MysqlClient client; @Override public void open(Configuration parameters) throws Exception { super.open(parameters); client = new MysqlClient(); } @Override public void processElement(Tuple2<String, Integer> value, Context ctx, Collector<String> out) throws Exception { try { client.update(value); out.collect(value + ": " + System.currentTimeMillis()); } catch (Exception e) { e.printStackTrace(); } } }).print(); System.out.println("Sync IO:"); envSync.execute("Sync IO"); } static List<Tuple2<String, Integer>> dataset() { List<Tuple2<String, Integer>> dataset = new ArrayList<>(10); for (int i = 0; i < 10; i++) { // f0: 元素名称 f1: sleep的时间 dataset.add(new Tuple2<>("e" + i, i % 3 + 1)); } return dataset; } static class ThreadPoolAsyncMysqlRequest extends RichAsyncFunction<Tuple2<String, Integer>, String> { private transient ExecutorService executor; private transient MysqlClient client; @Override public void open(Configuration parameters) throws Exception { super.open(parameters); executor = Executors.newFixedThreadPool(30); client = new MysqlClient(); } @Override public void asyncInvoke(Tuple2<String, Integer> input, ResultFuture<String> resultFuture) { executor.submit(() -> { long current = System.currentTimeMillis(); String output = input + ":" + current; try { client.update(input); resultFuture.complete(Collections.singleton(output)); } catch (SQLException e) { e.printStackTrace(); resultFuture.complete(Collections.singleton(input + ": " + e.getMessage())); } }); } @Override public void timeout(Tuple2<String, Integer> input, ResultFuture<String> resultFuture) throws Exception { System.out.printf("%s timeout\n", input); resultFuture.complete(Collections.singleton(input + ": time out")); } @Override public void close() throws Exception { client.close(); super.close(); } } static class MysqlClient { static final String JDBC_DRIVER = "com.mysql.jdbc.Driver"; static final String DB_URL = "jdbc:mysql://10.9.1.18:3306/test?useSSL=false"; private Connection conn; private PreparedStatement ps; public MysqlClient() throws Exception { Class.forName(JDBC_DRIVER); conn = DriverManager.getConnection(DB_URL, "root", "root123."); ps = conn.prepareStatement("UPDATE account SET balance = ? WHERE id = ?;"); } public void update(Tuple2<String, Integer> tuple2) throws SQLException { // 为了模拟耗时IO,这里使用sleep替换真正的数据库操作 //ps.setLong(1, balance); //ps.setLong(2, 1); //ps.execute(); try { Thread.sleep(tuple2.f1); } catch (InterruptedException e) { e.printStackTrace(); } } public void close() { try { if (conn != null) { conn.close(); } if (ps != null) { ps.close(); } } catch (Exception e) { e.printStackTrace(); } } } }代码输出如下:Async + UnorderedWait: (e0,1):1617109474293 (e3,1):1617109474294 (e1,2):1617109474293 (e4,2):1617109474294 (e2,3):1617109474293 (e6,1):1617109474295 (e9,1):1617109474297 (e7,2):1617109474296 (e5,3):1617109474295 (e8,3):1617109474297 Async + OrderedWait (e0,1):1617109474891 (e1,2):1617109474891 (e2,3):1617109474891 (e3,1):1617109474891 (e4,2):1617109474892 (e5,3):1617109474892 (e6,1):1617109474893 (e7,2):1617109474893 (e8,3):1617109474893 (e9,1):1617109474893 Sync IO: (e0,1): 1617109475257 (e1,2): 1617109475260 (e2,3): 1617109475264 (e3,1): 1617109475265 (e4,2): 1617109475268 (e5,3): 1617109475272 (e6,1): 1617109475274 (e7,2): 1617109475277 (e8,3): 1617109475281 (e9,1): 1617109475283可以看到:Async + UnorderedWait:耗时4ms,且输出的事件是乱序的Async + OrderedWait:耗时2ms,输出事件是保持原有顺序的Sync IO:耗时26ms,输出的事件也是保持原有顺序的该程序运行多次,输出不太稳定,但Async方式都是远远小于Sync的。因为数据量比较小,而且耗时都比较平均,所以无序的优势不明显,有时甚至还会比有序高。这个例子并不是一个严格的性能测试,但却可以用来体现Async相比于Sync的明显优势。第二个例子则是使用真实的异步请求客户端执行真正的操作。这里使用了jasync-sql用于异步访问MySQL。完整代码如下:public class AsyncDemo { public static void main(String[] args) throws Exception { final long repeat = 100L; // --------------------------------- Async IO + Unordered ------------------------- StreamExecutionEnvironment envAsyncUnordered = StreamExecutionEnvironment.getExecutionEnvironment(); envAsyncUnordered.setParallelism(1); envAsyncUnordered.setBufferTimeout(0); DataStream<Long> source = envAsyncUnordered.fromSequence(1, repeat); DataStream<String> result = AsyncDataStream.unorderedWait(source, new AsyncMysqlRequest(), 10, TimeUnit.SECONDS, 10); result.print(); System.out.println("Async + UnorderedWait:"); envAsyncUnordered.execute("unorderedWait"); } static class AsyncMysqlRequest extends RichAsyncFunction<Long, String> { Connection conn; @Override public void open(Configuration parameters) throws Exception { super.open(parameters); conn = MySQLConnectionBuilder.createConnectionPool("jdbc:mysql://{your-ip}:3306/test?user=root&password={your-password}"); conn.connect().get(); } @Override public void asyncInvoke(Long input, ResultFuture<String> resultFuture) throws Exception { List<Long> balance = new ArrayList<>(1); balance.add(input); CompletableFuture<QueryResult> future = conn.sendPreparedStatement("SELECT * FROM account WHERE balance = ?;", balance); // 如果执行成功 future.thenAccept(queryResult -> { resultFuture.complete(Collections.singleton(balance + ":" + System.currentTimeMillis())); }); // 如果执行异常 future.exceptionally(e -> { e.printStackTrace(); resultFuture.complete(Collections.singleton(balance + e.getMessage() + ":" + System.currentTimeMillis())); return null; }); } @Override public void timeout(Long input, ResultFuture<String> resultFuture) throws Exception { System.out.printf("%d timeout\n", input); resultFuture.complete(Collections.singleton(input + ": time out")); } } }使用注意点不要在AsyncFunction#asyncInvoke(...)内部执行比较耗时的操作,比如同步等待异步请求的结果(应该放到回调中执行)。因为一个流中每个Partition只有一个AsyncFunction实例,一个实例里面的数据是顺序调用asyncInvoke的,如果在里面执行耗时操作,那异步效果将大打折扣,如果同步等待异步的结果,那其实就退化成同步IO了。异步请求超时回调默认是抛出异常,这样会导致整个Flink Job退出。这一般不是我们想要的,所以大多数时候都需要覆写timeout方法。在自定义的回调函数里面一定要使用ResultFuture#complete或ResultFuture#completeExceptionally将执行结果传递给ResultFuture,否则异步请求会一直堆积在队列里面。当队列满了以后,整个任务流就卡主了。Flink异步IO也是支持Checkpoint的,所以故障后可以恢复,提供Exactly-Once语义保证。原理浅析AsyncWaitOperator这里结合目前最新的Flink 1.12.1版本从源码角度简单分析一下Async I/O的实现,这里引用了Jark博客中的几张图(见文末引用部分)。如图,Flink Async I/O实质是通过队列的方式实现了一个异步的生产者-消费者模型。其中队列就是StreamElementQueue;生产者就是就是我们的主流,通过AsyncDataStream.xxxWait将数据放入队列;消费者就是图中的Emitter。调用流程就是流中的一个事件到了以后,先放入队列中,同时调用通过AsyncFunction#asyncInvoke(...)定义的异步逻辑。当异步逻辑执行完成后,就会将队列中该事件标记为已完成,然后Emitter将该事件的结果发送到下游继续执行。实现Async I/O最主要的类是AsyncWaitOperator,我把代码做了精简,保留了体现上面步骤的部分代码,具体见代码中的注释。@Internal public class AsyncWaitOperator<IN, OUT> extends AbstractUdfStreamOperator<OUT, AsyncFunction<IN, OUT>> implements OneInputStreamOperator<IN, OUT>, BoundedOneInput { /** Capacity of the stream element queue. */ private final int capacity; /** Output mode for this operator. */ private final AsyncDataStream.OutputMode outputMode; /** Timeout for the async collectors. */ private final long timeout; /** Queue, into which to store the currently in-flight stream elements. */ // 队列 private transient StreamElementQueue<OUT> queue; @Override public void setup( StreamTask<?, ?> containingTask, StreamConfig config, Output<StreamRecord<OUT>> output) { super.setup(containingTask, config, output); switch (outputMode) { case ORDERED: queue = new OrderedStreamElementQueue<>(capacity); break; case UNORDERED: queue = new UnorderedStreamElementQueue<>(capacity); break; default: throw new IllegalStateException("Unknown async mode: " + outputMode + '.'); } this.timestampedCollector = new TimestampedCollector<>(output); } @Override public void processElement(StreamRecord<IN> element) throws Exception { // add element first to the queue // 将事件放入队列 final ResultFuture<OUT> entry = addToWorkQueue(element); final ResultHandler resultHandler = new ResultHandler(element, entry); // register a timeout for the entry if timeout is configured // 设置异步回调的超时处理 if (timeout > 0L) { final long timeoutTimestamp = timeout + getProcessingTimeService().getCurrentProcessingTime(); final ScheduledFuture<?> timeoutTimer = getProcessingTimeService() .registerTimer( timeoutTimestamp, timestamp -> userFunction.timeout( element.getValue(), resultHandler)); resultHandler.setTimeoutTimer(timeoutTimer); } // 调用异步逻辑 userFunction.asyncInvoke(element.getValue(), resultHandler); } /** * Add the given stream element to the operator's stream element queue. This operation blocks * until the element has been added. * * <p>Between two insertion attempts, this method yields the execution to the mailbox, such that * events as well as asynchronous results can be processed. * * @param streamElement to add to the operator's queue * @throws InterruptedException if the current thread has been interrupted while yielding to * mailbox * @return a handle that allows to set the result of the async computation for the given * element. */ private ResultFuture<OUT> addToWorkQueue(StreamElement streamElement) throws InterruptedException { Optional<ResultFuture<OUT>> queueEntry; while (!(queueEntry = queue.tryPut(streamElement)).isPresent()) { mailboxExecutor.yield(); } return queueEntry.get(); } /** * Outputs one completed element. Watermarks are always completed if it's their turn to be * processed. * * <p>This method will be called from {@link #processWatermark(Watermark)} and from a mail * processing the result of an async function call. */ private void outputCompletedElement() { if (queue.hasCompletedElements()) { // emit only one element to not block the mailbox thread unnecessarily queue.emitCompletedElement(timestampedCollector); } } }可以看到processElement方法中先将事件放入队列,然后注册超时定时器,最后再调用异步处理逻辑。其中addToWorkQueue中使用了一个while循环往队列里面放入事件,也就是如果队列满了,这个插入操作会一直阻塞在这里。也就是前文提到的:如果并发的异步请求数达到了Capacity的值,流就会阻塞产生反压。然后,在outputCompletedElement方法中实现了将完成的异步请求事件发送到下游的操作。前文提到了Flink Async I/O提供了有序和无序两种保证,这部分功能是通过不同的队列实现的。上面的代码中也能看到有序的时候使用的是OrderedStreamElementQueue,无序的时候使用的是UnorderedStreamElementQueue,队列的大小就是最大并发的异步请求数Capacity。这两个队列都是图中StreamElementQueue接口的具体实现,先看下这个接口的定义:/** Interface for stream element queues for the {@link AsyncWaitOperator}. */ @Internal public interface StreamElementQueue<OUT> { /** * Tries to put the given element in the queue. This operation succeeds if the queue has * capacity left and fails if the queue is full. * * <p>This method returns a handle to the inserted element that allows to set the result of the * computation. * * @param streamElement the element to be inserted. * @return A handle to the element if successful or {@link Optional#empty()} otherwise. */ Optional<ResultFuture<OUT>> tryPut(StreamElement streamElement); /** * Emits one completed element from the head of this queue into the given output. * * <p>Will not emit any element if no element has been completed (check {@link * #hasCompletedElements()} before entering any critical section). * * @param output the output into which to emit */ void emitCompletedElement(TimestampedCollector<OUT> output); /** * Checks if there is at least one completed head element. * * @return True if there is a completed head element. */ boolean hasCompletedElements(); /** * Returns the collection of {@link StreamElement} currently contained in this queue for * checkpointing. * * <p>This includes all non-emitted, completed and non-completed elements. * * @return List of currently contained {@link StreamElement}. */ List<StreamElement> values(); /** * True if the queue is empty; otherwise false. * * @return True if the queue is empty; otherwise false. */ boolean isEmpty(); /** * Return the size of the queue. * * @return The number of elements contained in this queue. */ int size(); }接口定义了5个方法:tryPut:向队列中插入数据;emitCompletedElement:向下游发送已经完成的异步请求,即图上中的Emitter;hasCompletedElements:判断是否有已经完成的异步请求values:当前队列中的事件isEmpty:队列是否为空size:队列大小下面分别看下有序和无序队列是如何实现这个接口的。有序队列OrderedStreamElementQueue有序实现比较简单,先进先出就行了。所以OrderedStreamElementQueue的代码量不多,我就全贴这里了:@Internal public final class OrderedStreamElementQueue<OUT> implements StreamElementQueue<OUT> { private static final Logger LOG = LoggerFactory.getLogger(OrderedStreamElementQueue.class); /** Capacity of this queue. */ private final int capacity; /** Queue for the inserted StreamElementQueueEntries. */ private final Queue<StreamElementQueueEntry<OUT>> queue; public OrderedStreamElementQueue(int capacity) { Preconditions.checkArgument(capacity > 0, "The capacity must be larger than 0."); this.capacity = capacity; this.queue = new ArrayDeque<>(capacity); } @Override public boolean hasCompletedElements() { return !queue.isEmpty() && queue.peek().isDone(); } @Override public void emitCompletedElement(TimestampedCollector<OUT> output) { if (hasCompletedElements()) { final StreamElementQueueEntry<OUT> head = queue.poll(); head.emitResult(output); } } @Override public List<StreamElement> values() { List<StreamElement> list = new ArrayList<>(this.queue.size()); for (StreamElementQueueEntry e : queue) { list.add(e.getInputElement()); } return list; } @Override public boolean isEmpty() { return queue.isEmpty(); } @Override public int size() { return queue.size(); } @Override public Optional<ResultFuture<OUT>> tryPut(StreamElement streamElement) { if (queue.size() < capacity) { StreamElementQueueEntry<OUT> queueEntry = createEntry(streamElement); queue.add(queueEntry); LOG.debug( "Put element into ordered stream element queue. New filling degree " + "({}/{}).", queue.size(), capacity); return Optional.of(queueEntry); } else { LOG.debug( "Failed to put element into ordered stream element queue because it " + "was full ({}/{}).", queue.size(), capacity); return Optional.empty(); } } private StreamElementQueueEntry<OUT> createEntry(StreamElement streamElement) { if (streamElement.isRecord()) { return new StreamRecordQueueEntry<>((StreamRecord<?>) streamElement); } if (streamElement.isWatermark()) { return new WatermarkQueueEntry<>((Watermark) streamElement); } throw new UnsupportedOperationException("Cannot enqueue " + streamElement); } }里面就使用了一个ArrayDeque实现了队列,事件按顺序进入队列,出队列调用的是queue.poll(),所以顺序自然是保证的。在出队列的时候会先判断是否有已经完成的异步请求,即hasCompletedElements,看下它的实现: @Override public boolean hasCompletedElements() { return !queue.isEmpty() && queue.peek().isDone(); }代码里面调用queue中头部元素(peek)的isDone方法。队列里面的元素是StreamElementQueueEntry接口类型,该接口继承自ResultFuture接口,且对应的实现类是StreamRecordQueueEntry,我们看下里面相关的实现代码:class StreamRecordQueueEntry<OUT> implements StreamElementQueueEntry<OUT> { @Nonnull private final StreamRecord<?> inputRecord; private Collection<OUT> completedElements; StreamRecordQueueEntry(StreamRecord<?> inputRecord) { this.inputRecord = Preconditions.checkNotNull(inputRecord); } @Override public boolean isDone() { return completedElements != null; } @Nonnull @Override public StreamRecord<?> getInputElement() { return inputRecord; } @Override public void emitResult(TimestampedCollector<OUT> output) { output.setTimestamp(inputRecord); for (OUT r : completedElements) { output.collect(r); } } @Override public void complete(Collection<OUT> result) { this.completedElements = Preconditions.checkNotNull(result); } }可以看到isDone方法的逻辑是判断completedElements是否为null,而completedElements的赋值就是在complete方法中的。这就是为什么我们必须在AsyncFunction#asyncInvoke(...)中定义的回调函数中调用ResultFuture#complete的原因。如果不这样,队列中的异步事件就无法被标记为已完成,当队列满了以后,整个系统的数据流就卡主了。当然异常时调用ResultFuture#completeExceptionally也是OK的,这个在AsyncWaitOperator.ResultHandler类中也有调用。最后附一张来自Jark博客的图:无序队列UnorderedStreamElementQueue无序队列队列相对复杂一些。如果是使用Processing Time,那就是全局无序,也比较简单;但当使用Event Time的时候,如前文所述,就不是全局无序了,在相邻的两个watermark产生的区间内(代码实现中称为一个Segment)事件可以无序;但不同的区间还是要有序,即整体还是必须遵从Watermark的限制。所以为了实现这种机制,无序队列引入了Segment,一个Segment表示相邻watermark产生的一个区间。也就是Segment内无序,Segment之间有序。我们看下代码实现。@Internal public final class UnorderedStreamElementQueue<OUT> implements StreamElementQueue<OUT> { private static final Logger LOG = LoggerFactory.getLogger(UnorderedStreamElementQueue.class); /** Capacity of this queue. */ private final int capacity; /** Queue of queue entries segmented by watermarks. */ private final Deque<Segment<OUT>> segments; private int numberOfEntries; public UnorderedStreamElementQueue(int capacity) { Preconditions.checkArgument(capacity > 0, "The capacity must be larger than 0."); this.capacity = capacity; // most likely scenario are 4 segments <elements, watermark, elements, watermark> this.segments = new ArrayDeque<>(4); this.numberOfEntries = 0; } @Override public Optional<ResultFuture<OUT>> tryPut(StreamElement streamElement) { if (size() < capacity) { StreamElementQueueEntry<OUT> queueEntry; if (streamElement.isRecord()) { queueEntry = addRecord((StreamRecord<?>) streamElement); } else if (streamElement.isWatermark()) { queueEntry = addWatermark((Watermark) streamElement); } else { throw new UnsupportedOperationException("Cannot enqueue " + streamElement); } numberOfEntries++; LOG.debug( "Put element into unordered stream element queue. New filling degree " + "({}/{}).", size(), capacity); return Optional.of(queueEntry); } else { LOG.debug( "Failed to put element into unordered stream element queue because it " + "was full ({}/{}).", size(), capacity); return Optional.empty(); } } private StreamElementQueueEntry<OUT> addRecord(StreamRecord<?> record) { // ensure that there is at least one segment Segment<OUT> lastSegment; if (segments.isEmpty()) { lastSegment = addSegment(capacity); } else { lastSegment = segments.getLast(); } // entry is bound to segment to notify it easily upon completion StreamElementQueueEntry<OUT> queueEntry = new SegmentedStreamRecordQueueEntry<>(record, lastSegment); lastSegment.add(queueEntry); return queueEntry; } private Segment<OUT> addSegment(int capacity) { Segment newSegment = new Segment(capacity); segments.addLast(newSegment); return newSegment; } private StreamElementQueueEntry<OUT> addWatermark(Watermark watermark) { Segment<OUT> watermarkSegment; if (!segments.isEmpty() && segments.getLast().isEmpty()) { // reuse already existing segment if possible (completely drained) or the new segment // added at the end of // this method for two succeeding watermarks watermarkSegment = segments.getLast(); } else { watermarkSegment = addSegment(1); } StreamElementQueueEntry<OUT> watermarkEntry = new WatermarkQueueEntry<>(watermark); watermarkSegment.add(watermarkEntry); // add a new segment for actual elements addSegment(capacity); return watermarkEntry; } @Override public boolean hasCompletedElements() { return !this.segments.isEmpty() && this.segments.getFirst().hasCompleted(); } @Override public void emitCompletedElement(TimestampedCollector<OUT> output) { if (segments.isEmpty()) { return; } final Segment currentSegment = segments.getFirst(); numberOfEntries -= currentSegment.emitCompleted(output); // remove any segment if there are further segments, if not leave it as an optimization even // if empty if (segments.size() > 1 && currentSegment.isEmpty()) { segments.pop(); } } @Override public List<StreamElement> values() { List<StreamElement> list = new ArrayList<>(); for (Segment s : segments) { s.addPendingElements(list); } return list; } @Override public boolean isEmpty() { return numberOfEntries == 0; } @Override public int size() { return numberOfEntries; } /** An entry that notifies the respective segment upon completion. */ static class SegmentedStreamRecordQueueEntry<OUT> extends StreamRecordQueueEntry<OUT> { private final Segment<OUT> segment; SegmentedStreamRecordQueueEntry(StreamRecord<?> inputRecord, Segment<OUT> segment) { super(inputRecord); this.segment = segment; } @Override public void complete(Collection<OUT> result) { super.complete(result); segment.completed(this); } } /** * A segment is a collection of queue entries that can be completed in arbitrary order. * * <p>All elements from one segment must be emitted before any element of the next segment is * emitted. */ static class Segment<OUT> { /** Unfinished input elements. */ private final Set<StreamElementQueueEntry<OUT>> incompleteElements; /** Undrained finished elements. */ private final Queue<StreamElementQueueEntry<OUT>> completedElements; Segment(int initialCapacity) { incompleteElements = new HashSet<>(initialCapacity); completedElements = new ArrayDeque<>(initialCapacity); } /** Signals that an entry finished computation. */ void completed(StreamElementQueueEntry<OUT> elementQueueEntry) { // adding only to completed queue if not completed before // there may be a real result coming after a timeout result, which is updated in the // queue entry but // the entry is not re-added to the complete queue if (incompleteElements.remove(elementQueueEntry)) { completedElements.add(elementQueueEntry); } } /** * True if there are no incomplete elements and all complete elements have been consumed. */ boolean isEmpty() { return incompleteElements.isEmpty() && completedElements.isEmpty(); } /** * True if there is at least one completed elements, such that {@link * #emitCompleted(TimestampedCollector)} will actually output an element. */ boolean hasCompleted() { return !completedElements.isEmpty(); } /** * Adds the segmentd input elements for checkpointing including completed but not yet * emitted elements. */ void addPendingElements(List<StreamElement> results) { for (StreamElementQueueEntry<OUT> element : completedElements) { results.add(element.getInputElement()); } for (StreamElementQueueEntry<OUT> element : incompleteElements) { results.add(element.getInputElement()); } } /** * Pops one completed elements into the given output. Because an input element may produce * an arbitrary number of output elements, there is no correlation between the size of the * collection and the popped elements. * * @return the number of popped input elements. */ int emitCompleted(TimestampedCollector<OUT> output) { final StreamElementQueueEntry<OUT> completedEntry = completedElements.poll(); if (completedEntry == null) { return 0; } completedEntry.emitResult(output); return 1; } /** * Adds the given entry to this segment. If the element is completed (watermark), it is * directly moved into the completed queue. */ void add(StreamElementQueueEntry<OUT> queueEntry) { if (queueEntry.isDone()) { completedElements.add(queueEntry); } else { incompleteElements.add(queueEntry); } } } }队列部分的代码如下:private final Deque<Segment<OUT>> segments; // most likely scenario are 4 segments <elements, watermark, elements, watermark> this.segments = new ArrayDeque<>(4); /** Unfinished input elements. */ private final Set<StreamElementQueueEntry<OUT>> incompleteElements; /** Undrained finished elements. */ private final Queue<StreamElementQueueEntry<OUT>> completedElements; Segment(int initialCapacity) { incompleteElements = new HashSet<>(initialCapacity); completedElements = new ArrayDeque<>(initialCapacity); }可以看到整体还是一个ArrayDeque,但队列内部是一个Segment;Segment内部又分成了2个队列:未完成请求队列incompleteElements和已完成请求的队列completedElements。Segment内部是无序的,所以incompleteElements使用的数据类型是Set。当某个请求完成后,就转移到completedElements中。此时,如果来的是普通事件,则调用addRecord将事件放入对应Segment的incompleteElements队列中;如果来的是watermark,则调用addSegment创建一个新的Segment。如果使用Processing Time,那就永远没有watermark,也就是全局就是一个大的Segment,也就是全局无序。最后附一张Jark博客的图:总结Flink Asycn I/O是阿里结合自身实践场景贡献给社区的一个特性,所以自然是有很多实际需求的。流中关联维表的操作在具体业务中也的确很常见,异步IO就是应对这些场景的利剑。在具体使用时要特别注意本文“使用注意点”一节的几个点。References:Asynchronous I/O for External Data AccessFlink 原理与实现:Aysnc I/OFlink 源码阅读笔记(14)- Async I/O 的实现 Flink快速了解(6)——常用算子(Operator) http://niyanchun.com/flink-quick-learning-6-operators.html 2021-03-28T21:39:00+08:00 Flink的Stream Job就是由一些算子构成的(Source和Sink实质也是特殊的算子而已),本文介绍常见的DataStream算子(Operator)。我用一种不太科学的方式将这些算子分成了2类,并起了一个不太严谨的名字:单流算子:这类算子一般在一个流上面使用;多流算子:这类算子往往操作多个流。单流算子单流算子大都比较简单,粗略介绍。map/flatmap:使用最多的算子,map是输入一个元素,输出一个元素;flatmap是输入一个元素,输出0个或多个元素。filter:过滤,条件为真就继续往下传,为假就过滤掉了。keyBy:按照一个keySelector定义的方式进行哈希分区,会将一个流分成多个Partition,经过keyBy的流变成KeyedStream。一般和其它算子一起使用。reduce算子:在KeyedStream上面使用,“滚动式”的操作流中的元素。比如下面这个滚动式相加的例子:public class ReduceDemo { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); // produce 1,2,3,4,5 DataStream<Long> source = env.fromSequence(1, 5); source.keyBy(value -> 1) // 所有数据在一个Partition .reduce(new ReduceFunction<Long>() { @Override public Long reduce(Long value1, Long value2) throws Exception { // value1 保存上次迭代的结果值 System.out.printf("value1: %d, value2: %d\n", value1, value2); return value1 + value2; } }).print(); env.execute(); } }输出如下:value1: 1, value2: 2 3 value1: 3, value2: 3 6 value1: 6, value2: 4 10 value1: 10, value2: 5 15Aggregation算子:在KeyedStream上面使用,包括sum、min、minBy、max,maxBy。这些算子在DataStream/DataSet中要被废弃了(见FLIP-134),这里主要介绍一下min/minBy的区别(max/maxBy类似),直接看代码吧: public class AggregationDemo { public static void main(String[] args) throws Exception { StreamExecutionEnvironment envMin = StreamExecutionEnvironment.getExecutionEnvironment(); StreamExecutionEnvironment envMinBy = StreamExecutionEnvironment.getExecutionEnvironment(); envMin.setParallelism(1); envMinBy.setParallelism(1); List<Tuple3<Integer, Integer, Integer>> data = new ArrayList<>(); data.add(new Tuple3<>(0, 2, 4)); data.add(new Tuple3<>(0, 4, 5)); data.add(new Tuple3<>(0, 3, 3)); data.add(new Tuple3<>(0, 1, 2)); data.add(new Tuple3<>(1, 2, 4)); data.add(new Tuple3<>(1, 5, 1)); data.add(new Tuple3<>(1, 1, 0)); data.add(new Tuple3<>(1, 2, 2)); System.out.println("Min:"); DataStream<Tuple3<Integer, Integer, Integer>> sourceMin = envMin.fromCollection(data); sourceMin.keyBy(tuple3 -> tuple3.f0).min(1).print(); envMin.execute(); System.out.println("\nMinBy:"); DataStream<Tuple3<Integer, Integer, Integer>> sourceMinBy = envMinBy.fromCollection(data); sourceMinBy.keyBy(tuple3 -> tuple3.f0).minBy(1).print(); envMinBy.execute(); } }输出结果: Min: (0,2,4) (0,2,4) (0,2,4) (0,1,4) (1,2,4) (1,2,4) (1,1,4) (1,1,4) MinBy: (0,2,4) (0,2,4) (0,2,4) (0,1,2) (1,2,4) (1,2,4) (1,1,0) (1,1,0)可以看到min只取了元素中用于排序的那个key,元素其它字段还是第一个元素的;而minBy则是保留了完整的key最小的那个元素(好拗口...)。window类算子:这个就不说了,前面的多篇文章已经介绍过了。下面看稍微复杂一些的多流算子。多流算子多流算子的差异点主要体现在以下3个方面:能同时处理的流的个数是否可以处理不同类型的流如何处理unionunion用于将多个、同类型的流合并成一个新的流。比如一个流与自身union则会将元素翻倍:public class UnionDemo { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); DataStream<Long> source = env.fromSequence(0, 5); source.print(); DataStream<Long> unionStream = source.union(source); unionStream.print(); env.execute(); } }输出:# source 0 1 2 3 4 5 # unionStream 0 0 1 1 2 2 3 3 4 4 5 5joinjoin只能操作2个keyedStream流,但这2个流的类型可以不一样,它对数据的操作相当于数据库里面的inner join:对一个数据集中相同key的数据执行inner join。在Flink DataStream里面有2种类型的流:Window Join:通过窗口获取一个数据集Tumbling Window JoinSliding Window JoinSession Window JoinInterval Join:通过定义一个时间段获取一个数据集Window Join就是配合窗口使用,然后又根据窗口的类型细分成了3种。Window Join的语法如下:stream.join(otherStream) .where(<KeySelector>) .equalTo(<KeySelector>) .window(<WindowAssigner>) .apply(<JoinFunction>)不同的窗口类型只是影响窗口产生的数据集,但join的方式是一模一样的,这里就以最简单最容易理解的Tumbling Window Join为例介绍(下面的图来自官网):图中随着时间(横轴)产生了4个窗口,上面的绿色代表一个流(greenStream),下面的桔色(orangeStream)是另外一条流,下面的数据就是每个窗口中2个流join产生的数据。我写了一个模拟这个例子的代码:public class WindowJoinDemo { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); DataStream<Tuple2<String, Integer>> greenStream = env.addSource(new GreenSource()); DataStream<Tuple2<String, Integer>> orangeStream = env.addSource(new OrangeSource()); orangeStream.join(greenStream) .where(orangeStreamTuple2 -> orangeStreamTuple2.f0) .equalTo(greenStreamTuple2 -> greenStreamTuple2.f0) .window(TumblingEventTimeWindows.of(Time.milliseconds(1000))) .apply((JoinFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String>) (orangeStreamTuple2, greenStreamTuple2) -> orangeStreamTuple2.f1 + "," + greenStreamTuple2.f1) .print(); env.execute(); } static class GreenSource extends RichSourceFunction<Tuple2<String, Integer>> { @Override public void run(SourceContext<Tuple2<String, Integer>> ctx) throws Exception { // window: [0, 1000) ctx.collectWithTimestamp(new Tuple2<>("key", 0), 0); ctx.collectWithTimestamp(new Tuple2<>("key", 1), 0); ctx.emitWatermark(new Watermark(1000)); // window: [1000, 2000) ctx.collectWithTimestamp(new Tuple2<>("key", 3), 1500); ctx.emitWatermark(new Watermark(2000)); // window: [2000, 3000) ctx.collectWithTimestamp(new Tuple2<>("key", 4), 2500); ctx.emitWatermark(new Watermark(3000)); // window: [3000, 4000) ctx.collectWithTimestamp(new Tuple2<>("another_key", 4), 3500); } @Override public void cancel() { } } static class OrangeSource extends RichSourceFunction<Tuple2<String, Integer>> { @Override public void run(SourceContext<Tuple2<String, Integer>> ctx) throws Exception { // window: [0, 1000) ctx.collectWithTimestamp(new Tuple2<>("key", 0), 0); ctx.collectWithTimestamp(new Tuple2<>("key", 1), 0); // window: [1000, 2000) ctx.collectWithTimestamp(new Tuple2<>("key", 2), 1500); ctx.collectWithTimestamp(new Tuple2<>("key", 3), 1500); // window: [2000, 3000) ctx.collectWithTimestamp(new Tuple2<>("key", 4), 2500); ctx.collectWithTimestamp(new Tuple2<>("key", 5), 2500); // window: [3000, 4000) ctx.collectWithTimestamp(new Tuple2<>("key", 6), 3500); ctx.collectWithTimestamp(new Tuple2<>("key", 7), 3500); ; } @Override public void cancel() { } } }代码运行如下:0,0 0,1 1,0 1,1 2,3 3,3 4,4 5,4Interval Join也非常简单(假设是A join B),定义一个时间段[a.timestamp + lowerBound, a.timestamp + upperBound],然后落在这个时间区间的元素都符合b.timestamp ∈ [a.timestamp + lowerBound; a.timestamp + upperBound]。IntervalJoin目前只支持Event Time。 其语法如下:stream.keyBy(<KeySelector>) .intervalJoin(otherStream.keyBy(<KeySelector>)) .between(<lowerBoundTime>, <upperBoundTime>) .process (<ProcessJoinFunction>);这里再借用官方的一个图:上面的图中orangeStream interval-join greenStream,然后时间区间是[orangeElem.ts - 2 <= greenElem.ts <= orangeElem.ts + 1],下面是每个interval中通过join产生的结果。我写了一个模拟上面示例的代码:public class IntervalJoinDemo { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); DataStream<Tuple2<String, Integer>> greenStream = env.addSource(new GreenSource()); DataStream<Tuple2<String, Integer>> orangeStream = env.addSource(new OrangeSource()); orangeStream.keyBy(orangeStreamTuple2 -> orangeStreamTuple2.f0) .intervalJoin(greenStream.keyBy(greenStreamTuple2 -> greenStreamTuple2.f0)) .between(Time.milliseconds(-2), Time.milliseconds(1)) .process(new ProcessJoinFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String>() { @Override public void processElement(Tuple2<String, Integer> orangeTuple2, Tuple2<String, Integer> greenTuple2, Context ctx, Collector<String> out) throws Exception { out.collect(orangeTuple2.f1 + "," + greenTuple2.f1); } }).print(); env.execute(); } static class GreenSource extends RichSourceFunction<Tuple2<String, Integer>> { @Override public void run(SourceContext<Tuple2<String, Integer>> ctx) throws Exception { // window: [0, 1) ctx.collectWithTimestamp(new Tuple2<>("key", 0), 0); // window: [1, 2) ctx.collectWithTimestamp(new Tuple2<>("key", 1), 1); // window: [6, 7) ctx.collectWithTimestamp(new Tuple2<>("key", 6), 6); // window: [7, 8) ctx.collectWithTimestamp(new Tuple2<>("key", 7), 7); } @Override public void cancel() { } } static class OrangeSource extends RichSourceFunction<Tuple2<String, Integer>> { @Override public void run(SourceContext<Tuple2<String, Integer>> ctx) throws Exception { ctx.emitWatermark(new Watermark(0)); ctx.collectWithTimestamp(new Tuple2<>("key", 0), 0); ctx.emitWatermark(new Watermark(2)); ctx.collectWithTimestamp(new Tuple2<>("key", 2), 2); ctx.emitWatermark(new Watermark(3)); ctx.collectWithTimestamp(new Tuple2<>("key", 3), 3); ctx.emitWatermark(new Watermark(4)); ctx.collectWithTimestamp(new Tuple2<>("key", 4), 4); ctx.emitWatermark(new Watermark(5)); ctx.collectWithTimestamp(new Tuple2<>("key", 5), 5); ctx.emitWatermark(new Watermark(7)); ctx.collectWithTimestamp(new Tuple2<>("key", 7), 7); } @Override public void cancel() { } } }输出如下:0,0 0,1 2,0 2,1 5,6 7,6 7,7总的来说,join其实和数据库的inner join语义是完全一样的,在一组数据集中相同key的元素上面执行inner join,这个数据集可以通过窗口产生,也可以通过定义一个时间段产生。cogroup前面说了join只能执行inner join。如果你需要其它join怎么办?cogroup就是你想要的,其语法如下:dataStream.coGroup(otherStream) .where(<KeySelector>) .equalTo(<KeySelector>) .window(<WindowAssigner> .apply (new CoGroupFunction () {...});相比于join,congroup是更一般的实现,会在CoGroupFunction中将窗口中2个流的元素以2个迭代器的方式暴露给开发者,让开发者按照自己的意图自行编写操作逻辑,所以它不仅可以实现其它类型的join,还可以实现更一般的操作。不过在Stack Overflow下上面看到Flink PMC成员提到join的执行策略可以使用基于排序或者哈希,而cogroup只能基于排序,因此join一般会比cogroup高效一些,所以join能满足的场景,就尽量优先用join吧。下面看个使用的例子:public class CoGroupDemo { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); DataStream<Tuple2<String, Integer>> greenStream = env.addSource(new WindowJoinDemo.GreenSource()); DataStream<Tuple2<String, Integer>> orangeStream = env.addSource(new WindowJoinDemo.OrangeSource()); orangeStream.coGroup(greenStream) .where(orangeStreamTuple2 -> orangeStreamTuple2.f0) .equalTo(greenStreamTuple2 -> greenStreamTuple2.f0) .window(TumblingEventTimeWindows.of(Time.milliseconds(1000))) .apply(new CoGroupFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String>() { @Override public void coGroup(Iterable<Tuple2<String, Integer>> orange, Iterable<Tuple2<String, Integer>> green, Collector<String> out) throws Exception { StringBuffer sb = new StringBuffer(); sb.append("window:{ orange stream elements: [ "); orange.forEach(tuple2 -> sb.append(tuple2.f1).append(" ")); sb.append("], green stream elements: [ "); green.forEach(tuple2 -> sb.append(tuple2.f1).append(" ")); sb.append("]"); out.collect(sb.toString()); } }) .print(); env.execute(); } }输出如下:window:{ orange stream elements: [ 0 1 ], green stream elements: [ 0 1 ] window:{ orange stream elements: [ 2 3 ], green stream elements: [ 3 ] window:{ orange stream elements: [ 4 5 ], green stream elements: [ 4 ] window:{ orange stream elements: [ ], green stream elements: [ 4 ] window:{ orange stream elements: [ 6 7 ], green stream elements: [ ]connectconnect用于连接2个流,这2个流可以是不同的类型,然后通过2个算子对流中的元素进行不同的处理。connect可以单独使用,也可以配合Broadcast机制一起使用。connect的主要使用场景是一个主流(数据流)和一个辅助流(比如配置、规则)连接,通过辅助流动态的控制主流的行为。下面给出2段代码示例,功能都是完全一样的:通过辅助流来改变主流输出某个数的倍数。一个用单独的connect实现,一个配合broadcast实现。单独的connect:public class ConnectDemo { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); // 主数据流 DataStream<Long> dataStream = env.addSource(new SourceFunction<Long>() { @Override public void run(SourceContext<Long> ctx) throws Exception { long i = 0; while (true) { ctx.collect(i++); Thread.sleep(500); } } @Override public void cancel() { } }); // 规则数据流 DataStream<String> ruleStream = env.addSource(new SourceFunction<String>() { @Override public void run(SourceContext<String> ctx) throws Exception { ctx.collect("one"); Thread.sleep(5000); ctx.collect("two"); Thread.sleep(5000); ctx.collect("three"); Thread.sleep(Long.MAX_VALUE); } @Override public void cancel() { } }); dataStream.connect(ruleStream) .flatMap(new CoFlatMapFunction<Long, String, Object>() { String rule; @Override public void flatMap1(Long value, Collector<Object> out) throws Exception { if ("one".equalsIgnoreCase(rule)) { out.collect(value); } else if ("two".equalsIgnoreCase(rule) && (value % 2 == 0)) { out.collect(value); } else if ("three".equalsIgnoreCase(rule) && (value % 3 == 0)) { out.collect(value); } } @Override public void flatMap2(String value, Collector<Object> out) throws Exception { System.out.printf("update rule, old rule = %s, new rule = %s\n", rule, value); rule = value; } }).print(); env.execute(); } }输出如下:update rule, old rule = null, new rule = one 1 2 3 4 5 6 7 8 9 update rule, old rule = one, new rule = two 10 12 14 16 18 update rule, old rule = two, new rule = three 21 24 27 30 33 36 ...配合broadcast实现:public class BroadcastDemo { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); // 主数据流 DataStream<Long> dataStream = env.addSource(new SourceFunction<Long>() { @Override public void run(SourceContext<Long> ctx) throws Exception { long i = 0; while (true) { ctx.collect(i++); Thread.sleep(500); } } @Override public void cancel() { } }); // 规则数据流 DataStream<String> ruleStream = env.addSource(new SourceFunction<String>() { @Override public void run(SourceContext<String> ctx) throws Exception { ctx.collect("one"); Thread.sleep(5000); ctx.collect("two"); Thread.sleep(5000); ctx.collect("three"); Thread.sleep(Long.MAX_VALUE); } @Override public void cancel() { } }); MapStateDescriptor<String, String> ruleStateDescriptor = new MapStateDescriptor<>( "RulesBroadcastState", BasicTypeInfo.STRING_TYPE_INFO, TypeInformation.of(new TypeHint<String>() { })); BroadcastStream<String> ruleBroadcastStream = ruleStream.broadcast(ruleStateDescriptor); dataStream.connect(ruleBroadcastStream) .process(new BroadcastProcessFunction<Long, String, Long>() { String rule; @Override public void processElement(Long value, ReadOnlyContext ctx, Collector<Long> out) throws Exception { if ("one".equalsIgnoreCase(rule)) { out.collect(value); } else if ("two".equalsIgnoreCase(rule) && (value % 2 == 0)) { out.collect(value); } else if ("three".equalsIgnoreCase(rule) && (value % 3 == 0)) { out.collect(value); } } @Override public void processBroadcastElement(String value, Context ctx, Collector<Long> out) throws Exception { System.out.printf("update rule, old rule = %s, new rule = %s\n", rule, value); rule = value; } }).print(); env.execute(); } }输出与单独connect的输出一模一样,这里就不再附运行结果了。IterateIterate功能如其名,就是迭代,有点类似于状态机。我们可以通过Iterate算子实现流里面元素的迭代计算,直到它符合某个条件,这在一些自学习的场景中比较常见。这里不太严谨的将它也归类为多流算子,是因为它需要处理“回炉改造”的元素构成的“新流”。下面的代码实现了一个非常简单的功能:在一个原始流上面执行减一操作(minusOne),减完之后检查元素的值是否大于0;如果大于0继续迭代减一,直到其小于0,退出迭代,进入后续操作(这里直接print了)。public class IterateDemo { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); List<Tuple2<String, Integer>> tuple2s = new ArrayList<>(4); tuple2s.add(new Tuple2<>("Jim", 3)); tuple2s.add(new Tuple2<>("John", 2)); tuple2s.add(new Tuple2<>("Lily", 1)); tuple2s.add(new Tuple2<>("Lucy", 4)); // source DataStream<Tuple2<String, Integer>> source = env.fromCollection(tuple2s); source.print(); // 得到一个IterativeStream IterativeStream<Tuple2<String, Integer>> iteration = source.iterate(); // 执行常规计算 DataStream<Tuple2<String, Integer>> minusOne = iteration.map(new MapFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>() { @Override public Tuple2<String, Integer> map(Tuple2<String, Integer> value) throws Exception { System.out.printf("[minusOne] %s: %d\n", value.f0, value.f1); return new Tuple2<>(value.f0, value.f1 - 1); } }); // 找出需要迭代的元素形成一个流 DataStream<Tuple2<String, Integer>> stillGreaterThanZero = minusOne.filter(new FilterFunction<Tuple2<String, Integer>>() { @Override public boolean filter(Tuple2<String, Integer> value) throws Exception { boolean greaterThanZero = value.f1 > 0; if (greaterThanZero) { System.out.printf("%s is still greater than zero(now: %d), back to minusOne\n", value.f0, value.f1); } return greaterThanZero; } }); // 将需要迭代的流传递给closeWith方法 iteration.closeWith(stillGreaterThanZero); minusOne.filter(tuple2 -> tuple2.f1 <= 0).print(); env.execute(); } }输出如下:(Jim,3) (John,2) (Lily,1) (Lucy,4) [minusOne] Jim: 3 Jim is still greater than zero(now: 2), back to minusOne [minusOne] John: 2 John is still greater than zero(now: 1), back to minusOne [minusOne] Lily: 1 (Lily,0) [minusOne] Lucy: 4 Lucy is still greater than zero(now: 3), back to minusOne [minusOne] Jim: 2 Jim is still greater than zero(now: 1), back to minusOne [minusOne] John: 1 (John,0) [minusOne] Lucy: 3 Lucy is still greater than zero(now: 2), back to minusOne [minusOne] Jim: 1 (Jim,0) [minusOne] Lucy: 2 Lucy is still greater than zero(now: 1), back to minusOne [minusOne] Lucy: 1 (Lucy,0)总结一个典型的Flink流处理程序往往就是由上面介绍的这些算子组成的,可以看到各个算子的使用都不算复杂,所以要编写一个流处理程序也并不困难。难的是如何编写一个正确、稳健、高效的程序,以及如何有针对性的调优,而要做好这些,就需要理解流处理背后的运行原理和关键设计。下篇文章介绍另外一个非常重要的算子:Flink Async I/O。 Filebeat源码浅析 http://niyanchun.com/filebeat-source-learning.html 2021-03-06T12:05:00+08:00 本文对Filebeat代码进行简单分析,作为之前 Filebeat导致文件无法被删除的原因分析 一文的补充,当然也可单独阅读,了解Filebeat的代码逻辑。需要注意的是:本文不是全面、深度的Filebeat源码剖析,而是专注在通用配置下核心数据的流转上面,目标是理清楚数据从采集到中间流转,最后到发送的流程,而不是对每处代码细节进行分析讲解。本文核心点包括:Filebeat实例的创建和初始化流程;文件的遍历、采集,包括Crawler、Input、Harvester、Reader等核心概念;数据发送逻辑,即Publisher/Pipeline;Filebeat的ACK机制。另外,本文包含大量代码,但为了尽量减少代码所占篇幅和突出重点逻辑,大部分代码都做了删减,有的地方有说明,有的没有说明。如果想看完整代码,建议按照下面的指导搭建自己的源码环境,对照着阅读。当然,本文假设你已经对Filebeat基本使用比较熟悉了。源码准备Filebeat是Elastic公司Beat系列的一员,所有Beat的源码都归档在一起,所以下载源码直接克隆Beat的代码即可。需要注意的是早期Go的模块依赖使用的是govendor,这种方式需要代码在GOPATH的src目录下面,而Go 1.11版本中引入了Go module作为新的模块依赖,并且在1.12版本中正式启用,作为默认选项(当然用户可以通过GO111MODULE这个环境变量来控制是否使用Go module功能)。这个特性的转变对于搭建Filebeat的源码环境也有一些影响,查看一下你想阅读的Beat的分支是否有go.mod文件,如果有则说明使用的是Go module,否则还是老的govendor。如果是govendor,那你必须将代码放到${GOPATH}/src/github.com/elastic,且GO111MODULE设置为auto或者off;如果是Go module,则没有这个要求。至于Golang的版本,不要比官方要求的版本低就行。本文的讲解是基于6.8分支的代码(6.8.15),该版本的beat模块依赖还是早期的govendor,所以用如下命令搭建源码环境:mkdir -p ${GOPATH}/src/github.com/elastic git clone https://github.com/elastic/beats ${GOPATH}/src/github.com/elastic/beats cd ${GOPATH}/src/github.com/elastic/beats git checkout 6.8如果你无法科学上网,从github拉取beat的代码会很慢(主要是因为.git目录非常大),所以我把自己的代码打包传到了百度网盘,实在从github上拉不下来的可以直接下载我上传的(点此进入下载页面,密码: 6bro)。我提供了2个版本,一个包含git,一个不包含,都不影响代码阅读,没有git的好处是代码会小很多。我的Golang版本是1.15.7,编辑器是vscode。代码下载好以后,直接运行根目录下面的filebeat目录中的main.go,如果可以运行,那环境就算搭建好了。注意:如果提示配置文件filebeat.yml找不到,那环境也没问题,可能是因为你启动Filebeat的目录没有这个配置文件,可以通过-c <配置文件>这个命令行参数去指定一个filebeat配置即可。另外,Filebeat日志默认只输出到文件,如果调试的时候想输出到终端,再增加一个-e参数。如果是vscode,则直接修改.vscode/launch.json即可:{ // Use IntelliSense to learn about possible attributes. // Hover to view descriptions of existing attributes. // For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387 "version": "0.2.0", "configurations": [ { "name": "Launch file", "type": "go", "request": "launch", "mode": "debug", "program": "${file}", "args": ["-e"] } ] }filebeat与libbeat熟悉Beat的应该都知道Beat里面有一个核心的libbeat模块,实现了很多基础功能,比如实例初始化、公共配置、队列、输出(output)、日志等,不同的Beat往往只需要实现如何采集数据,然后将数据以事件(event)的方式通过channel发给libbeat即可,这种方式大大简化了各个Beat的开发。下面是官方的一张说明Beat与libbeat关系的图:上面提到的事件是在libbeat/beat/event.go中定义的:// Event is the common event format shared by all beats. // Every event must have a timestamp and provide encodable Fields in `Fields`. // The `Meta`-fields can be used to pass additional meta-data to the outputs. // Output can optionally publish a subset of Meta, or ignore Meta. type Event struct { Timestamp time.Time Meta common.MapStr Fields common.MapStr Private interface{} // for beats private use }Filebeat作为Beat的一员,也遵从上面的逻辑:Filebeat实现了文件的采集,并以event的方式发送给libbeat的Publisher,后续的工作都由libbeat完成。所以了解Filebeat完整的数据流程包含Filebeat和libbeat两部分代码。对应的代码在根目录下的filebeat目录和libbeat目录。为了方便理解,这里先给出一个精简的数据流图:各部分主要功能是:Filebeat instance:Filebeat实例的创建和初始化;Crawler:遍历配置中的input,创建、初始化Input实例;Input:遍历配置的目录、文件,为每个需要采集的文件创建、初始化Harvester;Harvester:读取文件内容,以event的方式发送到后端的队列(本文以最常使用的内存队列为例讲解);Queue/Spooler:Filebeat内部的队列有两种:内存队列(Broker)和文件队列(Spooler),代码中文件队列叫spooler,但大多数地方spooler是queue的代名词,而不专指文件队列;Publisher:libbeat中的发送逻辑,整个发送流程在代码中称为pipeline;Registrar:记录采集过的文件及位置(offset),和ACK机制一起实现Filebeat承诺的至少发送一次(at-least once)的保证。下面分别介绍这些模块。实例初始化 && Registrar第一步是Filebeat的实例初始化。libbeat定义了一个Beater接口,不同的Beat实现这个接口,即可复用libbeat定义的诸多基础功能,这个接口定义如下:// libbeat/beat/beat.go // Beater is the interface that must be implemented by every Beat. A Beater // provides the main Run-loop and a Stop method to break the Run-loop. // Instantiation and Configuration is normally provided by a Beat-`Creator`. // // Once the beat is fully configured, the Run() method is invoked. The // Run()-method implements the beat its run-loop. Once the Run()-method returns, // the beat shuts down. // // The Stop() method is invoked the first time (and only the first time) a // shutdown signal is received. The Stop()-method normally will stop the Run()-loop, // such that the beat can gracefully shutdown. type Beater interface { // The main event loop. This method should block until signalled to stop by an // invocation of the Stop() method. Run(b *Beat) error // Stop is invoked to signal that the Run method should finish its execution. // It will be invoked at most once. Stop() } // Beat contains the basic beat data and the publisher client used to publish // events. type Beat struct { Info Info // beat metadata. Publisher Pipeline // Publisher pipeline SetupMLCallback SetupMLCallback // setup callback for ML job configs InSetupCmd bool // this is set to true when the `setup` command is called OverwritePipelinesCallback OverwritePipelinesCallback // ingest pipeline loader callback // XXX: remove Config from public interface. // It's currently used by filebeat modules to setup the Ingest Node // pipeline and ML jobs. Config *BeatConfig // Common Beat configuration data. BeatConfig *common.Config // The beat's own configuration section Fields []byte // Data from fields.yml ConfigManager management.ConfigManager // config manager }Filebeat在filebeat/beater/filebeat.go中实现了这个接口。我们看下Filebeat是如何从main函数(filebeat/main.go)流转到filebeat.go的:// filebeat/main.go // The basic model of execution: // - input: finds files in paths/globs to harvest, starts harvesters // - harvester: reads a file, sends events to the spooler // - spooler: buffers events until ready to flush to the publisher // - publisher: writes to the network, notifies registrar // - registrar: records positions of files read // Finally, input uses the registrar information, on restart, to // determine where in each file to restart a harvester. func main() { if err := cmd.RootCmd.Execute(); err != nil { os.Exit(1) } }main前面的注释概述了Filebeat的运行逻辑,也是前面图的由来。main方法里面的代码很简单,下面直接给出关键的调用关系:filebeat/main.go#mainfilebeat/cmd/root.go#init: RootCmd = cmd.GenRootCmdWithRunFlags(Name, "", beater.New, runFlags)libbeat/cmd/root.go#GenRootCmdWithSettings: rootCmd.RunCmd = genRunCmd(settings, beatCreator, runFlags)libbeat/cmd/run.go#genRunCmd: err := instance.Run(settings, beatCreator)libbeat/cmd/instance/beat.go#Run: return b.launch(settings, bt)libbeat/cmd/instance/beat.go#launch: return beater.Run(&b.Beat)filebeat/beater/filebeat.go#Run这部分的代码流转主要是在libbeat中,所以是一些比较通用化的配置,主要是构造Beater实例(此处是Filebeat实例),并最终进入到filebeat.go的Run方法中。Run方法中有两个比较重要的操作:registrar和crawler的创建、启动。Filebeat的at-least once采集就是通过Registrar模块实现的,该文件会记录采集过的文件以及最后采集的位置,并且只有收到数据成功发送的确认后才会更新,其核心结构如下:// filebeat/registrar/registrar.go type Registrar struct { Channel chan []file.State out successLogger done chan struct{} registryFile string // Path to the Registry File fileMode os.FileMode // Permissions to apply on the Registry File wg sync.WaitGroup states *file.States // Map with all file paths inside and the corresponding state gcRequired bool // gcRequired is set if registry state needs to be gc'ed before the next write gcEnabled bool // gcEnabled indicates the registry contains some state that can be gc'ed in the future flushTimeout time.Duration bufferedStateUpdates int }每次重启Filebeat后,在registrar启动的时候会调用func (r *Registrar) loadStates()读取registry文件中的内容,和需要采集的文件作对比(func (p *Input) loadStates(states []file.State)),确认哪些文件采集过了,哪些有更新,需要接着上次的地方继续采集。确认消息成功发送的回调(通过channel实现)函数是在Run方法里面注册的(这部分后面还会专门讲解):err = b.Publisher.SetACKHandler(beat.PipelineACKHandler{ ACKEvents: newEventACKer(finishedLogger, registrarChannel).ackEvents, })Registrar的记录文件默认为filebeat安装目录/data/registry,示例内容如下(为了方便查看,进行了格式化):[{ "source": "/Users/allan/Desktop/temp/test-logs/test-1.log", "offset": 362, "timestamp": "2021-02-28T21:28:55.435218+08:00", "ttl": -1, "type": "log", "meta": null, "FileStateOS": { "inode": 8712351738, "device": 16777225 } }, { "source": "/Users/allan/Desktop/temp/test-logs/test-2.log", "offset": 362, "timestamp": "2021-02-28T21:28:55.24922+08:00", "ttl": -1, "type": "log", "meta": null, "FileStateOS": { "inode": 8712603538, "device": 16777225 } }]另外,为了保证数据不丢失,实例初始化的时候各大模块的启动顺序依次是:registrar --> publisher --> spooler --> crawler/input. 而实例停止的时候各大模块的停止顺序则正好相反。下面看Crawler和Input。输入/文件遍历(Crawler && Input)Crawler我们知道filebeat采集数据是通过在配置文件里面配置若干个input实现的,而Crawler的作用就是解析这些input,并创建、启动Input。Crawler的核心代码在filebeat/crawler/crawler.go中。核心结构定义如下:type Crawler struct { inputs map[uint64]*input.Runner // 包含若干个Input inputConfigs []*common.Config out channel.Factory wg sync.WaitGroup InputsFactory cfgfile.RunnerFactory ModulesFactory cfgfile.RunnerFactory modulesReloader *cfgfile.Reloader inputReloader *cfgfile.Reloader once bool beatVersion string beatDone chan struct{} }在Start方法中遍历配置中的input,并创建、启动input:// Start starts the crawler with all inputs func (c *Crawler) Start( pipeline beat.Pipeline, r *registrar.Registrar, configInputs *common.Config, configModules *common.Config, pipelineLoaderFactory fileset.PipelineLoaderFactory, overwritePipelines bool, ) error { logp.Info("Loading Inputs: %v", len(c.inputConfigs)) // Prospect the globs/paths given on the command line and launch harvesters for _, inputConfig := range c.inputConfigs { err := c.startInput(pipeline, inputConfig, r.GetStates()) if err != nil { return err } } // 省略部分代码 logp.Info("Loading and starting Inputs completed. Enabled inputs: %v", len(c.inputs)) return nil } // 创建、启动Input func (c *Crawler) startInput( pipeline beat.Pipeline, config *common.Config, states []file.State, ) error { if !config.Enabled() { return nil } connector := channel.ConnectTo(pipeline, c.out) p, err := input.New(config, connector, c.beatDone, states, nil) if err != nil { return fmt.Errorf("Error in initing input: %s", err) } p.Once = c.once if _, ok := c.inputs[p.ID]; ok { return fmt.Errorf("Input with same ID already exists: %d", p.ID) } c.inputs[p.ID] = p p.Start() return nil }InputFilebeat支持多种类型的Input,比如log、redis、stdin、docker等,这些代码都在filebeat/input目录,不同类型的目录下面实现了特定的input。通用Input接口定义如下:// filebeat/input/input.go // Input is the interface common to all input type Input interface { Run() Stop() Wait() } // Start starts the input func (p *Runner) Start() { p.wg.Add(1) logp.Info("Starting input of type: %v; ID: %d ", p.config.Type, p.ID) // 省略部分代码 // Add waitgroup to make sure input is finished go func() { defer func() { onceWg.Done() p.stop() p.wg.Done() }() p.Run() }() } // Run starts scanning through all the file paths and fetch the related files. Start a harvester for each file func (p *Runner) Run() { // Initial input run p.input.Run() // 省略部分代码 }此处以最常用的log类型为例进行说明。p.input.Run()会跳转到filebeat/log/input.go#Run:// Input contains the input and its config type Input struct { cfg *common.Config config config states *file.States harvesters *harvester.Registry // 1个input包含多个harvesters outlet channel.Outleter stateOutlet channel.Outleter done chan struct{} numHarvesters atomic.Uint32 meta map[string]string stopOnce sync.Once } // Run runs the input func (p *Input) Run() { logp.Debug("input", "Start next scan") // TailFiles is like ignore_older = 1ns and only on startup if p.config.TailFiles { ignoreOlder := p.config.IgnoreOlder // Overwrite ignore_older for the first scan p.config.IgnoreOlder = 1 defer func() { // Reset ignore_older after first run p.config.IgnoreOlder = ignoreOlder // Disable tail_files after the first run p.config.TailFiles = false }() } p.scan() // It is important that a first scan is run before cleanup to make sure all new states are read first if p.config.CleanInactive > 0 || p.config.CleanRemoved { beforeCount := p.states.Count() cleanedStates, pendingClean := p.states.Cleanup() logp.Debug("input", "input states cleaned up. Before: %d, After: %d, Pending: %d", beforeCount, beforeCount-cleanedStates, pendingClean) } // Marking removed files to be cleaned up. Cleanup happens after next scan to make sure all states are updated first if p.config.CleanRemoved { for _, state := range p.states.GetStates() { // os.Stat will return an error in case the file does not exist stat, err := os.Stat(state.Source) if err != nil { if os.IsNotExist(err) { p.removeState(state) logp.Debug("input", "Remove state for file as file removed: %s", state.Source) } else { logp.Err("input state for %s was not removed: %s", state.Source, err) } } else { // Check if existing source on disk and state are the same. Remove if not the case. newState := file.NewState(stat, state.Source, p.config.Type, p.meta) if !newState.FileStateOS.IsSame(state.FileStateOS) { p.removeState(state) logp.Debug("input", "Remove state for file as file removed or renamed: %s", state.Source) } } } } }对Filebeat配置比较熟悉的朋友看到这部分代码,应该很亲切,变量的命名和配置项几乎是对应的,很多判断逻辑都是对配置项的处理,很容易理解。其中比较关键的是p.scan():// Scan starts a scanGlob for each provided path/glob func (p *Input) scan() { var sortInfos []FileSortInfo var files []string // 获取目录下需要被采集的文件,是否需要被采集的逻辑就是在getFiles()方法中实现 paths := p.getFiles() // 省略部分代码 for i := 0; i < len(paths); i++ { // 省略一些判断是否采集过,以及采集到哪里的代码 // Decides if previous state exists if lastState.IsEmpty() { logp.Debug("input", "Start harvester for new file: %s", newState.Source) // 启动harvester err := p.startHarvester(newState, 0) // 省略错误处理代码 } else { p.harvestExistingFile(newState, lastState) } } } // harvestExistingFile continues harvesting a file with a known state if needed func (p *Input) harvestExistingFile(newState file.State, oldState file.State) { logp.Debug("input", "Update existing file for harvesting: %s, offset: %v", newState.Source, oldState.Offset) if oldState.Finished && newState.Fileinfo.Size() > oldState.Offset { logp.Debug("input", "Resuming harvesting of file: %s, offset: %d, new size: %d", newState.Source, oldState.Offset, newState.Fileinfo.Size()) // 启动harvester采集 err := p.startHarvester(newState, oldState.Offset) if err != nil { logp.Err("Harvester could not be started on existing file: %s, Err: %s", newState.Source, err) } return } // 省略后续代码 } // startHarvester starts a new harvester with the given offset // In case the HarvesterLimit is reached, an error is returned func (p *Input) startHarvester(state file.State, offset int64) error { if p.numHarvesters.Inc() > p.config.HarvesterLimit && p.config.HarvesterLimit > 0 { p.numHarvesters.Dec() harvesterSkipped.Add(1) return errHarvesterLimit } // Set state to "not" finished to indicate that a harvester is running state.Finished = false state.Offset = offset // Create harvester with state h, err := p.createHarvester(state, func() { p.numHarvesters.Dec() }) if err != nil { p.numHarvesters.Dec() return err } err = h.Setup() if err != nil { p.numHarvesters.Dec() return fmt.Errorf("error setting up harvester: %s", err) } // Update state before staring harvester // This makes sure the states is set to Finished: false // This is synchronous state update as part of the scan h.SendStateUpdate() if err = p.harvesters.Start(h); err != nil { p.numHarvesters.Dec() } return err }scan代码的核心逻辑就是遍历目录下的文件,找到需要采集的文件后就创建启动一个harvester实例进行采集。最后面的startHarvester方法中先是创建了一个harvester实例(p.createHarvester),然后配置(h.Setup)该实例,最后启动(p.harvesters.Start(h))实例。这部分在接下来的Harvester部分介绍。filebeat/log/input.go文件中的代码中包含了大量配置项的代码逻辑,建议好好看一下。如果你对input部分的配置项比较熟悉,这部分代码看起来也比较简单。对照配置项说明文档进行查看,效果更佳。数据采集HarvesterHarvester一个Harvester就是一个goroutine,接口定义在filebeat/harvester/harvester.go中:// Harvester contains all methods which must be supported by each harvester // so the registry can be used by the input type Harvester interface { ID() uuid.UUID Run() error Stop() }不同的input类型会实现自己的Harvester,log类型的Harvester实现在filebeat/input/log/harvester.go中,核心结构定义如下:// Harvester contains all harvester related data type Harvester struct { id uuid.UUID config config source harvester.Source // the source being watched // shutdown handling done chan struct{} stopOnce sync.Once stopWg *sync.WaitGroup stopLock sync.Mutex // internal harvester state state file.State states *file.States log *Log // file reader pipeline reader reader.Reader encodingFactory encoding.EncodingFactory encoding encoding.Encoding // event/state publishing outletFactory OutletFactory publishState func(*util.Data) bool onTerminate func() } // Run start the harvester and reads files line by line and sends events to the defined output func (h *Harvester) Run() error { // 该方法在上篇文章中已经介绍过,下面省略掉了大部分代码,只保留了读取和发送 for { // 读取 message, err := h.reader.Next() // 发送 if !h.sendEvent(data, forwarder) { return nil } } }上篇文章中已经介绍过Harvester,其核心任务就是打开文件,根据配置读取文件内容,并发送。读取和发送都是在Run中实现,见上面的代码注释。本文再补充介绍下另外一个关键点reader.Reader。各种Reader其实读取的真正操作是由一系列Reader完成的。Reader接口定义在filebeat/reader/reader.go中:// Reader is the interface that wraps the basic Next method for // getting a new message. // Next returns the message being read or and error. EOF is returned // if reader will not return any new message on subsequent calls. type Reader interface { Next() (Message, error) } // Message represents a reader event with timestamp, content and actual number // of bytes read from input before decoding. type Message struct { Ts time.Time // timestamp the content was read Content []byte // actual content read Bytes int // total number of bytes read to generate the message Fields common.MapStr // optional fields that can be added by reader }该接口只包含一个Next方法,每调用一次,则读取一个Message。Filebeat实现了很多种Reader,这些Reader根据用户的配置形成一个调用链,对最原始的数据依次进行处理,就像一个流水线一样。每一个后面的Reader都包含了前面的Reader。这些Reader都定义在filebeat/reader目录,主要包括下面这些:最底层的log.go#Read并不是一个Filebeat的reader.Reader实现,而是直接调用了Go底层的Read,实现读取指定长度的字节数据:// filebeat/input/log/log.go func (f *Log) Read(buf []byte) (int, error) { for { n, err := f.fs.Read(buf) // 调用go底层os/file.go#Read } } // os/file.go#Read // Read reads up to len(b) bytes from the File. // It returns the number of bytes read and any error encountered. // At end of file, Read returns 0, io.EOF. func (f *File) Read(b []byte) (n int, err error) { if err := f.checkValid("read"); err != nil { return 0, err } n, e := f.read(b) return n, f.wrapErr("read", e) }再往上都是各种reader.Reader的实现,依次如下(都省略了部分代码):LineReader,实现逐行读取的功能:// filebeat/reader/readfile/line.go // lineReader reads lines from underlying reader, decoding the input stream // using the configured codec. The reader keeps track of bytes consumed // from raw input stream for every decoded line. type LineReader struct { reader io.Reader codec encoding.Encoding bufferSize int maxBytes int nl []byte inBuffer *streambuf.Buffer outBuffer *streambuf.Buffer inOffset int // input buffer read offset byteCount int // number of bytes decoded from input buffer into output buffer decoder transform.Transformer } // Next reads the next line until the new line character func (r *LineReader) Next() ([]byte, int, error) { for { err := r.advance() } } // Reads from the buffer until a new line character is detected // Returns an error otherwise func (r *LineReader) advance() error { // fill inBuffer until '\n' sequence has been found in input buffer for idx == -1 { // try to read more bytes into buffer filebeat/input/log/log.go#Read n, err := r.reader.Read(buf) } }EncoderReader,对行数据进行编解码:// filebeat/reader/readfile/encode.go // Reader produces lines by reading lines from an io.Reader // through a decoder converting the reader it's encoding to utf-8. type EncoderReader struct { reader *LineReader } // Next reads the next line from it's initial io.Reader // This converts a io.Reader to a reader.reader func (r EncoderReader) Next() (reader.Message, error) { c, sz, err := r.reader.Next() // Creating message object return reader.Message{ Ts: time.Now(), Content: c, Bytes: sz, }, err }JSON Reader,处理JSON格式的数据:// filebeat/reader/json/json.go type JSON struct { reader reader.Reader cfg *Config } // Next decodes JSON and returns the filled Line object. func (r *JSON) Next() (reader.Message, error) { }StripNewline Reader,去掉后面的换行符:// filebeat/reader/readfile/strip_newline.go // StripNewline reader removes the last trailing newline characters from // read lines. type StripNewline struct { reader reader.Reader } // Next returns the next line. func (p *StripNewline) Next() (reader.Message, error) { }Timeout Reader,超时处理:// filebeat/reader/readfile/timeout.go // TimeoutReader will signal some configurable timeout error if no // new line can be returned in time. type TimeoutReader struct { reader reader.Reader timeout time.Duration signal error running bool ch chan lineMessage } // Next returns the next line. If no line was returned before timeout, the // configured timeout error is returned. // For handline timeouts a goroutine is started for reading lines from // configured line reader. Only when underlying reader returns an error, the // goroutine will be finished. func (r *TimeoutReader) Next() (reader.Message, error) { }Multiline Reader,多行合并处理:// filebeat/reader/multiline/multiline.go // MultiLine reader combining multiple line events into one multi-line event. // // Lines to be combined are matched by some configurable predicate using // regular expression. // // The maximum number of bytes and lines to be returned is fully configurable. // Even if limits are reached subsequent lines are matched, until event is // fully finished. // // Errors will force the multiline reader to return the currently active // multiline event first and finally return the actual error on next call to Next. type Reader struct { reader reader.Reader pred matcher flushMatcher *match.Matcher maxBytes int // bytes stored in content maxLines int separator []byte last []byte numLines int truncated int err error // last seen error state func(*Reader) (reader.Message, error) message reader.Message } // Next returns next multi-line event. func (mlr *Reader) Next() (reader.Message, error) { return mlr.state(mlr) }LimitReader,单个event长度限制:// filebeat/reader/readfile/limit.go // Reader sets an upper limited on line length. Lines longer // then the max configured line length will be snapped short. type LimitReader struct { reader reader.Reader maxBytes int } // Next returns the next line. func (r *LimitReader) Next() (reader.Message, error) { message, err := r.reader.Next() if len(message.Content) > r.maxBytes { message.Content = message.Content[:r.maxBytes] message.AddFlagsWithKey("log.flags", "truncated") } return message, err }这么多的Reader并非在所有场景下都是必须的,需要根据用户配置进行装配,这个操作是在初始化Harvester时进行的:// Setup opens the file handler and creates the reader for the harvester func (h *Harvester) Setup() error { err := h.open() if err != nil { return fmt.Errorf("Harvester setup failed. Unexpected file opening error: %s", err) } // 在newLogFileReader中根据用户配置装配 Reader 流 h.reader, err = h.newLogFileReader() if err != nil { if h.source != nil { h.source.Close() } return fmt.Errorf("Harvester setup failed. Unexpected encoding line reader error: %s", err) } return nil }Harvester的Run方法中通过无限循环调用h.reader.Next()时,会依次递归调用这些Reader的Next方法,加工数据,得到的最终数据发给给后端。发送逻辑(Harvester内部)发送的调用流程如下:filebeat/input/log/harvester.go#Run(): h.sendEvent(data, forwarder)filebeat/input/log/harvester.go#sendEvent: forwarder.Send(data)filebeat/harvester/forwarder.go#Send: f.Outlet.OnEvent(data)filebeat/channel/util.go#OnEvent实际的发送是由filebeat/channel/util.go和filebeat/channel/outlet.go中的代码协作完成的:filebeat/channel/util.go:// filebeat/channel/util.go func (o *subOutlet) OnEvent(d *util.Data) bool { o.mutex.Lock() defer o.mutex.Unlock() select { case <-o.done: return false default: } select { case <-o.done: return false // 数据写入channel case o.ch <- d: select { case <-o.done: return true // 等待结果 case ret := <-o.res: return ret } } } // filebeat/channel/util.go // SubOutlet create a sub-outlet, which can be closed individually, without closing the // underlying outlet. func SubOutlet(out Outleter) Outleter { s := &subOutlet{ done: make(chan struct{}), ch: make(chan *util.Data), res: make(chan bool, 1), } go func() { // 从channel读取数据,并调用OnEvent发送数据 for event := range s.ch { s.res <- out.OnEvent(event) } }() return s }filebeat/channel/outlet.go:func (o *outlet) OnEvent(d *util.Data) bool { if !o.isOpen.Load() { return false } event := d.GetEvent() if d.HasState() { event.Private = d.GetState() } if o.wg != nil { o.wg.Add(1) } o.client.Publish(event) // 跳到libbeat/publisher/pipeline/client.go#Publish return o.isOpen.Load() }这部分代码逻辑是:数据经由filebeat/channel/util.go#OnEvent中的case o.ch <- d写入到channel中,然后在内层的select中等待结果;同时filebeat/channel/util.go#Outleter里面的一个goroutine会读取写入到channel的数据,并调用filebeat/channel/outlet.go#OnEvent中的o.client.Publish(event)发送数据,发送成功后,将结果写回到subOutlet的res channel,然后harvester返回。上篇文章中harvester关闭不掉,就是因为卡在了o.client.Publish(event)无法返回,所以harvester卡在了subOutlet的res channel,即一直等待发送结果。到目前为止,数据都是在Filebeat的代码中流转的,此处调用o.client.Publish(event)之后,就会进入到libbeat中(libbeat/publisher/pipeline/client.go#Publish)。数据发送Publisher从这部分开始,代码进入到libbeat中,这部分代码的复杂主要体现在涉及概念比较多,流转流程比较长。PipelineBeat把Publisher中的整个流程称之为pipeline,是libbeat中非常核心的一部分,代码在libbeat/publisher/pipeline/pipeline.go中,这里贴一下包含较详细注释部分的代码:// Package pipeline combines all publisher functionality (processors, queue, // outputs) to create instances of complete publisher pipelines, beats can // connect to publish events to. package pipeline // Pipeline implementation providint all beats publisher functionality. // The pipeline consists of clients, processors, a central queue, an output // controller and the actual outputs. // The queue implementing the queue.Queue interface is the most central entity // to the pipeline, providing support for pushung, batching and pulling events. // The pipeline adds different ACKing strategies and wait close support on top // of the queue. For handling ACKs, the pipeline keeps track of filtered out events, // to be ACKed to the client in correct order. // The output controller configures a (potentially reloadable) set of load // balanced output clients. Events will be pulled from the queue and pushed to // the output clients using a shared work queue for the active outputs.Group. // Processors in the pipeline are executed in the clients go-routine, before // entering the queue. No filtering/processing will occur on the output side. type Pipeline struct { beatInfo beat.Info logger *logp.Logger queue queue.Queue output *outputController observer observer eventer pipelineEventer // wait close support waitCloseMode WaitCloseMode waitCloseTimeout time.Duration waitCloser *waitCloser // pipeline ack ackMode pipelineACKMode ackActive atomic.Bool ackDone chan struct{} ackBuilder ackBuilder eventSema *sema processors pipelineProcessors }我整理了一个图:下面介绍pipeline中的各个环节。接前文,Harvester中会持有一个最左边的client实例(一个Input实例中的所有Harvester共享该Client),然后通过这个client调用Producer将数据发送到一个buffer,这是一个channel,大小硬编码为20。同时需要注意的一个点是client的publish中还执行了processor,也就是如果定义了processor,他是在发送的Client里面执行的。这部分功能对应代码如下(只保留了关键代码):// filebeat/channel/outlet.go func (o *outlet) OnEvent(d *util.Data) bool { o.client.Publish(event) // 跳到libbeat/publisher/pipeline/client.go#Publish } // libbeat/publisher/pipeline/client.go func (c *client) publish(e beat.Event) { // 如果定义了processor,则在此处执行 if c.processors != nil { var err error event, err = c.processors.Run(event) publish = event != nil if err != nil { // TODO: introduce dead-letter queue? log.Errorf("Failed to publish event: %v", err) } } var published bool if c.canDrop { published = c.producer.TryPublish(pubEvent) } else { published = c.producer.Publish(pubEvent) // queue/memqueue/produce.go } } // libbeat/publisher/queue/memqueue/produce.go func (p *forgetfulProducer) Publish(event publisher.Event) bool { return p.openState.publish(p.makeRequest(event)) } func (st *openState) publish(req pushRequest) bool { select { // 将数据发送到events buffer中 case st.events <- req: return true case <-st.done: st.events = nil return false } }然后EventLoop从events buffer中读取数据写到batchBuffer中,batchBuffer是一个Slice,其大小为queue.mem.events(默认值为4096)。这部分功能对应代码如下(只保留了关键代码):// libbeat/publisher/queue/memqueue/eventloop.go // 创建EventLoop func newBufferingEventLoop(b *Broker, size int, minEvents int, flushTimeout time.Duration) *bufferingEventLoop { l := &bufferingEventLoop{ broker: b, maxEvents: size, minEvents: minEvents, flushTimeout: flushTimeout, // 直接使用Broker的events events: b.events, get: nil, pubCancel: b.pubCancel, acks: b.acks, } l.buf = newBatchBuffer(l.minEvents) l.timer = time.NewTimer(flushTimeout) if !l.timer.Stop() { <-l.timer.C } return l } func (l *bufferingEventLoop) run() { var ( broker = l.broker ) for { select { case <-broker.done: return case req := <-l.events: // producer pushing new event l.handleInsert(&req) } } } func (l *bufferingEventLoop) handleInsert(req *pushRequest) { // insert会把数据写入batchBuffer if l.insert(req) { l.eventCount++ if l.eventCount == l.maxEvents { // 队列面了就把chan设置为nil,此时写会被阻塞。等收到ack(handleACK)后又会恢复队列 l.events = nil // stop inserting events if upper limit is reached } } }数据到batchBuffer之后,eventConsumer会按照用户配置的规则批量从batchBuffer读取数据,并写入workQueue,这是一个channel。这部分功能对应代码如下(只保留了关键代码):// libbeat/publisher/pipeline/consumer.go func (c *eventConsumer) loop(consumer queue.Consumer) { log := c.logger log.Debug("start pipeline event consumer") var ( out workQueue batch *Batch paused = true ) for { select { case <-c.done: log.Debug("stop pipeline event consumer") return case sig := <-c.sig: handleSignal(sig) // 将数据写入 workQueue case out <- batch: batch = nil } } }数据到workQueue之后,再由netClientWorker模块读取并通过调用output的Publish将数据真正的发送出去。这里以ElasticSearch类型的output为例展示代码流程(只保留了关键代码):// libbeat/publisher/pipeline/output.go func (w *netClientWorker) run() { for !w.closed.Load() { reconnectAttempts := 0 // 从 workQueue读取数据 // send loop for batch := range w.qu { if w.closed.Load() { if batch != nil { batch.Cancelled() } return } // 发送到output err := w.client.Publish(batch) // libbeat/outputs/backoff.go if err != nil { logp.Err("Failed to publish events: %v", err) // on error return to connect loop break } } } } // libbeat/outputs/backoff.go func (b *backoffClient) Publish(batch publisher.Batch) error { err := b.client.Publish(batch) // libbeat/outputs/elasticsearch/client.go if err != nil { b.client.Close() } backoff.WaitOnError(b.backoff, err) return err } // libbeat/outputs/elasticsearch/client.go func (client *Client) Publish(batch publisher.Batch) error { events := batch.Events() rest, err := client.publishEvents(events) if len(rest) == 0 { batch.ACK() // libbeat/publisher/pipeline/batch.go } else { batch.RetryEvents(rest) } return err }至此,整个pipeline的数据流程就算完成了,其实都是各种代码调用,并不复杂,只是需要花时间去看代码而已。接下来,再补充介绍一下pipeline中比较核心的spooler。QueueFilebeat提供了2种队列:内存队列和文件队列。实际中绝大多数应该用的都是内存队列,这里也仅介绍内存队列,文件队列的实现在libbeat/publisher/queue/spool目录下,有兴趣的自行查看,核心的东西和内存队列一致。内存队列的定义在libbeat/publisher/queue/memqueue目录下,定义队列的文件是broker.go:// 内存队列在代码中叫Broker type Broker struct { done chan struct{} logger logger bufSize int // buf brokerBuffer // minEvents int // idleTimeout time.Duration // api channels events chan pushRequest requests chan getRequest pubCancel chan producerCancelRequest // internal channels acks chan int scheduledACKs chan chanList eventer queue.Eventer // wait group for worker shutdown wg sync.WaitGroup waitOnClose bool }前面的介绍的event buffer就是这里的events字段。该文件中还有一个NewBroker函数比较重要,里面是创建一个Broker,并且定义了eventLoop接口,该接口有2个实现:directEventLoop(queue.mem.flush.min_events = 1)bufferingEventLoop(queue.mem.flush.min_events > 1)queue.mem.flush.min_events的默认值为2048,所以创建的是bufferingEventLoop,这里仅介绍该类型的EventLoop:// libbeat/publisher/queue/memqueue/eventloop.go // bufferingEventLoop implements the broker main event loop. // Events in the buffer are forwarded to consumers only if the buffer is full or on flush timeout. type bufferingEventLoop struct { broker *Broker buf *batchBuffer flushList flushList eventCount int minEvents int maxEvents int flushTimeout time.Duration // active broker API channels events chan pushRequest get chan getRequest pubCancel chan producerCancelRequest // ack handling acks chan int // ackloop -> eventloop : total number of events ACKed by outputs schedACKS chan chanList // eventloop -> ackloop : active list of batches to be acked pendingACKs chanList // ordered list of active batches to be send to the ackloop ackSeq uint // ack batch sequence number to validate ordering // buffer flush timer state timer *time.Timer idleC <-chan time.Time }前面提到的batchBuffer就是这里的buf字段。另外,看代码的时候注意区分一下spooler这个词的含义,大多数时候它都指代的是queue。同时,文件队列也称为spooler。至此,整个pipeline就介绍完了。到这里,从Filebeat实例创建,到数据采集、发送都全部介绍完了。还差的就是ACK流程,这个是和数据发送流程相反的。ACK流程Filebeat基于Registrar+ACK的方式实现了至少发送一次的保证,Registrar前面已经介绍过了,最后看下ACK的流程,这部分相对复杂一些。再看下Registrar的逻辑:// filebeat/registrar/registrar.go type Registrar struct { Channel chan []file.State out successLogger done chan struct{} registryFile string // Path to the Registry File fileMode os.FileMode // Permissions to apply on the Registry File wg sync.WaitGroup states *file.States // Map with all file paths inside and the corresponding state gcRequired bool // gcRequired is set if registry state needs to be gc'ed before the next write gcEnabled bool // gcEnabled indicates the registry contains some state that can be gc'ed in the future flushTimeout time.Duration bufferedStateUpdates int } func (r *Registrar) Run() { logp.Debug("registrar", "Starting Registrar") // Writes registry on shutdown defer func() { r.writeRegistry() r.wg.Done() }() var ( timer *time.Timer flushC <-chan time.Time ) for { select { case <-r.done: logp.Info("Ending Registrar") return case <-flushC: flushC = nil timer.Stop() r.flushRegistry() case states := <-r.Channel: // 依靠这个channel通信 // 收到channel中的确认信息之后,将数据写入registry文件 r.onEvents(states) if r.flushTimeout <= 0 { r.flushRegistry() } else if flushC == nil { timer = time.NewTimer(r.flushTimeout) flushC = timer.C } } } }Registrar结构定义了一个Channel字段,这个channel就是用来接收ack消息的。Run方法里面从这个channel读取数据。然后看下ack数据是如何写到这个channel的。首先filebeat.go初始化时注册了全局ACK处理回调:// filebeat/beater/filebeat.go // Run allows the beater to be run as a beat. func (fb *Filebeat) Run(b *beat.Beat) error { // Make sure all events that were published in registrarChannel := newRegistrarLogger(registrar) // 注册消息成功发送的回调 err = b.Publisher.SetACKHandler(beat.PipelineACKHandler{ ACKEvents: newEventACKer(finishedLogger, registrarChannel).ackEvents, }) if err != nil { logp.Err("Failed to install the registry with the publisher pipeline: %v", err) return err } } // filebeat/beater/filebeat.go func newRegistrarLogger(reg *registrar.Registrar) *registrarLogger { return &registrarLogger{ done: make(chan struct{}), ch: reg.Channel, } } // filebeat/beater/channels.go type registrarLogger struct { done chan struct{} ch chan<- []file.State } // filebeat/beater/channels.go func (l *registrarLogger) Published(states []file.State) { select { case <-l.done: // set ch to nil, so no more events will be send after channel close signal // has been processed the first time. // Note: nil channels will block, so only done channel will be actively // report 'closed'. l.ch = nil case l.ch <- states: } } // filebeat/beater/acker.go type statefulLogger interface { Published(states []file.State) } // filebeat/beater/acker.go func newEventACKer(stateless statelessLogger, stateful statefulLogger) *eventACKer { return &eventACKer{stateless: stateless, stateful: stateful, log: logp.NewLogger("acker")} } // 注册的回调函数 func (a *eventACKer) ackEvents(data []interface{}) { stateless := 0 states := make([]file.State, 0, len(data)) for _, datum := range data { if datum == nil { stateless++ continue } st, ok := datum.(file.State) if !ok { stateless++ continue } states = append(states, st) } if len(states) > 0 { a.log.Debugw("stateful ack", "count", len(states)) a.stateful.Published(states) // filebeat/beater/channels.go: func (l *registrarLogger) Published(states []file.State) } if stateless > 0 { a.log.Debugw("stateless ack", "count", stateless) a.stateless.Published(stateless) // } }这里最终注册的回调是eventACKer,里面的重点是a.stateful.Published,看下这个的实现:// filebeat/beater/channels.go func (l *registrarLogger) Published(states []file.State) { select { case <-l.done: // set ch to nil, so no more events will be send after channel close signal // has been processed the first time. // Note: nil channels will block, so only done channel will be actively // report 'closed'. l.ch = nil case l.ch <- states: } }里面将最终的ack消息(states)写到了l.ch。这个channel就是Registrar那里的channel(从注册的代码里面可以分析出来),即回调函数将ack消息写入channel,然后Registrar从channel中读取states数据,写入registry文件,这样形成一个闭环。如下图:现在的问题就是:这个ackEvents回调函数的ack又是哪来的呢?是谁(who),在什么地方(where),什么时候(when),以何种方式(how)发送到ackEvents的?首先推断一下,既然是ack,那最源头当然应该是最终发送数据的地方发出,即发送数据完成得到外部确认之后,反向传递ack,正好和采集的时间传递方向相反,也就是核心应该在Publisher里面,或者说libbeat的pipeline里面。下面从pipeline中最核心的(内存)队列Broker模块开始分析。// libbeat/publisher/queue/memqueue/broker.go // NewBroker creates a new broker based in-memory queue holding up to sz number of events. // If waitOnClose is set to true, the broker will block on Close, until all internal // workers handling incoming messages and ACKs have been shut down. func NewBroker( logger logger, settings Settings, ) *Broker { var eventLoop interface { run() processACK(chanList, int) } // 创建EventLoop if minEvents > 1 { eventLoop = newBufferingEventLoop(b, sz, minEvents, flushTimeout) } else { eventLoop = newDirectEventLoop(b, sz) } b.bufSize = sz // 创建AckLoop ack := newACKLoop(b, eventLoop.processACK) b.wg.Add(2) go func() { defer b.wg.Done() eventLoop.run() }() // 这个goroutine中启动 ack go func() { defer b.wg.Done() ack.run() }() } // libbeat/publisher/queue/memqueue/ackloop.go func newACKLoop(b *Broker, processACK func(chanList, int)) *ackLoop { l := &ackLoop{broker: b} l.processACK = processACK return l }NewBroker中创建了eventLoop和ackLoop,前者用于发送数据,前面已经讲过,后者则用于处理ack。看下ackLoop的代码:// libbeat/publisher/queue/memqueue/ackloop.go // ackLoop implements the brokers asynchronous ACK worker. // Multiple concurrent ACKs from consecutive published batches will be batched up by the // worker, to reduce the number of signals to return to the producer and the // broker event loop. // Producer ACKs are run in the ackLoop go-routine. type ackLoop struct { broker *Broker sig chan batchAckMsg // 确认消息发送的channel lst chanList totalACK uint64 totalSched uint64 batchesSched uint64 batchesACKed uint64 processACK func(chanList, int) } func (l *ackLoop) run() { for { select { case <-l.broker.done: return case acks <- acked: acks, acked = nil, 0 case lst := <-l.broker.scheduledACKs: count, events := lst.count() l.lst.concat(&lst) l.batchesSched += uint64(count) l.totalSched += uint64(events) // 这里等待batch发送完成的确认信号 case <-l.sig: acked += l.handleBatchSig() if acked > 0 { acks = l.broker.acks } } } }可以看到,ackloop中在等待batch发送完成的信号(sig),这里有2条线:信号如何来的?收到信号之后,后续是如何将这个信号发送给前面的那个回调函数?先来看第1个问题:信号如何来的?根据前面的推断,应该由发送数据那里产生。而由pipeline部分的分析知道最终数据发送在output那里。此处继续以ES这种类型的output为例,看下最终发送的代码,从那里反推:// libbeat/outputs/elasticsearch/client.go 这是之前ES部分最终的发送代码 func (client *Client) Publish(batch publisher.Batch) error { events := batch.Events() rest, err := client.publishEvents(events) if len(rest) == 0 { batch.ACK() // 重点看这里的ACK,这个会跳转到libbeat/publisher/pipeline/batch.go } else { batch.RetryEvents(rest) } return err } // libbeat/publisher/pipeline/batch.go func (b *Batch) ACK() { b.ctx.observer.outBatchACKed(len(b.events)) // libbeat/publisher/pipeline/monitoring.go: func (o *metricsObserver) outBatchACKed(int) {} 这个是监控用的 b.original.ACK() // 重点看这里: libbeat/publiser/queue/memequeue/consumer.go releaseBatch(b) } // libbeat/publisher/queue/memqueue/consume.go func (b *batch) ACK() { if b.state != batchActive { switch b.state { case batchACK: panic("Can not acknowledge already acknowledged batch") default: panic("inactive batch") } } b.report() // 重点在这里 } // libbeat/publisher/queue/memqueue/consume.go func (b *batch) report() { b.ack.ch <- batchAckMsg{} // 最终在这里发送了确认ACK }Bingo!在Publish里面发送完成(可能失败,可能成功)之后,就会发送ACK。然后根据调用关系往回推,最终在report中发送了ack,第1个问题就解答了,ack信号就是这样来的。然后看第2个问题:收到信号之后,后续是如何将这个信号发送给前面的那个回调函数?接着看收到信号后调用的handleBatchSig的代码:// libbeat/publisher/queue/memqueue/ackloop.go // handleBatchSig collects and handles a batch ACK/Cancel signal. handleBatchSig // is run by the ackLoop. func (l *ackLoop) handleBatchSig() int { if count > 0 { if e := l.broker.eventer; e != nil { e.OnACK(count) } // 这里会调用之前EventLoop的processACK,我们用的是bufferingEventLoop,所以会调用(*bufferingEventLoop)processACK // report acks to waiting clients l.processACK(lst, count) // libbeat/publisher/queue/memqueue/eventloop.go#(*bufferingEventLoop)processACK } } // libbeat/publisher/queue/memqueue/eventloop.go func (l *bufferingEventLoop) processACK(lst chanList, N int) { for !lst.empty() { // 重点在这里,这里会调用ackEvents发送确认消息 st.state.cb(int(count)) // libbeat/publisher/pipeline/acker.go (a *eventDataACK) ackEvents(n int) } } }handleBatchSig里面调用了对应类型的eventLoop的processACK方法,该方法内部会转到pipeline的acker.go,下面给出代码流转:libbeat/publisher/queue/memqueue/eventloop.go: (l *bufferingEventLoop) processACK --> st.state.cb(int(count))libbeat/publisher/pipeline/acker.go: func (a *eventDataACK) ackEvents(n int) { a.acker.ackEvents(n) }libbeat/publisher/pipeline/acker.go: func (a *boundGapCountACK) ackEvents(n int) { a.acker.ackEvents(n) }libbeat/publisher/pipeline/acker.go: func (a *gapCountACK) ackEvents(n int) {}看下最后一个ackEvents:func (a *gapCountACK) ackEvents(n int) { select { case <-a.pipeline.ackDone: // pipeline is closing down -> ignore event a.acks = nil // ack数n写入了a.acks case a.acks <- n: // send ack event to worker } } // gapCountACK returns event ACKs to the producer, taking account for dropped events. // Events being dropped by processors will always be ACKed with the last batch ACKed // by the broker. This way clients waiting for ACKs can expect all processed // events being always ACKed. type gapCountACK struct { pipeline *Pipeline fn func(total int, acked int) done chan struct{} drop chan struct{} acks chan int events atomic.Uint32 lst gapList }数据会写入gapCountACK的acks之后,会在ackLoop中读取:// libbeat/publisher/pipeline/acker.go func (a *gapCountACK) ackLoop() { acks, drop := a.acks, a.drop closing := false for { select { case <-a.done: closing = true a.done = nil if a.events.Load() == 0 { // stop worker, if all events accounted for have been ACKed. // If new events are added after this acker won't handle them, which may // result in duplicates return } case <-a.pipeline.ackDone: return case n := <-acks: // 重点:从acks读出n之后调用handleACK处理 empty := a.handleACK(n) if empty && closing && a.events.Load() == 0 { // stop worker, if and only if all events accounted for have been ACKed return } case <-drop: // TODO: accumulate multiple drop events + flush count with timer a.events.Sub(1) a.fn(1, 0) } } } func (a *gapCountACK) handleACK(n int) bool { a.events.Sub(uint32(total)) a.fn(total, acked) // line 326: func (a *boundGapCountACK) onACK(total, acked int) { return emptyLst }从acks读出n之后调用handleACK处理,后续的调用流程如下:1. libbeat/publisher/pipeline/acker.go: handleACK --> a.fn(total, acked) 2. libbeat/publisher/pipeline/acker.go: func (a *boundGapCountACK) onACK(total, acked int) --> a.fn(total, acked) 3. libbeat/publisher/pipeline/acker.go: func (a *eventDataACK) onACK(total, acked int) --> a.fn(data, acked) 4. libbeat/publisher/pipeline/pipeline_ack.go: func (p *pipelineEventCB) onEvents(data []interface{}, acked int)最终进入到pipelineEventCB中,这个结构是内部处理ack的(我理解pipelineEventCB命名的含义是pipeline中event的callback函数,可见它是处理ack的核心),看下关键代码:// libbeat/publisher/pipeline/pipeline_ack.go // pipelineEventCB internally handles active ACKs in the pipeline. // It receives ACK events from the queue and the individual clients. // Once the queue returns an ACK to the pipelineEventCB, the worker loop will collect // events from all clients having published events in the last batch of events // being ACKed. // the PipelineACKHandler will be notified, once all events being ACKed // (including dropped events) have been collected. Only one ACK-event is handled // at a time. The pipeline global and clients ACK handler will be blocked for the time // an ACK event is being processed. type pipelineEventCB struct { done chan struct{} acks chan int // 这个字段是关键,确认信息会写到这个channel,然后在worker中读出,最终写入到Registrar的channel events chan eventsDataMsg droppedEvents chan eventsDataMsg mode pipelineACKMode handler beat.PipelineACKHandler }其中的events字段是关键,确认信息会写到这个channel,然后在worker中读出,最终写入到Registrar的channel。接着之前的调用,看数据如何写到events这个channel:// reportEvents sends a batch of ACKed events to the ACKer. // The events array contains send and dropped events. The `acked` counters // indicates the total number of events acked by the pipeline. // That is, the number of dropped events is given by `len(events) - acked`. // A client can report events with acked=0, iff the client has no waiting events // in the pipeline (ACK ordering requirements) // // Note: the call blocks, until the ACK handler has collected all active events // from all clients. This ensure an ACK event being fully 'captured' // by the pipeline, before receiving/processing another ACK event. // In the meantime the queue has the chance of batching-up more ACK events, // such that only one ACK event is being reported to the pipeline handler func (p *pipelineEventCB) onEvents(data []interface{}, acked int) { p.pushMsg(eventsDataMsg{data: data, total: len(data), acked: acked}) } func (p *pipelineEventCB) pushMsg(msg eventsDataMsg) { if msg.acked == 0 { p.droppedEvents <- msg } else { msg.sig = make(chan struct{}) p.events <- msg // 此处写入channel后,在(p *pipelineEventCB) worker()的collect中读出,最后reportEventsData <-msg.sig } }在pushMsg中,将消息写入events中。然后看下channel另一端的读取:func (p *pipelineEventCB) worker() { defer close(p.acks) defer close(p.events) defer close(p.droppedEvents) for { select { case count := <-p.acks: // 在collect中读取消息 exit := p.collect(count) if exit { return } // short circuit dropped events, but have client block until all events // have been processed by pipeline ack handler case msg := <-p.droppedEvents: p.reportEventsData(msg.data, msg.total) if msg.sig != nil { close(msg.sig) } case <-p.done: return } } } func (p *pipelineEventCB) collect(count int) (exit bool) { for acked < count { var msg eventsDataMsg select { // 在此处读取消息 case msg = <-p.events: case msg = <-p.droppedEvents: case <-p.done: exit = true return } } p.reportEventsData(data, total) return } func (p *pipelineEventCB) reportEventsData(data []interface{}, total int) { // report ACK back to the beat switch p.mode { case countACKMode: p.handler.ACKCount(total) case eventsACKMode: // 这里调用之前在 Filebeat中注册的 ACKEvents p.handler.ACKEvents(data) // filebeat/beater/acker.go: func (a *eventACKer) ackEvents(data []interface{}) case lastEventsACKMode: p.handler.ACKLastEvents(data) } }可以看到从channel中读出确认消息后,最终会在reportEventsData中调用之前在Filebeat中注册的ACKEvents:eventACKer。至此,第2个问题也得到了解答,并且和之前的回调函数成功对接。把之前的图补充完整如下:总结本文从源码角度对Filebeat的核心数据流转进行了简单的分析,为了突出重点,省掉了很多细节代码,比如各个环节的数据结构的实例是何时创建的,又是何时启动的,以及一些异常分支的处理、失败重传等。正如前文所说,本文并不是一篇全面的Filebeat代码深度剖析,而是通用配置下的核心数据流转分析。这篇文章我改了很多遍,总感觉没写什么东西,只是贴了了大量代码,然后讲了代码的调用点,而这些其实自己debug几遍也就清楚了。不过话说回来,如果事先知道整个逻辑流程,以及关键调用点,就能在debug时做到胸有成竹、有所侧重,从而节省很多时间。我想这篇文章对我最大的意义有两点:一个是让自己对Filebeat更加了解,另外一个是可能会给其它看源码的人节省一些时间。这就够了。Reference:Beats Developer Guidefilebeat源码解析