1、kafka的设计思想
kafka是一个分布式,多分区,多副本,多发布者/订阅者的,基于zookeeper协调的消息系统。主要功能有3个:
- 发布和订阅消息流,这个功能类似于消息队列;
- 以容错的方式记录消息流,存储消息流文件中;
- 可以在消息发布的时候进行数据处理。
2、kafka的技术架构
kafka集群一般由多个kafka实例组成,每个实例(server)成为broker。kafka根据消息Topic进行归类,发送消息者为Producer,消息接受者为Consumer。Producer将数据生产出来,交给 broker 阵列进行存储,Consumer需要消费数据了,就从broker中去拿数据,然后完成一系列对数据的处理操作。
需要注意的是,producer到broker的过程是push,producer有数据就推送到broker,consumer到broker的过程是pull,是consumer主动去拉数据,而不是broker把数据发送到consumer的。
kafka集群多个broker协同工作,被producer和consumer频繁调用,是通过zookeeper管理协调请求和转发完成的,依赖于zookeeper保存的集群meta信息来保证系统的可用性。Kafka使用zookeeper作为其分布式协调框架,将消息生产、消息存储、消息消费过程结合,确保生产者、消费者、broker在无状态的情况下,建立起生产/消费订阅关系,并实现生产者与消费者的负载均衡。
3、kafka的执行流程
kafka业务执行流程:
- 启动zookeeper的server端(Server2),它维持了一张inf-list表,记录了各个集群节点的 IP、端口等信息。
- 启动kafka的server端(Server1),它开启broker服务,为producer和consumer提供数据读写调用。
- Producer生产者(Server3和Server4),如果生产了数据,会先通过zookeeper找到broker,然后将数据存放到broker上。
- Consumer消费者(Server1),如果要消费数据,会先通过zookeeper找到broker,然后消费。
执行流程图中,Server1、Server2可以部署在一台机器,也可以分开,或配置zookeeper集群。Server3、Server4、Server5必须配置zookeeper的地址作为zkClient,这之间连接都需要zookeeper来进行分发。
4、kafka部署
1、访问kafka官网 http://kafka.apache.org/downloads,下载最新的二进制安装包,如选择版本kafka_2.13-3.0.1.tgz。
2、kafka需要java运行环境,确保部署主机安装了jdk1.8版。
3、解压kafka到安装目录/usr/kafka_2.13,配置文件在config子目录。
4、修改consumer.properites、producer.properties文件,配置生产者和消费者,一般默认的即可。
5、修改server.properties文件,来配置kafka服务器端:broker.id值为服务端ID,集群中各节点必须唯一;listeners值为kafka服务地址,本机访问默认为localhost:9092;zookeeper.connect值为zookeeper连接地址,kafka内置zookeeper为localhost:2181。
6、使用kafka内置的zookeeper,单机部署方式,启动过程如下:
bin/zookeeper-server-start.sh config/zookeeper.properties #启动zookeeper服务
bin/kafka-server-start.sh config/server.properties #启动kafka服务
5、kafka实例应用
1、创建一个topic
打开终端,新建名为test的topic,命令输入:
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
创建topic后,查看创建的topic,命令输入:
bin/kafka-topics.sh --list --zookeeper localhost:2181
2、创建一个消息消费者
打开终端,新建test名称topic的消费者,命令输入:
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
消费者创建后,因为没有发送任何数据,执行后没有打印输出。不要关闭这个终端,接下来打开一个新的终端,创建一个消息生产者。
3、创建一个消息生产者
打开一个新的终端,新建test名称topic的生产者,命令输入:
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
命令执行完毕,会进入消息编辑器页面,我们发送一条消息,回到消费者终端。可以看到终端打印出我们发送的消息。
附、常规操作命令:
bin/kafka-topics.sh --zookeeper localhost:2181 --list #列出所有主题
bin/kafka-topics.sh --zookeeper localhost:2181 --describe #列出所有主题的详细信息,加--topic my-topic查看具体topic
bin/kafka-topics.sh --zookeeper localhost:2181 --create --replication-factor 1 --partitions 8 --topic my-topic #创建my-topic主题,1副本,8分区
bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic my-topic --partitions 16 #增加分区,注意:分区无法被删除
bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic my-topic #删除主题
bin/kafka-topics.sh --new-consumer --bootstrap-server localhost:9092 --list #列出消费者群组
bin/kafka-topics.sh --new-consumer --bootstrap-server localhost:9092 --describe --group my-consumer #列出my-consumer消费者群组详细信息