3.kafka生产者

一、原理

  • Producer首先调用send方法进行发送
  • 会经过拦截器(一般情况不使用),可以对数据进行一些加工处理
  • 随后会经过序列化

kafka并没有采用Java提供的序列化器,而是自己实现的序列化器,因为Java提供的序列化器,会在原有数据的基础上,增加很多的用于安全校验的数据,在大数据的场景下,每次传输的数据量很大,如果在此基础上还要加入大量用于安全校验的数据,严重的影响了效率,所以kafka等中间件,自己实现了序列化器,仅仅进行简单的校验,增加了效率。

  • 随后经过分区器。

分区器实际上是将数据发送到了缓冲队列中,缓冲队列是一个双端队列,其内部包含内存池(当通过分区器创建数据后,申请内存,发送到集群后再释放),避免频繁的申请和释放内存。
因为kafka可以对topic进行分区,所以发送时就需要确定向哪个分区发送信息,就由分区器定义的规则来发送,一个分区对应一个队列,这些队列都是在内存中创建的,总大小默认32M,每一批次默认大小16K

  • sender线程帮助我们将缓冲队列中的数据,发送到kafka集群中。

batch.size:只有数据累积到batch.size之后,sender才会发送数据。默认16K
linger.ms:如果数据迟迟未达到batch.size,sender等待linger.ms设置的时间到了之后就会发送信息。单位ms,默认值是0ms(即默认到了就发送,不等待到达batch.size阈值)
生产环境中上面两个参数都需要调整

  • 发送时,以分区节点为key,即broker1,broker2为key,请求为value进行发送,形成一个请求。请求发送到某个broker中,如果第一个请求发送到broker1,broker1没有即使的应答,允许继续发送第二个请求,直到五个请求都没有得到应答,后续的请求不会再发送,直到得到了请求的应答才继续发送。

  • kafka集群收到请求之后会涉及到一个应答机制,应答级别分为0、1、-1

0:生产者发送过来的数据,不需要等待数据落盘应答
1:生产者发送过来的数据,Leader(数据落盘)收到后应答,副本有没有无所谓
-1(all) :生产者发送过来的数据,Leader和ISR里面的所有节点收齐数据后应答,-1和all等价。
(1) Leader维护了一个动态的in-sync replica set(ISR),意为和Leader保持同步的Follow + Leader集合(leader:0,ISR:0,1,2)
(2) 如果Follower长时间未向Leader发送通信请求或同步数据,则该Follow将被踢出ISR。改时间阈值由replica.lag.time.max.ms参数设定,默认30s。例如如果2超时,(leader:0,ISR:0,1)
在生产环境中,acks=0很少使用;acks=1,一般用于传输普通日志,允许丢个别数据;acks=1,一般用于传输和钱相关的数据,对可靠性要求比较高的场景

  • kafka集群应答之后,如果成功,进行数据的清理,如果失败,进行重试,默认重试次数是int的最大值

生产者重要参数

参数名称 描述
bootstrap.servers 生产者连接集群所需的 broker 地址清单 。 例如 hadoop102:9092,hadoop103:9092,hadoop104:9092,可以 设置 1 个或者多个,中间用逗号隔开。注意这里并非需要所有的 broker 地址,因为生产者从给定的 broker 里查找到其他 broker 信息。
key.serializer 和 value.serializer 指定发送消息的 key 和 value 的序列化类型。一定要写全类名。
buffer.memory RecordAccumulator 缓冲区总大小,默认 32m。
batch.size 缓冲区一批数据最大值,默认 16k。适当增加该值,可 以提高吞吐量,但是如果该值设置太大,会导致数据传输延迟增加。
linger.ms 如果数据迟迟未达到 batch.size,sender 等待 linger.time 之后就会发送数据。单位 ms,默认值是 0ms,表示没 有延迟。生产环境建议该值大小为 5-100ms 之间。
acks 0:生产者发送过来的数据,不需要等数据落盘应答。1:生产者发送过来的数据,Leader 收到数据后应答。-1(all):生产者发送过来的数据,Leader+和 isr 队列 里面的所有节点收齐数据后应答。默认值是-1,-1 和 all 是等价的。
max.in.flight.requests.per.connection 允许最多没有返回 ack 的次数,默认为 5,开启幂等性 要保证该值是 1-5 的数字。
retries 当消息发送出现错误的时候,系统会重发消息。retries 表示重试次数。默认是 int 最大值,2147483647。如果设置了重试,还想保证消息的有序性,需要设置 MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION=1 否则在重试此失败消息的时候,其他的消息可能发送 成功了。
retry.backoff.ms 两次重试之间的时间间隔,默认是 100ms。
enable.idempotence 是否开启幂等性,默认 true,开启幂等性。
compression.type 生产者发送的所有数据的压缩方式。默认是 none,也就是不压缩。

支持压缩类型:none、gzip、snappy、lz4 和 zstd。

二、生产者分区

(1)分区好处

便于合理使用存储资源,每个Partition在一个Broker上存储,可以把海量的数据按照分区切割成一块一块数据存储在多台Broker上。合理控制分区的任务,可以实现负载均衡的效果。
提高并行度,生产者可以以分区为单位发送数据;消费者可以以分区为单位进行消费数据。


(2)生产者发送消息的分区策略

  • 默认的分区器 DefaultPartitioner
<?php
#producer.php 文件
$conf = new RdKafka\Conf();
$conf->set('metadata.broker.list', '192.168.1.21:19092,192.168.1.21:29092,192.168.1.21:39092');
$producer = new RdKafka\Producer($conf);
$topic = $producer->newTopic("foo");
for ($i = 0; $i < 10; $i++) {
    $topic->produce(RD_KAFKA_PARTITION_UA, 0, "Message $i");//指定分区[RD_KAFKA_PARTITION_UA代表系统分配]和发送数据,第四个参数是key
    $producer->poll(0);
}
for ($flushRetries = 0; $flushRetries < 10; $flushRetries++) {
    $result = $producer->flush(10000);
    if (RD_KAFKA_RESP_ERR_NO_ERROR === $result) {
        break;
    }
}
if (RD_KAFKA_RESP_ERR_NO_ERROR !== $result) {
    throw new \RuntimeException('Was unable to flush, messages might be lost!');
}
  • 数据可以指定分区 partition
<?php
$conf = new RdKafka\Conf();
$conf->set('metadata.broker.list', '192.168.1.21:19092,192.168.1.21:29092,192.168.1.21:39092');
$producer = new RdKafka\Producer($conf);
$topic = $producer->newTopic("foo");
for ($i = 0; $i < 10; $i++) {
    $topic->produce(4, 0, "Message $i");//向4号分区发送
    $producer->poll(0);
}
for ($flushRetries = 0; $flushRetries < 10; $flushRetries++) {
    $result = $producer->flush(10000);
    if (RD_KAFKA_RESP_ERR_NO_ERROR === $result) {
        break;
    }
}
if (RD_KAFKA_RESP_ERR_NO_ERROR !== $result) {
    throw new \RuntimeException('Was unable to flush, messages might be lost!');
}
  • 没有指明 partition 值但有 key 的情况下,将 key 的 hash 值与 topic 的 partition 数进行取余得到 partition 值。
<?php
$conf = new RdKafka\Conf();
$conf->set('metadata.broker.list', '192.168.1.21:19092,192.168.1.21:29092,192.168.1.21:39092');
$producer = new RdKafka\Producer($conf);
$topic = $producer->newTopic("foo");
for ($i = 0; $i < 10; $i++) {
    $topic->produce(RD_KAFKA_PARTITION_UA, 0, "Message $i",'c');//制定分区key:a:2,b:1,c:0,d:1,f:1
    $producer->poll(0);
}
for ($flushRetries = 0; $flushRetries < 10; $flushRetries++) {
    $result = $producer->flush(10000);
    if (RD_KAFKA_RESP_ERR_NO_ERROR === $result) {
        break;
    }
}
if (RD_KAFKA_RESP_ERR_NO_ERROR !== $result) {
    throw new \RuntimeException('Was unable to flush, messages might be lost!');
}
  • 什么都没指定,粘性分区:随机
  • 如果有特殊规则,可自定义分区规则

(3)生产者如何提高吞吐量

高吞吐的原因:

  • 顺序读写:消息是不断追加到文件中的。
  • 零拷贝:减少不必要的拷贝次数,存储数据通过mmap持久化到磁盘,发送数据采用sendfile将磁盘数据读到OS内核缓冲区后直接转到socket buffer进行网络发送。
  • 文件分段:Topic分为了多个Partition,而每个Partition中的数据又被分为了多个段segment file。这样我们在查找指定offset消息时其实就是在一个小文件进行操作,提高查询效率。
  • 批量发送:Kafka允许批量发送消息,先将消息缓存到内存中,然后一次发送出去,这样做可以大大的减少服务端I/O次数。
  • 数据压缩:减少传输的数据量。
  • batch.size:批次大小,默认16k,建议32k
  • linger.ms:等待时间,数据越大延迟越大,修改为5-100ms
  • compression.type:压缩snappy
  • RecordAccumulator:缓冲区大小,修改为64m
<?php
$conf = new RdKafka\Conf();
$conf->set('metadata.broker.list', '192.168.1.21:19092,192.168.1.21:29092,192.168.1.21:39092');
$conf->set('batch.size', 32768);#批次大小设置成32K
$conf->set('linger.ms', 100);#发送批次前的等待时间设置成100ms,可以5-100ms
$conf->set('compression.type', 'snappy');#数据压缩采用snappy
#$conf->set('memory', 67108864);#缓冲区设置成64m---设置失败
$producer = new RdKafka\Producer($conf);
$topic = $producer->newTopic("foo");
for ($i = 0; $i < 10; $i++) {
    $topic->produce(RD_KAFKA_PARTITION_UA, 0, "Message $i");//制定分区key:a:2,b:1,c:0,d:1,f:1
    $producer->poll(0);
}
for ($flushRetries = 0; $flushRetries < 10; $flushRetries++) {
    $result = $producer->flush(10000);
    if (RD_KAFKA_RESP_ERR_NO_ERROR === $result) {
        break;
    }
}
if (RD_KAFKA_RESP_ERR_NO_ERROR !== $result) {
    throw new \RuntimeException('Was unable to flush, messages might be lost!');
}

(4)数据可靠性

0)回顾发送流程
1)ack 应答原理



思考:Leader收到数据,所有Follower都开始同步数据,但有一个Follower,因为某种故障,迟迟不能与Leader进行同步,那这个问题怎么解决呢?
Leader维护了一个动态的in-sync replica set(ISR),意为和 Leader保持同步的Follower+Leader集合(leader:0,isr:0,1,2)。 如果Follower长时间未向Leader发送通信请求或同步数据,则该Follower将被踢出ISR。该时间阈值由replica.lag.time.max.ms参 数设定,默认30s。例如2超时,(leader:0, isr:0,1)。 这样就不用等长期联系不上或者已经故障的节点。

数据可靠性分析:
如果分区副本设置为1个,或者ISR里应答的最小副本数量 ( min.insync.replicas 默认为1)设置为1,和ack=1的效果是一 样的,仍然有丢数的风险(leader:0,isr:0)。
数据完全可靠条件 = ACK级别设置为-1 + 分区副本大于等于2 + ISR里应答的最小副本数量大于等于2
可靠性总结:
acks=0,生产者发送过来数据就不管了,可靠性差,效率高;
acks=1,生产者发送过来数据Leader应答,可靠性中等,效率中等;
acks=-1,生产者发送过来数据Leader和ISR队列里面所有Follwer应答,可靠性高,效率低; 在生产环境中,acks=0很少使用;acks=1,一般用于传输普通日志,允许丢个别数据;acks=-1,一般用于传输和钱相关的数据, 对可靠性要求比较高的场景。
数据重复分析:
acks= -1(all):生产者发送过来的数据,Leader和ISR队列里面的所有节点收齐数据后应答。

(5)数据去重

1.数据传递语义
至少一次(At Least Once)= ACK级别设置为-1 + 分区副本大于等于2 + ISR里应答的最小副本数量大于等于2
最多一次(At Most Once)= ACK级别设置为0

总结:
At Least Once可以保证数据不丢失,但是不能保证数据不重复;
At Most Once可以保证数据不重复,但是不能保证数据不丢失。
精确一次(Exactly Once):对于一些非常重要的信息,比如和钱相关的数据,要求数据既不能重复也不丢失。

Kafka 0.11版本以后,引入了一项重大特性:幂等性和事务。
2 幂等性
1)幂等性原理
幂等性就是指Producer不论向Broker发送多少次重复数据,Broker端都只会持久化一条,保证了不重复。
精确一次(Exactly Once) = 幂等性 + 至少一次( ack=-1 + 分区副本数>=2 + ISR最小副本数量>=2) 。
重复数据的判断标准:具有<PID, Partition, SeqNumber>相同主键的消息提交时,Broker只会持久化一条。其中PID是Kafka每次重启都会分配一个新的;Partition 表示分区号;Sequence Number是单调自增的序列号。
所以幂等性只能保证的是在单分区单会话内不重复。



3 生产者事务
单个 Producer,使用事务保证消息的仅一次发送

二、Kafka事务原理

  • 开启事务,必须开启幂等性

幂等性就是指Producer不论向Broker发送多少次重复数据,Broker端都只会持久化一条,保证了数据不重复
重复数据的评判标准:具有<PID,Partition,SeqNumber>相同主键的信息提交时,Broker只会持久化一条。其中PID是Kafka每次重启都会分配一个新的;Partition表示分区号;Sequence Number是单调自增的。
所以幂等性只能保证的是在单分区单会话内不重复。如果想保证数据一定不重复,就需要开启事务
使用幂等性:开启参数enable.idempotence默认为true,即默认为开启

事务并不是直接存储在磁盘中,而是存储在一个特殊的topic的分区中。


<?php
class kafkaClass
{
    /**
     * producer 生产者
     *
     * @param string $host
     * @param string $topicName
     * @param array  $messages
     * @param array  $ext
     *
     *
     *      idempotence_enable  => true 开启事务性 Producer。
     *      transactional_id => 事务ID,当 transactional = true 必须。
     *      partition_index  => 指定写入分片。
     *      message_key => 消息键保序策略 key
     * @throws \Exception
     */
    public static function producer(string $host, string $topicName, array $messages, array $ext = [])
    {
        $conf = new \RdKafka\Conf();
        $conf->set('metadata.broker.list', $host);// $host 可配置多个 '192.168.0.1:9092,192.168.0.2:9092'
        // 幂等性 Producer
        // enable.idempotence 被设置成 true 后,Producer 自动升级成幂等性 Producer。Kafka 自动做消息的重复去重,只能实现单会话上的幂等性,不能实现跨会话的幂等性。
        if (! empty($ext['idempotence_enable'])) {
            $conf->set('enable.idempotence', true);
        }
        // 1. 事务型 Producer
        // 开启 idempotence 同时设置了 transactional.id,Producer 则为事务型。实现多会话幂等性。
        if (! empty($ext['idempotence_enable'])) {
            if (empty($ext['transactional_id'])) {
                throw new \Exception("事务型 Producer 需要指定一个 transaction id.");
            }
            $conf->set('enable.idempotence', true);
            $conf->set('transactional.id', $ext['transactional_id']);
        }

        $producer = new \RdKafka\Producer($conf);
        $topic = $producer->newTopic($topicName);
        // 开启事务
        if (! empty($ext['idempotence_enable'])) {
            $producer->initTransactions(10000);
            $producer->beginTransaction();
        }
        // RD_KAFKA_PARTITION_UA 代表 librdkafka 选择分区。
        $partition = $ext['partition_index'] ?? RD_KAFKA_PARTITION_UA;
        // 1. Round-robin - 轮询策略(默认),保证消息最大限度地被平均分配到所有分区上。
        // 2. Randomness - 随机策略,消息放置到任意一个分区上。
        // 3. key-ordering - 消息键保序策略,同属一个 key 的消息会被写到相同的分区里。
        // 不指定 message_key 则默认使用 Round-robin,指定则使用 key-ordering。
        $messageKey = $ext['message_key'] ?? null;
        foreach ($messages as $message) {
            if (is_array($message)) {
                $message = json_encode($message);
            }
            $topic->produce($partition, 0, $message, $messageKey);
            // 该参数被设置为 0 ,poll() 方法会立刻返回,否则就会在指定的毫秒数内一直等待 broker 返回数据;
            $producer->poll(0);
        }
        if (empty($ext['idempotence_enable'])) {
            for ($flushRetries = 0; $flushRetries < 10; $flushRetries++) {
                // 不调用flush会导致消息丢失
                $result = $producer->flush(10000);
                if (RD_KAFKA_RESP_ERR_NO_ERROR === $result) {
                    break;
                }
            }
            if (RD_KAFKA_RESP_ERR_NO_ERROR !== $result) {
                throw new \RuntimeException('无法刷新,消息可能会丢失.');
            }
        }
        // 提交事务
        if (! empty($ext['idempotence_enable'])) {
            $error = $producer->commitTransaction(10000);
            if (RD_KAFKA_RESP_ERR_NO_ERROR != $error) {
                $producer->abortTransaction();
                throw new \Exception("本次事务失败,已回滚.");
            }
        }
    }
}

$ext = [
    'idempotence_enable'=>true, #设置幂等性 Producer
    'transactional_id'=>'trans_id',#指定事务id
];
$mess = ['test-1','test-2','test-3'];
kafkaClass::producer('192.168.1.21:19092,192.168.1.21:29092,192.168.1.21:39092','foo',$mess,$ext);

三、数据乱序

生产端的InFilghtRequests,默认每个broker最多缓存五个请求,当第一个数据发送过去,第二个数据没有发送成功,这时第二波数据就要进行重试,但是此时第三波数据发送,发送成功了,然后第二波数据的重试才发送成功,本来的数据顺序是123,但是现在被改为了132,发生了数据乱序


max.in.flight.requests-per.connection设置为1,即不缓存request请求,自然不会发生数据乱序的情况。

开启幂等性以后,因为SeqNumber是单调递增的,所以当数据是顺序的时候,不需要排序就可以发送,但是当发生上面的情况之后,服务端发现数据的SeqNumber是132,不是单调递增了,会对数据进行缓存,攒到5个以后会进行重新排序,之后再进行发送。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 195,898评论 5 462
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 82,401评论 2 373
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 143,058评论 0 325
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 52,539评论 1 267
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 61,382评论 5 358
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 46,319评论 1 273
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 36,706评论 3 386
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 35,370评论 0 254
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 39,664评论 1 294
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 34,715评论 2 312
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 36,476评论 1 326
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,326评论 3 313
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 37,730评论 3 299
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,003评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,275评论 1 251
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 41,683评论 2 342
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 40,877评论 2 335

推荐阅读更多精彩内容