spring boot集成kafka

随着spring boot流行,越来越多的开发者转向spring boot作为java项目的底层框架,而spring boot集成中间件的配置不同于spring的xml方式,现给大家做一个spring boot集成多个kafka集群生产发送消息示例,希望对大家能有帮助

依赖环境

spring boot:2.0.4
kafka:1.1.0

示例代码

1:pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.wl</groupId>
    <artifactId>spring-boot-kafka</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>jar</packaging>

    <name>spring-boot-kafka</name>
    <description>Demo project for Spring Boot Integrate kafka</description>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.0.4.RELEASE</version>
        <relativePath/>
    </parent>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>

        <!--kafka依赖 start-->
        <!--
            此处用spring-kafka,依赖的kafka-clients为1.0.2,由于服务端是1.1.0版本,
            所以此处去掉spring-kafka本身依赖的kafka客户端,引入了1.1.0版本的kafka-clients,
            如果不需要spring-kafka,可以直接应用原生kafka-clients
         -->
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
            <exclusions>
                <exclusion>
                    <groupId>org.apache.kafka</groupId>
                    <artifactId>kafka-clients</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>1.1.0</version>
        </dependency>
        <!--kafka依赖 end-->
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>
2:application.properties
#============== kafka cluster1===================
kafka.cluster1.consumer.zookeeper.connect=127.0.0.1:2181
kafka.cluster1.consumer.servers=127.0.0.1:9092

kafka.cluster1.producer.servers=127.0.0.1:9092
#============== kafka cluster2===================
kafka.cluster2.consumer.zookeeper.connect=127.0.0.1:2182
kafka.cluster2.consumer.servers=127.0.0.1:9093

kafka.cluster2.producer.servers=127.0.0.1:9093
#============== kafka 公共配置===================
kafka.consumer.enable.auto.commit=true
kafka.consumer.session.timeout=6000
kafka.consumer.auto.commit.interval=100
kafka.consumer.auto.offset.reset=latest
kafka.consumer.group.id=test
kafka.consumer.concurrency=10

kafka.producer.retries=0
kafka.producer.batch.size=4096
kafka.producer.linger=1
kafka.producer.buffer.memory=40960
3:SpringKafkaProducerConfig.java

该类配置了两个KafkaTemplate,分别对应kafka cluster1与kafka cluster2

package com.wl.kafka;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import java.util.HashMap;
import java.util.Map;

/**
 * @Author:
 * @Description:kafka生产者配置,配置多个生产模版向多个kafka集群发送消息示例
 * @Date: Created in 下午4:48 2018/8/9
 * @Modified By:
 */
@Configuration
@EnableKafka
public class SpringKafkaProducerConfig {
    /**
     * kafka集群1
     */
    @Value("${kafka.cluster1.producer.servers}")
    private String cluster1Servers;

    /**
     * kafka集群2
     */
    @Value("${kafka.cluster2.producer.servers}")
    private String cluster2Servers;

    @Value("${kafka.producer.retries}")
    private int retries;
    @Value("${kafka.producer.batch.size}")
    private int batchSize;
    @Value("${kafka.producer.linger}")
    private int linger;
    @Value("${kafka.producer.buffer.memory}")
    private int bufferMemory;

    /**
     * kafka cluster1 生产模版
     * @return
     */
    @Bean
    public KafkaTemplate<String, String> kafkaCluster1Template() {
        Map<String, Object> configProps = producerConfigProps();
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, cluster1Servers);
        return new KafkaTemplate<>(new DefaultKafkaProducerFactory<>(configProps));
    }

    /**
     * kafka cluster2 生产模版
     * @return
     */
    @Bean
    public KafkaTemplate<String, String> kafkaCluster2Template() {
        Map<String, Object> configProps = producerConfigProps();
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, cluster2Servers);
        return new KafkaTemplate<>(new DefaultKafkaProducerFactory<>(configProps));
    }

    /**
     * 公共配置
     * @return
     */
    private Map<String, Object> producerConfigProps(){
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        configProps.put(ProducerConfig.ACKS_CONFIG, "all");
        configProps.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize);
        configProps.put(ProducerConfig.LINGER_MS_CONFIG, linger);
        configProps.put(ProducerConfig.RETRIES_CONFIG, retries);
        configProps.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory);
        return configProps;
    }
}
4:SpringKafkaConsumerConfig.java

该类配置了两个ConcurrentKafkaListenerContainerFactory,分别对应kafka cluster1与kafka cluster2

package com.wl.kafka;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import java.util.HashMap;
import java.util.Map;

/**
 * @Author:
 * @Description: kafka消费者配置,配置多个消费工厂消费不同的kafka集群消息示例
 * @Date: Created in 下午3:20 2018/8/8
 * @Modified By:
 */
@Configuration
@EnableKafka
public class SpringKafkaConsumerConfig {

    /**
     * kafka cluster1配置
     */
    @Value("${kafka.cluster1.consumer.servers}")
    private String kafkaCluster1BootstrapServers;

    /**
     * kafka cluster2配置
     */
    @Value("${kafka.cluster2.consumer.servers}")
    private String kafkaCluster2BootstrapServers;

    /**
     * 公共配置
     */
    @Value("${kafka.consumer.session.timeout}")
    private Integer sessionTimeoutMs;
    @Value("${kafka.consumer.enable.auto.commit}")
    private boolean enableAutoCommit;
    @Value("${kafka.consumer.auto.commit.interval}")
    private Integer autoCommitIntervalMs;
    @Value("${kafka.consumer.auto.offset.reset}")
    private String autoOffsetReset;
    @Value("${kafka.consumer.group.id}")
    private String groupId;


    /**
     * kafka cluster1消费工厂
     * @return
     */
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> cluster1KafkaListenerContainerFactory() {
        Map<String, Object> configProps = configProps();
        configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaCluster1BootstrapServers);
        return getKafkaListenerContainerFactory(configProps);
    }

    /**
     * kafka cluster2消费工厂
     * @return
     */
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> cluster2KafkaListenerContainerFactory() {
        Map<String, Object> configProps = configProps();
        configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaCluster2BootstrapServers);
        return getKafkaListenerContainerFactory(configProps);
    }

    /**
     * 创建ConcurrentKafkaListenerContainerFactory
     * @param configProps
     * @return
     */
    private ConcurrentKafkaListenerContainerFactory<String, String> getKafkaListenerContainerFactory(Map<String, Object> configProps){
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        ConsumerFactory<String, String> basicConsumerFactory = new DefaultKafkaConsumerFactory<>(configProps);
        factory.setConsumerFactory(basicConsumerFactory);
        //设定为批量消费
        factory.setBatchListener(true);
        return factory;
    }

    /**
     * 公共配置
     * @return
     */
    private Map<String, Object> configProps(){
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        configProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        configProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
        configProps.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,100);
        return configProps;
    }

}

4:MyProducer.java

该类为生产者测试类,定时向kafka cluster1与kafka cluster2发送消息

package com.wl.kafka;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

/**
 * @Author:
 * @Description: 定时向kafka集群1和kafka集群2发送消息
 * @Date: Created in 下午9:33 2018/8/9
 * @Modified By:
 */
@Component
@EnableScheduling
public class MyProducer {

    @Autowired
    @Qualifier("kafkaCluster1Template")
    private KafkaTemplate kafkaCluster1Template;

    @Autowired
    @Qualifier("kafkaCluster2Template")
    private KafkaTemplate kafkaCluster2Template;

    /**
     * 每5秒向Kafka集群1的topick:kafkacluster1test发送消息
     */
    @Scheduled(cron = "*/5 * * * * ?")
    public void produceMsgToKafkaCluster1(){
        System.out.println("向kafka cluster1 发送消息");
        kafkaCluster1Template.send("kafkacluster1test", "hello cluster1");
    }

    /**
     * 每10秒向Kafka集群2的topick:kafkacluster2test发送消息
     */
    @Scheduled(cron = "*/10 * * * * ?")
    public void produceMsgToKafkaCluster2(){
        System.out.println("向kafka cluster2 发送消息");
        kafkaCluster2Template.send("kafkacluster2test", "hello cluster2");
    }

}

4:MyConsumer.java

该类为消费者测试类,包含两个消费任务,分别从kafka cluster1和kafka cluster2消费消息

package com.wl.kafka;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

import java.util.List;

/**
 * @Author:
 * @Description:消费kafka集群1的kafkacluster1test主题消息和kafka集群2的kafkacluster2test主题消息
 * @Date: Created in 下午9:46 2018/8/9
 * @Modified By:
 */
@Component
public class MyConsumer {

    /**
     * 消费kafka集群1的kafkacluster1test主题消息
     * @param records
     */
    @KafkaListener(topics = "kafkacluster1test", containerFactory = "cluster1KafkaListenerContainerFactory")
    private void kafkacluster1testConsumer(List<ConsumerRecord<String, String>> records) {
        System.out.println("消费kafkacluster1test消息:" + records.size() + ">>>" + records.toString());
    }

    /**
     * 消费kafka集群2的kafkacluster2test主题消息
     * @param records
     */
    @KafkaListener(topics = "kafkacluster2test", containerFactory = "cluster2KafkaListenerContainerFactory")
    private void kafkacluster2testConsumer(List<ConsumerRecord<String, String>> records) {
        System.out.println("消费kafkacluster2test消息:" + records.size() + ">>>" + records.toString());
    }
}

4:运行测试

启动springboot启动类,观察日志


kafka-spring-boot.png
5:github代码库

github代码库.

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 205,386评论 6 479
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 87,939评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,851评论 0 341
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,953评论 1 278
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,971评论 5 369
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,784评论 1 283
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,126评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,765评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 43,148评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,744评论 2 323
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,858评论 1 333
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,479评论 4 322
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,080评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,053评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,278评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,245评论 2 352
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,590评论 2 343

推荐阅读更多精彩内容

  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 134,598评论 18 139
  • Spring Boot 参考指南 介绍 转载自:https://www.gitbook.com/book/qbgb...
    毛宇鹏阅读 46,747评论 6 342
  • Spring Boot集成Kafka Spring Boot集成Kafka前提介绍Kafka简介Topics & ...
    流水不腐小夏阅读 4,003评论 3 14
  • 全球市场概括 澳股: 澳洲股市连续4天上涨,ASX200收盘6130.4,涨0.1%,其中科技业和保健品行业表现最...
    雷蒙Raymond阅读 602评论 0 0
  • 2017,好像发生了好多事 工作:7月份从学校辞职,转入一家公司做HR 生活:有了男友,同居在一起 学习:报法语班...
    王华华在路上阅读 207评论 0 0