发布订阅模式:
发布订阅模式:发布者将消息投递到Topic中,数据在持久化在Topic中,只有订阅了相应Topic的消费者才可以消费这个消息,一个Topic可以允许多个消费者订阅,一个消费者可以订阅多个Topic,所以Topic可以被所有订阅者消费,而被消费掉后不会立即删除,会保留历史消息。本文的主角kafka用的就是发布订阅模式
而kafka就是一个流处理平台,是一个高效和实时具有发布订阅模式、分布式的、多副本的消息系统,kafka具有横行扩展,高容错,高性能的特点
kafka的特性:
高容错:多分区多副本增强了容错和可扩展性,多订阅者支持将消息广播到多个订阅者,支持zookeeper调度。
高性能、高吞吐量:kafka吞吐量非常的大,每秒可以处理和产生几十万条消息,延迟能控制在毫秒级,超高性能的特性,使kafka能够很好的应对高并发的应用场景。
持久性和扩展性:数据可持久化的特点使kafka有别于其他的消息队列,多副本和基于组(consumer group)的消费使得kafka拥有不错的容错性,基于组的消费模式使得kafka能很好的支持水平扩展。
topic、partition、offset、comsumer group、replication之间的关系傻傻分不清:
kafka的消息以分门别类的形式,分成了很多个Topic,每个Topic又分成了很多个partition,而topi只是一个逻辑概念消息真正存储在这些物理层面的partition中,partition又可均匀的分布到集群的各个broker(kafka实例)中,生产或者消费的时候会被路由到相应的partition。
在partition中的消息会被标识上递增系列号,代表着先后进入partition 中存储的顺序,这个序列号就是offset,消费者通过offset指定从哪里开始进行消费,生产者生成消息时,消息会通过应用的路由策略将数据加到partition的末尾,消息被消费后不会马上删除,而是记录在日志中保存一定的期限,默认是7天。消费者可以通过重置offset来重新消费。
topic可能会非常大,所以topic可以通过partition将消息分成几个分区,然后将partition均匀放置到不同的broker中,这样是为了实现Kafka的broker端和消费端的负载均衡,同时提高kafka的吞吐量,不同的partition的数据都不相同。而partition又被备份为多个replication,同时replication也会被均匀分布到不同的broker中,第一个启动并在zookeeper注册的broker将被选举为kafka controller,所有的Topic中第一个启动的partition将被kafka controller授权为leader,其他的replication都会成为follower,leader负责写和读,消费者和生产者的读写只会找leader,follower只负责从leader pull数据,通过ISR实现数据同步,当leader故障时kafka controller就会选择一个follower当leader,这种多分区多副本的机制让kafka拥有了容错性、横行扩展性和负载性
通常kafka的消费者是以分组comsumer group的形式,一个组的消费者的数量要小于等于topic中的partition的数量(不然就会有消费者是处于空闲状态),不允许一个partition被同一个组的多个comsumer消费,一个分组可以订阅多个topic,topic内的partition可以给多个组消费,但是一个组内的只能由一个consumer消费同一个topic的同一个partition,也就是说在一个组内partition只能对应一个consumer,而一个consumer可以对应多个partition,而一个partition可以对应多个组,topic可以对应多个组,partition可以把同一条消息发送到不同的分组去,而consumer可以消费多个partition,一个partition可以被多个小组消费但是只能给各个小组内的一个consumer消费,组内不允许多个consumer消费同一个partition。
kafka默认通过range(范围)策略对消费者进行分区的分配,而生产者默认使用Round-robin(轮询)策略将消息发送到不同的分区,这样达到了分区的负载均衡的目的。生产者在发送消息失败时是可以自动重试的(之前面试时一个弱智面试官说kafka没有重试机制,就把我pass掉了,我现在想一锤子敲死他,不会就不要祸害我啊),可以通过retries参数设置,重试时间间隔默认100ms,所以我们没必要在代码中处理可以重试的错误,只需要处理失败上限后的不可重试的错误,可以记录入库
分组分区的模式让kafka保证了消息消费的有序性和负载均衡,但是这个有序性只能是一个partition能的消息有序,而多个partition之间是不能保证有序性的,想要topic所有的消息有序,只能让使topic只有一个partition
说了那么多也不知道记不记得住,那个给它分个类分别介绍一下:
topic:消息主题,kafka将消息分门别类形成了不同的主题,一个kafka实例可以创建多个topic,一个topic可以存保存在多个kafka实例中,生产者投递消息或是消费者消费消息,只需要关心将选择哪个topic,而不需要关心topic存放在何处
partition:topic的分区,一个topic可以分成多个partition,topic中至少要有一个partition,partition中保存着topic的订阅消息,partition可以存放在各个不同的broker中,一个partition只能对应一个分组的一个消费者,partition可以提供不同的分组的不同消费者消费,partition中保存的数据是有序的,但是当topic存在多个partition时,topic将不能保证数据的顺序,所以在需要保证数据顺序时,需要将partition设置为1个。
replication:partition的副本,副本保存在不同的broker,保证了一个节点挂了,不影响集群的高可用效果,partition有leader和follower之分,每个分区只有一个是partition leader其余副本都作为follower,当partition leader挂掉时,kafka controller将选择一个follower代替,副本存放在不同的broker中,Replica的个数小于等于Broker的个数,也就是说,对于每个Partition而言,每个Broker上最多只会有一个Replica,因此可以使用Broker id 指定Partition的Replica,所有Partition的Replica默认情况会均匀分布到所有Broker上 。partition leader负责提供给消费者和生产者读写功能,partition follower负责从leader中pull数据,进行数据的备份,当集群中某个节点挂掉了,其他节点上的副本就会顶上,这一机制使得kafka增加了容错性和可扩展性
offset:offset记录消了费者在partition中消费到哪里,offset原本存放于zooker中,但是zooker不适合大批量的频繁写入操作,所以将offset移动到了一个broker中名叫__consumer_offsets topic的topic中,当kafka在被消费的过程中即使挂了,可以通过offset恢复数据,它就像是书的目录能快速找到该从哪里开始看
comsumer group:comsumer group 是一个kafka集群中的一个comsumer小组,它共享一个group.id,组内协调小组的comsumer成员共同消费小组中被订阅的topic的所有partition,组内不允许多个comsumer共同消费一个partition,但是允许一个comsumer消费多个partition,由于comsumer group的变动对kafka集群的影响很小
kafka的持久化:topic的被分成多个partition,每个partition在物理层面上对应的是一个文件夹,文件夹下存储的partition所有的消息的日志文件和索引文件,任何发布到partition的消息都会记录到这个日志文件中末尾,使得kafka拥有了数据持久化的能力
offset的控制:在比较老的kafka版本中,offset是存放在zookeeper中的,由于Zookeeper并不适合大批量的频繁写入操作,新版Kafka已推荐将consumer的位移信息保存在Kafka内部的topic中,即__consumer_offsets topic,在消费失败时可以重置offset达到重新消费的效果
kafka数据丢失和重复消费:
生产者的数据丢失:
kafka的ack机制:在kafka发送数据的时候,每次发送消息都会有一个确认反馈机制,确保消息正常的能够被收到。
Kafka 消息发送分同步 (sync)、异步 (async) 两种方式,默认使用同步方式,可通过 producer.type 属性进行配置;
通过 request.required.acks 属性进行配置:值可设为 0, 1, -1
0 :相当于异步发送,消息发送完毕即offset增加,继续生产;
1: leader对消息落盘成功后返回 ack 才增加offset,然后生产者才继续生产;
-1:leader收到所有replica 对一个消息的接受ack才增加offset,然后生产者才继续生产;
生产者发送时数据丢失的情形:
1、acks=0,不和kafka集群进行消息接收的确认,那么当网络异常、缓冲区满了等情况时,就会出现数据丢失;
2、acks=1,同步模式下,在只有leader接收成功并确认后,leader挂了,副本没有同步完成,就会造成数据丢失;
消费者消费数据的丢失和重复:
消费者消费消息数据丢失的情景:
设置offset为自动定时提交,当offset被自动定时提交时,数据还在内存中未处理,此时刚好把线程kill掉,那么offset已经提交,但是数据未处理,导致这部分内存中的数据丢失。
消费者消费的数据丢失解决办法:
通过用手动提交来保证数据的不丢失
数据重复的原因:
消费后的数据,当offset还没有提交时,partition就断开连接。比如,通常会遇到消费的数据,处理很耗时,导致超过了Kafka的session timeout时间,此时有一定几率offset没提交,会导致重复消费。
解决办法:
可以通过将每次消费的数据的唯一标识存入Redis中,每次消费前先判断该条消息是否在Redis中,如果有则不再消费,如果没有再消费,消费完再将该条记录的唯一标识存入Redis中,并设置失效时间,防止Redis数据过多、垃圾数据问题。
ISR
kafka动态维护了一个同步状态的副本的集合,简称ISR。ISR是Kafka用来保证数据可靠性的机制,即保证每个分区都收到生产者生产的消息。Kafka默认的副本同步策略是全部同步完成再返回ack。这样的效率是很慢的。于是提出了ISR机制。
同步复制: 只有所有的follower把数据拿过去后才commit,一致性好,可用性不高。
异步复制: 只要leader拿到数据立即commit,等follower慢慢去复制,可用性高,立即返回,一致性差一些。
kafka不是完全同步,也不是完全异步,是一种ISR机制: 每个Partition的leader会维护一个与其基本保持同步的Replica列表,该列表称为ISR(in-sync Replica),每个Partition都会有一个ISR,而且是由leader动态维护,当ISR中所有Replica都向Leader发送ACK时,leader才commit,如果一个flower比一个leader落后太多,或者超过一定时间未发起数据复制请求,则leader将其重ISR中移除
既然所有Replica都向Leader发送ACK时,leader才commit,那么flower怎么会leader落后太多?
producer往kafka中发送数据,不仅可以一次发送一条数据,还可以发送message的数组;批量发送,同步的时候批量发送,异步的时候本身就是就是批量;底层会有队列缓存起来,批量发送,对应broker而言,就会收到很多数据(假设1000),这时候leader发现自己有1000条数据,flower只有500条数据,落后了500条数据,就把它从ISR中移除出去,这时候发现其他的flower与他的差距都很小,就等待,主要是因为内存,服务器性能等原因,差距很大,就把它从ISR中移除出去。
也就是说isr中机器的速度存在差异,当积少成多,就会把太落后的剔除出去
follower移除:follower如果落后leader太多,就把它从ISR中移除出去,如果发现follower与他的差距都很小,就等待;如果因为内存等原因,差距很大,就把它从ISR中移除出去。
leader移除:如果是leader挂掉了,从它的follower中选举一个作为leader,并把挂掉的leader从ISR中移除,继续处理数据。一段时间后该leader重新启动了,它知道它之前的数据到哪里了,尝试获取它挂掉后leader处理的数据,获取完成后它就加入了ISR。
ISR选举:当leader挂了之后broker controller会从isr中选择一个副本当leader,在isr集合中,broker controller会选择isr序号最靠前的broker上的副本当leader,原因在于序号越靠前说明节点机子的性能越好,原因是每次启动所有的broker 会向zookeeper注册节点,越先注册,序号越靠前,默认代表机器性能越好,但是这提前是副本必须是在isr集合中没有被剔除
配置:
server配置:
rerplica.lag.time.max.ms=10000 # 如果leader发现flower超过10秒没有向它发起fech请求,那么leader考虑这个flower是不是程序出了点问题 # 或者资源紧张调度不过来,它太慢了,不希望它拖慢后面的进度,就把它从ISR中移除。
rerplica.lag.max.messages=4000 # 相差4000条就移除 # flower慢的时候,保证高可用性,同时满足这两个条件后又加入ISR中, # 在可用性与一致性做了动态平衡
topic配置:
min.insync.replicas=1 # 需要保证ISR中至少有多少个replica
Producer配置:
request.required.asks=0
# 0:相当于异步的,不需要leader给予回复,producer立即返回,发送就是成功,
那么发送消息网络超时或broker crash(1.Partition的Leader还没有commit消息 2.Leader与Follower数据不同步),
既有可能丢失也可能会重发
# 1:当leader接收到消息之后发送ack,丢会重发,丢的概率很小
# -1:当所有的follower都同步消息成功后发送ack. 丢失消息可能性比较低
消费方式
consumer采用pull(拉)模式从broker中读取数据。
push(推)模式很难适应消费速率不同的消费者,因为消息发送速率是由broker决定的。它的目标是尽可能以最快速度传递消息,但是这样很容易造成consumer来不及处理消息,典型的表现就是拒绝服务以及网络拥塞。而pull模式则可以根据consumer的消费能力以适当的速率消费消息。
pull(拉)模式不足之处是,如果kafka没有数据,消费者可能会陷入循环中,一直返回空数据。针对这一点,Kafka的消费者在消费数据时会传入一个时长参数timeout,如果当前没有数据可供消费,consumer会等待一段时间之后再返回,这段时长即为timeout。
消费者分区分配策略
一个consumer group中有多个consumer,一个 topic有多个partition,所以必然会涉及到partition的分配问题,即确定哪个partition由哪个consumer来消费。
Kafka有两种分配策略,一是roundrobin(轮询),一是range(范围)。
示例:0、1、2、3、4、5、6、 7、8、 9 、总共十个分区,四个consumer
采取RoundRobin策略:
即轮询。比如一个topic下有10个分区,那么第一个分区0被分配给C0,第二个分区1被分配给C1,第三个分区2被分配给C2,第四个分区3被分配给C3,以此类推。
C0 : 0、4、8
C1: 1、5、9
C2: 2、6
C3: 3、7
- 好处,同一个消费者组的不同消费者之间所消费的分区数量相差最大不会超过1。
- 缺点,就是一个消费者组中的消费者各自指定消费的主题并不会由这个消费者消费,而是拿出来作为整个组要消费的主题。
采取range(默认)策略:
针对每一个topic:
n = 分区数/消费者数量
假如有10个分区,3个消费者,把分区按照序号排列0,1,2,3,4,5,6,7,8,9;消费者为C1,C2,C3,那么用分区数除以消费者数来决定每个Consumer消费几个Partition,除不尽的前面几个消费者将会多消费一个
最后分配结果如下
C1:0,1,2,3
C2:4,5,6
C3:7,8,9
如果有11个分区将会是:
C1:0,1,2,3
C2:4,5,6,7
C3:8,9,10
假如我们有两个主题T1,T2,分别有10个分区,最后的分配结果将会是这样:
C1:T1(0,1,2,3) T2(0,1,2,3)
C2:T1(4,5,6) T2(4,5,6)
C3:T1(7,8,9) T2(7,8,9)
- 缺点,就是当同一个消费者组的多个消费者消费的主题相同,并且消费的相同主题比较多时,按照上面步骤(2)的情况,可能会导致消费者A消费十个分区,而消费者B只消费了五个分区,这样不平衡。
- 好处在于,同一个消费者组的不同消费者可以指定自己单独需要消费的主题。。只有当其他消费者与当前消费者指定的主题相同时,才会一起分配这个主题的不同分区。
Producer 把消息发送给对应分区策略:
https://www.cnblogs.com/lincf/p/11985026.html
生产者分区投递原则的简单介绍:
1)指明partition的情况下,直接将指明的值直接作为partition值;
2)没有指明partition值但有key的情况下,将key的hash值与topic的partition数进行取余得到partition值;
3)没有partition和key情况下,第一次调用时随机生成一个整数,将这个值与topic可用的partition总数取余得到partition值,也就是常说的round-robin算法。
Rebalance:
Reblance本质上是一种协议,规定了一个Consumer Group下的所有的Consumer如何达成一致来分配订阅Topic的每个Partition。比如某个group下有5个consumer,它订阅了一个具有10个分区的topic。正常情况下,Kafka平均会为每个consumer分配2个分区。这个分配的过程就叫rebalance。
Rebalance的触发条件:
1.有新的消费者加入Consumer Group
2.有消费者下线,可能由于长时间未向GroupCoordinator(协调者)发送心跳,GroupCoordinator会认为其已下线
3.有消费者主动退出Consumer Group
4.订阅的topic分区出现变化
5.调用unsubscribe()取消对某Topic的订阅
即Consumer或者Topic自身发生变化时,会触发Rebalance。
Kafka实现负载均衡的原理:
通过Topic的一个partition只能给一个分组中的一个消费者消费,所以实现了一个分组中的每台服务器只能接收到一个Topic中的消息是不会重复的
场景:假设有一个分组代表着一个分布式集群,里面有5台相同的服务,也就是说这5台服务它们都订阅了相同的topic。当producer向topic中生产消息时会通过一定的策略生产到对应的partition中,一般业务只希望一条消息随机的给一台服务器消费到,然后做相应的业务处理,比如存储数据库等等。然后通过不同的partiton对应不同的服务,就实现了负载均衡,一般相同的组中都是相同的服务,所以每次只有一个服务能获取一个partiton的消息。因为每个消息只会被写到一个partiton中。这样就实现了服务的负载均衡并且不会数据重复消费的问题。
partition不止是实现了消费者端的负载,也实现了broker端的负载,由于partition一般都是均匀的分不到不同的broker物理机上,所以通过不一样的负载策略就达到了相应的broker的负载
分散分布:
kafka的partition和Replica全部都是均匀的分散在各个broker中,包括leader也分散到各个broker中避免broker热度过高,partition的分散实现了broker的负载均衡,Replica的分散实现了broker同步数据时的写压力并且可以有效避免一个broker宕机,其他的broker的Replica可以顶上,不会造成数据的丢失,对应副本和分区的放置kafka集群会做内部的平衡
zookeeper在kafka中的作用
1.记录broker registry:每个broker启动后会在zookeeper上注册一个临时的broker registry,包含broker的ip地址和端口号,所存储的topics和partitions,ISR信息。
2. 记录consumer registry:每个consumer启动后会在zookeeper上注册一个临时的consumer registry:包含consumer所属的consumer group以及订阅的topics。
3.Kakfa Broker Controller的选举:Kafka Broker集群受Zookeeper管理。所有的Kafka Broker节点一起去Zookeeper上注册一个临时节点,因为只有一个Kafka Broker会注册成功,其他的都会失败,所以这个成功在Zookeeper上注册临时节点的Kafka Broker会成为Kafka Broker Controller,其他的Kafka broker叫Kafka Broker follower。这个Controller会监听其他的Kafka Broker的所有信息,如果这个kafka broker controller宕机了,在zookeeper上面的那个临时节点就会消失,此时所有的kafka broker又会一起去Zookeeper上注册一个临时节点,因为只有一个Kafka Broker会注册成功,其他的都会失败,所以这个成功在Zookeeper上注册临时节点的这个Kafka Broker会成为Kafka Broker Controller,其他的Kafka broker叫Kafka Broker follower。例如:一旦有一个broker宕机了,这个kafka broker controller会读取该宕机broker上所有的partition在zookeeper上的状态,并选取ISR列表中的一个replica作为partition leader(如果ISR列表中的replica全挂,选一个幸存的replica作为leader; 如果该partition的所有的replica都宕机了,则将新的leader设置为-1,等待恢复,等待ISR中的任一个Replica“活”过来,并且选它作为Leader;或选择第一个“活”过来的Replica(不一定是ISR中的)作为Leader),这个broker宕机的事情,kafka controller也会通知zookeeper,zookeeper就会通知其他的kafka broker。
4.注册中心:zookeeper是充当了Kafka的注册中心,消息传递时, Producer在发布消息到某个Partition时是通过zookeeper找到该Partition的Leader的
kafka的选举
1、分区选举(优先副本+ISR选举法):分区leader副本的选举是由kafka controller(控制器)具体负责实施的,从分区集合中按照顺序找到第一个存活的副本,并且这个副本在ISR集合中,所以分区的选举有两个条件就副本在集合的顺序靠前,并且是处于ISR中,副本的顺序在分区时就指定好了,只要不发生重新分配的情况,集合内的顺序是不会发生改变的,符合这两个条件的,直接将优先副本设置为leader,
在Kafka中,并不是采用常见的多数选举的方式进行选举,而是kafka controller会针对每个Topic维护一个ISR集合,显然存在一些没有来得及完全同步数据的副本,ISR会剔除落后太多的副本,所以只有在这个ISR列表中才有资格成为Leader(先试用ISR集合中的第一个,如果不行则依次类推,因为ISR里面是同步的副本,消息是最完全的且各个节点都是一样的)。
因为存在ISR集合,Kafka需要的冗余度较低,可以容忍的失败数较高,即有较高的容错率。加入某个Topic中存在f + 1个副本,kafka可以容忍 f 个副本不可用。当然如果ISR集合中所有的副本都不可用,Kafka也可以选择在非ISR集合中选举可用的副本,只不过会存在数据不一致等问题。
2、控制器的选举(Kakfa Broker Controller):kafka集群中有一个或多个broker,其中的一个broker会被选举为控制器(Kafka Controller),它负责管理整个集群中的所有分区和副本的状态等工作,Kafka Controller是由Zookeeper来实现的,只要哪个broker能够最先在Zookeeper中创建controller这个临时节点,那么他就可以成为Kafka controller。
kafka的日志管理:
基于kafka的架构模式,其会将各个分区平均分配到每个broker中,也就是说每个broker会被分配用来提供一个或多个分区的日志存储服务。在broker服务器上,kafka的日志也是按照partition进行存储的,其会在指定的日志存储目录中为每个topic的partition分别创建一个目录,目录中存储的就是这些分区的日志数据,而目录的名称则会以<topic_name>-<partition_id>的格式进行创建。
比如:一个名为page_visits的top有5个partition,其目录结构为:
partition是分段的,每个段叫LogSegment,包括了一个数据文件和一个索引文件,下图是某个partition目录下的文件:
文件命名规则:partition全局的第一个segment从0开始,后续每个segment文件名为上一个segment文件最后一条消息的offset值,数值大小为64位,20位数字字符长度,没有数字用0填充。
下面引用一张旧的kafka存储机制图:
index文件和log文件的关系:xxx.index文件存储大量的索引信息,xxxx.log文件存储大量的数据,索引文件中的元数据指向对应数据文件中message的物理偏移地址。
使用offset查找message的过程
因为每一个segment文件名为上一个 Segment 最后一条消息的 offset ,所以当需要查找一个指定 offset 的 message 时,通过在所有 segment 的文件名中进行二分查找就能找到它归属的 segment ,再在其 index 文件中找到其对应到文件上的物理位置,就能拿出该 message 。
举例:这里我们以查找offset为6的message为例,查找流程如下:
1、首先要确定这个offset信息在哪个segment文件(由于是顺序读写,这里使用二分查找法),第一个文件名为00000000000000000000,第二个为00000000000000150320,所以6这个offset的数据肯定在第一个文件里面;
2、找到文件这个文件的00000000000000000000.index文件中的[6,9807]定位到00000000000000000000.log文件中9807这个位置来进行数据读取即可。
kafka参数调优:
https://blog.csdn.net/qq32933432/article/details/96479411
kafka的性能高的原因:
1、通过sendfile实现零拷贝
2、每个partition 只支持顺序读写:
磁盘的顺序读写比随机读写要高几十到上百倍,甚至比内存的速度还快
3、分批发送
kafka支持指定消息堆积大小后进行批量的发送,可以通过超时时间和批量大小去控制批量的发送,降低网络开销。生产者可以将多个发送到同一个分区的信息放到同一个批次内积累起来,当批次大小填满(通过batch.size设置批次大小)或者是到达了超时时间(通过linger.ms设置超时时间)就会进行发送