kafka
简介
Apache Kafka是一个分布式流处理平台。
kafka特性:
- 快:单个kafka服务每秒可以处理数以千计客户端发来的几百MB数据。
- 可扩展:一个单一集群可作为一个大数据处理中枢,集中处理各种类型业务。
- 持久化:消息被持久化到磁盘(可以处理TB级别数据但仍保持极高数据处理效率),并且有备份容错。
- 分布式:着眼于大数据领域,支持分布式,集群可处理每秒百万级别消息。
- 实时性:生产出来的消息可立即被消费者消费。
kafka组件:
- topic:消息存放的目录即主题。
- producer:生产消息到topic的一方。
- consumer:订阅topic消费消息的一方。
- broker:kafka的服务实例就是一个broker。
下载,安装
wget http://mirror.bit.edu.cn/apache/kafka/0.11.0.1/kafka_2.11-0.11.0.1.tgz
tar -zxvf kafka_2.11-0.11.0.1.tgz
cd kafka_2.11-0.11.0.1
配置
vim ./config/server.properties
修改如下几项:
broker.id=0 #若配置集群,则每个服务器的broker.id需不相同,只是一个broker的标识,但需是唯一的。并且meta.properties文件里面broker.id的值也应该做相应的修改,文件的位置在server.properties文件中可以看到。
listeners=PLAINTEXT://192.168.213.131:9092 #host.name和端口
zookeeper.connect=192.168.213.130:2181,192.168.213.131:2181,192.168.213.132:2181 #zookeeper集群的ip和端口,多个用逗号分隔。
完成之后,启动kafka.
启动
kafka集群中的每一个服务器都需要执行下面命令进行启动
若已经配置zookeeper并启动了,则直接执行如下命令:
bin/kafka-server-start.sh config/server.properties
使用上面命令启动不能使用Ctrl+c
终止,可以另开一个终端执行其他的命令。
或者执行下面命令从后台启动:
bin/kafka-server-start.sh -daemon config/server.properties
下面这个命令也可以启动(推荐):
nohup bin/kafka-server-start.sh config/server.properties &
按Ctrl+c
终止,并会生成nohup.out文件,里面是启动日志,有错误的话会打印在文件中。
可以通过命令netstat -tupln
查看9092(默认)端口是否启动。
可以通过命令jps
查看kafka是否启动成功。
若没有安装zookeeper,kafka也已经提供了zookeeper单机模式,执行如下命令可开启单节点zookeeper实例。
bin/zookeeper-server-start.sh config/zookeeper.properties
可能会出现的错误:
在虚拟机上运行时,可能出现内存不够的错误,可以在日志中查看具体情况,一种做法是增加swap space,这个比较容易实现,具体操作问度娘。
Will not attempt to authenticate using SASL (unknown error) (org.apache.zookeeper.ClientCnxn)
该错误可以通过修改/etc/hosts
文件中主机名的ip,主机名可以通过hostname
查看,ip可以是本地循环地址(127.0.0.1),或者是ipv4地址。
测试
- 首先创建一个名为test的topic,选项含义可以直接敲入不带选项的下面的命令进行查看。
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
- 查看topic
bin/kafka-topics.sh --list --zookeeper localhost:2181
具体信息可以通过下面命令查看:
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test
- 发送消息
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
出现>
符号后就可以输入内容,并敲回车,之后可以直接Ctrl+c
终止,或者另打开一个终端执行下面命令。
- 接收消息
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
执行命令后就可以看到刚才输入的信息内容,表明收发消息成功了。
伪集群模式可以参考http://kafka.apache.org/quickstart
有关kafka更详细的内容介绍请参考kafka中文文档http://ifeve.com/kafka-1/
END