Kafka生产者

1、客户端开发

    下面是构造的消息对象ProducerRecord.

public class ProducerRecord<K,V>(

      private final String topic; // 主题

      private final Integer partition; // 分区号

      private final Headers headers;      // 消息头部

      private final K key;    // 键

      private final V value;  // 值

      private final Long timestamp; // 消息时间戳

      // 省略其他成员方法和构造函数

)

    它并不是但存意义上的消息,而是包含了多个属性,原本业务上与业务相关的消息体只是其中一个value属性。

1.1、消息的发送

    消息发送主要有三种模式:发后即忘 、同步以及异步。

public Future<RecordMetadata> send (ProducerRecord<K,V) record)

public Future<RecordMetadata> send (ProducerRecord<K,V) record, Callback callback)

    send方法重载了两个方法。

    第一个send就是发后即忘,它只管往kafka中发送消息而不关心消息是否正确到达。这种一帮情况下其实不会出现什么问题,但是可能会造成消息丢失。这种方法效率最高,但是可靠性最差。

    同步:利用返回的Future对象实现,比如producer.send(record).get()。执行send()方法之后直接链式调用了get()方法来阻塞等待kafka的响应,直到发送成功,或者发送异常。同步发送的可靠性最高,但是性能也最差,需要阻塞等待一条消息发送完才可以发送下一条。

    异步:一般是使用send()方法里面的callback回掉函数,Kafka在返回相应的时调用该函数来实现异步的发送确认。

    KafkaProducer是线程安全的,可以在多个线程共享单个KafkaProducer实例,也可以将KafkaProcuder进行池化来供其他线程使用。

1.2、序列化

        生产者需要用序列化器(Serializer)把对象转换成字节数组才能通过网络发送给Kafka。而在对侧,消费者需要用反序列化器(Deserializer)把从Kafka中收到的字节数组转换成相应的对象。

1.3、分区器

        消息在通过send()发送到broker的过程中,又可能需要经过拦截器(Interceptor)、序列化器(Serializer)和分区器(Paratitioner)的一系列作用之后才会被真正地发往broker。

        拦截器一般不是必需的,而序列化器是必须的。消息经过序列化之后,就要通过确定它发送的分区。如果消息ProducerRecord的partition字段没有指定,那么就需要依赖分区器。

        如果key不为null,那么默认的分区器会对key进行哈希,最终得到的哈希值来计算分区号,拥有相同key的消息会被写入同一分区。如果key为null,那么消息会以轮询的方式发往一个可用的分区。

1.4、拦截器

        拦截器(Interceptor)分为生产者拦截器和消费者拦截器。

        生产者拦截器可以在发送之前做一些准备工作,比如按照某个规则过滤掉不符合要求的消息,修改消息的内容等。


2、原理分析

        整个生产者客户端由两个线程协调运行,主线程和Sender线程。

        主线程中,KafkaProducer创建消息,然后通过拦截器、序列化器和分区器之后,将消息累加到消息积累器(RecordAccumulator,也称消息收集器)。Sende线程负责从RecordAccumulator中获取消息,并将其发送到Kafka中。

        RecordAccumulator作用:缓存消息,以便sender线程可以批量发送,进而减少网络传输的资源消耗以提升性能。RecordAccumulator的大小是通过buffer.memory来设置,默认32M。

        主线程发送过来的消息都会被追加到RecordAccumulator的某个双端队列(Deque)中,在RecordAccumulator中,为每一个分区都维护了一个双端队列,队列中的内容就是Producerbatch。

        Producerbatch不是ProducerRecord,Producerbatch中包含一到多个ProducerRecord,Producerbatch其实是一个消息的批次。将较小的ProducerRecord拼凑成一个较大的Producerbatch,也可以减少网络请求的次数来提供整体的吞吐量(这也是Kafka吞吐量高的原因之一)。

        ProducerBatch的大小和batch.size的参数有这密切的关系。当一条消息发送到RecordAccumulator中的时候,会先寻找与消息分区对应的双端队列中,再从双端队列的尾部获取一个ProducerBatch,如果没有就新建。查看这个ProducerBatch是否还可以写入这个ProducerRecord,如果可以写入就写入,如果不行就新建ProducerBatch。

        在创建ProducerBatch的时候,会评估这条消息是否超过了batch.size,如果没有超过,那么就以batch.size参数的大小来创建ProducerBatch。如果超过了,就会以评估的消息大小来创建ProducerBatch。

        Sender从RecordAccumulator获取消息后,在将请求发送给Kafka之前还会将请求保存到InFlightRequest中,他的主要作用是缓存了已经发送出去但还没有得到响应的请求。


3、重要参数

3.1、acks

用来指定分区中必需要有多少个副本收到这条消息,之后生产者才会认为这条消息是成功写入的,这个参数涉及到吞吐量和可靠性之间的权衡。(字符串类型)。

        1,默认值。代表leader成功写入即可,就会收到来自服务端的成功响应。。这种方案。有可能会造成发送消息丢失。比如在同步给其他follower的过程之前,leader突然崩溃。

        0,不需要等待任何服务端的响应。最大的吞吐量。

        -1或者all。等待ISR中所有副本都成功写入,最强的可靠性。



****问题****:如果leader crash时,ISR为空怎么办?

kafka在Broker端提供了一个配置参数:unclean.leader.election,这个参数有两个值:

true(默认):允许不同步副本成为leader,由于不同步副本的消息较为滞后,此时成为leader,可能会出现消息不一致的情况。false:不允许不同步副本成为leader,此时如果发生ISR列表为空,会一直等待旧leader恢复,降低了可用性。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 194,390评论 5 459
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 81,821评论 2 371
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 141,632评论 0 319
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 52,170评论 1 263
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 61,033评论 4 355
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 46,098评论 1 272
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 36,511评论 3 381
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 35,204评论 0 253
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 39,479评论 1 290
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 34,572评论 2 309
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 36,341评论 1 326
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,213评论 3 312
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 37,576评论 3 298
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 28,893评论 0 17
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,171评论 1 250
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 41,486评论 2 341
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 40,676评论 2 335

推荐阅读更多精彩内容