kafka消息累加器作用

程序中调用kafka生产者发送消息,并不是每调用一次send方法,就直接将消息通过底层网络发送给broker了,而是会将多个消息形成一个批次,然后再以批次的形式,发送给broker,当然了,消息的发送也不是由生产者线程发送的。那么,kafka的消息如何形成一个批次,以及批次的形式,这个就是消息累加器的作用。

下面从源码的角度来看下消息累加器是如何处理消息的,并且还会和分区器一起搭配使用,下面这个方法是doSend方法的实现逻辑,这里只截取和累加器相关的代码部分

//前面代码省略
RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey,
        serializedValue, headers, interceptCallback, remainingWaitMs, true);

if (result.abortForNewBatch) {
    int prevPartition = partition;
    partitioner.onNewBatch(record.topic(), cluster, prevPartition);
    partition = partition(record, serializedKey, serializedValue, cluster);
    tp = new TopicPartition(record.topic(), partition);
    if (log.isTraceEnabled()) {
        log.trace("Retrying append due to new batch creation for topic {} partition {}. The old partition was {}", record.topic(), partition, prevPartition);
    }
    // producer callback will make sure to call both 'callback' and interceptor callback
    interceptCallback = new InterceptorCallback<>(callback, this.interceptors, tp);

    result = accumulator.append(tp, timestamp, serializedKey,
        serializedValue, headers, interceptCallback, remainingWaitMs, false);
}

if (transactionManager != null && transactionManager.isTransactional())
    transactionManager.maybeAddPartitionToTransaction(tp);

if (result.batchIsFull || result.newBatchCreated) {
    log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
    this.sender.wakeup();
}
return result.future;

在对消息的key、value进行序列化后,并且根据分区器选择好分区之后,会调用累加器的append方法,因此,重点关注下append方法的实现逻辑

/**
@param abortOnNewBatch,这个参数的作用是,是否放弃使用新的批次,每个分区都会对应一个双向队列,
每个队列的元素是一个批次,当有新消息时,会取出队列的最后一个元素,并将消息累加到该批次中,假如批次的容量达到上限了,那么新消息默认需要生成一个新的批次,
再重新添加到双向队列中,如果参数为true,表示在这种情况下,放弃使用新的批次
*/
public RecordAppendResult append(TopicPartition tp,
                                 long timestamp,
                                 byte[] key,
                                 byte[] value,
                                 Header[] headers,
                                 Callback callback,
                                 long maxTimeToBlock,
                                 boolean abortOnNewBatch) throws InterruptedException {
    //每调用一次append方法,都会被记录下来
    appendsInProgress.incrementAndGet();
    ByteBuffer buffer = null;
    if (headers == null) headers = Record.EMPTY_HEADERS;
    try {
        // 取出分区对应的双向队列,若没有,则生成一个新的队列,并放入到map中
        Deque<ProducerBatch> dq = getOrCreateDeque(tp);
        synchronized (dq) {
            if (closed)
                throw new KafkaException("Producer closed while send in progress");
           //试图将消息添加到队列的最后一个批次元素中,若添加成功,那么方法直接返回
            RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq);
            if (appendResult != null)
                return appendResult;
        }

        //当添加失败时,若参数指定为true,那么方法会直接返回,不会创建新的批次。
//外层方法第一次调用append方法时传的参数为true,
//主要是因为,kafka的发送者线程(区别于生产者线程)以一个批次为发送基本单位,因此为了让消息尽量多的累加到一个批次,
//当第一次无法往分区队列的最后一个批次累加时,优先选择另一个分区的队列批次。
        if (abortOnNewBatch) {
            // Return a result that will cause another call to append.
            return new RecordAppendResult(null, false, false, true);
        }
        //计算此次消息需要的内存大小
        byte maxUsableMagic = apiVersions.maxUsableProduceMagic();
        int size = Math.max(this.batchSize, AbstractRecords.estimateSizeInBytesUpperBound(maxUsableMagic, compression, key, value, headers));
        log.trace("Allocating a new {} byte message buffer for topic {} partition {}", size, tp.topic(), tp.partition());
        buffer = free.allocate(size, maxTimeToBlock);
        synchronized (dq) {
            // 再次检查生产者线程是否关闭了
            if (closed)
                throw new KafkaException("Producer closed while send in progress");
            //这边为何又要重新尝试append,因此当有多个线程并发的往同一分区累加消息,
//可能另一个线程已经生成好一个新的批次对象,并加入到双向队列中了,
//因而这边需要再次尝试append数据,而不是直接生成新的批次对象
            RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq);
            if (appendResult != null) {
                // Somebody else found us a batch, return the one we waited for! Hopefully this doesn't happen often...
                return appendResult;
            }
            //若尝试append失败之后,这里才开始真正的构建新的批次对象,并加入到双向队列之中
            MemoryRecordsBuilder recordsBuilder = recordsBuilder(buffer, maxUsableMagic);
            ProducerBatch batch = new ProducerBatch(tp, recordsBuilder, time.milliseconds());
            FutureRecordMetadata future = Objects.requireNonNull(batch.tryAppend(timestamp, key, value, headers,
                    callback, time.milliseconds()));

            dq.addLast(batch);
//每个批次还未添加到一个未完成的集合中,因此这些批次还未发送和得到broker的确认
            incomplete.add(batch);

            // Don't deallocate this buffer in the finally block as it's being used in the record batch
            buffer = null;
            return new RecordAppendResult(future, dq.size() > 1 || batch.isFull(), true, false);
        }
    } finally {
        if (buffer != null)
            free.deallocate(buffer);
        appendsInProgress.decrementAndGet();
    }
}

append方法的返回对象RecordAppendResult包含以下几个

public final FutureRecordMetadata future;//消息记录的元数据信息
public final boolean batchIsFull;  //批次是否满了或者队列是否不为空
public final boolean newBatchCreated;//是否新创建的批次
public final boolean abortForNewBatch;//放弃使用新的批次,表示消息往分区append失败,需要重新append

其中abortForNewBatch决定doSend方法中是否再次调用append方法

if (result.abortForNewBatch) {
    int prevPartition = partition;
    partitioner.onNewBatch(record.topic(), cluster, prevPartition);
    partition = partition(record, serializedKey, serializedValue, cluster);
    tp = new TopicPartition(record.topic(), partition);
    if (log.isTraceEnabled()) {
        log.trace("Retrying append due to new batch creation for topic {} partition {}. The old partition was {}", record.topic(), partition, prevPartition);
    }
    // producer callback will make sure to call both 'callback' and interceptor callback
    interceptCallback = new InterceptorCallback<>(callback, this.interceptors, tp);

    result = accumulator.append(tp, timestamp, serializedKey,
        serializedValue, headers, interceptCallback, remainingWaitMs, false);
}

上述方法体中,会调用分区器的onNewBatch方法,设置一个新的分区对应的粘性分区,然后往新的分区append数据,这里为何要使用新的分区,原因在上述append方法实现中解释过了。

当批次是满的或者是新创建时,doSend方法会唤醒发送者线程。这里有个地方需要注意的是,kafka生产者线程和发送者线程是分开的,生产者线程负责往底层的队列中添加消息的批次对象,而发送者线程不断从队列中取出消息批次来发送给broker,实现了消息的构造和发送解耦。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 195,783评论 5 462
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 82,360评论 2 373
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 142,942评论 0 325
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 52,507评论 1 267
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 61,324评论 5 358
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 46,299评论 1 273
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 36,685评论 3 386
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 35,358评论 0 254
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 39,652评论 1 293
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 34,704评论 2 312
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 36,465评论 1 326
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,318评论 3 313
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 37,711评论 3 299
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 28,991评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,265评论 1 251
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 41,661评论 2 342
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 40,864评论 2 335

推荐阅读更多精彩内容