生产者
客户端开发
正常的生产逻辑需要具备以下几个步骤:
- 构建生产者客户端参数及创建相应的生产者实例
- 构建待发送的消息
- 发送消息
- 关闭生产者实例
消息的发送
发送消息主要有三种模式:发后即忘(fire-and-forget)、同步(sync)及异步(async)
发后即忘:
只管往Kafka发送消息,而不关心消息是否正确到达。在某些时候(比如发生不可重试异常时)会造成消息的丢失。这种发送方式性能最高(因为不用做检查处理,牺牲了消息的安全性,达到高吞吐的目的),可靠性最差。
同步:
利用返回Future对象实现如下代码所示,失败时,捕捉异常,并做响应的异常处理。也可以通过注释的代码部分获取RecordMetadata对象,包含当前消息的主题、分区号、分区中的偏移量(offset)、时间戳等。
异常
- 可重试异常:
常见的可重试异常有:NetworkException、LeaderNotAvailableException、UnknownTopicOrPartitionException、NotEnoughReplicasException、NotCoordinatorException 等。比如NetworkException 表示网络异常,这个有可能是由于网络瞬时故障而导致的异常,可以通过重试解决;又比如LeaderNotAvailableException表示分区的leader副本不可用,这个异常通常发生在leader副本下线而新的 leader 副本选举完成之前,重试之后可以重新恢复
配置retries 参数设置可重试异常的重试次数。如果重试超过设置次数之后还没有恢复,那么仍会抛出异常,进而发送的外层逻辑就要处理这些异常了。
- 不可重试异常
RecordTooLargeException异常,暗示了所发送的消息太大,KafkaProducer对此不会进行任何重试,直接抛出异常。
private static void sendData() {
Properties properties = new Properties();
properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
KafkaProducer producer = new KafkaProducer<String, String>(properties);
ProducerRecord<String, String> record = new ProducerRecord<>("CustomerCountry",
"Precision Producer", "France2");
try {
producer.send(record).get();
// Future<RecordMetadata> future = producer.send(record);
// RecordMetadata metaData = future.get();
} catch (Exception e) {
System.out.println("send Fail:" + e.getMessage());
}
producer.close();
}
异步发送
- 一般是在send()方法中指定一个Callback的回调函数,Kafka在返回响应式调用改函数来实现异步的发送请求。
- onCompletion()方法的两个参数是互斥的,消息发送成功时,metadata 不为 null 而exception为null;消息发送异常时,metadata为null而exception不为null。
producer.send(record, new Callback() {
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
// recordMetadata和e必有一个为null
if (e != null) {
e.printStackTrace();
} else {
System.out.println(recordMetadata);
}
}
});
close()方法
close()方法会阻塞等待之前所有的发送请求完成后再关闭KafkaProducer。下面这个带超时时间参数的close()方法,只会等待timeout的时间,如果在timeout时间后仍未完成所有的请求处理,会强行退出,在实际应用中,一般使用无参close()方法。
public void close(long timeout, java.util.concurrent.TimeUnit timeUnit)
序列化
生产者需要用序列化器把对象转换成字节数组才能通过网络发送给Kafka。而在接收方,消费者需要使用反序列化器把Kafka中收到的字节数组转换成对应的对象。
生产者拦截器
KafkaProducer中不仅可以指定一个拦截器,还可以指定多个拦截器以形成拦截链。拦截链会按照 interceptor.classes 参数配置的拦截器的顺序来一一执行(配置的时候,各个拦截器之间使用逗号隔开)
public interface ProducerInterceptor<K, V> extends Configurable {
public ProducerRecord<K, V> onSend(ProducerRecord<K, V> record);
public void onAcknowledgement(RecordMetadata metadata, Exception exception);
public void close();
}
在这 3 个方法中抛出的异常都会被捕获并记录到日志中,但并不会再向上传递。
onSend()
调用时间点:将消息序列化和计算分区之前
KafkaProducer会在将消息序列化和计算分区之前调用生产者拦截器的onSend()方法来对消息进行相应的定制化操作。不建议在这里修改ProducerRecord的topic、key和partition信息(可能会对问题的排查和定位造成影响,以及出现难以预料的bug),比如修改key不仅会影响分区的计算,同样会影响broker端日志压缩的功能。
onAcknowledgement()
调用时间点:在消息被应答(acknowledgement)之前或消息发送失败时,优先于用户设定的CallBack方法。
KafkaProducer 会在消息被应答(Acknowledgement)之前或消息发送失败时调用生产者拦截器的 onAcknowledgement()方法,优先于用户设定的 Callback 之前执行。这个方法运行在Producer 的 I/O 线程中,所以这个方法中实现的代码逻辑越简单越好,否则会影响消息的发送速度。
close()
close()方法主要用于在关闭拦截器时执行一些资源的清理工作。
生产者客户端整体架构
分区
如果key为null,会被随机分配到主题内各个可用的分区。如果key不为null,会对key进行计算决定分到哪个分区。
优点
使用Kafka自己的hash算法对key进行运算,并不会因为java版本的升级导致分区结果不同。
缺点
由于使用了hash算法,对横向扩展不友好。一旦主题增加了分区的个数,可能会造成旧的数据还在老的分区,新的数据被分配到了新的分区。如果要使用键来映射分区,最好在创建主题的时候就把分区规划好。
自定义分区策略
如果出现某个key的数据量特别大,导致按默认key分区后,对应的分区数据量明显过大,从而导致存储和性能上的问题。这时候就需要使用自定义分区策略实现(通过实现Partitioner)。
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.record.InvalidRecordException;
import org.apache.kafka.common.utils.Utils;
import java.io.InvalidClassException;
import java.util.Map;
/**
* @author by yze on 2020/11/8
* @since 202011
*/
public class MyPartitioner implements Partitioner {
@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
int numPartitions = cluster.partitionCountForTopic(topic);
if (keyBytes == null || !(key instanceof String)) {
throw new InvalidRecordException("不支持的数据类型");
}
if (((String) key).equals("special")) {
// 专门定制到一个分区
return numPartitions;
}
// 其它的key会计算后被分配到除了最后一个分区以外的分区
return Math.abs(Utils.murmur2(keyBytes)) % (numPartitions - 1);
}
@Override
public void close() {
}
@Override
public void configure(Map<String, ?> configs) {
}
}