随着spring boot流行,越来越多的开发者转向spring boot作为java项目的底层框架,而spring boot集成中间件的配置不同于spring的xml方式,现给大家做一个spring boot集成多个kafka集群生产发送消息示例,希望对大家能有帮助
依赖环境
spring boot:2.0.4
kafka:1.1.0
示例代码
1:pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.wl</groupId>
<artifactId>spring-boot-kafka</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>jar</packaging>
<name>spring-boot-kafka</name>
<description>Demo project for Spring Boot Integrate kafka</description>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.0.4.RELEASE</version>
<relativePath/>
</parent>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<!--kafka依赖 start-->
<!--
此处用spring-kafka,依赖的kafka-clients为1.0.2,由于服务端是1.1.0版本,
所以此处去掉spring-kafka本身依赖的kafka客户端,引入了1.1.0版本的kafka-clients,
如果不需要spring-kafka,可以直接应用原生kafka-clients
-->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<exclusions>
<exclusion>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>1.1.0</version>
</dependency>
<!--kafka依赖 end-->
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
2:application.properties
#============== kafka cluster1===================
kafka.cluster1.consumer.zookeeper.connect=127.0.0.1:2181
kafka.cluster1.consumer.servers=127.0.0.1:9092
kafka.cluster1.producer.servers=127.0.0.1:9092
#============== kafka cluster2===================
kafka.cluster2.consumer.zookeeper.connect=127.0.0.1:2182
kafka.cluster2.consumer.servers=127.0.0.1:9093
kafka.cluster2.producer.servers=127.0.0.1:9093
#============== kafka 公共配置===================
kafka.consumer.enable.auto.commit=true
kafka.consumer.session.timeout=6000
kafka.consumer.auto.commit.interval=100
kafka.consumer.auto.offset.reset=latest
kafka.consumer.group.id=test
kafka.consumer.concurrency=10
kafka.producer.retries=0
kafka.producer.batch.size=4096
kafka.producer.linger=1
kafka.producer.buffer.memory=40960
3:SpringKafkaProducerConfig.java
该类配置了两个KafkaTemplate,分别对应kafka cluster1与kafka cluster2
package com.wl.kafka;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import java.util.HashMap;
import java.util.Map;
/**
* @Author:
* @Description:kafka生产者配置,配置多个生产模版向多个kafka集群发送消息示例
* @Date: Created in 下午4:48 2018/8/9
* @Modified By:
*/
@Configuration
@EnableKafka
public class SpringKafkaProducerConfig {
/**
* kafka集群1
*/
@Value("${kafka.cluster1.producer.servers}")
private String cluster1Servers;
/**
* kafka集群2
*/
@Value("${kafka.cluster2.producer.servers}")
private String cluster2Servers;
@Value("${kafka.producer.retries}")
private int retries;
@Value("${kafka.producer.batch.size}")
private int batchSize;
@Value("${kafka.producer.linger}")
private int linger;
@Value("${kafka.producer.buffer.memory}")
private int bufferMemory;
/**
* kafka cluster1 生产模版
* @return
*/
@Bean
public KafkaTemplate<String, String> kafkaCluster1Template() {
Map<String, Object> configProps = producerConfigProps();
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, cluster1Servers);
return new KafkaTemplate<>(new DefaultKafkaProducerFactory<>(configProps));
}
/**
* kafka cluster2 生产模版
* @return
*/
@Bean
public KafkaTemplate<String, String> kafkaCluster2Template() {
Map<String, Object> configProps = producerConfigProps();
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, cluster2Servers);
return new KafkaTemplate<>(new DefaultKafkaProducerFactory<>(configProps));
}
/**
* 公共配置
* @return
*/
private Map<String, Object> producerConfigProps(){
Map<String, Object> configProps = new HashMap<>();
configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
configProps.put(ProducerConfig.ACKS_CONFIG, "all");
configProps.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize);
configProps.put(ProducerConfig.LINGER_MS_CONFIG, linger);
configProps.put(ProducerConfig.RETRIES_CONFIG, retries);
configProps.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory);
return configProps;
}
}
4:SpringKafkaConsumerConfig.java
该类配置了两个ConcurrentKafkaListenerContainerFactory,分别对应kafka cluster1与kafka cluster2
package com.wl.kafka;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import java.util.HashMap;
import java.util.Map;
/**
* @Author:
* @Description: kafka消费者配置,配置多个消费工厂消费不同的kafka集群消息示例
* @Date: Created in 下午3:20 2018/8/8
* @Modified By:
*/
@Configuration
@EnableKafka
public class SpringKafkaConsumerConfig {
/**
* kafka cluster1配置
*/
@Value("${kafka.cluster1.consumer.servers}")
private String kafkaCluster1BootstrapServers;
/**
* kafka cluster2配置
*/
@Value("${kafka.cluster2.consumer.servers}")
private String kafkaCluster2BootstrapServers;
/**
* 公共配置
*/
@Value("${kafka.consumer.session.timeout}")
private Integer sessionTimeoutMs;
@Value("${kafka.consumer.enable.auto.commit}")
private boolean enableAutoCommit;
@Value("${kafka.consumer.auto.commit.interval}")
private Integer autoCommitIntervalMs;
@Value("${kafka.consumer.auto.offset.reset}")
private String autoOffsetReset;
@Value("${kafka.consumer.group.id}")
private String groupId;
/**
* kafka cluster1消费工厂
* @return
*/
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> cluster1KafkaListenerContainerFactory() {
Map<String, Object> configProps = configProps();
configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaCluster1BootstrapServers);
return getKafkaListenerContainerFactory(configProps);
}
/**
* kafka cluster2消费工厂
* @return
*/
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> cluster2KafkaListenerContainerFactory() {
Map<String, Object> configProps = configProps();
configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaCluster2BootstrapServers);
return getKafkaListenerContainerFactory(configProps);
}
/**
* 创建ConcurrentKafkaListenerContainerFactory
* @param configProps
* @return
*/
private ConcurrentKafkaListenerContainerFactory<String, String> getKafkaListenerContainerFactory(Map<String, Object> configProps){
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
ConsumerFactory<String, String> basicConsumerFactory = new DefaultKafkaConsumerFactory<>(configProps);
factory.setConsumerFactory(basicConsumerFactory);
//设定为批量消费
factory.setBatchListener(true);
return factory;
}
/**
* 公共配置
* @return
*/
private Map<String, Object> configProps(){
Map<String, Object> configProps = new HashMap<>();
configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
configProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
configProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
configProps.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,100);
return configProps;
}
}
4:MyProducer.java
该类为生产者测试类,定时向kafka cluster1与kafka cluster2发送消息
package com.wl.kafka;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
/**
* @Author:
* @Description: 定时向kafka集群1和kafka集群2发送消息
* @Date: Created in 下午9:33 2018/8/9
* @Modified By:
*/
@Component
@EnableScheduling
public class MyProducer {
@Autowired
@Qualifier("kafkaCluster1Template")
private KafkaTemplate kafkaCluster1Template;
@Autowired
@Qualifier("kafkaCluster2Template")
private KafkaTemplate kafkaCluster2Template;
/**
* 每5秒向Kafka集群1的topick:kafkacluster1test发送消息
*/
@Scheduled(cron = "*/5 * * * * ?")
public void produceMsgToKafkaCluster1(){
System.out.println("向kafka cluster1 发送消息");
kafkaCluster1Template.send("kafkacluster1test", "hello cluster1");
}
/**
* 每10秒向Kafka集群2的topick:kafkacluster2test发送消息
*/
@Scheduled(cron = "*/10 * * * * ?")
public void produceMsgToKafkaCluster2(){
System.out.println("向kafka cluster2 发送消息");
kafkaCluster2Template.send("kafkacluster2test", "hello cluster2");
}
}
4:MyConsumer.java
该类为消费者测试类,包含两个消费任务,分别从kafka cluster1和kafka cluster2消费消息
package com.wl.kafka;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
import java.util.List;
/**
* @Author:
* @Description:消费kafka集群1的kafkacluster1test主题消息和kafka集群2的kafkacluster2test主题消息
* @Date: Created in 下午9:46 2018/8/9
* @Modified By:
*/
@Component
public class MyConsumer {
/**
* 消费kafka集群1的kafkacluster1test主题消息
* @param records
*/
@KafkaListener(topics = "kafkacluster1test", containerFactory = "cluster1KafkaListenerContainerFactory")
private void kafkacluster1testConsumer(List<ConsumerRecord<String, String>> records) {
System.out.println("消费kafkacluster1test消息:" + records.size() + ">>>" + records.toString());
}
/**
* 消费kafka集群2的kafkacluster2test主题消息
* @param records
*/
@KafkaListener(topics = "kafkacluster2test", containerFactory = "cluster2KafkaListenerContainerFactory")
private void kafkacluster2testConsumer(List<ConsumerRecord<String, String>> records) {
System.out.println("消费kafkacluster2test消息:" + records.size() + ">>>" + records.toString());
}
}
4:运行测试
启动springboot启动类,观察日志