在之前的文章 Flink快速了解(4)——NativeKubernetes&HA 中讲到 Native Kubernetes在Flink 1.12版本中已经成为一个正式特性,使用起来也的确非常的简单、方便,但文末提到我碰到的一个问题:无法挂载volume。其实目前Flink Native Kubernetes这种方式提供的容器自定义能力还非常有限。从代码看,是通过一个个配置去支持的(见KubernetesConfigOptions.java),但k8s的Pod定义选项太多了,通过这种方式去支持,会一直疲于奔命,而且还要不断的和k8s版本关联。所以,目前社区有一个JIRA FLINK-15656: Support user-specified pod templates,计划直接支持用户自定义pod template,但目前好像还没有明确的版本计划。另外,考虑到pod挂载volume是一个更加普遍化的高需求,所以还有一个单独的JIRA FLINK-15649: Support mounting volumes,不过目前也没有明确的版本计划。
我看了一下这个JIRA,其实已经有人提了PR(#14283)了,不过还没有被合进去。这个PR的代码非常简单,有兴趣的可以看下,我把这个代码合到我本地的1.12分支,然后把新增的3个class和修改的3个class文件加到官方1.12发布的包中测试了一下,是可以实现volume mount的。下面记录一下过程,有兴趣的可以自行编译,或者直接下载我编译好的(点此下载,密码: hi52
。怎么现在分享还必须设置密码了...)。
使用说明
这个PR增加的功能是给Flink Native Kubernetes部署模式下的JobManager和TaskManager增加volume mount的功能,支持 emptydir(默认)、hostpath、pvc三种。使用方式代码里面也写清楚了:
// KubernetesConfigOptions
public static final ConfigOption<String> JOBMANAGER_VOLUME_MOUNT =
key("kubernetes.jobmanager.volumemount")
.stringType()
.noDefaultValue()
.withDescription("Volume (pvc, emptydir, hostpath) mount information for the Job manager. " +
"Value can contain several commas-separated volume mounts. Each mount is defined by several : separated " +
"parameters - name used for mount, mounting path and volume specific parameters");
public static final ConfigOption<String> TASKMANAGER_VOLUME_MOUNT =
key("kubernetes.taskmanager.vlumemount")
.stringType()
.noDefaultValue()
.withDescription("Volume (pvc, emptydir, hostpath) mount information for the Task manager. " +
"Value can contain several commas-separated volume mounts. Each mount is defined by several : separated " +
"parameters - name used for mount, mounting path and volume specific parameters");
也可以从单元测试文件看使用方法:
// VolumeMountDecoratorTest
@Override
protected void setupFlinkConfig() {
super.setupFlinkConfig();
this.flinkConfig.setString(KubernetesConfigOptions.JOBMANAGER_VOLUME_MOUNT.key(),
VolumeMountDecorator.KUBERNETES_VOLUMES_PVC + ":pvc-mount1:/opt/pvcclaim/tes1/:testclaim1:false,"
+ VolumeMountDecorator.KUBERNETES_VOLUMES_PVC + ":pvc-mount2::testclaim:false:path1->/opt/pvcclaim/test/path1;path2->/opt/pvcclaim/test/path2,"
+ VolumeMountDecorator.KUBERNETES_VOLUMES_EMPTYDIR + ":edir-mount:/emptydirclaim:" + VolumeMountDecorator.EMPTYDIRDEFAULT + ","
+ VolumeMountDecorator.KUBERNETES_VOLUMES_HOSTPATH + ":hp-mount:/var/local/hp:/var/local/hp:DirectoryOrCreate");
}
emptydir和hostpath的使用非常简单就不说了。pvc的使用有两种方式:
- 方式1:不使用subpath,共5个参数。示例:
-Dkubernetes.jobmanager.volumemount=pvc:<volume名称,自己起个名字>:<挂载路径>:<pvc名称>:<false|true>
。最后一个false或者true表示是否以只读方式挂载。 - 方式2:使用subpath,共6个参数。示例:
-Dkubernetes.jobmanager.volumemount=pvc:<volume名称,自己起个名字>::<pvc名称>:<false|true>:<subPath>-><mountPath>
。
下面利用这个PR实现基于NFS的Flink Kubernetes HA。
- 先用修改过的
flink-dist_2.11-1.12.0.jar
替换官方包里面lib
目录下的flink-dist_2.11-1.12.0.jar
(懒得自己编译的,可以直接下载上面我编译好的,我是在官方包的基础上增加和替换了PR涉及的几个class文件,所以改动量非常小),注意是替换你提交任务的flink包的对应jar,不是替换容器里面的。 - 准备好一个pvc,这里我使用的是nfs storage-class提供的一个pvc:
$ kubectl get pvc
NAME STATUS VOLUME CAPACITY ACCESS MODES STORAGECLASS AGE
flink-ha-pvc Bound pvc-46537a5b-2adc-442e-ae59-52af4c681f2c 500Mi RWX nfs-storage 16h
以Application cluster的方式提交一个任务(涉及的镜像参见之前的文章):
$ ./bin/flink run-application \ --target kubernetes-application \ -Dkubernetes.cluster-id=flink-application-cluster \ -Dhigh-availability=org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory \ -Dhigh-availability.storageDir=file:///opt/flink/flink-ha \ -Dkubernetes.jobmanager.volumemount=pvc:jobmanager-ha:/opt/flink/flink-ha:flink-ha-pvc:false \ -Dkubernetes.container.image=top-speed-windowing:1.12.0 \ -Dkubernetes.rest-service.exposed.type=NodePort \ local:///opt/flink/usrlib/TopSpeedWindowing.jar
检查一下:
$ kubectl get pod
NAME READY STATUS RESTARTS AGE
flink-application-cluster-9589dbf58-hm7xj 1/1 Running 0 76s
flink-application-cluster-taskmanager-1-1 1/1 Running 0 33s
$ kubectl describe pod flink-application-cluster-9589dbf58-hm7xj
Name: flink-application-cluster-9589dbf58-hm7xj
Namespace: default
Priority: 0
Node: 10.9.1.18/10.9.1.18
Start Time: Thu, 24 Dec 2020 10:26:07 +0800
Labels: app=flink-application-cluster
component=jobmanager
pod-template-hash=9589dbf58
type=flink-native-kubernetes
Annotations: <none>
Status: Running
IP: 172.20.0.165
IPs:
IP: 172.20.0.165
Controlled By: ReplicaSet/flink-application-cluster-9589dbf58
Containers:
flink-job-manager:
Container ID: docker://454fd2a6d3a913ce738f2e007f35e61d5068bfd9ad38d76bf900dbf1aaf9b70f
Image: top-speed-windowing:1.12.0
Image ID: docker://sha256:66d4aa5b13fc7c2ccce21685543fc2d079aac695d3480d9d27dbef2fc50ce875
Ports: 8081/TCP, 6123/TCP, 6124/TCP
Host Ports: 0/TCP, 0/TCP, 0/TCP
Command:
/docker-entrypoint.sh
Args:
native-k8s
$JAVA_HOME/bin/java -classpath $FLINK_CLASSPATH -Xmx1073741824 -Xms1073741824 -XX:MaxMetaspaceSize=268435456 -Dlog.file=/opt/flink/log/jobmanager.log -Dlogback.configurationFile=file:/opt/flink/conf/logback-console.xml -Dlog4j.configuration=file:/opt/flink/conf/log4j-console.properties -Dlog4j.configurationFile=file:/opt/flink/conf/log4j-console.properties org.apache.flink.kubernetes.entrypoint.KubernetesApplicationClusterEntrypoint -D jobmanager.memory.off-heap.size=134217728b -D jobmanager.memory.jvm-overhead.min=201326592b -D jobmanager.memory.jvm-metaspace.size=268435456b -D jobmanager.memory.heap.size=1073741824b -D jobmanager.memory.jvm-overhead.max=201326592b
State: Running
Started: Thu, 24 Dec 2020 10:26:11 +0800
Ready: True
Restart Count: 0
Limits:
cpu: 1
memory: 1600Mi
Requests:
cpu: 1
memory: 1600Mi
Environment:
_POD_IP_ADDRESS: (v1:status.podIP)
Mounts:
/opt/flink/conf from flink-config-volume (rw)
/opt/flink/flink-ha from jobmanager-ha (rw)
/var/run/secrets/kubernetes.io/serviceaccount from default-token-pzw5h (ro)
Conditions:
Type Status
Initialized True
Ready True
ContainersReady True
PodScheduled True
Volumes:
flink-config-volume:
Type: ConfigMap (a volume populated by a ConfigMap)
Name: flink-config-flink-application-cluster
Optional: false
jobmanager-ha:
Type: PersistentVolumeClaim (a reference to a PersistentVolumeClaim in the same namespace)
ClaimName: flink-ha-pvc
ReadOnly: false
default-token-pzw5h:
Type: Secret (a volume populated by a Secret)
SecretName: default-token-pzw5h
Optional: false
QoS Class: Guaranteed
Node-Selectors: <none>
Tolerations: node.kubernetes.io/not-ready:NoExecute for 300s
node.kubernetes.io/unreachable:NoExecute for 300s
Events:
Type Reason Age From Message
---- ------ ---- ---- -------
Normal Scheduled <unknown> default-scheduler Successfully assigned default/flink-application-cluster-9589dbf58-hm7xj to 10.9.1.18
Warning FailedMount 80s (x2 over 81s) kubelet, 10.9.1.18 MountVolume.SetUp failed for volume "flink-config-volume" : configmap "flink-config-flink-application-cluster" not found
Normal Pulled 78s kubelet, 10.9.1.18 Container image "top-speed-windowing:1.12.0" already present on machine
Normal Created 78s kubelet, 10.9.1.18 Created container flink-job-manager
Normal Started 77s kubelet, 10.9.1.18 Started container flink-job-manager
可以看到已经正确挂载了。
这个PR能不能用于生产?
能不能用于生产我觉得主要考虑的就是这个PR的可靠程度和后期维护、升级了。从这两个角度考虑我觉得是没问题的。这个PR代码量少,而且简单,实质只是增加了几项配置而已,对已有代码几乎是没有改动的,新增的配置也都是可选配置项,代码的可控性几乎是百分百的。可能更应该关心的是这个PR后面会不会被合到官方分支吧。我个人觉得不一定吧,volume mount的功能几乎肯定会支持,但未必最终使用这个PR的代码。但用了其它代码,对使用者而言,顶多也就是换个jar包,修改下创建任务的命令而已。
另外我觉得最重要的是这个改动只影响提交任务的过程,就这个过程也只影响创建容器的过程,也就是影响面仅限Kubernetes相关的东西,并没有影响任何Flink运行的功能。所以使用这个PR的时候记得只替换宿主机安装包里面的jar即可,不要替换容器里面真正运行的那个jar。
不过,如果你只想完全用官方的东西,那完全可以像之前版本一样,使用非Native的方式在Kubernetes上面部署Flink,不过我还是喜欢Native的东西,更加简单。
评论已关闭