1介绍
Kafka是一个分布式的、可分区的、可复制的消息系统,提供了一个生产者、缓冲区、消费者的模型。
- Kafka作为一个集群,运行在一台或者多台服务器上.
- Kafka 通过 topic 对存储的流数据进行分类。
- 每条记录中包含一个key,一个value和一个timestamp(时间戳)。
Kafka有四个核心的API:
- The Producer API 允许一个应用程序发布一串流式的数据到一个或者多个Kafka topic。
- The Consumer API 允许一个应用程序订阅一个或多个 topic ,并且对发布给他们的流式数据进行处理。
- The Streams API 允许一个应用程序作为一个流处理器,消费一个或者多个topic产生的输入流,然后生产一个输出流到一个或多个topic中去,在输入输出流中进行有效的转换。
- The Connector API 允许构建并运行可重用的生产者或者消费者,将Kafka topics连接到已存在的应用程序或者数据系统。比如,连接到一个关系型数据库,捕捉表(table)的所有变更内容。
2关键词
Broker:
Kafka集群包含一个或多个服务器,这种服务器称为Broker。Topic:
Topic 就是数据主题,是数据记录发布的地方,可以用来区分业务系统。Kafka中的Topics总是多订阅者模式,一个topic可以拥有一个或者多个消费者来订阅它的数据。Partitions:
对于每一个topic, Kafka集群都会维持一个分区日志,每个分区都是有序且顺序不可变的记录集,并且不断地追加到结构化的commit log文件。分区中的每一个记录都会分配一个id号来表示顺序,我们称之为offset,offset用来唯一的标识分区中每一条记录。
Kafka 集群保留所有发布的记录,无论他们是否已被消费,并通过一个可配置的参数,保留期限来控制. 举个例子, 如果保留策略设置为2天,一条记录发布后两天内,可以随时被消费,两天过后这条记录会被抛弃并释放磁盘空间。Kafka的性能和数据大小无关,所以长时间存储数据没有什么问题。
事实上,在每一个消费者中唯一保存的元数据是offset(偏移量)即消费在log中的位置.偏移量由消费者所控制:通常在读取记录后,消费者会以线性的方式增加偏移量,但是实际上,由于这个位置由消费者控制,所以消费者可以采用任何顺序来消费记录。例如,一个消费者可以重置到一个旧的偏移量,从而重新处理过去的数据;也可以跳过最近的记录,从"现在"开始消费。
当数据大小超过了单台服务器的限制,允许数据进行扩展。每个单独的分区都必须受限于主机的文件限制,不过一个topic可能有多个分区,因此可以处理无限量的数据。(一个server有多个分区,一个分区只会存在于一个server上面。原文:每个分区只属于一个台服务器,所以如果有20个分区,那么全部数据(包含读写负载)将由不超过20个服务器(不包含副本)处理。)
生产者
生产者可以将数据发布到所选择的topic(主题)中。生产者负责将记录分配到topic的哪一个 partition(分区)中。可以使用循环的方式来简单地实现负载均衡,也可以根据某些其他原则(例如:记录中的key)来完成。消费者
消费者使用一个 消费组 名称来进行标识,发布到topic中的每条记录被分配给订阅消费组中的一个消费者实例。一个消费者组的消费者只有一个会获取到消息。
传统的消息系统有两个模块: 队列 和 发布-订阅。在队列中,消费者池从server读取数据,每条记录被池子中的一个消费者消费; 在发布订阅中,记录被广播到所有的消费者。
在kafka中,两个极端情况:
如果所有的消费者实例在同一消费组中,消息记录只会给到一个消费者实例. 类似队列模式。
如果所有的消费者实例在不同的消费组中,每条消息记录会广播到所有的消费者进程.类似发布-订阅模式。
consumer选择一个topic,通过id指定从哪个位置开始消费消息。消费完成之后保留id,下次可以从这个位置开始继续消费,也可以从其他任意位置开始消费。
即使消息被消费,消息仍然不会被立即删除.日志文件将会根据broker中的配置要求,保留一定的时间之后删除。
3 High-level API,Low-level api
High level api是consumer读的partition的offsite是存在zookeeper上。High level api 会启动另外一个线程去每隔一段时间,offsite自动同步到zookeeper上。如果使用了High level api, 每个message只能被读一次,一旦读了这条message之后,无论我consumer的处理是否ok。High level api的另外一个线程会自动的把offiste+1同步到zookeeper上。如果consumer读取数据出了问题,offsite也会在zookeeper上同步。
Low level api是consumer读的partition的offsite在consumer自己的程序中维护。不会同步到zookeeper上。但是为了kafka manager能够方便的监控,一般也会手动的同步到zookeeper上。这样的好处是一旦读取某个message的consumer失败了,这条message的offsite我们自己维护,我们不会+1。下次再启动的时候,还会从这个offsite开始读。
4容错
数据备份:
以partition为单位备份副本,副本数可设置。当副本数为N时,代表1个leader,N-1个followers,followers可以视为leader的consumer,拉取leader的消息,append到自己的系统中。
Leader选择:
ISR:某个分区内同步中的node组成一个集合,即该分区的ISR,ISR是followers的子集。
当leader处于非同步中时,系统从ISR中的followers中选举新leader。
当某个follower状态变为非同步中时,leader会将此follower剔除ISR,当此follower恢复并完成数据同步之后再次进入ISR。
当某个follower状态变为非同步中时,leader会将此follower剔除ISR,当此follower恢复并完成数据同步之后再次进入ISR。
kafka有个保障:当producer生产消息时,只有当消息被所有ISR确认时,才表示该消息提交成功。只有提交成功的消息,才能被consumer消费
当有N个副本时,N个副本都在ISR中,N-1个副本都出现异常时,系统依然能提供服务
假设N副本全挂了,node恢复后会面临同步数据的过程,这期间ISR中没有node,会导致该分区服务不可用。kafka采用一种降级措施来处理:选举第一个恢复的node作为leader提供服务,以它的数据为基准,这个措施被称为脏leader选举。
由于leader是主要提供服务的,kafka broker将多个partition的leader均分在不同的server上以均摊风险。
5 Zookeeper
Zookeeper 协调控制:
- 管理broker与consumer的动态加入与离开。
- 触发负载均衡,当broker或consumer加入或离开时会触发负载均衡算法,使得一 个consumer group内的多个consumer的消费负载平衡。
- 维护消费关系及每个partition的消费信息。
Zookeeper上的细节:
- 每个broker启动后会在zookeeper上注册一个临时的broker registry,包含broker的ip地址和端口号,所存储的topics和partitions信息。
- 每个consumer启动后会在zookeeper上注册一个临时的consumer registry:包含consumer所属的consumer group以及订阅的topics。
- 每个consumer group关联一个临时的owner registry和一个持久的offset registry。对于被订阅的每个partition包含一个owner registry,内容为订阅这个partition的consumer id;同时包含一个offset registry,内容为上一次订阅的offset。
同步复制&异步复制
同步复制:(默认)
- Producer联系zookeeper识别leader
- 向leader发送消息
- Leader收到消息写入本地log
- Follower从leader pull 消息
- Follower向本地写入log
- Follower向leader发送ack消息
- Leader收到所有的follower的ack消息
- Leader向producer发送ack
异步复制:
Leader在将消息写入本地log后,直接回传ack消息,不需要等待follower复制完成。
7好处
消费者可以根据需求,灵活指定offset消费
保证了消息不变性,为并发消费提供了线程安全的保证。每个consumer都保留自己的offset,互相之间不干扰,不存在线程安全问题
消息访问的并行高效性。每个topic中的消息被组织成多个partition,partition均匀分配到集群server中。生产、消费消息的时候,会被路由到指定partition,减少竞争,增加了程序的并行能力
增加消息系统的可伸缩性。每个topic中保留的消息可能非常庞大,通过partition将消息切分成多个子消息,并通过负责均衡策略将partition分配到不同server。这样当机器负载满的时候,通过扩容可以将消息重新均匀分配
保证消息可靠性。消息消费完成之后不会删除,可以通过重置offset重新消费,保证了消息不会丢失
灵活的持久化策略。可以通过指定时间段(如最近一天)来保存消息,节省broker存储空间
8 Windows上安装运行kafaka
8.1 下载压缩包
http://kafka.apache.org/downloads
说明:这个安装包包含了zookeeper的jar包,不需要在本地安装zookeeper了。
8.2 配置zookeeper.properties并启动zookeeper
位置:安装目录/config
注意:这个目录的斜杠格式要按照Linux的来。
启动zookeeper: >bin\windows\zookeeper-server-start.bat config\zookeeper.properties
启动成功后会在相应的目录生成zookeeper_data文件夹
8.3 配置server.properties并启动kafka
位置:安装目录/config
启动kafka:>bin\windows\kafka-server-start.bat config\server.properties
启动成功后会在相应的目录生成kafka-logs文件夹
注意:如果以上两步骤报jdk方面的错,就修改bin\windows\kafka-run-class.bat文件,给179行%CLASSPATH%添加上双引号。
Set COMMAND=%JAVA% %KAFKA_HEAP_OPTS% %KAFKA_JVM_PERFORMANCE_OPTS% %KAFKA_JMX_OPTS% %KAFKA_LOG4J_OPTS% -cp "%CLASSPATH%" %KAFKA_OPTS% %*
8.4 创建一个topic
bin\windows\kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic t
est
8.5 查看topic列表
bin\windows\kafka-topics.bat --list --zookeeper localhost:2181
8.6 启动一个Producer
bin\windows\kafka-console-producer.bat --broker-list localhost:9092 --topic test
8.7 启动一个consumer
bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test --from-beginning
8.8 运行
最后在producer发送消息,就可以在consumer接收到了
9代码操作
9.1 依赖
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>test</groupId>
<artifactId>com.company.test</artifactId>
<version>1.0-SNAPSHOT</version>
<dependencies>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.10.0.1</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId>
<version>0.10.0.1</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
</dependency>
</dependencies>
</project>
9.2 生产者 和 消费者
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
public class Test {
@org.junit.Test
public void testSend(){
Properties properties = new Properties();
properties.put("metadata.broker.list","localhost:9092");
properties.put("serializer.class","kafka.serializer.StringEncoder");
properties.put("request.required.acks","1");//同步复制
ProducerConfig producerConfig = new ProducerConfig(properties);
Producer<String, String> producer = new Producer<String, String>(producerConfig);
KeyedMessage<String, String> message = new KeyedMessage<String, String>("test","100","ggggggg");
producer.send(message);
System.out.println("send over");
}
@org.junit.Test
public void testConsumer(){
Properties properties = new Properties();
properties.put("zookeeper.connect","localhost:2181");
properties.put("group.id","g1");
properties.put("zookeeper.session.timeout.ms","500");
properties.put("zookeeper.sync.time.ms","250");
properties.put("auto.commit.interval.ms","1000");
ConsumerConfig consumerConfig = new ConsumerConfig(properties);
ConsumerConnector consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig);
Map<String, Integer> map = new HashMap<String, Integer>();
map.put("test",new Integer(1));
Map<String, List<KafkaStream<byte[], byte[]>>> messageStreams = consumerConnector.createMessageStreams(map);
List<KafkaStream<byte[], byte[]>> list = messageStreams.get("test");
for (KafkaStream<byte[], byte[]> stream : list) {
ConsumerIterator<byte[], byte[]> iterator = stream.iterator();
while (iterator.hasNext()){
byte[] message = iterator.next().message();
System.out.println(message);
}
}
}
}
9.3 代码操作更新版20190214
依赖
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.10.0.1</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId>
<version>0.10.0.1</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency>
代码
package com.base.web.aisino.kafka;
import com.base.web.aisino.domain.RecordDetailDO;
import com.base.web.aisino.service.ReStoreDetailRecordService;
import com.base.web.aisino.service.impl.ReStoreDetailRecordServiceImpl;
import com.base.web.system.filter.ChannelHeaderFilter;
import com.google.gson.Gson;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Properties;
/**
* @author ah.zhanglei3@aisino.com
* @ClassName
* @Description
* @Date 15:59 2019/2/13
*/
@Component
public class KafkaTool {
private final Logger logger = LoggerFactory.getLogger(KafkaTool.class);
@Autowired
private ReStoreDetailRecordService reStoreDetailRecordService;
/**
* 接收kafka
*/
public void received(){
Properties props = new Properties();
props.put("bootstrap.servers", "192.168.220.184:10021,192.168.220.185:10020,192.168.220.186:10022");
props.put("group.id", "test-group");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("jcpt"));
Thread receivedKafkaThread = new Thread(new Runnable() {
@Override
public void run() {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
ArrayList<RecordDetailDO> recordDetailDOS = new ArrayList<>();
for (ConsumerRecord<String, String> record : records) {
logger.info("received kafka message : partition:"+record.partition()+",topic:"+record.topic()
+",offset:"+record.offset()+",value:"+record.value());
String detailRecordString = record.value();
Gson gson = new Gson();
RecordDetailDO recordDetailDO = gson.fromJson(detailRecordString, RecordDetailDO.class);
recordDetailDOS.add(recordDetailDO);
}
if (recordDetailDOS.size()>0){
reStoreDetailRecordService.batchSave(recordDetailDOS);
}
}
}
});
receivedKafkaThread.start();
}
/**
* 发送String
* @param msg
*/
public void send(String msg){
Properties properties = new Properties();
properties.put("bootstrap.servers","192.168.220.184:10021,192.168.220.185:10020,192.168.220.186:10022");
properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("request.required.acks","1");//同步复制
KafkaProducer kafkaProducer = new KafkaProducer<String, String>(properties);
ProducerRecord<String, String> record = new ProducerRecord<>("jcpt", msg);
kafkaProducer.send(record);
}
}
10kafka操作
添加和删除 topics:
bin/kafka-topics.sh --zookeeper zk_host:port/chroot --create --topic my_topic_name
--partitions 20 --replication-factor 3 --config x=y
ps:20个分区,3个副本
bin/kafka-topics.sh --zookeeper zk_host:port/chroot --delete --topic my_topic_name添加分区(不支持减少分区):
bin/kafka-topics.sh --zookeeper zk_host:port/chroot --alter --topic my_topic_name
--partitions 40增加一个配置项:
bin/kafka-configs.sh --zookeeper zk_host:port/chroot --entity-type topics --entity-name my_topic_name --alter --add-config x=y删除一个配置项:
bin/kafka-configs.sh --zookeeper zk_host:port/chroot --entity-type topics --entity-name my_topic_name --alter --delete-config x
每当一个 borker 停止或崩溃时,该 borker 上的分区的leader 会转移到其他副本。这意味着,在 broker 重新启动时,默认情况下,它将只是所有分区的跟随者,这意味着它不会用于客户端的读取和写入。
- 让Kafka集群尝试恢复已恢复副本的领导地位:
bin/kafka-preferred-replica-election.sh --zookeeper zk_host:port/chroot
也可以通过以下配置来自动配置Kafka:
auto.leader.rebalance.enable=true
11问题
Kafka 对消费者分配分区规则
Kafka有两种分区分配策略: RoundRobinAssignor+RangeAssignor
RoundRobinAssignor:该策略把主题的所有分区逐个分配给消费者。如果使用 RoundRobinAssignor 策略来给消费者 C1 和消费者 C2 分配分区,那么消费者 C1 将分到主题 T1 的分区 0 和分区 2 以及主题 T2 的分区 1,消费者 C2 将分配到主题 T1 的分区 1 以及主题 T2 的分区 0 和分区 2。
RangeAssignor:该策略会把主题的若干个连续的分区分配给消费者(kafka默认用该策略)。假设消费者 C1 和消费者 C2 同时 订阅了主题 T1 和主题 T2,并且每个主题有 3 个分区。那么消费者 C1 有可能分配到这 两个主题的分区 0 和分区 1,而消费者 C2 分配到这两个主题的分区 2。因为每个主题 拥有奇数个分区,而分配是在主题内独立完成的,第一个消费者最后分配到比第二个消 费者更多的分区。
可以通过配置partition.assignment.strategy参数指定用哪个策略Offset往前读的问题
消费者可以任意指定offset的位置,这个位置可以是以前的位置或者是现在的位置,但是当offset指定以后,就只能往后读取。Follows和leader通信
high watermark (HW):表示Partition各个replicas数据间同步且一致的offset位置,即表示allreplicas已经commit位置,每个Broker缓存中维护此信息,并不断更新。
Kafka中每个Broker启动时都会创建一个副本管理服务(ReplicaManager),该服务负责维护ReplicaFetcherThread与其他Broker链路连接关系,该Broker中存在多少Follower的partitions对应leader partitions分布在不同的Broker上,有多少Broker就会创建相同数量的ReplicaFetcherThread线程同步对应partition数据,Kafka中partition间复制数据是由follower(扮演consumer角色)主动向leader获取消息,follower每次读取消息都会更新HW状态。每当Follower的partitions发生变更影响leader所在Broker变化时,ReplicaManager就会新建或销毁相应的ReplicaFetcherThread。Broker是不是一个进程
Kafka集群包含一个或多个服务器,这种服务器称为Broker,kafka启动以后,在服务器里面会有kafka进程。时间戳问题
每条记录数据包含一个key,一个value和一个时间戳,在生产者发送消息的时候不需要指定时间戳,在方法内部封装的有时间戳。
12 SpringCloudStream 操作kafka
SpringCloudStream可以操作很多的消息队列,包括kafka,可以直接使用SpringCloudStream API,避免了直接使用kafka的API
12.1 maven
//上文的kafka的包就不需要了
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream</artifactId>
<version>2.0.1.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-kafka</artifactId>
<version>2.0.1.RELEASE</version>
</dependency>
12.2 yml配置
# #spring cloud steam 相关配置
cloud:
stream:
bindings:
#配置自己定义的通道与哪个中间件交互
input: #Input和Output的值
destination: test #目标主题
output:
destination: test
default-binder: kafka #默认的binder是kafka
kafka:
bootstrap-servers: 192.168.220.184:10021,192.168.220.185:10020,192.168.220.186:10022 #kafka服务地址
consumer:
group-id: test-group
producer:
key-serializer: org.apache.kafka.common.serialization.ByteArraySerializer
value-serializer: org.apache.kafka.common.serialization.ByteArraySerializer
client-id: producer1
12.3 代码
接收
@Component
@EnableBinding(Sink.class)
public class KafkaReceive {
private final Logger logger = LoggerFactory.getLogger(KafkaReceive.class);
@Autowired
private ReStoreDetailRecordService reStoreDetailRecordService;
@StreamListener(Sink.INPUT)
public void process(Message<?> message) {
logger.info("监听到kafka数据->输入参数【message:"+message+"】");
String receivedString = new String((byte[]) message.getPayload());
logger.info("received kafka message : value:"+receivedString);
Gson gson = new Gson();
RecordDetailDO recordDetailDO = gson.fromJson(receivedString, RecordDetailDO.class);
if (null!=recordDetailDO){
logger.info("将kafka数据储存数据库"+recordDetailDO.toString());
reStoreDetailRecordService.save(recordDetailDO);
}
}
}
发送
@EnableBinding(Source.class)
@Component
public class KafkaSender {
private final Logger logger = LoggerFactory.getLogger(KafkaSender.class);
@Autowired
private Source source;
public void sendMessage(String msg){
try {
logger.info("准备发送数据到kafka->数据:【message:"+msg+"】");
source.output().send(MessageBuilder.withPayload(msg).build());
}catch (Exception e){
logger.info("准备发送数据到kafka->出错");
e.printStackTrace();
}
}
}
13 还有一套kafkaTemplate也可以操作kafka,是springboot自己集成的kafka,自行搜索
参考链接:
http://kafka.apachecn.org/documentation.html
https://blog.csdn.net/qq_30130043/article/details/80098779
https://www.imooc.com/article/29209?block_id=tuijian_wz
https://blog.csdn.net/vinfly_li/article/details/79397201
https://blog.csdn.net/imgxr/article/details/80130878
https://blog.csdn.net/phantom_111/article/details/79903858
https://baijiahao.baidu.com/s?id=1608205621370302980&wfr=spider&for=pc
https://blog.csdn.net/wing_93/article/details/78513782
https://www.cnblogs.com/likehua/p/3999538.html
https://www.sohu.com/a/144225753_236714