一、Kafka 核心 API
下图是官方文档中的一个图,形象的描述了能与 Kafka集成的客户端类型
Kafka的五类客户端API类型如下:
- AdminClient API:允许管理和检测Topic、broker以及其他Kafka实例,与Kafka自带的脚本命令作用类似。
- Producer API:发布消息到1个或多个Topic,也就是生产者或者说发布方需要用到的API。
- Consumer API:订阅1个或多个Topic,并处理产生的消息,也就是消费者或者说订阅方需要用到的API。
- Stream API:高效地将输入流转换到输出流,通常应用在一些流处理场景。
- Connector API:从一些源系统或应用程序拉取数据到Kafka,如上图中的DB。
本文中,我们将主要介绍 Producer API。
二、生产者客户端的基本架构图
由上图可以看出:KafkaProducer有两个基本线程。
Kafka 的 Producer 发送消息采用的是异步发送的方式。在消息发送的过程中,涉及到了
两个线程——main 线程和 Sender 线程,以及一个线程共享变量——RecordAccumulator。
main 线程将消息发送给 RecordAccumulator,Sender 线程不断从 RecordAccumulator 中拉取消息发送到 Kafka broker。
-
主线程:负责消息创建,拦截器,序列化器,分区器等操作,并将消息追加到消息收集器RecoderAccumulator中(这里可以看出拦截器确实在序列化和分区之前执行)。
- 消息收集器主要的作用是缓存消息,让发送线程可以批量发送,减少网络传输资源消耗提升性能,缓存大小可以通过buffer.memory配置,默认值为32MB,如果生产者发送消息的速度超过发送到服务器的速度,则send()方法要么被阻塞,要么抛出异常,取决于参数max.block.ms,默认值为60000ms
- 主线程发送的消息被追加到消息累加器的一个双端队列中,消息收集器RecoderAccumulator为每个分区都维护了一个 Deque<ProducerBatch> 类型的双端队列,队列中是ProducerBatch,包含多个ProducerRecord。
- ProducerBatch 可以暂时理解为是 ProducerRecord 的集合,批量发送有利于提升吞吐量,降低网络影响。
- 由于生产者客户端使用 java.io.ByteBuffer 在发送消息之前进行消息保存,并维护了一个 BufferPool 实现 ByteBuffer 的复用;该缓存池只针对特定大小( batch.size 指定)的 ByteBuffer进行管理,对于消息过大的缓存,不能做到重复利用。
- 每次追加一条ProducerRecord消息,会寻找/新建对应的双端队列,从其尾部获取一个ProducerBatch,判断当前消息的大小是否可以写入该批次中。若可以写入则写入;若不可以写入,则新建一个ProducerBatch,判断该消息大小是否超过客户端参数配置 batch.size 的值,不超过,则以 batch.size建立新的ProducerBatch,这样方便进行缓存重复利用;若超过,则以计算的消息大小建立对应的 ProducerBatch ,缺点就是该内存不能被复用了。
-
Sender线程:
- 该线程从消息收集器获取缓存的消息,将其处理为 <Node, List<ProducerBatch> 的形式, Node 表示集群的broker节点。
- 进一步将<Node, List<ProducerBatch>转化为<Node, Request>形式,此时才可以向服务端发送数据。
- 在发送之前,Sender线程将消息以 Map<NodeId, Deque<Request>> 的形式保存到 InFlightRequests 中进行缓存,可以通过其获取 leastLoadedNode ,即当前Node中负载压力最小的一个,以实现消息的尽快发出。
相关参数:
- batch.size:只有数据积累到 batch.size 之后,sender 才会发送数据。
- linger.ms:如果数据迟迟未达到 batch.size,sender 等待 linger.time 之后就会发送数据。
三、Producer API
3.1、导入相关依赖
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.6.0</version>
</dependency>
3.2、Producer异步发送演示
private static final String TOPIC_NAME = "yibo_topic";
/**
* Producer异步发送演示
*/
public static void producerSend(){
Properties properties = new Properties();
//kafka集群
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.174.128:9092");
//ack应答级别
properties.put(ProducerConfig.ACKS_CONFIG,"all");
//重试次数
properties.put(ProducerConfig.RETRIES_CONFIG,"3");
//批次大小
properties.put(ProducerConfig.BATCH_SIZE_CONFIG,"16384");
//等待时间
properties.put(ProducerConfig.LINGER_MS_CONFIG,"1");
//RecoderAccumulator缓冲区大小
properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG,"33554432");
//key,value的序列化类
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
//创建生产者对象
Producer<String,String> producer = new KafkaProducer<>(properties);
//消息对象 ProducerRecoder
for (int i = 0; i < 10; i++) {
ProducerRecord<String,String> record =
new ProducerRecord<>(TOPIC_NAME,"key-"+i,"value-"+i);
producer.send(record);
}
producer.close();
}
3.3、Producer异步发送演示
回调函数会在 producer 收到 ack 时调用,为异步调用,该方法有两个参数,分别是
RecordMetadata 和 Exception,如果 Exception 为 null,说明消息发送成功,如果
Exception 不为 null,说明消息发送失败。
注意:消息发送失败会自动重试,不需要我们在回调函数中手动重试。
private static final String TOPIC_NAME = "yibo_topic";
/**
* Producer异步发送带回调函数演示
*/
public static void producerSendWithCallback(){
Properties properties = new Properties();
//kafka集群
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.174.128:9092");
//ack应答级别
properties.put(ProducerConfig.ACKS_CONFIG,"all");
//重试次数
properties.put(ProducerConfig.RETRIES_CONFIG,"0");
//批次大小
properties.put(ProducerConfig.BATCH_SIZE_CONFIG,"16384");
//等待时间
properties.put(ProducerConfig.LINGER_MS_CONFIG,"1");
//RecoderAccumulator缓冲区大小
properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG,"33554432");
//key,value的序列化类
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
//创建生产者对象
Producer<String,String> producer = new KafkaProducer<>(properties);
//消息对象 ProducerRecoder
ProducerRecord<String,String> record =
new ProducerRecord<>(TOPIC_NAME,"key-hello","value-hello world");
producer.send(record, new Callback() {
//回调函数,该方法会在 Producer 收到 ack 时调用,为异步调用
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if (e == null) {
log.info("send success partition: {}, offset: {}",recordMetadata.partition(),recordMetadata.offset());
} else {
log.error("exception",e);
}
}
});
producer.close();
}
3.4、Producer异步发送带回调函数和Partition负载均衡
private static final String TOPIC_NAME = "yibo_topic";
/**
* Producer异步发送带回调函数和Partition负载均衡
*/
public static void producerSendWithCallbackAndPartition(){
Properties properties = new Properties();
//kafka集群
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.174.128:9092");
//ack应答级别
properties.put(ProducerConfig.ACKS_CONFIG,"all");
//重试次数
properties.put(ProducerConfig.RETRIES_CONFIG,"0");
//批次大小
properties.put(ProducerConfig.BATCH_SIZE_CONFIG,"16384");
//等待时间
properties.put(ProducerConfig.LINGER_MS_CONFIG,"1");
//RecoderAccumulator缓冲区大小
properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG,"33554432");
//key,value的序列化类
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
//Partition负载均衡
properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,"com.yibo.kafka.producer.SamplePartition");
//创建生产者对象
Producer<String,String> producer = new KafkaProducer<>(properties);
//消息对象 ProducerRecoder
ProducerRecord<String,String> record =
new ProducerRecord<>(TOPIC_NAME,"key-hello","value-hello world");
producer.send(record, new Callback() {
//回调函数,该方法会在 Producer 收到 ack 时调用,为异步调用
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if (e == null) {
log.info("send success partition: {}, offset: {}",recordMetadata.partition(),recordMetadata.offset());
} else {
log.error("exception",e);
}
}
});
producer.close();
}
/**
* @Description: Partitioner分区接口,以实现自定义的消息分区
*
* 默认分区器DefaultPartitioner org.apache.kafka.clients.producer.internals.DefaultPartitioner
*
* 如果消息的key为null,此时producer会使用默认的partitioner分区器将消息随机分布到topic的可用partition中。
* 如果key不为null,并且使用了默认的分区器,kafka会使用自己的hash算法对key取hash值,
* 使用hash值与partition数量取模,从而确定发送到哪个分区。
* 注意:此时key相同的消息会发送到相同的分区(只要partition的数量不变化)
*/
public class SamplePartition implements Partitioner {
@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
int numPartitions = partitions.size();
/**
*由于我们按key分区,在这里我们规定:key值不允许为null。在实际项目中,key为null的消息*,可以发送到同一个分区。
*/
if(keyBytes == null) {
throw new InvalidRecordException("key cannot be null");
}
if(((String)key).equals("1")) {
return 1;
}
System.out.println("key: " + key);
//如果消息的key值不为1,那么使用hash值取模,确定分区。
return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
}
@Override
public void close() {
}
@Override
public void configure(Map<String, ?> map) {
}
}
3.5、Producer异步阻塞发送演示
由于 send 方法返回的是一个 Future 对象,根据 Futrue 对象的特点,我们也可以实现同
步发送的效果,只需在调用 Future 对象的 get 方发即可。
private static final String TOPIC_NAME = "yibo_topic";
/**
* Producer异步阻塞发送演示
*/
public static void producerSyncSend() throws Exception {
Properties properties = new Properties();
//kafka集群
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.174.128:9092");
//ack应答级别
properties.put(ProducerConfig.ACKS_CONFIG,"all");
//重试次数
properties.put(ProducerConfig.RETRIES_CONFIG,"0");
//批次大小
properties.put(ProducerConfig.BATCH_SIZE_CONFIG,"16384");
//等待时间
properties.put(ProducerConfig.LINGER_MS_CONFIG,"1");
//RecoderAccumulator缓冲区大小
properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG,"33554432");
//key,value的序列化类
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
//创建生产者对象
Producer<String,String> producer = new KafkaProducer<>(properties);
//消息对象 ProducerRecoder
ProducerRecord<String,String> record = new ProducerRecord<>(TOPIC_NAME,"hello","hello world");
Future<RecordMetadata> send = producer.send(record);
RecordMetadata recordMetadata = send.get();
log.info("send success partition: {}, offset: {}",recordMetadata.partition(),recordMetadata.offset());
producer.close();
}
四、自定义 Interceptor
4.1、拦截器原理
Producer 拦截器(interceptor)是在 Kafka 0.10 版本被引入的,主要用于实现 clients 端的定制化控制逻辑。
对于 producer 而言,interceptor 使得用户在消息发送前以及 producer 回调逻辑前有机会对消息做一些定制化需求,比如修改消息等。同时,producer 允许用户指定多个interceptor按序作用于同一条消息从而形成一个拦截链(interceptor chain)。Intercetpor 的实现接口是org.apache.kafka.clients.producer.ProducerInterceptor
,其定义的方法包括:
1、configure(configs):
获取配置信息和初始化数据时调用。2、onSend(ProducerRecord):
该方法封装进 KafkaProducer.send 方法中,即它运行在用户主线程中。Producer 确保在消息被序列化以及计算分区前调用该方法。用户可以在该方法中对消息做任何操作,但最好保证不要修改消息所属的 topic 和分区,否则会影响目标分区的计算。3、onAcknowledgement(RecordMetadata, Exception):
该方法会在消息从 RecordAccumulator 成功发送到 Kafka Broker 之后,或者在发送过程
中失败时调用。并且通常都是在 producer 回调逻辑触发之前。onAcknowledgement 运行在producer 的 IO 线程中,因此不要在该方法中放入很重的逻辑,否则会拖慢producer 的消息发送效率。4、close:
关闭 interceptor,主要用于执行一些资源清理工作。
如前所述,interceptor 可能被运行在多个线程中,因此在具体实现时用户需要自行确保
线程安全。另外倘若指定了多个 interceptor,则 producer 将按照指定顺序调用它们,并仅仅是捕获每个 interceptor 可能抛出的异常记录到错误日志中而非在向上传递。这在使用过程中要特别留意。
4.2、 拦截器案例
- 1、需求:
实现一个简单的双 interceptor 组成的拦截链。第一个 interceptor 会在消息发送前将时间
戳信息加到消息 value 的最前部;第二个 interceptor 会在消息发送后更新成功发送消息数或失败发送消息数。
2、案例实操
1)增加时间戳拦截器
public class TimeInterceptor implements ProducerInterceptor<String, String> {
@Override
public void configure(Map<String, ?> map) {
}
@Override
public ProducerRecord<String, String> onSend(ProducerRecord<String, String> producerRecord) {
// 创建一个新的 record,把时间戳写入消息体的最前部
return new ProducerRecord(producerRecord.topic(),
producerRecord.partition(), producerRecord.timestamp(), producerRecord.key(),
System.currentTimeMillis() + "," + producerRecord.value().toString());
}
@Override
public void onAcknowledgement(RecordMetadata recordMetadata, Exception e) {
}
@Override
public void close() {
}
}
- 2)统计发送消息成功和发送失败消息数,并在 producer 关闭时打印这两个计数器
public class CounterInterceptor implements ProducerInterceptor<String, String> {
private int errorCounter = 0;
private int successCounter = 0;
@Override
public void configure(Map<String, ?> map) {
}
@Override
public ProducerRecord<String, String> onSend(ProducerRecord<String, String> producerRecord) {
return null;
}
@Override
public void onAcknowledgement(RecordMetadata recordMetadata, Exception e) {
// 统计成功和失败的次数
if (e == null) {
successCounter++;
} else {
errorCounter++;
}
}
@Override
public void close() {
// 保存结果
System.out.println("Successful sent: " + successCounter);
System.out.println("Failed sent: " + errorCounter);
}
}
- 3)producer 主程序
public class InterceptorProducer {
private static final String TOPIC_NAME = "yibo_topic";
public static void main(String[] args) {
// 1 设置配置信息
Properties props = new Properties();
props.put("bootstrap.servers", "hadoop102:9092");
props.put("acks", "all");
props.put("retries", 3);
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// 2 构建拦截链
List<String> interceptors = new ArrayList<>();
interceptors.add("com.yibo.kafka.producer.TimeInterceptor");
interceptors.add("com.yibo.kafka.producer.CounterInterceptor");
props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors);
Producer<String, String> producer = new KafkaProducer<>(props);
// 3 发送消息
for (int i = 0; i < 10; i++) {
ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, "message" + i);
producer.send(record);
}
// 4 一定要关闭 producer,这样才会调用 interceptor 的 close 方法
producer.close();
}
}
五、SpringBoot 集成 Kafka
5.1、添加maven依赖
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.6.1</version>
</dependency>
5.2、配置 application.properties
# 指定kafka server的地址,集群配多个,中间,逗号隔开
spring.kafka.bootstrap-servers=192.168.174.128:9092
#=============== provider =======================
# 写入失败时,重试次数。当leader节点失效,一个repli节点会替代成为leader节点,此时可能出现写入失败,
# 当retris为0时,produce不会重复。retirs重发,此时repli节点完全成为leader节点,不会产生消息丢失。
spring.kafka.producer.retries=0
# 每次批量发送消息的数量,produce积累到一定数据,一次发送
spring.kafka.producer.batch-size=16384
# produce积累数据一次发送,缓存大小达到buffer.memory就发送数据
spring.kafka.producer.buffer-memory=33554432
#procedure要求leader在考虑完成请求之前收到的确认数,用于控制发送记录在服务端的持久化,其值可以为如下:
#acks = 0 如果设置为零,则生产者将不会等待来自服务器的任何确认,该记录将立即添加到套接字缓冲区并视为已发送。在这种情况下,无法保证服务器已收到记录,并且重试配置将不会生效(因为客户端通常不会知道任何故障),为每条记录返回的偏移量始终设置为-1。
#acks = 1 这意味着leader会将记录写入其本地日志,但无需等待所有副本服务器的完全确认即可做出回应,在这种情况下,如果leader在确认记录后立即失败,但在将数据复制到所有的副本服务器之前,则记录将会丢失。
#acks = all 这意味着leader将等待完整的同步副本集以确认记录,这保证了只要至少一个同步副本服务器仍然存活,记录就不会丢失,这是最强有力的保证,这相当于acks = -1的设置。
#可以设置的值为:all, -1, 0, 1
spring.kafka.producer.acks=1
# 指定消息key和消息体的编解码方式
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
#=============== consumer =======================
# 指定默认消费者group id --> 由于在kafka中,同一组中的consumer不会读取到同一个消息,依靠groud.id设置组名
spring.kafka.consumer.group-id=testGroup
# smallest和largest才有效,如果smallest重新0开始读取,如果是largest从logfile的offset读取。一般情况下我们都是设置smallest
spring.kafka.consumer.auto-offset-reset=earliest
# enable.auto.commit:true --> 设置自动提交offset
spring.kafka.consumer.enable-auto-commit=false
#如果'enable.auto.commit'为true,则消费者偏移自动提交给Kafka的频率(以毫秒为单位),默认值为5000。
spring.kafka.consumer.auto-commit-interval=100
# 指定消息key和消息体的编解码方式
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
#=============== listener =======================
# 在侦听器容器中运行的线程数。
spring.kafka.listener.concurrency=5
#listner负责ack,每调用一次,就立即commit
spring.kafka.listener.ack-mode=manual_immediate
spring.kafka.listener.missing-topics-fatal=false
5.3、新建Producer
@Component
@Slf4j
public class KafkaProducer {
@Autowired
private KafkaTemplate<String, Object> kafkaTemplate;
private static final String TOPIC_NAME = "yibo_topic";
public void send(Object obj) {
String obj2String = JSONObject.toJSONString(obj);
log.info("准备发送消息为:{}", obj2String);
//发送消息
ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send(TOPIC_NAME, obj);
future.addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {
@Override
public void onFailure(Throwable throwable) {
//发送失败的处理
log.info(TOPIC_NAME + " - 生产者 发送消息失败:" + throwable.getMessage());
}
@Override
public void onSuccess(SendResult<String, Object> stringObjectSendResult) {
//成功的处理
log.info(TOPIC_NAME + " - 生产者 发送消息成功:" + stringObjectSendResult.toString());
}
});
}
}