1. kafka 生产者发送消息的流程
2. Kafka 生产者发送数据的3种方式
(1) 发送并忘记(fire-and-forget)
把消息发送给服务器,但并不关心它是否正常到达。大多数情况下,消息会正常到达,因为 Kafka 是高可用的,而且生产者会自动尝试重发。不过,使用这种方式有时候也会丢失一些消息。
package com.bonc.rdpe.kafka110.producer;
import java.io.BufferedReader;
import java.io.FileReader;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
/**
* @Title Producer01.java
* @Description Kafka 生产者发送消息的第一种方式:发送并忘记
* @Author YangYunhe
* @Date 2018-06-21 10:35:34
*/
public class Producer01 {
public static void main(String[] args) throws Exception {
Properties props = new Properties();
props.put("bootstrap.servers", "192.168.42.89:9092,192.168.42.89:9093,192.168.42.89:9094");
props.put("acks", "1");
props.put("retries", 3);
props.put("batch.size", 16384); // 16K
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432); // 32M
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new KafkaProducer<>(props);
String filePath = Producer01.class.getClassLoader().getResource("wechat_data.txt").getPath();
BufferedReader br = new BufferedReader(new FileReader(filePath));
String line;
while((line = br.readLine()) != null) {
// 创建 ProducerRecord 可以指定 topic、partition、key、value,其中 partition 和 key 是可选的
// ProducerRecord<String, String> record = new ProducerRecord<>("dev3-yangyunhe-topic001", 0, "key", line);
// ProducerRecord<String, String> record = new ProducerRecord<>("dev3-yangyunhe-topic001", "key", line);
ProducerRecord<String, String> record = new ProducerRecord<>("dev3-yangyunhe-topic001", line);
// 只管发送消息,不管是否发送成功
producer.send(record);
Thread.sleep(100);
}
producer.close();
}
}
(2) 同步发送
使用 send() 方法发送消息,它会返回一个 Future 对象,调用 get() 方法进行等待(会返回元数据或者抛出异常),
就可以知道消息是否发送成功。
package com.bonc.rdpe.kafka110.producer;
import java.io.BufferedReader;
import java.io.FileReader;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
/**
* @Title Producer02.java
* @Description Kafka 生产者发送消息的第二种方式:同步发送
* @Author YangYunhe
* @Date 2018-06-21 10:38:37
*/
public class Producer02 {
public static void main(String[] args) throws Exception {
Properties props = new Properties();
props.put("bootstrap.servers", "192.168.42.89:9092,192.168.42.89:9093,192.168.42.89:9094");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new KafkaProducer<>(props);
String filePath = Producer02.class.getClassLoader().getResource("wechat_data.txt").getPath();
BufferedReader br = new BufferedReader(new FileReader(filePath));
String line;
while((line = br.readLine()) != null) {
ProducerRecord<String, String> record = new ProducerRecord<>("dev3-yangyunhe-topic001", line);
// 程序阻塞,直到该条消息发送成功返回元数据信息或者报错
RecordMetadata metadata = producer.send(record).get();
StringBuilder sb = new StringBuilder();
sb.append("record [").append(line).append("] has been sent successfully!").append("\n")
.append("send to partition ").append(metadata.partition())
.append(", offset = ").append(metadata.offset());
System.out.println(sb.toString());
Thread.sleep(100);
}
producer.close();
}
}
(3) 异步发送
大多数时候,我们并不需要等待响应——尽管 Kafka会把目标主题、分区信息和消息的偏移量发送回来,但对于发送端的应用程序来说不是必需的。
不过在遇到消息发送失败时,我们需要抛出异常、记录错误日志等,这样的情况下可以使用异步发送消息的方式,调用 send() 方法,并指定一个回调函数,服务器在返回响应时调用该函数。
package com.bonc.rdpe.kafka110.producer;
import java.io.BufferedReader;
import java.io.FileReader;
import java.util.Properties;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
/**
* @Title Producer03.java
* @Description Kafka 生产者发送消息的第三种方式:异步发送
* @Author YangYunhe
* @Date 2018-06-21 11:06:05
*/
public class Producer03 {
public static void main(String[] args) throws Exception {
Properties props = new Properties();
props.put("bootstrap.servers", "192.168.42.89:9092,192.168.42.89:9093,192.168.42.89:9094");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new KafkaProducer<>(props);
String filePath = Producer03.class.getClassLoader().getResource("wechat_data.txt").getPath();
BufferedReader br = new BufferedReader(new FileReader(filePath));
String line;
while((line = br.readLine()) != null) {
ProducerRecord<String, String> record = new ProducerRecord<>("dev3-yangyunhe-topic001", line);
producer.send(record, new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception e) {
// 如果发送消息成功,返回了 RecordMetadata
if(metadata != null) {
StringBuilder sb = new StringBuilder();
sb.append("message has been sent successfully! ")
.append("send to partition ").append(metadata.partition())
.append(", offset = ").append(metadata.offset());
System.out.println(sb.toString());
}
// 如果消息发送失败,抛出异常
if(e != null) {
e.printStackTrace();
}
}
});
Thread.sleep(100);
}
producer.close();
}
}
3. 多线程生产者
在数据量比较大同时对发送消息的顺序没有严格要求时,可以使用多线程的方式发送数据,实现多线程生产者有两种方式:1. 实例化一个 KafkaProducer 对象运行多个线程共享该对象发送消息;2. 实例化多个 KafkaProducer 对象。
由于 Kafka Producer 是线程安全的,所以多个线程共享一个 Kafka Producer 对象在性能上要好很多。
(1) 线程类实现
package com.bonc.rdpe.kafka110.thread;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
/**
* @Title KafkaProducerThread.java
* @Description 多线程生产者的线程类实现
* @Author YangYunhe
* @Date 2018-06-25 13:54:38
*/
public class KafkaProducerThread implements Runnable {
private KafkaProducer<String, String> producer;
private ProducerRecord<String, String> record;
public KafkaProducerThread(KafkaProducer<String, String> producer, ProducerRecord<String, String> record) {
this.producer = producer;
this.record = record;
}
@Override
public void run() {
producer.send(record, new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
if(exception != null) {
System.out.println("exception occurs when sending message: " + exception);
}
if(metadata != null) {
StringBuilder result = new StringBuilder();
result.append("message[" + record.value() + "] has been sent successfully! ")
.append("send to partition ").append(metadata.partition())
.append(", offset = ").append(metadata.offset());
System.out.println(result.toString());
}
}
});
}
}
(2) 发送消息的具体实现
package com.bonc.rdpe.kafka110.producer;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import com.bonc.rdpe.kafka110.thread.KafkaProducerThread;
/**
* @Title MultiProducer.java
* @Description 多线程生产者的测试代码
* @Author YangYunhe
* @Date 2018-06-25 14:30:58
*/
public class MultiProducer {
private static final int THREADS_NUMS = 10;
public static void main(String[] args) {
ExecutorService executor = Executors.newFixedThreadPool(THREADS_NUMS);
Properties props = new Properties();
props.put("bootstrap.servers", "192.168.42.89:9092,192.168.42.89:9093,192.168.42.89:9094");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
ProducerRecord<String, String> record;
try {
for(int i = 0; i < 100; i++) {
record = new ProducerRecord<>("dev3-yangyunhe-topic001", "hello" + i);
executor.submit(new KafkaProducerThread(producer, record));
Thread.sleep(1000);
}
}catch (Exception e) {
System.out.println("exception occurs when sending message: " + e);
}finally {
producer.close();
executor.shutdown();
}
}
}
(3) 运行结果:
message[hello0] has been sent successfully! send to partition 1, offset = 705
message[hello1] has been sent successfully! send to partition 0, offset = 705
message[hello2] has been sent successfully! send to partition 2, offset = 704
message[hello3] has been sent successfully! send to partition 1, offset = 706
message[hello4] has been sent successfully! send to partition 0, offset = 706
......
4. Kafka Producer 常用配置(kafka-1.1.0)
(1) acks
- 类型:string
- 默认值:1
- 可设置值:[all, -1, 0, 1]
- 重要性:高
- 说明:
- 0:生产者在成功写入消息之前不会等待任何来自服务器的响应。也就是说,如果当中出现了问题,导致服务器没有收到消息,那么生产者就无从得知,消息也就丢失了。不过,因为生产者不需要等待服务器的响应,所以它可以以网络能够支持的最大速度发送消息,从而达到很高的吞吐量。
- 1:只要集群的首领节点收到消息,生产者就会收到一个来自服务器的成功响应。如果消息无法到达首领节点(比如首领节点崩溃,新的首领还没有被选举出来),生产者会收到一个错误响应,为了避免数据丢失,生产者会重发消息。不过,如果一个没有收到消息的节点成为新首领,消息还是会丢失。这个时候的吞吐量取决于使用的是同步发送还是异步发送。如果让发送客户端等待服务器的响应(通过调用 Future 对象的 get() 方法),显然会增加延迟(在网络上传输一个来回的延迟)。如果客户端使用回调,延迟问题就可以得到缓解,不过吞吐量还是会受发送中消息数量的限制(比如,生产者在收到服务器响应之前可以发送多少个消息)。
- all:只有当所有参与复制的节点全部收到消息时,生产者才会收到一个来自服务器的成功响应。这种模式是最安全的,它可以保证不止一个服务器收到消息,就算有服务器发生崩溃,整个集群仍然可以运行。不过,它的延迟比 acks=1 时更高,因为我们要等待不只一个服务器节点接收消息。
- -1:作用与"all"是一样的。
(2) buffer.memory
- 类型:long
- 默认值:33554432(32M)
- 可设置值:[0,...]
- 重要性:高
- 说明:该参数用来设置生产者内存缓冲区的大小,生产者用它缓冲要发送到服务器的消息。如果应用程序发送消息的速度超过发送到服务器的速度,会导致生产者空间不足。
这个时候,send()方法调用要么被阻塞,要么抛出异常,取决于如何设置 max.block.ms (类型:long,默认值:60000(1分钟),可设置值:[0,...],重要性:中等)参数。表示在抛出异常之前可以阻塞的时间。
(3) compression.type
- 类型:string
- 默认值:none
- 可设置值:[none, gzip, snappy, lz4]
- 重要性:高
- 说明:该参数可以指定消息被发送给 broker 之前使用哪一种压缩算法进行压缩。snappy 压缩算法由 Google 发明,它占用较少的 CPU,却能提供较好的性能和相当可观的压缩比,如果比较关注性能和网络带宽,可以使用这种算法。gzip 压缩算法一般会占用较多的 CPU,但会提供更高的压缩比,所以如果网络带宽比较有限,可以使用这种算法。使用压缩可以降低网络传输开销和存储开销,而这往往是向 Kafka 发送消息的瓶颈所在。
(4) retries
- 类型:int
- 默认值:0
- 可设置值:[0,...,2147483647]
- 重要性:高
- 说明:生产者从服务器收到的错误有可能是临时性的错误(比如分区找不到首领)。在这种情况下,retries 参数的值决定了生产者可以重发消息的次数,如果达到这个次数,生产者会放弃重试并返回错误。默认情况下,生产者会在每次重试之间等待 100ms,不过可以通过 retry.backoff.ms(类型:long,默认值:100, 可设置值:[0,...],重要性:低) 参数来改变这个时间间隔。
建议在设置重试次数和重试时间间隔之前,先测试一下恢复一个崩溃节点需要多少时间(比如所有分区选举出首领需要多长时间),让总的重试时间比 Kafka 集群从崩溃中恢复的时间长,否则生产者会过早地放弃重试。不过有些错误不是临时性错误,没办法通过重试来解决(比如"消息太大"错误)。一般情况下,因为生产者会自动进行重试,所以就没必要在代码逻辑里处理那些可重试的错误。你只需要处理那些不可重试的错误或重试次数超出上限的情况。
(5) batch.size
- 类型:int
- 默认值:16384(16K)
- 可设置值:[0,...]
- 重要性:中等
- 说明:当有多个消息需要被发送到同一个分区时,生产者会把它们放在同一个批次里。该参数指定了一个批次可以使用的内存大小,按照字节数计算(而不是消息个数)。当批次被填满,批次里的所有消息会被发送出去。不过生产者并不一定都会等到批次被填满才发送,半满的批次,甚至只包含一个消息的批次也有可能被发送。所以就算把批次大小设置得很大,也不会造成延迟,只是会占用更多的内存而已。但如果设置得太小,因为生产者需要更频繁地发送消息,会增加一些额外的开销。
(6) linger.ms
- 类型:long
- 默认值:0
- 可设置值:[0,...]
- 重要性:中等
- 说明:该参数指定了生产者在发送批次之前等待更多消息加入批次的时间。KafkaProducer 会在批次填满或 linger.ms 达到上限时把批次发送出去。默认情况下,只要有可用的线程,生产者就会把消息发送出去,就算批次里只有一个消息。把 linger.ms 设置成比 0 大的数,让生产者在发送批次之前等待一会儿,使更多的消息加入到这个批次。虽然这样会增加延迟,但也会提升吞吐量(因为一次性发送更多的消息,每个消息的开销就变小了)。
(7) max.in.flight.requests.per.connection
- 类型:int
- 默认值:5
- 可设置值:[1,...]
- 重要性:低
- 说明:该参数指定了生产者在收到服务器响应之前可以发送多少个消息。它的值越高,就会占用越多的内存,不过也会提升吞吐量。
把它设为 1 可以保证消息是按照发送的顺序写入服务器的,即使发生了重试。
(8) max.request.size
- 类型:int
- 默认值:1048576
- 可设置值:[0,...]
- 重要性:中等
- 说明:该参数用于控制生产者发送的请求大小。它可以指能发送的单个消息的最大值,也可以指单个请求里所有消息总的大小。例如,假设这个值为 1MB,那么可以发送的单个最大消息为 1MB,或者生产者可以在单个请求里发送一个批次,该批次包含了 1000 个消息,每个消息大小为 1KB。另外,broker 对可接收的消息最大值也有自己的限制(message.max.bytes(类型:int,默认值:1000012,大约0.95M,可设置值:[0,...],重要性:高)),所以两边的配置最好可以匹配,避免生产者发送的消息被 broker 拒绝。
(9) receive.buffer.bytes 和 send.buffer.bytes
receive.buffer.bytes
- 类型:int
- 默认值:32768(32K)
- 可设置值:[-1,...]
- 重要性:中等
send.buffer.bytes
- 类型:int
- 默认值:131072(128K)
- 可设置值:[-1,...]
- 重要性:中等
说明:这两个参数分别指定了 TCP socket 接收和发送数据包的缓冲区大小。如果它们被设为 -1,就使用操作系统的默认值。如果生产者或消费者与 broker 处于不同的数据中心,那么可以适当增大这些值,因为跨数据中心的网络一般都有比较高的延迟和比较低的带宽。
(10) client.id
- 类型:string
- 默认值:""
- 可设置值:任意字符串
- 重要性:中等
- 说明:该参数可以是任意的字符串,服务器会用它来识别消息的来源。
(11) request.timeout.ms
- 类型:int
- 默认值:30000
- 可设置值:[0,...]
- 重要性:中等
- 说明:该参数指定了生产者在发送数据时等待服务器返回响应的时间。如果等待响应超时,那么生产者要么重试发送数据,要么返回一个错误(抛出异常或执行回调)。[metadata.fetch.timeout.ms] and [timeout.ms] have been removed. They were initially deprecated in Kafka 0.9.0.0.
(12) max.block.ms
- 类型:long
- 默认值:60000
- 可设置值:[0,...]
- 重要性:中等
- 说明:该参数指定了在调用 send() 方法或使用 partitionsFor() 方法获取元数据时生产者的阻塞时间。当生产者的发送缓冲区已满,或者没有可用的元数据时,这些方法就会阻塞。在阻塞时间达到 max.block.ms 时,生产者会抛出超时异常。
(13) connections.max.idle.ms
- 类型:long
- 默认值:540000
- 可设置值:[0,...]
- 重要性:中等
- 说明:关闭空闲连接的等待时间,检测到空闲的连接后,默认等待9分钟才会关闭这个连接。
(14) metadata.max.age.ms
- 类型:long
- 默认值:300000
- 可设置值:[0,...]
- 重要性:低
- 说明:更新元数据的时间间隔,在等待该参数配置的时间后,即使 producer 没有发现任何 partition 或 leader 的变化,也会强制刷新元数据。
(15) reconnect.backoff.ms
- 类型:long
- 默认值:50
- 可设置值:[0,...]
- 重要性:低
- 说明:尝试重新连接 broker 的时间间隔。
(16) reconnect.backoff.max.ms
- 类型:long
- 默认值:1000
- 可设置值:[0,...]
- 重要性:低
- 说明:如果重新连接的时间累积到达该参数的配置时间还没有连接到 broker,那么宣告连接失败。