Spring for Kafka auto-configuration and configuration properties

This article lists the main auto-configuration classes and configuration properties that Spring Boot provides for Spring for Apache Kafka.

Maven

<dependency>
  <groupId>org.springframework.kafka</groupId>
  <artifactId>spring-kafka</artifactId>
  <version>1.2.3.RELEASE</version>
</dependency>

This version of spring-kafka pulls in kafka-clients 0.10.2.1 and spring-retry 1.1.3.RELEASE.

Key configuration classes

  • KafkaAutoConfiguration
    spring-boot-autoconfigure-1.5.7.RELEASE-sources.jar!/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfiguration.java
@Configuration
@ConditionalOnClass(KafkaTemplate.class)
@EnableConfigurationProperties(KafkaProperties.class)
@Import(KafkaAnnotationDrivenConfiguration.class)
public class KafkaAutoConfiguration {

    private final KafkaProperties properties;

    public KafkaAutoConfiguration(KafkaProperties properties) {
        this.properties = properties;
    }

    @Bean
    @ConditionalOnMissingBean(KafkaTemplate.class)
    public KafkaTemplate<?, ?> kafkaTemplate(
            ProducerFactory<Object, Object> kafkaProducerFactory,
            ProducerListener<Object, Object> kafkaProducerListener) {
        KafkaTemplate<Object, Object> kafkaTemplate = new KafkaTemplate<Object, Object>(
                kafkaProducerFactory);
        kafkaTemplate.setProducerListener(kafkaProducerListener);
        kafkaTemplate.setDefaultTopic(this.properties.getTemplate().getDefaultTopic());
        return kafkaTemplate;
    }

    @Bean
    @ConditionalOnMissingBean(ProducerListener.class)
    public ProducerListener<Object, Object> kafkaProducerListener() {
        return new LoggingProducerListener<Object, Object>();
    }

    @Bean
    @ConditionalOnMissingBean(ConsumerFactory.class)
    public ConsumerFactory<?, ?> kafkaConsumerFactory() {
        return new DefaultKafkaConsumerFactory<Object, Object>(
                this.properties.buildConsumerProperties());
    }

    @Bean
    @ConditionalOnMissingBean(ProducerFactory.class)
    public ProducerFactory<?, ?> kafkaProducerFactory() {
        return new DefaultKafkaProducerFactory<Object, Object>(
                this.properties.buildProducerProperties());
    }

}
  • KafkaAnnotationDrivenConfiguration
    spring-boot-autoconfigure-1.5.7.RELEASE-sources.jar!/org/springframework/boot/autoconfigure/kafka/KafkaAnnotationDrivenConfiguration.java
@Configuration
@ConditionalOnClass(EnableKafka.class)
class KafkaAnnotationDrivenConfiguration {

    private final KafkaProperties properties;

    KafkaAnnotationDrivenConfiguration(KafkaProperties properties) {
        this.properties = properties;
    }

    @Bean
    @ConditionalOnMissingBean
    public ConcurrentKafkaListenerContainerFactoryConfigurer kafkaListenerContainerFactoryConfigurer() {
        ConcurrentKafkaListenerContainerFactoryConfigurer configurer = new ConcurrentKafkaListenerContainerFactoryConfigurer();
        configurer.setKafkaProperties(this.properties);
        return configurer;
    }

    @Bean
    @ConditionalOnMissingBean(name = "kafkaListenerContainerFactory")
    public ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
            ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
            ConsumerFactory<Object, Object> kafkaConsumerFactory) {
        ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<Object, Object>();
        configurer.configure(factory, kafkaConsumerFactory);
        return factory;
    }

    @EnableKafka
    @ConditionalOnMissingBean(name = KafkaListenerConfigUtils.KAFKA_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME)
    protected static class EnableKafkaConfiguration {

    }
}
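
KafkaAutoConfiguration exposes a ready-to-use KafkaTemplate (backed by StringSerializer for keys and values with the default properties), while KafkaAnnotationDrivenConfiguration applies @EnableKafka and registers the kafkaListenerContainerFactory. A minimal sketch of sending through the auto-configured template follows; the class and topic names are illustrative, not part of the auto-configuration:

import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

// Illustrative example: "demo-topic" and this class are assumptions, not from the article
@Service
public class DemoProducerService {

    private final KafkaTemplate<String, String> kafkaTemplate;

    public DemoProducerService(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public void send(String payload) {
        // Sends to an explicitly named topic; sendDefault(payload) would use
        // the topic configured via spring.kafka.template.default-topic
        kafkaTemplate.send("demo-topic", payload);
    }
}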
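
On the consuming side, a @KafkaListener method is picked up by the auto-configured kafkaListenerContainerFactory without any extra configuration, assuming spring.kafka.consumer.group-id has been set. Again, the topic and class names below are illustrative:

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

// Illustrative example: the listener container is created by the
// auto-configured kafkaListenerContainerFactory shown above
@Component
public class DemoConsumer {

    @KafkaListener(topics = "demo-topic")
    public void onMessage(String payload) {
        System.out.println("received: " + payload);
    }
}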

Configuration properties

spring-boot-autoconfigure-1.5.7.RELEASE.jar!/META-INF/spring-configuration-metadata.json

Common configuration

    {
      "name": "spring.kafka.bootstrap-servers",
      "type": "java.util.List<java.lang.String>",
      "description": "Comma-delimited list of host:port pairs to use for establishing the initial\n connection to the Kafka cluster.",
      "sourceType": "org.springframework.boot.autoconfigure.kafka.KafkaProperties"
    },
    {
      "name": "spring.kafka.client-id",
      "type": "java.lang.String",
      "description": "Id to pass to the server when making requests; used for server-side logging.",
      "sourceType": "org.springframework.boot.autoconfigure.kafka.KafkaProperties"
    },
    {
      "name": "spring.kafka.ssl.key-password",
      "type": "java.lang.String",
      "description": "Password of the private key in the key store file.",
      "sourceType": "org.springframework.boot.autoconfigure.kafka.KafkaProperties$Ssl"
    },
    {
      "name": "spring.kafka.ssl.keystore-location",
      "type": "org.springframework.core.io.Resource",
      "description": "Location of the key store file.",
      "sourceType": "org.springframework.boot.autoconfigure.kafka.KafkaProperties$Ssl"
    },
    {
      "name": "spring.kafka.ssl.keystore-password",
      "type": "java.lang.String",
      "description": "Store password for the key store file.",
      "sourceType": "org.springframework.boot.autoconfigure.kafka.KafkaProperties$Ssl"
    },
    {
      "name": "spring.kafka.ssl.truststore-location",
      "type": "org.springframework.core.io.Resource",
      "description": "Location of the trust store file.",
      "sourceType": "org.springframework.boot.autoconfigure.kafka.KafkaProperties$Ssl"
    },
    {
      "name": "spring.kafka.ssl.truststore-password",
      "type": "java.lang.String",
      "description": "Store password for the trust store file.",
      "sourceType": "org.springframework.boot.autoconfigure.kafka.KafkaProperties$Ssl"
    },
    {
      "name": "spring.kafka.template.default-topic",
      "type": "java.lang.String",
      "description": "Default topic to which messages will be sent.",
      "sourceType": "org.springframework.boot.autoconfigure.kafka.KafkaProperties$Template"
    }
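
For illustration, these common keys map to application.properties entries as shown below; the values are assumptions, not taken from the article:

    spring.kafka.bootstrap-servers=localhost:9092
    spring.kafka.client-id=demo-client
    spring.kafka.template.default-topic=demo-topic
    spring.kafka.ssl.keystore-location=classpath:client.keystore.jks
    spring.kafka.ssl.keystore-password=changeit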

Consumer and listener configuration properties

    {
      "name": "spring.kafka.consumer.auto-commit-interval",
      "type": "java.lang.Integer",
      "description": "Frequency in milliseconds that the consumer offsets are auto-committed to Kafka\n if 'enable.auto.commit' true.",
      "sourceType": "org.springframework.boot.autoconfigure.kafka.KafkaProperties$Consumer"
    },
    {
      "name": "spring.kafka.consumer.auto-offset-reset",
      "type": "java.lang.String",
      "description": "What to do when there is no initial offset in Kafka or if the current offset\n does not exist any more on the server.",
      "sourceType": "org.springframework.boot.autoconfigure.kafka.KafkaProperties$Consumer"
    },
    {
      "name": "spring.kafka.consumer.bootstrap-servers",
      "type": "java.util.List<java.lang.String>",
      "description": "Comma-delimited list of host:port pairs to use for establishing the initial\n connection to the Kafka cluster.",
      "sourceType": "org.springframework.boot.autoconfigure.kafka.KafkaProperties$Consumer"
    },
    {
      "name": "spring.kafka.consumer.client-id",
      "type": "java.lang.String",
      "description": "Id to pass to the server when making requests; used for server-side logging.",
      "sourceType": "org.springframework.boot.autoconfigure.kafka.KafkaProperties$Consumer"
    },
    {
      "name": "spring.kafka.consumer.enable-auto-commit",
      "type": "java.lang.Boolean",
      "description": "If true the consumer's offset will be periodically committed in the background.",
      "sourceType": "org.springframework.boot.autoconfigure.kafka.KafkaProperties$Consumer"
    },
    {
      "name": "spring.kafka.consumer.fetch-max-wait",
      "type": "java.lang.Integer",
      "description": "Maximum amount of time in milliseconds the server will block before answering\n the fetch request if there isn't sufficient data to immediately satisfy the\n requirement given by \"fetch.min.bytes\".",
      "sourceType": "org.springframework.boot.autoconfigure.kafka.KafkaProperties$Consumer"
    },
    {
      "name": "spring.kafka.consumer.fetch-min-size",
      "type": "java.lang.Integer",
      "description": "Minimum amount of data the server should return for a fetch request in bytes.",
      "sourceType": "org.springframework.boot.autoconfigure.kafka.KafkaProperties$Consumer"
    },
    {
      "name": "spring.kafka.consumer.group-id",
      "type": "java.lang.String",
      "description": "Unique string that identifies the consumer group this consumer belongs to.",
      "sourceType": "org.springframework.boot.autoconfigure.kafka.KafkaProperties$Consumer"
    },
    {
      "name": "spring.kafka.consumer.heartbeat-interval",
      "type": "java.lang.Integer",
      "description": "Expected time in milliseconds between heartbeats to the consumer coordinator.",
      "sourceType": "org.springframework.boot.autoconfigure.kafka.KafkaProperties$Consumer"
    },
    {
      "name": "spring.kafka.consumer.key-deserializer",
      "type": "java.lang.Class<?>",
      "description": "Deserializer class for keys.",
      "sourceType": "org.springframework.boot.autoconfigure.kafka.KafkaProperties$Consumer"
    },
    {
      "name": "spring.kafka.consumer.max-poll-records",
      "type": "java.lang.Integer",
      "description": "Maximum number of records returned in a single call to poll().",
      "sourceType": "org.springframework.boot.autoconfigure.kafka.KafkaProperties$Consumer"
    },
    {
      "name": "spring.kafka.consumer.value-deserializer",
      "type": "java.lang.Class<?>",
      "description": "Deserializer class for values.",
      "sourceType": "org.springframework.boot.autoconfigure.kafka.KafkaProperties$Consumer"
    },
    {
      "name": "spring.kafka.listener.ack-count",
      "type": "java.lang.Integer",
      "description": "Number of records between offset commits when ackMode is \"COUNT\" or\n \"COUNT_TIME\".",
      "sourceType": "org.springframework.boot.autoconfigure.kafka.KafkaProperties$Listener"
    },
    {
      "name": "spring.kafka.listener.ack-mode",
      "type": "org.springframework.kafka.listener.AbstractMessageListenerContainer$AckMode",
      "description": "Listener AckMode; see the spring-kafka documentation.",
      "sourceType": "org.springframework.boot.autoconfigure.kafka.KafkaProperties$Listener"
    },
    {
      "name": "spring.kafka.listener.ack-time",
      "type": "java.lang.Long",
      "description": "Time in milliseconds between offset commits when ackMode is \"TIME\" or\n \"COUNT_TIME\".",
      "sourceType": "org.springframework.boot.autoconfigure.kafka.KafkaProperties$Listener"
    },
    {
      "name": "spring.kafka.listener.concurrency",
      "type": "java.lang.Integer",
      "description": "Number of threads to run in the listener containers.",
      "sourceType": "org.springframework.boot.autoconfigure.kafka.KafkaProperties$Listener"
    },
    {
      "name": "spring.kafka.listener.poll-timeout",
      "type": "java.lang.Long",
      "description": "Timeout in milliseconds to use when polling the consumer.",
      "sourceType": "org.springframework.boot.autoconfigure.kafka.KafkaProperties$Listener"
    }
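
A minimal application.properties sketch for the consumer and listener keys above (illustrative values); with enable-auto-commit=false and ack-mode=RECORD the container commits the offset after each record is processed:

    spring.kafka.consumer.group-id=demo-group
    spring.kafka.consumer.auto-offset-reset=earliest
    spring.kafka.consumer.enable-auto-commit=false
    spring.kafka.consumer.max-poll-records=100
    spring.kafka.listener.ack-mode=RECORD
    spring.kafka.listener.concurrency=3
    spring.kafka.listener.poll-timeout=3000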

Producer configuration properties

    {
      "name": "spring.kafka.producer.acks",
      "type": "java.lang.String",
      "description": "Number of acknowledgments the producer requires the leader to have received\n before considering a request complete.",
      "sourceType": "org.springframework.boot.autoconfigure.kafka.KafkaProperties$Producer"
    },
    {
      "name": "spring.kafka.producer.batch-size",
      "type": "java.lang.Integer",
      "description": "Number of records to batch before sending.",
      "sourceType": "org.springframework.boot.autoconfigure.kafka.KafkaProperties$Producer"
    },
    {
      "name": "spring.kafka.producer.bootstrap-servers",
      "type": "java.util.List<java.lang.String>",
      "description": "Comma-delimited list of host:port pairs to use for establishing the initial\n connection to the Kafka cluster.",
      "sourceType": "org.springframework.boot.autoconfigure.kafka.KafkaProperties$Producer"
    },
    {
      "name": "spring.kafka.producer.buffer-memory",
      "type": "java.lang.Long",
      "description": "Total bytes of memory the producer can use to buffer records waiting to be sent\n to the server.",
      "sourceType": "org.springframework.boot.autoconfigure.kafka.KafkaProperties$Producer"
    },
    {
      "name": "spring.kafka.producer.client-id",
      "type": "java.lang.String",
      "description": "Id to pass to the server when making requests; used for server-side logging.",
      "sourceType": "org.springframework.boot.autoconfigure.kafka.KafkaProperties$Producer"
    },
    {
      "name": "spring.kafka.producer.compression-type",
      "type": "java.lang.String",
      "description": "Compression type for all data generated by the producer.",
      "sourceType": "org.springframework.boot.autoconfigure.kafka.KafkaProperties$Producer"
    },
    {
      "name": "spring.kafka.producer.key-serializer",
      "type": "java.lang.Class<?>",
      "description": "Serializer class for keys.",
      "sourceType": "org.springframework.boot.autoconfigure.kafka.KafkaProperties$Producer"
    },
    {
      "name": "spring.kafka.producer.retries",
      "type": "java.lang.Integer",
      "description": "When greater than zero, enables retrying of failed sends.",
      "sourceType": "org.springframework.boot.autoconfigure.kafka.KafkaProperties$Producer"
    },
    {
      "name": "spring.kafka.producer.value-serializer",
      "type": "java.lang.Class<?>",
      "description": "Serializer class for values.",
      "sourceType": "org.springframework.boot.autoconfigure.kafka.KafkaProperties$Producer"
    },
    {
      "name": "spring.kafka.properties",
      "type": "java.util.Map<java.lang.String,java.lang.String>",
      "description": "Additional properties used to configure the client.",
      "sourceType": "org.springframework.boot.autoconfigure.kafka.KafkaProperties"
    }
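
Note that the last entry, spring.kafka.properties, is not producer-specific: it is a generic map that is merged into the client configuration built for both producers and consumers. An illustrative application.properties sketch (values are assumptions):

    spring.kafka.producer.acks=all
    spring.kafka.producer.retries=3
    spring.kafka.producer.batch-size=16384
    spring.kafka.producer.buffer-memory=33554432
    spring.kafka.producer.compression-type=gzip
    # extra raw client properties go through the generic map
    spring.kafka.properties.max.request.size=1048576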