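Background
Kafka is a mainstream message queue implementation. It decouples systems and synchronizes data between upstream and downstream systems, and it has many applications in customer-facing services. Building on Kafka's basic concepts, this article walks through its use with a working example.
Local deployment
Services
The local deployment uses docker-compose, which makes configuration and management convenient. It bundles zookeeper, kafka, and kafka-ui, whose roles are as follows:
Zookeeper
Stores Kafka's metadata and handles cluster coordination (for example broker registration and controller election), which provides high availability.
Kafka
Also called the kafka broker. This is the actual Kafka instance, which stores messages and serves producers and consumers.
Kafka-ui
A web user interface for managing topics, browsing messages, and similar operations.
Deployment configuration
The actual docker-compose configuration is as follows: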
version: "3"
services:
  zookeeper:
    image: docker.io/bitnami/zookeeper:3.8
    # network_mode: "bridge"
    container_name: zookeeper
    ports:
      - "2181:2181"
    volumes:
      - /d/docker-compose/kafka/zookeeper:/bitnami/zookeeper # persist data
    environment:
      - TZ=Asia/Shanghai
      - ALLOW_ANONYMOUS_LOGIN=yes
    networks:
      - kafka
  kafka:
    restart: always
    image: docker.io/bitnami/kafka:3.4
    # network_mode: "bridge"
    container_name: kafka
    ports:
      - "9004:9004"
    volumes:
      - /d/docker-compose/kafka/kafka:/bitnami/kafka # persist data
    environment:
      - TZ=Asia/Shanghai
      - KAFKA_BROKER_ID=1
      - KAFKA_CFG_LISTENERS=PLAINTEXT://:9004
      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9004 # replace with your own IP
      - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
      - ALLOW_PLAINTEXT_LISTENER=yes
    depends_on:
      - zookeeper
    networks:
      - kafka
  kafka-ui:
    restart: always
    image: provectuslabs/kafka-ui:latest
    # network_mode: "bridge"
    container_name: kafka-ui
    ports:
      - 9001:8080
    volumes:
      - /d/docker-compose/kafka/ui-kafka/etc/localtime:/etc/localtime
    environment:
      # cluster name
      - KAFKA_CLUSTERS_0_NAME=local
      # cluster address
      - KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS=kafka:9004 # replace with your own IP
    networks:
      - kafka
networks:
  kafka:
    driver: bridge
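The start command is docker-compose up -d
After startup, the containers look as shown in the figure below:
Open 127.0.0.1:9001 to reach kafka-ui. The interface looks like this:
Example and common configuration
With the Kafka service started as above, the interaction between a producer and a consumer can be implemented in code.
Producer and consumer example
1. Producer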
package com.demo.test;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.*;

import java.util.Date;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

@Slf4j
public class MyProducer {
    private static final String TOPIC_NAME = "com.demo.kafka.test.v1";

    public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException {
        long events = 30;
        Random rnd = new Random();
        Properties props = new Properties();
        props.put("bootstrap.servers", "127.0.0.1:9004");
        // Wait for all in-sync replicas to acknowledge each record.
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // "message.timeout.ms" was dropped here: it is a librdkafka setting that the Java
        // client does not recognize (the Java counterpart is delivery.timeout.ms).
        log.info("start ...");
        Producer<String, String> producer = new KafkaProducer<>(props);
        for (long nEvents = 0; nEvents < events; nEvents++) {
            long runtime = new Date().getTime();
            String ip = "192.168.2." + rnd.nextInt(255);
            // Value format: timestamp,site,ip. The IP doubles as the record key.
            String msg = runtime + ",www.example.com," + ip;
            ProducerRecord<String, String> data = new ProducerRecord<>(TOPIC_NAME, ip, msg);
            // send() is asynchronous; get() blocks until the broker acknowledges the record.
            producer.send(data, (metadata, e) -> {
                if (e != null) {
                    e.printStackTrace();
                } else {
                    System.out.println("The offset of the record we just sent is: " + metadata.offset());
                }
            }).get(10, TimeUnit.SECONDS);
        }
        System.out.println("send message done");
        producer.close();
        // Exit with status 0: reaching this point means every send succeeded.
        System.exit(0);
    }
}
2. Consumer
package com.demo.test;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.serialization.StringDeserializer;

public class MyConsumer {
    private static final String TOPIC_NAME = "com.demo.kafka.test.v1";

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9004");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "kafeidou_group");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "5000");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, "10000");
        props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "5000");
        // Start from the newest messages when the group has no committed offset.
        props.put("auto.offset.reset", "latest");
        Consumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList(TOPIC_NAME));
        while (true) {
            // poll(long) is deprecated; the Duration overload is the supported form.
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
            }
        }
    }
}
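Note: the broker runs in a container and advertises itself under the host name kafka. After bootstrapping through 127.0.0.1:9004, the client connects to that advertised host name, so it must resolve locally. Add the following entry to the local hosts file:
127.0.0.1 kafka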
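Before running the producer or consumer, it can help to confirm that the hosts entry took effect. The following is a minimal sketch that uses only the JDK. The class name HostCheck and its resolves method are hypothetical names chosen for illustration, and the default host name kafka is assumed to match the advertised listener in the compose file above.

```java
import java.net.InetAddress;
import java.net.UnknownHostException;

// Hypothetical helper, not part of the original project.
final class HostCheck {

    // Returns true when the local resolver (hosts file or DNS) can resolve the name.
    static boolean resolves(String host) {
        try {
            InetAddress.getByName(host);
            return true;
        } catch (UnknownHostException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Assumed default: the advertised host name from the compose file.
        String host = args.length > 0 ? args[0] : "kafka";
        if (resolves(host)) {
            System.out.println(host + " resolves");
        } else {
            System.out.println(host + " does not resolve; check the hosts file entry");
        }
    }
}
```

If the name does not resolve, the client can still reach the bootstrap address but will likely fail once it switches to the advertised address kafka:9004.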
3. Dependencies and versions
<properties>
    <maven.compiler.source>8</maven.compiler.source>
    <maven.compiler.target>8</maven.compiler.target>
    <logback.version>1.2.3</logback.version>
</properties>
<dependencies>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>3.5.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-streams</artifactId>
        <version>3.0.0</version>
        <scope>compile</scope>
        <optional>true</optional>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka_2.13</artifactId>
        <version>3.5.0</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
        <version>1.7.25</version>
    </dependency>
    <dependency>
        <groupId>ch.qos.logback</groupId>
        <artifactId>logback-classic</artifactId>
        <version>${logback.version}</version>
    </dependency>
    <dependency>
        <groupId>ch.qos.logback</groupId>
        <artifactId>logback-core</artifactId>
        <version>${logback.version}</version>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <version>1.18.20</version>
        <scope>provided</scope>
    </dependency>
</dependencies>
Producer logs
Partial log output:
14:49:16.009 [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version: 3.5.0
14:49:16.009 [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId: c97b88d5db4de28d
14:49:16.009 [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka startTimeMs: 1714978156006
14:49:16.009 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.ClientUtils - Resolved host 127.0.0.1 as 127.0.0.1
14:49:16.009 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Initiating connection to node 127.0.0.1:9004 (id: -1 rack: null) using address /127.0.0.1
14:49:16.010 [main] DEBUG org.apache.kafka.clients.producer.KafkaProducer - [Producer clientId=producer-1] Kafka producer started
14:49:16.017 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.network.Selector - [Producer clientId=producer-1] Created socket with SO_RCVBUF = 32768, SO_SNDBUF = 131072, SO_TIMEOUT = 0 to node -1
14:49:16.183 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Completed connection to node -1. Fetching API versions.
14:49:16.183 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Initiating API versions fetch from node -1.
14:49:16.200 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Sending API_VERSIONS request with header RequestHeader(apiKey=API_VERSIONS, apiVersion=3, clientId=producer-1, correlationId=0, headerVersion=2) and timeout 30000 to node -1: ApiVersionsRequestData(clientSoftwareName='apache-kafka-java', clientSoftwareVersion='3.5.0')
14:49:16.226 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Received API_VERSIONS response from node -1 for request with header RequestHeader(apiKey=API_VERSIONS, apiVersion=3, clientId=producer-1, correlationId=0, headerVersion=2): ApiVersionsResponseData(errorCode=0, apiKeys=[ApiVersion(apiKey=0, minVersion=0, maxVersion=9), ApiVersion(apiKey=1, minVersion=0, maxVersion=13), ApiVersion(apiKey=2, minVersion=0, maxVersion=7), ApiVersion(apiKey=3, minVersion=0, maxVersion=12), ApiVersion(apiKey=4, minVersion=0, maxVersion=7), ApiVersion(apiKey=5, minVersion=0, maxVersion=4), ApiVersion(apiKey=6, minVersion=0, maxVersion=8), ApiVersion(apiKey=7, minVersion=0, maxVersion=3), ApiVersion(apiKey=8, minVersion=0, maxVersion=8), ApiVersion(apiKey=9, minVersion=0, maxVersion=8), ApiVersion(apiKey=10, minVersion=0, maxVersion=4), ApiVersion(apiKey=11, minVersion=0, maxVersion=9), ApiVersion(apiKey=12, minVersion=0, maxVersion=4), ApiVersion(apiKey=13, minVersion=0, maxVersion=5), ApiVersion(apiKey=14, minVersion=0, maxVersion=5), ApiVersion(apiKey=15, minVersion=0, maxVersion=5), ApiVersion(apiKey=16, minVersion=0, maxVersion=4), ApiVersion(apiKey=17, minVersion=0, maxVersion=1), ApiVersion(apiKey=18, minVersion=0, maxVersion=3), ApiVersion(apiKey=19, minVersion=0, maxVersion=7), ApiVersion(apiKey=20, minVersion=0, maxVersion=6), ApiVersion(apiKey=21, minVersion=0, maxVersion=2), ApiVersion(apiKey=22, minVersion=0, maxVersion=4), ApiVersion(apiKey=23, minVersion=0, maxVersion=4), ApiVersion(apiKey=24, minVersion=0, maxVersion=3), ApiVersion(apiKey=25, minVersion=0, maxVersion=3), ApiVersion(apiKey=26, minVersion=0, maxVersion=3), ApiVersion(apiKey=27, minVersion=0, maxVersion=1), ApiVersion(apiKey=28, minVersion=0, maxVersion=3), ApiVersion(apiKey=29, minVersion=0, maxVersion=3), ApiVersion(apiKey=30, minVersion=0, maxVersion=3), ApiVersion(apiKey=31, minVersion=0, maxVersion=3), 
ApiVersion(apiKey=32, minVersion=0, maxVersion=4), ApiVersion(apiKey=33, minVersion=0, maxVersion=2), ApiVersion(apiKey=34, minVersion=0, maxVersion=2), ApiVersion(apiKey=35, minVersion=0, maxVersion=4), ApiVersion(apiKey=36, minVersion=0, maxVersion=2), ApiVersion(apiKey=37, minVersion=0, maxVersion=3), ApiVersion(apiKey=38, minVersion=0, maxVersion=3), ApiVersion(apiKey=39, minVersion=0, maxVersion=2), ApiVersion(apiKey=40, minVersion=0, maxVersion=2), ApiVersion(apiKey=41, minVersion=0, maxVersion=3), ApiVersion(apiKey=42, minVersion=0, maxVersion=2), ApiVersion(apiKey=43, minVersion=0, maxVersion=2), ApiVersion(apiKey=44, minVersion=0, maxVersion=1), ApiVersion(apiKey=45, minVersion=0, maxVersion=0), ApiVersion(apiKey=46, minVersion=0, maxVersion=0), ApiVersion(apiKey=47, minVersion=0, maxVersion=0), ApiVersion(apiKey=48, minVersion=0, maxVersion=1), ApiVersion(apiKey=49, minVersion=0, maxVersion=1), ApiVersion(apiKey=50, minVersion=0, maxVersion=0), ApiVersion(apiKey=51, minVersion=0, maxVersion=0), ApiVersion(apiKey=56, minVersion=0, maxVersion=2), ApiVersion(apiKey=57, minVersion=0, maxVersion=1), ApiVersion(apiKey=58, minVersion=0, maxVersion=0), ApiVersion(apiKey=60, minVersion=0, maxVersion=0), ApiVersion(apiKey=61, minVersion=0, maxVersion=0), ApiVersion(apiKey=65, minVersion=0, maxVersion=0), ApiVersion(apiKey=66, minVersion=0, maxVersion=0), ApiVersion(apiKey=67, minVersion=0, maxVersion=0)], throttleTimeMs=0, supportedFeatures=[], finalizedFeaturesEpoch=0, finalizedFeatures=[], zkMigrationReady=false)
14:49:16.267 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Node -1 has finalized features epoch: 0, finalized features: [], supported features: [], ZK migration ready: false, API versions: (Produce(0): 0 to 9 [usable: 9], Fetch(1): 0 to 13 [usable: 13], ListOffsets(2): 0 to 7 [usable: 7], Metadata(3): 0 to 12 [usable: 12], LeaderAndIsr(4): 0 to 7 [usable: 7], StopReplica(5): 0 to 4 [usable: 4], UpdateMetadata(6): 0 to 8 [usable: 8], ControlledShutdown(7): 0 to 3 [usable: 3], OffsetCommit(8): 0 to 8 [usable: 8], OffsetFetch(9): 0 to 8 [usable: 8], FindCoordinator(10): 0 to 4 [usable: 4], JoinGroup(11): 0 to 9 [usable: 9], Heartbeat(12): 0 to 4 [usable: 4], LeaveGroup(13): 0 to 5 [usable: 5], SyncGroup(14): 0 to 5 [usable: 5], DescribeGroups(15): 0 to 5 [usable: 5], ListGroups(16): 0 to 4 [usable: 4], SaslHandshake(17): 0 to 1 [usable: 1], ApiVersions(18): 0 to 3 [usable: 3], CreateTopics(19): 0 to 7 [usable: 7], DeleteTopics(20): 0 to 6 [usable: 6], DeleteRecords(21): 0 to 2 [usable: 2], InitProducerId(22): 0 to 4 [usable: 4], OffsetForLeaderEpoch(23): 0 to 4 [usable: 4], AddPartitionsToTxn(24): 0 to 3 [usable: 3], AddOffsetsToTxn(25): 0 to 3 [usable: 3], EndTxn(26): 0 to 3 [usable: 3], WriteTxnMarkers(27): 0 to 1 [usable: 1], TxnOffsetCommit(28): 0 to 3 [usable: 3], DescribeAcls(29): 0 to 3 [usable: 3], CreateAcls(30): 0 to 3 [usable: 3], DeleteAcls(31): 0 to 3 [usable: 3], DescribeConfigs(32): 0 to 4 [usable: 4], AlterConfigs(33): 0 to 2 [usable: 2], AlterReplicaLogDirs(34): 0 to 2 [usable: 2], DescribeLogDirs(35): 0 to 4 [usable: 4], SaslAuthenticate(36): 0 to 2 [usable: 2], CreatePartitions(37): 0 to 3 [usable: 3], CreateDelegationToken(38): 0 to 3 [usable: 3], RenewDelegationToken(39): 0 to 2 [usable: 2], ExpireDelegationToken(40): 0 to 2 [usable: 2], DescribeDelegationToken(41): 0 to 3 [usable: 3], DeleteGroups(42): 0 to 2 [usable: 2], ElectLeaders(43): 0 to 2 [usable: 2], 
IncrementalAlterConfigs(44): 0 to 1 [usable: 1], AlterPartitionReassignments(45): 0 [usable: 0], ListPartitionReassignments(46): 0 [usable: 0], OffsetDelete(47): 0 [usable: 0], DescribeClientQuotas(48): 0 to 1 [usable: 1], AlterClientQuotas(49): 0 to 1 [usable: 1], DescribeUserScramCredentials(50): 0 [usable: 0], AlterUserScramCredentials(51): 0 [usable: 0], AlterPartition(56): 0 to 2 [usable: 2], UpdateFeatures(57): 0 to 1 [usable: 1], Envelope(58): 0 [usable: 0], DescribeCluster(60): 0 [usable: 0], DescribeProducers(61): 0 [usable: 0], DescribeTransactions(65): 0 [usable: 0], ListTransactions(66): 0 [usable: 0], AllocateProducerIds(67): 0 [usable: 0], ConsumerGroupHeartbeat(68): UNSUPPORTED).
14:49:16.268 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Sending metadata request MetadataRequestData(topics=[MetadataRequestTopic(topicId=AAAAAAAAAAAAAAAAAAAAAA, name='com.demo.kafka.test.v1')], allowAutoTopicCreation=true, includeClusterAuthorizedOperations=false, includeTopicAuthorizedOperations=false) to node 127.0.0.1:9004 (id: -1 rack: null)
14:49:16.269 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Sending METADATA request with header RequestHeader(apiKey=METADATA, apiVersion=12, clientId=producer-1, correlationId=1, headerVersion=2) and timeout 30000 to node -1: MetadataRequestData(topics=[MetadataRequestTopic(topicId=AAAAAAAAAAAAAAAAAAAAAA, name='com.demo.kafka.test.v1')], allowAutoTopicCreation=true, includeClusterAuthorizedOperations=false, includeTopicAuthorizedOperations=false)
14:49:16.314 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Received PRODUCE response from node 1 for request with header RequestHeader(apiKey=PRODUCE, apiVersion=9, clientId=producer-1, correlationId=3, headerVersion=2): ProduceResponseData(responses=[TopicProduceResponse(name='com.demo.kafka.test.v1', partitionResponses=[PartitionProduceResponse(index=0, errorCode=0, baseOffset=44, logAppendTimeMs=-1, logStartOffset=44, recordErrors=[], errorMessage=null)])], throttleTimeMs=0)
The offset of the record we just sent is: 44
14:49:16.332 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Sending PRODUCE request with header RequestHeader(apiKey=PRODUCE, apiVersion=9, clientId=producer-1, correlationId=4, headerVersion=2) and timeout 30000 to node 1: {acks=-1,timeout=30000,partitionSizes=[com.demo.kafka.test.v1-0=122]}
14:49:16.335 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Received PRODUCE response from node 1 for request with header RequestHeader(apiKey=PRODUCE, apiVersion=9, clientId=producer-1, correlationId=4, headerVersion=2): ProduceResponseData(responses=[TopicProduceResponse(name='com.demo.kafka.test.v1', partitionResponses=[PartitionProduceResponse(index=0, errorCode=0, baseOffset=45, logAppendTimeMs=-1, logStartOffset=44, recordErrors=[], errorMessage=null)])], throttleTimeMs=0)
The offset of the record we just sent is: 45
14:49:16.347 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Sending PRODUCE request with header RequestHeader(apiKey=PRODUCE, apiVersion=9, clientId=producer-1, correlationId=5, headerVersion=2) and timeout 30000 to node 1: {acks=-1,timeout=30000,partitionSizes=[com.demo.kafka.test.v1-0=122]}
14:49:16.351 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Received PRODUCE response from node 1 for request with header RequestHeader(apiKey=PRODUCE, apiVersion=9, clientId=producer-1, correlationId=5, headerVersion=2): ProduceResponseData(responses=[TopicProduceResponse(name='com.demo.kafka.test.v1', partitionResponses=[PartitionProduceResponse(index=0, errorCode=0, baseOffset=46, logAppendTimeMs=-1, logStartOffset=44, recordErrors=[], errorMessage=null)])], throttleTimeMs=0)
The offset of the record we just sent is: 46
14:49:16.363 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Sending PRODUCE request with header RequestHeader(apiKey=PRODUCE, apiVersion=9, clientId=producer-1, correlationId=6, headerVersion=2) and timeout 30000 to node 1: {acks=-1,timeout=30000,partitionSizes=[com.demo.kafka.test.v1-0=124]}
14:49:16.366 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Received PRODUCE response from node 1 for request with header RequestHeader(apiKey=PRODUCE, apiVersion=9, clientId=producer-1, correlationId=6, headerVersion=2): ProduceResponseData(responses=[TopicProduceResponse(name='com.demo.kafka.test.v1', partitionResponses=[PartitionProduceResponse(index=0, errorCode=0, baseOffset=47, logAppendTimeMs=-1, logStartOffset=44, recordErrors=[], errorMessage=null)])], throttleTimeMs=0)
The offset of the record we just sent is: 47
14:49:16.379 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Sending PRODUCE request with header RequestHeader(apiKey=PRODUCE, apiVersion=9, clientId=producer-1, correlationId=7, headerVersion=2) and timeout 30000 to node 1: {acks=-1,timeout=30000,partitionSizes=[com.demo.kafka.test.v1-0=122]}
14:49:16.382 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Received PRODUCE response from node 1 for request with header RequestHeader(apiKey=PRODUCE, apiVersion=9, clientId=producer-1, correlationId=7, headerVersion=2): ProduceResponseData(responses=[TopicProduceResponse(name='com.demo.kafka.test.v1', partitionResponses=[PartitionProduceResponse(index=0, errorCode=0, baseOffset=48, logAppendTimeMs=-1, logStartOffset=44, recordErrors=[], errorMessage=null)])], throttleTimeMs=0)
The offset of the record we just sent is: 48
send message done
14:49:16.383 [main] INFO org.apache.kafka.clients.producer.KafkaProducer - [Producer clientId=producer-1] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
14:49:16.383 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.producer.internals.Sender - [Producer clientId=producer-1] Beginning shutdown of Kafka producer I/O thread, sending remaining records.
14:49:16.387 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.producer.internals.Sender - [Producer clientId=producer-1] Shutdown of Kafka producer I/O thread has completed.
14:49:16.388 [main] INFO org.apache.kafka.common.metrics.Metrics - Metrics scheduler closed
14:49:16.388 [main] INFO org.apache.kafka.common.metrics.Metrics - Closing reporter org.apache.kafka.common.metrics.JmxReporter
14:49:16.388 [main] INFO org.apache.kafka.common.metrics.Metrics - Metrics reporters closed
14:49:16.388 [main] INFO org.apache.kafka.common.utils.AppInfoParser - App info kafka.producer for producer-1 unregistered
14:49:16.388 [main] DEBUG org.apache.kafka.clients.producer.KafkaProducer - [Producer clientId=producer-1] Kafka producer has been closed
Consumer logs
Partial log output:
14:52:05.505 [main] DEBUG org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-kafeidou_group-1, groupId=kafeidou_group] Sending FETCH request with header RequestHeader(apiKey=FETCH, apiVersion=13, clientId=consumer-kafeidou_group-1, correlationId=27, headerVersion=2) and timeout 30000 to node 1: FetchRequestData(clusterId=null, replicaId=-1, replicaState=ReplicaState(replicaId=-1, replicaEpoch=-1), maxWaitMs=10000, minBytes=1, maxBytes=52428800, isolationLevel=0, sessionId=722495926, sessionEpoch=4, topics=[], forgottenTopicsData=[], rackId='')
14:52:10.489 [kafka-coordinator-heartbeat-thread | kafeidou_group] DEBUG org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-kafeidou_group-1, groupId=kafeidou_group] Sending Heartbeat request with generation 11 and member id consumer-kafeidou_group-1-cb4c509f-53ab-4d6e-892d-30c20e59bd93 to coordinator kafka:9004 (id: 2147483646 rack: null)
14:52:10.489 [kafka-coordinator-heartbeat-thread | kafeidou_group] DEBUG org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-kafeidou_group-1, groupId=kafeidou_group] Sending HEARTBEAT request with header RequestHeader(apiKey=HEARTBEAT, apiVersion=4, clientId=consumer-kafeidou_group-1, correlationId=28, headerVersion=2) and timeout 30000 to node 2147483646: HeartbeatRequestData(groupId='kafeidou_group', generationId=11, memberId='consumer-kafeidou_group-1-cb4c509f-53ab-4d6e-892d-30c20e59bd93', groupInstanceId=null)
14:52:10.489 [main] DEBUG org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-kafeidou_group-1, groupId=kafeidou_group] Sending asynchronous auto-commit of offsets {com.demo.kafka.test.v1-0=OffsetAndMetadata{offset=49, leaderEpoch=null, metadata=''}}
14:52:10.490 [main] DEBUG org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-kafeidou_group-1, groupId=kafeidou_group] Sending OFFSET_COMMIT request with header RequestHeader(apiKey=OFFSET_COMMIT, apiVersion=8, clientId=consumer-kafeidou_group-1, correlationId=29, headerVersion=2) and timeout 30000 to node 2147483646: OffsetCommitRequestData(groupId='kafeidou_group', generationId=11, memberId='consumer-kafeidou_group-1-cb4c509f-53ab-4d6e-892d-30c20e59bd93', groupInstanceId=null, retentionTimeMs=-1, topics=[OffsetCommitRequestTopic(name='com.demo.kafka.test.v1', partitions=[OffsetCommitRequestPartition(partitionIndex=0, committedOffset=49, committedLeaderEpoch=-1, commitTimestamp=-1, committedMetadata='')])])
14:52:10.491 [main] DEBUG org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-kafeidou_group-1, groupId=kafeidou_group] Received HEARTBEAT response from node 2147483646 for request with header RequestHeader(apiKey=HEARTBEAT, apiVersion=4, clientId=consumer-kafeidou_group-1, correlationId=28, headerVersion=2): HeartbeatResponseData(throttleTimeMs=0, errorCode=0)
14:52:10.491 [main] DEBUG org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-kafeidou_group-1, groupId=kafeidou_group] Received successful Heartbeat response
14:52:10.493 [main] DEBUG org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-kafeidou_group-1, groupId=kafeidou_group] Received OFFSET_COMMIT response from node 2147483646 for request with header RequestHeader(apiKey=OFFSET_COMMIT, apiVersion=8, clientId=consumer-kafeidou_group-1, correlationId=29, headerVersion=2): OffsetCommitResponseData(throttleTimeMs=0, topics=[OffsetCommitResponseTopic(name='com.demo.kafka.test.v1', partitions=[OffsetCommitResponsePartition(partitionIndex=0, errorCode=0)])])
14:52:10.494 [main] DEBUG org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-kafeidou_group-1, groupId=kafeidou_group] Committed offset 49 for partition com.demo.kafka.test.v1-0
14:52:10.494 [main] DEBUG org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-kafeidou_group-1, groupId=kafeidou_group] Completed asynchronous auto-commit of offsets {com.demo.kafka.test.v1-0=OffsetAndMetadata{offset=49, leaderEpoch=null, metadata=''}}
14:54:20.372 [main] DEBUG org.apache.kafka.clients.consumer.internals.AbstractFetch - [Consumer clientId=consumer-kafeidou_group-1, groupId=kafeidou_group] Removing pending request for node kafka:9004 (id: 1 rack: null)
14:54:20.372 [main] DEBUG org.apache.kafka.clients.consumer.internals.AbstractFetch - [Consumer clientId=consumer-kafeidou_group-1, groupId=kafeidou_group] Added READ_UNCOMMITTED fetch request for partition com.demo.kafka.test.v1-0 at position FetchPosition{offset=54, offsetEpoch=Optional[0], currentLeader=LeaderAndEpoch{leader=Optional[kafka:9004 (id: 1 rack: null)], epoch=0}} to node kafka:9004 (id: 1 rack: null)
14:54:20.372 [main] DEBUG org.apache.kafka.clients.FetchSessionHandler - [Consumer clientId=consumer-kafeidou_group-1, groupId=kafeidou_group] Built incremental fetch (sessionId=722495926, epoch=22) for node 1. Added 0 partition(s), altered 1 partition(s), removed 0 partition(s), replaced 0 partition(s) out of 1 partition(s)
14:54:20.372 [main] DEBUG org.apache.kafka.clients.consumer.internals.AbstractFetch - [Consumer clientId=consumer-kafeidou_group-1, groupId=kafeidou_group] Sending READ_UNCOMMITTED IncrementalFetchRequest(toSend=(com.demo.kafka.test.v1-0), toForget=(), toReplace=(), implied=(), canUseTopicIds=True) to broker kafka:9004 (id: 1 rack: null)
14:54:20.372 [main] DEBUG org.apache.kafka.clients.consumer.internals.AbstractFetch - [Consumer clientId=consumer-kafeidou_group-1, groupId=kafeidou_group] Adding pending request for node kafka:9004 (id: 1 rack: null)
14:54:20.372 [main] DEBUG org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-kafeidou_group-1, groupId=kafeidou_group] Sending FETCH request with header RequestHeader(apiKey=FETCH, apiVersion=13, clientId=consumer-kafeidou_group-1, correlationId=97, headerVersion=2) and timeout 30000 to node 1: FetchRequestData(clusterId=null, replicaId=-1, replicaState=ReplicaState(replicaId=-1, replicaEpoch=-1), maxWaitMs=10000, minBytes=1, maxBytes=52428800, isolationLevel=0, sessionId=722495926, sessionEpoch=22, topics=[FetchTopic(topic='com.demo.kafka.test.v1', topicId=gtXOJl3PQZyfmZsuZxQdWQ, partitions=[FetchPartition(partition=0, currentLeaderEpoch=0, fetchOffset=54, lastFetchedEpoch=-1, logStartOffset=-1, partitionMaxBytes=1048576)])], forgottenTopicsData=[], rackId='')
offset = 53, key = 192.168.2.41, value = 1714978460356,www.example.com,192.168.2.41
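The last line above is the consumer's own output. Its value is the comma-separated string built by the producer (timestamp, site, IP). As a rough sketch of how a consumer might turn that value into fields, the hypothetical AccessEvent class below parses it with the JDK only. The class and field names are assumptions made for illustration and do not appear in the original code.

```java
import java.time.Instant;

// Hypothetical value type for the "timestamp,site,ip" message format used in this article.
final class AccessEvent {
    final long timestampMillis;
    final String site;
    final String ip;

    private AccessEvent(long timestampMillis, String site, String ip) {
        this.timestampMillis = timestampMillis;
        this.site = site;
        this.ip = ip;
    }

    // Splits the record value into its three fields and rejects anything else.
    static AccessEvent parse(String value) {
        String[] parts = value.split(",", -1);
        if (parts.length != 3) {
            throw new IllegalArgumentException("expected 3 comma-separated fields: " + value);
        }
        return new AccessEvent(Long.parseLong(parts[0]), parts[1], parts[2]);
    }

    public static void main(String[] args) {
        // Sample value taken from the consumer output above.
        AccessEvent event = parse("1714978460356,www.example.com,192.168.2.41");
        System.out.println(Instant.ofEpochMilli(event.timestampMillis) + " " + event.site + " " + event.ip);
    }
}
```

In the consumer loop, record.value() could be passed to AccessEvent.parse in place of printing the raw string.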
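Common concepts and configuration
- FETCH request
FETCH is the request a consumer sends to pull messages from the broker. It works as a long poll: when no data is available, the broker holds the request for at most the time configured below before replying, so an idle consumer issues a fetch roughly once per interval.
fetch.max.wait.ms  maximum wait in milliseconds
- OFFSET_COMMIT request
With this request the consumer reports the offset it has consumed up to. When auto commit is enabled, the interval between commits is set by:
auto.commit.interval.ms
- HEARTBEAT request
This request keeps the consumer registered with the coordinator of its consumer group. If heartbeats stop for longer than the session timeout, the consumer is considered dead and its partitions are reassigned. The interval between heartbeats is set by:
heartbeat.interval.ms
- Auto commit
Controls whether the consumer commits offsets automatically. When enabled, consumption does not need to be acknowledged manually. If it is disabled and offsets are never committed manually, the committed offset stops advancing and the group's consumer lag keeps growing. It is set by:
enable.auto.commit false/true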
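The four settings above can be collected in one place. The sketch below builds a plain java.util.Properties object with the same values used in MyConsumer, with a flag that switches auto commit on or off. ConsumerSettings and its build method are hypothetical names. The property keys are the standard Kafka consumer configuration names, written as strings so the snippet compiles without the Kafka client on the classpath.

```java
import java.util.Properties;

// Hypothetical helper that gathers the consumer settings discussed above.
final class ConsumerSettings {

    static Properties build(String bootstrapServers, String groupId, boolean autoCommit) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("group.id", groupId);
        props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // Longest time the broker holds an empty FETCH before replying.
        props.setProperty("fetch.max.wait.ms", "10000");
        // Interval between HEARTBEAT requests to the group coordinator.
        props.setProperty("heartbeat.interval.ms", "5000");
        // Whether offsets are committed automatically.
        props.setProperty("enable.auto.commit", Boolean.toString(autoCommit));
        // Interval between automatic OFFSET_COMMIT requests; only used when auto commit is on.
        props.setProperty("auto.commit.interval.ms", "5000");
        return props;
    }

    public static void main(String[] args) {
        Properties props = build("127.0.0.1:9004", "kafeidou_group", false);
        props.forEach((key, value) -> System.out.println(key + " = " + value));
    }
}
```

The resulting Properties can be passed to the KafkaConsumer constructor. With autoCommit set to false, the consumer loop would need to call consumer.commitSync() (or commitAsync()) after processing each batch, otherwise the committed offset never advances.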