Kafka集群

Kafka集群是把状态保存在Zookeeper中的,首先要搭建Zookeeper集群。
Zookeeper集群部署请查阅上节

一、Kafka 集群下载安装

1、下载包

$ cd  /usr/local/services/src
$ wget http://apache.communilink.net/kafka/2.1.0/kafka_2.12-2.1.0.tgz
$ tar xvf kafka_2.12-2.1.0.tgz -C ../
$ cd ../kafka_2.12-2.1.0/

2、配置
broker配置:
broker配置文件为config/server.properties文件,配置内容主要分为以下几个模块,

Server Basics

Kafka server 基本配置
broker.id:是kafka集群server的唯一标识。

broker.id=1
Socket Server Settings

Kafka 网络相关配置

listeners:由用户配置协议,ip,port。
num.network.threads:这个是borker进行网络处理的线程数
num.io.threads:这个是borker进行I/O处理的线程数
socket.send.buffer.bytes: 发送缓冲区buffer大小,数据不是一下子就发送的,先回存储到缓冲区了到达一定的大小后在发送,能提高性能
socket.receive.buffer.bytes:kafka接收缓冲区大小,当数据到达一定大小后在序列化到磁盘
socket.request.max.bytes:这个参数是向kafka请求消息或者向kafka发送消息的请请求的最大数,这个值不能超过java的堆栈大小
其他配置项,开发测试环境可使用默认配置;生产环境推荐如下配置。

listeners=PLAINTEXT://10.4.4.151:9092
num.network.threads=8
num.io.threads=8
socket.send.buffer.bytes=1024000
socket.receive.buffer.bytes=1024000
socket.request.max.bytes=104857600
Log Basics

Kafka log 基本配置

log.dirs:log文件存储路径
num.partitions:topic默认的partitions数量。在创建topic时,一般会指定partitions数量,因此该配置项在上述条件下基本无用。为了防止在创建topic时,未指定partitions数量,因此推荐使用配置为3。
其他配置推荐使用默认配置

log.dirs=/data/kafka/kafka-sa
num.partitions=3
num.recovery.threads.per.data.dir=1
Internal Topic Settings

Kafka 内部topic配置

开发测试环境推荐使用默认配置,均为1
生产环境推荐如下配置,replication数量为3,isr数量为2。

offsets.topic.replication.factor=3
transaction.state.log.replication.factor=3
transaction.state.log.min.isr=2
Log Flush Policy

Kafka log 刷盘、落盘机制

log.flush.interval.messages:日志落盘消息条数间隔,即每接收到一定条数消息,即进行log落盘。
log.flush.interval.ms:日志落盘时间间隔,单位ms,即每隔一定时间,即进行log落盘。
强烈推荐开发、测试、生产环境均采用默认值,即不配置该配置,交由操作系统自行决定何时落盘,以提升性能。
若对消息高可靠性要求较高的应用系统,可针对topic级别的配置,配置该属性。

Log Retention Policy

Kafka log保留策略配置

log.retention.hours:日志保留时间,单位小时。和log.retention.minutes两个配置只需配置一项。
message.max.bytes:表示接受消息体的最大大小,单位是字节
default.replication.factor:默认的备份的复制自动创建topics的个数
replica.fetch.max.bytes:最大备份的拉取数量
log.retention.bytes:日志保留大小。一topic的一partition下的所有日志大小总和达到该值,即进行日志清除任务。当日志保留时间或日志保留大小,任一条件满足即进行日志清除任务,-1表示不限制。
log.segment.bytes:日志分段大小。即一topic的一partition下的所有日志会进行分段,达到该大小,即进行日志分段,滚动出新的日志文件。
log.retention.check.interval.ms:日志保留策略定期检查时间间隔,单位ms。
日志保留大小,保留时间以及日志分段大小可根据具体服务器磁盘空间大小,业务场景自行决定。

log.retention.hours=3
message.max.byte=5242880
default.replication.factor=2
replica.fetch.max.bytes=5242880
log.retention.bytes=5368709120
log.segment.bytes=536870912
log.retention.check.interval.ms=300000
Zookeeper

Kafka zookeeper 配置

zookeeper.connect:zk连接地址
zookeeper.connection.timeout.ms:zk连接超时时间,默认6s。可根据具体的应用场景进行更改,特可采用如下配置。

zookeeper.connect=10.4.4.151:2181,10.4.4.152:2181,10.4.4.153:2181
zookeeper.connection.timeout.ms=60000
Group Coordinator Settings

Kafka consumer group 协调配置

生产环境推荐配置3000
开发测试环境推荐配置0

group.initial.rebalance.delay.ms=3

二、启动Kafka集群并测试

1、系统服务启动配置

$ cat /lib/systemd/system/kafka-sa.service 
[Unit]
Description=Apache kafka-sa-sales
After=network.target

[Service]
Type=simple
Environment=JAVA_HOME=/usr/local/services/jdk1.8.0_91
PIDFile=PIDFile=/usr/local/services/kafka_2.12-2.1.0/bin/kafka-sa.pid
ExecStart=/usr/local/services/kafka_2.12-2.1.0/bin/kafka-server-start.sh  /usr/local/services/kafka_2.12-2.1.0/config/server.properties
ExecStop=/bin/kill -TERM ${MAINPID}
User=user_00
Group=users
Restart=always
RestartSec=20

[Install]
WantedBy=multi-user.target

2、启动服务
从后台启动Kafka集群(3台都需要启动)

[root@localhost logs]# systemctl start kafka-sa.service

3、检查服务是否启动

[root@localhost logs]# jps
32502 Kafka
521 Jps
23151 QuorumPeerMain

4、创建Topic来验证是否创建成功

# cd /usr/local/services/kafka_2.12-2.1.0/bin
# ./kafka-topics.sh --create --zookeeper 10.4.4.151:2181 --replication-factor 2 --partitions 1 --topic basketball
Created topic "basketball".

#解释
--replication-factor 2   #复制两份
--partitions 1 #创建1个分区
--topic #主题为basketball

查看所有topic和topic 状态

# ./kafka-topics.sh --list --zookeeper 10.4.4.151:2181
basketball
# ./kafka-topics.sh --describe --zookeeper 10.4.4.151:2181 --topic basketball
Topic:basketball    PartitionCount:1    ReplicationFactor:2 Configs:
    Topic: basketball   Partition: 0    Leader: 3   Replicas: 3,1   Isr: 3,1

#分区为1  复制因子为2   Topic basketball的分区为0 
#Replicas: 3,1   复制的为3,1
##创建一个broker,发布者发布消息
./kafka-console-producer.sh --broker-list 10.4.4.151:9092 --topic basketball
>NBA

##在到另一台机器或同一台一台机器开一个终端创建一个消费者消费:
./kafka-console-consumer.sh --bootstrap-server 10.4.4.152:9092 --topic basketball --from-beginning
NBA

OKkafka集群搭建完毕

日志说明

server.log #kafka的运行日志
state-change.log  #kafka他是用zookeeper来保存状态,所以他可能会进行切换,切换的日志就保存在这里

controller.log #kafka选择一个节点作为“controller”,当发现有节点down掉的时候它负责在游泳分区的所有节点中选择新的leader,这使得Kafka可以批量的高效的管理所有分区节点的主从关系。如果controller down掉了,活着的节点中的一个会备切换为新的controller.

登录zk来查看zk的目录情况

#使用客户端进入zk
./zkCli.sh -server 10.4.4.152:2181

#查看目录情况 执行“ls /”
[zk: 10.4.4.152:2181(CONNECTED) 0] ls /
[cluster, controller_epoch, controller, brokers, zookeeper, admin, isr_change_notification, consumers, log_dir_event_notification, latest_producer_id_block, config]
[zk: 10.4.4.152:2181(CONNECTED) 1]

#显示结果:[cluster, controller_epoch, controller, brokers, zookeeper, admin, isr_change_notification, consumers, log_dir_event_notification, latest_producer_id_block, config]
##上面的显示结果中:只有zookeeper是zookeeper原生的,其他都是Kafka创建的

#标注一个broker
[zk: 10.4.4.152:2181(CONNECTED) 3] get /brokers/ids/3
{"listener_security_protocol_map":{"PLAINTEXT":"PLAINTEXT"},"endpoints":["PLAINTEXT://10.4.4.153:9092"],"jmx_port":-1,"host":"10.4.4.153","timestamp":"1547877474320","port":9092,"version":4}
cZxid = 0x600000182
ctime = Sat Jan 19 13:57:53 CST 2019
mZxid = 0x600000182
mtime = Sat Jan 19 13:57:53 CST 2019
pZxid = 0x600000182
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x300046cf32a0005
dataLength = 190
numChildren = 0

#查看partion
[zk: 10.4.4.152:2181(CONNECTED) 4] get /brokers/topics/basketball/partitions/0
null
cZxid = 0x60000019f
ctime = Sat Jan 19 14:07:05 CST 2019
mZxid = 0x60000019f
mtime = Sat Jan 19 14:07:05 CST 2019
pZxid = 0x6000001a0
cversion = 1
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 0
numChildren = 1
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,684评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 87,143评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,214评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,788评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,796评论 5 368
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,665评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,027评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,679评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 41,346评论 1 299
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,664评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,766评论 1 331
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,412评论 4 321
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,015评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,974评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,203评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,073评论 2 350
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,501评论 2 343

推荐阅读更多精彩内容