获取kafka
kafka官网:kafka。本文直接下载编译好的kafka二进制文件。由于kafka是由scala和java编写,所以下载kafka时要使用对应的scala版本,本文使用的scala版本为2.11。
若未安装java,请参考: JDK下载和安装
若未安装zookeeper,请参考:zookeeper集群的安装和部署
导出环境变量
在~/.bash_profile中添加一下内容:
export KAFKA_HOME=/root/software/kafka_2.11-2.0.0
export PATH=$PATH:$KAFKA_HOME/bin
修改配置文件server.properties
152节点配置
broker.id=152
host.name=192.168.2.152
listeners = PLAINTEXT://slave2:9092
log.dirs=/root/log/kafka
num.partitions=1
zookeeper.connect=master:2181,slave2:2181,slave3:2181
153节点配置
broker.id=153
host.name=192.168.2.153
listeners = PLAINTEXT://slave3:9092
log.dirs=/root/log/kafka
num.partitions=1
zookeeper.connect=master:2181,slave2:2181,slave3:2181
154节点配置
broker.id=154
host.name=192.168.2.154
listeners = PLAINTEXT://master:9092
log.dirs=/root/log/kafka //消息存放目录,并不是kafka系统日志
num.partitions=1
zookeeper.connect=master:2181,slave2:2181,slave3:2181
启停kafka
注意:确保使用的zookeeper已启动。
每台机器上单独执行下面语句启动
/root/software/kafka_2.11-0.10.2.0/bin/kafka-server-start.sh \
/root/software/kafka_2.11-0.10.2.0/config/server.properties &
每台机器上单独执行下面语句停止
ps -ef|grep kafka|grep -v grep|awk '{print "kill -9", $2}'|sh
验证
创建topic
kafka-topics.sh --create --zookeeper master:2181,slave2:2181,slave3:12181 \
--partitions 3 --replication-factor 3 --topic streaming
显示所有topic
kafka-topics.sh --list --zookeeper localhost:2181
查看topic详情
kafka-topics.sh --describe --zookeeper localhost:2181 --topic streaming
删除topic
kafka-topics.sh --delete --zookeeper localhost:2181 --topic streaming
生产者发布消息
kafka-console-producer.sh --broker-list slave2:9092 --topic streaming
消费者消费消息
kafka-console-consumer.sh --bootstrap-server master:9092 \
--topic streaming --from-beginning