概括
总的来讲,一个kafka生产者要发送一条消息,要经过以下几个步骤:
1.创建生产者实例。
2.组装消息对象ProducerRecord。
3.经过拦截器,进行一些定制化处理。
4.经过序列化器,把对象转化成字节数组。
5.在分区器中确定要发送的分区,每个分区都有一个队列,队列中每一个元素都是一批消息,这样做的好处是为了批量发送,减少请求次数,降低网络IO。
6.根据发送方式(异步or同步),对发送成功或失败的请求进行回调处理。
7.关闭生产者实例。
准备阶段
消息对象ProducerRecord
public class ProducerRecord<K, V> {
private final String topic; //主题
private final Integer partition; //分区号
private final Headers headers; //消息头
private final K key; //键
private final V value; //值
private final Long timestamp; // 时间戳
}
topic就是发送到主题,partition就是该主题的分区,key的作用也是来确定分区的。但是partition和key之间有如下约定:
- partition优先级比key高,当partition没有值时,才会用 hash(key)去确定这条消息的分区。
- 如果partition和key都没指定,则会按照轮询的方式发送到每个该topic下的每个分区。
生产者拦截器
拦截器是在消息发送前进行一些预处理,对消息做一些定制化需求,一个拦截器需要继承org.apache.kafka.clients.producer.ProducerInterceptor接口,其中包含3个方法:
public ProducerRecord onSend(ProducerRecord record){}
public void onAcknowledgement(RecordMetadata metadata, Exception exception) {}
public void close(){}
onSend方法会在消息序列化之前被调用,可以通过这个方法对消息分区和消息内容等进行修改。onAcknowledgement方法会在消息被应答之前或消息发送失败的时候调用,先于回调函数执行前执行,尽量不要在这个方法内执行繁重的逻辑,影响发送的io效率。剩下的close方法就是用来关闭一些资源。
序列化
生产者需要把对象序列化成字节数组才能传输到kafka,消费者则需要反序列化将字节数组转化为本地对象。生产者和消费者之间需要满足一定的序列化协议,保证生产者发送的消息是消费者可以解析的格式。要构造一个序列化器,需要实现org.apache.kafka.common.serialization.Serializer接口,此接口包含3个方法:
public void configure(Map<String, ?> configs, boolean isKey){}
public byte[] serialize(String topic, String data){}
public void close(){}
其中configure方法是拿到外部配置的参数,之后会在serialize方法中按需使用这些参数。serialize方法就是具体的序列化方法,可以在这里面定制协议,按照一定的顺序格式返回一个字节数组。close方法用来关闭序列化器,一般这个方法都是个空方法。
分区器
分区器主要用来确定消息要发送到哪个分区,默认的分区器主要是由ProducerRecord类中的partition和key确定两个字段进行判断,partition和key之间关系已经在上面说过了,就不多赘述。
发送阶段
消息收集器-RecordAccumulator
在准备阶段完成之后,所要发送的topic和分区已经确定,之后消息并不是直接发送给kafka,而是要进入一个名为RecordAccumulator的消息收集器,这个收集器是起到一个缓冲的作用,对于每一个分区,他都维护了一个双端队列,每次发送给kafka都会以队列中的一个或多个元素为单位进行发送,这个元素名叫ProducerBatch,ProducerBatch实际上就是多个ProducerRecord组成的集合,对多条消息进行批量发送,这样可以大幅减少网络IO次数,提升整体的吞吐量。
请求队列-InFlightRequest
最终,一组ProducerBatch会组装成一个Request发往各个Broker,在发送前会将Request存放到一个名叫InFlightRequest的Map中,这个Map格式为Map<NodeId,Deque<Request>>,用来保存发送到每个Broker但还未接收到响应的请求,当队列中的元素越来越多时,说明这个broker节点负载比较大,继续发送请求可能会请求超时,默认每个NodeId下的Deque大小为5,超过该数值后则暂停向这个节点发送消息,可以通过max.in.flight.requests.per.connection来修改。
同时,从InFlightRequest我们还能获取kafka的节点、副本等一些元数据,提供给Request去建立连接。