springboot整合kafka实现消息推送

前言

本篇文章主要介绍的是springboot整合kafka。

安装kafka

1.使用docker安装kafka,移步 https://www.jianshu.com/p/89b19f5b28ec

创建工程

1.创建一个名为springboot-kafka的pom项目作为父工程,将main和resource文件夹都删除,pom文件添加配置
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.lxg</groupId>
    <artifactId>springboot-kafka</artifactId>
    <version>1.0-SNAPSHOT</version>
    <packaging>pom</packaging>

    <name>springboot-kafka</name>
    <modules>
        <module>springboot-kafka-common</module>
        <module>springboot-kafka-consumer</module>
        <module>springboot-kafka-producer</module>
    </modules>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.3.RELEASE</version>
    </parent>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.12</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.67</version>
        </dependency>
    </dependencies>

</project>
3.创建公共服务模块

创建一个名为springboot-kafka-common的微服务,打包方式为jar,存放一些公共配置和公共类,如util等
1.配置pom文件

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <groupId>com.lxg</groupId>
        <artifactId>springboot-kafka</artifactId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>
    <artifactId>springboot-kafka-common</artifactId>
</project>

pom文件中以父工程作为父依赖,就不需要额外引入依赖了
2.新建一个user实体类

@Data
public class User implements Serializable {
    /**
     * id
     */
    private Integer id;

    /**
     * 用户名字
     */
    private String username;

    /**
     * 密码
     */
    private String password;
}

3.创建application-common.yml配置文件,主要添加kafka的公共配置

spring:
  kafka:
    #kafka配置
    bootstrap-servers: 192.168.56.102:9092
    producer:
      retries: 0
      # 每次批量发送消息的数量
      batch-size: 16384
      buffer-memory: 33554432
      # 指定消息key和消息体的编解码方式
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      # 指定默认消费者group id
      group-id: test-consumer-group
      auto-offset-reset: earliest
      enable-auto-commit: true
      auto-commit-interval: 5000
      # 指定消息key和消息体的编解码方式
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    #自己定义的主题名称,在微服务中使用Value注解注入调用,如果kafka中没有该主题,则会自动创建
    topic:
      userTopic: userInfo
4.创建消息生产者,即创建一个名为springboot-kafka-producer的普通springboot项目

1.pom文件配置

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

    <groupId>com.lxg</groupId>
    <artifactId>springboot-kafka-producer</artifactId>
    <version>1.0-SNAPSHOT</version>

    <modelVersion>4.0.0</modelVersion>

    <dependencies>
        <dependency>
            <groupId>com.lxg</groupId>
            <artifactId>springboot-kafka-common</artifactId>
            <version>1.0-SNAPSHOT</version>
        </dependency>
    </dependencies>

</project>

2.application.yml配置文件,配置端口,设置微服务名称,引入公共服务模块中的application-common.yml

server:
  port: 8081
spring:
  application:
    name: kafka-producer
  profiles:
    active: common

3.controller层
创建UserController

@Slf4j
@Controller
@RequestMapping("/api/user")
public class UserController {
    @Autowired
    private UserService userService;

    @ResponseBody
    @GetMapping("/getUser")
    public void getUser() {
        userService.sendUserMsg();
        log.info("getUser");
    }
}

4.service层
创建UserService

public interface UserService {
    /**
     * 发送用户信息
     *
     * @return
     */
    Boolean sendUserMsg();
}

创建UserServiceImpl

@Slf4j
@Service
public class UserServiceImpl implements UserService {
    @Value("${spring.kafka.topic.userTopic}")
    private String userTopic;

    @Autowired
    KafkaTemplate kafkaTemplate;


    @Override
    public Boolean sendUserMsg() {
        User user = new User();
        user.setId(1);
        user.setUsername("lxg");
        user.setPassword("6767167");
        kafkaTemplate.send(userTopic, JSONObject.toJSONString(user));
        log.info("lxg");
        return Boolean.TRUE;
    }
}

5.创建启动类

@SpringBootApplication
public class WebApplication {
    public static void main(String[] args) {
        SpringApplication.run(WebApplication.class, args);
    }
}
5.创建消息消费者

1.pom文件

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

    <groupId>com.lxg</groupId>
    <artifactId>springboot-kafka-consumer</artifactId>
    <version>1.0-SNAPSHOT</version>

    <modelVersion>4.0.0</modelVersion>

    <dependencies>
        <dependency>
            <groupId>com.lxg</groupId>
            <artifactId>springboot-kafka-common</artifactId>
            <version>1.0-SNAPSHOT</version>
        </dependency>
    </dependencies>
</project>

2.创建yml配置文件

server:
  port: 8082
spring:
  application:
    name: kafka-consumer
  profiles:
    active: common

3.创建consumer消费者类

@Slf4j
@Component
public class UserConsumer {

    @KafkaListener(topics = {"${spring.kafka.topic.userTopic}"})
    public void userConsumer(String message) {
        log.info("receive msg " + message);
    }
}

4.启动类

@SpringBootApplication
public class ConsumerApplication {

    public static void main(String[] args) {
        SpringApplication.run(ConsumerApplication.class, args);
    }
}

测试

启动producer和consumer两个服务模块
访问producer微服务中的接口 http://localhost:8081/api/user/getUser
会发现consumer微服务中的控制台打印了producer中创建并推送过来的的user实体

本文GitHub源码:https://github.com/lixianguo5097/springboot/tree/master/springboot-kafka

CSDN:https://blog.csdn.net/qq_27682773
简书:https://www.jianshu.com/u/e99381e6886e
博客园:https://www.cnblogs.com/lixianguo
个人博客:https://www.lxgblog.com`

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,921评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 87,635评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,393评论 0 338
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,836评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,833评论 5 368
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,685评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,043评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,694评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 42,671评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,670评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,779评论 1 332
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,424评论 4 321
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,027评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,984评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,214评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,108评论 2 351
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,517评论 2 343