本章我们将会讨论Kafka生产者是如何发送消息到Kafka的。Kafka项目有一个生产者客户端,我们可以通过这个客户端的API来发送消息。
概要
当我们发送消息之前,先问几个问题:每条消息都是很关键且不能容忍丢失么?偶尔重复消息可以么?我们关注的是消息延迟还是写入消息的吞吐量?
举个例子,有一个信用卡交易处理系统,当交易发生时会发送一条消息到Kafka,另一个服务来读取消息并根据规则引擎来检查交易是否通过,将结果通过Kafka返回。对于这样的业务,消息既不能丢失也不能重复,由于交易量大因此吞吐量需要尽可能大,延迟可以稍微高一点。
再举个例子,假如我们需要收集用户在网页上的点击数据,对于这样的场景,少量消息丢失或者重复是可以容忍的,延迟多大都不重要只要不影响用户体验,吞吐则根据实时用户数来决定。
不同的业务需要使用不同的写入方式和配置。后面我们将会讨论这些API,现在先看下生产者写消息的基本流程:
流程如下:
- 首先,我们需要创建一个ProducerRecord,这个对象需要包含消息的主题(topic)和值(value),可以选择性指定一个键值(key)或者分区(partition)。
- 发送消息时,生产者会对键值和值序列化成字节数组,然后发送到分配器(partitioner)。
- 如果我们指定了分区,那么分配器返回该分区即可;否则,分配器将会基于键值来选择一个分区并返回。
- 选择完分区后,生产者知道了消息所属的主题和分区,它将这条记录添加到相同主题和分区的批量消息中,另一个线程负责发送这些批量消息到对应的Kafka broker。
- 当broker接收到消息后,如果成功写入则返回一个包含消息的主题、分区及位移的RecordMetadata对象,否则返回异常。
- 生产者接收到结果后,对于异常可能会进行重试。
创建Kafka生产者
创建Kafka生产者有三个基本属性:
- bootstrap.servers:属性值是一个host:port的broker列表。这个属性指定了生产者建立初始连接的broker列表,这个列表不需要包含所有的broker,因为生产者建立初始连接后会从相应的broker获取到集群信息。但建议指定至少包含两个broker,这样一个broker宕机后生产者可以连接到另一个broker。
- key.serializer:属性值是类的名称。这个属性指定了用来序列化键值(key)的类。Kafka broker只接受字节数组,但生产者的发送消息接口允许发送任何的Java对象,因此需要将这些对象序列化成字节数组。key.serializer指定的类需要实现org.apache.kafka.common.serialization.Serializer接口,Kafka客户端包中包含了几个默认实现,例如ByteArraySerializer、StringSerializer和IntegerSerializer。
- value.serializer:属性值是类的名称。这个属性指定了用来序列化消息记录的类,与key.serializer差不多。
Maven依赖
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.11.0.0</version>
</dependency>
下面是一个样例代码:
private Properties kafkaProps = new Properties();
kafkaProps.put("bootstrap.servers", "broker1:9092,broker2:9092");
kafkaProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
kafkaProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer<String, String> producer = new KafkaProducer<String, String>(kafkaProps);
创建完生产者后,我们可以发送消息。Kafka中有三种发送消息的方式:
- 只发不管结果(fire-and-forget):只调用接口发送消息到Kafka服务器,但不管成功写入与否。由于Kafka是高可用的,因此大部分情况下消息都会写入,但在异常情况下会丢消息。
- 同步发送(Synchronous send):调用send()方法返回一个Future对象,我们可以使用它的get()方法来判断消息发送成功与否。
- 异步发送(Asynchronous send):调用send()时提供一个回调方法,当接收到broker结果后回调此方法。
本章的例子都是单线程发送的,但生产者对象是线程安全的,它支持多线程发送消息来提高吞吐。需要的话,我们可以使用多个生产者对象来进一步提高吞吐。
发送消息到Kafka
最简单的发送消息方式如下:
ProducerRecord<String, String> record = new ProducerRecord<String, String>("CustomerCountry", "Precision Products", "France");
try {
producer.send(record);
} catch (Exception e) {
e.printStackTrace();
}
这里做了如下几件事:
- 我们创建了一个ProducerRecord,并且指定了主题以及消息的key/value。主题总是字符串类型的,但key/value则可以是任意类型,在本例中也是字符串。需要注意的是,这里的key/value的类型需要与serializer和生产者的类型匹配。
- 使用send()方法来发送消息,该方法会返回一个RecordMetadata的Future对象,但由于我们没有跟踪Future对象,因此并不知道发送结果。如前所述,这种方式可能会丢失消息。
- 虽然我们忽略了发送消息到broker的异常,但是我们调用send()方法时仍然可能会遇到一些异常,例如序列化异常、发送缓冲区溢出异常等等。
同步发送消息
同步发送方式可以简单修改如下:
ProducerRecord<String, String> record = new ProducerRecord<String, String>("CustomerCountry", "Precision Products", "France");
try {
producer.send(record).get();
} catch (Exception e) {
e.printStackTrace();
}
注意到,这里使用了Future.get()来获取发送结果,如果发送消息失败则会抛出异常,否则返回一个RecordMetadata对象。发送失败异常包含:1)broker返回不可恢复异常,生产者直接抛出该异常;2)对于broker其他异常,生产者会进行重试,如果重试超过一定次数仍不成功则抛出异常。
可恢复异常指的是,如果生产者进行重试可能会成功,例如连接异常;不可恢复异常则是进行重试也不会成功的异常,例如消息内容过大。
异步发送消息
首先了解下什么场景下需要异步发送消息。假如生产者与broker之间的网络延时为10ms,我们发送100条消息,发送每条消息都等待结果,那么需要1秒的时间。而如果我们采用异步的方式,几乎没有任何耗时,而且我们还可以通过回调知道消息的发送结果。
异步发送消息的样例如下:
public class DemoProducerCallback implements Callback {
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if (e != null) {
e.printStackTrace();
}
}
}
ProducerRecord<String, String> record = new ProducerRecord<String, String>("CustomerCountry", "Precision Products", "France");
producer.send(record, new DemoProducerCallback());
异步回调的类需要实现org.apache.kafka.clients.producer.Callback接口,这个接口只有一个onCompletion方法。当Kafka返回异常时,异常值不为null,代码中只是简单的打印,但我们可以采取其他处理方式。
kafka生产者 配置
- acks 和 timeout.ms
timeout.ms(0.9.0.0版本中就被弃用)
指定了 broker 等待同步副本返回消息确认的时间,与 asks 的配置相匹配——如果在指定时间内没有收到同步副本的确认,那么 broker 就会返回一个错误。-
acks = 1
指定了必须要有多少个分区副本收到消息,生产者才会认为消息写入是成功的。这个参数对消
息丢失的可能性有重要影响。该参数有如下选项:acks=0,生产者在成功写入消息之前不会等待任何来自服务器的响应。也就是说,如果当中
出现了问题,导致服务器没有收到消息,那么生产者就无从得知,消息也就丢失了。不过,因为
生产者不需要等待服务器的响应,所以它可以以网络能够支持的最大速度发送消息,从而达到很
高的吞吐量。acks=1,只要集群的 Leader 节点收到消息,生产者就会收到一个来自服务器的成功响应。如果消息无法到达 Leader 节点(比如首领节点崩溃,新的 Leader 还没有被选举出来),生产者会收到一个错误响应,为了避免数据丢失,生产者会重发消息。不过,如果一个没有收到消息的节点成为新Leader,消息还是会丢失。这个时候的吞吐量取决于使用的是同步发送还是异步发送。如果让发送客户端等待服务器的响应(通过调用 Future 对象的 get() 方法),显然会增加延迟(在网络上传输一个来回的延迟)。如果客户端使用回调,延迟问题就可以得到缓解,不过吞吐量还是会受发送中消息数量的限制(比如,生产者在收到服务器响应之前可以发送多少个消息)。
如果 acks=all,只有当所有参与复制的节点全部收到消息时,生产者才会收到一个来自服务器的成功响应。这种模式是最安全的,它可以保证不止一个服务器收到消息,就算有服务器发生崩溃,整个集群仍然可以运行。不过,它的延迟比 acks=1 时更高,因为我们要等待不只一个服务器节点接收消息。
buffer.memory=33554432
该参数用来设置生产者内存缓冲区的大小,生产者用它缓冲要发送到服务器的消息。如果生产消息的速度超过发送的速度,会导致生产者空间不足。这个时候,send()
方法调用要么被阻塞,要么抛出异常,取决于如何设置block.on.buffer.full
参数(在 0.9.0.0 版本里被替换成了max.block.ms
,表示在抛出异常之前可以阻塞一段时间)compression.type=none
默认情况下,消息发送时不会被压缩。该参数可以设置为snappy
、gzip
或lz4
,它指定了消息被发送给 broker 之前使用哪一种压缩算法进行压缩。
- snappy 压缩算法由 Google 发明,占用较少的 CPU,却能提供较好的性能和相当可观的压缩比,如果比较关注性能和网络带宽,可以使用这种算法。
- gzip 压缩算法一般会占用较多的 CPU,但会提供更高的压缩比,所以如果网络带宽比较有限,可以使用这种算法。
使用压缩可以降低网络传输开销和存储开销,而这往往是向 Kafka 发送消息的瓶颈所在。
- retries 和 retry.backoff.ms
retries=0
生产者从服务器收到的错误有可能是临时性的错误(比如分区找不到 Leader)。在这种情况下,retries
参数的值决定了生产者可以重发消息的次数,如果达到这个次数,生产者会放弃重试并返回错误。retry.backoff.ms=100
默认情况下,生产者会在每次重试之间等待 100ms,不过可以通过retry.backoff.ms
参数来改变这个时间间隔。建议在设置重试次数和重试时间间隔之前,先测试一下恢复一个崩溃节点需要多少时间(比如所有分区选举出 Leader 需要多长时间),让总的重试时间比 Kafka 集群从崩溃中恢复的时间长,否则生产者会过早地放弃重试。不过有些错误不是临时性错误,没办法通过重试来解决(比如“消息太大”错误)。一般情况下,因为生产者会自动进行重试,所以就没必要在代码逻辑里处理那些可重试的错误。你只需要处理那些不可重试的错误和重试次数超出上限的情况。
- batch.size 和 linger.ms
-
batch.size:=16384
当有多个消息需要被发送到同一个分区时,生产者会把它们放在同一个批次里。该参数指定了一个批次可以使用的内存大小,按照字节数计算(而不是消息个数)。 -
linger.ms:=0
指定了生产者在每次发送消息的时间间隔
当批次被填满 或者 等待时间达到
linger.ms
设置的间隔时间,批次里的所有消息会被发送出去,哪怕此时该批次只有一条消息。
所以就算把批次大小设置得很大,也不会造成延迟,只是会占用更多的内存而已。但如果设置得太小,因为生产者需要更频繁地发送消息,会增加一些额外的开销。
client.id=''
该参数可以是任意的字符串,服务器会用它来识别消息的来源max.in.flight.requests.per.connection=5
该参数指定了生产者在收到服务器响应之前可以发送多少个消息。它的值越高,就会占用越多的内存,不过也会提升吞吐量。把它设为 1 可以保证消息是按照发送的顺序写入服务器的,即使发生了重试。
如何保证顺序性:如果把 retries 设为非零整数,同时把
max.in.flight.requests.per.connection
设为比 1 大的数,那么,如果第一个批次消息写入失败,而第二个批次写入成功,broker 会重试写入第一个批次。如果此时第一个批次也写入成功,那么两个批次的顺序就反过来了。一般来说,如果某些场景要求消息是有序的,那么消息是否写入成功也是很关键的,所以不建议把
retries
设为 0。可以把max.in.flight.requests.per.connection
设为 1,这样在生产者尝试发送第一批消息时,就不会有其他的消息发送给broker。不过这样会严重影响生产者的吞吐量,所以只有在对消息的顺序有严格要求的情况下才能这么做。
- request.timeout.ms 和 metadata.fetch.timeout.ms
-
request.timeout.ms=305000
指定了生产者在发送数据时等待服务器返回响应的时间 -
metadata.fetch.timeout.ms (0.9.0.0版本中就被弃用)
指定了生产者在获取元数据(比如目标分区的 Leader 是谁)时等待服务器返回响应的时间。如果等待响应超时,那么生产者要么重试发送数据,要么返回一个错误(抛出异常或执行回调)。
-
max.request.size=1048576
该参数用于控制生产者发送的请求大小。它可以指能发送的单个消息的最大值,也可以指单个请求里所有消息总的大小。例如,假设这个值为 1MB,那么可以发送的单个最大消息为 1MB,或者生产者可以在单个请求里发送一个批次,该批次包含了 1000 个消息,每个消息大小为 1KB。另外,broker 对可接收的消息最大值也有自己的限制(message.max.bytes
),所以两边的配置最好可以匹配,避免生产者发送的消息被 broker 拒绝。
注意区分
batch.size
只是针对一个 topic 的 partition,而max.request.size
针对单次请求的。
-
receive.buffer.bytes=32768 和 send.buffer.bytes=131072
这两个参数分别指定了 TCP socket 接收和发送数据包的缓冲区大小。如果它们被设为 -1,就使用操作系统的默认值。如果生产者或消费者与 broker 处于不同的数据中心,那么可以适当增大这些值,因为跨数据中心的网络一般都有比较高的延迟和比较低的带宽。
关于更多的配置信息,可以查看:http://kafka.apachecn.org/documentation.html#configuration
完整实例
package com.neuedu;
import java.util.Properties;
import org.apache.kafka.clients.producer.*;
public class Producer {
public static void main(String[] args) {
Properties props = new Properties();
props.put("bootstrap.servers",
"hadoop03:9092,hadoop05:9092,hadoop06:9092");//该地址是集群的子集,用来探测集群。
props.put("acks", "all");// 记录完整提交,最慢的但是最大可能的持久化
props.put("retries", 3);// 请求失败重试的次数
props.put("batch.size", 16384);// batch的大小
props.put("linger.ms", 1);// 默认情况即使缓冲区有剩余的空间,也会立即发送请求,设置一段时间用来等待从而将缓冲区填的更多,单位为毫秒,producer发送数据会延迟1ms,可以减少发送到kafka服务器的请求数据
props.put("buffer.memory", 33554432);// 提供给生产者缓冲内存总量
props.put("key.serializer",
"org.apache.kafka.common.serialization.StringSerializer");// 序列化的方式,
// ByteArraySerializer或者StringSerializer
props.put("value.serializer",
"org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
for (int i = 0; i < 100; i++)
{
producer.send(new ProducerRecord<String, String>("payment", Integer.toString(i), Integer.toString(i)));
}
producer.close();
}
}
通过上面的一些讲解,应该已经可以比较友好的使用 kafka生产者了,接下来我们还剩下最后一个部分,kafka的分区
分区
我们创建消息的时候,必须要提供主题和消息的内容,而消息的key是可选的,当不指定key时默认为null。消息的key有两个重要的作用:1)提供描述消息的额外信息;2)用来决定消息写入到哪个分区,所有具有相同key的消息会分配到同一个分区中。
如果key为null,那么生产者会使用默认的分配器,该分配器使用轮询(round-robin)算法来将消息均衡到所有分区。
如果key不为null而且使用的是默认的分配器,那么生产者会对key进行哈希并根据结果将消息分配到特定的分区。注意的是,在计算消息与分区的映射关系时,使用的是全部的分区数而不仅仅是可用的分区数。这也意味着,如果某个分区不可用(虽然使用复制方案的话这极少发生),而消息刚好被分配到该分区,那么将会写入失败。另外,如果需要增加额外的分区,那么消息与分区的映射关系将会发生改变,因此尽量避免这种情况。
自定义分配器
在kafka配置参数时设置分区器的类
//设置自定义分区
kafkaProps.put("partitioner.class", "com.chb.partitioner.MyPartitioner");
现在来看下如何自定义一个分配器,下面将key为Banana的消息单独放在一个分区,与其他的消息进行分区隔离:
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.record.InvalidRecordException;
import org.apache.kafka.common.utils.Utils;
public class BananaPartitioner implements Partitioner {
public void configure(Map<String, ?> configs) {}
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
int numPartitions = partitions.size();
if ((keyBytes == null) || (!(key instanceOf String)))
throw new InvalidRecordException("We expect all messages to have customer name as key")
if (((String) key).equals("Banana"))
return numPartitions - 1; // Banana will always go to last partition
// Other records will get hashed to the rest of the partitions
return (Math.abs(Utils.murmur2(keyBytes)) % numPartitions)
}
public void close() {}
}