I. Kafka Overview
1. Kafka: a distributed streaming platform
Many companies still use it simply as message middleware; other message-middleware options include MQ and Redis.
2. Terminology
broker: the middle layer; a Kafka process whose job is storing data
producer: writes data into Kafka
consumer: reads data out of Kafka
Typical enterprise pipeline: Flume --> Kafka --> Spark Streaming (a minimal sketch of the Spark Streaming end follows below)
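To make this pipeline concrete, here is a minimal sketch of the Spark Streaming end. It assumes the spark-streaming-kafka-0-10 integration on the classpath, a broker at localhost:9092, and the test topic used later in these notes; the group id huluwa_group is a placeholder:

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}

object KafkaStreamingSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("KafkaStreamingSketch").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(5)) // 5-second micro-batches

    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "localhost:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "huluwa_group", // hypothetical consumer group
      "auto.offset.reset" -> "earliest"
    )

    // Direct stream: one RDD partition per Kafka partition
    val stream = KafkaUtils.createDirectStream[String, String](
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](Seq("test"), kafkaParams)
    )

    stream.map(_.value()).print() // print each batch's messages

    ssc.start()
    ssc.awaitTermination()
  }
}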
3. Features:
- publish and subscribe: read and write streams of data like a messaging system
- real-time applications: write scalable stream-processing applications that react to events in real time
- distributed, replicated, fault-tolerant: store streams of data safely in a distributed, replicated, fault-tolerant cluster
4. Kafka's source code is written in Scala.
5. Basic concepts:
- topic: the category that messages are published to and consumed from
- partitions: partition indexes start from 0; with 3 partitions the directory suffixes are -0, -1, -2, as the listing below shows. Partitions are what give Kafka its highly concurrent reads and writes.
[hadoop@hadoop000 kafka-logs]$ ll
total 32
-rw-rw-r--. 1 hadoop hadoop 0 Sep 26 23:51 cleaner-offset-checkpoint
drwxrwxr-x. 2 hadoop hadoop 4096 Sep 26 23:58 huluwa_kafka_streaming-0
drwxrwxr-x. 2 hadoop hadoop 4096 Sep 27 03:10 huluwa_offset-0
-rw-rw-r--. 1 hadoop hadoop 54 Sep 26 23:51 meta.properties
-rw-rw-r--. 1 hadoop hadoop 76 Sep 27 09:13 recovery-point-offset-checkpoint
-rw-rw-r--. 1 hadoop hadoop 77 Sep 27 09:13 replication-offset-checkpoint
drwxrwxr-x. 2 hadoop hadoop 4096 Sep 27 08:54 test-0
drwxrwxr-x. 2 hadoop hadoop 4096 Sep 27 08:54 test-1
drwxrwxr-x. 2 hadoop hadoop 4096 Sep 27 08:54 test-2
[hadoop@hadoop000 kafka-logs]$ cd test-0
[hadoop@hadoop000 test-0]$ ll
total 4
-rw-rw-r--. 1 hadoop hadoop 10485760 Sep 27 08:54 00000000000000000000.index
-rw-rw-r--. 1 hadoop hadoop 70 Sep 27 08:55 00000000000000000000.log
- replication-factor: the replication factor, i.e. how many replicas each partition has; this is what makes Kafka highly fault-tolerant.
Suppose the Kafka cluster has three machines, 3 partitions, and 3 replicas, one replica per machine. When a record is written to partition-0 on machine 1, its other two replicas are necessarily written to partition-0 on machine 2 and partition-0 on machine 3; the three replicas do not land on partition-0, partition-1, and partition-2. (The describe output in section II.5 below shows this placement.)
II. Common Commands
1. Create a topic
bin/kafka-topics.sh --create \
--zookeeper localhost:2181 \
--replication-factor 1 \
--partitions 3 \
--topic test
Cluster: --zookeeper computer1:2181,computer2:2181,computer3:2181/kafka
2. List current topics
bin/kafka-topics.sh \
--list \
--zookeeper localhost:2181
Cluster: --zookeeper computer1:2181,computer2:2181,computer3:2181/kafka
3. Send data (producer)
bin/kafka-console-producer.sh \
--broker-list localhost:9092 \
--topic test
Cluster: --broker-list computer1:9092,computer2:9092,computer3:9092
4. Receive data (consumer)
bin/kafka-console-consumer.sh \
--zookeeper localhost:2181 \
--topic test \
--from-beginning
Cluster: --zookeeper computer1:2181,computer2:2181,computer3:2181/kafka
Send a few messages to test:
[hadoop@hadoop000 kafka]$ bin/kafka-console-producer.sh \
> --broker-list localhost:9092 \
> --topic test
spark
1
2
3
4
5
-------------------------------------------------------------------------------------------------------------
[hadoop@hadoop000 kafka]$ bin/kafka-console-consumer.sh \
> --zookeeper localhost:2181 \
> --topic test \
> --from-beginning
spark
2
1
3
4
5
Sending and receiving both work.
Now shut down both the producer and the consumer, restart the consumer, and observe:
[hadoop@hadoop000 kafka]$ bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
spark
3
2
5
1
4
The order of the data has changed. This is ordering within a partition but not globally (data inside a single partition stays ordered; viewed across all partitions it is unordered). Think about how to guarantee global order in production.
The key idea: build a composite key from a feature of the data, so that all records sharing that feature (e.g. all the statements in MySQL that touch id=1) are sent to the same single partition of the topic.
The partition is picked by hash(key) modulo the number of partitions.
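A minimal producer sketch of this idea, assuming the Kafka Java client and the 3-partition test topic from above. With the Java client's default partitioner, a keyed record goes to partition hash(key) % numPartitions, so every record keyed "id=1" lands in the same partition and is therefore consumed in send order:

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

object KeyedProducerSketch {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", "localhost:9092")
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

    val producer = new KafkaProducer[String, String](props)
    // Same key => same partition, so these two statements stay in order
    // relative to each other (the values here are illustrative only).
    producer.send(new ProducerRecord[String, String]("test", "id=1", "UPDATE ... WHERE id=1"))
    producer.send(new ProducerRecord[String, String]("test", "id=1", "DELETE ... WHERE id=1"))
    producer.close()
  }
}

Ordering then holds per key, which is usually what "global order" means in practice; true global order would require a single partition, giving up the concurrency that multiple partitions provide.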
5. Describe a topic
bin/kafka-topics.sh \
--describe \
--zookeeper localhost:2181 \
--topic test
Cluster: --zookeeper computer1:2181,computer2:2181,computer3:2181/kafka
In a cluster environment:
[hadoop@hadoop000 kafka]$ bin/kafka-topics.sh \
> --describe \
> --zookeeper computer1:2181,computer2:2181,computer3:2181/kafka \
> --topic test
Topic:test PartitionCount:3 ReplicationFactor:3 Configs:
Topic: test Partition: 0 Leader: 1 Replicas: 1,2,3 Isr: 1,2,3
Topic: test Partition: 1 Leader: 2 Replicas: 2,3,1 Isr: 2,3,1
Topic: test Partition: 2 Leader: 3 Replicas: 3,1,2 Isr: 3,1,2
After the machine with broker id=1 goes down:
Topic:test PartitionCount:3 ReplicationFactor:3 Configs:
Topic: test Partition: 0 Leader: 2 Replicas: 1,2,3 Isr: 2,3
Topic: test Partition: 1 Leader: 2 Replicas: 2,3,1 Isr: 2,3,1
Topic: test Partition: 2 Leader: 3 Replicas: 3,1,2 Isr: 3,1,2
6. Change the number of partitions (used when concurrency is insufficient and machines are added)
bin/kafka-topics.sh \
--alter \
--zookeeper localhost:2181 \
--topic test \
--partitions 4
Adding partitions does not redistribute existing data:
Topic:test PartitionCount:4 ReplicationFactor:3 Configs:
Topic: test Partition: 0 Leader: 2 Replicas: 1,2,3 Isr: 2,3,1
Topic: test Partition: 1 Leader: 2 Replicas: 2,3,1 Isr: 2,3,1
Topic: test Partition: 2 Leader: 3 Replicas: 3,1,2 Isr: 3,1,2
Topic: test Partition: 3 Leader: 1 Replicas: 1,3,2 Isr: 1,3,2
7. Delete a topic
bin/kafka-topics.sh \
--delete \
--zookeeper localhost:2181 \
--topic test
Cluster: --zookeeper computer1:2181,computer2:2181,computer3:2181/kafka
[hadoop@hadoop000 kafka]$ bin/kafka-topics.sh \
> --delete \
> --zookeeper localhost:2181 \
> --topic test
Topic test is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.
[hadoop@hadoop000 kafka]$ bin/kafka-topics.sh \
> --list \
> --zookeeper localhost:2181
huluwa_kafka_streaming
huluwa_offset
test - marked for deletion
Unless delete.topic.enable=true is added to server.properties, this command performs no actual deletion; it only marks the topic for deletion in ZooKeeper. So once test has been marked for deletion, how do you remove it for good? Here is a way to delete it completely:
[hadoop@hadoop000 bin]$ ./zkCli.sh
// delete the metadata first (the /kafka prefix is the chroot from zookeeper.connect; drop it if no chroot is configured)
[zk: localhost:2181(CONNECTED) 0] rmr /kafka/admin/delete_topics/test
[zk: localhost:2181(CONNECTED) 0] rmr /kafka/config/topics/test
[zk: localhost:2181(CONNECTED) 0] rmr /kafka/brokers/topics/test
// then delete the actual data (the topic's partition directories under log.dirs)
[hadoop@hadoop000 ~]$ cd $KAFKA_HOME
[hadoop@hadoop000 kafka]$ rm -rf logs/test-*