Deploying a Kafka Instance and Common Concepts

Background

As a mainstream message queue, Kafka decouples systems and keeps data in sync between upstream and downstream services, which gives it many uses in customer-facing systems. Building on Kafka's core concepts, this article walks through its usage with a hands-on example.

Local Deployment

Service Overview

The local deployment uses docker-compose, which keeps configuration and management simple. It bundles zookeeper, kafka, and kafka-ui, whose roles are:

  • Zookeeper
    Manages Kafka metadata, coordinates producers and consumers, and provides high availability.

  • Kafka
    Also called the Kafka broker; this is the actual Kafka instance, which stores messages and serves produce and consume requests.

  • Kafka-ui
    A web-based user interface for managing topics, browsing messages, and similar operations.

Deployment Configuration

The actual docker-compose configuration is as follows:

version: "3"
services:
  zookeeper:
    image: docker.io/bitnami/zookeeper:3.8
    # network_mode: "bridge"
    container_name: zookeeper
    ports:
      - "2181:2181"
    volumes:
      - /d/docker-compose/kafka/zookeeper:/bitnami/zookeeper # persist data
    environment:
      - TZ=Asia/Shanghai
      - ALLOW_ANONYMOUS_LOGIN=yes
    networks:
      - kafka
  kafka:
    restart: always
    image: docker.io/bitnami/kafka:3.4
    # network_mode: "bridge"
    container_name: kafka
    ports:
      - "9004:9004"
    volumes:
      - /d/docker-compose/kafka/kafka:/bitnami/kafka # persist data
    environment:
      - TZ=Asia/Shanghai
      - KAFKA_BROKER_ID=1
      - KAFKA_CFG_LISTENERS=PLAINTEXT://:9004
      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9004 # replace with your own hostname/IP
      - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
      - ALLOW_PLAINTEXT_LISTENER=yes
    depends_on:
      - zookeeper
    networks:
      - kafka
  kafka-ui:
    restart: always
    image: provectuslabs/kafka-ui:latest
    # network_mode: "bridge"
    container_name: kafka-ui
    ports:
      - "9001:8080"
    volumes:
      - /d/docker-compose/kafka/ui-kafka/etc/localtime:/etc/localtime 
    environment:
      # cluster name
      - KAFKA_CLUSTERS_0_NAME=local
      # bootstrap servers for the cluster
      - KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS=kafka:9004 # replace with your own hostname/IP
    networks:
      - kafka
networks:
  kafka:
    driver: bridge

Start the stack with docker-compose up -d.
Once everything is up, the running containers look like this:

[screenshot: the zookeeper, kafka, and kafka-ui containers running]

Open 127.0.0.1:9001 in a browser to reach kafka-ui; the interface looks like this:


[screenshot: the kafka-ui web interface]

Examples and Common Configuration

With the Kafka service running as above, we can now exercise the producer/consumer interaction in code.

Producer and Consumer Examples

1. Producer

package com.demo.test;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.*;

import java.util.Date;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;


@Slf4j
public class MyProducer {
    private static final String TOPIC_NAME = "com.demo.kafka.test.v1";

    public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException {
        long events = 30;
        Random rnd = new Random();

        Properties props = new Properties();
        props.put("bootstrap.servers", "127.0.0.1:9004");
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // note: message.timeout.ms is a librdkafka option and is ignored by the Java
        // client; the closest Java-client setting is delivery.timeout.ms

        log.info("start ...");
        Producer<String, String> producer = new KafkaProducer<>(props);

        for (long nEvents = 0; nEvents < events; nEvents++) {
            long runtime = new Date().getTime();
            String ip = "192.168.2." + rnd.nextInt(255);
            String msg = runtime + ",www.example.com," + ip;

            // keyed by ip, so records with the same key land on the same partition
            ProducerRecord<String, String> data = new ProducerRecord<>(TOPIC_NAME, ip, msg);
            producer.send(data,
                    new Callback() {
                        public void onCompletion(RecordMetadata metadata, Exception e) {
                            if (e != null) {
                                e.printStackTrace();
                            } else {
                                System.out.println("The offset of the record we just sent is: " + metadata.offset());
                            }
                        }
                    }).get(10, TimeUnit.SECONDS); // block so each send is acknowledged before the next
        }
        System.out.println("send message done");
        producer.close();
    }
}
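
The producer above relies on the broker auto-creating the topic on first use (visible as allowAutoTopicCreation=true in the metadata request logged below). To create the topic explicitly instead, a minimal sketch with the Java AdminClient could look like this (the MyTopicAdmin class name is made up for illustration; the bootstrap address matches the example above):

package com.demo.test;

import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

public class MyTopicAdmin {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9004");
        try (AdminClient admin = AdminClient.create(props)) {
            // one partition, replication factor 1 -- enough for this single-broker setup
            NewTopic topic = new NewTopic("com.demo.kafka.test.v1", 1, (short) 1);
            admin.createTopics(Collections.singletonList(topic)).all().get();
            System.out.println("topic created");
        }
    }
}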

2. Consumer

package com.demo.test;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.serialization.StringDeserializer;

public class MyConsumer {
    private static final String TOPIC_NAME = "com.demo.kafka.test.v1";
    public static void main(String[] args) {
        Properties props = new Properties();

        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9004");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "kafeidou_group");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "5000");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, "10000");
        props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "5000");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");

        Consumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList(TOPIC_NAME));

        while (true) {
            // poll(long) was removed in kafka-clients 3.x; use the Duration overload
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
            }
        }
    }
}

Note: because the broker runs inside a container, its advertised listener uses the hostname kafka, and the client connects to whatever host the broker advertises, so you need to add an entry to your local hosts file: 127.0.0.1 kafka

3. Dependencies and Versions

<properties>
    <maven.compiler.source>8</maven.compiler.source>
    <maven.compiler.target>8</maven.compiler.target>
    <logback.version>1.2.3</logback.version>
</properties>
<dependencies>
    <!-- kafka-clients is the only Kafka dependency the examples above need;
         kafka-streams and kafka_2.13 are optional extras -->
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>3.5.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-streams</artifactId>
        <version>3.0.0</version>
        <scope>compile</scope>
        <optional>true</optional>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka_2.13</artifactId>
        <version>3.5.0</version>
    </dependency>

    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
        <version>1.7.25</version>
    </dependency>
    <dependency>
        <groupId>ch.qos.logback</groupId>
        <artifactId>logback-classic</artifactId>
        <version>${logback.version}</version>
    </dependency>
    <dependency>
        <groupId>ch.qos.logback</groupId>
        <artifactId>logback-core</artifactId>
        <version>${logback.version}</version>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <version>1.18.20</version>
        <scope>provided</scope>
    </dependency>
</dependencies>
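
The DEBUG-level output below is produced through slf4j/logback. The article does not show its logging configuration, so the following is an assumed minimal logback.xml that yields the same line format:

<configuration>
    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
        <encoder>
            <!-- matches the "14:49:16.009 [main] INFO logger - message" lines below -->
            <pattern>%d{HH:mm:ss.SSS} [%thread] %level %logger - %msg%n</pattern>
        </encoder>
    </appender>
    <root level="DEBUG">
        <appender-ref ref="STDOUT"/>
    </root>
</configuration>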

Producer Log

Partial log output:

14:49:16.009 [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version: 3.5.0
14:49:16.009 [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId: c97b88d5db4de28d
14:49:16.009 [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka startTimeMs: 1714978156006
14:49:16.009 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.ClientUtils - Resolved host 127.0.0.1 as 127.0.0.1
14:49:16.009 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Initiating connection to node 127.0.0.1:9004 (id: -1 rack: null) using address /127.0.0.1
14:49:16.010 [main] DEBUG org.apache.kafka.clients.producer.KafkaProducer - [Producer clientId=producer-1] Kafka producer started
14:49:16.017 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.network.Selector - [Producer clientId=producer-1] Created socket with SO_RCVBUF = 32768, SO_SNDBUF = 131072, SO_TIMEOUT = 0 to node -1
14:49:16.183 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Completed connection to node -1. Fetching API versions.
14:49:16.183 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Initiating API versions fetch from node -1.
14:49:16.200 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Sending API_VERSIONS request with header RequestHeader(apiKey=API_VERSIONS, apiVersion=3, clientId=producer-1, correlationId=0, headerVersion=2) and timeout 30000 to node -1: ApiVersionsRequestData(clientSoftwareName='apache-kafka-java', clientSoftwareVersion='3.5.0')
14:49:16.226 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Received API_VERSIONS response from node -1 for request with header RequestHeader(apiKey=API_VERSIONS, apiVersion=3, clientId=producer-1, correlationId=0, headerVersion=2): ApiVersionsResponseData(errorCode=0, apiKeys=[ApiVersion(apiKey=0, minVersion=0, maxVersion=9), ApiVersion(apiKey=1, minVersion=0, maxVersion=13), ApiVersion(apiKey=2, minVersion=0, maxVersion=7), ApiVersion(apiKey=3, minVersion=0, maxVersion=12), ApiVersion(apiKey=4, minVersion=0, maxVersion=7), ApiVersion(apiKey=5, minVersion=0, maxVersion=4), ApiVersion(apiKey=6, minVersion=0, maxVersion=8), ApiVersion(apiKey=7, minVersion=0, maxVersion=3), ApiVersion(apiKey=8, minVersion=0, maxVersion=8), ApiVersion(apiKey=9, minVersion=0, maxVersion=8), ApiVersion(apiKey=10, minVersion=0, maxVersion=4), ApiVersion(apiKey=11, minVersion=0, maxVersion=9), ApiVersion(apiKey=12, minVersion=0, maxVersion=4), ApiVersion(apiKey=13, minVersion=0, maxVersion=5), ApiVersion(apiKey=14, minVersion=0, maxVersion=5), ApiVersion(apiKey=15, minVersion=0, maxVersion=5), ApiVersion(apiKey=16, minVersion=0, maxVersion=4), ApiVersion(apiKey=17, minVersion=0, maxVersion=1), ApiVersion(apiKey=18, minVersion=0, maxVersion=3), ApiVersion(apiKey=19, minVersion=0, maxVersion=7), ApiVersion(apiKey=20, minVersion=0, maxVersion=6), ApiVersion(apiKey=21, minVersion=0, maxVersion=2), ApiVersion(apiKey=22, minVersion=0, maxVersion=4), ApiVersion(apiKey=23, minVersion=0, maxVersion=4), ApiVersion(apiKey=24, minVersion=0, maxVersion=3), ApiVersion(apiKey=25, minVersion=0, maxVersion=3), ApiVersion(apiKey=26, minVersion=0, maxVersion=3), ApiVersion(apiKey=27, minVersion=0, maxVersion=1), ApiVersion(apiKey=28, minVersion=0, maxVersion=3), ApiVersion(apiKey=29, minVersion=0, maxVersion=3), ApiVersion(apiKey=30, minVersion=0, maxVersion=3), ApiVersion(apiKey=31, minVersion=0, maxVersion=3), ApiVersion(apiKey=32, minVersion=0, maxVersion=4), ApiVersion(apiKey=33, minVersion=0, maxVersion=2), ApiVersion(apiKey=34, minVersion=0, maxVersion=2), ApiVersion(apiKey=35, minVersion=0, maxVersion=4), ApiVersion(apiKey=36, minVersion=0, maxVersion=2), ApiVersion(apiKey=37, minVersion=0, maxVersion=3), ApiVersion(apiKey=38, minVersion=0, maxVersion=3), ApiVersion(apiKey=39, minVersion=0, maxVersion=2), ApiVersion(apiKey=40, minVersion=0, maxVersion=2), ApiVersion(apiKey=41, minVersion=0, maxVersion=3), ApiVersion(apiKey=42, minVersion=0, maxVersion=2), ApiVersion(apiKey=43, minVersion=0, maxVersion=2), ApiVersion(apiKey=44, minVersion=0, maxVersion=1), ApiVersion(apiKey=45, minVersion=0, maxVersion=0), ApiVersion(apiKey=46, minVersion=0, maxVersion=0), ApiVersion(apiKey=47, minVersion=0, maxVersion=0), ApiVersion(apiKey=48, minVersion=0, maxVersion=1), ApiVersion(apiKey=49, minVersion=0, maxVersion=1), ApiVersion(apiKey=50, minVersion=0, maxVersion=0), ApiVersion(apiKey=51, minVersion=0, maxVersion=0), ApiVersion(apiKey=56, minVersion=0, maxVersion=2), ApiVersion(apiKey=57, minVersion=0, maxVersion=1), ApiVersion(apiKey=58, minVersion=0, maxVersion=0), ApiVersion(apiKey=60, minVersion=0, maxVersion=0), ApiVersion(apiKey=61, minVersion=0, maxVersion=0), ApiVersion(apiKey=65, minVersion=0, maxVersion=0), ApiVersion(apiKey=66, minVersion=0, maxVersion=0), ApiVersion(apiKey=67, minVersion=0, maxVersion=0)], throttleTimeMs=0, supportedFeatures=[], finalizedFeaturesEpoch=0, finalizedFeatures=[], zkMigrationReady=false)
14:49:16.267 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Node -1 has finalized features epoch: 0, finalized features: [], supported features: [], ZK migration ready: false, API versions: (Produce(0): 0 to 9 [usable: 9], Fetch(1): 0 to 13 [usable: 13], ListOffsets(2): 0 to 7 [usable: 7], Metadata(3): 0 to 12 [usable: 12], LeaderAndIsr(4): 0 to 7 [usable: 7], StopReplica(5): 0 to 4 [usable: 4], UpdateMetadata(6): 0 to 8 [usable: 8], ControlledShutdown(7): 0 to 3 [usable: 3], OffsetCommit(8): 0 to 8 [usable: 8], OffsetFetch(9): 0 to 8 [usable: 8], FindCoordinator(10): 0 to 4 [usable: 4], JoinGroup(11): 0 to 9 [usable: 9], Heartbeat(12): 0 to 4 [usable: 4], LeaveGroup(13): 0 to 5 [usable: 5], SyncGroup(14): 0 to 5 [usable: 5], DescribeGroups(15): 0 to 5 [usable: 5], ListGroups(16): 0 to 4 [usable: 4], SaslHandshake(17): 0 to 1 [usable: 1], ApiVersions(18): 0 to 3 [usable: 3], CreateTopics(19): 0 to 7 [usable: 7], DeleteTopics(20): 0 to 6 [usable: 6], DeleteRecords(21): 0 to 2 [usable: 2], InitProducerId(22): 0 to 4 [usable: 4], OffsetForLeaderEpoch(23): 0 to 4 [usable: 4], AddPartitionsToTxn(24): 0 to 3 [usable: 3], AddOffsetsToTxn(25): 0 to 3 [usable: 3], EndTxn(26): 0 to 3 [usable: 3], WriteTxnMarkers(27): 0 to 1 [usable: 1], TxnOffsetCommit(28): 0 to 3 [usable: 3], DescribeAcls(29): 0 to 3 [usable: 3], CreateAcls(30): 0 to 3 [usable: 3], DeleteAcls(31): 0 to 3 [usable: 3], DescribeConfigs(32): 0 to 4 [usable: 4], AlterConfigs(33): 0 to 2 [usable: 2], AlterReplicaLogDirs(34): 0 to 2 [usable: 2], DescribeLogDirs(35): 0 to 4 [usable: 4], SaslAuthenticate(36): 0 to 2 [usable: 2], CreatePartitions(37): 0 to 3 [usable: 3], CreateDelegationToken(38): 0 to 3 [usable: 3], RenewDelegationToken(39): 0 to 2 [usable: 2], ExpireDelegationToken(40): 0 to 2 [usable: 2], DescribeDelegationToken(41): 0 to 3 [usable: 3], DeleteGroups(42): 0 to 2 [usable: 2], ElectLeaders(43): 0 to 2 [usable: 2], IncrementalAlterConfigs(44): 0 to 1 [usable: 1], AlterPartitionReassignments(45): 0 [usable: 0], ListPartitionReassignments(46): 0 [usable: 0], OffsetDelete(47): 0 [usable: 0], DescribeClientQuotas(48): 0 to 1 [usable: 1], AlterClientQuotas(49): 0 to 1 [usable: 1], DescribeUserScramCredentials(50): 0 [usable: 0], AlterUserScramCredentials(51): 0 [usable: 0], AlterPartition(56): 0 to 2 [usable: 2], UpdateFeatures(57): 0 to 1 [usable: 1], Envelope(58): 0 [usable: 0], DescribeCluster(60): 0 [usable: 0], DescribeProducers(61): 0 [usable: 0], DescribeTransactions(65): 0 [usable: 0], ListTransactions(66): 0 [usable: 0], AllocateProducerIds(67): 0 [usable: 0], ConsumerGroupHeartbeat(68): UNSUPPORTED).
14:49:16.268 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Sending metadata request MetadataRequestData(topics=[MetadataRequestTopic(topicId=AAAAAAAAAAAAAAAAAAAAAA, name='com.demo.kafka.test.v1')], allowAutoTopicCreation=true, includeClusterAuthorizedOperations=false, includeTopicAuthorizedOperations=false) to node 127.0.0.1:9004 (id: -1 rack: null)
14:49:16.269 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Sending METADATA request with header RequestHeader(apiKey=METADATA, apiVersion=12, clientId=producer-1, correlationId=1, headerVersion=2) and timeout 30000 to node -1: MetadataRequestData(topics=[MetadataRequestTopic(topicId=AAAAAAAAAAAAAAAAAAAAAA, name='com.demo.kafka.test.v1')], allowAutoTopicCreation=true, includeClusterAuthorizedOperations=false, includeTopicAuthorizedOperations=false)
14:49:16.314 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Received PRODUCE response from node 1 for request with header RequestHeader(apiKey=PRODUCE, apiVersion=9, clientId=producer-1, correlationId=3, headerVersion=2): ProduceResponseData(responses=[TopicProduceResponse(name='com.demo.kafka.test.v1', partitionResponses=[PartitionProduceResponse(index=0, errorCode=0, baseOffset=44, logAppendTimeMs=-1, logStartOffset=44, recordErrors=[], errorMessage=null)])], throttleTimeMs=0)
The offset of the record we just sent is: 44
14:49:16.332 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Sending PRODUCE request with header RequestHeader(apiKey=PRODUCE, apiVersion=9, clientId=producer-1, correlationId=4, headerVersion=2) and timeout 30000 to node 1: {acks=-1,timeout=30000,partitionSizes=[com.demo.kafka.test.v1-0=122]}
14:49:16.335 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Received PRODUCE response from node 1 for request with header RequestHeader(apiKey=PRODUCE, apiVersion=9, clientId=producer-1, correlationId=4, headerVersion=2): ProduceResponseData(responses=[TopicProduceResponse(name='com.demo.kafka.test.v1', partitionResponses=[PartitionProduceResponse(index=0, errorCode=0, baseOffset=45, logAppendTimeMs=-1, logStartOffset=44, recordErrors=[], errorMessage=null)])], throttleTimeMs=0)
The offset of the record we just sent is: 45
14:49:16.347 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Sending PRODUCE request with header RequestHeader(apiKey=PRODUCE, apiVersion=9, clientId=producer-1, correlationId=5, headerVersion=2) and timeout 30000 to node 1: {acks=-1,timeout=30000,partitionSizes=[com.demo.kafka.test.v1-0=122]}
14:49:16.351 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Received PRODUCE response from node 1 for request with header RequestHeader(apiKey=PRODUCE, apiVersion=9, clientId=producer-1, correlationId=5, headerVersion=2): ProduceResponseData(responses=[TopicProduceResponse(name='com.demo.kafka.test.v1', partitionResponses=[PartitionProduceResponse(index=0, errorCode=0, baseOffset=46, logAppendTimeMs=-1, logStartOffset=44, recordErrors=[], errorMessage=null)])], throttleTimeMs=0)
The offset of the record we just sent is: 46
14:49:16.363 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Sending PRODUCE request with header RequestHeader(apiKey=PRODUCE, apiVersion=9, clientId=producer-1, correlationId=6, headerVersion=2) and timeout 30000 to node 1: {acks=-1,timeout=30000,partitionSizes=[com.demo.kafka.test.v1-0=124]}
14:49:16.366 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Received PRODUCE response from node 1 for request with header RequestHeader(apiKey=PRODUCE, apiVersion=9, clientId=producer-1, correlationId=6, headerVersion=2): ProduceResponseData(responses=[TopicProduceResponse(name='com.demo.kafka.test.v1', partitionResponses=[PartitionProduceResponse(index=0, errorCode=0, baseOffset=47, logAppendTimeMs=-1, logStartOffset=44, recordErrors=[], errorMessage=null)])], throttleTimeMs=0)
The offset of the record we just sent is: 47
14:49:16.379 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Sending PRODUCE request with header RequestHeader(apiKey=PRODUCE, apiVersion=9, clientId=producer-1, correlationId=7, headerVersion=2) and timeout 30000 to node 1: {acks=-1,timeout=30000,partitionSizes=[com.demo.kafka.test.v1-0=122]}
14:49:16.382 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Received PRODUCE response from node 1 for request with header RequestHeader(apiKey=PRODUCE, apiVersion=9, clientId=producer-1, correlationId=7, headerVersion=2): ProduceResponseData(responses=[TopicProduceResponse(name='com.demo.kafka.test.v1', partitionResponses=[PartitionProduceResponse(index=0, errorCode=0, baseOffset=48, logAppendTimeMs=-1, logStartOffset=44, recordErrors=[], errorMessage=null)])], throttleTimeMs=0)
The offset of the record we just sent is: 48
send message done
14:49:16.383 [main] INFO org.apache.kafka.clients.producer.KafkaProducer - [Producer clientId=producer-1] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
14:49:16.383 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.producer.internals.Sender - [Producer clientId=producer-1] Beginning shutdown of Kafka producer I/O thread, sending remaining records.
14:49:16.387 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.producer.internals.Sender - [Producer clientId=producer-1] Shutdown of Kafka producer I/O thread has completed.
14:49:16.388 [main] INFO org.apache.kafka.common.metrics.Metrics - Metrics scheduler closed
14:49:16.388 [main] INFO org.apache.kafka.common.metrics.Metrics - Closing reporter org.apache.kafka.common.metrics.JmxReporter
14:49:16.388 [main] INFO org.apache.kafka.common.metrics.Metrics - Metrics reporters closed
14:49:16.388 [main] INFO org.apache.kafka.common.utils.AppInfoParser - App info kafka.producer for producer-1 unregistered
14:49:16.388 [main] DEBUG org.apache.kafka.clients.producer.KafkaProducer - [Producer clientId=producer-1] Kafka producer has been closed

Consumer Log

Partial log output:

14:52:05.505 [main] DEBUG org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-kafeidou_group-1, groupId=kafeidou_group] Sending FETCH request with header RequestHeader(apiKey=FETCH, apiVersion=13, clientId=consumer-kafeidou_group-1, correlationId=27, headerVersion=2) and timeout 30000 to node 1: FetchRequestData(clusterId=null, replicaId=-1, replicaState=ReplicaState(replicaId=-1, replicaEpoch=-1), maxWaitMs=10000, minBytes=1, maxBytes=52428800, isolationLevel=0, sessionId=722495926, sessionEpoch=4, topics=[], forgottenTopicsData=[], rackId='')
14:52:10.489 [kafka-coordinator-heartbeat-thread | kafeidou_group] DEBUG org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-kafeidou_group-1, groupId=kafeidou_group] Sending Heartbeat request with generation 11 and member id consumer-kafeidou_group-1-cb4c509f-53ab-4d6e-892d-30c20e59bd93 to coordinator kafka:9004 (id: 2147483646 rack: null)
14:52:10.489 [kafka-coordinator-heartbeat-thread | kafeidou_group] DEBUG org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-kafeidou_group-1, groupId=kafeidou_group] Sending HEARTBEAT request with header RequestHeader(apiKey=HEARTBEAT, apiVersion=4, clientId=consumer-kafeidou_group-1, correlationId=28, headerVersion=2) and timeout 30000 to node 2147483646: HeartbeatRequestData(groupId='kafeidou_group', generationId=11, memberId='consumer-kafeidou_group-1-cb4c509f-53ab-4d6e-892d-30c20e59bd93', groupInstanceId=null)
14:52:10.489 [main] DEBUG org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-kafeidou_group-1, groupId=kafeidou_group] Sending asynchronous auto-commit of offsets {com.demo.kafka.test.v1-0=OffsetAndMetadata{offset=49, leaderEpoch=null, metadata=''}}
14:52:10.490 [main] DEBUG org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-kafeidou_group-1, groupId=kafeidou_group] Sending OFFSET_COMMIT request with header RequestHeader(apiKey=OFFSET_COMMIT, apiVersion=8, clientId=consumer-kafeidou_group-1, correlationId=29, headerVersion=2) and timeout 30000 to node 2147483646: OffsetCommitRequestData(groupId='kafeidou_group', generationId=11, memberId='consumer-kafeidou_group-1-cb4c509f-53ab-4d6e-892d-30c20e59bd93', groupInstanceId=null, retentionTimeMs=-1, topics=[OffsetCommitRequestTopic(name='com.demo.kafka.test.v1', partitions=[OffsetCommitRequestPartition(partitionIndex=0, committedOffset=49, committedLeaderEpoch=-1, commitTimestamp=-1, committedMetadata='')])])
14:52:10.491 [main] DEBUG org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-kafeidou_group-1, groupId=kafeidou_group] Received HEARTBEAT response from node 2147483646 for request with header RequestHeader(apiKey=HEARTBEAT, apiVersion=4, clientId=consumer-kafeidou_group-1, correlationId=28, headerVersion=2): HeartbeatResponseData(throttleTimeMs=0, errorCode=0)
14:52:10.491 [main] DEBUG org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-kafeidou_group-1, groupId=kafeidou_group] Received successful Heartbeat response
14:52:10.493 [main] DEBUG org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-kafeidou_group-1, groupId=kafeidou_group] Received OFFSET_COMMIT response from node 2147483646 for request with header RequestHeader(apiKey=OFFSET_COMMIT, apiVersion=8, clientId=consumer-kafeidou_group-1, correlationId=29, headerVersion=2): OffsetCommitResponseData(throttleTimeMs=0, topics=[OffsetCommitResponseTopic(name='com.demo.kafka.test.v1', partitions=[OffsetCommitResponsePartition(partitionIndex=0, errorCode=0)])])
14:52:10.494 [main] DEBUG org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-kafeidou_group-1, groupId=kafeidou_group] Committed offset 49 for partition com.demo.kafka.test.v1-0
14:52:10.494 [main] DEBUG org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-kafeidou_group-1, groupId=kafeidou_group] Completed asynchronous auto-commit of offsets {com.demo.kafka.test.v1-0=OffsetAndMetadata{offset=49, leaderEpoch=null, metadata=''}}



14:54:20.372 [main] DEBUG org.apache.kafka.clients.consumer.internals.AbstractFetch - [Consumer clientId=consumer-kafeidou_group-1, groupId=kafeidou_group] Removing pending request for node kafka:9004 (id: 1 rack: null)
14:54:20.372 [main] DEBUG org.apache.kafka.clients.consumer.internals.AbstractFetch - [Consumer clientId=consumer-kafeidou_group-1, groupId=kafeidou_group] Added READ_UNCOMMITTED fetch request for partition com.demo.kafka.test.v1-0 at position FetchPosition{offset=54, offsetEpoch=Optional[0], currentLeader=LeaderAndEpoch{leader=Optional[kafka:9004 (id: 1 rack: null)], epoch=0}} to node kafka:9004 (id: 1 rack: null)
14:54:20.372 [main] DEBUG org.apache.kafka.clients.FetchSessionHandler - [Consumer clientId=consumer-kafeidou_group-1, groupId=kafeidou_group] Built incremental fetch (sessionId=722495926, epoch=22) for node 1. Added 0 partition(s), altered 1 partition(s), removed 0 partition(s), replaced 0 partition(s) out of 1 partition(s)
14:54:20.372 [main] DEBUG org.apache.kafka.clients.consumer.internals.AbstractFetch - [Consumer clientId=consumer-kafeidou_group-1, groupId=kafeidou_group] Sending READ_UNCOMMITTED IncrementalFetchRequest(toSend=(com.demo.kafka.test.v1-0), toForget=(), toReplace=(), implied=(), canUseTopicIds=True) to broker kafka:9004 (id: 1 rack: null)
14:54:20.372 [main] DEBUG org.apache.kafka.clients.consumer.internals.AbstractFetch - [Consumer clientId=consumer-kafeidou_group-1, groupId=kafeidou_group] Adding pending request for node kafka:9004 (id: 1 rack: null)
14:54:20.372 [main] DEBUG org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-kafeidou_group-1, groupId=kafeidou_group] Sending FETCH request with header RequestHeader(apiKey=FETCH, apiVersion=13, clientId=consumer-kafeidou_group-1, correlationId=97, headerVersion=2) and timeout 30000 to node 1: FetchRequestData(clusterId=null, replicaId=-1, replicaState=ReplicaState(replicaId=-1, replicaEpoch=-1), maxWaitMs=10000, minBytes=1, maxBytes=52428800, isolationLevel=0, sessionId=722495926, sessionEpoch=22, topics=[FetchTopic(topic='com.demo.kafka.test.v1', topicId=gtXOJl3PQZyfmZsuZxQdWQ, partitions=[FetchPartition(partition=0, currentLeaderEpoch=0, fetchOffset=54, lastFetchedEpoch=-1, logStartOffset=-1, partitionMaxBytes=1048576)])], forgottenTopicsData=[], rackId='')
offset = 53, key = 192.168.2.41, value = 1714978460356,www.example.com,192.168.2.41

Common Concepts and Configuration

  • FETCH request
    A FETCH request pulls messages for the consumer; the client issues them in a polling loop. How long the broker may hold a fetch open while waiting for data is tuned with the setting below (visible as maxWaitMs=10000 in the FETCH requests above):
    fetch.max.wait.ms   maximum wait in milliseconds
  • OFFSET_COMMIT request
    With this request the consumer reports the offset it has consumed up to. The auto-commit interval is controlled by:
    auto.commit.interval.ms
  • HEARTBEAT request
    This request keeps the consumer coordinated with its group; once heartbeats stop for longer than the session timeout, the consumer's membership is considered expired. The probe interval is controlled by:
    heartbeat.interval.ms
  • Auto commit
    Whether the consumer commits offsets automatically. When enabled, consumption does not need to be acknowledged manually; if auto commit is disabled and offsets are never committed manually, consumer lag keeps growing (a manual-commit sketch follows this list). Controlled by:
    enable.auto.commit   true/false
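
If auto commit is disabled, the poll loop has to acknowledge offsets itself. A minimal sketch, assuming the MyConsumer setup above with ENABLE_AUTO_COMMIT_CONFIG switched to "false":

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
    for (ConsumerRecord<String, String> record : records) {
        System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
    }
    if (!records.isEmpty()) {
        // commit the offsets returned by this poll; without this, consumer lag grows
        consumer.commitSync();
    }
}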
