Kafka版本:我们使用目前最新的版本:0.10.2
Kafka架构:
Kafka是由LinkedIn开发的一个分布式的消息系统,使用Scala编写,它以可水平扩展和高吞吐率而被广泛使用。
Kafka相关概念:
Broker:
Kafka集群包含一个或多个服务器,这种服务器被称为broker。Topic:
每条发布到Kafka集群的消息都有一个类别,这个类别被称为Topic。(物理上不同Topic的消息分开存储,逻辑上一个Topic的消息虽然保存于一个或多个broker上但用户只需指定消息的Topic即可生产或消费数据而不必关心数据存于何处)Partition:
Parition是物理上的概念,每个Topic包含一个或多个Partition.Producer:
负责发布消息到Kafka brokerConsumer:
消息消费者,向Kafka broker读取消息的客户端。Consumer Group:
每个Consumer属于一个特定的Consumer Group(可为每个Consumer指定group name,若不指定group name则属于默认的group)。
常见的Kafka部署结构:
如上图所示,一个典型的Kafka集群中包含若干Producer(例如广告投放服务记录的广告日志),若干broker(Kafka支持水平扩展,一般broker数量越多,集群吞吐率越高),若干Consumer Group,以及一个ZooKeeper集群。Kafka通过Zookeeper管理集群配置,选举leader,以及在Consumer Group发生变化时进行rebalance。Producer使用push模式将消息发布到broker,Consumer使用pull模式从broker订阅并消费消息。
Kafka安装:
- 下载安装包:
下载地址:http://kafka.apache.org/downloads,根据本地安装的scala版本,下载对应的安装包,我下载的是:
kafka_2.11-0.10.2.0.tgz
将下载后的压缩包拷贝到:/Users/wesley/apps 目录下,执行解压:
tar zxvf kafka_2.11-0.10.2.0.tgz - 配置ZooKeeper
由于我们之前已经在本地启动了ZooKeeper服务,所以Kafka之前使用之前的ZooKeeper服务。 - 配置Kafka
进入config目录,修改两个配置信息,如下所示:
- 启动Kafka
bin/kafka-server-start.sh config/server.properties &
我们可以使用lsof命令检查一下Kafka服务端口有没有正常启动:
lsof -i:9092
9092是Kafka进程的默认占用端口。
- 使用Kafka
我们创建具有一个分区一个副本的Topic:ad_log
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic ad_log