上篇文章介绍了Kafka Consumer,这篇文章讨论Kafka Producer。

Kafka Producer流程概述

下面是一个简单的Producer实现:

public class SimpleProducer {

  public static void main(String[] args) {
    Properties config = new Properties();
    config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    config.put(ProducerConfig.CLIENT_ID_CONFIG, "test-write");
    config.put(ProducerConfig.ACKS_CONFIG, "all");
    config.put(ProducerConfig.RETRIES_CONFIG, 0);
    config.put(ProducerConfig.BATCH_SIZE_CONFIG, 16 * 1024);
    config.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 32 * 1024 * 1024);
    config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

    Producer<String, String> producer = new KafkaProducer<>(config);

    for (int i = 0; i < 1000; i++) {
      ProducerRecord<String, String> record = new ProducerRecord<>("test-write", Integer.toString(i));
      producer.send(record, (m, e) -> {
        if (e != null) {
          e.printStackTrace();
        } else {
          System.out.printf("Produced record to topic %s partition [%d] @ offset %d%n",
              m.topic(), m.partition(), m.offset());
        }
      });
    }

    producer.flush();
    producer.close();
  }
}

大概流程就是:

  • 创建KafkaProducer(KafkaConsumer不是线程安全的,但KafkaProducer是线程安全的);
  • 创建ProducerRecord
  • 调用send方法发送record。该方法返回的是一个Future对象,而且也允许指定回调函数。所以要是想同步发送,则调用Future对象的get即可;但一般建议定义回调函数,异步发送;
  • 最后close掉对象,释放资源;

以上便是从用户代码角度看到的Producer的流程,下面从Kafka内部来看。如下图(来自参考部分第一篇文章):

kafka-producer-internal

KafkaProducer内部主要由一个Memory Buffer和一个后台IO线程组成,数据写入流程如下:

  1. 生成ProducerRecord对象,该对象必传的2个参数是topic和message value。
  2. 调用send发送生成的record。
  3. 调用指定的序列化函数序列化key和value,生成字节数组以方便在网络上传输。
  4. 计算分区号。Kafka的分区策略是这样的:如果record中指定了partition字段,则直接使用。如果没有指定,则检查record中是否指定了key,如果有则基于该key计算哈希。如果key也没有指定,则使用一个叫"sticky partition"的策略计算分区号。sticky partition是2.4版本加入的,之前的版本是“round-robin”。改成sticky是因为轮询策略会把一批数据分散到多个batch去,这样batch比较小,批量的效果就不是很好。而sticky实质是个优化版本的轮询,它会等一个batch满了以后,再将record分配给下个batch,具体见KIP-480
  5. 将数据加到按partition分组的mem buffer里面的batch里边。加到batch以后,send方法就返回了。
  6. 后台IO线程从mem buffer里面取一个batch的数据封装成一个request发送给kafka broker。
  7. 后端broker成功接收以后,就会返回RecordMetadata对象,该对象包含主题、分区号、分区内的offset。如果接收失败,就会返回错误,此时如果配置了重试,就会自动重试若干次。重试之后还失败,就会抛出异常。

一些重要配置

学习KafkaProducer的重点之一就在于掌握一些重要参数的使用,下面仅是进行简单介绍,完整的课参考官方文档。

可靠性相关

主要就是acks了,有3个值:

  • ack=0:数据发送后,不需要服务端确认;
  • ack=1:默认值,数据发送后,Leader Partition写成功就返回;
  • ack=all(或者-1),ISR中所有节点的Partition写成功才返回;

关于可靠性我之前已经有单独的文章《Kafka的可靠性》介绍过了,这里就不赘述了。

重试和超时

这部分配置主要影响的是是否会产生重复数据。

  • retries:send失败后的自动重试次数,默认为0. 当大于0的时候,重试可能导致request乱序。
  • max.in.flight.requests.per.connection:一个连接允许同时发送request的个数,超过这个个数会block。一般配合retris使用:当retries设置大于0时,该参数设置为1,仍然可以保证顺序。kafka 1.1版本之前的默认值是1,1.1及之后版本默认值是5.
  • request.timeout.ms:producer等待request响应的最大时间,如果超过这个时间,就会认为发送失败,会开始retry(如果配置了的话)。
  • delivery.timeout.ms:请求发送的上限时间,从send方法返回开始算起。delivery timeout = time delay prior to sending + time to await ack + time for retries.默认值为120000,即2分钟。
  • retries.backoff.ms:重试间隔

Batch

批量的定时达量配置:

  • batch.size:batch的大小,单位是字节。设置为0表示关闭batch功能。
  • linger.ms: the producer will wait for linger.ms time before sending the record. This allows multiple records to be batched together.

batch request发送到broker的条件是:达到batch.size的大小,或者等待时间超过linger.ms。即所谓的定时或者达量。

Memory Buffer

对Producer内存使用的控制:

  • buffer.memory:用于缓存record的内存总大小,单位是字节。超过以后send会阻塞。
  • max.block.ms:send方法允许阻塞的最长时间,超时就会抛出异常。

Compression

  • compression.type:压缩类型,默认为none,表示不压缩。可选的值有:none, gzip, snappy, lz4 or zstd.

压缩这里有几个注意点:

  • 压缩是针对整个batch压缩的,所以batch越大,压缩效率越高。
  • 一般来说一定要保证producer和broker端的压缩方法一致,不然会出现producer压缩发送到broker之后,broker又解压然后按自己的压缩算法重新压缩(CPU资源会使用比较多)。一般比较好的配置策略是producer压缩,broker端保持(即不配置压缩)。

Kafka 0.11版本引入了两个特殊的Producer:idempotent producertransactional producer,下面分别介绍。

Idempotent Producer

解决什么问题

幂等producer主要是为了解决下面这种重试导致数据重复的问题:

duplicate

使用幂等producer的方法是设置:enable.idempotence = true(默认是false)。这个配置等价于下面配置项的组合:

  • acks = all
  • retries = Integer.MAX_VALUE
  • max.in.flight.requests.per.connection = 1 (0.11 >= Kafka < 1.1) OR 5 (Kafka >= 1.1)

也就是幂等producer是为了提供“exactly once”语义。

如何解决

对于每个session,

  • kafka leader broker会给每个producer分配一个唯一的producer id;
  • 每次send之前,producer会给每个record分配一个单调递增的sequence number;
  • kafka broker会记录每个partition上面每个producer id的最大sequence number,对于某个producer,如果收到小的sequence number,就会丢掉这个record。

idempotent

但是注意:

  • 应用层自己的re-send操作导致的重复幂等producer不能去重,因为这个时候sequence number已经变了
  • 只能保证当前session内的exactly once。

如何使用

以下摘自Kafka 2.8 Kafka Producer Javadoc:

To take advantage of the idempotent producer, it is imperative to avoid application level re-sends since these cannot be de-duplicated. As such, if an application enables idempotence, it is recommended to leave the retries config unset, as it will be defaulted to Integer.MAX_VALUE. Additionally, if a send(ProducerRecord) returns an error even with infinite retries (for instance if the message expires in the buffer before being sent), then it is recommended to shut down the producer and check the contents of the last produced message to ensure that it is not duplicated. Finally, the producer can only guarantee idempotence for messages sent within a single session.

简单概括就是:

  • 应用层(失败时)不要自己重发,内部有自动尝试;如果有错误导致无限重试时,就shutdown程序来定位问题;
  • 设置enable.idempotence = true(此时retries会自动设置为Integer.MAX_VALUE,用户不要修改)
  • 注意幂等只能在一个session范围内提供幂等性

Transactional Producer

如其名,transactional producer能提供事务保证,使用方式很简单,给当前Producer设置以下两个参数:

  • 设置enable.idempotence = true
  • 设置一个transactional.id,这个id在消费同一个partition的producer里面要是唯一的( it should be unique to each producer instance running within a partitioned application)。

编码模式如下:

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("transactional.id", "my-transactional-id");
Producer  producer = new KafkaProducer<>(props, new StringSerializer(), new StringSerializer());

producer.initTransactions();

try {
    producer.beginTransaction();
    for (int i = 0; i < 100; i++)
        producer.send(new ProducerRecord<>("my-topic", Integer.toString(i), Integer.toString(i)));
    producer.commitTransaction();
} catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
    // We can't recover from these exceptions, so our only option is to close the producer and exit.
    producer.close();
} catch (KafkaException e) {
    // For all other exceptions, just abort the transaction and try again.
    producer.abortTransaction();
}
producer.close();

这种模式大家很熟悉,和数据库的事务使用基本一样,语义也一致,这里就不赘述了。

另外,transactional producer是不依赖于session的,也就是即使session断了,甚至是进程重启都不影响。前面需要设置的那个transactional.id就是为了在出现问题时,能够跨session恢复。
还有一个注意点就是transactional producer的机制其实还是“预写日志(WAL)”,所以即使失败,数据也可能写入日志了。在DB中,这部分数据client肯定是读不到了,但在kafka里面能不能读到,是由客户端的参数isolation.level决定的,这个配置有2个可选值:

  • read_uncommitted:这是默认值,表明 Consumer 能够读取到 Kafka 写入的任何消息,不论事务型 Producer 提交事务还是终止事务,其写入的消息都可以读取。很显然,如果你用了事务型 Producer,那么对应的 Consumer 就不要使用这个值。
  • read_committed:表明 Consumer 只会读取事务型 Producer 成功提交事务写入的消息。当然了,它也能看到非事务型 Producer 写入的所有消息。

Transactional Producer提供的也是exactly once的语义保证。

KafkaProducer Class Javadoc

其实Kafka Producer的知识有2个非常好的获取方式都已经在Kafka Producer代码里面了,KafkaProducer类的Javadoc非常详细和全面的讲解了Kafka Producer,其onSend方法则可以看到发送的全流程,想深入了解的,可以从这2个途径获得更详细的信息。下面摘抄了kafka 2.8 KafkaProducer类完整的Javadoc,本文很多内容可以从里面找到出处和说明,完整内容如下(在线地址:Kafka 2.8 KafkaProducer Javadoc):

A Kafka client that publishes records to the Kafka cluster.
The producer is thread safe and sharing a single producer instance across threads will generally be faster than having multiple instances.
Here is a simple example of using the producer to send records with strings containing sequential numbers as the key/value pairs.

 Properties props = new Properties();
 props.put("bootstrap.servers", "localhost:9092");
 props.put("acks", "all");
 props.put("retries", 0);
 props.put("linger.ms", 1);
 props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
 props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
​
 Producer  producer = new KafkaProducer<>(props);
 for (int i = 0; i < 100; i++)
   producer.send(new ProducerRecord ("my-topic", Integer.toString(i), Integer.toString(i)));
​
 producer.close();

The producer consists of a pool of buffer space that holds records that haven't yet been transmitted to the server as well as a background I/O thread that is responsible for turning these records into requests and transmitting them to the cluster. Failure to close the producer after use will leak these resources.

The send() method is asynchronous. When called it adds the record to a buffer of pending record sends and immediately returns. This allows the producer to batch together individual records for efficiency.

The acks config controls the criteria under which requests are considered complete. The "all" setting we have specified will result in blocking on the full commit of the record, the slowest but most durable setting.

If the request fails, the producer can automatically retry, though since we have specified retries as 0 it won't. Enabling retries also opens up the possibility of duplicates (see the documentation on message delivery semantics for details).

The producer maintains buffers of unsent records for each partition. These buffers are of a size specified by the batch.size config. Making this larger can result in more batching, but requires more memory (since we will generally have one of these buffers for each active partition).

By default a buffer is available to send immediately even if there is additional unused space in the buffer. However if you want to reduce the number of requests you can set linger.ms to something greater than 0. This will instruct the producer to wait up to that number of milliseconds before sending a request in hope that more records will arrive to fill up the same batch. This is analogous to Nagle's algorithm in TCP. For example, in the code snippet above, likely all 100 records would be sent in a single request since we set our linger time to 1 millisecond. However this setting would add 1 millisecond of latency to our request waiting for more records to arrive if we didn't fill up the buffer. Note that records that arrive close together in time will generally batch together even with linger.ms=0 so under heavy load batching will occur regardless of the linger configuration; however setting this to something larger than 0 can lead to fewer, more efficient requests when not under maximal load at the cost of a small amount of latency.

The buffer.memory controls the total amount of memory available to the producer for buffering. If records are sent faster than they can be transmitted to the server then this buffer space will be exhausted. When the buffer space is exhausted additional send calls will block. The threshold for time to block is determined by max.block.ms after which it throws a TimeoutException.

The key.serializer and value.serializer instruct how to turn the key and value objects the user provides with their ProducerRecord into bytes. You can use the included org.apache.kafka.common.serialization.ByteArraySerializer or org.apache.kafka.common.serialization.StringSerializer for simple string or byte types.
From Kafka 0.11, the KafkaProducer supports two additional modes: the idempotent producer and the transactional producer. The idempotent producer strengthens Kafka's delivery semantics from at least once to exactly once delivery. In particular producer retries will no longer introduce duplicates. The transactional producer allows an application to send messages to multiple partitions (and topics!) atomically.

To enable idempotence, the enable.idempotence configuration must be set to true. If set, the retries config will default to Integer.MAX_VALUE and the acks config will default to all. There are no API changes for the idempotent producer, so existing applications will not need to be modified to take advantage of this feature.

To take advantage of the idempotent producer, it is imperative to avoid application level re-sends since these cannot be de-duplicated. As such, if an application enables idempotence, it is recommended to leave the retries config unset, as it will be defaulted to Integer.MAX_VALUE. Additionally, if a send(ProducerRecord) returns an error even with infinite retries (for instance if the message expires in the buffer before being sent), then it is recommended to shut down the producer and check the contents of the last produced message to ensure that it is not duplicated. Finally, the producer can only guarantee idempotence for messages sent within a single session.

To use the transactional producer and the attendant APIs, you must set the transactional.id configuration property. If the transactional.id is set, idempotence is automatically enabled along with the producer configs which idempotence depends on. Further, topics which are included in transactions should be configured for durability. In particular, the replication.factor should be at least 3, and the min.insync.replicas for these topics should be set to 2. Finally, in order for transactional guarantees to be realized from end-to-end, the consumers must be configured to read only committed messages as well.

The purpose of the transactional.id is to enable transaction recovery across multiple sessions of a single producer instance. It would typically be derived from the shard identifier in a partitioned, stateful, application. As such, it should be unique to each producer instance running within a partitioned application.

All the new transactional APIs are blocking and will throw exceptions on failure. The example below illustrates how the new APIs are meant to be used. It is similar to the example above, except that all 100 messages are part of a single transaction.

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("transactional.id", "my-transactional-id");
Producer  producer = new KafkaProducer<>(props, new StringSerializer(), new StringSerializer());
​
producer.initTransactions();
​
try {
    producer.beginTransaction();
    for (int i = 0; i < 100; i++)
        producer.send(new ProducerRecord<>("my-topic", Integer.toString(i), Integer.toString(i)));
    producer.commitTransaction();
} catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
    // We can't recover from these exceptions, so our only option is to close the producer and exit.
    producer.close();
} catch (KafkaException e) {
    // For all other exceptions, just abort the transaction and try again.
    producer.abortTransaction();
}
producer.close();

As is hinted at in the example, there can be only one open transaction per producer. All messages sent between the beginTransaction() and commitTransaction() calls will be part of a single transaction. When the transactional.id is specified, all messages sent by the producer must be part of a transaction.

The transactional producer uses exceptions to communicate error states. In particular, it is not required to specify callbacks for producer.send() or to call .get() on the returned Future: a KafkaException would be thrown if any of the producer.send() or transactional calls hit an irrecoverable error during a transaction. See the send(ProducerRecord) documentation for more details about detecting errors from a transactional send.

By calling producer.abortTransaction() upon receiving a KafkaException we can ensure that any successful writes are marked as aborted, hence keeping the transactional guarantees.

This client can communicate with brokers that are version 0.10.0 or newer. Older or newer brokers may not support certain client features. For instance, the transactional APIs need broker versions 0.11.0 or later. You will receive an UnsupportedVersionException when invoking an API that is not available in the running broker version.

总结

KafkaConsumer是线程不安全的,所以之前介绍KafkaConsumer的文章重点在offset的控制,特别是多线程消费的时候。但KafkaProducer是线程安全的,所以他的重点和难点不在于多线程,而在于如何保证数据发送时的可靠性、效率、语义保证等。KafkaProducer提供了大量的配置,可以让用户根据自己的场景进行配置。还提供了幂等Producer和事务Producer,这二者目的都是为了提供exactly once语义保证,而且事务Producer还可以提供跨session的事务(当然性能也会比较差)。但是我们需要清楚,这些保证都是有条件的,比如应用层用户自己的重发是无法保证的,当然还有其它情况。所以和上篇文章中提到的一样,如果想实现端到端的exactly once,光靠kafka是不行的,需要全局考虑。

参考