一、原理
- Producer首先调用send方法进行发送
- 会经过拦截器(一般情况不使用),可以对数据进行一些加工处理
- 随后会经过序列化
kafka并没有采用Java提供的序列化器,而是自己实现的序列化器,因为Java提供的序列化器,会在原有数据的基础上,增加很多的用于安全校验的数据,在大数据的场景下,每次传输的数据量很大,如果在此基础上还要加入大量用于安全校验的数据,严重的影响了效率,所以kafka等中间件,自己实现了序列化器,仅仅进行简单的校验,增加了效率。
- 随后经过分区器。
分区器实际上是将数据发送到了缓冲队列中,缓冲队列是一个双端队列,其内部包含内存池(当通过分区器创建数据后,申请内存,发送到集群后再释放),避免频繁的申请和释放内存。
因为kafka可以对topic进行分区,所以发送时就需要确定向哪个分区发送信息,就由分区器定义的规则来发送,一个分区对应一个队列,这些队列都是在内存中创建的,总大小默认32M,每一批次默认大小16K
- sender线程帮助我们将缓冲队列中的数据,发送到kafka集群中。
batch.size:只有数据累积到batch.size之后,sender才会发送数据。默认16K
linger.ms:如果数据迟迟未达到batch.size,sender等待linger.ms设置的时间到了之后就会发送信息。单位ms,默认值是0ms(即默认到了就发送,不等待到达batch.size阈值)
生产环境中上面两个参数都需要调整
-
发送时,以分区节点为key,即broker1,broker2为key,请求为value进行发送,形成一个请求。请求发送到某个broker中,如果第一个请求发送到broker1,broker1没有即使的应答,允许继续发送第二个请求,直到
五个
请求都没有得到应答,后续的请求不会再发送,直到得到了请求的应答才继续发送。
kafka集群收到请求之后会涉及到一个应答机制,应答级别分为0、1、-1
0:生产者发送过来的数据,不需要等待数据落盘应答
1:生产者发送过来的数据,Leader(数据落盘)收到后应答,副本有没有无所谓
-1(all) :生产者发送过来的数据,Leader和ISR里面的所有节点收齐数据后应答,-1和all等价。
(1) Leader维护了一个动态的in-sync replica set(ISR),意为和Leader保持同步的Follow + Leader集合(leader:0,ISR:0,1,2)
(2) 如果Follower长时间未向Leader发送通信请求或同步数据,则该Follow将被踢出ISR。改时间阈值由replica.lag.time.max.ms参数设定,默认30s。例如如果2超时,(leader:0,ISR:0,1)
在生产环境中,acks=0很少使用;acks=1,一般用于传输普通日志,允许丢个别数据;acks=1,一般用于传输和钱相关的数据,对可靠性要求比较高的场景
- kafka集群应答之后,如果成功,进行数据的清理,如果失败,进行重试,默认重试次数是int的最大值
生产者重要参数
参数名称 | 描述 |
---|---|
bootstrap.servers | 生产者连接集群所需的 broker 地址清单 。 例如 hadoop102:9092,hadoop103:9092,hadoop104:9092,可以 设置 1 个或者多个,中间用逗号隔开。注意这里并非需要所有的 broker 地址,因为生产者从给定的 broker 里查找到其他 broker 信息。 |
key.serializer 和 value.serializer | 指定发送消息的 key 和 value 的序列化类型。一定要写全类名。 |
buffer.memory RecordAccumulator | 缓冲区总大小,默认 32m。 |
batch.size | 缓冲区一批数据最大值,默认 16k。适当增加该值,可 以提高吞吐量,但是如果该值设置太大,会导致数据传输延迟增加。 |
linger.ms | 如果数据迟迟未达到 batch.size,sender 等待 linger.time 之后就会发送数据。单位 ms,默认值是 0ms,表示没 有延迟。生产环境建议该值大小为 5-100ms 之间。 |
acks | 0:生产者发送过来的数据,不需要等数据落盘应答。1:生产者发送过来的数据,Leader 收到数据后应答。-1(all):生产者发送过来的数据,Leader+和 isr 队列 里面的所有节点收齐数据后应答。默认值是-1,-1 和 all 是等价的。 |
max.in.flight.requests.per.connection | 允许最多没有返回 ack 的次数,默认为 5,开启幂等性 要保证该值是 1-5 的数字。 |
retries | 当消息发送出现错误的时候,系统会重发消息。retries 表示重试次数。默认是 int 最大值,2147483647。如果设置了重试,还想保证消息的有序性,需要设置 MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION=1 否则在重试此失败消息的时候,其他的消息可能发送 成功了。 |
retry.backoff.ms | 两次重试之间的时间间隔,默认是 100ms。 |
enable.idempotence | 是否开启幂等性,默认 true,开启幂等性。 |
compression.type | 生产者发送的所有数据的压缩方式。默认是 none,也就是不压缩。 |
支持压缩类型:none、gzip、snappy、lz4 和 zstd。
二、生产者分区
(1)分区好处
便于合理使用存储资源,每个Partition在一个Broker上存储,可以把海量的数据按照分区切割成一块一块数据存储在多台Broker上。合理控制分区的任务,可以实现负载均衡的效果。
提高并行度,生产者可以以分区为单位发送数据;消费者可以以分区为单位进行消费数据。
(2)生产者发送消息的分区策略
- 默认的分区器 DefaultPartitioner
<?php
#producer.php 文件
$conf = new RdKafka\Conf();
$conf->set('metadata.broker.list', '192.168.1.21:19092,192.168.1.21:29092,192.168.1.21:39092');
$producer = new RdKafka\Producer($conf);
$topic = $producer->newTopic("foo");
for ($i = 0; $i < 10; $i++) {
$topic->produce(RD_KAFKA_PARTITION_UA, 0, "Message $i");//指定分区[RD_KAFKA_PARTITION_UA代表系统分配]和发送数据,第四个参数是key
$producer->poll(0);
}
for ($flushRetries = 0; $flushRetries < 10; $flushRetries++) {
$result = $producer->flush(10000);
if (RD_KAFKA_RESP_ERR_NO_ERROR === $result) {
break;
}
}
if (RD_KAFKA_RESP_ERR_NO_ERROR !== $result) {
throw new \RuntimeException('Was unable to flush, messages might be lost!');
}
- 数据可以指定分区 partition
<?php
$conf = new RdKafka\Conf();
$conf->set('metadata.broker.list', '192.168.1.21:19092,192.168.1.21:29092,192.168.1.21:39092');
$producer = new RdKafka\Producer($conf);
$topic = $producer->newTopic("foo");
for ($i = 0; $i < 10; $i++) {
$topic->produce(4, 0, "Message $i");//向4号分区发送
$producer->poll(0);
}
for ($flushRetries = 0; $flushRetries < 10; $flushRetries++) {
$result = $producer->flush(10000);
if (RD_KAFKA_RESP_ERR_NO_ERROR === $result) {
break;
}
}
if (RD_KAFKA_RESP_ERR_NO_ERROR !== $result) {
throw new \RuntimeException('Was unable to flush, messages might be lost!');
}
- 没有指明 partition 值但有 key 的情况下,将 key 的 hash 值与 topic 的 partition 数进行取余得到 partition 值。
<?php
$conf = new RdKafka\Conf();
$conf->set('metadata.broker.list', '192.168.1.21:19092,192.168.1.21:29092,192.168.1.21:39092');
$producer = new RdKafka\Producer($conf);
$topic = $producer->newTopic("foo");
for ($i = 0; $i < 10; $i++) {
$topic->produce(RD_KAFKA_PARTITION_UA, 0, "Message $i",'c');//制定分区key:a:2,b:1,c:0,d:1,f:1
$producer->poll(0);
}
for ($flushRetries = 0; $flushRetries < 10; $flushRetries++) {
$result = $producer->flush(10000);
if (RD_KAFKA_RESP_ERR_NO_ERROR === $result) {
break;
}
}
if (RD_KAFKA_RESP_ERR_NO_ERROR !== $result) {
throw new \RuntimeException('Was unable to flush, messages might be lost!');
}
- 什么都没指定,粘性分区:随机
- 如果有特殊规则,可自定义分区规则
(3)生产者如何提高吞吐量
高吞吐的原因:
- 顺序读写:消息是不断追加到文件中的。
- 零拷贝:减少不必要的拷贝次数,存储数据通过mmap持久化到磁盘,发送数据采用sendfile将磁盘数据读到OS内核缓冲区后直接转到socket buffer进行网络发送。
- 文件分段:Topic分为了多个Partition,而每个Partition中的数据又被分为了多个段segment file。这样我们在查找指定offset消息时其实就是在一个小文件进行操作,提高查询效率。
- 批量发送:Kafka允许批量发送消息,先将消息缓存到内存中,然后一次发送出去,这样做可以大大的减少服务端I/O次数。
- 数据压缩:减少传输的数据量。
- batch.size:批次大小,默认16k,建议32k
- linger.ms:等待时间,数据越大延迟越大,修改为5-100ms
- compression.type:压缩snappy
- RecordAccumulator:缓冲区大小,修改为64m
<?php
$conf = new RdKafka\Conf();
$conf->set('metadata.broker.list', '192.168.1.21:19092,192.168.1.21:29092,192.168.1.21:39092');
$conf->set('batch.size', 32768);#批次大小设置成32K
$conf->set('linger.ms', 100);#发送批次前的等待时间设置成100ms,可以5-100ms
$conf->set('compression.type', 'snappy');#数据压缩采用snappy
#$conf->set('memory', 67108864);#缓冲区设置成64m---设置失败
$producer = new RdKafka\Producer($conf);
$topic = $producer->newTopic("foo");
for ($i = 0; $i < 10; $i++) {
$topic->produce(RD_KAFKA_PARTITION_UA, 0, "Message $i");//制定分区key:a:2,b:1,c:0,d:1,f:1
$producer->poll(0);
}
for ($flushRetries = 0; $flushRetries < 10; $flushRetries++) {
$result = $producer->flush(10000);
if (RD_KAFKA_RESP_ERR_NO_ERROR === $result) {
break;
}
}
if (RD_KAFKA_RESP_ERR_NO_ERROR !== $result) {
throw new \RuntimeException('Was unable to flush, messages might be lost!');
}
(4)数据可靠性
0)回顾发送流程
1)ack 应答原理
思考:Leader收到数据,所有Follower都开始同步数据,但有一个Follower,因为某种故障,迟迟不能与Leader进行同步,那这个问题怎么解决呢?
Leader维护了一个动态的in-sync replica set(ISR),意为和 Leader保持同步的Follower+Leader集合(leader:0,isr:0,1,2)。 如果Follower长时间未向Leader发送通信请求或同步数据,则该Follower将被踢出ISR。该时间阈值由replica.lag.time.max.ms参 数设定,默认30s。例如2超时,(leader:0, isr:0,1)。 这样就不用等长期联系不上或者已经故障的节点。
数据可靠性分析:
如果分区副本设置为1个,或者ISR里应答的最小副本数量 ( min.insync.replicas 默认为1)设置为1,和ack=1的效果是一 样的,仍然有丢数的风险(leader:0,isr:0)。
数据完全可靠条件 = ACK级别设置为-1 + 分区副本大于等于2 + ISR里应答的最小副本数量大于等于2
可靠性总结:
acks=0,生产者发送过来数据就不管了,可靠性差,效率高;
acks=1,生产者发送过来数据Leader应答,可靠性中等,效率中等;
acks=-1,生产者发送过来数据Leader和ISR队列里面所有Follwer应答,可靠性高,效率低; 在生产环境中,acks=0很少使用;acks=1,一般用于传输普通日志,允许丢个别数据;acks=-1,一般用于传输和钱相关的数据, 对可靠性要求比较高的场景。
数据重复分析:
acks= -1(all):生产者发送过来的数据,Leader和ISR队列里面的所有节点收齐数据后应答。
(5)数据去重
1.数据传递语义
至少一次(At Least Once)= ACK级别设置为-1 + 分区副本大于等于2 + ISR里应答的最小副本数量大于等于2
最多一次(At Most Once)= ACK级别设置为0
总结:
At Least Once可以保证数据不丢失,但是不能保证数据不重复;
At Most Once可以保证数据不重复,但是不能保证数据不丢失。
精确一次(Exactly Once):对于一些非常重要的信息,比如和钱相关的数据,要求数据既不能重复也不丢失。
Kafka 0.11版本以后,引入了一项重大特性:幂等性和事务。
2 幂等性
1)幂等性原理
幂等性就是指Producer不论向Broker发送多少次重复数据,Broker端都只会持久化一条,保证了不重复。
精确一次(Exactly Once) = 幂等性 + 至少一次( ack=-1 + 分区副本数>=2 + ISR最小副本数量>=2) 。
重复数据的判断标准:具有<PID, Partition, SeqNumber>相同主键的消息提交时,Broker只会持久化一条。其中PID是Kafka每次重启都会分配一个新的;Partition 表示分区号;Sequence Number是单调自增的序列号。
所以幂等性只能保证的是在单分区单会话内不重复。
3 生产者事务
单个 Producer,使用事务保证消息的仅一次发送
二、Kafka事务原理
- 开启事务,必须开启幂等性
幂等性就是指Producer不论向Broker发送多少次重复数据,Broker端都只会持久化一条,保证了数据不重复
重复数据的评判标准:具有<PID,Partition,SeqNumber>相同主键的信息提交时,Broker只会持久化一条。其中PID是Kafka每次重启都会分配一个新的;Partition表示分区号;Sequence Number是单调自增的。
所以幂等性只能保证的是在单分区单会话内不重复。如果想保证数据一定不重复,就需要开启事务
使用幂等性:开启参数enable.idempotence默认为true,即默认为开启
事务并不是直接存储在磁盘中,而是存储在一个特殊的topic的分区中。
<?php
class kafkaClass
{
/**
* producer 生产者
*
* @param string $host
* @param string $topicName
* @param array $messages
* @param array $ext
*
*
* idempotence_enable => true 开启事务性 Producer。
* transactional_id => 事务ID,当 transactional = true 必须。
* partition_index => 指定写入分片。
* message_key => 消息键保序策略 key
* @throws \Exception
*/
public static function producer(string $host, string $topicName, array $messages, array $ext = [])
{
$conf = new \RdKafka\Conf();
$conf->set('metadata.broker.list', $host);// $host 可配置多个 '192.168.0.1:9092,192.168.0.2:9092'
// 幂等性 Producer
// enable.idempotence 被设置成 true 后,Producer 自动升级成幂等性 Producer。Kafka 自动做消息的重复去重,只能实现单会话上的幂等性,不能实现跨会话的幂等性。
if (! empty($ext['idempotence_enable'])) {
$conf->set('enable.idempotence', true);
}
// 1. 事务型 Producer
// 开启 idempotence 同时设置了 transactional.id,Producer 则为事务型。实现多会话幂等性。
if (! empty($ext['idempotence_enable'])) {
if (empty($ext['transactional_id'])) {
throw new \Exception("事务型 Producer 需要指定一个 transaction id.");
}
$conf->set('enable.idempotence', true);
$conf->set('transactional.id', $ext['transactional_id']);
}
$producer = new \RdKafka\Producer($conf);
$topic = $producer->newTopic($topicName);
// 开启事务
if (! empty($ext['idempotence_enable'])) {
$producer->initTransactions(10000);
$producer->beginTransaction();
}
// RD_KAFKA_PARTITION_UA 代表 librdkafka 选择分区。
$partition = $ext['partition_index'] ?? RD_KAFKA_PARTITION_UA;
// 1. Round-robin - 轮询策略(默认),保证消息最大限度地被平均分配到所有分区上。
// 2. Randomness - 随机策略,消息放置到任意一个分区上。
// 3. key-ordering - 消息键保序策略,同属一个 key 的消息会被写到相同的分区里。
// 不指定 message_key 则默认使用 Round-robin,指定则使用 key-ordering。
$messageKey = $ext['message_key'] ?? null;
foreach ($messages as $message) {
if (is_array($message)) {
$message = json_encode($message);
}
$topic->produce($partition, 0, $message, $messageKey);
// 该参数被设置为 0 ,poll() 方法会立刻返回,否则就会在指定的毫秒数内一直等待 broker 返回数据;
$producer->poll(0);
}
if (empty($ext['idempotence_enable'])) {
for ($flushRetries = 0; $flushRetries < 10; $flushRetries++) {
// 不调用flush会导致消息丢失
$result = $producer->flush(10000);
if (RD_KAFKA_RESP_ERR_NO_ERROR === $result) {
break;
}
}
if (RD_KAFKA_RESP_ERR_NO_ERROR !== $result) {
throw new \RuntimeException('无法刷新,消息可能会丢失.');
}
}
// 提交事务
if (! empty($ext['idempotence_enable'])) {
$error = $producer->commitTransaction(10000);
if (RD_KAFKA_RESP_ERR_NO_ERROR != $error) {
$producer->abortTransaction();
throw new \Exception("本次事务失败,已回滚.");
}
}
}
}
$ext = [
'idempotence_enable'=>true, #设置幂等性 Producer
'transactional_id'=>'trans_id',#指定事务id
];
$mess = ['test-1','test-2','test-3'];
kafkaClass::producer('192.168.1.21:19092,192.168.1.21:29092,192.168.1.21:39092','foo',$mess,$ext);
三、数据乱序
生产端的InFilghtRequests,默认每个broker最多缓存五个请求,当第一个数据发送过去,第二个数据没有发送成功,这时第二波数据就要进行重试,但是此时第三波数据发送,发送成功了,然后第二波数据的重试才发送成功,本来的数据顺序是123,但是现在被改为了132,发生了数据乱序
将
max.in.flight.requests-per.connection
设置为1,即不缓存request请求,自然不会发生数据乱序的情况。
开启幂等性以后,因为SeqNumber是单调递增的,所以当数据是顺序的时候,不需要排序就可以发送,但是当发生上面的情况之后,服务端发现数据的SeqNumber是132,不是单调递增了,会对数据进行缓存,攒到5个以后会进行重新排序,之后再进行发送。