Kafka集群是把状态保存在Zookeeper中的,首先要搭建Zookeeper集群。
Zookeeper集群部署请查阅上节
一、Kafka 集群下载安装
1、下载包
$ cd /usr/local/services/src
$ wget http://apache.communilink.net/kafka/2.1.0/kafka_2.12-2.1.0.tgz
$ tar xvf kafka_2.12-2.1.0.tgz -C ../
$ cd ../kafka_2.12-2.1.0/
2、配置
broker配置:
broker配置文件为config/server.properties文件,配置内容主要分为以下几个模块,
Server Basics
Kafka server 基本配置
broker.id:是kafka集群server的唯一标识。
broker.id=1
Socket Server Settings
Kafka 网络相关配置
listeners:由用户配置协议,ip,port。
num.network.threads:这个是borker进行网络处理的线程数
num.io.threads:这个是borker进行I/O处理的线程数
socket.send.buffer.bytes: 发送缓冲区buffer大小,数据不是一下子就发送的,先回存储到缓冲区了到达一定的大小后在发送,能提高性能
socket.receive.buffer.bytes:kafka接收缓冲区大小,当数据到达一定大小后在序列化到磁盘
socket.request.max.bytes:这个参数是向kafka请求消息或者向kafka发送消息的请请求的最大数,这个值不能超过java的堆栈大小
其他配置项,开发测试环境可使用默认配置;生产环境推荐如下配置。
listeners=PLAINTEXT://10.4.4.151:9092
num.network.threads=8
num.io.threads=8
socket.send.buffer.bytes=1024000
socket.receive.buffer.bytes=1024000
socket.request.max.bytes=104857600
Log Basics
Kafka log 基本配置
log.dirs:log文件存储路径
num.partitions:topic默认的partitions数量。在创建topic时,一般会指定partitions数量,因此该配置项在上述条件下基本无用。为了防止在创建topic时,未指定partitions数量,因此推荐使用配置为3。
其他配置推荐使用默认配置
log.dirs=/data/kafka/kafka-sa
num.partitions=3
num.recovery.threads.per.data.dir=1
Internal Topic Settings
Kafka 内部topic配置
开发测试环境推荐使用默认配置,均为1
生产环境推荐如下配置,replication数量为3,isr数量为2。
offsets.topic.replication.factor=3
transaction.state.log.replication.factor=3
transaction.state.log.min.isr=2
Log Flush Policy
Kafka log 刷盘、落盘机制
log.flush.interval.messages:日志落盘消息条数间隔,即每接收到一定条数消息,即进行log落盘。
log.flush.interval.ms:日志落盘时间间隔,单位ms,即每隔一定时间,即进行log落盘。
强烈推荐开发、测试、生产环境均采用默认值,即不配置该配置,交由操作系统自行决定何时落盘,以提升性能。
若对消息高可靠性要求较高的应用系统,可针对topic级别的配置,配置该属性。
Log Retention Policy
Kafka log保留策略配置
log.retention.hours:日志保留时间,单位小时。和log.retention.minutes两个配置只需配置一项。
message.max.bytes:表示接受消息体的最大大小,单位是字节
default.replication.factor:默认的备份的复制自动创建topics的个数
replica.fetch.max.bytes:最大备份的拉取数量
log.retention.bytes:日志保留大小。一topic的一partition下的所有日志大小总和达到该值,即进行日志清除任务。当日志保留时间或日志保留大小,任一条件满足即进行日志清除任务,-1表示不限制。
log.segment.bytes:日志分段大小。即一topic的一partition下的所有日志会进行分段,达到该大小,即进行日志分段,滚动出新的日志文件。
log.retention.check.interval.ms:日志保留策略定期检查时间间隔,单位ms。
日志保留大小,保留时间以及日志分段大小可根据具体服务器磁盘空间大小,业务场景自行决定。
log.retention.hours=3
message.max.byte=5242880
default.replication.factor=2
replica.fetch.max.bytes=5242880
log.retention.bytes=5368709120
log.segment.bytes=536870912
log.retention.check.interval.ms=300000
Zookeeper
Kafka zookeeper 配置
zookeeper.connect:zk连接地址
zookeeper.connection.timeout.ms:zk连接超时时间,默认6s。可根据具体的应用场景进行更改,特可采用如下配置。
zookeeper.connect=10.4.4.151:2181,10.4.4.152:2181,10.4.4.153:2181
zookeeper.connection.timeout.ms=60000
Group Coordinator Settings
Kafka consumer group 协调配置
生产环境推荐配置3000
开发测试环境推荐配置0
group.initial.rebalance.delay.ms=3
二、启动Kafka集群并测试
1、系统服务启动配置
$ cat /lib/systemd/system/kafka-sa.service
[Unit]
Description=Apache kafka-sa-sales
After=network.target
[Service]
Type=simple
Environment=JAVA_HOME=/usr/local/services/jdk1.8.0_91
PIDFile=PIDFile=/usr/local/services/kafka_2.12-2.1.0/bin/kafka-sa.pid
ExecStart=/usr/local/services/kafka_2.12-2.1.0/bin/kafka-server-start.sh /usr/local/services/kafka_2.12-2.1.0/config/server.properties
ExecStop=/bin/kill -TERM ${MAINPID}
User=user_00
Group=users
Restart=always
RestartSec=20
[Install]
WantedBy=multi-user.target
2、启动服务
从后台启动Kafka集群(3台都需要启动)
[root@localhost logs]# systemctl start kafka-sa.service
3、检查服务是否启动
[root@localhost logs]# jps
32502 Kafka
521 Jps
23151 QuorumPeerMain
4、创建Topic来验证是否创建成功
# cd /usr/local/services/kafka_2.12-2.1.0/bin
# ./kafka-topics.sh --create --zookeeper 10.4.4.151:2181 --replication-factor 2 --partitions 1 --topic basketball
Created topic "basketball".
#解释
--replication-factor 2 #复制两份
--partitions 1 #创建1个分区
--topic #主题为basketball
查看所有topic和topic 状态
# ./kafka-topics.sh --list --zookeeper 10.4.4.151:2181
basketball
# ./kafka-topics.sh --describe --zookeeper 10.4.4.151:2181 --topic basketball
Topic:basketball PartitionCount:1 ReplicationFactor:2 Configs:
Topic: basketball Partition: 0 Leader: 3 Replicas: 3,1 Isr: 3,1
#分区为1 复制因子为2 Topic basketball的分区为0
#Replicas: 3,1 复制的为3,1
##创建一个broker,发布者发布消息
./kafka-console-producer.sh --broker-list 10.4.4.151:9092 --topic basketball
>NBA
##在到另一台机器或同一台一台机器开一个终端创建一个消费者消费:
./kafka-console-consumer.sh --bootstrap-server 10.4.4.152:9092 --topic basketball --from-beginning
NBA
OKkafka集群搭建完毕
日志说明
server.log #kafka的运行日志
state-change.log #kafka他是用zookeeper来保存状态,所以他可能会进行切换,切换的日志就保存在这里
controller.log #kafka选择一个节点作为“controller”,当发现有节点down掉的时候它负责在游泳分区的所有节点中选择新的leader,这使得Kafka可以批量的高效的管理所有分区节点的主从关系。如果controller down掉了,活着的节点中的一个会备切换为新的controller.
登录zk来查看zk的目录情况
#使用客户端进入zk
./zkCli.sh -server 10.4.4.152:2181
#查看目录情况 执行“ls /”
[zk: 10.4.4.152:2181(CONNECTED) 0] ls /
[cluster, controller_epoch, controller, brokers, zookeeper, admin, isr_change_notification, consumers, log_dir_event_notification, latest_producer_id_block, config]
[zk: 10.4.4.152:2181(CONNECTED) 1]
#显示结果:[cluster, controller_epoch, controller, brokers, zookeeper, admin, isr_change_notification, consumers, log_dir_event_notification, latest_producer_id_block, config]
##上面的显示结果中:只有zookeeper是zookeeper原生的,其他都是Kafka创建的
#标注一个broker
[zk: 10.4.4.152:2181(CONNECTED) 3] get /brokers/ids/3
{"listener_security_protocol_map":{"PLAINTEXT":"PLAINTEXT"},"endpoints":["PLAINTEXT://10.4.4.153:9092"],"jmx_port":-1,"host":"10.4.4.153","timestamp":"1547877474320","port":9092,"version":4}
cZxid = 0x600000182
ctime = Sat Jan 19 13:57:53 CST 2019
mZxid = 0x600000182
mtime = Sat Jan 19 13:57:53 CST 2019
pZxid = 0x600000182
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x300046cf32a0005
dataLength = 190
numChildren = 0
#查看partion
[zk: 10.4.4.152:2181(CONNECTED) 4] get /brokers/topics/basketball/partitions/0
null
cZxid = 0x60000019f
ctime = Sat Jan 19 14:07:05 CST 2019
mZxid = 0x60000019f
mtime = Sat Jan 19 14:07:05 CST 2019
pZxid = 0x6000001a0
cversion = 1
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 0
numChildren = 1