在之前的文章 Flink快速了解(4)——NativeKubernetes&HA 中讲到 Native Kubernetes在Flink 1.12版本中已经成为一个正式特性,使用起来也的确非常的简单、方便,但文末提到我碰到的一个问题:无法挂载volume。其实目前Flink Native Kubernetes这种方式提供的容器自定义能力还非常有限。从代码看,是通过一个个配置去支持的(见KubernetesConfigOptions.java),但k8s的Pod定义选项太多了,通过这种方式去支持,会一直疲于奔命,而且还要不断的和k8s版本关联。所以,目前社区有一个JIRA FLINK-15656: Support user-specified pod templates,计划直接支持用户自定义pod template,但目前好像还没有明确的版本计划。另外,考虑到pod挂载volume是一个更加普遍化的高需求,所以还有一个单独的JIRA FLINK-15649: Support mounting volumes,不过目前也没有明确的版本计划。

我看了一下这个JIRA,其实已经有人提了PR(#14283)了,不过还没有被合进去。这个PR的代码非常简单,有兴趣的可以看下,我把这个代码合到我本地的1.12分支,然后把新增的3个class和修改的3个class文件加到官方1.12发布的包中测试了一下,是可以实现volume mount的。下面记录一下过程,有兴趣的可以自行编译,或者直接下载我编译好的(点此下载,密码: hi52。怎么现在分享还必须设置密码了...)。

使用说明

这个PR增加的功能是给Flink Native Kubernetes部署模式下的JobManager和TaskManager增加volume mount的功能,支持 emptydir(默认)、hostpath、pvc三种。使用方式代码里面也写清楚了:

// KubernetesConfigOptions
public static final ConfigOption<String> JOBMANAGER_VOLUME_MOUNT =
    key("kubernetes.jobmanager.volumemount")
        .stringType()
        .noDefaultValue()
        .withDescription("Volume (pvc, emptydir, hostpath) mount information for the Job manager. " +
            "Value can contain several commas-separated volume mounts. Each mount is defined by several : separated " +
            "parameters - name used for mount, mounting path and volume specific parameters");

public static final ConfigOption<String> TASKMANAGER_VOLUME_MOUNT =
    key("kubernetes.taskmanager.vlumemount")
        .stringType()
        .noDefaultValue()
        .withDescription("Volume (pvc, emptydir, hostpath) mount information for the Task manager. " +
            "Value can contain several commas-separated volume mounts. Each mount is defined by several : separated " +
            "parameters - name used for mount, mounting path and volume specific parameters");

也可以从单元测试文件看使用方法:

// VolumeMountDecoratorTest
@Override
protected void setupFlinkConfig() {
    super.setupFlinkConfig();

    this.flinkConfig.setString(KubernetesConfigOptions.JOBMANAGER_VOLUME_MOUNT.key(),
        VolumeMountDecorator.KUBERNETES_VOLUMES_PVC + ":pvc-mount1:/opt/pvcclaim/tes1/:testclaim1:false,"
            + VolumeMountDecorator.KUBERNETES_VOLUMES_PVC + ":pvc-mount2::testclaim:false:path1->/opt/pvcclaim/test/path1;path2->/opt/pvcclaim/test/path2,"
            + VolumeMountDecorator.KUBERNETES_VOLUMES_EMPTYDIR + ":edir-mount:/emptydirclaim:" + VolumeMountDecorator.EMPTYDIRDEFAULT + ","
            + VolumeMountDecorator.KUBERNETES_VOLUMES_HOSTPATH + ":hp-mount:/var/local/hp:/var/local/hp:DirectoryOrCreate");
}

emptydir和hostpath的使用非常简单就不说了。pvc的使用有两种方式:

  • 方式1:不使用subpath,共5个参数。示例:-Dkubernetes.jobmanager.volumemount=pvc:<volume名称,自己起个名字>:<挂载路径>:<pvc名称>:<false|true>。最后一个false或者true表示是否以只读方式挂载。
  • 方式2:使用subpath,共6个参数。示例:-Dkubernetes.jobmanager.volumemount=pvc:<volume名称,自己起个名字>::<pvc名称>:<false|true>:<subPath>-><mountPath>

下面利用这个PR实现基于NFS的Flink Kubernetes HA。

  1. 先用修改过的flink-dist_2.11-1.12.0.jar替换官方包里面lib目录下的flink-dist_2.11-1.12.0.jar(懒得自己编译的,可以直接下载上面我编译好的,我是在官方包的基础上增加和替换了PR涉及的几个class文件,所以改动量非常小),注意是替换你提交任务的flink包的对应jar,不是替换容器里面的。
  2. 准备好一个pvc,这里我使用的是nfs storage-class提供的一个pvc:
$ kubectl get pvc            
NAME           STATUS   VOLUME                                     CAPACITY   ACCESS MODES   STORAGECLASS   AGE
flink-ha-pvc   Bound    pvc-46537a5b-2adc-442e-ae59-52af4c681f2c   500Mi      RWX            nfs-storage    16h
  1. 以Application cluster的方式提交一个任务(涉及的镜像参见之前的文章):

    $ ./bin/flink run-application \
         --target kubernetes-application \
         -Dkubernetes.cluster-id=flink-application-cluster \
         -Dhigh-availability=org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory \
         -Dhigh-availability.storageDir=file:///opt/flink/flink-ha \
         -Dkubernetes.jobmanager.volumemount=pvc:jobmanager-ha:/opt/flink/flink-ha:flink-ha-pvc:false \
         -Dkubernetes.container.image=top-speed-windowing:1.12.0 \
         -Dkubernetes.rest-service.exposed.type=NodePort \
         local:///opt/flink/usrlib/TopSpeedWindowing.jar

检查一下:

$ kubectl get pod
NAME                                        READY   STATUS    RESTARTS   AGE
flink-application-cluster-9589dbf58-hm7xj   1/1     Running   0          76s
flink-application-cluster-taskmanager-1-1   1/1     Running   0          33s


$ kubectl describe pod flink-application-cluster-9589dbf58-hm7xj
Name:         flink-application-cluster-9589dbf58-hm7xj
Namespace:    default
Priority:     0
Node:         10.9.1.18/10.9.1.18
Start Time:   Thu, 24 Dec 2020 10:26:07 +0800
Labels:       app=flink-application-cluster
              component=jobmanager
              pod-template-hash=9589dbf58
              type=flink-native-kubernetes
Annotations:  <none>
Status:       Running
IP:           172.20.0.165
IPs:
  IP:           172.20.0.165
Controlled By:  ReplicaSet/flink-application-cluster-9589dbf58
Containers:
  flink-job-manager:
    Container ID:  docker://454fd2a6d3a913ce738f2e007f35e61d5068bfd9ad38d76bf900dbf1aaf9b70f
    Image:         top-speed-windowing:1.12.0
    Image ID:      docker://sha256:66d4aa5b13fc7c2ccce21685543fc2d079aac695d3480d9d27dbef2fc50ce875
    Ports:         8081/TCP, 6123/TCP, 6124/TCP
    Host Ports:    0/TCP, 0/TCP, 0/TCP
    Command:
      /docker-entrypoint.sh
    Args:
      native-k8s
      $JAVA_HOME/bin/java -classpath $FLINK_CLASSPATH -Xmx1073741824 -Xms1073741824 -XX:MaxMetaspaceSize=268435456 -Dlog.file=/opt/flink/log/jobmanager.log -Dlogback.configurationFile=file:/opt/flink/conf/logback-console.xml -Dlog4j.configuration=file:/opt/flink/conf/log4j-console.properties -Dlog4j.configurationFile=file:/opt/flink/conf/log4j-console.properties org.apache.flink.kubernetes.entrypoint.KubernetesApplicationClusterEntrypoint -D jobmanager.memory.off-heap.size=134217728b -D jobmanager.memory.jvm-overhead.min=201326592b -D jobmanager.memory.jvm-metaspace.size=268435456b -D jobmanager.memory.heap.size=1073741824b -D jobmanager.memory.jvm-overhead.max=201326592b
    State:          Running
      Started:      Thu, 24 Dec 2020 10:26:11 +0800
    Ready:          True
    Restart Count:  0
    Limits:
      cpu:     1
      memory:  1600Mi
    Requests:
      cpu:     1
      memory:  1600Mi
    Environment:
      _POD_IP_ADDRESS:   (v1:status.podIP)
    Mounts:
      /opt/flink/conf from flink-config-volume (rw)
      /opt/flink/flink-ha from jobmanager-ha (rw)
      /var/run/secrets/kubernetes.io/serviceaccount from default-token-pzw5h (ro)
Conditions:
  Type              Status
  Initialized       True 
  Ready             True 
  ContainersReady   True 
  PodScheduled      True 
Volumes:
  flink-config-volume:
    Type:      ConfigMap (a volume populated by a ConfigMap)
    Name:      flink-config-flink-application-cluster
    Optional:  false
  jobmanager-ha:
    Type:       PersistentVolumeClaim (a reference to a PersistentVolumeClaim in the same namespace)
    ClaimName:  flink-ha-pvc
    ReadOnly:   false
  default-token-pzw5h:
    Type:        Secret (a volume populated by a Secret)
    SecretName:  default-token-pzw5h
    Optional:    false
QoS Class:       Guaranteed
Node-Selectors:  <none>
Tolerations:     node.kubernetes.io/not-ready:NoExecute for 300s
                 node.kubernetes.io/unreachable:NoExecute for 300s
Events:
  Type     Reason       Age                From                Message
  ----     ------       ----               ----                -------
  Normal   Scheduled    <unknown>          default-scheduler   Successfully assigned default/flink-application-cluster-9589dbf58-hm7xj to 10.9.1.18
  Warning  FailedMount  80s (x2 over 81s)  kubelet, 10.9.1.18  MountVolume.SetUp failed for volume "flink-config-volume" : configmap "flink-config-flink-application-cluster" not found
  Normal   Pulled       78s                kubelet, 10.9.1.18  Container image "top-speed-windowing:1.12.0" already present on machine
  Normal   Created      78s                kubelet, 10.9.1.18  Created container flink-job-manager
  Normal   Started      77s                kubelet, 10.9.1.18  Started container flink-job-manager

可以看到已经正确挂载了。

这个PR能不能用于生产?

能不能用于生产我觉得主要考虑的就是这个PR的可靠程度和后期维护、升级了。从这两个角度考虑我觉得是没问题的。这个PR代码量少,而且简单,实质只是增加了几项配置而已,对已有代码几乎是没有改动的,新增的配置也都是可选配置项,代码的可控性几乎是百分百的。可能更应该关心的是这个PR后面会不会被合到官方分支吧。我个人觉得不一定吧,volume mount的功能几乎肯定会支持,但未必最终使用这个PR的代码。但用了其它代码,对使用者而言,顶多也就是换个jar包,修改下创建任务的命令而已。

另外我觉得最重要的是这个改动只影响提交任务的过程,就这个过程也只影响创建容器的过程,也就是影响面仅限Kubernetes相关的东西,并没有影响任何Flink运行的功能。所以使用这个PR的时候记得只替换宿主机安装包里面的jar即可,不要替换容器里面真正运行的那个jar。

不过,如果你只想完全用官方的东西,那完全可以像之前版本一样,使用非Native的方式在Kubernetes上面部署Flink,不过我还是喜欢Native的东西,更加简单。