聊聊spring for kafka对producer的封装与集成

本文主要解析一下spring for apache kafka对原生的kafka client producer的封装与集成。

producer工厂

spring-kafka-1.2.3.RELEASE-sources.jar!/org/springframework/kafka/core/DefaultKafkaProducerFactory.java

public class DefaultKafkaProducerFactory<K, V> implements ProducerFactory<K, V>, Lifecycle, DisposableBean {
    @Override
    public void destroy() throws Exception { //NOSONAR
        CloseSafeProducer<K, V> producer = this.producer;
        this.producer = null;
        if (producer != null) {
            producer.delegate.close(this.physicalCloseTimeout, TimeUnit.SECONDS);
        }
    }


    @Override
    public void start() {
        this.running = true;
    }


    @Override
    public void stop() {
        try {
            destroy();
        }
        catch (Exception e) {
            logger.error("Exception while stopping producer", e);
        }
    }


    @Override
    public boolean isRunning() {
        return this.running;
    }

    @Override
    public Producer<K, V> createProducer() {
        if (this.producer == null) {
            synchronized (this) {
                if (this.producer == null) {
                    this.producer = new CloseSafeProducer<K, V>(createKafkaProducer());
                }
            }
        }
        return this.producer;
    }
}

集成spring的第一步就是集成到spring容器托管,然后跟随spring容器的生命周期正常启动和销毁。这里创建了CloseSafeProducer,它实际的操作都委托给kafka producer

KafkaTemplate

spring-kafka-1.2.3.RELEASE-sources.jar!/org/springframework/kafka/core/KafkaTemplate.java
实现了如下接口

public interface KafkaOperations<K, V> {

    /**
     * Send the data to the default topic with no key or partition.
     * @param data The data.
     * @return a Future for the {@link SendResult}.
     */
    ListenableFuture<SendResult<K, V>> sendDefault(V data);

    /**
     * Send the data to the default topic with the provided key and no partition.
     * @param key the key.
     * @param data The data.
     * @return a Future for the {@link SendResult}.
     */
    ListenableFuture<SendResult<K, V>> sendDefault(K key, V data);

    /**
     * Send the data to the default topic with the provided key and partition.
     * @param partition the partition.
     * @param key the key.
     * @param data the data.
     * @return a Future for the {@link SendResult}.
     */
    ListenableFuture<SendResult<K, V>> sendDefault(int partition, K key, V data);

    /**
     * Send the data to the provided topic with no key or partition.
     * @param topic the topic.
     * @param data The data.
     * @return a Future for the {@link SendResult}.
     */
    ListenableFuture<SendResult<K, V>> send(String topic, V data);

    /**
     * Send the data to the provided topic with the provided key and no partition.
     * @param topic the topic.
     * @param key the key.
     * @param data The data.
     * @return a Future for the {@link SendResult}.
     */
    ListenableFuture<SendResult<K, V>> send(String topic, K key, V data);

    /**
     * Send the data to the provided topic with the provided partition and no key.
     * @param topic the topic.
     * @param partition the partition.
     * @param data The data.
     * @return a Future for the {@link SendResult}.
     */
    ListenableFuture<SendResult<K, V>> send(String topic, int partition, V data);

    /**
     * Send the data to the provided topic with the provided key and partition.
     * @param topic the topic.
     * @param partition the partition.
     * @param key the key.
     * @param data the data.
     * @return a Future for the {@link SendResult}.
     */
    ListenableFuture<SendResult<K, V>> send(String topic, int partition, K key, V data);

    /**
     * Send a message with routing information in message headers. The message payload
     * may be converted before sending.
     * @param message the message to send.
     * @return a Future for the {@link SendResult}.
     * @see org.springframework.kafka.support.KafkaHeaders#TOPIC
     * @see org.springframework.kafka.support.KafkaHeaders#PARTITION_ID
     * @see org.springframework.kafka.support.KafkaHeaders#MESSAGE_KEY
     */
    ListenableFuture<SendResult<K, V>> send(Message<?> message);

    /**
     * See {@link Producer#partitionsFor(String)}.
     * @param topic the topic.
     * @return the partition info.
     * @since 1.1
     */
    List<PartitionInfo> partitionsFor(String topic);

    /**
     * See {@link Producer#metrics()}.
     * @return the metrics.
     * @since 1.1
     */
    Map<MetricName, ? extends Metric> metrics();

    /**
     * Execute some arbitrary operation(s) on the producer and return the result.
     * @param callback the callback.
     * @param <T> the result type.
     * @return the result.
     * @since 1.1
     */
    <T> T execute(ProducerCallback<K, V, T> callback);

    /**
     * Flush the producer.
     */
    void flush();

    /**
     * A callback for executing arbitrary operations on the {@link Producer}.
     * @param <K> the key type.
     * @param <V> the value type.
     * @param <T> the return type.
     * @since 1.1
     */
    interface ProducerCallback<K, V, T> {

        T doInKafka(Producer<K, V> producer);

    }

}

主要的send方法如下,这就是spring对producer的主要包装的地方:

/**
     * Send the producer record.
     * @param producerRecord the producer record.
     * @return a Future for the {@link RecordMetadata}.
     */
    protected ListenableFuture<SendResult<K, V>> doSend(final ProducerRecord<K, V> producerRecord) {
        final Producer<K, V> producer = getTheProducer();
        if (this.logger.isTraceEnabled()) {
            this.logger.trace("Sending: " + producerRecord);
        }
        final SettableListenableFuture<SendResult<K, V>> future = new SettableListenableFuture<>();
        producer.send(producerRecord, new Callback() {

            @Override
            public void onCompletion(RecordMetadata metadata, Exception exception) {
                try {
                    if (exception == null) {
                        future.set(new SendResult<>(producerRecord, metadata));
                        if (KafkaTemplate.this.producerListener != null
                                && KafkaTemplate.this.producerListener.isInterestedInSuccess()) {
                            KafkaTemplate.this.producerListener.onSuccess(producerRecord.topic(),
                                    producerRecord.partition(), producerRecord.key(), producerRecord.value(), metadata);
                        }
                    }
                    else {
                        future.setException(new KafkaProducerException(producerRecord, "Failed to send", exception));
                        if (KafkaTemplate.this.producerListener != null) {
                            KafkaTemplate.this.producerListener.onError(producerRecord.topic(),
                                    producerRecord.partition(),
                                    producerRecord.key(),
                                    producerRecord.value(),
                                    exception);
                        }
                    }
                }
                finally {
                    producer.close();
                }
            }

        });
        if (this.autoFlush) {
            flush();
        }
        if (this.logger.isTraceEnabled()) {
            this.logger.trace("Sent: " + producerRecord);
        }
        return future;
    }

不要被CloseSafeProducer的close方法误导,它里头是个空方法。

  • spring对send方法包装了一层之后,返回SettableListenableFuture,里头是个SendResult
  • 然后对异常也进行了一次包装,包装为spring定义的KafkaException
  • 支持了listener,同步调用
  • 内置MessagingMessageConverter
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,684评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 87,143评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,214评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,788评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,796评论 5 368
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,665评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,027评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,679评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 41,346评论 1 299
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,664评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,766评论 1 331
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,412评论 4 321
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,015评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,974评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,203评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,073评论 2 350
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,501评论 2 343

推荐阅读更多精彩内容