序
本文主要列一下spring for apache kafka的一些auto config以及属性配置
maven
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>1.2.3.RELEASE</version>
</dependency>
这个版本使用的是kafka client 0.10.2.1版本
使用的spring retry是1.1.3.RELEASE版本
几个关键配置类
- KafkaAutoConfiguration
spring-boot-autoconfigure-1.5.7.RELEASE-sources.jar!/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfiguration.java
@Configuration
@ConditionalOnClass(KafkaTemplate.class)
@EnableConfigurationProperties(KafkaProperties.class)
@Import(KafkaAnnotationDrivenConfiguration.class)
public class KafkaAutoConfiguration {
private final KafkaProperties properties;
public KafkaAutoConfiguration(KafkaProperties properties) {
this.properties = properties;
}
@Bean
@ConditionalOnMissingBean(KafkaTemplate.class)
public KafkaTemplate<?, ?> kafkaTemplate(
ProducerFactory<Object, Object> kafkaProducerFactory,
ProducerListener<Object, Object> kafkaProducerListener) {
KafkaTemplate<Object, Object> kafkaTemplate = new KafkaTemplate<Object, Object>(
kafkaProducerFactory);
kafkaTemplate.setProducerListener(kafkaProducerListener);
kafkaTemplate.setDefaultTopic(this.properties.getTemplate().getDefaultTopic());
return kafkaTemplate;
}
@Bean
@ConditionalOnMissingBean(ProducerListener.class)
public ProducerListener<Object, Object> kafkaProducerListener() {
return new LoggingProducerListener<Object, Object>();
}
@Bean
@ConditionalOnMissingBean(ConsumerFactory.class)
public ConsumerFactory<?, ?> kafkaConsumerFactory() {
return new DefaultKafkaConsumerFactory<Object, Object>(
this.properties.buildConsumerProperties());
}
@Bean
@ConditionalOnMissingBean(ProducerFactory.class)
public ProducerFactory<?, ?> kafkaProducerFactory() {
return new DefaultKafkaProducerFactory<Object, Object>(
this.properties.buildProducerProperties());
}
}
- KafkaAnnotationDrivenConfiguration
spring-boot-autoconfigure-1.5.7.RELEASE-sources.jar!/org/springframework/boot/autoconfigure/kafka/KafkaAnnotationDrivenConfiguration.java
@Configuration
@ConditionalOnClass(EnableKafka.class)
class KafkaAnnotationDrivenConfiguration {
private final KafkaProperties properties;
KafkaAnnotationDrivenConfiguration(KafkaProperties properties) {
this.properties = properties;
}
@Bean
@ConditionalOnMissingBean
public ConcurrentKafkaListenerContainerFactoryConfigurer kafkaListenerContainerFactoryConfigurer() {
ConcurrentKafkaListenerContainerFactoryConfigurer configurer = new ConcurrentKafkaListenerContainerFactoryConfigurer();
configurer.setKafkaProperties(this.properties);
return configurer;
}
@Bean
@ConditionalOnMissingBean(name = "kafkaListenerContainerFactory")
public ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
ConsumerFactory<Object, Object> kafkaConsumerFactory) {
ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<Object, Object>();
configurer.configure(factory, kafkaConsumerFactory);
return factory;
}
@EnableKafka
@ConditionalOnMissingBean(name = KafkaListenerConfigUtils.KAFKA_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME)
protected static class EnableKafkaConfiguration {
}
}
配置属性
spring-boot-autoconfigure-1.5.7.RELEASE.jar!/META-INF/spring-configuration-metadata.json
公共配置
{
"name": "spring.kafka.bootstrap-servers",
"type": "java.util.List<java.lang.String>",
"description": "Comma-delimited list of host:port pairs to use for establishing the initial\n connection to the Kafka cluster.",
"sourceType": "org.springframework.boot.autoconfigure.kafka.KafkaProperties"
},
{
"name": "spring.kafka.client-id",
"type": "java.lang.String",
"description": "Id to pass to the server when making requests; used for server-side logging.",
"sourceType": "org.springframework.boot.autoconfigure.kafka.KafkaProperties"
},
{
"name": "spring.kafka.ssl.key-password",
"type": "java.lang.String",
"description": "Password of the private key in the key store file.",
"sourceType": "org.springframework.boot.autoconfigure.kafka.KafkaProperties$Ssl"
},
{
"name": "spring.kafka.ssl.keystore-location",
"type": "org.springframework.core.io.Resource",
"description": "Location of the key store file.",
"sourceType": "org.springframework.boot.autoconfigure.kafka.KafkaProperties$Ssl"
},
{
"name": "spring.kafka.ssl.keystore-password",
"type": "java.lang.String",
"description": "Store password for the key store file.",
"sourceType": "org.springframework.boot.autoconfigure.kafka.KafkaProperties$Ssl"
},
{
"name": "spring.kafka.ssl.truststore-location",
"type": "org.springframework.core.io.Resource",
"description": "Location of the trust store file.",
"sourceType": "org.springframework.boot.autoconfigure.kafka.KafkaProperties$Ssl"
},
{
"name": "spring.kafka.ssl.truststore-password",
"type": "java.lang.String",
"description": "Store password for the trust store file.",
"sourceType": "org.springframework.boot.autoconfigure.kafka.KafkaProperties$Ssl"
},
{
"name": "spring.kafka.template.default-topic",
"type": "java.lang.String",
"description": "Default topic to which messages will be sent.",
"sourceType": "org.springframework.boot.autoconfigure.kafka.KafkaProperties$Template"
}
consumer配置属性
{
"name": "spring.kafka.consumer.auto-commit-interval",
"type": "java.lang.Integer",
"description": "Frequency in milliseconds that the consumer offsets are auto-committed to Kafka\n if 'enable.auto.commit' true.",
"sourceType": "org.springframework.boot.autoconfigure.kafka.KafkaProperties$Consumer"
},
{
"name": "spring.kafka.consumer.auto-offset-reset",
"type": "java.lang.String",
"description": "What to do when there is no initial offset in Kafka or if the current offset\n does not exist any more on the server.",
"sourceType": "org.springframework.boot.autoconfigure.kafka.KafkaProperties$Consumer"
},
{
"name": "spring.kafka.consumer.bootstrap-servers",
"type": "java.util.List<java.lang.String>",
"description": "Comma-delimited list of host:port pairs to use for establishing the initial\n connection to the Kafka cluster.",
"sourceType": "org.springframework.boot.autoconfigure.kafka.KafkaProperties$Consumer"
},
{
"name": "spring.kafka.consumer.client-id",
"type": "java.lang.String",
"description": "Id to pass to the server when making requests; used for server-side logging.",
"sourceType": "org.springframework.boot.autoconfigure.kafka.KafkaProperties$Consumer"
},
{
"name": "spring.kafka.consumer.enable-auto-commit",
"type": "java.lang.Boolean",
"description": "If true the consumer's offset will be periodically committed in the background.",
"sourceType": "org.springframework.boot.autoconfigure.kafka.KafkaProperties$Consumer"
},
{
"name": "spring.kafka.consumer.fetch-max-wait",
"type": "java.lang.Integer",
"description": "Maximum amount of time in milliseconds the server will block before answering\n the fetch request if there isn't sufficient data to immediately satisfy the\n requirement given by \"fetch.min.bytes\".",
"sourceType": "org.springframework.boot.autoconfigure.kafka.KafkaProperties$Consumer"
},
{
"name": "spring.kafka.consumer.fetch-min-size",
"type": "java.lang.Integer",
"description": "Minimum amount of data the server should return for a fetch request in bytes.",
"sourceType": "org.springframework.boot.autoconfigure.kafka.KafkaProperties$Consumer"
},
{
"name": "spring.kafka.consumer.group-id",
"type": "java.lang.String",
"description": "Unique string that identifies the consumer group this consumer belongs to.",
"sourceType": "org.springframework.boot.autoconfigure.kafka.KafkaProperties$Consumer"
},
{
"name": "spring.kafka.consumer.heartbeat-interval",
"type": "java.lang.Integer",
"description": "Expected time in milliseconds between heartbeats to the consumer coordinator.",
"sourceType": "org.springframework.boot.autoconfigure.kafka.KafkaProperties$Consumer"
},
{
"name": "spring.kafka.consumer.key-deserializer",
"type": "java.lang.Class<?>",
"description": "Deserializer class for keys.",
"sourceType": "org.springframework.boot.autoconfigure.kafka.KafkaProperties$Consumer"
},
{
"name": "spring.kafka.consumer.max-poll-records",
"type": "java.lang.Integer",
"description": "Maximum number of records returned in a single call to poll().",
"sourceType": "org.springframework.boot.autoconfigure.kafka.KafkaProperties$Consumer"
},
{
"name": "spring.kafka.consumer.value-deserializer",
"type": "java.lang.Class<?>",
"description": "Deserializer class for values.",
"sourceType": "org.springframework.boot.autoconfigure.kafka.KafkaProperties$Consumer"
},
{
"name": "spring.kafka.listener.ack-count",
"type": "java.lang.Integer",
"description": "Number of records between offset commits when ackMode is \"COUNT\" or\n \"COUNT_TIME\".",
"sourceType": "org.springframework.boot.autoconfigure.kafka.KafkaProperties$Listener"
},
{
"name": "spring.kafka.listener.ack-mode",
"type": "org.springframework.kafka.listener.AbstractMessageListenerContainer$AckMode",
"description": "Listener AckMode; see the spring-kafka documentation.",
"sourceType": "org.springframework.boot.autoconfigure.kafka.KafkaProperties$Listener"
},
{
"name": "spring.kafka.listener.ack-time",
"type": "java.lang.Long",
"description": "Time in milliseconds between offset commits when ackMode is \"TIME\" or\n \"COUNT_TIME\".",
"sourceType": "org.springframework.boot.autoconfigure.kafka.KafkaProperties$Listener"
},
{
"name": "spring.kafka.listener.concurrency",
"type": "java.lang.Integer",
"description": "Number of threads to run in the listener containers.",
"sourceType": "org.springframework.boot.autoconfigure.kafka.KafkaProperties$Listener"
},
{
"name": "spring.kafka.listener.poll-timeout",
"type": "java.lang.Long",
"description": "Timeout in milliseconds to use when polling the consumer.",
"sourceType": "org.springframework.boot.autoconfigure.kafka.KafkaProperties$Listener"
}
producer配置
{
"name": "spring.kafka.producer.acks",
"type": "java.lang.String",
"description": "Number of acknowledgments the producer requires the leader to have received\n before considering a request complete.",
"sourceType": "org.springframework.boot.autoconfigure.kafka.KafkaProperties$Producer"
},
{
"name": "spring.kafka.producer.batch-size",
"type": "java.lang.Integer",
"description": "Number of records to batch before sending.",
"sourceType": "org.springframework.boot.autoconfigure.kafka.KafkaProperties$Producer"
},
{
"name": "spring.kafka.producer.bootstrap-servers",
"type": "java.util.List<java.lang.String>",
"description": "Comma-delimited list of host:port pairs to use for establishing the initial\n connection to the Kafka cluster.",
"sourceType": "org.springframework.boot.autoconfigure.kafka.KafkaProperties$Producer"
},
{
"name": "spring.kafka.producer.buffer-memory",
"type": "java.lang.Long",
"description": "Total bytes of memory the producer can use to buffer records waiting to be sent\n to the server.",
"sourceType": "org.springframework.boot.autoconfigure.kafka.KafkaProperties$Producer"
},
{
"name": "spring.kafka.producer.client-id",
"type": "java.lang.String",
"description": "Id to pass to the server when making requests; used for server-side logging.",
"sourceType": "org.springframework.boot.autoconfigure.kafka.KafkaProperties$Producer"
},
{
"name": "spring.kafka.producer.compression-type",
"type": "java.lang.String",
"description": "Compression type for all data generated by the producer.",
"sourceType": "org.springframework.boot.autoconfigure.kafka.KafkaProperties$Producer"
},
{
"name": "spring.kafka.producer.key-serializer",
"type": "java.lang.Class<?>",
"description": "Serializer class for keys.",
"sourceType": "org.springframework.boot.autoconfigure.kafka.KafkaProperties$Producer"
},
{
"name": "spring.kafka.producer.retries",
"type": "java.lang.Integer",
"description": "When greater than zero, enables retrying of failed sends.",
"sourceType": "org.springframework.boot.autoconfigure.kafka.KafkaProperties$Producer"
},
{
"name": "spring.kafka.producer.value-serializer",
"type": "java.lang.Class<?>",
"description": "Serializer class for values.",
"sourceType": "org.springframework.boot.autoconfigure.kafka.KafkaProperties$Producer"
},
{
"name": "spring.kafka.properties",
"type": "java.util.Map<java.lang.String,java.lang.String>",
"description": "Additional properties used to configure the client.",
"sourceType": "org.springframework.boot.autoconfigure.kafka.KafkaProperties"
}