本文基于Kafka 2.8.

有时我们会碰到网络是通畅的,但却连不上Kafka,特别是在多网卡环境或者云环境上很容易出现,这个其实和Kafka的监听配置有关系。本文介绍监听相关的配置,目前监听相关的参数主要有下面几个:

  • listeners
  • advertised.listeners
  • listener.security.protocol.map
  • inter.broker.listener.name
  • security.inter.broker.protocol
  • advertised.host.name(历史遗留,已废弃,勿使用)
  • advertised.port(历史遗留,已废弃,勿使用)
  • host.name(历史遗留,已废弃,勿使用)

其中最重要的就是listenersadvertised.listeners:集群启动时监听listeners配置的地址,并将advertised.listeners配置的地址写到Zookeeper里面,作为集群元数据的一部分。我们可以将客户端(生产者/消费者)连接Kafka集群进行操作的过程分成2步:

  1. 通过listeners配置的连接信息(ip/host)连接到某个Broker(broker会定期获取并缓存zk中的元数据信息),获取元数据中advertised.listeners配置的地址信息。
  2. 通过第1步获取的advertised.listeners连接信息和Kafka集群通信(读/写)。

所以在存在内外网隔离的虚拟化环境中(比如Docker、公有云),外部客户端经常会出现可以连接到Kafka(第1步),但发送/消费数据时报连接超时(第2步),就是因为listeners配置的是外网地址,而advertised.listeners配置的却是内网地址。那这几个参数该如何配置呢?

先看连接信息的配置格式:{listener名字}://{HOST/IP}:{PORT}。HOST/IP、PORT很清楚,主要是这个“listener名字”字段。要理解这个得了解listener.security.protocol.map这个配置项:它的用途是配置listener名字和协议的映射(所以它是一个key-value的map),key是“listener名字”,value是“协议名称”,其默认值是“listener名字”和“协议名称”一样。有点绕,举个例子,比如:PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL,冒号前面是key,即协议名字;后面是value,即协议名称。listener名字我们可以随便起,而协议名称则是固定可枚举的一个范围。所以如果我们自定义了listener名字,那就需要显式的设置其对应的协议名。

inter.broker.listener.namesecurity.inter.broker.protocol都是用于配置Broker之间通信的,前者配置名称(即listener.security.protocol.map中的key),后者配置协议(即listener.security.protocol.map中的value),默认值是PLAINTEXT。这两个配置项同时只能配置一个。

为什么一个连接要搞这么复杂呢?主要是为了各种不同的场景需求。下面举一个复杂一点的应用场景进行说明。比如我们在一个公有云上面部署了一个Kafka集群,该环境有一个外网地址external_hostname和一个内网地址internal_hostname;且在内部中是无法获取外网地址的(公有云大多都是这样的)。然后想实现内部客户端访问集群时走内部地址,且不需要加密;而外部客户端访问时则走外部地址,且需要加密。要实现这个需求,可以对集群进行如下配置:

listener.security.protocol.map=INTERNAL:PLAINTEXT,EXTERNAL:SSL
listeners=INTERNAL://0.0.0.0:19092,EXTERNAL://0.0.0.0:9092
advertised.listeners=INTERNAL://{internal_hostname}:19092,EXTERNAL://{external_hostname}:9092
inter.broker.listener.name=INTERNAL

其实更进一步,我们还可以通过可选的control.plane.listener.name参数单独定制集群Controller节点与其他Broker节点的连接,那配置信息就变为:

listener.security.protocol.map=INTERNAL:PLAINTEXT,EXTERNAL:SSL,CONTROL:SSL
listeners=INTERNAL://0.0.0.0:19092,EXTERNAL://0.0.0.0:9092
advertised.listeners=INTERNAL://{internal_hostname}:19092,EXTERNAL://{external_hostname}:9092,CONTROL://{control_ip}:9094
inter.broker.listener.name=INTERNAL
control.plane.listener.name=CONTROL

最后给出这些配置项的默认值和一些注意事项:

  1. listeners如果不显式的配置,那会监听所有网卡,相当于配置了0.0.0.0。该配置项里面listeners名字和端口都必须是唯一的,不能重复。
  2. advertised.listeners如果不配置,默认使用listeners配置的值。如果listeners也没有显式配置,则使用java.net.InetAddress.getCanonicalHostName()获取的IP地址。如果listeners配置的是0.0.0.0,则必须显式的配置advertised.listeners,因为这个配置项必须是一个具体的地址,不允许是0.0.0.0(因为客户端无法根据这个地址连接到Broker)。另外,advertised.listeners中的端口允许重复。
  3. 对于listenersadvertised.listeners,有多个地址的时候,每一个地址都必须按照{listener名字}://{HOST/IP}:{PORT}格式进行配置,多个地址用英文逗号分隔。
  4. 如果集群所有节点的hostname在客户端和服务端各节点之间可以正确解析,优先使用hostname,而不是IP。因为代码里面使用了java.net.InetAddress.getCanonicalHostName(),有时使用IP会出现访问不通的情况。

总结:listeners地址是用于首次连接的;advertised.listeners的地址是会写到zk里面,客户端通过listeners地址建立连接获取该地址信息,然后通过该地址和集群交互。所以对于客户端,这2个地址必须都是可以访问的才可以。