Kafka是目前业界最经典的消息引擎,之前在学习工作中只是对基本的原理和使用方法有一点了解。寒假稍微有一点空余时间,想对Kafka有一个更深入的了解。
本系列是对胡夕《Apache Kafka实战》一书的学习笔记,会跟随作者的安排,逐章对Kafka的设计细节做介绍。《Apache Kafka实战》这本书写的很棒,一看就是行家用心写出来的。如果想深入学习Kafka的同学可以考虑入手一本。
言归正传,基本的Kafka入门知识和概念这里就不赘述。欢迎参考我的同学沙利民关于Kafka的入门知识总结。《Kafka从入门到实践》
本系列直接从Producer开发开始。Kafka自0.9.0.0版本后,就启用了新设计的Java版的Producer取代旧的Scala版本的Producer,本文介绍的也是新版本。
1. Producer工作流程
Producer的工作流程可以简单概述为三步:
封装消息:使用用户主线程将待发送的消息封装成ProducerRecord类实例,在完成序列化后发送给partitioner。
完成分区:partitioner根据分区策略(后文介绍)对消息进行分区,将同一分区的消息发送给某一块内存缓存区。
完成发送:使用另外一个I/O线程实时从缓存区域中提取消息封装成一个batch,统一发送给对应的broker。
Producer需要完成的任务就是对消息进行分区,以及确定分区的leader。
2. 构造Producer
2.1 代码实例
基本的Producer构造代码如图所示,基本步骤有:
- 构造Properties对象,指定必要的参数包括bootstrap.servers,key.serializer,value.serializer
- 构造KafkaProducer对象
- 构造ProducerRecord对象,必须指定的参数有topic,value
- 调用KafkaProducer的send,有两个方种发送:同步发送和异步发送+回调
- 关闭KafkaProducer
2.2 详细解释
构造Properties对象:使用Properties指定参数,有
- bootstrap.servers:一组broker列表,包含<host:port>対
- key/value serializer:发送给broker的消息必须是字节数组,所以需要指定key和value的序列化格式,一般都是StringSerializer
- acks:指定在producer发送响应前,leader broker必须确保已经成功写入该消息的副本数,分别为0,1,all。
- 0表示producer发送消息后不等broker的返回结果,直接进入下一条的发送
- 1表示只需要消息写进broker leader的日志就可以
- -1/all 表示必须所有副本都完成消息持久化后,才发送结果给producer,开始下一条的发送
- buffer.memory:用于缓存消息的缓冲区大小,默认32MB。刚才在工作流程中也讲过,producer会把发给同一分区的消息缓存在缓冲区内,等待I/O进程适时发送。这部分缓冲区的大小由此参数设置。
- compression.type:消息是否压缩,默认是None。目前支持的压缩方式有三种,其中效果最好的是LZ4。压缩需要格外消耗CPU资源,仅在带宽资源不足,prodocer的CPU资源充足时考虑压缩。
- retries:当消息发送失败时,producer自动重新发送消息的次数。
- batch.size:最重要的调优参数之一。还是刚才所说的,producer将同一分区的消息缓存在缓冲区内,并封装成一个一个batch,当满了以后会由I/O进程发送。batch大小就很重要,太小的话一次发送请求发送的消息数太少;太大的话对内存压力又很大。默认的大小是16kb,适当的提高此参数可以提高吞吐量。
- linger.ms:batch没满时,也可能被提前发送。linger.ms参数可以控制消息发送延迟行为,默认是0,即消息需要立即被发送,无需关心batch是否填满。这样设计大多数情况下是合理的,但是会拉低吞吐量。
还有其他的参数,自己看官网吧。
构造KafkaProducer对象:在Properties里完成参数设置后,就可以构造KafkaProducer对象了。
构造ProducerRecord对象:需要将topic和value信息包装在ProducerRecord对象中,key可选。
发送消息:Kafka发送消息有两种方式:
- 异步发送:send方法会返还一个Java Future对象供用户获取发送结果。根据回调的参数实现异步发送以及对发送结果的响应。
- 同步发送:调用send().get()方法可以实现同步发送的效果,即无限等待broker返还给producer的结果。
关闭producer:producer占用了大量系统资源,使用完后必须关闭。
3. 消息分区策略
3.1 默认的分区策略
当消息指定key时,使用murmur2算法计算哈希值,然后由哈希值对总分区数求模后找到目标分区号,此时完成分区操作,相同的key的所有消息分配到相同的分区。
当没有指定key是,partitioner根据轮询的方式确保所有分区均匀。
3.2 自定义分区实例
这个自定义分区的实例实现了,当key包含"audit"字符串时,该消息发送到最后一个分区,其他消息按照随机的策略发送到其他分区。
4. producer拦截器
4.1 拦截器
拦截器interceptor可以实现消息发送前、producer回调逻辑前对消息做一些定制化需求,比如修改消息等。interceptor通过接口ProducerInterceptor实现,主要有两个方法:
- onSend:运行在用户主线程中,在消息被序列化以计算分区前调用,用户可以对消息做任何修改。
- onAcknowledgement:运行在I/O线程中,在消息被应答前或消息发送失败时调用,通常发生在producer回调逻辑触发之前。
4.2 实例
4.1.2 onSend实例
上图是自定义了一个拦截器,在消息发送之前对消息做了修改,在value值添加了时间戳。然后需要在Properties参数配置中按下图的方式添加。
结果:
4.1.2 onAcknowledgement实例
该拦截器实现了消息发送后更新”发送成功消息数“和“发送失败消息数”。添加方式一样,此时可以构成双interceptor的拦截链。
结果:
在《Apache Kafka实战》中,还介绍了消息序列化、无消息丢失配置、消息压缩等关于Kafka Producer的介绍,需要的同学可以参阅。