基础配置
1.mvn 坐标
<!-- kafka -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<!-- json 工具 -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.76</version>
</dependency>
2.kafka 配置文件
spring:
### kafka 配置
kafka:
producer:
# 重试次数
retries: 0
# 应答级别:多少个分区副本备份完成时向生产者发送ack确认(可选0、1、all/-1)
acks: 1
# 批量大小
batch-size: 16384
# 生产端缓冲区大小
buffer-memory: 33554432
# Kafka提供的序列化和反序列化类
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
consumer:
# 是否自动提交offset
enable-auto-commit: true
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
group-id: test-consumer
bootstrap-servers: 10.162.108.62:9092
1.简单生产模型
@Resource
private KafkaTemplate<String, String> kafkaTemplate;
// 生产消息
kafkaTemplate.send(topic1, msg);
生产消息
2.回调消息生产模型
@Resource
private KafkaTemplate<String, String> kafkaTemplate;
// 生产消息
kafkaTemplate.send(top, msg).addCallback(success -> {
// 消息发送到的topic
String topic = success.getRecordMetadata().topic();
// 消息发送到的分区
int partition = success.getRecordMetadata().partition();
// 消息在分区内的offset
long offset = success.getRecordMetadata().offset();
log.info("发送消息成功:" + topic + "-" + partition + "-" + offset);
}, failure -> {
log.info("发送消息失败:" + failure.getMessage());
});
3.事物消息生产模型
@Resource
private KafkaTemplate<String, String> kafkaTemplate;
// 生产消息
kafkaTemplate.executeInTransaction(operations -> {
operations.send(topic1, msg);
throw new RuntimeException("fail");
});
消费消息
1.简单消费模型
@KafkaListener(topics = {"topic1"},groupId = "1",errorHandler = "consumerAwareErrorHandler")
public void onMessage1(ConsumerRecord<String, String> record) {
log.info("简单消费 {}:" + record.topic() + "-" + record.partition() + "-" + record.value());
}
@Bean
public ConsumerAwareListenerErrorHandler consumerAwareErrorHandler() {
return (message, exception, consumer) -> {
log.info("消费异常:"+message.getPayload());
return null;
};
}
topics:消费top
groupId:消费者分组groupId (id 一致时表示点对点通信,不一致时表示广播消息)
errorHandler:异常处理器