Flink的1.12.0版本前段时间发布了,又带来了很多新特性,其中有两个跟容器化相关的特性:

  • Native Kubernetes部署方式由之前的实验性(experimental)变为正式特性,也就是我们可以在生产环境里面放心大胆的使用了;
  • Kubernetes上面Flink的高可用除了ZooKeeper外又多了一种更轻量级的,更Native的基于ConfigMap的方案选择。

当然,这些特性目前在有些细小方面还是存在一些不足(不过瑕不掩瑜),下面的测试中会有所说明。

准备镜像

Flink 1.12已经有一段时间了,但官方的镜像到现在也还没有推上去,如果你用的是最新的1.12.0版本,看官方文档的时候一定要注意,文档里面都没有指定镜像名或者tag,这样拉取到的其实还是1.11.x版本的镜像,然后打出来的镜像在1.12的flink上面运行大概率是会报错的。

所以本次测试是我自己打的镜像。

flink 1.12.0镜像

git clone https://github.com/apache/flink-docker
# 此处可以选择你自己的Scala和JDK版本
cd 1.12/scala_2.11-java8-debian 
docker build  --tag flink:1.12.0-scala_2.11  .

Tips:Flink默认的镜像命名规则为:flink:<FLINK_VERSION>-scala_<SCALA_VERSION>",当然你也可以自定义,然后在启动的时候通过kubernetes.container.image参数指定镜像名称。

flink hadoop镜像

后面测试高可用的时候,我使用了HDFS,默认的镜像里面是不包含HDFS的依赖的,所以需要自己加进去(自行从maven下载)。如果你用的是oss、s3、swift之类的,就不需要了,默认已经包含了,只需要启动的时候配置一下即可。下面是Dockerfile:

FROM flink:1.12.0-scala_2.11
COPY flink-shaded-hadoop-2-uber-2.8.3-10.0.jar $FLINK_HOME/lib/

执行docker build --tag flink-haddop:1.12.0-scala_2.11 .

TopSpeedWindowing镜像

下面还会演示Flink Application Cluster,这种模式下需要把应用打到flink镜像里面,方法也非常简单,把应用的jar拷贝到$FLINK_HOME/usrlib即可,如果你需要定制配置文件的话,也可以加到这里。下面以flink自带的TopSpeedWindowing为例,该应用的jar在flink安装包的examples/streaming目录下。下面是Dockerfile:

FROM flink-haddop:1.12.0-scala_2.11
RUN mkdir -p $FLINK_HOME/usrlib
COPY TopSpeedWindowing.jar $FLINK_HOME/usrlib/TopSpeedWindowing.jar

执行docker build --tag top-speed-windowing:1.12.0 .

这样本文用到的所有镜像就准备完成了。

Flink Native Kubernetes

先下载并解压Flink 1.12.0版本(过程略),下面的命令都是在flink解压后的目录执行的。

Flink Session Cluster

前文介绍过了,Flink Session Cluster就是先部署一个集群,然后往集群上面提交任务。所以我们先创建一个集群:

./bin/kubernetes-session.sh \
    -Dkubernetes.container.image=flink:1.12.0-scala_2.11 \
    -Dkubernetes.rest-service.exposed.type=NodePort \
    -Dtaskmanager.numberOfTaskSlots=2 \
    -Dkubernetes.cluster-id=flink-session-cluster


# 下面是输出:
2020-12-21 22:55:35,527 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: jobmanager.rpc.address, localhost
2020-12-21 22:55:35,537 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: jobmanager.rpc.port, 6123
2020-12-21 22:55:35,537 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: jobmanager.memory.process.size, 1600m
2020-12-21 22:55:35,537 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: taskmanager.memory.process.size, 1728m
2020-12-21 22:55:35,538 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: taskmanager.numberOfTaskSlots, 1
2020-12-21 22:55:35,538 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: parallelism.default, 1
2020-12-21 22:55:35,540 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: jobmanager.execution.failover-strategy, region
2020-12-21 22:55:35,773 INFO  org.apache.flink.client.deployment.DefaultClusterClientServiceLoader [] - Could not load factory due to missing dependencies.
2020-12-21 22:55:38,128 INFO  org.apache.flink.runtime.util.config.memory.ProcessMemoryUtils [] - The derived from fraction jvm overhead memory (160.000mb (167772162 bytes)) is less than its min value 192.000mb (201326592 bytes), min value will be used instead
2020-12-21 22:55:38,148 INFO  org.apache.flink.runtime.util.config.memory.ProcessMemoryUtils [] - The derived from fraction jvm overhead memory (172.800mb (181193935 bytes)) is less than its min value 192.000mb (201326592 bytes), min value will be used instead
2020-12-21 22:55:38,163 INFO  org.apache.flink.kubernetes.utils.KubernetesUtils            [] - Kubernetes deployment requires a fixed port. Configuration blob.server.port will be set to 6124
2020-12-21 22:55:38,163 INFO  org.apache.flink.kubernetes.utils.KubernetesUtils            [] - Kubernetes deployment requires a fixed port. Configuration taskmanager.rpc.port will be set to 6122
2020-12-21 22:55:38,259 INFO  org.apache.flink.runtime.util.config.memory.ProcessMemoryUtils [] - The derived from fraction jvm overhead memory (160.000mb (167772162 bytes)) is less than its min value 192.000mb (201326592 bytes), min value will be used instead
2020-12-21 22:55:44,134 INFO  org.apache.flink.kubernetes.KubernetesClusterDescriptor      [] - Create flink session cluster flink-session-cluster successfully, JobManager Web Interface: http://10.9.1.18:38566

这样一个集群就部署好了,Native Kubernetes的部署方式使用了Kubernetes的资源管理和分配能力,所以此时集群只有Jobmanager。TaskManager会在后面有任务时动态创建出来。输出日志的最后一行打印了Web UI地址,可以检查集群状态。

$ kubectl get pod,svc,cm,deploy
NAME                                         READY   STATUS    RESTARTS   AGE
pod/flink-session-cluster-665766d9d5-8jxmb   1/1     Running   0          13s

NAME                                 TYPE        CLUSTER-IP    EXTERNAL-IP   PORT(S)             AGE
service/flink-session-cluster        ClusterIP   None          <none>        6123/TCP,6124/TCP   12s
service/flink-session-cluster-rest   NodePort    10.68.245.4   <none>        8081:38566/TCP      12s
service/kubernetes                   ClusterIP   10.68.0.1     <none>        443/TCP             38d

NAME                                           DATA   AGE
configmap/flink-config-flink-session-cluster   3      12s

NAME                                    READY   UP-TO-DATE   AVAILABLE   AGE
deployment.apps/flink-session-cluster   1/1     1            1           13s

集群部署好了,提交一个任务:

./bin/flink run -p 4 \
    --target kubernetes-session \
    -Dkubernetes.cluster-id=flink-session-cluster \
    ./examples/streaming/TopSpeedWindowing.jar 

# 查看一下动态创建的TaskManager
kubectl get pod
NAME                                     READY   STATUS    RESTARTS   AGE
flink-session-cluster-665766d9d5-8jxmb   1/1     Running   0          2m17s
flink-session-cluster-taskmanager-1-1    1/1     Running   0          33s
flink-session-cluster-taskmanager-1-2    1/1     Running   0          33s

然后在web上面删除任务,可以看到集群依然是存在的,因为Session Cluster的生命周期是独立于Job的。但TaskManager在空闲一段时间(resourcemanager.taskmanager-timeout,默认30秒)后会被回收。

最后删除测试集群:

kubectl delete deploy/flink-session-cluster

Flink Application Cluster

前文介绍过,Flink Application Cluster就是应用创建自己专属的集群,一个应用可以包含多个Job,集群生命周期和Job同步。

# 创建集群&&启动应用
./bin/flink run-application \
    --target kubernetes-application \
    -Dkubernetes.cluster-id=flink-application-cluster \
    -Dkubernetes.container.image=top-speed-windowing:1.12.0 \
    -Dkubernetes.rest-service.exposed.type=NodePort \
    local:///opt/flink/usrlib/TopSpeedWindowing.jar

# 查看
kubectl get pod,svc,cm,deploy
NAME                                             READY   STATUS    RESTARTS   AGE
pod/flink-application-cluster-7fc5ccd899-9p7ns   1/1     Running   0          115s
pod/flink-application-cluster-taskmanager-1-1    1/1     Running   0          54s

NAME                                     TYPE        CLUSTER-IP      EXTERNAL-IP   PORT(S)             AGE
service/flink-application-cluster        ClusterIP   None            <none>        6123/TCP,6124/TCP   114s
service/flink-application-cluster-rest   NodePort    10.68.147.132   <none>        8081:39463/TCP      113s
service/kubernetes                       ClusterIP   10.68.0.1       <none>        443/TCP             38d

NAME                                               DATA   AGE
configmap/flink-config-flink-application-cluster   3      113s

NAME                                        READY   UP-TO-DATE   AVAILABLE   AGE
deployment.apps/flink-application-cluster   1/1     1            1           115s

和之前的Session Cluster相比,集群创建和任务提交集成在了一起,一条命令搞定了所有的事情,非常的方便。此时,如果你在Web上取消任务,你会发现整个集群都没了,符合Job Cluster生命周期与Job同步的说法。

Flink Job Cluster

目前不支持。Flink Job Cluster其实比较鸡肋,介于Session Cluster和Application Cluster之间,一般根据需要选则后面这两个即可。

基于Kubernetes的高可用

Flink的高可用主要解决JobManager的单点故障问题,之前只有一种基于Zookeeper的方案,1.12.0版本中增加了一个基于Kubernetes ConfigMap的方案(仅用于使用Kubernetes部署Flink的场景),该特性对应有一个FLIP-144: Native Kubernetes HA for Flink,对设计细节有兴趣的可以看下。这个原生的高可用方案使用也非常简单,在上的基础上再增加两个配置项即可:

# HA服务,zk的时候是zookeeper,Kubernetes的时候填下面这个类
high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
# 虽然不再依赖zk了,但仍然需要高可用存储
high-availability.storageDir: hdfs:///flink/recovery

这里我使用的高可用存储是HDFS,下面分别创建两个集群:

# Flink Session Cluster
./bin/kubernetes-session.sh \
    -Dkubernetes.rest-service.exposed.type=NodePort \
    -Dkubernetes.container.image=flink-haddop:1.12.0-scala_2.11 \
    -Dhigh-availability=org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory \
    -Dhigh-availability.storageDir=hdfs://<namenode-ip>:<port>/flink/flink-ha \
    -Dkubernetes.cluster-id=flink-session-cluster

# Flink Application Cluster
./bin/flink run-application \
    --target kubernetes-application \
    -Dkubernetes.container.image=flink-haddop:1.12.0-scala_2.11 \
    -Dkubernetes.cluster-id=flink-application-cluster \
    -Dhigh-availability=org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory \
    -Dhigh-availability.storageDir=hdfs://<namenode-ip>:<port>/flink/flink-ha \
    -Dkubernetes.container.image=top-speed-windowing:1.12.0 \
    -Dkubernetes.rest-service.exposed.type=NodePort \
    local:///opt/flink/usrlib/TopSpeedWindowing.jar

Native的HA方案真的是非常naive,使用非常简单,没什么需要讲的。

最后解释一些关于HA的问题。

1, 为什么上述命令启动的JobManager Pod只有1个副本?答:除了用上面Flink自己的kubernetes-session.sh或者flink命令这种便捷方式容器化外,也有很多人自己写部署文件或者使用官方提供的部署文件(特别是Native Kubernetes还不成熟的时候),有些人部署HA的时候将JobManager的副本数设置为2,甚至更多。其实在Kubernetes上面,这是没有必要的。如前所述,Flink提供的HA是主JobManager挂掉后,从快速接替主的职责,从共享存储上面恢复已经提交运行的任务。对于Kubernetes,如果JobManager的Pod挂掉了,马上会有一个新的JobManager Pod被创建出来。也就是这个standby的JobManager不是事先就创建好的,而是在需要的时候动态产生的。这样也不用平时空跑一个standby的JobManager浪费资源(注意:standby的JobManager是纯备,不对外提供任务服务,也没有什么负载均衡的功能)。

2, Flink的HA模式下,如果主JobManager挂掉了,任务会重启,这还算HA吗?会丢数据吗?答:这个问题也同时适用于基于Zookeeper的HA。Flink的HA模式依赖两部分:一部分是ZK或者Kubernetes的ConfigMap,这部分主要职责是负责选举主JobManager、服务发现、少量元数据存储(比如任务的运行状态、任务相关的二进制文件在共享存储上的存储路径等);另外一部分是共享存储(比如上面的HDFS),该部分的主要职责是存储任务相关的二进制文件,比如Job Graph及其依赖等。有了这两部分,当主JobManager挂掉后,就会有新的JobManager产生,它可以依据这些信息重新恢复之前运行的任务。注意,这里是恢复。也就是主节点故障后,任务会故障,但HA模式下,Flink会保证任务快速被恢复。那这种机制算HA吗?当然要看下HA的定义:

高可用(High Availability,HA)是分布式系统架构设计中必须考虑的因素之一,它通常是指,通过设计减少系统不能提供服务的时间。假设系统一直能够提供服务,我们说系统的可用性是100%。如果系统每运行100个时间单位,会有1个时间单位无法提供服务,我们说系统的可用性是99%。在线系统和执行关键任务的系统通常要求其可用性要达到5个9标准(99.999%,年故障时间为5分15秒)。

按此定义来说,上述机制当然算HA,只是它提供的不是100%的可用性而已。另外,丢不丢数据这个其实跟HA关系不大,解决丢数据的问题是分布式中容错(Fault Tolerance)的职责,Flink提供了Checkpoint和Savepoint两种机制。也就是如果你开启了Checkpoint,那有没有HA都可以保证不丢数据;反之,没开启的话,有没有HA都可能会丢数据。

总结

可以看到,这两个新特性让Flink在Kubernetes上的使用变得简单了很多。当然,试用了一下,我个人觉得后面还是有一些工作需要完善,比如现在通过kubernetes-session.sh或者flink创建集群时,虽然已经支持很多配置项(见这里)来满足一些自定义,但还是不够。比如我刚开始想用nfs来测试高可用,但发现通过上面的方式创建集群的话,没有方式可以挂载PVC,像HDFS、OSS、S3这些不需要挂载就可以直接使用,但NFS不行,所以后面就又部署了一个HDFS进行测试。不过目前社区已经在改进这些不足了,见FLINK-15649: Support mounting volumesFLINK-15656: Support user-specified pod templates. 这部分我单独写了一篇文章Flink Native Kubernetes支持Volume Mount,有兴趣的可以查看。

参考: