Flink的1.12.0版本前段时间发布了,又带来了很多新特性,其中有两个跟容器化相关的特性:
- Native Kubernetes部署方式由之前的实验性(experimental)变为正式特性,也就是我们可以在生产环境里面放心大胆的使用了;
- Kubernetes上面Flink的高可用除了ZooKeeper外又多了一种更轻量级的,更Native的基于ConfigMap的方案选择。
当然,这些特性目前在有些细小方面还是存在一些不足(不过瑕不掩瑜),下面的测试中会有所说明。
准备镜像
Flink 1.12已经有一段时间了,但官方的镜像到现在也还没有推上去,如果你用的是最新的1.12.0版本,看官方文档的时候一定要注意,文档里面都没有指定镜像名或者tag,这样拉取到的其实还是1.11.x版本的镜像,然后打出来的镜像在1.12的flink上面运行大概率是会报错的。
所以本次测试是我自己打的镜像。
flink 1.12.0镜像
git clone https://github.com/apache/flink-docker
# 此处可以选择你自己的Scala和JDK版本
cd 1.12/scala_2.11-java8-debian
docker build --tag flink:1.12.0-scala_2.11 .
Tips:Flink默认的镜像命名规则为:flink:<FLINK_VERSION>-scala_<SCALA_VERSION>"
,当然你也可以自定义,然后在启动的时候通过kubernetes.container.image
参数指定镜像名称。
flink hadoop镜像
后面测试高可用的时候,我使用了HDFS,默认的镜像里面是不包含HDFS的依赖的,所以需要自己加进去(自行从maven下载)。如果你用的是oss、s3、swift之类的,就不需要了,默认已经包含了,只需要启动的时候配置一下即可。下面是Dockerfile:
FROM flink:1.12.0-scala_2.11
COPY flink-shaded-hadoop-2-uber-2.8.3-10.0.jar $FLINK_HOME/lib/
执行docker build --tag flink-haddop:1.12.0-scala_2.11 .
。
TopSpeedWindowing镜像
下面还会演示Flink Application Cluster,这种模式下需要把应用打到flink镜像里面,方法也非常简单,把应用的jar拷贝到$FLINK_HOME/usrlib
即可,如果你需要定制配置文件的话,也可以加到这里。下面以flink自带的TopSpeedWindowing为例,该应用的jar在flink安装包的examples/streaming
目录下。下面是Dockerfile:
FROM flink-haddop:1.12.0-scala_2.11
RUN mkdir -p $FLINK_HOME/usrlib
COPY TopSpeedWindowing.jar $FLINK_HOME/usrlib/TopSpeedWindowing.jar
执行docker build --tag top-speed-windowing:1.12.0 .
。
这样本文用到的所有镜像就准备完成了。
Flink Native Kubernetes
先下载并解压Flink 1.12.0版本(过程略),下面的命令都是在flink解压后的目录执行的。
Flink Session Cluster
前文介绍过了,Flink Session Cluster就是先部署一个集群,然后往集群上面提交任务。所以我们先创建一个集群:
./bin/kubernetes-session.sh \
-Dkubernetes.container.image=flink:1.12.0-scala_2.11 \
-Dkubernetes.rest-service.exposed.type=NodePort \
-Dtaskmanager.numberOfTaskSlots=2 \
-Dkubernetes.cluster-id=flink-session-cluster
# 下面是输出:
2020-12-21 22:55:35,527 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: jobmanager.rpc.address, localhost
2020-12-21 22:55:35,537 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: jobmanager.rpc.port, 6123
2020-12-21 22:55:35,537 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: jobmanager.memory.process.size, 1600m
2020-12-21 22:55:35,537 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: taskmanager.memory.process.size, 1728m
2020-12-21 22:55:35,538 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: taskmanager.numberOfTaskSlots, 1
2020-12-21 22:55:35,538 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: parallelism.default, 1
2020-12-21 22:55:35,540 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: jobmanager.execution.failover-strategy, region
2020-12-21 22:55:35,773 INFO org.apache.flink.client.deployment.DefaultClusterClientServiceLoader [] - Could not load factory due to missing dependencies.
2020-12-21 22:55:38,128 INFO org.apache.flink.runtime.util.config.memory.ProcessMemoryUtils [] - The derived from fraction jvm overhead memory (160.000mb (167772162 bytes)) is less than its min value 192.000mb (201326592 bytes), min value will be used instead
2020-12-21 22:55:38,148 INFO org.apache.flink.runtime.util.config.memory.ProcessMemoryUtils [] - The derived from fraction jvm overhead memory (172.800mb (181193935 bytes)) is less than its min value 192.000mb (201326592 bytes), min value will be used instead
2020-12-21 22:55:38,163 INFO org.apache.flink.kubernetes.utils.KubernetesUtils [] - Kubernetes deployment requires a fixed port. Configuration blob.server.port will be set to 6124
2020-12-21 22:55:38,163 INFO org.apache.flink.kubernetes.utils.KubernetesUtils [] - Kubernetes deployment requires a fixed port. Configuration taskmanager.rpc.port will be set to 6122
2020-12-21 22:55:38,259 INFO org.apache.flink.runtime.util.config.memory.ProcessMemoryUtils [] - The derived from fraction jvm overhead memory (160.000mb (167772162 bytes)) is less than its min value 192.000mb (201326592 bytes), min value will be used instead
2020-12-21 22:55:44,134 INFO org.apache.flink.kubernetes.KubernetesClusterDescriptor [] - Create flink session cluster flink-session-cluster successfully, JobManager Web Interface: http://10.9.1.18:38566
这样一个集群就部署好了,Native Kubernetes的部署方式使用了Kubernetes的资源管理和分配能力,所以此时集群只有Jobmanager。TaskManager会在后面有任务时动态创建出来。输出日志的最后一行打印了Web UI地址,可以检查集群状态。
$ kubectl get pod,svc,cm,deploy
NAME READY STATUS RESTARTS AGE
pod/flink-session-cluster-665766d9d5-8jxmb 1/1 Running 0 13s
NAME TYPE CLUSTER-IP EXTERNAL-IP PORT(S) AGE
service/flink-session-cluster ClusterIP None <none> 6123/TCP,6124/TCP 12s
service/flink-session-cluster-rest NodePort 10.68.245.4 <none> 8081:38566/TCP 12s
service/kubernetes ClusterIP 10.68.0.1 <none> 443/TCP 38d
NAME DATA AGE
configmap/flink-config-flink-session-cluster 3 12s
NAME READY UP-TO-DATE AVAILABLE AGE
deployment.apps/flink-session-cluster 1/1 1 1 13s
集群部署好了,提交一个任务:
./bin/flink run -p 4 \
--target kubernetes-session \
-Dkubernetes.cluster-id=flink-session-cluster \
./examples/streaming/TopSpeedWindowing.jar
# 查看一下动态创建的TaskManager
kubectl get pod
NAME READY STATUS RESTARTS AGE
flink-session-cluster-665766d9d5-8jxmb 1/1 Running 0 2m17s
flink-session-cluster-taskmanager-1-1 1/1 Running 0 33s
flink-session-cluster-taskmanager-1-2 1/1 Running 0 33s
然后在web上面删除任务,可以看到集群依然是存在的,因为Session Cluster的生命周期是独立于Job的。但TaskManager在空闲一段时间(resourcemanager.taskmanager-timeout
,默认30秒)后会被回收。
最后删除测试集群:
kubectl delete deploy/flink-session-cluster
Flink Application Cluster
前文介绍过,Flink Application Cluster就是应用创建自己专属的集群,一个应用可以包含多个Job,集群生命周期和Job同步。
# 创建集群&&启动应用
./bin/flink run-application \
--target kubernetes-application \
-Dkubernetes.cluster-id=flink-application-cluster \
-Dkubernetes.container.image=top-speed-windowing:1.12.0 \
-Dkubernetes.rest-service.exposed.type=NodePort \
local:///opt/flink/usrlib/TopSpeedWindowing.jar
# 查看
kubectl get pod,svc,cm,deploy
NAME READY STATUS RESTARTS AGE
pod/flink-application-cluster-7fc5ccd899-9p7ns 1/1 Running 0 115s
pod/flink-application-cluster-taskmanager-1-1 1/1 Running 0 54s
NAME TYPE CLUSTER-IP EXTERNAL-IP PORT(S) AGE
service/flink-application-cluster ClusterIP None <none> 6123/TCP,6124/TCP 114s
service/flink-application-cluster-rest NodePort 10.68.147.132 <none> 8081:39463/TCP 113s
service/kubernetes ClusterIP 10.68.0.1 <none> 443/TCP 38d
NAME DATA AGE
configmap/flink-config-flink-application-cluster 3 113s
NAME READY UP-TO-DATE AVAILABLE AGE
deployment.apps/flink-application-cluster 1/1 1 1 115s
和之前的Session Cluster相比,集群创建和任务提交集成在了一起,一条命令搞定了所有的事情,非常的方便。此时,如果你在Web上取消任务,你会发现整个集群都没了,符合Job Cluster生命周期与Job同步的说法。
Flink Job Cluster
目前不支持。Flink Job Cluster其实比较鸡肋,介于Session Cluster和Application Cluster之间,一般根据需要选则后面这两个即可。
基于Kubernetes的高可用
Flink的高可用主要解决JobManager的单点故障问题,之前只有一种基于Zookeeper的方案,1.12.0版本中增加了一个基于Kubernetes ConfigMap的方案(仅用于使用Kubernetes部署Flink的场景),该特性对应有一个FLIP-144: Native Kubernetes HA for Flink,对设计细节有兴趣的可以看下。这个原生的高可用方案使用也非常简单,在上的基础上再增加两个配置项即可:
# HA服务,zk的时候是zookeeper,Kubernetes的时候填下面这个类
high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
# 虽然不再依赖zk了,但仍然需要高可用存储
high-availability.storageDir: hdfs:///flink/recovery
这里我使用的高可用存储是HDFS,下面分别创建两个集群:
# Flink Session Cluster
./bin/kubernetes-session.sh \
-Dkubernetes.rest-service.exposed.type=NodePort \
-Dkubernetes.container.image=flink-haddop:1.12.0-scala_2.11 \
-Dhigh-availability=org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory \
-Dhigh-availability.storageDir=hdfs://<namenode-ip>:<port>/flink/flink-ha \
-Dkubernetes.cluster-id=flink-session-cluster
# Flink Application Cluster
./bin/flink run-application \
--target kubernetes-application \
-Dkubernetes.container.image=flink-haddop:1.12.0-scala_2.11 \
-Dkubernetes.cluster-id=flink-application-cluster \
-Dhigh-availability=org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory \
-Dhigh-availability.storageDir=hdfs://<namenode-ip>:<port>/flink/flink-ha \
-Dkubernetes.container.image=top-speed-windowing:1.12.0 \
-Dkubernetes.rest-service.exposed.type=NodePort \
local:///opt/flink/usrlib/TopSpeedWindowing.jar
Native的HA方案真的是非常naive,使用非常简单,没什么需要讲的。
最后解释一些关于HA的问题。
1, 为什么上述命令启动的JobManager Pod只有1个副本?答:除了用上面Flink自己的kubernetes-session.sh
或者flink
命令这种便捷方式容器化外,也有很多人自己写部署文件或者使用官方提供的部署文件(特别是Native Kubernetes还不成熟的时候),有些人部署HA的时候将JobManager的副本数设置为2,甚至更多。其实在Kubernetes上面,这是没有必要的。如前所述,Flink提供的HA是主JobManager挂掉后,从快速接替主的职责,从共享存储上面恢复已经提交运行的任务。对于Kubernetes,如果JobManager的Pod挂掉了,马上会有一个新的JobManager Pod被创建出来。也就是这个standby的JobManager不是事先就创建好的,而是在需要的时候动态产生的。这样也不用平时空跑一个standby的JobManager浪费资源(注意:standby的JobManager是纯备,不对外提供任务服务,也没有什么负载均衡的功能)。
2, Flink的HA模式下,如果主JobManager挂掉了,任务会重启,这还算HA吗?会丢数据吗?答:这个问题也同时适用于基于Zookeeper的HA。Flink的HA模式依赖两部分:一部分是ZK或者Kubernetes的ConfigMap,这部分主要职责是负责选举主JobManager、服务发现、少量元数据存储(比如任务的运行状态、任务相关的二进制文件在共享存储上的存储路径等);另外一部分是共享存储(比如上面的HDFS),该部分的主要职责是存储任务相关的二进制文件,比如Job Graph及其依赖等。有了这两部分,当主JobManager挂掉后,就会有新的JobManager产生,它可以依据这些信息重新恢复之前运行的任务。注意,这里是恢复。也就是主节点故障后,任务会故障,但HA模式下,Flink会保证任务快速被恢复。那这种机制算HA吗?当然要看下HA的定义:
高可用(High Availability,HA)是分布式系统架构设计中必须考虑的因素之一,它通常是指,通过设计减少系统不能提供服务的时间。假设系统一直能够提供服务,我们说系统的可用性是100%。如果系统每运行100个时间单位,会有1个时间单位无法提供服务,我们说系统的可用性是99%。在线系统和执行关键任务的系统通常要求其可用性要达到5个9标准(99.999%,年故障时间为5分15秒)。
按此定义来说,上述机制当然算HA,只是它提供的不是100%的可用性而已。另外,丢不丢数据这个其实跟HA关系不大,解决丢数据的问题是分布式中容错(Fault Tolerance)的职责,Flink提供了Checkpoint和Savepoint两种机制。也就是如果你开启了Checkpoint,那有没有HA都可以保证不丢数据;反之,没开启的话,有没有HA都可能会丢数据。
总结
可以看到,这两个新特性让Flink在Kubernetes上的使用变得简单了很多。当然,试用了一下,我个人觉得后面还是有一些工作需要完善,比如现在通过kubernetes-session.sh
或者flink
创建集群时,虽然已经支持很多配置项(见这里)来满足一些自定义,但还是不够。比如我刚开始想用nfs来测试高可用,但发现通过上面的方式创建集群的话,没有方式可以挂载PVC,像HDFS、OSS、S3这些不需要挂载就可以直接使用,但NFS不行,所以后面就又部署了一个HDFS进行测试。不过目前社区已经在改进这些不足了,见FLINK-15649: Support mounting volumes和FLINK-15656: Support user-specified pod templates. 这部分我单独写了一篇文章Flink Native Kubernetes支持Volume Mount,有兴趣的可以查看。
参考:
我在mac机器上安装好了minikube,并按照上述步骤成功构建了flink 1.12版本的docker image,使用docker images命令能够列出新构建的镜像,
REPOSITORY TAG IMAGE ID CREATED SIZE
flink 1.12.0-scala_2.12-java8 f7dd9b9e020b 3 hours ago 642MB
然而在执行命令 docker pull flink:1.12.0-scala_2.12-java8 时报如下错误:
Error response from daemon: manifest for flink:1.12.0-scala_2.12-java8 not found: manifest unknown: manifest unknown
导致后面用kubernetes session部署flink作业失败。请问该错误应该如何解决?谢谢!
docker build构建的镜像是在你本地的(就跟你docker pull下来的镜像一样)。你并没有把他docker push到某个仓库里面去,所以用docker pull是拉不下来的(因为远程仓库就是没有这个镜像),也就会报你说的错误。现在是这样:
只是在本地测试,我生成的镜像名称叫flink:1.12.0-scala_2.12-java8,启动flink集群的命令是
./bin/kubernetes-session.sh \
结果显示 Create flink session cluster flink-session-cluster successfully, JobManager Web Interface: http://192.168.99.100:32247
但用kubectl get pods命令查看显示pod一直处理ContainerCreating状态长达数分钟。再通过kubectl describe pod xxx查看,显示有两处error:
MountVolume.SetUp failed for volume "flink-config-volume" : configmap "flink-config-flink-session-cluster" not found
MountVolume.SetUp failed for volume "hadoop-config-volume" : configmap "hadoop-config-flink-session-cluster" not found
Failed to pull image "flink:1.12.0-scala_2.12-java8": rpc error: code = Unknown desc = Error response from daemon: manifest for flink:1.12.0-scala_2.12-java8 not found: manifest unknown: manifest unknown
Error: ImagePullBackOff
Error: ErrImagePull
然而通过docker images命令查看该镜像的确是存在的
REPOSITORY TAG IMAGE ID CREATED SIZE
flink 1.12.0-scala_2.12-java8 f7dd9b9e020b 12 hours ago 642MB
还请看看是哪里出了问题?谢谢!
我大概知道问题了,之前没注意看,你用的是minikube。你用
docker images
命令是在你的mac上执行的,而不是在minikube创建的虚机(即k8s的node)里面执行的对吧?如果是这样的话,那也是不行的,pod是运行在node上的,所以它是检查node上面有没有这个镜像,而不是你的mac主机。如果是这个问题,