原则: kafka版本!kafka broker及spring-boot配置看官网,看kafka源码, 源码, 源码 !!!
https://www.oneapm.com/ci/kafka.html
https://segmentfault.com/a/1190000016153221 (Flume + zk + kafka)
1.kafka简介
1.1 适用场景
#1.Messaging
对于一些常规的消息系统,kafka是个不错的选择;
partitons/replication和容错,可以使kafka具有良好的扩展性和性能优势.
不过到目前为止,我们应该很清楚认识到,
kafka并没有提供JMS中的"事务性""消息传输担保(消息确认机制)""消息分组"等企业级特性;
kafka只能使用作为"常规"的消息系统,在一定程度上,尚未确保消息的发送与接收绝对可靠(比如,消息重发,消息发送丢失等)
#2.Websit activity tracking
kafka可以作为"网站活性跟踪"的最佳工具;
可以将网页/用户操作等信息发送到kafka中,并实时监控,或者离线统计分析等
#3.Log Aggregation
kafka的特性决定它非常适合作为"日志收集中心";
application可以将操作日志"批量""异步"的发送到kafka集群中,而不是保存在本地或者DB中;
kafka可以批量提交消息/压缩消息等,这对producer端而言,几乎感觉不到性能的开支.
此时consumer端可以使hadoop等其他系统化的存储和分析系统.
#4.Metrics
kafka通常用于业务数据监测,还可以用于统计和汇总来自分布式应用程序中产生的批量业务数据。
#5.Stream Processing
#6.Event Sourcing
事件源是应用程序设计的一种样式,可以按时间序列记录其状态变化的日志。
#7.Commit Log
kafka可以作为分布式系统的一种外部提交日志系统。
通过日志在节点之间复制数据,运用同步机制为宕机节点恢复其数据。
kafka中的日志压缩特性有助于支持这种用法。
1.1.1 为什么需要消息系统
#1.解耦:
允许你独立的扩展或修改两边的处理过程,只要确保它们遵守同样的接口约束。
#2.冗余:
消息队列把数据进行持久化直到它们已经被完全处理,通过这一方式规避了数据丢失风险。
许多消息队列所采用的”插入-获取-删除”范式中,再把一个消息从队列中删除之前,
需要你的处理系统明确的指出该消息已经被处理完毕,从而确保你的数据被安全的保存直到你使用完毕。
#3.扩展性:
因为消息队列解耦了你的处理过程,所以增大消息入队和处理的频率是很容易的,只要另外增加处理过程即可。
#4.灵活性 & 峰值处理能力:
在访问量剧增的情况下,应用仍然需要继续发挥作用,但是这样的突发流量并不常见。
如果为以能处理这类峰值访问为标准来投入资源随时待命无疑是巨大的浪费。
使用消息队列能够使关键组件顶住突发的访问压力,而不会因为突发的超负荷的请求而完全崩溃。
#5.可恢复性:
系统的一部分组件失效时,不会影响到整个系统。
消息队列降低了进程间的耦合度,
所以即使一个处理消息的进程挂掉,加入队列中的消息仍然可以在系统恢复后被处理。
#6.顺序保证:
在大多使用场景下,数据处理的顺序都很重要。
大部分消息队列本来就是排序的,并且能保证数据会按照特定的顺序来处理。
(Kafka 保证一个 Partition 内的消息的有序性)
#7.缓冲:
有助于控制和优化数据流经过系统的速度,解决生产消息和消费消息的处理速度不一致的情况。
#8.异步通信:
很多时候,用户不想也不需要立即处理消息。
消息队列提供了异步处理机制,允许用户把一个消息放入队列,但并不立即处理它。
想向队列中放入多少消息就放多少,然后在需要的时候再去处理它们。
1.2 什么是kafka
它是一个分布式消息系统,由linkedin使用scala编写,
用作LinkedIn的活动流(Activity Stream)和运营数据处理管道(Pipeline)的基础。
具有高水平扩展和高吞吐量。
适合小数据传输。
1.3 Kafka和其他主流分布式消息系统的对比
标签 | ActiveMQ | RabbitMQ | Kafka |
---|---|---|---|
所属公司/社区 | Apache | Mozilla Public License | Apache/LinkedIn |
开发语言 | java | Erlang | java |
支持的协议 | AMQP,REST等 | AMQP | 仿AMQP |
事务 | 支持 | 不支持 | 不支持 |
集群 | 支持 | 支持 | 支持 |
负载均衡 | 支持 | 支持 | 不支持 |
动态扩容 | 不支持 | 不支持 | 支持(依托于zk) |
单机吞吐量 | 万级 | 万级 | 10万级 |
topic数量对吞吐量的影响 | - | - | topic从几十个到几百个的时候,吞吐量会大幅度下降,所以在同等机器下,kafka尽量保证topic数量不要过多。如果要支撑大规模topic,需要增加更多的机器资源 |
时效性 | ms级 | 微秒级 | ms级 |
可用性 | 高,主从架构实现 | 高,主从架构实现 | 非常高,kafka是分布式的,一个数据多个副本,少数机器宕机,不会丢失数据,不会导致不可用 |
消息可靠性 | 有较低的概率丢失数据 | - | 经过参数优化配置,消息可以做到0丢失 |
1.4Kafka相关概念
1.4.1 AMQP协议 & JMS
#Advanced Message Queuing Protocol (高级消息队列协议)
The Advanced Message Queuing Protocol (AMQP):
是一个标准开放的应用层的消息中间件(Message Oriented Middleware)协议。
AMQP定义了通过网络发送的字节流的数据格式。
因此兼容性非常好,任何实现AMQP协议的程序都可以和与AMQP协议兼容的其他程序交互,
可以很容易做到跨语言,跨平台。
#kafka的仿AMQP协议与之的区别
kafka和JMS(Java Message Service)实现(activeMQ)不同的是:
即使消息被消费,消息仍然不会被立即删除.
日志文件将会根据broker中的配置要求,保留一定的时间之后删除;
比如log文件保留2天,那么两天后,文件会被清除,无论其中的消息是否被消费.
kafka通过这种简单的手段,来释放磁盘空间,以及减少消息消费之后对文件内容改动的磁盘IO开支.
对于consumer而言,它需要保存消费消息的offset,对于offset的保存和使用,由consumer来控制;
当consumer正常消费消息时,offset将会"线性"的向前驱动,即消息将依次顺序被消费.
事实上consumer可以使用任意顺序消费消息,它只需要将offset重置为任意值(offset将会保存在zookeeper中).
"这里不建议cousumer配置自动提交offset"
1.4.2 基本概念
#1、主题(Topic)(数据逻辑存储单元):
每条发送到broker的消息都有一个类别,这个类别称为topic,即 kafka 是面向 topic 的。
"一个topic就是一个queue, 一个队列"
#2、分区(Partition)(数据物理存储单元):
[kafka-topics.sh 工具可以动态创建删除查看更新topic, 修改partition.
只能增加partition数量, 不能减少, 除非删除重建.]
partition 是物理上的概念,每个 topic 包含一个或多个 partition。
kafka 分配的单位是 partition。
一个Topic中的消息数据按照多个分区组织,分区是kafka消息队列组织的最小单位,
一个分区可以看作是一个FIFO( First Input First Output的缩写,先入先出队列)的队列。
kafka分区是提高kafka性能的关键所在,当你发现你的集群性能不高时,
常用手段就是增加Topic的分区,分区里面的消息是按照从新到老的顺序进行组织,
消费者从队列头订阅消息,生产者从队列尾添加消息。
Partition在服务器上的表现形式就是一个一个的文件夹,
每个partition的文件夹下面会有多组segment文件,
每组segment文件又包含.index文件、.log文件、.timeindex文件(早期版本中没有)三个文件,
log文件就实际是存储message的地方,而index和timeindex文件为索引文件,用于检索消息。
###"Message结构":
>> offset:offset是一个占8byte的有序id号,它可以唯一确定每条消息在parition内的位置!
kafka的存储文件都是按照offset.kafka来命名,用offset做名字的好处是方便查找。
例如你想找位于2049的位置,只要找到2048.kafka的文件即可。
当然the first offset就是00000000000.kafka。
>> 消息大小:消息大小占用4byte,用于描述消息的大小。
>> 消息体:消息体存放的是实际的消息数据(被压缩过),占用的空间根据具体的消息而不一样。
#3、备份(Replication)(分为leader和follower):
[kafka-reassign-partitions.sh工具可用来动态增加Replications数量.]
为了保证分布式可靠性,kafka0.8开始对每个分区的数据进行备份(不同的Broker上),
防止其中一个Broker宕机造成分区上的数据不可用。
Replication数量不能超过brokers数量, 否则创建topic时会报错.
#4.偏移量(offset)
kafka为每条在分区的消息保存一个偏移量offset,这也是消费者在分区的位置。
比如一个偏移量是5的消费者,表示已经消费了从0-4偏移量的消息,下一个要消费的消息的偏移量是5
#5.消费者:(Consumer):
从消息队列中请求消息的客户端应用程序
一个消费者组中的消费者数量不要超过 topic 的 partition 的数量,
否则多出的消费者将会被限制, 不去消费任何消息.
#6.生产者:(Producer) :
向broker发布消息的应用程序
#7.kafka实例(broker):
Kafka支持水平扩展,一般broker数量越多,集群吞吐率越高。
Kafka中使用Broker来接受Producer和Consumer的请求,并把Message持久化到本地磁盘。
每个Cluster当中会选举出一个Broker来担任Controller,负责处理Partition的Leader选举,协调Partition迁移等工作。
#8.Consumer group:
high-level consumer API 中,
每个 consumer 都属于一个 consumer group,
每条消息只能被 consumer group 中的一个 Consumer 消费,
但可以被多个 consumer group 消费。
kafka确保每个partition中的一条消息只能被某个consumer group中的一个consumer消费
kafka通过group coordinate管理consumer实例负责消费哪个partition, 默认支持range和round-robin消费
kafka在zk中保存了每个topic,每个partition在不同group的消费偏移量(offset), 通过更新偏移量, 保证每条消息都被消费
需要注意的是, 用多线程读消息时, 一个线程相当于一个consumer实例, 当consumer数量大于partition数量时, 有些线程读不到数据
#9.leader:
Replication中的一个角色, producer 和 consumer 只跟 leader 交互。
每个Replication集合中的Partition都会选出一个唯一的Leader,所有的读写请求都由Leader处理。
其他Replicas从Leader处把数据更新同步到本地,过程类似大家熟悉的MySQL中的Binlog同步。
#10.follower:
Replication中的一个角色,从 leader 中复制数据。
#11.controller:
kafka 集群中的其中一个服务器,用来进行 leader election 以及 各种 failover。
#12.zookeeper:
kafka 通过 zookeeper 来存储集群的 meta 信息。
#13.ISR(In-Sync Replica):
是Replicas的一个子集,表示目前Alive且与Leader能够“Catch-up”的Replicas集合。
由于读写都是首先落到Leader上,
所以一般来说通过同步机制从Leader上拉取数据的Replica都会和Leader有一些延迟(包括了延迟时间和延迟条数两个维度),
任意一个超过阈值都会把该Replica踢出ISR。每个Partition都有它自己独立的ISR。
#14. Segment
每个Partition包含一个或多个Segment,每个Segment包含一个数据文件和一个与之对应的索引文件。
#15.kafka支持的客户端语言:
Kafka客户端支持当前大部分主流语言,
包括:C、C++、Erlang、Java、.net、perl、PHP、Python、Ruby、Go、Javascript
可以使用以上任何一种语言和kafka服务器进行通信
(即辨析自己的consumer从kafka集群订阅消息也可以自己写producer程序)
#14."总之流程可以描述为"
应用程序A(producer)--将message按照topic分类-->push到kafka服务器集群(broker)中
应用程序B(consumer)--从kafka服务器集群(broker)中pull消息(message)
工作图如下:
#一个典型的Kafka集群中包含:
● 若干Producer(可以是web前端产生的Page View,或者是服务器日志,系统CPU、Memory等)
● 若干broker(Kafka支持水平扩展,一般broker数量越多,集群吞吐率越高)
● 若干Consumer Group,以及一个Zookeeper集群。
Kafka通过Zookeeper管理集群配置,选举partition 的 leader,
以及在Consumer Group发生变化时进行rebalance。
#一个简单的消息发送流程如下:
● Producer根据指定的partition方法(round-robin、hash等),将消息push到对应topic的partition里面
● kafka集群(brokers)接收到Producer发过来的消息后,将其持久化到硬盘,并保留消息指定时长(可配置), 但不关注消息是否被消费。
● Consumer从kafka集群pull数据,并控制获取消息的offset
1.4.3 kafka的动态扩容
#Kafka的动态扩容是通过Zookeeper来实现的。
kafka使用zookeeper来存储一些meta信息,并使用了zookeeper watch机制
来发现meta信息的变更并作出相应的动作(比如consumer失效,触发负载均衡等)
1) Producer端使用zookeeper用来"发现"broker列表,
并与Topic下每个partition leader建立socket连接并发送消息.
2) Broker端使用zookeeper用来注册broker信息,以及监测partition leader存活性.
3) Consumer端使用zookeeper用来注册consumer信息,
其中包括consumer消费的partition列表等,同时也用来发现broker列表,
并和partition leader建立socket连接,并拉取消息.
Zookeeper是一种在分布式系统中被广泛用来作为:
分布式状态管理、分布式协调管理、分布式配置管理、和分布式锁服务的集群。
kafka增加和减少服务器都会在Zookeeper节点上触发相应的事件kafka系统会捕获这些事件,
进行新一轮的负载均衡,客户端也会捕获这些事件来进行新一轮的处理。
kafka集群几乎不需要维护任何consumer和producer状态信息,这些信息由zookeeper保存;
因此producer和consumer的客户端实现非常轻量级,它们可以随意离开,而不会对集群造成额外的影响.
1.4.4 kafka中producer的ack机制
Kafka的ack机制,指的是producer的消息发送确认机制,与Kafka集群的吞吐量和消息可靠性密切相关。
acks有一些个可选值。
#acks=1(默认值, 字符串类型)
producer只要收到一个分区副本(leader副本)成功写入的通知就认为推送消息成功了。
只有leader副本成功写入了,producer才会认为消息发送成功。
#acks=0
producer发送一次就不再发送了,不管是否发送成功。安全性最低但是效率最高。
#ack=-1
producer只有收到分区内所有副本的成功写入的通知才认为推送消息成功了。安全性最高,但是效率最低。
#ack=n
producer只有收到n个分区副本(leader副本)成功写入的通知才认为推送消息成功了。
----------------------------------------------
#spring-boot中指定acks类型
spring:
kafka:
producer:
bootstrap-servers: localhost:9092
acks: -1
----------------------------------------------
1.4.5 kafka的partition进行消息的write-copy-read, 以及leader选举&高可用机制
#1.producer向brokers发送消息
当我们向某个服务器发送请求的时候,服务端可能会对请求做一个负载,将流量分发到不同的服务器,
那在kafka中,如果某个topic有多个partition,producer又怎么知道该将数据发往哪个partition呢?
producer会和Topic下所有partition leader保持socket连接;
其中partition leader的位置(host:port)注册在zookeeper中,producer作为zookeeper client,已经注册了watch用来监听partition leader的变更事件.
消息由producer直接通过socket发送到broker,中间不会经过任何"路由层".
事实上,消息被路由到哪个partition上,由producer客户端决定.
可以采用"random""key-hash""round-robin"等来实现消息负载均衡.
-----------------------------------------
#producer发送消息的逻辑:
#org.apache.kafka.clients.producer.ProducerRecord源码
#org.springframework.kafka:spring-kafka:2.2.6.RELEASE中org.springframework.kafka.core.KafkaTemplate提供了发送消息的api
1.partition在写入的时候可以指定需要写入的partition,如果有指定,则写入对应的partition。
2.如果没有指定partition,但是设置了数据的key,则会根据key的值hash出一个partition。
3.如果没有指定key值并且可用分区个数大于0时,在就可用分区中做轮询决定改消息分配到哪个partition。
4.如果没有指定key值并且没有可用分区时,在所有分区中轮询决定改消息分配到哪个partition。
----------------------------------------
#异步发送:
将多条消息暂且在客户端buffer起来,并将他们批量的发送到broker,
小数据IO太多,会拖慢整体的网络延迟,批量延迟发送事实上提升了网络效率。
不过这也有一定的隐患,比如说当producer失效时,那些尚未发送的消息将会丢失。
----------------------------------------
#kafka-0.8.2之后, producer均为异步方式(async)
----------------------------------------
#Producer均衡算法
kafka集群中的任何一个broker,都可以向producer提供metadata信息,
这些metadata中包含"集群中存活的servers列表"/"partitions leader列表"等信息(请参看zookeeper中的节点信息).
当producer获取到metadata信心之后, producer将会和Topic下所有partition leader保持socket连接;
消息由producer直接通过socket发送到broker,中间不会经过任何"路由层".
事实上,消息被路由到哪个partition上,由producer客户端决定.
比如可以采用"random""key-hash""轮询"等.
在producer端的配置文件中,开发者可以指定partition路由的方式.
#2.consumer从brokers中消费消息
"
一个消费者组中的消费者数量不要超过 topic 的 partition 的数量,
否则多出的消费者将会被限制, 不去消费任何消息.
"
>>> 问:"在kafka中, 每个 Topic 一般会有很多个 partitions。
为了使得我们能够及时消费消息,我们也可能会启动多个 Consumer 去消费,
而每个 Consumer 又会启动一个或多个streams去分别消费 Topic 里面的数据。
而同一个Consumer Group内的所有消费者协调在一起来消费订阅主题(subscribed topics)的所有分区(partition)。
当然,每个 partition 只能由同一个消费组内的一个consumer来消费。
那么问题来了,同一个 Consumer Group 里面的 Consumer 是如何知道该消费哪些partition里面的数据呢?"
>>> 答: 这里由两种策略:
// 策略一:Range strategy
Range策略是对每个主题而言的,首先对同一个主题里面的分区按照序号进行排序,并对消费者按照字母顺序进行排序。
然后将partitions的个数除于消费者线程的总数来决定每个消费者线程消费几个分区。
如果除不尽,那么前面几个消费者线程将会多消费一个分区。
// 策略二: RoundRobin strategy
将所有主题的分区组成 TopicAndPartition 列表,然后对 TopicAndPartition 列表按照 hashCode 进行排序,
最后按照round-robin风格将分区分别分配给不同的消费者线程。
使用RoundRobin策略有两个前提条件必须满足:
>> 同一个Consumer Group里面的所有消费者的num.streams必须相等;
>> 每个消费者订阅的主题必须相同。
-------------------------------------------
通过partition.assignment.strategy参数选择 range 或 roundrobin。
partition.assignment.strategy参数默认的值是range。
#spring-boot中.yml配置
需考虑
-------------------------------------------
#Consumer均衡算法
当一个group中,有consumer加入或者离开时,会触发partitions均衡.
均衡的最终目的,是提升topic的并发消费能力.
1) 假如topic1,具有如下partitions: P0,P1,P2,P3
2) 加入group中,有如下consumer: C0,C1
3) 首先根据partition索引号对partitions排序: P0,P1,P2,P3
4) 根据consumer.id排序: C0,C1
5) 计算倍数: M = [P0,P1,P2,P3].size / [C0,C1].size,本例值M=2(向上取整)
6) 然后依次分配partitions: C0 = [P0,P1],C1=[P2,P3],即Ci = [P(i * M),P((i + 1) * M -1)]
#其他
consumer端向broker发送"fetch"请求,并告知其获取消息的offset;
此后consumer将会获得一定条数的消息;consumer端也可以重置offset来重新消费消息.
在JMS实现中,Topic模型基于push方式,即broker将消息推送给consumer端.
不过在kafka中,采用了pull方式,即consumer在和broker建立连接之后,主动去pull(或者说fetch)消息;
consumer端可以根据自己的消费能力适时的去fetch消息并处理,且可以控制消息消费的进度(offset);
并可以良好的控制消息消费的数量,batch fetch.
其他JMS实现,消息消费的位置是有prodiver保留,以便避免重复发送消息或者将没有消费成功的消息重发等,同时还要控制消息的状态.
这就要求JMS broker需要太多额外的工作.
在kafka中,partition中的消息只有一个consumer在消费,且不存在消息状态的控制,
也没有复杂的消息确认机制,可见kafka broker端是相当轻量级的.
当消息被consumer接收之后,consumer可以在本地保存最后消息的offset,
并间歇性的向zookeeper注册offset.由此可见,consumer客户端也很轻量级.
#3.broker间的消息复制(replication机制)
kafka将每个partition数据复制到多个server上,任何都有一个partition有一个leader和任意个follower,
备份(Replication=leader+follower)的个数可以通过broker配置文件来设定.
leader处理所有的read-write请求,follower需要和leader保持同步.
follower和consumer一样,消费消息并保存在本地日志中;
leader负责跟踪所有的follower状态,如果follower"落后"太多或者失效,leader将会把它从replicas同步列表中删除.
即使只有一个replicas实例存活,仍然可以保证消息的正常发送和接收,只要zookeeper集群存活即可.
当leader失效时,需在followers中选取出新的leader,可能此时follower落后于leader,
因此需要选择一个"up-to-date"的follower.
选择follower时需要兼顾一个问题,就是新leader server上所已经承载的partition leader的个数,
如果一个server上有过多的partition leader,意味着此server将承受着更多的IO压力.
在选举新leader,需要考虑到"负载均衡".
#summary
对于Kafka而言,定义一个Broker是否“活着”包含两个条件:
一是它必须维护与ZooKeeper的session(这个通过ZooKeeper的Heartbeat机制来实现)。
二是Follower必须能够及时将Leader的消息复制过来,不能“落后太多”。
基于ISR的数据复制方案
Kafka的数据复制是以Partition为单位的。而多个备份间的数据复制,通过Follower向Leader拉取数据完成。
从一这点来讲,Kafka的数据复制方案接近于mysql的Master-Slave方案。
不同的是,Kafka既不是完全的同步复制,也不是完全的异步复制,而是基于ISR的动态复制方案。
#ISR,也即In-sync Replica。
每个Partition的Leader都会维护这样一个列表,该列表中,包含了所有与之同步的Replica(包含Leader自己)。
每次数据写入时,只有ISR中的所有Replica都复制完,Leader才会将其置为Commit,它才能被Consumer所消费。
这种方案,与同步复制非常接近。
但不同的是,这个ISR是由Leader动态维护的。
如果Follower不能紧“跟上”Leader,它将被Leader从ISR中移除,
待它又重新“跟上”Leader后,会被Leader再次加加ISR中。
每次改变ISR后,Leader都会将最新的ISR持久化到Zookeeper中。
至于如何判断某个Follower是否“跟上”Leader,不同版本的Kafka的策略稍微有些区别。
1) 对于0.8.*版本
如果Follower在`replica.lag.time.max.ms`时间内未向Leader发送Fetch请求(也即数据复制请求), 则Leader会将其从ISR中移除。
如果某Follower持续向Leader发送Fetch请求,但是它与Leader的数据差距在`replica.lag.max.messages`以上,也会被Leader从ISR中移除。
2) 从0.9.0.0版本
`replica.lag.max.messages`被移除,故Leader不再考虑Follower落后的消息条数。
另外,Leader不仅会判断Follower是否在`replica.lag.time.max.ms`时间内向其发送Fetch请求,
同时还会考虑Follower是否在该时间内与之保持同步。
3) 0.10.* 版本的策略与0.9.*版一致
对于0.8.*版本的`replica.lag.max.messages`参数,既然只有ISR中的所有Replica复制完后的消息才被认为Commit,
那为何会出现Follower与Leader差距过大的情况。
原因在于,Leader并不需要等到前一条消息被Commit才接收后一条消息。
事实上,Leader可以按顺序接收大量消息,最新的一条消息的Offset被记为High Wartermark。
而只有被ISR中所有Follower都复制过去的消息才会被Commit,Consumer只能消费被Commit的消息。
由于Follower的复制是严格按顺序的,所以被Commit的消息之前的消息肯定也已经被Commit过。
换句话说,High Watermark标记的是Leader所保存的最新消息的offset,
而Commit Offset标记的是最新的可被消费的(已同步到ISR中的Follower)消息。
而Leader对数据的接收与Follower对数据的复制是异步进行的,因此会出现Commit Offset与High Watermark存在一定差距的情况。
0.8.*版本中`replica.lag.max.messages`限定了Leader允许的该差距的最大值。
#Kafka基于ISR的数据复制方案原理如下图所示。
[图片上传失败...(image-9c96fb-1570760404167)]
// 第一步
Leader A总共收到3条消息,故其high watermark为3,
但由于ISR中的Follower只同步了第1条消息(m1),故只有m1被Commit,也即只有m1可被Consumer消费。
此时Follower B与Leader A的差距是1,而Follower C与Leader A的差距是2,
均未超过默认的`replica.lag.max.messages`,故得以保留在ISR中。
// 第二步
由于旧的Leader A宕机,新的Leader B在`replica.lag.time.max.ms`时间内未收到来自A的Fetch请求,
故将A从ISR中移除,此时ISR={B,C}。
同时,由于此时新的Leader B中只有2条消息,并未包含m3(m3从未被任何Leader所Commit),所以m3无法被Consumer消费。
// 第四步
Follower A恢复正常,它先将宕机前未Commit的所有消息全部删除,
然后从最后Commit过的消息的下一条消息开始追赶新的Leader B,
直到它“赶上”新的Leader,才被重新加入新的ISR中。
# 使用ISR方案的原因
1) 由于Leader可移除不能及时与之同步的Follower,
故与同步复制相比可避免最慢的Follower拖慢整体速度,也即ISR提高了系统可用性。
2) ISR中的所有Follower都包含了所有Commit过的消息,而只有Commit过的消息才会被Consumer消费,
故从Consumer的角度而言,ISR中的所有Replica都始终处于同步状态,从而与异步复制方案相比提高了数据一致性。
3) ISR可动态调整,极限情况下,可以只包含Leader,极大提高了可容忍的宕机的Follower的数量。
与`Majority Quorum`方案相比,容忍相同个数的节点失败,所要求的总节点数少了近一半。
# ISR相关配置说明---->important!!!!!
1) Broker的`min.insync.replicas`参数指定了Broker所要求的ISR最小长度,默认值为1。
也即极限情况下ISR可以只包含Leader。
但此时如果Leader宕机,则该Partition不可用,可用性得不到保证。
2) 只有被ISR中所有Replica同步的消息才被Commit,但Producer发布数据时,
Leader并不需要ISR中的所有Replica同步该数据才确认收到数据。Producer可以通过`acks`参数指定最少需要多少个Replica确认收到该消息才视为该消息发送成功。
`acks`的默认值是1,即Leader收到该消息后立即告诉Producer收到该消息,
此时如果在ISR中的消息复制完该消息前Leader宕机,那该条消息会丢失。
而如果将该值设置为0,则Producer发送完数据后,立即认为该数据发送成功,
不作任何等待,而实际上该数据可能发送失败,并且Producer的Retry机制将不生效。
更推荐的做法是,将`acks`设置为`all`或者`-1`,此时只有ISR中的所有Replica都收到该数据(也即该消息被Commit),
Leader才会告诉Producer该消息发送成功,从而保证不会有未知的数据丢失。
ISR具体实现参 zero-copy 相关文章
partition与replication分配
#假定集群
broker: s1 s2 s3
topic: videoTopic
partition: 3
replication: 2
#partition 分区
就是数据的水平切分,比如上面的配置中把一个topic的数据分成3分进行存储,
而且不同分区一般都是在不同的broker中。这个就是kafka的高扩展性。
比如上面s1、s2、s3各有一个分区。
#replication 副本
replication的概念就是kafka的高可用性,比如s1节点宕机了,那么s1节点的分区将变得不可用,
那么数据就会不完整,为了防止这个问题,引入了replication概念,
比如replication配置成2,这就意味着一个partition有2个replication。
#说明
此处partition是3,假如partition索引分别是 0 1 2
此处replication是2,那么就意味着partition 0、1、2 都存在2个的意思。
也就是有2*3=6个分区。(0 0 1 1 2 2)
体现在kafka的存储结构就是
>> s1中有文件夹 videoTopic-0 videoTopic-1 :0和1 号partition 存在s1中
>> s2中有文件夹 videoTopic-0 videoTopic-2 :0和2号partition 在s2中
>> s3中有文件夹 videoTopic-1 videoTopic-2 :1和2号partition 在s3中
这里也存在均衡分配,比如0号replication有2个,会选出一个leader。
1.4.6 高性能
>> 数据磁盘持久化:消息不在内存中cache,直接写入到磁盘,充分利用磁盘的顺序读写性能。
>> zero-copy:减少IO操作步骤。--> linux的 zero
>> 支持数据批量发送和拉取。
>> 支持数据压缩。
>> Topic划分为多个partition,提高并行处理能力。
>> producer端: 将消息buffer起来, 批量发送.
>> consumer端: 批量fetch消息.
>> 即所有的producer、broker和consumer都会有多个,均为分布式的。
Producer和broker之间没有负载均衡机制。
broker和consumer之间利用zookeeper进行负载均衡。
所有broker和consumer都会在zookeeper中进行注册,且zookeeper会保存他们的一些元数据信息。
如果某个broker和consumer发生了变化,所有其他的broker和consumer都会得到通知。
利用Partition实现并行处理
# Partition提供并行处理的能力
Kafka是一个Pub-Sub的消息系统,无论是发布还是订阅,都须指定Topic。
Topic只是一个逻辑的概念。每个Topic都包含一个或多个Partition,不同Partition可位于不同节点。
同时Partition在物理上对应一个本地文件夹,每个Partition包含一个或多个Segment,
每个Segment包含一个数据文件和一个与之对应的索引文件。
在逻辑上,可以把一个Partition当作一个非常长的数组,可通过这个“数组”的索引(offset)去访问其数据。
一方面,由于不同Partition可位于不同机器,因此可以充分利用集群优势,实现机器间的并行处理。
另一方面,由于Partition在物理上对应一个文件夹,即使多个Partition位于同一个节点,
也可通过配置让同一节点上的不同Partition置于不同的disk drive上,从而实现磁盘间的并行处理,充分发挥多磁盘的优势。
利用多磁盘的具体方法是,将不同磁盘mount到不同目录,然后在server.properties中,将`log.dirs`设置为多目录(用逗号分隔)。
Kafka会自动将所有Partition尽可能均匀分配到不同目录也即不同目录(也即不同disk)上。
注:虽然物理上最小单位是Segment,但Kafka并不提供同一Partition内不同Segment间的并行处理。
因为对于写而言,每次只会写Partition内的一个Segment,而对于读而言,也只会顺序读取同一Partition内的不同Segment。
# Partition是最小并发粒度
多Consumer消费同一个Topic时,同一条消息只会被同一Consumer Group内的一个Consumer所消费。
而数据并非按消息为单位分配,而是以Partition为单位分配,
也即同一个Partition的数据只会被一个Consumer所消费(在不考虑Rebalance的前提下)。
如果Consumer的个数多于Partition的个数,那么会有部分Consumer无法消费该Topic的任何数据,
也即当Consumer个数超过Partition后,增加Consumer并不能增加并行度。
简而言之,Partition个数决定了可能的最大并行度。
以Spark消费Kafka数据为例,如果所消费的Topic的Partition数为N,则有效的Spark最大并行度也为N。
即使将Spark的Executor数设置为N+M,最多也只有N个Executor可同时处理该Topic的数据。
1.4.7 kafka生成的日志文件(适配: kafka_2.12-2.3.0)
LogSegment概述
类别 | 作用 |
---|---|
.index | 偏移量索引文件 |
.timestamp | 时间戳索引文件 |
.log | 日志文件 |
.snaphot | 快照文件 |
.deleted | - |
.cleaned | 日志清理时临时文件 |
.swap | Log Compaction 之后的临时文件 |
Leader-epoch-checkpoint | - |
// 需要说明的是:
是 Kafka 消息存储的信息文件内容,不是所谓的 Kafka 服务器运行产生的日志文件。
// LogSement
在分区日志文件中,你会发现很多类型的文件,比如:
.index、.timestamp、.log、.snapshot 等,其中,文件名一致的文件集合就称为 LogSement。
分区日志文件中包含很多的 LogSegment ,Kafka 日志追加是顺序写入的,
LogSegment 可以减小日志文件的大小,进行日志删除的时候和数据查找的时候可以快速定位。
同时,ActiveLogSegment 也就是活跃的日志分段拥有文件拥有写入权限,其余的 LogSegment 只有只读的权限。
每个 LogSegment 都有一个基准偏移量,用来表示当前 LogSegment 中第一条消息的 offset。
偏移量是一个 64 位的长整形数,固定是20位数字,长度未达到,用 0 进行填补,索引文件和日志文件都由该作为文件名命名规则
// 如果想要查看相应文件内容可以通过 kafka-run-class.sh 脚本查看 .log
#windows版如下
soft_for_dev\kafka_2.12-2.3.0\bin\windows\kafka-run-class.bat kafka.tools.DumpLogSegments --files tmp\kafka-logs\__consumer_offsets-2\00000000000000000000.log
"或"
soft_for_dev\kafka_2.12-2.3.0\bin\windows\kafka-dump-log.bat --files tmp\kafka-logs\__consumer_offsets-49\00000000000000000000.index
日志与索引文件
偏移量索引文件(.log)用于记录消息偏移量与物理地址之间的映射关系。
时间戳索引文件(.timeindex)则根据时间戳查找对应的偏移量。
Kafka 中的索引文件是以稀疏索引的方式构造消息的索引,他并不保证每一个消息在索引文件中都有对应的索引项。
每当写入一定量的消息时,偏移量索引文件和时间戳索引文件分别增加一个偏移量索引项和时间戳索引项,
通过修改 log.index.interval.bytes 的值,改变索引项的密度。
配置项 | 默认值 | 说明 |
---|---|---|
log.index.interval.bytes | 4096 (4K) | 增加索引项字节间隔密度,会影响索引文件中的区间密度和查询效率 |
log.segment.bytes | 1073741824 (1G) | 日志文件最大值 |
log.roll.ms | - | 当前日志分段中消息的最大时间戳与当前系统的时间戳的差值允许的最大范围,毫秒维度 |
log.roll.hours | 168 (7天) | 当前日志分段中消息的最大时间戳与当前系统的时间戳的差值允许的最大范围,小时维度 |
log.index.size.max.bytes | 10485760 (10MB) | 触发偏移量索引文件或时间戳索引文件分段字节限额 |
切分文件
// 日志文件和索引文件都会存在多个文件,组成多个 SegmentLog,那么其切分的规则是怎样的呢?
当满足如下几个条件中的其中之一,就会触发文件的切分:
1) 当前日志分段文件的大小超过了 broker 端参数 log.segment.bytes 配置的值。
log.segment.bytes 参数的默认值为 1073741824,即 1GB。
2) 当前日志分段中消息的最大时间戳与当前系统的时间戳的差值大于 log.roll.ms 或 log.roll.hours 参数配置的值。
如果同时配置了 log.roll.ms 和 log.roll.hours 参数,那么 log.roll.ms 的优先级高。
默认情况下,只配置了 log.roll.hours 参数,其值为168,即 7 天。
3) 偏移量索引文件或时间戳索引文件的大小达到 broker 端参数 log.index.size.max.bytes 配置的值。
log.index.size.max.bytes 的默认值为 10485760,即 10MB。
4) 追加的消息的偏移量与当前日志分段的偏移量之间的差值大于 Integer.MAX_VALUE,
即要追加的消息的偏移量不能转变为相对偏移量。
// 为什么是 Integer.MAX_VALUE ?
在偏移量索引文件中,每个索引项共占用 8 个字节,并分为两部分。相对偏移量和物理地址。
>> 相对偏移量:表示消息相对与基准偏移量的偏移量,占 4 个字节
>> 物理地址:消息在日志分段文件中对应的物理位置,也占 4 个字节
4 个字节刚好对应 Integer.MAX_VALUE, 如果大于 Integer.MAX_VALUE, 则不能用 4 个字节进行表示了。
// 索引文件切分过程
索引文件会根据 log.index.size.max.bytes 值进行预先分配空间,
即文件创建的时候就是最大值,当真正的进行索引文件切分的时候,才会将其裁剪到实际数据大小的文件。
这一点是跟日志文件有所区别的地方。其意义降低了代码逻辑的复杂性。
日志清理
日志清理,不是日志删除,这还是有所区别的。
Kafka 提供两种日志清理策略:
日志删除:按照一定的删除策略,将不满足条件的数据进行数据删除
日志压缩:针对每个消息的 Key 进行整合,对于有相同 Key 的不同 Value 值,只保留最后一个版本。
Kafka 提供 log.cleanup.policy 参数进行相应配置,默认值:delete,还可以选择 compact。
是否支持针对具体的 Topic 进行配置?
答案是肯定的,主题级别的配置项是 cleanup.policy 。
日志删除
配置项 | 默认值 | 说明 |
---|---|---|
log.retention.check.interval.ms | 300000 (5分钟) | 检测频率 |
log.retention.hours | 168 (7天) | 日志保留时间小时 |
log.retention.minutes | - | 日志保留时间分钟 |
log.retention.ms | - | 日志保留时间毫秒 |
file.delete.delay.ms | 60000 (1分钟) | 延迟执行删除时间 |
log.retention.bytes | -1 无穷大 | 运行保留日志文件最大值 |
log.retention.bytes | 1073741824 (1G) | 日志文件最大值 |
Kafka 会周期性根据相应规则进行日志数据删除,保留策略有 3 种:
>> 基于时间的保留策略
>> 基于日志大小的保留策略
>> 基于日志起始偏移量的保留策略
1)基于时间
日志删除任务会根据 log.retention.hours/log.retention.minutes/log.retention.ms 设定日志保留的时间节点。
如果超过该设定值,就需要进行删除。默认是 7 天,log.retention.ms 优先级最高。
// 如何查找日志分段文件中已经过去的数据呢?
Kafka 依据日志分段中最大的时间戳进行定位,首先要查询该日志分段所对应的时间戳索引文件,
查找时间戳索引文件中最后一条索引项,若最后一条索引项的时间戳字段值大于 0,则取该值,否则取最近修改时间。
// 为什么不直接选最近修改时间呢?
因为日志文件可以有意无意的被修改,并不能真实的反应日志分段的最大时间信息。
// 删除过程
>> 从日志对象中所维护日志分段的跳跃表中移除待删除的日志分段,保证没有线程对这些日志分段进行读取操作。
>> 这些日志分段所有文件添加 上 .delete 后缀。
>> 交由一个以 "delete-file" 命名的延迟任务来删除这些 .delete 为后缀的文件。
>> 延迟执行时间可以通过 file.delete.delay.ms 进行设置
// 如果活跃的日志分段中也存在需要删除的数据时?
Kafka 会先切分出一个新的日志分段作为活跃日志分段,然后执行删除操作。
2) 基于日志大小
日志删除任务会检查当前日志的大小是否超过设定值。
设定项为 log.retention.bytes ,单个日志分段的大小由 log.regment.bytes 进行设定。
// 删除过程
>> 计算需要被删除的日志总大小 (当前日志文件大小-retention值)。
>> 从日志文件第一个 LogSegment 开始查找可删除的日志分段的文件集合。
>> 执行删除。
3) 基于日志起始偏移量
判断依据是某日志分段的下一个日志分段的起始偏移量是否大于等于日志文件的起始偏移量,
若是,则可以删除此日志分段。
// 注意:
日志文件的起始偏移量并不一定等于第一个日志分段的基准偏移量,
存在数据删除,可能与之相等的那条数据已经被删除了。
// 删除过程
>> 从头开始变了每一个日志分段,日志分段 1 的下一个日志分段的起始偏移量为 11,小于 logStartOffset,将 日志分段 1 加入到删除队列中
>> 日志分段 2 的下一个日志分段的起始偏移量为 23,小于 logStartOffset,将 日志分段 2 加入到删除队列中
>> 日志分段 3 的下一个日志分段的起始偏移量为 30,大于 logStartOffset,则不进行删除。
1.4.8 Kafka 为什么这么快
Kafka的消息是保存或缓存在磁盘上的,一般认为在磁盘上读写数据是会降低性能的,
因为寻址会比较消耗时间,但是实际上,Kafka的特性之一就是高吞吐率。
Kafka速度的秘诀在于,它把所有的消息都变成一个批量的文件,并且进行合理的批量压缩,减少网络IO损耗,
通过mmap提高I/O速度,写入数据的时候由于单个Partion是末尾添加所以速度最优;
读取数据的时候配合sendfile直接暴力输出。
1.数据写入
Kafka会把收到的消息都写入到硬盘中,为了优化写入速度Kafka采用了两个技术, 顺序写入 和 MMFile 。
#1.顺序写入
磁盘读写的快慢取决于你怎么使用它,也就是顺序读写或者随机读写。
在顺序读写的情况下,某些优化场景磁盘的读写速度可以和内存持平。
因为硬盘是机械结构,每次读写都会寻址->写入,其中寻址是一个“机械动作”,它是最耗时的。
所以硬盘最讨厌随机I/O,最喜欢顺序I/O。为了提高读写硬盘的速度,Kafka就是使用顺序I/O。
而且Linux对于磁盘的读写优化也比较多,包括read-ahead和write-behind,磁盘缓存等。
如果在内存做这些操作的时候,一个是JAVA对象的内存开销很大,
另一个是随着堆内存数据的增多,JAVA的GC时间会变得很长,使用磁盘操作有以下几个好处:
>> 磁盘顺序读写速度超过内存随机读写。
>> JVM的GC效率低,内存占用大。使用磁盘可以避免这一问题。
>> 系统冷启动后,磁盘缓存依然可用。
下图就展示了Kafka是如何写入数据的, 每一个Partition其实都是一个文件 ,
收到消息后Kafka会把数据插入到文件末尾(虚框部分)。
这种方法有一个缺陷—— 没有办法删除数据 ,所以Kafka是不会删除数据的,
它会把所有的数据都保留下来,每个Consumer对每个Topic都有一个offset用来表示读取到了第几条数据 。
但是kafka提供了删除数据的策略: 基于时间和基于数据大小的策略.
#2.Memory Mapped Files
即便是顺序写入硬盘,硬盘的访问速度还是不可能追上内存。
所以Kafka的数据并不是实时的写入硬盘 ,它充分利用了现代操作系统"分页存储"来利用内存提高I/O效率。
Memory Mapped Files(后面简称mmap)也被翻译成 内存映射文件 ,
在64位操作系统中一般可以表示20G的数据文件,
它的工作原理是直接利用操作系统的Page来实现文件到物理内存的直接映射。
完成映射之后你对物理内存的操作会被同步到硬盘上(操作系统在适当的时候)。
通过mmap,进程像读写硬盘一样读写内存(当然是虚拟机内存),也不必关心内存的大小有虚拟内存为我们兜底。
使用这种方式可以获取很大的I/O提升, 省去了用户空间到内核空间 复制的开销,
调用文件的read会把数据先放到内核空间的内存中,然后再复制到用户空间的内存中。
也有一个很明显的缺陷——不可靠, 写到mmap中的数据并没有被真正的写到硬盘,
操作系统会在程序主动调用flush的时候才把数据真正的写到硬盘。
Kafka提供了一个参数——producer.type来控制是不是主动flush,
如果Kafka写入到mmap之后就立即flush然后再返回Producer叫 同步 (sync);
写入mmap之后立即返回Producer不调用flush叫 异步 (async)。
2.数据读取
Kafka在读取磁盘时做了哪些优化?
#基于sendfile实现Zero Copy
传统模式下,当需要对一个文件进行传输的时候,其具体流程细节如下:
>> 调用read函数,文件数据被copy到内核缓冲区
>> read函数返回,文件数据从内核缓冲区copy到用户缓冲区
>> write函数调用,将文件数据从用户缓冲区copy到内核与socket相关的缓冲区。
>> 数据从socket缓冲区copy到相关协议引擎。
以上细节是传统read/write方式进行网络文件传输的方式,我们可以看到,
在这个过程当中,文件数据实际上是经过了四次copy操作:
>> 硬盘—>内核buf—>用户buf—>socket相关缓冲区—>协议引擎
而sendfile系统调用则提供了一种减少以上多次copy,提升文件传输性能的方法。
在内核版本2.1中,引入了sendfile系统调用,以简化网络上和两个本地文件之间的数据传输。
sendfile的引入不仅减少了数据复制,还减少了上下文切换。
sendfile(socket, file, len);
运行流程如下:
>> sendfile系统调用,文件数据被copy至内核缓冲区
>> 再从内核缓冲区copy至内核中socket相关的缓冲区
>> 最后再socket相关的缓冲区copy到协议引擎
相较传统read/write方式,2.1版本内核引进的sendfile已经减少了内核缓冲区到user缓冲区,
再由user缓冲区到socket相关缓冲区的文件copy,而在内核版本2.4之后,
文件描述符结果被改变,sendfile实现了更简单的方式,再次减少了一次copy操作。
在apache,nginx,lighttpd等web服务器当中,都有一项sendfile相关的配置,使用sendfile可以大幅提升文件传输性能。
Kafka把所有的消息都存放在一个一个的文件中,当消费者需要数据的时候Kafka直接把文件发送给消费者,
配合mmap作为文件读写方式,直接把它传给sendfile。
#批量压缩
在很多情况下,系统的瓶颈不是CPU或磁盘,而是网络IO,
对于需要在广域网上的数据中心之间发送消息的数据流水线尤其如此。
进行数据压缩会消耗少量的CPU资源,不过对于kafka而言,网络IO更应该需要考虑。
>> 如果每个消息都压缩,但是压缩率相对很低,所以Kafka使用了批量压缩,即将多个消息一起压缩而不是单个消息压缩
>> Kafka允许使用递归的消息集合,批量的消息可以通过压缩的形式传输并且在日志中也可以保持压缩格式,直到被消费者解压缩
>> Kafka支持多种压缩协议,包括Gzip和Snappy压缩协议
2.kafka安装
2.1 kafka集群版
2.1.1 安装zookeeper集群
切换目录, 下载zookeeper
cd /home
mkdir zookeeper
cd zookeeper
mkdir zkdata zkdatalog
wget http://www.apache.org/dist/zookeeper/stable/zookeeper-3.4.13.tar.gz
tar -zxf zookeeper-3.4.13.tar.gz
修改配置文件
cd /home/zookeeper/zookeeper-3.4.13/conf
cp zoo_sample.cfg zoo.cfg
vi zoo.cfg
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/home/zookeeper/zkdata/
dataLogDir=/home/zookeeper/zkdatalog/
clientPort=12181
server.1=192.168.0.199:12888:13888
server.2=192.168.0.198:12888:13888
server.3=192.168.0.197:12888:13888
说明:
#server.1
这个1是服务器的标识也可以是其他的数字, 表示这个是第几号服务器,用来标识服务器,这个标识要写到快照目录下面myid文件里
#192.168.0.199为集群里的IP地址,
第一个端口是master和slave之间的通信端口,默认是2888,第二个端口是leader选举的端口,集群刚启动的时候选举或者leader挂掉之后进行新的选举的端口默认是3888
#tickTime:
这个时间是作为 Zookeeper 服务器之间或客户端与服务器之间维持心跳的时间间隔,也就是每个 tickTime 时间就会发送一个心跳。
#initLimit:
这个配置项是用来配置 Zookeeper 接受客户端(这里所说的客户端不是用户连接 Zookeeper 服务器的客户端,而是 Zookeeper 服务器集群中连接到 Leader 的 Follower 服务器)初始化连接时最长能忍受多少个心跳时间间隔数。当已经超过 5个心跳的时间(也就是 tickTime)长度后 Zookeeper 服务器还没有收到客户端的返回信息,那么表明这个客户端连接失败。总的时间长度就是 5*2000=10 秒
#syncLimit:
这个配置项标识 Leader 与Follower 之间发送消息,请求和应答时间长度,最长不能超过多少个 tickTime 的时间长度,总的时间长度就是5*2000=10秒
#dataDir:
快照日志的存储路径
#dataLogDir:
事物日志的存储路径,如果不配置这个那么事物日志会默认存储到dataDir制定的目录,这样会严重影响zk的性能,当zk吞吐量较大的时候,产生的事物日志、快照日志太多
#clientPort:
这个端口就是客户端连接 Zookeeper 服务器的端口,Zookeeper 会监听这个端口,接受客户端的访问请求。修改他的端口改大点
创建myid文件
#server1
echo "1" > /home/zookeeper/zkdata/myid
#server2
echo "2" > /home/zookeeper/zkdata/myid
#server3
echo "3" > /home/zookeeper/zkdata/myid
重要配置说明:
1、myid文件和server.myid 在快照目录下存放的标识本台服务器的文件,他是整个zk集群用来发现彼此的一个重要标识。
2、zoo.cfg 文件是zookeeper配置文件 在conf目录里。
3、log4j.properties文件是zk的日志输出文件 在conf目录里用java写的程序基本上有个共同点日志都用log4j,来进行管理。
4、zkEnv.sh和zkServer.sh文件
zkServer.sh 主的管理程序文件
zkEnv.sh 是主要配置,zookeeper集群启动时配置环境变量的文件
5、还有一个需要注意
zookeeper不会主动的清除旧的快照和日志文件,这个是操作者的责任。
但是可以通过命令去定期的清理。脚本如下:
#!/bin/bash
#snapshot file dir
dataDir=/opt/zookeeper/zkdata/version-2
#tran log dir
dataLogDir=/opt/zookeeper/zkdatalog/version-2
#Leave 66 files
count=66
count=$[$count+1]
ls -t $dataLogDir/log.* | tail -n +$count | xargs rm -f
ls -t $dataDir/snapshot.* | tail -n +$count | xargs rm -f
#以上这个脚本定义了删除对应两个目录中的文件,保留最新的66个文件,可以将他写到crontab中,设置为每天凌晨2点执行一次就可以了。
#zk log dir del the zookeeper log
#logDir=
#ls -t $logDir/zookeeper.log.* | tail -n +$count | xargs rm -f
其他清理zk日志的方法:
其他方法:
第二种:使用ZK的工具类PurgeTxnLog,它的实现了一种简单的历史文件清理策略,可以在这里看一下他的使用方法 http://zookeeper.apache.org/doc/r3.4.6/zookeeperAdmin.html
第三种:对于上面这个执行,ZK自己已经写好了脚本,在bin/zkCleanup.sh中,所以直接使用这个脚本也是可以执行清理工作的。
第四种:从3.4.0开始,zookeeper提供了自动清理snapshot和事务日志的功能,通过配置 autopurge.snapRetainCount 和 autopurge.purgeInterval 这两个参数能够实现定时清理了。这两个参数都是在zoo.cfg中配置的:
autopurge.purgeInterval 这个参数指定了清理频率,单位是小时,需要填写一个1或更大的整数,默认是0,表示不开启自己清理功能。
autopurge.snapRetainCount 这个参数和上面的参数搭配使用,这个参数指定了需要保留的文件数目。默认是保留3个。
关闭防火墙
systemctl stop firewalld
systemctl disable firewalld
启动服务
/home/zookeeper/zookeeper-3.4.13/bin/zkServer.sh start
检查服务状态
/home/zookeeper/zookeeper-3.4.13/bin/zkServer.sh status
zk集群一般只有一个leader,多个follower,
主一般是相应客户端的读写请求,而从主同步数据,
当主挂掉之后就会从follower里投票选举一个leader出来。
#server1:
[root@localhost zookeeper]# zookeeper-3.4.13/bin/zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /home/zookeeper/zookeeper-3.4.13/bin/../conf/zoo.cfg
Mode: follower
#server2:
[root@localhost zookeeper]# zookeeper-3.4.13/bin/zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /home/zookeeper/zookeeper-3.4.13/bin/../conf/zoo.cfg
Mode: follower
#server3:
[root@localhost zookeeper]# zookeeper-3.4.13/bin/zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /home/zookeeper/zookeeper-3.4.13/bin/../conf/zoo.cfg
Mode: leader
若是启动时报如下错误
Cannot open channel to 3 at election address /192.168.0.197:13888
请把三台都启动,
若三台都启动后, 仍报错:
1)关闭防火墙
2)仔细检查各配置文件, 不要有中文等错误字符, 注意路径
3)删除曾经启动时, zkdata目录下的文件
"
drwxr-xr-x. 2 root root 47 Mar 11 21:27 version-2
-rw-r--r--. 1 root root 5 Mar 11 21:26 zookeeper_server.pid
"
4)再次重启
用jps命令查看zk进程
[root@localhost zookeeper]# jps
36741 QuorumPeerMain
37257 Jps
关闭zookeeper命令
/home/zookeeper/zookeeper-3.4.13/bin/zkServer.sh stop
2.1.2 在zookeeper集群基础上搭建kafka集群
切换目录, 下载kafka
cd /home/
mkdir kafka
cd kafka
mkdir kafkalogs #创建kafka消息目录,主要存放kafka消息
wget https://www-us.apache.org/dist/kafka/2.1.1/kafka_2.11-2.1.1.tgz
#或者镜像下载
wget http://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.1.1/kafka_2.11-2.1.1.tgz
tar -zxf kafka_2.11-2.1.1.tgz
环境变量配置
#vi source /etc/profile
export KAFKA_HOME=/home/kafka/kafka_2.11-2.1.1
export PATH=$PATH:$KAFKA_HOME/bin
#保存使其立即生效
source /etc/profile
vi /home/kafka/kafka_2.11-2.1.1/config/server.properties
#每台服务器的broker.id都不能相同
broker.id=199或198或197
#在log.retention.hours=168 下面新增下面三项
message.max.byte=5242880
default.replication.factor=2
replica.fetch.max.bytes=5242880
#设置zookeeper的连接端口
zookeeper.connect=192.168.0.199:12181,192.168.0.198:12181,192.168.0.197:12181
分别启动三台机器上的kafka集群
/home/kafka/kafka_2.11-2.1.1/bin/kafka-server-start.sh -daemon /home/kafka/kafka_2.11-2.1.1/config/server.properties
检查服务是否启动
[root@localhost config]# jps
123988 Jps
121142 QuorumPeerMain
123965 Kafka
关闭kafka命令
/home/kafka/kafka_2.11-2.1.1/bin/kafka-server-stop.sh
创建一个topic
/home/kafka/kafka_2.11-2.1.1/bin/kafka-topics.sh --zookeeper 192.168.0.199:12181 --replication-factor 1 --partitions 1 --topic test_kafka
启动生产者
/home/kafka/kafka_2.11-2.1.1/bin/kafka-console-producer.sh --broker-list 192.168.0.199:9092 --topic test_kafka
启动消费者
/home/kafka/kafka_2.11-2.1.1/bin/kafka-console-consumer.sh --bootstrap-server 192.168.0.199:9092 --topic test_kafka --from-beginning
在生产者所在命令行发送消息, 在消费者所在shell窗口便可以接收到消息
2.2 windows上安装kafka单机
以kafka_2.12-2.3.0为例
#step1.下载 kafka, 该版本不必下载 zookeeper
https://www.apache.org/dyn/closer.cgi?path=/kafka/2.3.0/kafka_2.12-2.3.0.tgz
#step2.切换到kafka层的目录
cd D:\soft_for_dev\kafka_2.12-2.3.0
#step3.启动zookeeper
bin\windows\zookeeper-server-start.bat config\zookeeper.properties
#step4.启动kafka
bin\windows\kafka-server-start.bat config\server.properties
#step5.创建topic
bin\windows\kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic user-log
#step6.创建生产者产生消息,不关闭页面
bin\windows\kafka-console-producer.bat --broker-list localhost:9092 --topic user-log
#step7.创建消费者接收消息,不关闭页面
bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic user-log --from-beginning
布控kafka-manager
#1.概述
为了简化开发者和服务工程师维护Kafka集群的工作,
yahoo构建了一个叫做Kafka管理器的基于Web工具,叫做 Kafka Manager。
可以很容易地发现分布在集群中的哪些topic分布不均匀,或者是分区在整个集群分布不均匀的的情况。
它支持管理多个集群、选择副本、副本重新分配以及创建Topic。
此外还有如下功能:
>> 管理多个kafka集群
>> 便捷的检查kafka集群状态(topics,brokers,备份分布情况,分区分布情况)
>> 选择你要运行的副本
>> 基于当前分区状况进行
>> 可以选择topic配置并创建topic
>> 删除topic(只支持0.8.2以上的版本并且要在broker配置中设置delete.topic.enable=true)
>> Topic list会指明哪些topic被删除
>> 为已存在的topic增加分区
>> 为已存在的topic更新配置
>> 在多个topic上批量重分区
>> 在多个topic上批量重分区(可选partition broker位置)
#2.安装环境要求
>> java
>> scala
>> sbt
#3.## 安装步骤 (windows版)
1.下载最新的kafka-manager的releases中zip版本https://github.com/yahoo/kafka-manager/releases, 解压
2.下载安装kafka-manager要求的java版本,检查系统环境path路径是否添加
3.下载安装kafka-manager要求scala的window版本对应的msi文件 https://www.scala-lang.org/download/,双击安装,检查系统环境path路径是否添加(任意目录进入cmd, 输入scala 回车)
4.下载安装kafka-manager要求sbt的window版本对应的msi文件https://www.scala-sbt.org/download.html, 双击安装,检查系统环境path路径是否添加
(sbt的配置参考: https://blog.csdn.net/u014532217/article/details/78966807)
#4.编译解压运行
>> 进入kafka-manager目录, 执行 解压命令: sbt clean dist
>> 提取zip包: D:\soft_for_dev\kafka-manager-master\target\universal\kafka-manager-2.0.0.2.zip
>> 修改application.conf配置文件
kafka-manager.zkhosts="localhost:2181" // 这里修改下
#kafka-manager.zkhosts=${?ZK_HOSTS} // 该行要注释掉
(另外需修改kafka下的配置文件zookeeper.properties
maxClientCnxns=100)
>> 可用git客户端打开shell窗口, 执行命令: ./bin/kafka-manager -Dconfig.file=./conf/application.conf -Dhttp.port=9999
>> 浏览器中输入 http://localhost:9999即可看到 kafka-manager界面
https://blog.csdn.net/weixin_41846320/article/details/84782871 (kafka的windows版安装)
2.3 docker-compose安装
2.3.1 安装zookeeper
#准备docker-compose.yml
#在docker-compose.yml文件同一目录下, 执行'docker-compose up -d'命令
#执行'docker ps'命令, 查看进程号
#执行'docker exec -it 进程号 bash'进入某一zk下
#进入后, 执行'zkServer.sh status'命令, 查看节点状态
docker-compose.yml
version: '3.4'
services:
zoo1:
image: zookeeper:3.4
restart: always
hostname: zoo1
ports:
- 2181:2181
environment:
ZOO_MY_ID: 1
ZOO_SERVERS: server.1=0.0.0.0:2888:3888 server.2=zoo2:2888:3888 server.3=zoo3:2888:3888
zoo2:
image: zookeeper:3.4
restart: always
hostname: zoo2
ports:
- 2182:2181
environment:
ZOO_MY_ID: 2
ZOO_SERVERS: server.1=zoo1:2888:3888 server.2=0.0.0.0:2888:3888 server.3=zoo3:2888:3888
zoo3:
image: zookeeper:3.4
restart: always
hostname: zoo3
ports:
- 2183:2181
environment:
ZOO_MY_ID: 3
ZOO_SERVERS: server.1=zoo1:2888:3888 server.2=zoo2:2888:3888 server.3=0.0.0.0:2888:3888
# notice: execute command 'docker-compose up -d' to run zookeeper clusters
https://blog.csdn.net/mmmaaaggg/article/details/85000604 (docker安装kafka集群)
https://testerhome.com/topics/16126 (docker安装kafka集群)
3.kafka配置文件详解(适配: kafka_2.12-2.3.0)
3.1 bin目录下脚本文件
kafka的安装包除了包括kafka自身的工具以外,也包括了一系列简易的zookeeper工具,
能够通过zookeeper-server-start.sh脚本启动简易的单点zookeeper实例,供kafka使用。
但一般仅限于测试环境使用。
脚本 | 功能 |
---|---|
kafka-server-start.sh | 启动kafka服务器; |
kafka-server-stop.sh | 停止kafka服务器; |
kafka-topics.sh | topic管理; |
kafka-console-producer.sh | 基于命令行的生产者; |
kafka-console-consumer.sh | 基于命令行的消费者; |
kafka-run-class.sh | 运行java类的脚本,由kafka-server-start.sh和kafka-server-stop.sh、kafka-topics.sh等脚本调用; |
zookeeper-server-start.sh | 启动kafka自带的zookeeper服务器; |
zookeeper-server-stop.sh | 停止kafka自带的zookeeper服务器; |
zookeeper-shell.sh | 在命令行连接zookeeper的客户端工具; |
connect-standalone.sh | 在命令行启动单点的connector; |
connect-distributed.sh | 在命令行启动基于集群connector; |
3.2 config目录下配置文件
脚本 | 功能 |
---|---|
server.properties | kafka实例的配置文件,配置kafka最重要的配置文件; |
log4j.properties | kafka日志配置; |
zookeeper.properties | 自带zk的配置文件; |
producer.properties | 基于命令行的生产者工具配置文件;(测试用) |
consumer.properties | 基于命令行的消费者工具配置文件;(测试用) |
connect-standalone.properties | 自带单点connector的配置文件,存放connector的序列化方式、监听broker的地址端口等通用配置;(测试用) |
connect-file-source.properties | 配置文件读取connector,用于逐行读取文件,导入入topic;(测试用) |
connect-file-sink.properties | 配置文件写入connector,用于将topic中的数据导出到topic中;(测试用) |
3.2.1 server.properties (broker配置文件)
# see kafka.server.KafkaConfig for additional details and defaults
############################# Server Basics (基本配置) #############################
# broker id, id必须是唯一的整数
broker.id=0
# 是否可以删除topic,如果为true,我们可以在命令行删除topic,否则,不能。
#delete.topic.enable=true
############################# Socket Server Settings (socket配置) #############################
# The address the socket server listens on. It will get the value returned from
# java.net.InetAddress.getCanonicalHostName() if not configured.
# FORMAT:
# listeners = listener_name://host_name:port
# EXAMPLE:
# listeners = PLAINTEXT://your.host.name:9092
# broker监听地址。如果没有配置,默认为java.net.InetAddress.getCanonicalHostName()方法返回的地址
#申明此kafka服务器需要监听的端口号,如果是在本机上跑虚拟机运行可以不用配置本项,
#默认会使用localhost的地址,如果是在远程服务器上运行则必须配置
#listeners=PLAINTEXT://:9092
# broker的主机名和端口号将会广播给消费者与生产者。如果没有设置,默认为监听配置,否则使用java.net.InetAddress.getCanonicalHostName()方法返回的地址
#advertised.listeners=PLAINTEXT://your.host.name:9092
# 监听协议,默认为PLAINTEXT
#listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
# 服务器接受请求和响应请求的线程数
num.network.threads=3
# 处理请求的线程数,包括磁盘的IO操作
num.io.threads=8
# 服务器socket发送缓存
socket.send.buffer.bytes=102400
# 服务器socket接收缓存
socket.receive.buffer.bytes=102400
# 服务器接收请求的最大值
socket.request.max.bytes=104857600
############################# Log Basics (log基本配置) #############################
# log日志文件夹
log.dirs=/tmp/kafka-logs
# 每个topic的默认日志分区数。允许分区数大于并行消费数,这样可能导致,更多的文件将会跨broker
num.partitions=1
# 在启动和关闭刷新时,没有数据目录用于日志恢复的线程数。
# 这个值,强烈建议在随着在RAID阵列中的安装数据目录的增长而增长。
num.recovery.threads.per.data.dir=1
############################# Internal Topic Settings (内部topic配置) #############################
# 内部__consumer_offsets和__transaction_state两个topic,分组元数据的复制因子。
# 除开发测试外的使用,强烈建议值大于1,以保证可用性,比如3。
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
############################# Log Flush Policy (日志刷新策略) #############################
# 消息立刻被写到文件系统,默认调用fsync方法,懒同步操作系统缓存。下面的配置用于控制刷新数据到磁盘。
# 这里是一些折中方案:
# 1. 持久性:如果没有使用replication,没刷新的数据可能丢失。
# 2. 延迟性:当有大量的数据需要刷新,刷新操作发生时,比较大的刷新间隔可能会导致延时。
# 3. 吞吐量:刷新操作代价比较高,较小的刷新间隔,将会引起过渡的seek文件操作。
# 下面的配置刷新策略,允许在一个的刷新间隔或消息数量下,刷新数据,这个配置是全局的,可以在每个topic下重写。
# 在强制刷新数据到磁盘前,允许接受消息数量
#log.flush.interval.messages=10000
# 在强制刷新前,一个消息可以日志中停留在最大时间
#log.flush.interval.ms=1000
############################# Log Retention Policy (日志保留策略) #############################
# 下面的配置用于控制日志segments的处理。这些策略可以在一定的时间间隔和数据累积到一定的size,可以删除segments。
# 两种策略只要有一种触发,segments将会被删除。删除总是从log的末端。
# log文件的保留的时间
log.retention.hours=168
# log文件保留的size
#log.retention.bytes=1073741824
# 日志segments文件最大size,当日志文件的大于最大值,则创建一个新的log segment
log.segment.bytes=1073741824
# 日志保留检查间隔
log.retention.check.interval.ms=300000
############################# Zookeeper (Zookeeper配置) #############################
# zookeeper地址,多个以逗号隔开比如:"127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002"
zookeeper.connect=localhost:2181
# 连接zookeeper超时时间
zookeeper.connection.timeout.ms=6000
############################# Group Coordinator Settings (分组协调配置) #############################
# 下面的配置为毫秒时间,用于延时消费者重平衡的时间。重平衡将会进一步在新成员添加分组是,
# 延时group.initial.rebalance.delay.ms时间,直到到达maximum of max.poll.interval.ms时间。
# 默认值为3秒,我们重写0,主要是用户开发测试体验。在生产环境下,默认值3s,在应用启动期间,
# 帮助避免不必要及潜在的代价高的rebalances,是比较合适的。
group.initial.rebalance.delay.ms=0
3.2.2 zookeeper.properties (内置zk的配置文件, 生产可不用)
# 数据目录
dataDir=/tmp/zookeeper
# 监听端口
clientPort=2181
# 最大连接数,非生产环境配置
maxClientCnxns=0
3.2.3 producer.properties(测试用, 生产在项目配置文件中配置)
# see kafka.producer.ProducerConfig for more details
############################# Producer Basics #############################
生产者基本配置
# broker地址配置,集群则格式为 host1:port1,host2:port2 ...
bootstrap.servers=localhost:9092
# 是否压缩数据,有none, gzip, snappy, lz4,默认为压缩
compression.type=none
# 分区事件的类名,默认随机
#partitioner.class=
# 请求超时时间
#request.timeout.ms=
# `KafkaProducer.send` and `KafkaProducer.partitionsFor`最长阻塞时间
#max.block.ms=
# 生产者延时发送消息的时间,以便可以批量发送消息
#linger.ms=
# 最大请求size
#max.request.size=
# 每次可以批量发送到一个分区的消息记录数
#batch.size=
# 在消息发送至server前,生产者可以缓存的消息大小
#buffer.memory=
3.2.4 consumer.properties(测试用, 生产在项目配置文件中配置)
# see kafka.consumer.ConsumerConfig for more details
# zookeeper连接地址,集群则个时如:127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002
zookeeper.connect=127.0.0.1:2181
# zookeeper 连接超时时间
zookeeper.connection.timeout.ms=6000
# 消费者分组id
group.id=test-consumer-group
# 消费超时时间
#consumer.timeout.ms=5000
# 一次最大消费的消息字节数, 若实际消息大于该值, 消费端就无法消费,
# 导致一直卡在这一条消息,现象就是消费停止。
#kafka0.8版本为fetch.message.max.bytes,默认是1M
#fetch.max.bytes=50 * 1024 * 1024; (本版本默认 50M)
https://yq.aliyun.com/articles/370504 (kafka_2.11-1.0.0.tgz)
https://www.iteye.com/blog/donald-draper-2397000 (kafka_2.11-1.0.0)
https://www.cnblogs.com/jun1019/p/6256371.html
https://www.cnblogs.com/alan319/p/8651434.html (Kafka0.8)
4.spring中使用kafaka (适配: kafka_2.12-2.3.0)
4.1 消息提供方
4.2 消费端
https://www.cnblogs.com/lshan/p/11544111.html (手动创建topic)
https://blog.csdn.net/tmeng521/article/details/90902236 (consumer篇)
5.常见问题
5.1消息积压在消息队列里的问题
5.1.1消费端不消费
原因1: 实际发送消息量 > 配置量
# 一次最大消费的消息字节数, 若实际消息大于该值, 消费端就无法消费,
# 导致一直卡在这一条消息,现象就是消费停止。
#kafka0.8版本为fetch.message.max.bytes,默认是1M
#fetch.max.bytes=50 * 1024 * 1024; (本版本kafka2.12默认 50M)
解决方案:
在consumer层的代码中配置 fetch.max.bytes 属性大些即可
// 原因2: 查看日志是不是包含ConsumerRebalanceFailedException
如果包含,说明是rebalance失败,更改配置,
确保 rebalance.max.retries * rebalance.backoff.ms > zookeeper.session.timeout.ms
(默认符合,如果机器较多,或者消费的Topic较多,建议rebalance.max.retries设置大一点),并且滚动重启消费!
如果滚动重启重试多次都是失败,建议将所有消费端都停止一段时间后(目的是为了保证监控到消费端全部下线),
滚动重启(并且消费服务启动间隔时间变长)
#其他情形:
排查消费端线程是否有数据库阻塞, FullGC, OOM等导致消费线程阻塞的,
此外消费端代码需要进行try...catch...包裹
结论: 总体解决方案
临时扩容,以更快的速度去消费数据。具体操作步骤和思路如下:
①先修复consumer的问题,确保其恢复消费速度,然后将现有consumer都停掉。
②临时建立好原先10倍或者20倍的queue数量(新建一个topic,partition是原来的10倍)。
③然后写一个临时分发消息的consumer程序,这个程序部署上去消费积压的消息,
消费之后不做耗时处理,直接均匀轮询写入临时建好分10数量的queue里面。
④紧接着征用10倍的机器来部署consumer,每一批consumer消费一个临时queue的消息。
⑤这种做法相当于临时将queue资源和consumer资源扩大10倍,以正常速度的10倍来消费消息。
⑥等快速消费完了之后,恢复原来的部署架构,重新用原来的consumer机器来消费消息。
5.2 消息重复消费的问题 (幂等性)
kafka有一个叫做offset的概念,就是每个消息写进去,都有一个offset代表他的序号,
然后consumer消费了数据之后,每隔一段时间,会把自己消费过的消息的offset提交一下,
代表我已经消费过了,下次就算重启,kafka就会让消费者从上次消费到的offset来继续消费。
如果consumer消费了数据,还没来得及发送自己已经消费的消息的offset就挂了,
那么重启之后就会收到重复的数据(可以打断点来模拟实现)。
解决思路
要保证消息的幂等性,这个要结合业务的类型来进行处理。下面提供几个思路供参考:
(1)可在内存中维护一个set,只要从消息队列里面获取到一个消息,
先查询这个消息在不在set里面,如果在表示已消费过,直接丢弃;
如果不在,则在消费后将其加入set当中。
(2)如何要写数据库,可以拿唯一键先去数据库查询一下,
如果不存在在写,如果存在直接更新或者丢弃消息。
(3)如果是写redis那没有问题,每次都是set,天然的幂等性。
(4)让生产者发送消息时,每条消息加一个全局的唯一id,然后消费时,将该id保存到redis里面。
消费时先去redis里面查一下有么有,没有再消费。
(5)数据库操作可以设置唯一键,防止重复数据的插入,这样插入只会报错而不会插入重复数据。
5.3 消息丢失的问题
5.3.1 producer 丢失了消息
生产者没有设置相应的策略,发送过程中丢失数据。
解决方案
#同时设置下述两个配置项
// 1.ack=all (或者acks=-1, 负1)
某个partition 的 leader接收到消息,所有的follower都同步到了消息之后,才认为本次写成功了。
// 2.retries=MAX(很大的一个值,表示无限重试)
如果没满足这个条件,生产者会自动不断的重试,重试无限次。
(个人觉得不妥, 重要消息, 应重试指定次数后, 告警即可.)
5.3.2 brokers 丢失了数据
比较常见的一个场景,就是kafka的某个broker宕机了,然后重新选举partition的leader时。
如果此时follower还没来得及同步数据,leader就挂了,然后某个follower成为了leader,数据丢失。
解决方案
一般要求设置4个参数来保证消息不丢失:
①给topic设置 replication.factor参数:
这个值必须大于1,表示要求每个partition必须至少有2个副本。
②在kafka服务端设置min.isync.replicas参数:
这个值必须大于1,表示要求一个leader至少感知到有至少一个follower在跟自己保持联系正常同步数据,
这样才能保证leader挂了之后还有一个follower。
③在生产者端设置acks=all:(或者acks=-1, 负1)
表示要求每条每条数据,必须是写入所有replica副本之后,才能认为是写入成功了
④在生产者端设置retries=MAX(很大的一个值,表示无限重试):
表示 这个是要求一旦写入失败,就一直重试
(个人觉得不妥, 重要消息, 应重试指定次数后, 告警即可.)
5.3.3 consumer 丢失了数据
消费者消费到了这个数据,然后消费之自动提交了offset,让kafka知道你已经消费了这个消息,
当你准备处理这个消息时,自己挂掉了,那么这条消息就丢了。
解决方案
关闭自动提交offset,在自己处理完毕之后手动提交offset,这样就不会丢失数据。
5.4 如何保证消息按顺序执行
5.4.1 为什么要保证顺序
消息队列中的若干消息如果是对同一个数据进行操作,
这些操作具有前后的关系,必须要按前后的顺序执行,否则就会造成数据异常。
举例:
1) 通过mysql binlog进行两个数据库的数据同步,由于对数据库的数据操作是具有顺序性的,
如果操作顺序搞反,就会造成不可估量的错误。
2) 数据库对一条数据依次进行了 插入->更新->删除操作,这个顺序必须是这样,
如果在同步过程中,消息的顺序变成了 删除->插入->更新,
那么原本应该被删除的数据,就没有被删除,造成数据的不一致问题。
5.4.2 出现顺序错乱的场景及解决方案
5.4.2.1 consuemr单线程消费乱序
consuemr单线程消费乱序
具有顺序的数据写入到了不同的partition里面,不同的消费者去消费,
但是每个consumer的执行时间是不固定的,无法保证先读到消息的consumer一定先完成操作,
这样就会出现消息并没有按照顺序执行,造成数据顺序错误。
consuemr单线程消费解决方案
确保同一个消息发送到同一个partition,一个topic,一个partition,一个consumer,内部单线程消费。
5.4.2.2 consuemr多线程消费乱序
consuemr多线程消费乱序
kafka一个topic,一个partition,一个consumer,
但是consumer内部进行多线程消费,这样数据会出现顺序错乱问题。
consuemr多线程消费解决方案
写N个内存queue,然后N个线程分别消费一个内存queue即可
5.5 保证消息的可靠传输
5.5.1 brokers配置
broker 有 3 个配置参数会影响 Kafka 消息存储的可靠性。
与其他配置参数一样,它们可以应用在 broker 级别,用于控制所有主题的行为,
也可以应用在主题级别,用于控制个别主题的行为。
///////// 1.复制系数
>> topic级别的配置参数是 replication.factor;
>> broker级别配置参数是 default.replication.factor 来配置自动创建的主题。
如果复制系数为 N,那么在 N-1 个 broker 失效的情况下,仍然能够从主题读取数据或向主
题写入数据。
所以,更高的复制系数会带来更高的可用性、可靠性和更少的故障。
另一方
面,复制系数 N需要至少 N 个 broker,而且会有 N 个数据副本,也就是说它们会占用 N
倍的磁盘空间。
我们一般会在可用性和存储硬件之间作出权衡。
///////// 2.不完全的首领选举
unclean.leader.election.enable 只能在 broker 级别(实际上是在集群范围内)进行配置, 它的默认值是 true 。
当分区首领不可用时, 一个同步副本会被选为新首领。
如果在选举过程中没有丢失数据,也就是说提交的数据同时存在于所有的同步副本上,那么这个选举就是“完全”的。
但如果在首领不可用时其他副本都是不同步的,我们该怎么办呢?
>> 如果不同步的副本不能被提升为新首领,那么分区在旧首领(最后一个同步副本)恢复
之前是不可用的。
有时候这种状态会持续数小时(比如更换内存芯片)。
>> 如果不同步的副本可以被提升为新首领,那么在这个副本变为不同步之后写入旧首领的
消息、会全部丢失,导致数据不一致。
为什么会这样呢?假设在副本 0 和副本 l 不可用时,偏移量 100-200 的消息被写入副本 2 (首领)。
现在副本 2 变为不可用的,而副本 0 变为可用的。副本 0 只包含偏移量 O~ 100 的消息,不包含偏移量 100~200 的悄息。
如果我们允许副本 0 成为新首领,生产者就可以继续写人数据,悄费者可以继续读取数据。
于是 ,新首领就有了偏移量 100~200 的新梢息。
这样,部分消费者会读取到偏移量 100~200 的旧消息,部分消费者会读取到偏移量 100~200 的新消息,还有部分消费者读取的是二者的混合。
这样会导致非常不好的结果,比如生成不准确的报表。
另外, 副本 2 可能会重新变为可用,并成为新首领的跟随者。
这个时候,它会把比当前首领旧的消息全部删除,而这些消息对于所有消费者来说都是不可用的。
简而言之,
如果我们允许不同步的副本成为首领,那么就要承担丢失数据和出现数据不一致的风险。
如果不允许它们成为首领,那么就要接受较低的可用性,因为我们必须等待原先的首领恢复到可用状态。
如果把 unclean.leader.election.enable 设为 true ,就是允许不同步的副本成为首领, 也就是"不完全的选举",那么我们将面临丢失消息的风险。
如果把这个参数设为 false ,就要等待原先的首领重新上线,从而降低了可用性。
我们经常看到一些对数据质量和数据一致性要求较高的系统会禁用这种不完全的首领选举(把这个参数设为 false) 。
银行系统是这方面最好的例子,大部分银行系统宁愿选择在几分钟甚至几个小时内不处理信用卡支付事务,也不会冒险处理错误的消息。
不过在对可用性要求较高的系统里,比如实时点击流分析系统, 一般会启用不完全的首领选举。
///////// 3.最少同步副本
在topic级别和 broker 级别上,这个参数都叫 min.insync.replicas 。
尽管为一个主题配置了 3 个副本,还是会出现只有一个同步副本的情况(另外2个挂了)。
如果这个同步副本变为不可用,我们必须在可用性和一致性之间作出选择, 这是一个两难的选择。
根据 Kafka 对可靠性保证的定义,消息只有在被写入到所有同步副本之后才被认为是已提交的。
但如果这里的“所有副本”只包含一个同步副本,那么在这个副本变为不可用时 ,数据就会丢失。
如果要确保已提交的数据被写入不止一个副本,就需要把最少同步副本数量设置为大一点的值。
对于一个包含 3 个副本的主题,如果 min.insync.replicas 被设为 2,那么至少要存在两个同步副本才能向分区写入数据。
如果 3 个副本都是同步的,或者其中一个副本变为不可用,都不会有什么问题。
不过,如果有两个副本变为不可用,那么 broker 就会停止接受生产者的请求。
尝试发送数据的生产者会收到 NotEnoughReplicasException 异常。消费者仍然可以继续读取已有的数据。
实际上,如果使用这样的配置,那么当只剩下一个同步副本时,它就变成只读了,这是为了避免在发生不完全选举时数据的写入和读取出现非预期的行为。
为了从只读状态中恢复,必须让两个不可用分区中的一个重新变为可用的(比如重启 broker ),并等待它变为同步的 。
5.5.2 producer配置
//////// 1.发送确认
>> acks=0 意味着如果生产者能够通过网络把消息发送出去,那么就认为消息已成功写入Kafka。
>> acks=1 意味若首领在收到消息并把它写入到分区数据文件(不一定同步到磁盘上)时会返回确认或错误响应。
>> acks=all 意味着首领在返回确认或错误响应之前,会等待所有同步副本都收到悄息。
如果和 min.insync.replicas 参数结合起来,就可以决定在返回确认前至少有多少个副本能够收到悄息 。
这是最保险的做也一一生产者会一直重试直到消息被成功提交。
//////// 2.配置生产者的重试参数
生产者需要处理的错误包括两部分 :
一部分是生产者可以自动处理的错误,还有一部分是需要开发者手动处理的错误。
LEADER_NOT_AVAILABLE等是可重试的;
序列化错误 &
5.5.3 consumer配置
///////// 1.group.ld
如果两个消费者具有相同的 group.ld,井且订阅了同 一个主题,那么每个消费者会分到主题分区的一个子集,
也就是说它们只能读到所有消息的一个子集(不过群组会读取主题所有的消息)。
如果你希望消费者可以看到主题的所有消息,那么需要为它们设置唯一的 group.ld。
///////// auto.offset.reset
这个参数指定了在没有偏移量可提交时(比如消费者第 l 次启动时)或者请求的偏移量在 broker 上不存在时,消费者会做些什么。
这个参数有两种配置 。
一种是 earliest ,如果选择了这种配置,消费者会从分区的开始位置读取数据,不管偏移量是否有效,这样会导致消费者读取大量的重复数据,但可以保证最少的数据丢失。
一种是 latest ,如果选择了这种配置, f肖费者会从分区的末尾开始读取数据,这样可以减少重复处理消息,但很有可能会错过一些消息。
///////// enable.auto.commit
这是一个非常重要的配置参数,你可以让悄费者基于任务调度自动提交偏移量 ,也可以在代码里手动提交偏移量。
自动提交的一个最大好处是,在实现消费者逻辑时可以少考虑一些问题。
如果你在消费者轮询操作里处理所有的数据,那么自动提交可以保证只提交已经处理过的偏移量。
自动提交的主要缺点是,无怯控制重复处理消息(比如消费者在自动提交偏移量之前停止处理悄息),
而且如果把消息交给另外一个后台线程去处理,自动提交机制可能会在消息还没有处理完毕就提交偏移量。
///////// auto.commit.interval.ms 与 enable.auto.commit 有直接的联系
如果选择了自动提交偏移量,可以通过该参数配置提交的频度, 默认值是每 5 秒钟提交一次。
一般来说,频繁提交会增加额外的开销,但也会降低重复处理消息的概率。
参考资源
https://www.jianshu.com/p/02fdcb9e8784 (保证消息按顺序执行--源自中华石杉)
https://www.jianshu.com/p/8ed16edc73e4 (保证数据不丢失--源自中华石杉)
https://www.jianshu.com/p/172295e2e978 (保证消息重复消费--源自中华石杉)
http://www.importnew.com/23199.html
https://blog.csdn.net/linke1183982890/article/details/83303003
https://blog.csdn.net/b6ecl1k7bs8o/article/details/80251930 (consumer消费策略)
https://www.cnblogs.com/wolf-bin/p/9085370.html (producer发送策略)
https://www.aboutyun.com//forum.php/?mod=viewthread&tid=9341&extra=page%3D1&page=1&(kafka详解)
https://www.cnblogs.com/gxc2015/p/9835837.html (kafka配置详解)
https://www.cnblogs.com/yinchengzhe/p/5111648.html(算法)
https://blog.csdn.net/qq_35457078/article/details/88838511 (consumer手动提交 offset)
https://www.jianshu.com/p/f62099d174d9 (性能优化-->0.8版本)
https://segmentfault.com/a/1190000019147699 (日志文件, 清理策略)
https://www.cnblogs.com/jasongj/p/6760039.html (replication 数据复制)
https://www.cnblogs.com/binyue/p/10308754.html (kafka为什么这么快)
《kafka权威指南》[美] Neha Narkhede Gwen Shapira Todd Palino 著, 薛命灯译