Kafka读书笔记:生产者(Producer)

生产者

客户端开发

正常的生产逻辑需要具备以下几个步骤:

  1. 构建生产者客户端参数及创建相应的生产者实例
  2. 构建待发送的消息
  3. 发送消息
  4. 关闭生产者实例

消息的发送

发送消息主要有三种模式:发后即忘(fire-and-forget)、同步(sync)及异步(async)

发后即忘:

​ 只管往Kafka发送消息,而不关心消息是否正确到达。在某些时候(比如发生不可重试异常时)会造成消息的丢失。这种发送方式性能最高(因为不用做检查处理,牺牲了消息的安全性,达到高吞吐的目的),可靠性最差。

同步:

​ 利用返回Future对象实现如下代码所示,失败时,捕捉异常,并做响应的异常处理。也可以通过注释的代码部分获取RecordMetadata对象,包含当前消息的主题、分区号、分区中的偏移量(offset)、时间戳等。

异常
  • 可重试异常:

​ 常见的可重试异常有:NetworkException、LeaderNotAvailableException、UnknownTopicOrPartitionException、NotEnoughReplicasException、NotCoordinatorException 等。比如NetworkException 表示网络异常,这个有可能是由于网络瞬时故障而导致的异常,可以通过重试解决;又比如LeaderNotAvailableException表示分区的leader副本不可用,这个异常通常发生在leader副本下线而新的 leader 副本选举完成之前,重试之后可以重新恢复

​ 配置retries 参数设置可重试异常的重试次数。如果重试超过设置次数之后还没有恢复,那么仍会抛出异常,进而发送的外层逻辑就要处理这些异常了。

  • 不可重试异常

​ RecordTooLargeException异常,暗示了所发送的消息太大,KafkaProducer对此不会进行任何重试,直接抛出异常。

private static void sendData() {
    Properties properties = new Properties();
    properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

    properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());


    KafkaProducer producer = new KafkaProducer<String, String>(properties);


    ProducerRecord<String, String> record = new ProducerRecord<>("CustomerCountry",
            "Precision Producer", "France2");
    try {
        producer.send(record).get();
       // Future<RecordMetadata> future = producer.send(record);
       // RecordMetadata metaData = future.get();
    } catch (Exception e) {
        System.out.println("send Fail:" + e.getMessage());
    }

    producer.close();
}
异步发送
  1. 一般是在send()方法中指定一个Callback的回调函数,Kafka在返回响应式调用改函数来实现异步的发送请求。
  2. onCompletion()方法的两个参数是互斥的,消息发送成功时,metadata 不为 null 而exception为null;消息发送异常时,metadata为null而exception不为null。
producer.send(record, new Callback() {
    @Override
    public void onCompletion(RecordMetadata recordMetadata, Exception e) {
        // recordMetadata和e必有一个为null
        if (e != null) {
            e.printStackTrace();
        } else {
            System.out.println(recordMetadata);
        }
    }
});
close()方法

close()方法会阻塞等待之前所有的发送请求完成后再关闭KafkaProducer。下面这个带超时时间参数的close()方法,只会等待timeout的时间,如果在timeout时间后仍未完成所有的请求处理,会强行退出,在实际应用中,一般使用无参close()方法。

public void close(long timeout, java.util.concurrent.TimeUnit timeUnit)

序列化

​ 生产者需要用序列化器把对象转换成字节数组才能通过网络发送给Kafka。而在接收方,消费者需要使用反序列化器把Kafka中收到的字节数组转换成对应的对象。

生产者拦截器

​ KafkaProducer中不仅可以指定一个拦截器,还可以指定多个拦截器以形成拦截链。拦截链会按照 interceptor.classes 参数配置的拦截器的顺序来一一执行(配置的时候,各个拦截器之间使用逗号隔开)

public interface ProducerInterceptor<K, V> extends Configurable {
    public ProducerRecord<K, V> onSend(ProducerRecord<K, V> record);
    public void onAcknowledgement(RecordMetadata metadata, Exception exception);
    public void close();
}

在这 3 个方法中抛出的异常都会被捕获并记录到日志中,但并不会再向上传递。

onSend()

调用时间点:将消息序列化和计算分区之前

​ KafkaProducer会在将消息序列化和计算分区之前调用生产者拦截器的onSend()方法来对消息进行相应的定制化操作。不建议在这里修改ProducerRecord的topic、key和partition信息(可能会对问题的排查和定位造成影响,以及出现难以预料的bug),比如修改key不仅会影响分区的计算,同样会影响broker端日志压缩的功能。

onAcknowledgement()

调用时间点:在消息被应答(acknowledgement)之前或消息发送失败时,优先于用户设定的CallBack方法。

​ KafkaProducer 会在消息被应答(Acknowledgement)之前或消息发送失败时调用生产者拦截器的 onAcknowledgement()方法,优先于用户设定的 Callback 之前执行。这个方法运行在Producer 的 I/O 线程中,所以这个方法中实现的代码逻辑越简单越好,否则会影响消息的发送速度。

close()

​ close()方法主要用于在关闭拦截器时执行一些资源的清理工作。

生产者客户端整体架构

生产者客户端的整体架构

分区

如果key为null,会被随机分配到主题内各个可用的分区。如果key不为null,会对key进行计算决定分到哪个分区。

优点

​ 使用Kafka自己的hash算法对key进行运算,并不会因为java版本的升级导致分区结果不同。

缺点

​ 由于使用了hash算法,对横向扩展不友好。一旦主题增加了分区的个数,可能会造成旧的数据还在老的分区,新的数据被分配到了新的分区。如果要使用键来映射分区,最好在创建主题的时候就把分区规划好。

自定义分区策略

​ 如果出现某个key的数据量特别大,导致按默认key分区后,对应的分区数据量明显过大,从而导致存储和性能上的问题。这时候就需要使用自定义分区策略实现(通过实现Partitioner)。

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.record.InvalidRecordException;
import org.apache.kafka.common.utils.Utils;

import java.io.InvalidClassException;
import java.util.Map;

/**
 * @author by yze on 2020/11/8
 * @since 202011
 */
public class MyPartitioner implements Partitioner {

    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {

        int numPartitions = cluster.partitionCountForTopic(topic);
        if (keyBytes == null || !(key instanceof String)) {
            throw new InvalidRecordException("不支持的数据类型");
        }
        if (((String) key).equals("special")) {
            // 专门定制到一个分区
            return numPartitions;
        }
        // 其它的key会计算后被分配到除了最后一个分区以外的分区
        return Math.abs(Utils.murmur2(keyBytes)) % (numPartitions - 1);
    }

    @Override
    public void close() {

    }

    @Override
    public void configure(Map<String, ?> configs) {

    }
}
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,445评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,889评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,047评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,760评论 1 276
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,745评论 5 367
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,638评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,011评论 3 398
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,669评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,923评论 1 299
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,655评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,740评论 1 330
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,406评论 4 320
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,995评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,961评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,197评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,023评论 2 350
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,483评论 2 342