消息中间件一般用于各个模块、系统之间的异步通信,降低各个模块之间的耦合性。
Kafka作为一个分布式的流平台,这到底意味着什么?
我们认为,一个流处理平台具有三个关键能力:
发布和订阅消息(流),在这方面,它类似于一个消息队列或企业消息系统。
以容错的方式存储消息(流)。
在消息流发生时处理它们。
什么是kakfa的优势?
它应用于2大类应用:
构建实时的流数据管道,可靠地获取系统和应用程序之间的数据。
构建实时流的应用程序,对数据流进行转换或反应。
要了解kafka是如何做这些事情的,让我们从下到上深入探讨kafka的能力。
首先几个概念:
kafka作为一个集群运行在一个或多个服务器上。
kafka集群存储的消息是以topic为类别记录的。
每个消息(也叫记录record,我习惯叫消息)是由一个key,一个value和时间戳构成。
kafka有四个核心API:
应用程序使用Producer API发布消息到1个或多个topic(主题)。
应用程序使用Consumer API来订阅一个或多个topic,并处理产生的消息。
应用程序使用Streams API充当一个流处理器,从1个或多个topic消费输入流,并生产一个输出流到1个或多个输出topic,有效地将输入流转换到输出流。
Connector API允许构建或运行可重复使用的生产者或消费者,将topic连接到现有的应用程序或数据系统。例如,一个关系数据库的连接器可捕获每一个变化。
Kafka所使用的基本术语:
Topic
Kafka将消息种子(Feed)分门别类,每一类的消息称之为一个主题(Topic).
Producer
发布消息的对象称之为主题生产者(Kafka topic producer)
Consumer
订阅消息并处理发布的消息的种子的对象称之为主题消费者(consumers)
Broker
已发布的消息保存在一组服务器中,称之为Kafka集群。集群中的每一个服务器都是一个代理(Broker). 消费者可以订阅一个或多个主题(topic),并从Broker拉数据,从而消费这些已发布的消息。
Kafka的保证(Guarantees)
生产者发送到一个特定的Topic的分区上,消息将会按照它们发送的顺序依次加入,也就是说,如果一个消息M1和M2使用相同的producer发送,M1先发送,那么M1将比M2的offset低,并且优先的出现在日志中。
消费者收到的消息也是此顺序。
如果一个Topic配置了复制因子(replication factor)为N, 那么可以允许N-1服务器宕机而不丢失任何已经提交(committed)的消息。
kafka作为一个消息系统
Kafka的流与传统企业消息系统相比的概念如何?
传统的消息有两种模式:队列和发布订阅。 在队列模式中,消费者池从服务器读取消息(每个消息只被其中一个读取); 发布订阅模式:消息广播给所有的消费者。这两种模式都有优缺点,队列的优点是允许多个消费者瓜分处理数据,这样可以扩展处理。但是,队列不像多个订阅者,一旦消息者进程读取后故障了,那么消息就丢了。而发布订阅允许你广播数据到多个消费者,由于每个订阅者都订阅了消息,所以没办法缩放处理。
kafka中消费者组有两个概念:队列:消费者组(consumer group)允许同名的消费者组成员瓜分处理。发布订阅:允许你广播消息给多个消费者组(不同名)。
kafka的每个topic都具有这两种模式。
kafka有比传统的消息系统更强的顺序保证。
传统的消息系统按顺序保存数据,如果多个消费者从队列消费,则服务器按存储的顺序发送消息,但是,尽管服务器按顺序发送,消息异步传递到消费者,因此消息可能乱序到达消费者。这意味着消息存在并行消费的情况,顺序就无法保证。消息系统常常通过仅设1个消费者来解决这个问题,但是这意味着没用到并行处理。
kafka做的更好。通过并行topic的parition ——kafka提供了顺序保证和负载均衡。每个partition仅由同一个消费者组中的一个消费者消费到。并确保消费者是该partition的唯一消费者,并按顺序消费数据。每个topic有多个分区,则需要对多个消费者做负载均衡,但请注意,相同的消费者组中不能有比分区更多的消费者,否则多出的消费者一直处于空等待,不会收到消息。
kafka的安装和启动
Zookeeper安装
①、进入Zookeeper解压目录,C:\softdownload\zookeeper\conf
②、将“zoo_sample.cfg”重命名为“zoo.cfg”。
③、配置启动日记目录,用#注解调 dataDir=/tmp/zookeeper
dataDir=C:\\softdownload\\zookeeper\\data
④、系统环境变量中添加:ZOOKEEPER_HOME = C:\softdownload\zookeeper
⑤、编辑系统变量path,加上: ZOOKEEPER_HOME%\bin;
⑥、确认zoo.cfg文件中默认的Zookeeper端口(默认端口2181)。
打开新的cmd,进入安装目录:cd C:\softdownload\zookeeper\bin,输入zkserver,运行Zookeeper。
安装Kafka
①、进入Kafka配置目录,C:\softdownload\kafka\config
编辑文件“server.properties”
找到并用#注解“log.dirs=/tmp/kafka-logs”
添加自己的日记目录:log.dirs=C:\softdownload\kafka\kafka-logs
②、如果Zookeeper在某些其他的机器或集群上运行,可以将“zookeeper.connect:2181”修改为自定义IP与端口。在这里使用了同一个机器,所以没其他做修改。文件中的Kafka端口和broker.id也是可以配置的。默认设置不变。机器的localhost也为127.0.0.1,这里我也修改为ipv4的,防止localhost为ipv6时受影响。
③、Kafka会按照默认,在9092端口上运行,并连接zookeeper的默认端口:2181。
在zookeeper的基础上,运行Kafka服务
进入Kafka安装目录,C:\softdownload\kafka,切换到命令行窗口,运行kafka。
输入命令:.\bin\windows\kafka-server-start.bat .\config\server.properties
创建主题Topic
1. 现在创建主题,命名为“test”,replication factor=1(因为只有1个Kafka服务器在运行)。如果集群中所运行的Kafka服务器不止1个,可以相应增加replication-factor,从而提高数据可用性和系统容错性。
2. 在C:\softdownload\kafka\bin\windows打开新的命令行。
3. 输入下面的命令,回车:
kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
创建Producer及Consumer来测试服务器。
1.在C:\softdownload\kafka\bin\windows打开新的命令行。
2.输入以下命令,启动producer,可以输入消息:
kafka-console-producer.bat --broker-list localhost:9092 --topic test
0.9版本以上新命令:.\bin\windows\kafka-console-producer.bat --broker-list localhost:9092 --topic test --producer.config .\config\producer.properties
3.在同样的位置C:\softdownload\kafka\bin\windows再次打开新的命令行。
4.现在输入下列命令启动consumer,可以获取消息:
kafka-console-consumer.bat --zookeeper localhost:2181 --topic test
0.9版本以上新命令:.\bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test --new-consumer --from-beginning --consumer.config .\config\consumer.properties
5.两个命令行窗口,producer可以输入任何消息,consumer可以获取消息。
使用JavaAPI连接远程服务器,要在service.properties配置host.name=192.168.111.111(远程IP地址),这主要是因为,kafka默认是监听localhost的端口,如果不配置新端口名的话,就解析监听不到消息。
PS:上面总结的,有什么不足之处,欢迎更正补充。