1.生产者
1.1 架构
当需要往broker
发送消息时,则需要创建一个或者多个生产者往broker
发布消息,虽然借助SpringBoot
往broker
里面发送消息的API比较简单
如果借助
SprinBoot
发送消息,后面章节会阐述到
但是很多时候不同的业务场景会出现不同的问题,例如:
- 允不允许消息重复
- 允不允许消息延迟
- 允不允许消息的丢失
不同的场景下,光是知道API的使用肯定是满足不了的,因此在使用API
之前还需了解发送消息的原理。
发送消息原理图如下:
关于上述原理图解释如下:
生产者源源不断生产消息,一般消息为
key-value
形式,当然也可以不指定key
消息需要不能直接发送给
broker
,而是需要经过序列化成为一段字节序列才可以传输-
序列完成需要经过分区器,分区器会根据分区分配策略去决定这个消息发往哪个分区
分区策略分为两种情况,消息有key值和消息没有key值
- 消息有key值时,会根据key值得
hash
值然后对分区数进行取模决定消息发送给哪个partition
- 消息没有key值,会随机发送给某个分区(不同的版本,策略不一样,有的时轮询,有的随机,有的则是一段时间内只发送给某个分区,隔了一段时间发送给另外一个分区)
- 消息有key值时,会根据key值得
-
分区数确定以后真正发送具体的
broker
上,broker
的leader
会把消息写入文件中写入成功则发送元数据给生产者,如果失败则根据配置是否重试机制进行重试
1.2 topic
当消息发送到topic
时,其实消息是发送到topic
的partition
上,而在物理上一个partition
就是对应的就是一个目录
例如:在kafka-eagle
上创建wangzh
的topic
,且分区数为3,副本数为 1,如下:
查看该topic
详情可知,三个分区其中131
上的partition-0
为leader
,其他的如下:
同时取查看131
机器上的数据/var/data/kafka
(这个目录是当时安装时指定的数据存储目录)
所以一个分区在物理上对应的就是一个目录
1.3 存储
当发送消息时到topic
的partitions
上,分区会消息写入segment
文件上,一个partitions
由多个segment
文件组成,如下:
每个segment
文件默认存储数据大小为1G
,当然也可以通过修改kafka
参数调整
# 单个segment存储数据大小
log.segment.bytes=具体内容
# 当超过一定的时间(默认七天),写入segment文件的数据还没有达到1G(默认大小)
# 也会重新创建新的segment文件
log.segment.ms=时间
从上图中看出,第一个segment
文件的偏移量一定是从0开始的,而下一个segment
文件则是从上个segment
文件偏移量开始的
同时segment
文件分为.index
和.log
文件,如下:
其中.log
用来存储真正的数据,.index
是索引文件
假如如果想要消费偏移量为197的文件,如果没有索引则需要从头到位去寻找,而有了索引文件就完全可以提高查询速度
其中前面一大串代表文件名,第一个segment
文件肯定是从0开始,第二个segment
文件命名则是以上个文件偏移量+1命名,如下:
第一个segment文件命名
0000000000000000.index
0000000000000000.log
当上一个文件偏移量为
1679898
是,那么下个segment
文件命名为
00000000001679899.index
00000000001679899.log
以此类推
1.4 发送
经过上面的消息,已经知道生产者发送原理,接下来就借助SpringBoot
往broker
发送消息。如下:
1.4.1 创建
先创建springboot
项目kafka-springboot-test
,并且导入kafka
依赖,其pom.xml
内容如下:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.3.10.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.example</groupId>
<artifactId>demo</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>demo</name>
<description>Demo project for Spring Boot</description>
<properties>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.junit.vintage</groupId>
<artifactId>junit-vintage-engine</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<configuration>
<excludes>
<exclude>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</exclude>
</excludes>
</configuration>
</plugin>
</plugins>
</build>
</project>
在application.yml
增加以下配置
spring:
kafka:
# kafka集群地址
bootstrap-servers: 192.168.80.130:9092,192.168.80.131:9092,192.168.80.132:9092
listener:
# 如果没有至少一个配置的主题,则容器是否应无法启动
# false 代表关闭此功能
missing-topics-fatal: false
producer:
# 发布消息时,key的序列化器,这里是kafka提供的序列化器
# 当发送消息的key值不是字符串时,需要自己写自定义序列化器
# 生产者通过该序列化器将消息的key值序列化为字节数组
# 后面会讲述如何自定义序列化器
key-serializer: org.apache.kafka.common.serialization.StringSerializer
# 发布消息时,value的序列化器,这里是kafka提供的序列化器
# 当发送消息的key不是字符串时,需要自己写自定义序列化器
# 一般来说发布消息大多数都不是字符串,因此还是需要发送消息
# 生产者通过该序列化器将消息的key值序列化为字节数组
value-serializer: org.apache.kafka.common.serialization.StringSerializer
1.4.2 发布
当配置完成即可发布消息,发消息先创建topic
,上文中已经创建了test_topic
这里就不再创建了
发布消息则是借助org.springframework.kafka.core.KafkaTemplate
发布消息,直接注入即可,代码如下:
package com.example.demo;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.core.KafkaTemplate;
@SpringBootTest
@Slf4j
class DemoApplicationTests {
/**
* 第一个泛型为 key值的数据类型
* 第二个泛型为 value值的数据类型
*/
@Autowired
private KafkaTemplate<String,String> kafkaTemplate;
@Test
void contextLoads() throws Exception {
ListenableFuture<SendResult<String, String>> resultListenableFuture =
kafkaTemplate.send("wangzh", "test-key", "test-topic");
log.info("元数据信息:" + resultListenableFuture.get());
log.info("发送消息完毕");
// 关闭连接
kafkaTemplate.destroy();
}
}
执行代码成功后,就在kafka-eagle
看到消息发送结果,如下
当然也可以指定
partition
发送消息
1.4.3 acks
acks 参数 规则定了必须须要有多少分区副本收到消息,生产者才会认为消息写入是成功的 这个参数对消息丢失 可能性有重要影响,目前该参数配置如下:
acks = 0
生产者在成功写入消息之前不会等待任何来自服务器的响应
意味着生产者不知道消息有没有把消息发送到
broker
,只要生产者将消息添加到Socket
缓冲区就认为消息发送成功,不需要等待服务器 的响应。因此这种方式也可以支持很高的吞吐量
acks=1
只要集群的
leader
节点收到消息并写入到segment
文件,生产者就会收到来自服务器的成功响应,视为发送成功假如
leader
数据写入成功,然后宕机,此时所有的副本还没来的及同步数据,那么刚写入的数据就会丢失
acks=all
集群的
leader
收到消息并写入到segment
中,同时等待所有的副本同步消息成功后才认为消息发送成功这种模式是最安全的,及时有的
leader
发生奔溃,那还是可以重新选举leader
进行通信
在配置文件的producer
里面设置acks
即可
2.消费者
2.1 架构
消费者如果订阅了某个主题消息,那么就可以去进行消费,同时一个消费者属于一个消费组,一个消费组里面所有的消费者都订阅同一个主题.如下:
当消费组里面只有一个消费者时,那么这个消费就回去消费所有分区的消息,当然一般开发也就足够了。
但有时候生产者生产消息过快,而消费者消费消息过慢,就会很容易导致消息堆积,从而阻塞,那么就可以在消费者组里面多增加几个消费者,如下:
注意:同一组的消费者是不会消费同一主题的同一分区消息
当然如果消费者的数量超过了分区数,那么超过的消费者就会处于空闲状态
因此不要让消费者的数量超过分区数
一个消息只能被一个组消费一次,例如上图中consumer-1
消费了消息A
,那么其他的消费者就不能够再次消费A
了
如果在消费时,手动指定了偏移量,那么就会重复消费消息,这种情况特殊
当然同一个消息可以被多个消费组进行消费,如下图所示:
2.2 分配
如下图,消费组中可以增加,当增加一个消费者,就会分摊之前消费者的消费压力,那么当新增一个消费者是如何将分区分配给消费者的呢
当消费者新增一个消费者时,会提高消费者的高可用和伸缩性,且当加入到消费组之后就会
给新增的消费者分配一个partition
,这种操作称为再分配
注意:在再分配期间,消费者会暂停消费消息,直到分配分区完成才会继续消费消息
且当分区分配给再次分配给某个消费者时,消费者的消息可能丢失读取状态
同理当consumer-2
消费者退出消费者组时,那么partition-2
就会分配到consumer-1
,让他去进行消费
那么kafka
是如何知道消费组里面需要再分配呢?这主要是借助于组协调器
,每个消费组都会由属于自己的组协调器。
每隔消费者都会发送心跳到协调器,用来维护群组关系和分区关系,如下图所示:
这样kafka
就知道了每个消费者属于哪个消费组,以及如何去分配partition
协调器就类似于
spring cloud
里面的注册中心
当消费者因为某些因素突然停止消费,也就是说协调器收不到消费者的心跳,那么协调器会等待几秒,几秒期间还是没有收到心跳,那么协调器就会把该消费者剔除出组,然后实现再分配。
2.3 消费
这里同样借助SpringBoot
去消费消息,消费者配置如下:
spring:
kafka:
# kafka集群地址
bootstrap-servers: 192.168.80.130:9092,192.168.80.131:9092,192.168.80.132:9092
listener:
# 如果没有至少一个配置的主题,则容器是否应无法启动
# false 代表关闭此功能
missing-topics-fatal: false
producer:
# 发布消息时,key的序列化器,这里是kafka提供的序列化器
# 当发送消息的key值不是字符串时,需要自己写自定义序列化器
# 生产者通过该序列化器将消息的key值序列化为字节数组
# 后面会讲述如何自定义序列化器
key-serializer: org.apache.kafka.common.serialization.StringSerializer
# 发布消息时,value的序列化器,这里是kafka提供的序列化器
# 当发送消息的key不是字符串时,需要自己写自定义序列化器
# 一般来说发布消息大多数都不是字符串,因此还是需要发送消息
# 生产者通过该序列化器将消息的key值序列化为字节数组
value-serializer: org.apache.kafka.common.serialization.StringSerializer
consumer:
# 消费者组id
group-id: wangzh-group
# 是否允许自动提交offset
# 每当消费者消费一个消息就会产生一个偏移量
# 偏移量是消费者提交到kafka中,保存在`__consumer_offsets` topic中
enable-auto-commit: true
# 提交偏移量间隔时间数 100ms提交一次
auto-commit-interval: 100
# 消费消息时的反序列器
# 消费消息时会将字节序列反序列化为字符串
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
# 消费消息时的反序列化器
# 消费消息时会将字节序反序列化为字符串
# 如果消息不是字符串时,需要自己写反序列话器
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
# 偏移量配置
# latest 当各个分区有已提交的偏移量是,就从提交的偏移量后开始消费,如果没有则消费该分区最新产生的数据
# none 各个分区都提交了偏移量后,才从偏移量后开始消费,只要存在一个分区没有提交偏移
# 量那么抛出异常
# earlist 当各个分区有已提交的偏移量时,则从提交的偏移量开始消费,如果没有偏移量则
# 从头开始消费
auto-offset-reset: latest
消费消,利用org.springframework.kafka.annotation.KafkaListener
注解即可消费消息,如下:
package com.example.demo;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
@Component
@Slf4j
public class Consumer {
/**
* topics 为 topic名字,可以填写多个topic的名字
* ConsumerRecord 为 消息记录,包含了一条消息大部分数据
*/
@KafkaListener(topics = {"wangzh"})
public void consumer(ConsumerRecord<String,String> record) {
log.info("消息key:" + record.key());
log.info("消息value:" + record.value());
log.info("消息偏移量:" + record.offset());
log.info("消息topic" + record.topic());
}
}
启动项目即可看到消费的消息,如下:
2.4 批量
上次消费消息时一条一条消费,也就是当一条消息消费完成,才会去消费下一条,这肯定不大合理,因此在数据量大的情况下需要去进行批量消费
批量消费设置如下:
spring:
kafka:
# kafka集群地址
bootstrap-servers: 192.168.80.130:9092,192.168.80.131:9092,192.168.80.132:9092
listener:
# 如果没有至少一个配置的主题,则容器是否应无法启动
# false 代表关闭此功能
missing-topics-fatal: false
producer:
# 发布消息时,key的序列化器,这里是kafka提供的序列化器
# 当发送消息的key值不是字符串时,需要自己写自定义序列化器
# 生产者通过该序列化器将消息的key值序列化为字节数组
# 后面会讲述如何自定义序列化器
key-serializer: org.apache.kafka.common.serialization.StringSerializer
# 发布消息时,value的序列化器,这里是kafka提供的序列化器
# 当发送消息的key不是字符串时,需要自己写自定义序列化器
# 一般来说发布消息大多数都不是字符串,因此还是需要发送消息
# 生产者通过该序列化器将消息的key值序列化为字节数组
value-serializer: org.apache.kafka.common.serialization.StringSerializer
# 设置为批量消费,默认为单条消费
type: batch
consumer:
# 消费者组id
group-id: wangzh-group
# 是否允许自动提交offset
# 每当消费者消费一个消息就会产生一个偏移量
# 偏移量是消费者提交到kafka中,保存在`__consumer_offsets` topic中
enable-auto-commit: true
# 提交偏移量间隔时间数 100ms提交一次
auto-commit-interval: 100
# 消费消息时的反序列器
# 消费消息时会将字节序列反序列化为字符串
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
# 消费消息时的反序列化器
# 消费消息时会将字节序反序列化为字符串
# 如果消息不是字符串时,需要自己写反序列话器
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
# 偏移量配置
# latest 当各个分区有已提交的偏移量是,就从提交的偏移量后开始消费,如果没有则消费该分区最新产生的数据
# none 各个分区都提交了偏移量后,才从偏移量后开始消费,只要存在一个分区没有提交偏移
# 量那么抛出异常
# earlist 当各个分区有已提交的偏移量时,则从提交的偏移量开始消费,如果没有偏移量则
# 从头开始消费
auto-offset-reset: latest
# 批量消费时,最多一次消费多少条数据
max-poll-records: 1000
同时还需要修改接受消息的参数,修改如下:
@KafkaListener(topics = {"wangzh"})
public void consumer(List<ConsumerRecord<String,String>> records) {
records.forEach(record -> {
log.info("消息key:" + record.key());
log.info("消息value:" + record.value());
log.info("消息偏移量:" + record.offset());
log.info("消息topic" + record.topic());
});
}
2.5 指定
通过之前的学习知道,消费者每消费一条消息就会提交一次偏移量,下次消费时从偏移量后面开始消费,这样保证消息不会重复消费。
有时候有一种特殊情况,需要指定偏移量去进行消费,那么之前普通消费并不能满足,因此需要自定义操作
package com.example.demo;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.annotation.PartitionOffset;
import org.springframework.kafka.annotation.TopicPartition;
import org.springframework.stereotype.Component;
import java.util.List;
@Component
@Slf4j
public class Consumer {
/**
* 每次消费 {"0","1","2"} 消息偏移量从1开始消费
* @param records
*/
@KafkaListener(topicPartitions = {
@TopicPartition(topic = "wangzh",partitions = {"0","1","2"},partitionOffsets = @PartitionOffset(initialOffset = "1",partition = "*"))
})
public void consumer(List<ConsumerRecord<String,String>> records) {
records.forEach(record -> {
log.info("消息key:" + record.key());
log.info("消息value:" + record.value());
log.info("消息偏移量:" + record.offset());
log.info("消息topic" + record.topic());
});
}
}