acks 参数指定了必须要有多少个分区副本收到消息,生产者才会认为消息写入是成功的。这个参数对消息丢失的可能性有重要影响。该参数有如下选项。
如果acks等于0,生产者在消息成功写入前,不会等待任何来自服务器的响应。也就是说,如果当中出现了问题,导致服务器没有接收到消息,生产者无从得知,消息也就丢失了。
如果acks等于1,只要等待首领节点收到消息,生产者就会得到一个服务器的成功响应,如果消息无法到达首领节点,生产者会得到一个错误响应,为了避免数据丢失,生产者会重发消息。
如果acks等于all,则所有参与复制的节点全部收到消息时,生产者才会收到一个来自服务器的成功响应。
buffer.memory:该参数用来设置生产者内存缓冲区的大小,生产者用它缓冲要发送到服务器的消息。
compression.type:默认情况下消息不会被压缩。该参数可以被设置为snappy,gzip。snappy由google发明,snappy使用cpu少,使用较少的网络带宽和性能。gzip使用更多的cpu,但是压缩比更高,如果网络带宽有限,可以使用这种算法。
retries:生产者从服务器收到错误响应时,决定了可以重试的次数,如果达到重试次数,生产者放弃重试并返回错误。默认情况下,生产者在每次重试之间等待100ms,可以通过retry.backoff.ms参数改变这个时间间隔。
batch.size:当有多个消息需要被发送到同一个分区时,生产者会把它们放到同一个批次里。
linger.ms:该参数指定了生产者在发送批次之前等待更多消息加入批次的时间。
client.id:可以是任意的字符串,服务器会用它来识别消息的来源,还可以用在日志和配额指标里。
max.in.flight.requests.per.connection:该参数指定了生产者在收到服务器响应之前可以发送多少个消息。
timeout.ms、request.timeout.ms 和 metadata.fetch.timeout.ms
request.timeout.ms:指定了生产者在发送数据时等待服务器返回响应的时间
metadata.fetch.timeout.ms:指定了生产者在获取元数据(比如目标分区的首领是谁)时等待服务器返回响应的时间。如果等待响应超时,那么生产者要么重试发送数据,要么返回一个错误(抛出异常或执行回调)。
timeout.ms:指定了 broker 等待同步副本返回消息确认的时间,与 asks 的配置相匹配——如果在指定时间内没有收到同步副本的确认,那么 broker 就会返回一个错误。
max.block.ms:该参数指定了在调用 send() 方法或使用 partitionsFor() 方法获取元数据时生产者的阻塞时间。当生产者的发送缓冲区已满,或者没有可用的元数据时,这些方法就会阻塞。在阻塞时间达到 max.block.ms 时,生产者会抛出超时异常。
max.request.size:该参数用于控制生产者发送的请求大小。它可以指能发送的单个消息的最大值,也可以指单个请求里所有消息总的大小。例如,假设这个值为 1MB,那么可以发送的单个最大消息为 1MB,或者生产者可以在单个请求里发送一个批次,该批次包含了 1000 个消息,每个消息大小为 1KB。另外,broker 对可接收的消息最大值也有自己的限制(message.max.bytes),所以两边的配置最好可以匹配,避免生产者发送的消息被 broker 拒绝。
receive.buffer.bytes 和 send.buffer.bytes:这两个参数分别指定了 TCP socket 接收和发送数据包的缓冲区大小。如果它们被设为 -1,就使用操作系统的默认值。如果生产者或消费者与 broker 处于不同的数据中心,那么可以适当增大这些值,因为跨数据中心的网络一般都有比较高的延迟和比较低的带宽。