kafka-第二章-生产者

学习大纲


学习大纲

一、kafka java客户端数据生产流程解析

java客户端数据生产流程

一、发送类型

1、同步发送
  • 通过send()发送完消息后返回一个Future对象,然后调用Future对象的get()方法等待kafka响应
  • 如果kafka正常响应,返回一个RecordMetadata对象,该对象存储消息的偏移量
  • 如果kafka发生错误,无法正常响应,就会抛出异常,我们便可以进行异常处理
    producer.send(record).get();
2、异步发送

异步发送通过callback来监听回调结果

//发送消息
ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send(TOPIC_TEST, obj);
future.addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {
      @Override
      public void onFailure(Throwable throwable) {
          //发送失败的处理
          log.info(TOPIC_TEST + " - 生产者 发送消息失败:" + throwable.getMessage());
      }
      @Override
      public void onSuccess(SendResult<String, Object> stringObjectSendResult) {
           //成功的处理
          log.info(TOPIC_TEST + " - 生产者 发送消息成功:" + stringObjectSendResult.toString());
     }
});

二、序列化器

消息要到网络上进行传输,必须进行序列化,而序列化器的作用就是如此。
Kafka 提供了默认的字符串序列化器(org.apache.kafka.common.serialization.StringSerializer),还有整型
( IntegerSerializer)和字节数组(BytesSerializer)序列化器等等,这些序列化器都实现了接口
( org.apache.kafka.common.serialization.Serializer)基本上能够满足大部分场景的需求。


序列化器

二、分区器

  • 本身kafka有自己的分区策略的,如果未指定,就会使用默认的分区策略
  • Kafka根据传递消息的key来进行分区的分配,即hash(key) % numPartitions。如果Key相同的话,那么就会分配到统一分区。
    源代码org.apache.kafka.clients.producer.internals.DefaultPartitioner分析
public class DefaultPartitioner implements Partitioner {
    private final StickyPartitionCache stickyPartitionCache = new StickyPartitionCache();

    public DefaultPartitioner() {
    }

    public void configure(Map<String, ?> configs) {
    }

    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        //判断当前消息是否为null
        if (keyBytes == null) {
            return this.stickyPartitionCache.partition(topic, cluster);
        } else {
            //通过cluster集群和topic主机获取分区列表partitions
            List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
            //获取分区的大小
            int numPartitions = partitions.size();
           //通过获取当前消息与分区大小进行取模来得到分区
            return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
        }
    }

    public void close() {
    }

    public void onNewBatch(String topic, Cluster cluster, int prevPartition) {
        this.stickyPartitionCache.nextPartition(topic, cluster, prevPartition);
    }
}

org.apache.kafka.common.utils

public static int toPositive(int number) {
        return number & 2147483647;
}

public static int murmur2(byte[] data) {
        int length = data.length;
        int seed = -1756908916;
        int m = 1540483477;
        int r = true;
        int h = seed ^ length;
        int length4 = length / 4;

        for(int i = 0; i < length4; ++i) {
            int i4 = i * 4;
            int k = (data[i4 + 0] & 255) + ((data[i4 + 1] & 255) << 8) + ((data[i4 + 2] & 255) << 16) + ((data[i4 + 3] & 255) << 24);
            k *= 1540483477;
            k ^= k >>> 24;
            k *= 1540483477;
            h *= 1540483477;
            h ^= k;
        }

        switch(length % 4) {
        case 3:
            h ^= (data[(length & -4) + 2] & 255) << 16;
        case 2:
            h ^= (data[(length & -4) + 1] & 255) << 8;
        case 1:
            h ^= data[length & -4] & 255;
            h *= 1540483477;
        default:
            h ^= h >>> 13;
            h *= 1540483477;
            h ^= h >>> 15;
            return h;
        }
}

三、拦截器

Producer拦截器(interceptor)是个相当新的功能,它和consumer端interceptor是在Kafka 0.10版本被引入的,主要用于实现clients端的定制化控制逻辑。生产者拦截器可以用在消息发送前做一些准备工作。若要写自定义拦截器则需实现org.apache.kafka.clients.producer.internals.ProducerInterceptors
使用场景

  • 1、按照某个规则过滤掉不符合要求的消息
  • 2、修改消息的内容
  • 3、统计类需求
自定义拦截器
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import java.util.Map;

public class ProducerInterceptorPrefix implements ProducerInterceptor<String, String> {

    private volatile long sendSuccess = 0;
    private volatile long sendFailure = 0;

    public ProducerInterceptorPrefix(List<ProducerInterceptor<String, String>> producerInterceptors) {
        super(producerInterceptors);
    }

    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
        //该方法封装进kafkaProducer.send方法中,即它运行在用户主线程中。producer确保消息被序列化以及计算分区前调用该方法。我们可以在该方法中对消息做任何操作,但最好保证不要修改消息所属的topic和分区,否则会影响目标分区的计算。
        //拦截数据,给数据加上默认前缀
        String modifiedValue = "prefix1-" + record.value();
        return new ProducerRecord<>(record.value(), record.partition(), record.timestamp(), record.key(), modifiedValue, record.headers());
    }

    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
        //该方法会从RecordAccumulator成功发送到kafka broker之后,或者在发送过程中失败时调用。并且通常是在producer回调逻辑触发之前。该方法运行在producer的IO线程中,因此不要在该方法中放入重要的逻辑,否则会拖慢producer的消息发送效率。
        if (exception == null) {
            sendSuccess++;
        } else {
            sendFailure++;
        }
    }

    @Override
    public void close() {
         //关闭interceptor,清理一些资源。
        //输出统计数目
        System.out.println("当前发送成功总计: " + sendSuccess + " 条,发送失败总计: " + sendFailure + " 条");
    }

   @Override
    public void configure(Map<String, ?> map) {

    }
}
添加拦截器
List<String> interceptors = new ArrayList<>();
interceptors.add("com.haijia.kafka.kafka.ProducerInterceptorPrefix");
interceptors.add("com.haijia.kafka.kafka.ProducerInterceptorPrefix2");
properties.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,interceptors);

总结:
interceptor可能运行在多个线程中,因此在具体的实现时用户需要自行确保线程安全。另外,若指定了多个interceptor,则producer将按照顺序调用他们,并仅仅是捕获每个interceptor可能抛出的异常记录到错误日志中而非再向上传递。

四、发送原理剖析

发送原理图

消息发送的过程中,涉及到两个线程协同工作,主线程首先将业务数据封装成ProducerRecord对象,之后调用send()方法将消息放入RecordAccumulator(消息收集器,也可以理解为主线程与Sender线程直接的缓冲区)中暂存,Sender线程负责将消息信息构成请求,并最终执行网络I/O的线程,它从RecordAccumulator中取出消息并批量发送出去,需要注意的是,KafkaProducer是线程安全的,多个线程间可以共享使用同一个KafkaProducer对象。具体可查看org.apache.kafka.clients.producer.KafkaProducer源码

    @Override
    public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
        // intercept the record, which can be potentially modified; this method does not throw exceptions先经过拦截器处理
        ProducerRecord<K, V> interceptedRecord = this.interceptors.onSend(record);
        return doSend(interceptedRecord, callback);
    }
   ...

五、部分参数介绍

retries

生产者从服务器收到的错误有可能是临时性的错误(比如分区找不到首领)。在这种情况下,如果达到了retires 设置的次数,生产者会放弃重试并返回错误。默认情况下,生产者会在每次重试之间等待100ms,可以通过retry.backoff.ms 参数来修改这个时间间隔。

batch.size

当有多个消息要被发送到同一个分区时,生产者会把它们放在同一个批次里。该参数指定了一个批次可以使用的内存大小,按照字节数计算,而不是消息个数。当批次被填满,批次里的所有消息会被发送出去。不过生产者并不一定都会等到批次被填满才发送,半满的批次,甚至只包含一个消息的批次也可能被发送。所以就算把batch.size设置的很大,也不会造成延迟,只会占用更多的内存而已,如果设置的太小,生产者会因为频繁发送消息而增加一些额外的开销。

max.request.size

该参数用于控制生产者发送的请求大小,它可以指定能发送的单个消息的最大值,也可以指单个请求里所有消息的总大小。broker对可接收的消息最大值也有自己的限制(message.max.size),所以两边的配置最好匹配,避免生产者发送的消息被broker拒绝。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 206,839评论 6 482
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 88,543评论 2 382
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 153,116评论 0 344
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 55,371评论 1 279
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 64,384评论 5 374
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,111评论 1 285
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,416评论 3 400
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,053评论 0 259
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 43,558评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,007评论 2 325
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,117评论 1 334
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,756评论 4 324
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,324评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,315评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,539评论 1 262
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,578评论 2 355
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,877评论 2 345