输出 Hello world
首先下载Kafka,地址http://kafka.apache.org
, 目前最新版本是1.0,根据网站的建议,下载Scala 2.11 - kafka_2.11-1.0.0.tgz这版本。
Kafka是严重依赖于ZooKeeper的,通过ZooKeeper来管理各种数据与元数据,因此原则上还需要下载ZooKeeper,不过Kafka 二进制包中也加入ZooKeeper的依赖,因此也可以直接使用 Kafka 自带的ZooKeeper。这里下载官网的压缩包,ZooKeeper的地址是:http://zookeeper.apache.org
,下载版本为3.4.11。
接下来将Zookeeper解压缩,将config目录下的zoo_sample.cfg文件备份,然后重命名为 zoo.cfg,修改zoo.cfg配置文件第12行,dataDir(存放数据的目录位置),可以修改为自己机器的任意目录。修改后的样子dataDir=/mydirectory/software/zookeeper-3.4.11/data
启动Zookeeper
进入bin
目录,执行如下命令
./zkServer.sh start-foreground
如有以下输出,则启动成功
2018-02-25 07:59:32,416 [myid:] - INFO [main:ZooKeeperServer@825] - tickTime set to 2000
2018-02-25 07:59:32,417 [myid:] - INFO [main:ZooKeeperServer@834] - minSessionTimeout set to -1
2018-02-25 07:59:32,417 [myid:] - INFO [main:ZooKeeperServer@843] - maxSessionTimeout set to -1
2018-02-25 07:59:32,432 [myid:] - INFO [main:ServerCnxnFactory@117] - Using org.apache.zookeeper.server.NIOServerCnxnFactory as server connection factory
2018-02-25 07:59:32,449 [myid:] - INFO [main:NIOServerCnxnFactory@89] - binding to port 0.0.0.0/0.0.0.0:2181
如有提示权限不足,请给zkServer.sh赋予执行权限,命令如下
chmod 777 zkServer.sh
如果提示operation not permitted: ./zkServer.sh
,则执行如下命令
xattr -d com.apple.quarantine zkServer.sh
启动Kafka
进入Kafka目录,执行如下命令
bin/kafka-server-start.sh config/server.properties
如有以下输出,则启动成功
[2018-02-25 09:05:34,480] INFO [TransactionCoordinator id=0] Startup complete. (kafka.coordinator.transaction.TransactionCoordinator)
[2018-02-25 09:05:34,591] INFO Creating /brokers/ids/0 (is it secure? false) (kafka.utils.ZKCheckedEphemeral)
[2018-02-25 09:05:34,599] INFO Result of znode creation is: OK (kafka.utils.ZKCheckedEphemeral)
[2018-02-25 09:05:34,601] INFO Registered broker 0 at path /brokers/ids/0 with addresses: EndPoint(bogon,9092,ListenerName(PLAINTEXT),PLAINTEXT) (kafka.utils.ZkUtils)
[2018-02-25 09:05:34,611] INFO Kafka version : 1.0.0 (org.apache.kafka.common.utils.AppInfoParser)
[2018-02-25 09:05:34,611] INFO Kafka commitId : aaa7af6d4a11b29d (org.apache.kafka.common.utils.AppInfoParser)
[2018-02-25 09:05:34,613] INFO [KafkaServer id=0] started (kafka.server.KafkaServer)
如出现permission denied
,则进入bin
目录执行如下代码:
chmod 777 *.sh
创建test主题
执行如下命令
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
出现如下提示,则创建成功
Created topic "test".
启动生产者
执行如下命令
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
出现如下结果,则表示等待输入
➜ kafka_2.11-1.0.0 bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
>
启动消费者
执行如下命令
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
光标不断闪烁则表示启动成功。
在生产者窗口输入hello world
➜ kafka_2.11-1.0.0 bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
>hello world
>
在消费者窗口查看
➜ kafka_2.11-1.0.0 bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
hello world
生产者向Kafka发送一条消息,会在消费者窗口输出,说明环境搭建正确。