kafka基础知识

github地址:https://github.com/douzixiansheng/MQ/blob/master/kafka_basic.md

Kafka

Kafka是一个分布式消息队列,具有高性能、持久化、多副本备份、横向扩展能力.(pub-sub模型)

维基百科

  • Kafka 是由Apache软件基金会开发的一个开源流处理平台,由Scala和JAVA编写.该项目的目标是为处理实时数据提供一个统一、高吞吐、低延迟的平台。其持久化层本质是一个"按照分布式事务日志架构的大规模发布/订阅消息队列".Kafka可以通过Kafka Connect连接到外部系统(用于数据输入/输出),并提供了Kafka Streams ———— 一个Java流式处理库.

基于kafka-zookeeper 的分布式消息队列系统总体架构如下:

    • kafka-zookeeper.png

Kafka 架构说明

  • 一个典型的Kafka集群包含若干Producer,若干Broker,若干Consumer,以及一个Zookeeper集群。Kafka通过Zookeeper管理集群配置,选举Leader,以及在Consumer Group发送变化时进行Rebalance(负载均衡)。Producer 使用push(推)模式将消息发布到Broker;Consumer 使用pull(拉)模式从Broker订阅并消费消息。

Kafka 四大核心

  • 生产者API:允许应用程序发布记录流至一个或多个kafka的主题(Topics)
  • 消费者API:允许应用程序订阅一个或多个主题,并处理这些主题接收到的记录流
  • Streams API: 允许应用程序充当流处理器(stream processor),从一个或多个主题获取输入流,并生产一个输出流至一个或多个的主题,能够有效地变换输入流为输出流
  • Connector API: 允许构建和运行可重用的生产者或消费者,能够把kafka主题连接到现有的应用程序或数据系统

Kafka 基础概念

  • 无论是kafka集群,还是consumer都依赖于zookeeper集群保存一些meta信息,来保证系统可用性

  • Producer

    • 发送消息者称为 Producer
  • Producer 采用推(push)模式将消息发布到broker,每条消息都被追加(append)到分区(partition)中,属于顺序写磁盘(顺序写磁盘效率比随机写内存要高,保障kafka吞吐率)

  • 生产者组件图

    • producer_base.png
  • 创建Kafka生产者
    • 要往Kafka写入信息,首先要创建一个生产者对象,并设置一些属性。Kafka生产者有3个必选属性
参数 描述
bootstrap.servers 该属性指定broker的地址清单,地址的格式为host:port。清单里不需要包含所有的broker地址,生产者会从给定的broker里查找到其他的broker的信息。(建议提供两个broker信息,一旦其中一个宕机,生产者仍然能够连接到集群上)
key.serializer broker 希望接受到的消息的键和值都是字节数组.生产者接口允许使用参数化类型,因此可以把java对象作为键和值发送给broker。
value.serializer value.serializer 指定的类会将值序列化。如果键和值都是字符串,可以使用key.serializer一样的序列化器。如果键是整数类型而值是字符串,那么需要使用不同的序列化器.

生产者的配置

参数 描述
acks acks 参数指定了必须要有多少个分区副本收到消息,生产者才会认为消费写入是成功的。acks = 0 生产者在成功写入消息之前不会等待任何来自服务器的响应。(缺点:无法确认消费是否成功;优点:高吞吐量);acks = 1 只要集群的首领节点收到消息,生产者就会收到一个来自服务器的成功响应。如果消费无法到达首领节点(比如首领节点奔溃,新的首领还没有被选举处理),生产者会收到一个错误响应,为了避免数据丢失,生产者会重发消息。不过,如果一个没有收到消息的节点成为新首领,消息还是会丢失。acks = all 只有当所有参与复制的节点全部收到消息时,生产者才会收到一个来自服务器的成功响应。
buffer.memory 该参数用来设置生产者缓冲区的大小,生产者用它缓冲要发送到服务器的消息。0.9.0.0 版本被替换成了 max.block.ms,表示在抛出异常之前可以阻塞一段时间
compression.type 默认情况下为none,消费发送时不会被压缩。该参数可以设置为snappy、gzip或lz4,它指定了消息被发送给broker之前使用哪一种压缩算法进行压缩。1. snappy 压缩算法有Google发明,它占用较少的CPU,却能提供较好的性能和相当可观的压缩比(比较关注性能和网路带宽) 2. gzip 压缩算法一般会占用较多的CPU,但会提供更高的压缩比(网络带宽有限次采用)
retries 生产者从服务器收到的错误有可能是临时性的错误(比如分区找不到首领)。在这中情况下,retries参数是值决定了生产者可以重发消息的次数,如果达到这个次数,生产者会放弃重试并返回错误。默认情况下,生产者会在每次重试之间等待100ms,可以通过retry.backoff.ms参数来改变这个时间间隔.
batch.size 当有多个消息需要被发送到同一个分区时,生产者会把它们放在同一个批次里。该采纳数指定了一个批次可以使用的内存大小,按照字节数计算(而不是消息个数)。1. 批次设置很大 不会造成延迟,只会占用更多的内存 2. 批次设置很小 因为生产者需要更频繁地发送消息,会增加一些额外的开销
linger.ms 该参数指定了生产者在发送批次之前等待更多消息加入批次的时间。
client.id 该参数可以是任意的字符串,服务器会用它来识别消息的来源,还可以用在日志和配额指标里
request.timeout.ms 指定了生产者在发送数据时等待服务器返回响应的时间
max.block.ms 该参数指定了在调用send()方法或使用partitionsFor()方法获取元数据时生产者的阻塞时间。当生产者的发送缓冲区已满,或者没有可用的元数据时,这些方法就会阻塞。在阻塞时间达到max.block.ms时,生产者会抛出超时异常
max.request.size 该参数用于控制生产者发送的请求大小。它可以指能发送的单个消息的最大值,可以指单个请求里所有消息总的大小。
  • 生产者写入流程
    • producer_pull.png

Consumer

    • 消息接收者称为Consumer
    • consumer 采用pull(拉)模式从broker中读取数据
    • push(推) 模式很难适应消费速率不同的消费者,因为消息发送速率是由broker决定的。它的目标是尽可能以最快速度传递消息,但是这样很容易造成consumer来不及处理消息,典型的表现就是拒绝服务以及网络拥塞。而pull模式则可以根据consumer的消费能力以适当的速率消费消息
    • 对于Kafka而言,pull模式更合适,它可简化broker的设计,consumer可自主控制消费消息的速率,同时consumer可以自己控制消费方式——即可批量消费也可逐条消费, 同时还能选择不同的提交方式从而实现不同的传输语义
    • pull 模式不足之处是,如果kafka没有数据,消费者可能会陷入循环中,一直等待数据到达。为了避免这种情况,我们在拉请求中有参数,允许消费者请求在的等待数据到达的"长轮询"中进行阻塞(并且可选地等待到给定的字节数,以确保打的传输大小)
  • Consumer Group (CG)
    • 这是kafka用来实现一个topic消息的广播(发给所有的consumer)和单播(发给任意一个consumer)的手段。一个topic可以有多个CG。topic的消息会复制(概念上的复制)到所有的CG,但每个partition只会把消息发给该CG中的一个consumer。如果需要实现广播,只要每个consumer有一个独立的CG就可以了。要实现单播只要所有consumer在同一个CG。用CG还可以将consumer进行自由的分组而不需要多次发送消息到不同的topic
    • 每个分区在同一时间只能由group中的一个消费者读取,但是多个group可以同时消费这个partition。

Broker(代理)

    • 已发布的消息保存在一组服务器中,称之为Kafka集群。集群中的每一个服务器都是一个代理。
  • 主题(Topic)

    • Kafka将消息以topic为单位进行归纳(一条消息必须属于某一个主题)
    • 在Kafka集群中,可以有无数的主题
    • Kafka 的主题始终是支持多用户订阅的;也就是说,一个主题可以有零个,一个或多个消费者订阅写入的
      数据
    • 分区数(Partitions): 控制topic将分片成多少log。可以显示指定,如果不指定则会使用broker(server.properties)中的num.partitions配置的数量
    • replication-factor副本:控制消息保证在几个broker(服务器)上,一般情况下等于broker的个数。

分区(Partitions)

    • 消息发送时都被发送到一个topic,其本质就是一个目录,而topic是由一些Partition Logs(分区日志)组成
    • partitions_offset.png
    • 每个Topic都有一个或者多个Partitions 构成
    • 每个Partition都是有序且不可变的消息队列
    • Topic的Partition数量可以在创建时配置
    • Partition数量决定了每个Consumer group中并发消费者的最大数量
    • 分区的原因:
      1. 方便在集群中扩展,每个Partition可以通过调整以适应它所在的机器,而一个topic又可以有多个Partition组成,因此整个集群就可以适应任意大小的数据了;
      1. 可以提高并发,因为可以以Partition为单位读写
    • 分区的原则:
      1. 指定了partition,则直接使用
      1. 未指定partition但指定key,通过对key的value进行hash出一个partition
      1. partition和key都未指定,使用轮询选出一个partition

偏移量(offset)

    • 任何发布到partition的消息都会被直接追加到log文件的尾部,每条消息在文件中的位置称为offset(偏移量),offset是一个long型数字,它唯一标记一条消息。消费者通过(offset、partition、topic)跟踪记录.

副本

  • 副本因子操作的单位是以分区为单位,每个分区都有各自的主副本和从副本
  • 主副本叫做leader,从副本叫做follower,处于同步状态的副本叫做in-sync
  • 导致副本同步失败的原因:
    • 网络拥塞导致复制变慢
    • broker 发生奔溃导致复制滞后
  • 持续的副本:持续请求得到最新消息副本被称为同步的副本。在首领发生失效时,只有同步副本才有可能被选为新首领

日志

  • Kafka 附带了一个叫DumpLogSegment 的工具,可以用它查看片段的内容。它可以显示每个消息的偏移量、校验和、魔术数字节、消息大小和压缩算法。

  • Segment 是kafka文件存储的最小单位

  • 日志存放目录 kafka_2.10-0.10.2.1/config/server.properties

    kafka_log.png



进入到日志目录查看

kafka_tree.png



在kafka的文件存储中,同一个topic下有多个不同的Partition,每个partition都为一个目录,而每一个目录又被平均分配成多个大小相等的Segment File中,Segment File 包括一个日志数据文件和两个索引文件(偏移量索引文件和消息时间戳索引文件)。

  • ./kafka-run-class.sh kafka.tools.DumpLogSegments --files /tmp/kafka-logs/__consumer_offsets-0/00000000000000000000.log --print-data-log
Starting offset: 12
baseOffset: 12 lastOffset: 12 count: 1 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 0 CreateTime: 1560145317620 size: 136 magic: 2 compresscodec: NONE crc: 692425981 isvalid: true
| offset: 12 CreateTime: 1560145317620 keysize: -1 valuesize: 66 sequence: -1 headerKeys: [] payload: {"path":"/matchingHandler.userCancel","query":{"uid":"200208305"}}
baseOffset: 13 lastOffset: 13 count: 1 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 136 CreateTime: 1560237884593 size: 136 magic: 2 compresscodec: NONE crc: 2666499378 isvalid: true
| offset: 13 CreateTime: 1560237884593 keysize: -1 valuesize: 66 sequence: -1 headerKeys: [] payload: {"path":"/matchingHandler.userCancel","query":{"uid":"200208631"}}
baseOffset: 14 lastOffset: 14 count: 1 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 272 CreateTime: 1560323094331 size: 136 magic: 2 compresscodec: NONE crc: 926590729 isvalid: true
| offset: 14 CreateTime: 1560323094331 keysize: -1 valuesize: 66 sequence: -1 headerKeys: [] payload: {"path":"/matchingHandler.userCancel","query":{"uid":"200208305"}}
baseOffset: 15 lastOffset: 15 count: 1 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 408 CreateTime: 1560421275824 size: 136 magic: 2 compresscodec: NONE crc: 3081157511 isvalid: true
| offset: 15 CreateTime: 1560421275824 keysize: -1 valuesize: 66 sequence: -1 headerKeys: [] payload: {"path":"/matchingHandler.userCancel","query":{"uid":"200208300"}}
baseOffset: 16 lastOffset: 16 count: 1 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 544 CreateTime: 1560426875868 size: 136 magic: 2 compresscodec: NONE crc: 101509911 isvalid: true

Segment文件命名规则:Partition全局的第一个Segment从0开始,后续每个Segment文件名为上一个Segment文件最后一条消息的offset值。数值最大为64位long型,19位数字字符长度,没有数字用0填充。

关键字 解释
offset 消息在partition中的绝对offset。能表示这是partition的第多少条消息
message message大小
CRC32 用crc32校验message
magic 表示本次发布kafka服务程序协议版本号
attributes 表示为独立版本、或标识压缩类型、或编码类型
key length 表示key的长度,当key为-1时,K byte key字段不填
key 可选
value bytes payload 实际消息数据

index 文件的存储方式

  • index 文件是二进制存储的,每条索引都记录了消息的相对offset和在文件中的物理位置。这里的相对offset和log文件里的offset不同,相对offset是每个segment都从1开始的,而绝对offset在整个partition中都是唯一的。

分段策略

属性名 含义 默认值
log.roll.{hours,ms} 日志滚动的周期时间,到达指定周期时间时,强制生成一个新的segment 168(7 day)
log.segment.bytes 每个segment的最大容量。到达指定容量时,将强制生成一个新的segment 1G(-1 为不限制)
log.retention.check.interval.ms 日志片段文件检查的周期时间 60000

日志刷新策略
Kafka的日志实际上是开始是在缓存中的,然后根据一定策略定期一批一批写入到日志文件中去,以提高吞吐量.

属性名 含义 默认值
log.flush.interval.messages 消息达到多少条时将数据写入到日志文件 10000
log.flush.interval.ms 当达到该时间时,强制执行一次flush null
log.flush.shceduler.interval.ms 周期性检查,是否需要将信息flush 暂时没有找到

集群成员关系

  • Kafka 使用Zookeeper来维护集群成员的信息。每个broker都有一个唯一标识符,这个标识符可以在配置文件里指定,也可以自动生成。在broker启动的时候,它通过创建临时节点把自己的ID注册到Zookeeper。Kafka组件订阅Zookeeper的 /brokers/ids 路径(broker在Zookeeper上的注册路径),当有broker加入集群或退出集群时,这些组件就可以获得通知。在broker停机、出现网络分区或长时间垃圾回收停顿时,broker会从Zookeeper上断开连接,此时broker在启动时创建的临时节点会自动从Zookeeper上移除。监听broker列表的Kafka组件会被告知该broker已移除。在关闭broker时,它对应的节点也会消失,不过它的ID会继续存在于其他数据结构中

处理请求

  • broker 的大部分工作是处理客户端、分区副本和控制器发送给分区首领的请求。Kafka提供一个二进制协议(基于TCP),指定了请求信息的格式以及broker如何对请求做出响应——包括成功处理请求或在处理请求过程中遇到错误。
  • 客户端发起连接并发送请求,broker处理请求并作出响应。broker按照请求到达的顺序来处理它们——这种顺序保证让Kafka具有了消息队列的特性,同时保证保存的消息也是有序的。
  • 标志消息头
参数 描述
Request type API key
Request version broker可以处理不同版本的客户端请求,并根据客户端版本做出不同的响应
Correlation ID 一个具有唯一性的数字,用于标识请求消息,同时也会出现在响应消息和错误日志里(用于诊断问题)
Client ID 用于标识发送请求的客户端
  • broker会在它所监听的每一个端口上运行一个Acceptor线程,这个线程会创建一个连接,并把它交给Processor线程去处理。Processor线程("网络线程")的数量是可以配置的 。网络线程负责从客户端获取请求信息,把它们放进请求队列,然后从响应队列获取响应信息,把它们发送给客户端。
  • 客户端如何知道往哪里发送请求
    • 客户端使用了另一种请求类型,也就是元数据请求。这种请求包含了客户端感兴趣的主题列表。服务器端的响应消息里指明了这些主题所包含的分区、每个分区都有哪些副本,以及哪个副本是首领。元数据请求可以发送给任意一个broker,因为所有broker都缓存了这些信息。

控制器

  • 控制器其实就是一个broker。集群里第一个启动的broker通过在Zookeeper里创建一个临时节点 /controller 让自己成为控制器。其他broker在启动时也会尝试创建这个节点,不过它们会收到一个"节点已存在"的异常,然后"意识"到控制器节点已存在,也就是说集群里已经有一个控制器了。其他broker在控制器节点上创建Zookeeper watch对象,这样它们就可以收到这个节点的变更通知。这种方式可以确保集群里一次只有一个控制器存在。
  • 如果控制器被关闭或者Zookeeper断开连接,Zookeeper上的临时节点就会消失。集群里的其他broker通过watch对象得到控制器节点消失的通知,它们会尝试让自己成为新的控制器。第一个在Zookeeper里成功成功创建控制器节点的broker就会成为新的控制器,其他节点会收到"节点已存在"的异常,然后在新的控制器节点上再次创建watch对象。每个新选出的控制器通过Zookeeper的条件递增操作获得一个全新的、数值更大的controller epoch。其他broker在知道当前controller epoch后,如果收到有控制器发出的包含旧epoch的消息,就会忽略它们。
  • 当控制器发现一个broker已经离开集群(通过观察相关的Zookeeper路径),它就知道,那些失去首领的分区需要一个新首领(这些分区的首领刚好在这个broker上)。控制器遍历这些分区,并确定谁应该成为新首领(简单来说就是分区副本列表里的下一个副本),然后向所有包含新首领或现有跟随者的broker发送请求。该请求消息包含了谁是新首领已经谁是分区跟随者的信息。随后新首领开始处理来着生产者和消费者的请求,而跟随者开始从新首领那里复制消息。
  • 当控制器发现一个broker加入集群时,它会使用broker ID来检查新加入的broker是否包含现有的分区副本。如果有,控制器就把变更通知发送给新加入的broker和其他broker,新broker上的副本开始从首领那里复制消息。

Kafka 消费过程分析

    • Kafka提供了两套consumer API:高级Consumer API 和 低级 Consumer API
    • 高级API 优点
      1. 高级API写起来简单
      1. 不需要自行去管理offset,系统通过zookeeper自行管理
      1. 不需要管理分区,副本等情况,系统自动管理
      1. 消费者断线会自动根据上一次记录在zookeeper中的offset去接着获取数据(默认设置1分钟更新一下zookeeper中存的offset)
      1. 可以使用group来区分对同一个topic的不同程序访问分离开来(不同的group记录不同的offset,这样不同程序读取同一个topic才不会因为offset互相影响)
    • 高级API 缺点
      1. 不能自行控制offset(对于某些特殊需求)
      1. 不能细化控制,如分区、副本、zk等
    • 低级API
    • 低级API优点
      1. 能够让开发者自己控制offset,想从哪里读取就从哪里读取
      1. 自行控制连接分区,对分区自定义进行负载均衡
      1. 对zookeeper的依赖性降低(如:offset不一定非要靠zk存储,自行存储offset即可,比如存储在文件或则内存中)
    • 低级API缺点
    • 太过复杂,需要自行控制offset,连接哪个分区,找到分区leader等

kafka复制原理

  • Kafka的复制机制和分区的多副本架构是Kafka可靠性保证的核心。把消息写入多个副本可以是Kafka在发送奔溃时仍能保证消息的持久性。

  • 消费的发送方式:主题\value、主题\key\value、主题\分区\key\value、主题\分区\时间戳
    \key\value

  • Kafka 中topic的每个partition有一个预写式的日志文件,虽然partition可以继续细分为若干个segment文件,但是对于上层应用来说可以将partition看成最小的存储单元,每个partition都由一些列有序的、不可变的消息组成,这些消息被连续的追加到partition中。

    • LEO:LogEndOffset的缩写,表示每个partition的log最后一条Message的位置
    • HW: 是HighWatermark的缩写,是指consumer能够看到的此partition的位置
    • 具体描述:Kafka每个topic的partition有N个副本(replicas).
    • kafka 通过多副本机制实现故障自动转移,当kafka集群中一个broker失效情况下仍然保证服务可用。kafka中发生复制时确保partition的日志能有序地写到其他节点上,N个replicas中,其中一个replicas为leader,其他都为follower,leader处理partition的所有读写请求,于此同时,follower会被动定期地去复制leader的数据。kafka提供了数据复制算法保证,如果leader发生故障或挂掉,一个新leader被选举并接受客户端的消息成功写入。
    • leader负责维护和跟踪ISR中所有follower滞后的状态.
    • 当producer发送一条消息到broker后,leader写入消息并复制到所有follower。消息提交之后才被成功复制到所有的同步副本。消息复制延迟受最慢的follower限制,重要的是快速检测慢副本,如果follower"落后"太多或者失效,leader将会把它从ISR中删除.
  • leader 将某个follower提出ISR列表的情况:

      1. 按数量——如果leader当前的offset已经到10,但是某个follower同步的数据还是2,但是kafka对于数量的偏差设置为6。如果当前偏差小于等于设置的偏差,那么会将该follower提出ISR列表,进入到OSR列表[所有的副本数据 = ISR + OSR]
      2. 按时间——有新数据,多久没有发送确认信息
    • hwpng.png

ISR(副本同步队列)

    • ISR 是所有副本的一个子集,由leader维护ISR列表,follower从leader同步数据有一些延迟,包括延迟时间 replica.lag.time.max.ms和延迟条数replica.lag.max.messages两个维度,当前最新的版本0.10.x中只支持replica.lag.time.max.ms这个维度)。任意一个超过阈值都会把follower剔除出ISR,存入OSR(Outof-Sync Replicas)列表,新加入的follower也会先存放在OSR中。
    • leader 新写入的信息,consumer不能立刻消费,leader会等待该消息被所有ISR中的replicas同步后更新HW,此时消息才能被consumer消费。这样就保证了如果leader所在的broker失效,该消息仍然可以从新选举的leader中获取。对于来自内部的broker的读取请求,没有HW的限制。
    • 同步复制要求所有的能工作的follower都复制完,这条消息才会被commit,这种复制方式是否极大的影响了吞吐率?
    • 异步复制方式
    • follower异步的从leader复制数据,数据只要被leader写入log就被认为已经commit,这种情况下如果follower都还没有复制完,落后于leader时,突然leader宕机,则会丢失数据。而kafka的这种使用ISR的方式则很好的均衡了确保数据不丢失已经吞吐率。
    • Kafka的管理最终都会反馈到Zookeeper节点上。
    • 具体位置:/brokers/topics/[topic]/partitions/[partition]/state.
    • 目前有两个地方会对这个Zookeeper的节点进行维护:
      1. Controller维护:Controller 下的LeaderSelector会选举新的leader,ISR和新的leader_epoch及controller_epoch写入Zookeeper的相关节点中。同时发起LeaderAndIsrRequest通知所有的replicas。
      2. leader维护:leader有单独的线程定期检测ISR中follower是否脱离ISR,如果发现ISR变化,则会将新的ISR的信息返回到Zookeeper的相关节点中。
    • Kafka集群中的其中一个Broker会被选举为Controller,主要负责Partition管理和副本状态管理,也会执行类似于重分配partition之类的管理任务。
  • kafka 数据可靠性

    • 数据丢失的可能:可以采用callback的方式进行处理,判断异常信息是否为空,如果为空表示正常发送了,否则就有异常,可进行特殊处理
    • 当producer向leader发送数据时,可以通过acks参数来设置数据可靠性的级别:
      1. 1(默认):这意味着producer在ISR中的leader已成功收到的数据并得到确认后发送下一条message。如果leader宕机了,则会丢失数据。
      1. 0:这意味着producer无需等待来自broker的确认而继续发送下一批消息。这种情况下数据传输效率最高,但是数据可靠性是最低的
      1. all:leader需要等待所有备份都写入日志,这种策略会保证只要有一个备份存活就不会丢失数据,这是最强的保证。
  • kafka 消息传输保障

    • Kafka确保消息在producer和consumer之间传输。有以下三种可能的传输保障
      1. At most once : 消息可能丢失,但绝不会重复传输
      1. At least once : 消息绝不会丢,但可能重复传输
      1. Exactly once: 每条消息肯定会被传输一次且仅传输一次
  • kafka leader 和 follower 如何通信

疑问

  • 一个broker服务下,是否可以创建多个分区?
    • 可以,broker数与分区数没有关系
  • 一个broker服务下,是否可以创建多个副本因子?
    • 不可以,会报错;
      创建主题时,副本因子应该小于等于可用的broker数
      Error while executing topic command : replication factor: 3 larger than available brokers: 1
      [2019-07-23 17:34:45,963] ERROR org.apache.kafka.common.errors.InvalidReplicationFactorException: replication factor: 3 larger than available brokers: 1
      (kafka.admin.TopicCommand$)
      
  • 在kafka中,每一个分区会有一个编号,从0开始
  • 当执行删除命令之后,topic不是物理删除,而是一个标记删除的操作.
  • 标记删除之后的主题是否还可以继续生产数据?
    • 不会有影响
  • 如何保证一个主题下的数据,一定是有序的(生产与消费的顺序一致)
    • 让主题下只有一个分区
  • 某一个主题下的分区数,对于消费组来说,应该小于等于该主题下的分区数。
  • 在使用kafka的过程中,如何保证数据的不丢失,不重复的问题?
  • 如何确保Producer不丢失数据?
  • ACK (应答机制设置为2)
  • Kafka 的用途?使用场景?
    • 消息系统;实时监控或者离线处理;日志收集
    • 异常处理、日常系统削峰、解耦、提速、广播
  • Kafka中的ISR、AR代表什么?ISR的伸缩?
    • ISR: In-Sync Replicas 副本同步队列
    • AR: Assigned Replicas 所有副本
    • ISR是由leader维护,follower从leader同步数据有一些延迟(包括延迟时间replica.lag.time.max.ms 和 延迟条数 replica.lag.max.message两个维度,当前最新的版本0.10.x 中只支持replica.lag.time.max.ms这个维度),任意一个超过阈值都会把follower剔除出ISR,存入OSR(Outof-Sync Replicas)列表,新加入的follower也会存放在OSR中。AR=ISR+OSR
  • Kafka中的HW、LEO、LSO、LW等分别代表什么?
    • HW: High Watermark 高水位,取一个partition对应的ISR中最小的LEO作为HW,consumer最多只能消费到HW所在的位置上一条信息
    • LEO: LogEndOffset 当然日志文件中下一条代写信息的offset
    • HW/LEO 这两个都是指最后一条的下一条的位置而不是最后一条的位置
    • LSO: Last Stable Offset 对未完成的事务而言,LSO的值等于事务中第一条消息的位置(firstUnstableOffset),对已完成的事务而言,它的值同HW相同
    • LW: Low Watermark 低水位,代表AR集合中最小的logStartOffset值
  • Kafka 中是怎么体现消息顺序性的?
    • Kafka每个partition中的消息在写入是都是有序的,消费时,每个partition只能被每一个group中的消费者消费,保证了消费时也是有序的
    • 整个topic不保证有序。如果为了保证topic整个有序,那么将partition调整为1
  • Kafka中的分区器、序列化器、拦截器之间的处理顺序是什么?
    • 拦截器 -> 序列化器 -> 分区器
  • Kafka 生产者客户端中使用了几个线程来处理?
    • 2个,主线程和Sender线程。主线程负责创建消息,然后通过分区器、序列化器、拦截器作用之后缓存到累加器RecordAccumulator中。Sender线程负责将RecordAccumulator中消息发送到Kafka中
  • 消费者提交消费位移时提交的是当前消费到的最新消息的offset还是offset+1?
    • offset + 1
  • 造成重复消费的原因:
    • 消费者消费后没有commit offset(程序奔溃/强行kill/消费耗时/自动提交偏移情况下unscrible)
  • 造成消息漏消费的原因:
    • 消费者没有处理完消息,提交offset(自动提交偏移,未处理情况下程序异常结束)
  • KafkaConsumer 是非线程安全的,如何实现多线程消费
      1. 在每个线程中创建一个KafkaConsumer
      1. 单线程创建KafkaConsumer,多个处理线程处理消息
  • 消费者与消费组之间的关系
    • 消费者从属于消费组,消费偏移以消费组为单位。每个消费组可以独立消费主题的所有数据,同一消费组内消费者共同消费主题数据,每个分区只能被同一消费组内一个消费者消费
  • 使用kafka-topics.sh 创建(删除)了一个topic之后,kafka背后执行了什么逻辑
    • 创建:在zk上 /brokers/topics/下节点 Kafka broker 会监听节点变化创建主题
    • 删除: 调用脚本删除topic会在zk上将topic设置待删除标志,kafka后台有定时线程会扫描所有需要删除的topic进行删除
  • 创建topic时如何选择合适的分区数
    • 根据集群的机器数量和需要的吞吐量来决定适合的分区数
  • Kafka 目前有哪些内部topic,特征,作用
    • __consumer_offsets 保证消费组的偏移
  • 优先副本是什么?有什么特殊作用
    • 默认的leader副本
    • 发送leader变化时重选举会优先选择优先副本作为leader
  • Kafka的Log Retention的理解
    • kafka 留存策略包括删除和压缩两种
    • 删除:根据时间和大小两种方式进行删除,大小是整个partition日志文件的大小,超过的会从老到新依次删除;时间指定日志文件中最大时间戳而非文件的最后修改时间
    • 压缩:相同key的value只保存一个 压缩过的是clean 未压缩的dirty 压缩之后的偏移量不连续 未压缩时连续

持续更新...

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,271评论 5 476
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,275评论 2 380
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,151评论 0 336
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,550评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,553评论 5 365
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,559评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,924评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,580评论 0 257
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,826评论 1 297
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,578评论 2 320
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,661评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,363评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,940评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,926评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,156评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,872评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,391评论 2 342

推荐阅读更多精彩内容