概念:
Kafka作为一个分布式的流平台具有三个关键能力:
1.发布和订阅消息(流),在这方面,它类似于一个消息队列或企业消息系统
2.以容错的方式存储消息(流)
3.在消息流发生时处理它们
优势:
1.构建实时的流数据管道,可靠地获取系统和应用程序之间的数据
2.构建实时流的应用程序,对数据流进行转换或反应
首先几个概念:
1.kafka作为一个集群运行在一个或多个服务器上。
2.kafka集群存储的消息是以topic为类别记录的。
3.每个消息(也叫记录record,我习惯叫消息)是由一个key,一个value和时间戳构成。
kafka有四个核心API:
1.应用程序使用 Producer API 发布消息到1个或多个topic(主题)。
2.应用程序使用 Consumer API 来订阅一个或多个topic,并处理产生的消息。
3.应用程序使用StreamsAPI充当一个流处理器,从1个或多个topic消费输入流,
并生产一个输出流到1个或多个输出topic,有效地将输入流转换到输出流。
4.ConnectorAPI允许构建或运行可重复使用的生产者或消费者,将topic连接到现有的应用程序或数据系统。
下载安装:
我是在自己的mac安装的,下面给出安装命令:
brew install kafka (会自动安装Zookeeper)
启动:
brew services start zookeeper
brew services start kafka
if you don't want/need a background service you can just run:
zookeeper-server-start /usr/local/etc/kafka/zookeeper.properties & kafka-server-start /usr/local/etc/kafka/server.properties
使用kafka:
1.创建一个名为 dog 的Topic, 只有一个分区和一个备份
kafka-topics --create --zookeeper localhost:9092 --replication-factor 1 --partitions 1 --topic dog
2.查看创建的topic信息
kafka-topics --list --zookeeper localhost:9092
3.发送消息
kafka-console-producer --broker-list localhost:9092 --topic dog
4.消费消息
kafka-console-consumer --bootstrap-server localhost:9092 --topic dog
生产消息 和 消费消息 见下图:
kafka在linux的集群搭建
1.下载: http://kafka.apache.org/downloads.html
2.安装:
#解压到目录: cd usr/local/kafka
tar -zxvf kafka_2.12-2.5.0.tgz
# 创建logs目录
mkdir logs
[root@hadoop2 kafka]# pwd
/usr/local/kafka/kafka
[root@hadoop2 kafka]# ls
bin config libs LICENSE logs NOTICE site-docs
## 修改配置文件: vim /config/server.properties ##
# broker的全局唯一编号,不能重复
broker.id=2
# 处理网络请求的线程数量
num.network.threads=3
# 用来处理磁盘IO的线程数量
num.io.threads=8
# 发送套接字的缓冲区大小
socket.send.buffer.bytes=102400
# 接收套接字的缓冲区大小
socket.receive.buffer.bytes=102400
# 请求套接字的缓冲区大小
socket.request.max.bytes=104857600
log.dirs=/usr/local/kafka/kafka/logs
# topic在当前broker上的分区个数
num.partitions=1
listeners=PLAINTEXT://hadoop2:9092
advertised.listeners=PLAINTEXT://hadoop2:9092
# 配置连接Zookeeper集群地址
zookeeper.connect=hadoop2:2181,hadoop3:2181,hadoop4:2181
# 配置环境变量
## sudo vim /etc/profile
kafka_home
export KAFKA_HOME=/usr/local/kafka/kafka
export PATH=$PATH:$KAFKA_HOME/bin
## source /etc/profile
## 分发到各台机器(假设当前在hadoop2这台机器)
[root@hadoop2 kafka]# ls
kafka
[root@hadoop2 kafka]# pwd
/usr/local/kafka
[root@hadoop2 kafka]# scp -r kafka root@hadoop3:/usr/local/kafka
# 修改每台机器的broker.id
## vim /config/server.properties
broker.id = 2 (hadoop2)
broker.id = 3 (hadoop3)
broker.id = 4 (hadoop4)
# 启动集群
## 依次在hadoop2、hadoop3、hadoop4 启动kafka
[root@hadoop2 kafka]# ls
bin config libs LICENSE logs NOTICE site-docs
[root@hadoop2 kafka]# pwd
/usr/local/kafka/kafka
[root@hadoop2 kafka]# bin/kafka-server-start.sh config/server.properties &
# 启动集群方式2
[root@hadoop2 kafka]# bin/kafka-server-start.sh -daemon config/server.properties
[root@hadoop2 kafka]# jps
9462 Kafka
9480 Jps
3419 QuorumPeerMain
# 关闭集群
[root@hadoop2 kafka]# bin/kafka-server-stop.sh stop
Kafka命令行操作
# 查看当前服务器中的所有topic
[root@hadoop2 kafka]# bin/kafka-topics.sh --zookeeper hadoop2:2181 --list
# 创建topic
[root@hadoop2 kafka]# bin/kafka-topics.sh --zookeeper hadoop4:2181 \ --create --replication-factor 3 --partitions 1 --topic first
# 报错:
[root@hadoop2 kafka]# bin/kafka-topics.sh zookeeper hadoop4:2181 --list
Exception in thread "main" java.lang.IllegalArgumentException: Only one of --bootstrap-server or --zookeeper must be specified
at kafka.admin.TopicCommand$TopicCommandOptions.checkArgs(TopicCommand.scala:702)
at kafka.admin.TopicCommand$.main(TopicCommand.scala:52)
at kafka.admin.TopicCommand.main(TopicCommand.scala)
## 注意: broker.id 要与对应的zookeepr的myid保持一致
主机名 host(ip) zookeeper myid
kafka-2 hadoop2 server.2 2
kafka-3 hadoop3 server.3 3
kafka-4 hadoop4 server.4 4
- 尝试正确的创建topic命令
# 在hadoop4机器上创建topic
[root@hadoop4 kafka]# bin/kafka-topics.sh --create --zookeeper hadoop4:2181 --replication-factor 3 --partitions 6 --topic test
Created topic test.
## 选项说明
--topic : 定义topic名
--partitions: 定义分区数
--replication-factor: 定义副本数
# 查看创建的topic(创建成功后hadoop2、hadoop3都可查看)
[root@hadoop4 kafka] bin/kafka-topics.sh --describe --zookeeper hadoop4:2181 --topic test
> Topic: test PartitionCount: 6 ReplicationFactor: 3 Configs:
> Topic: test Partition: 0 Leader: 2 Replicas: 2,3,4 Isr: 2
> Topic: test Partition: 1 Leader: 3 Replicas: 3,4,2 Isr: 3,4,2
> Topic: test Partition: 2 Leader: 4 Replicas: 4,2,3 Isr: 4,2,3
> Topic: test Partition: 3 Leader: 2 Replicas: 2,4,3 Isr: 2
> Topic: test Partition: 4 Leader: 3 Replicas: 3,2,4 Isr: 3,2,4
> Topic: test Partition: 5 Leader: 4 Replicas: 4,3,2 Isr: 4,3,2
# 删除topic
[root@hadoop2 kafka]# bin/kafka-topics.sh --zookeeper hadoop4:2181 --delete --topic test
# 在hadoop2上启动消费者
[root@hadoop2 kafka]# bin/kafka-console-consumer.sh --bootstrap-server hadoop2:9092 --topic test
# 在hadoop4启动生产者
[root@hadoop2 kafka]# bin/kafka-console-producer.sh --broker-list hadoop2:9092 --topic test
调整准则:
一般来说,若是集群较小(小于6个brokers),则配置2 x broker数的partition数。在这里主要考虑的是之后的扩展。若是集群扩展了一倍(例如12个),则不用担心会有partition不足的现象发生
一般来说,若是集群较大(大于12个),则配置1 x broker 数的partition数。因为这里不需要再考虑集群的扩展情况,与broker数相同的partition数已经足够应付常规场景。若有必要,则再手动调整
考虑最高峰吞吐需要的并行consumer数,调整partition的数目。若是应用场景需要有20个(同一个consumer group中的)consumer并行消费,则据此设置为20个partition
考虑producer所需的吞吐,调整partition数目(如果producer的吞吐非常高,或是在接下来两年内都比较高,则增加partition的数目)
kafka工作流程分析
写入方式:producer 采用推(push)模式将消息发布到 broker,每条消息都被追加(append)到分 区(patition)中,属于顺序写磁盘(顺序写磁盘效率比随机写内存要高,保障 kafka 吞吐率)
-
分区: 消息发送时都被发送到一个 topic,其本质就是一个目录,而 topic 是由一些 Partition Logs(分区日志)组成,其组织结构如下图所示
我们可以看到,每个 Partition 中的消息都是有序的,生产的消息被不断追加到 Partition log 上,其中的每一个消息都被赋予了一个唯一的 offset 值。
分区的原因
1>方便在集群中扩展,每个 Partition 可以通过调整以适应它所在的机器,而一个 topic 又可以有多个 Partition 组成,因此整个集群就可以适应任意大小的数据了
2>可以提高并发,因为可以以 Partition 为单位读写了分区原则
1>指定了 patition,则直接使用
2>未指定 patition 但指定 key,通过对 key 的 value 进行 hash 出一个 patition
3>patition 和 key 都未指定,使用轮询选出一个 patition副本
同一个 partition 可能会有多个 replication(对应 server.properties 配置中的 default.replication.factor=N)。没有 replication 的情况下,一旦 broker 宕机,其上所有 patition 的数据都不可被消费,同时 producer 也不能再将数据存于其上的 patition。引入 replication 之 后,同一个 partition 可能会有多个 replication,而这时需要在这些 replication 之间选出一个 leader,producer 和 consumer 只与这个 leader 交互,其它 replication 作为 follower 从 leader 中复制数据
写入流程
1>producer 先从 zookeeper 的 "/brokers/.../state"节点找到该 partition 的 leader
2>producer 将消息发送给该 leader
3>leader 将消息写入本地 log
4>followers 从 leader pull 消息,写入本地 log 后向 leader 发送 ACK
5>leader 收到所有 ISR 中的 replication 的 ACK 后,增加 HW(high watermark,最后 commit 的 offset)并向 producer 发送 ACK
保存消息
存储方式
物理上把topic分成一个或多个patition(对应 server.properties 中的num.partitions=3配 置),每个 patition 物理上对应一个文件夹(该文件夹存储该 patition 的所有消息和索引文 件)存储策略
无论消息是否被消费,kafka 都会保留所有消息。有两种策略可以删除旧数据:
1>基于时间:log.retention.hours=168
2>基于大小:log.retention.bytes=1073741824
需要注意的是,因为 Kafka 读取特定消息的时间复杂度为 O(1),即与文件大小无关,所
以这里删除过期文件与提高 Kafka 性能无关
注意:
ACK的确认方式有:
0: 不需要确认, 速度块
1: 生产者需要Leader的确认
all: 生产者需要Leder、Follower的确认
小试牛刀
- 创建一个Maven项目 , pom.xml文件如下
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.example</groupId>
<artifactId>kafkademo</artifactId>
<version>1.0-SNAPSHOT</version>
<dependencies>
<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients !-->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.11.0.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.12</artifactId>
<version>0.11.0.0</version>
</dependency>
<!-- 日志包 -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.5</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.5</version>
</dependency>
</dependencies>
</project>
- 在resources目录下创建一个log4j.properties文件
log4j.rootLogger=INFO, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n
log4j.appender.logfile=org.apache.log4j.FileAppender
log4j.appender.logfile.File=target/spring.log
log4j.appender.logfile.layout=org.apache.log4j.PatternLayout
log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n
- 代码
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
public class Producer {
public static void main(String[] args) {
Properties properties = new Properties();
// 配置信息
// 1.kafka集群
properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop2:9092");
// 2.应答机制
properties.setProperty(ProducerConfig.ACKS_CONFIG, "all");
// 3.重试次数
properties.setProperty(ProducerConfig.RETRIES_CONFIG, "0");
// 4.批量大小
properties.setProperty(ProducerConfig.BATCH_SIZE_CONFIG, "16384"); // 16k
// 提交延时
properties.setProperty(ProducerConfig.LINGER_MS_CONFIG, "1");
// 缓存
properties.setProperty(ProducerConfig.BUFFER_MEMORY_CONFIG, "33554432"); // 32M
// KV的序列化类
properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
// 创建生产者对象
KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties);
for (int i=10; i<20; i++){
producer.send(new ProducerRecord<String, String>("test", Integer.valueOf(i).toString()), (recordMetadata, e) -> {
if (e == null) {
System.out.println(recordMetadata.partition() + "-----" + recordMetadata.offset());
} else {
System.out.println("发送失败");
}
});
}
// 关闭资源
producer.close();
}
}
运行程序
- 开启消费者
[root@hadoop4 kafka]# bin/kafka-console-consumer.sh --bootstrap-server hadoop4:9092 --topic test
0
1
2
3
4
5
6
7
8
9
注意:如果topic有多(3)个分区,那么则是按照分区顺序输出的,有可能是输出 0 3 6 9 1 4 7 2 5 8, 但是每个分区内部是有序的
创建多分区多副本
[root@hadoop2 kafka]# bin/kafka-topics.sh --create --zookeeper hadoop2:2181 --replication-factor 3 --partitions 3 --topic secondTest
Created topic secondTest.
# 查看创建多分区多副本的topic
[root@hadoop4 kafka]# bin/kafka-topics.sh --describe --zookeeper hadoop4:2181 --topic secondTest
Topic: secondTest PartitionCount: 3 ReplicationFactor: 3 Configs:
Topic: secondTest Partition: 0 Leader: 2 Replicas: 2,3,4 Isr: 2,3,4
Topic: secondTest Partition: 1 Leader: 3 Replicas: 3,4,2 Isr: 3,4,2
Topic: secondTest Partition: 2 Leader: 4 Replicas: 4,2,3 Isr: 4,2,3
[root@hadoop2 kafka]#
- 自定义分区
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;
import java.util.List;
import java.util.Map;
import java.util.Random;
/**
自定义分区
*/
public class CustomerPartitioner implements Partitioner {
@Override
public int partition(String topic, Object o, byte[] bytes, Object o1, byte[] bytes1, Cluster cluster) {
// return 0;
List<PartitionInfo> partitionInfoList = cluster.partitionsForTopic(topic);
int partitionNum = partitionInfoList.size();
Random random = new Random();
return random.nextInt(partitionNum); // [0 partitionNum) 中的一个随机数
}
@Override
public void close() {
}
@Override
public void configure(Map<String, ?> map) {
}
}
# 生产者(Producer)代码 参考上例
properties.setProperty(ProducerConfig.PARTITIONER_CLASS_CONFIG, "CustomerPartitioner");
控制台输出:
分区=0-----偏移量=2
分区=0-----偏移量=3
分区=1-----偏移量=2
分区=1-----偏移量=3
分区=1-----偏移量=4
分区=1-----偏移量=5
分区=1-----偏移量=6
分区=1-----偏移量=7
分区=1-----偏移量=8
分区=2-----偏移量=6
kafka消费者输出:
[root@hadoop2 kafka]# bin/kafka-console-consumer.sh --bootstrap-server hadoop2:9092 --topic secondTest
1
5
0
2
3
6
7
8
9
4
由上可知, 1,5 在分区0中,0,2,3,6,7,8,9在分区1中, 4在分区2中 , 每个分区维护着一套自己的offset, 每个分区内部有序
消费者(Consumer)
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Properties;
public class Consumer {
public static void main(String[] args) {
Properties properties = new Properties();
// kafka集群
properties.setProperty("bootstrap.servers", "hadoop2:9092");
// 消费者组id
properties.setProperty("group.id", "secondTest");
// 设置自动提交offset
properties.setProperty("enable.auto.commit", "true");
// 延时提交
properties.setProperty("auto.commit.interval.ms", "1000");
// KV的反序列化
properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
// 创建消费者对象
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(properties);
// 指定topic
// topic = "third" 这个topic是没有创建的,开启了log4j后会提示
// WARN [org.apache.kafka.clients.NetworkClient] - Error while fetching metadata with correlation id 2 : {third=LEADER_NOT_AVAILABLE}"
kafkaConsumer.subscribe(Arrays.asList("test", "secondTest","third"));
// 如果想只消费某一个指定的topic
// kafkaConsumer.subscribe(Collections.singletonList("test"));
while (true){
ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(1000);
for (ConsumerRecord<String, String> record: consumerRecords) {
System.out.println(
"topic=" + record.topic() + "---" +
"patition=" + record.partition() + "---" +
"key=" + record.key() + "---" +
"value=" + record.value() + "---" +
"headers=" + record.headers().toString()
) ;
}
}
}
}
# 生产测试数据
[root@hadoop2 kafka]# bin/kafka-console-producer.sh --broker-list hadoop4:9092 --topic secondTest
>wudy haha
>still water run deep
>
# console输出
topic=secondTest---patition=2---key=null---value=wudy haha---headers=RecordHeaders(headers = [], isReadOnly = false)
topic=secondTest---patition=0---key=null---value=still water run deep---headers=RecordHeaders(headers = [], isReadOnly = false)
- kafka如何重复消费?
1.使用低级API直接指定到哪个offset
2.设置auto.offset.reset=earliest, 并更换组名
properties.setProperty("group.id", "wudyGroup"); // 原来 == secondTest
properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
- kafka seek方法
# 可能存在的报错
Exception in thread "main" java.lang.IllegalStateException: No current assignment for partition secondTest-0
at org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:269)
at org.apache.kafka.clients.consumer.internals.SubscriptionState.seek(SubscriptionState.java:294)
at org.apache.kafka.clients.consumer.KafkaConsumer.seek(KafkaConsumer.java:1249)
at Consumer.main(Consumer.java:51)
# 解决方式1
// 指定消费某一个partition分区的数据
kafkaConsumer.assign(Collections.singletonList(new TopicPartition("secondTest", 0)));
// 指定获取某个topic的指定分区的指定offset的数据
kafkaConsumer.seek(new TopicPartition("secondTest", 0), 4);
# 解决方式2
// 如果想只消费某一个指定的topi
kafkaConsumer.subscribe(Collections.singletonList("secondTest"));
kafkaConsumer.poll(0);
// 指定获取某个topic的指定分区的指定offset的数
kafkaConsumer.seek(new TopicPartition("secondTest", 0), 4);
拦截器
Producer拦截器(interceptor)主要用于实现clients端的定制化控制逻辑
对于Producer而言,interceptor使得用户在消息发送前以及producer回掉逻辑前有机会对消息做一些定制化需求,比如修改消息
同时Producer允许用户指定多个interceptor按序作用于同一消息从而形成一个拦截链(org.apache.kafka.clients.producer.ProducerInterceptor)
1.configure(configs)
获取配置信息和初始化数据时调用2.onSend(ProducerRecord)
该方法封装进kafkaProducer.send方法中,即它运行在用户主线程中,Producer确保在消息被序列化以及计算分区前调用该方法。用户可以在该方法中对消息做任何操作,但最好保证不要修改消息所属的topic和分区3.onAcknowledgement(RecordMetadata, Exception):
该方法会在消息被应答或消息发送失败时调用,通常都是在producer回掉逻辑触发之前。onAcknowledgement运行在producer的IO线程中,不要在该方法中放很重的逻辑,否则会拖慢Producer的消息发送效率4.close
关闭interceptor, 主要用于执行一些资源清理工作。
interceptor可能被运行在多个线程中,因此在具体实现时需要用户自行确保线程安全,倘若指定了多个interceptor,则producer将按照指定顺序调用她们,并仅仅是捕获每个interceptor可能抛出的异常记录到错误日志再向上传递