深入Kafka系列(一) Producer 开发

Kafka是目前业界最经典的消息引擎,之前在学习工作中只是对基本的原理和使用方法有一点了解。寒假稍微有一点空余时间,想对Kafka有一个更深入的了解。

本系列是对胡夕《Apache Kafka实战》一书的学习笔记,会跟随作者的安排,逐章对Kafka的设计细节做介绍。《Apache Kafka实战》这本书写的很棒,一看就是行家用心写出来的。如果想深入学习Kafka的同学可以考虑入手一本。

言归正传,基本的Kafka入门知识和概念这里就不赘述。欢迎参考我的同学沙利民关于Kafka的入门知识总结。《Kafka从入门到实践》

本系列直接从Producer开发开始。Kafka自0.9.0.0版本后,就启用了新设计的Java版的Producer取代旧的Scala版本的Producer,本文介绍的也是新版本。

1. Producer工作流程

Producer的工作流程可以简单概述为三步:

  • 封装消息:使用用户主线程将待发送的消息封装成ProducerRecord类实例,在完成序列化后发送给partitioner。

  • 完成分区:partitioner根据分区策略(后文介绍)对消息进行分区,将同一分区的消息发送给某一块内存缓存区。

  • 完成发送:使用另外一个I/O线程实时从缓存区域中提取消息封装成一个batch,统一发送给对应的broker。

Producer需要完成的任务就是对消息进行分区,以及确定分区的leader。

2. 构造Producer

2.1 代码实例

image

基本的Producer构造代码如图所示,基本步骤有:

  • 构造Properties对象,指定必要的参数包括bootstrap.servers,key.serializer,value.serializer
  • 构造KafkaProducer对象
  • 构造ProducerRecord对象,必须指定的参数有topic,value
  • 调用KafkaProducer的send,有两个方种发送:同步发送和异步发送+回调
  • 关闭KafkaProducer

2.2 详细解释

构造Properties对象:使用Properties指定参数,有

  • bootstrap.servers:一组broker列表,包含<host:port>対
  • key/value serializer:发送给broker的消息必须是字节数组,所以需要指定key和value的序列化格式,一般都是StringSerializer
  • acks:指定在producer发送响应前,leader broker必须确保已经成功写入该消息的副本数,分别为0,1,all。
    • 0表示producer发送消息后不等broker的返回结果,直接进入下一条的发送
    • 1表示只需要消息写进broker leader的日志就可以
    • -1/all 表示必须所有副本都完成消息持久化后,才发送结果给producer,开始下一条的发送
  • buffer.memory:用于缓存消息的缓冲区大小,默认32MB。刚才在工作流程中也讲过,producer会把发给同一分区的消息缓存在缓冲区内,等待I/O进程适时发送。这部分缓冲区的大小由此参数设置。
  • compression.type:消息是否压缩,默认是None。目前支持的压缩方式有三种,其中效果最好的是LZ4。压缩需要格外消耗CPU资源,仅在带宽资源不足,prodocer的CPU资源充足时考虑压缩。
  • retries:当消息发送失败时,producer自动重新发送消息的次数。
  • batch.size:最重要的调优参数之一。还是刚才所说的,producer将同一分区的消息缓存在缓冲区内,并封装成一个一个batch,当满了以后会由I/O进程发送。batch大小就很重要,太小的话一次发送请求发送的消息数太少;太大的话对内存压力又很大。默认的大小是16kb,适当的提高此参数可以提高吞吐量。
  • linger.ms:batch没满时,也可能被提前发送。linger.ms参数可以控制消息发送延迟行为,默认是0,即消息需要立即被发送,无需关心batch是否填满。这样设计大多数情况下是合理的,但是会拉低吞吐量。

还有其他的参数,自己看官网吧。

构造KafkaProducer对象:在Properties里完成参数设置后,就可以构造KafkaProducer对象了。

构造ProducerRecord对象:需要将topic和value信息包装在ProducerRecord对象中,key可选。

发送消息:Kafka发送消息有两种方式:

  • 异步发送:send方法会返还一个Java Future对象供用户获取发送结果。根据回调的参数实现异步发送以及对发送结果的响应。
  • 同步发送:调用send().get()方法可以实现同步发送的效果,即无限等待broker返还给producer的结果。

关闭producer:producer占用了大量系统资源,使用完后必须关闭。

3. 消息分区策略

3.1 默认的分区策略

当消息指定key时,使用murmur2算法计算哈希值,然后由哈希值对总分区数求模后找到目标分区号,此时完成分区操作,相同的key的所有消息分配到相同的分区。

当没有指定key是,partitioner根据轮询的方式确保所有分区均匀。

3.2 自定义分区实例

image

这个自定义分区的实例实现了,当key包含"audit"字符串时,该消息发送到最后一个分区,其他消息按照随机的策略发送到其他分区。

4. producer拦截器

4.1 拦截器

拦截器interceptor可以实现消息发送前、producer回调逻辑前对消息做一些定制化需求,比如修改消息等。interceptor通过接口ProducerInterceptor实现,主要有两个方法:

  • onSend:运行在用户主线程中,在消息被序列化以计算分区前调用,用户可以对消息做任何修改。
  • onAcknowledgement:运行在I/O线程中,在消息被应答前或消息发送失败时调用,通常发生在producer回调逻辑触发之前。

4.2 实例

4.1.2 onSend实例

image

上图是自定义了一个拦截器,在消息发送之前对消息做了修改,在value值添加了时间戳。然后需要在Properties参数配置中按下图的方式添加。

image

结果:

image

4.1.2 onAcknowledgement实例

image

该拦截器实现了消息发送后更新”发送成功消息数“和“发送失败消息数”。添加方式一样,此时可以构成双interceptor的拦截链。

image

结果:

image

在《Apache Kafka实战》中,还介绍了消息序列化、无消息丢失配置、消息压缩等关于Kafka Producer的介绍,需要的同学可以参阅。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 202,607评论 5 476
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,047评论 2 379
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 149,496评论 0 335
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,405评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,400评论 5 364
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,479评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,883评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,535评论 0 256
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,743评论 1 295
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,544评论 2 319
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,612评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,309评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,881评论 3 306
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,891评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,136评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,783评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,316评论 2 342

推荐阅读更多精彩内容