目标
高吞吐量来支持高容量的事件流处理
支持从离线系统加载数据
低延迟的消息系统
持久化
依赖文件系统,持久化到本地
数据持久化到log
效率
- 解决”small IO problem“:
- 使用”message set“组合消息。
- server使用”chunks of messages“写到log。
- consumer一次获取大的消息块。
- 解决”byte copying“:
- 在producer、broker和consumer之间使用统一的binary message format。
- 使用系统的pagecache。
- 使用sendfile传输log,避免拷贝。
- 端到端的批量压缩(End-to-end Batch Compression)
- Kafka支持GZIP和Snappy压缩协议。
主要构件
- Broker
Kafka集群包含一个或多个服务器,这种服务器被称为broker,主要用来接收、存储(and持久化)、分发消息。 - Topic
每条发布到Kafka集群的消息都有一个类别,这个类别被称为topic。(物理上不同topic的消息分开存储,逻辑上一个topic的消息虽然保存于一个或多个broker上但用户只需指定消息的topic即可生产或消费数据而不必关心数据存于何处) - Partition
parition是物理上的概念,每个topic包含一个或多个partition,创建topic时可指定parition数量。每个partition对应于一个文件夹,该文件夹下存储该partition的数据和索引文件 - Producer
负责发布消息(push)到Kafka broker - Consumer
pull消费消息。每个consumer属于一个特定的consuer group(可为每个consumer指定group name,若不指定group name则属于默认的group)。使用consumer high level API时,同一topic的一条消息只能被同一个consumer group内的一个consumer消费,但多个consumer group可同时消费这一消息。
(broker内部划分可以学习一下,版本较早)
Kafka存储策略
1)kafka以topic来进行消息管理,每个topic包含多个partition,每个partition对应一个逻辑log,有多个segment组成。
Topic在逻辑上可以被认为是一个queue,每条消息都必须指定它的topic,可以简单理解为必须指明把这条消息放进哪个queue里。 为了使得Kafka的吞吐率可以水平扩展,物理上把topic分成一个或多个partition,每个partition在物理上对应一个文件夹,该文件 夹下存储这个partition的所有消息和索引文件。
2)每个segment中存储多条消息,消息id由其逻辑位置决定,即从消息id可直接定位到消息的存储位置,避免id到位置的额外映射。
3)每个part在内存中对应一个index,记录每个segment中的第一条消息偏移。
4)发布者发到某个topic的消息会被均匀的分布到多个partition上(或根据用户指定的路由规则进行分布),broker收到发布消息往对应partition的最后一个segment上添加该消息,当某个segment上的消息条数达到配置值或消息发布时间超过阈值时,segment上的消息会被flush到磁盘,只有flush到磁盘上的消息订阅者才能订阅到,segment达到一定的大小后将不会再往该segment写数据,broker会创建新的segment。
因为每条消息都被append到该partition中,是顺序写磁盘,因此效率非常高(经验证,顺序写磁盘效率比随机写内存还要高,这是Kafka高吞吐率的一个很重要的保证)。
每一条消息被发送到broker时,会根据paritition规则选择被存储到哪一个partition。如果partition规则设置的合理,所有 消息可以均匀分布到不同的partition里,这样就实现了水平扩展。(如果一个topic对应一个文件,那这个文件所在的机器I/O将会成为这个 topic的性能瓶颈,而partition解决了这个问题)。在创建topic时可以在$KAFKA_HOME/config/server.properties中指定这个partition的数量(如下所示),当然也可以在topic创建之后去修改parition数量。
在发送一条消息时,可以指定这条消息的key,producer根据这个key和partition机制来判断将这条消息发送到哪个parition。 paritition机制可以通过指定producer的paritition. class这一参数来指定,该class必须实现kafka.producer.Partitioner接口。本例中如果key可以被解析为整数则将对应的整数与partition总数取余,该消息会被发送到该数对应的partition。(每个parition都会有个序号)
删除策略
对于传统的message queue而言,一般会删除已经被消费的消息,而Kafka集群会保留所有的消息,无论其被消费与否。当然,因为磁盘限制,不可能永久保留所有数据(实际 上也没必要),因此Kafka提供两种策略去删除旧数据。一是基于时间,二是基于partition文件大小。例如可以通过配置$KAFKA_HOME/config/server.properties,让Kafka删除一周前的数据,也可通过配置让Kafka在partition文件超过1GB时删除旧数据,如下所示。
1)N天前的删除。
2)保留最近的MGB数据。
- 因为Kafka读取特定消息的时间复杂度为O(1),即与文件大小无关,所以这里删除文件与Kafka性能无关,选择怎样的删除策略只 与磁盘以及具体的需求有关。
- Kafka会为每一个consumer group保留一些metadata信息—当前消费的消息的position,也即offset。
这个offset由consumer控制。
正常情况下 consumer会在消费完一条消息后线性增加这个offset。当然,consumer也可将offset设成一个较小的值,重新消费一些消息。
因为 offet由consumer控制,所以Kafka broker是无状态的,它不需要标记哪些消息被哪些consumer过,不需要通过broker去保证同一个consumer group只有一个consumer能消费某一条消息,因此也就不需要锁机制,这也为Kafka的高吞吐率提供了有力保障。
Kafka broker
与其它消息系统不同,Kafka broker是无状态的
。这意味着消费者必须维护已消费的状态信息。这些信息由消费者自己维护
,broker完全不管(有offset managerbroker管理)。
- 代理删除消息变得很棘手,因为代理并不知道消费者是否已经使用了该消息。Kafka创新性地解决了这个问题,它将一个简单的基于时间的SLA应用于保留策略。当消息在代理中超过一定时间后,将会被自动删除。
- 这种创新设计有很大的好处,消费者可以故意倒回到老的偏移量再次消费数据。这违反了队列的常见约定,但被证明是许多消费者的基本特征。
The Producer
负载均衡
1)producer可以自定义发送到哪个partition的路由规则。默认路由规则:hash(key)%numPartitions,如果key为null则随机选择一个partition。
2)自定义路由:如果key是一个user id,可以把同一个user的消息发送到同一个partition,这时consumer就可以从同一个partition读取同一个user的消息。
异步批量发送
批量发送:配置不多于固定消息数目一起发送并且等待时间小于一个固定延迟的数据。
The Consumer
consumer控制消息的读取。
Push vs Pull
1)producer push data to broker,consumer pull data from broker
2)consumer pull的优点:consumer自己控制消息的读取速度和数量。
3)consumer pull的缺点:如果broker没有数据,则可能要pull多次忙等待,Kafka可以配置consumer long pull一直等到有数据。
Consumer Position
大部分消息系统由broker记录哪些消息被消费了,但Kafka不是。
Kafka由consumer控制消息的消费,consumer甚至可以回到一个old offset的位置再次消费消息。
Message Delivery Semantics(消息投递语义)
At most once—Messages may be lost but are never redelivered.
At least once—Messages are never lost but may be redelivered.
Exactly once—this is what people actually want, each message is delivered once and only once.
Producer:有个”acks“配置可以控制接收的leader的在什么情况下就回应producer消息写入成功。
-
Consumer:
读取消息,写log,处理消息。如果处理消息失败,log已经写入,则无法再次处理失败的消息,对应”At most once“。
读取消息,处理消息,写log。如果消息处理成功,写log失败,则消息会被处理两次,对应”At least once“。
读取消息,同时处理消息并把result和log同时写入。这样保证result和log同时更新或同时失败,对应”Exactly once“。
Kafka默认保证at-least-once delivery,容许用户实现at-most-once语义,exactly-once的实现取决于目的存储系统,kafka提供了读取offset,实现也没有问题。
复制(Replication)
1)一个partition的复制个数(replication factor)包括这个partition的leader本身。
2)所有对partition的读和写都通过leader。
3)Followers通过pull获取leader上log(message和offset)
4)如果一个follower挂掉、卡住或者同步太慢,leader会把这个follower从”in sync replicas“(ISR)列表中删除。
5)当所有的”in sync replicas“的follower把一个消息写入到自己的log中时,这个消息才被认为是”committed“的。
6)如果针对某个partition的所有复制节点都挂了,Kafka选择最先复活的那个节点作为leader(这个节点不一定在ISR里)。
日志压缩(Log Compaction)
1)针对一个topic的partition,压缩使得Kafka至少知道每个key对应的最后一个值。
2)压缩不会重排序消息。
3)消息的offset是不会变的。
4)消息的offset是顺序的。
Distribution
Consumer Offset Tracking
1)High-level consumer记录每个partition所消费的maximum offset,并定期commit到offset manager(broker)。
2)Simple consumer需要手动管理offset。现在的Simple consumer Java API只支持commit offset到zookeeper。
Consumers and Consumer Groups
1)consumer注册到zookeeper
2)属于同一个group的consumer(group id一样)平均分配partition,每个partition只会被一个consumer消费。
3)当broker或同一个group的其他consumer的状态发生变化的时候,consumer rebalance就会发生。
Zookeeper协调控制
1)管理broker与consumer的动态加入与离开。
2)触发负载均衡,当broker或consumer加入或离开时会触发负载均衡算法,使得一个consumer group内的多个consumer的订阅负载平衡。
3)维护消费关系及每个partition的消费信息。
Replication
Kafka从0.8开始提供partition级别的replication,replication的数量可在$KAFKA_HOME/config/server.properties中配置。
该 Replication与leader election配合提供了自动的failover机制。
replication对Kafka的吞吐率是有一定影响的,但极大的增强了可用性。默认情况 下,Kafka的replication数量为1。
每个partition都有一个唯一的leader,所有的读写操作都在leader上完 成,follower批量从leader上pull数据。
一般情况下partition的数量大于等于broker的数量,并且所有partition的 leader均匀分布在broker上。follower上的日志和其leader上的完全一样。
和大部分分布式系统一样,Kakfa处理失败需要明确定义一个broker是否alive。
-
对于Kafka而言,Kafka存活包含两个条件,
- 一是它必须 维护与Zookeeper的session(这个通过Zookeeper的heartbeat机制来实现)。
- 二是follower必须能够及时将 leader的writing复制过来,不能“落后太多”。
leader会track“in sync”的node list。如果一个follower宕机,或者落后太多,leader将把它从”in sync” list中移除。这里所描述的“落后太多”指follower复制的消息落后于leader后的条数超过预定值,该值可在$KAFKA_HOME/config/server.properties中配置
需要说明的是,Kafka只解决”fail/recover”,不处理“Byzantine”(“拜占庭”)问题。
一条消息只有被“in sync” list里的所有follower都从leader复制过去才会被认为已提交。这样就避免了部分数据被写进了leader,还没来得及被任何 follower复制就宕机了,而造成数据丢失(consumer无法消费这些数据)。而对于producer而言,它可以选择是否等待消息 commit,这可以通过request.required.acks来设置。这种机制确保了只要“in sync” list有一个或以上的flollower,一条被commit的消息就不会丢失。
这里的复制机制即不是同步复制,也不是单纯的异步复制。事实上,同步复制要求“活着的”follower都复制完,这条消息才会被认为commit,这种 复制方式极大的影响了吞吐率(高吞吐率是Kafka非常重要的一个特性)。而异步复制方式下,follower异步的从leader复制数据,数据只要被 leader写入log就被认为已经commit,这种情况下如果follwer都落后于leader,而leader突然宕机,则会丢失数据。
Kafka的这种使用“in sync” list的方式则很好的均衡了确保数据不丢失以及吞吐率。follower可以批量的从leader复制数据,这样极大的提高复制性能(批量写磁盘),极 大减少了follower与leader的差距(前文有说到,只要follower落后leader不太远,则被认为在“in sync” list里)。
Leader election
另外一个很重要的问题是当leader宕机了,怎样在follower中选举出新的 leader。
因为follower可能落后许多或者crash了,所以必须确保选择“最新”的follower作为新的leader。
一个基本的原则就 是,如果leader不在了,新的leader必须拥有原来的leader commit的所有消息。
这就需要作一个折衷,如果leader在标明一条消息被commit前等待更多的follower确认,那在它die之后就有更多的follower可以作为新的leader,但这也会造成吞吐率的下降。
一种非常常用的选举leader的方式是“majority 领袖”(“少数服从多数”),
但Kafka并未采用这种方式
。
因为这种模式下,如果我们有2f+1个replica(包含leader和follower), 那在commit之前必须保证有f+1个replica复制完消息,为了保证正确选出新的leader,fail的replica不能超过f个。因为在剩 下的任意f+1个replica里,至少有一个replica包含有最新的所有消息。
- 这种方式有个很大的优势,系统的latency只取决于最快的几台 server,也就是说,如果replication factor是3,那latency就取决于最快的那个follower而非最慢那个。
- majority vote也有一些劣势,为了保证leader election的正常进行,它所能容忍的fail的follower个数比较少。如果要容忍1个follower挂掉,必须要有3个以上的 replica,如果要容忍2个follower挂掉,必须要有5个以上的replica。也就是说,在生产环境下为了保证较高的容错程度,必须要有大量 的replica,而大量的replica又会在大数据量下导致性能的急剧下降。
- 这就是这种算法更多用在Zookeeper这种共享集群配置的系统中而很少在需要存储大量数据的系统中使用的原因。
实际上,leader election算法非常多,比如Zookeper的Zab, Raft和Viewstamped Replication。而Kafka所使用的leader election算法更像微软的PacificA算法。
Kafka在Zookeeper中动态维护了一个ISR(in-sync replicas) set,这个set里的所有replica都跟上了leader,只有ISR里的成员才有被选为leader的可能。在这种模式下,对于f+1个 replica,一个Kafka topic能在保证不丢失已经ommit的消息的前提下容忍f个replica的失败。在大多数使用场景中,这种模式是非常有利的。事实上,为了容忍f个 replica的失败,majority vote和ISR在commit前需要等待的replica数量是一样的,但是ISR需要的总的replica的个数几乎是majority vote的一半。
虽然majority vote与ISR相比有不需等待最慢的server这一优势,但是Kafka作者认为Kafka可以通过producer选择是否被commit阻塞来改善这一问题,并且节省下来的replica和磁盘使得ISR模式仍然值得。
上文提到,在ISR中至少有一个follower时,Kafka可以确保已经commit的数据不丢失,但如果某一个partition的所有replica都挂了,就无法保证数据不丢失了。这种情况下有两种可行的方案:
- 等待ISR中的任一个replica“活”过来,并且选它作为leader
- 选择第一个“活”过来的replica(不一定是ISR中的)作为leader
这就需要在可用性和一致性当中作出一个简单的平衡。Kafka0.8.*使用了第二种方式。根据Kafka的文档,在以后的版本 中,Kafka支持用户通过配置选择这两种方式中的一种,从而根据不同的使用场景选择高可用性还是强一致性。
Kafka选举一个broker作为controller,这个controller通过 watch Zookeeper检测所有的broker failure,并负责为所有受影响的parition选举leader,再将相应的leader调整命令发送至受影响的broker,过程如下图所示。
这样做的好处是,可以批量的通知leadership的变化,从而使得选举过程成本更低,尤其对大量的partition而言。如果controller 失败了,幸存的所有broker都会尝试在Zookeeper中创建/controller->{this broker id}(其实就是分布式锁来争夺Leader),如果创建成功(只可能有一个创建成功),则该broker会成为controller,若创建不成功,则该broker会等待新 controller的命令。
Consumer group
- 每一个consumer实例都属于一个consumer group
- 每一条消息只会被同一个consumer group里的一个consumer实例消费。
-
不同consumer group可以同时消费同一条消息
而如上文所将,Kafka并不删除 已消费的消息。
- 为了实现传统message queue消息只被消费一次的语义,Kafka保证保证同一个consumer group里只有一个consumer会消费一条消息。
- 与传统message queue不同的是,Kafka还允许不同consumer group同时消费同一条消息,这一特性可以为消息的多元化处理提供了支持。
-
实际上,Kafka的设计理念之一就是同时提供离线处理和实时处理。根据这一 特性,可以使用Storm这种实时流处理系统对消息进行实时在线处理,同时使用Hadoop这种批处理系统进行离线处理,还可以同时将数据实时备份到另一 个数据中心,只需要保证这三个操作所使用的consumer在不同的consumer group即可。(这个牛逼)
下图展示了Kafka在Linkedin的一种简化部署。
为了更清晰展示Kafka consumer group的特性,笔者(不是我啊)作了一项测试。创建一个topic (名为topic1),创建一个属于group1的consumer实例,并创建三个属于group2的consumer实例,然后通过producer 向topic1发送key分别为1,2,3的消息。结果发现属于group1的consumer收到了所有的这三条消息,同时group2中的3个 consumer分别收到了key为1,2,3的消息。(也就是说,一个group的consumers是用来做负载的)
Consumer Rebalance
- 表面上Kafka保证同一consumer group中只有一个consumer会消息某条消息
- 实际上,Kafka保证的是稳定状态下每一个consumer实例只会消费某一个或多个特定 partition的数据,
而某个partition的数据只会被某一个特定的consumer实例所消费。
- 这样设计的劣势是无法让同一个 consumer group里的consumer均匀消费数据
- 优势是每个consumer不用都跟大量的broker通信,减少通信开销,同时也降低了分配难度,实现也 更简单。
- 另外,因为同一个partition里的数据是有序的,这种设计可以保证每个partition里的数据也是有序被消费。
如果某consumer group中consumer数量少于partition数量,则至少有一个consumer会消费多个partition的数据;如果consumer 的数量与partition数量相同,则正好一个consumer消费一个partition的数据;而如果consumer的数量多于 partition的数量时,会有部分consumer无法消费该topic下任何一条消息。
- 如果topic1有0,1,2共三个partition,当group1只有一个consumer(名为consumer1)时,该 consumer可消费这3个partition的所有数据。
- 增加一个consumer(consumer2)后,其中一个consumer(consumer1)可消费2个partition的数据,另外一个consumer(consumer2)可消费另外一个partition的数据。
- 再增加一个consumer(consumer3)后,每个consumer可消费一个partition的数据。consumer1消费partition0,consumer2消费partition1,consumer3消费partition2
- 再增加一个consumer(consumer4)后,其中3个consumer可分别消费一个partition的数据,另外一个consumer(consumer4)不能消费topic1任何数据。
- 此时关闭consumer1,剩下的consumer可分别消费一个partition的数据。
- 接着关闭consumer2,剩下的consumer3可消费2个partition,consumer4可消费1个partition。
- 再关闭consumer3,剩下的consumer4可同时消费topic1的3个partition。
consumer rebalance算法如下:
- Sort PT (all partitions in topic T)
- Sort CG(all consumers in consumer group G)
- Let i be the index position of Ci in CG and let N=size(PT)/size(CG)
- Remove current entries owned by Ci from the partition owner registry
- Assign partitions from iN to (i+1)N-1 to consumer Ci
- Add newly assigned partitions to the partition owner registry
目前consumer rebalance的控制策略是由每一个consumer通过Zookeeper完成的。具体的控制方式如下:
- Register itself in the consumer id registry under its group.
- Register a watch on changes under the consumer id registry.
- Register a watch on changes under the broker id registry.
- If the consumer creates a message stream using a topic filter, it also registers a watch on changes under the broker topic registry.
- Force itself to rebalance within in its consumer group.
在这种策略下,每一个consumer或者broker的增加或者减少都会触发consumer rebalance。因为每个consumer只负责调整自己所消费的partition,为了保证整个consumer group的一致性,所以当一个consumer触发了rebalance时,该consumer group内的其它所有consumer也应该同时触发rebalance。
目前(2015-01-19)最新版(0.8.2 已不是最新)Kafka采用的是上述方式。但该方式有不利的方面:
- Herd effect(惊群效应)
任何broker或者consumer的增减都会触发所有的consumer的rebalance - Split Brain(脑裂)
每个consumer分别单独通过Zookeeper判断哪些partition down了,那么不同consumer从Zookeeper“看”到的view就可能不一样,这就会造成错误的reblance尝试。而且有可能所有的 consumer都认为rebalance已经完成了,但实际上可能并非如此。
Kafka作者正在考虑在还未发布的0.9.x版本中使用中心协调器(coordinator)。 大体思想是选举出一个broker作为coordinator,由它watch Zookeeper,从而判断是否有partition或者consumer的增减,然后生成rebalance命令,并检查是否这些rebalance 在所有相关的consumer中被执行成功,如果不成功则重试,若成功则认为此次rebalance成功(这个过程跟replication controller非常类似,所以我很奇怪为什么当初设计replication controller时没有使用类似方式来解决consumer rebalance的问题)。流程如下:
消息Deliver guarantee
有这么几种可能的delivery guarantee:
- At most once 消息可能会丢,但绝不会重复传输
- At least one 消息绝不会丢,但可能会重复传输
- Exactly once 每条消息肯定会被传输一次且仅传输一次(很多时候这是用户所想要的。幂等)
producer到broker
Kafka的delivery guarantee semantic非常直接。当producer向broker发送消息时,一旦这条消息被commit,因数replication的存在,它就不会丢。
但是如果producer发送数据给broker后,遇到的网络问题而造成通信中断,那producer就无法判断该条消息是否已经commit。(这一点 有点像向一个自动生成primary key的数据库表中插入数据。)
虽然Kafka无法确定网络故障期间发生了什么,但是producer可以生成一种类似于primary key的东西,发生故障时幂等性的retry多次,这样就做到了Exactly one。
截止到目前(Kafka 0.8.2版本,2015-01-25),这一feature还并未实现,有希望在Kafka未来的版本中实现。
目前默认情况下一条消息从producer和broker是确保了At least once,但可通过设置producer异步发送实现At most once
broker到consumer
接下来讨论的是消息从broker到consumer的delivery guarantee semantic。(仅针对Kafka consumer high level API)。
consumer在从broker读取消息后,可以选择commit,该操作会在Zookeeper中存下该consumer在该 partition下读取的消息的offset。该consumer下一次再读该partition时会从下一条开始读取。
如未commit,下一次读取 的开始位置会跟上一次commit之后的开始位置相同。当然可以将consumer设置为autocommit,即consumer一旦读到数据立即自动 commit。
如果只讨论这一读取消息的过程,那Kafka是确保了Exactly once。但实际上实际使用中consumer并非读取完数据就结束了,而是要进行进一步处理
,而数据处理与commit的顺序在很大程度上决定了消息从broker和consumer的delivery guarantee semantic。
- 读完消息先commit再处理消息。这种模式下,如果consumer在commit后还没来得及处理消息就crash了,下次重新开始工作后就无法读到刚刚已提交而未处理的消息,这就对应于At most once
- 读完消息先处理再commit。这种模式下,如果处理完了消息在commit之前consumer crash了,下次重新开始工作时还会处理刚刚未commit的消息,实际上该消息已经被处理过了。这就对应于At least once。
- 在很多情况使用场景下,消息都有一个primary key,所以消息的处理往往具有幂等性,即多次处理这一条消息跟只处理一次是等效的,那就可以认为是Exactly once。 (人个感觉这种说法有些牵强,毕竟它不是Kafka本身提供的机制,而且primary key本身不保证操作的幂等性。而且实际上我们说delivery guarantee semantic是讨论被处理多少次,而非处理结果怎样,因为处理方式多种多样,我们的系统不应该把处理过程的特性—如是否幂等性,当成Kafka本身的 feature)
- 如果一定要做到Exactly once,就需要协调offset和实际操作的输出。精典的做法是引入两阶段提交。如 果能让offset和操作输入存在同一个地方,会更简洁和通用。这种方式可能更好,因为许多输出系统可能不支持两阶段提交。比如,consumer拿到数 据后可能把数据放到HDFS,如果把最新的offset和数据本身一起写到HDFS,那就可以保证数据的输出和offset的更新要么都完成,要么都不完 成,间接实现Exactly once。(目前就high level API而言,offset是存于Zookeeper中的,无法存于HDFS,而low level API的offset是由自己去维护的,可以将之存于HDFS中)
总之,Kafka默认保证At least once,并且允许通过设置producer异步提交来实现At most once。而Exactly once要求与目标存储系统协作,幸运的是Kafka提供的offset可以使用这种方式非常直接非常容易。
代码实例
生产者代码
public class TestProducer {
public static void main(String[] args) {
long events = Long.parseLong(args[0]);
Random rnd = new Random();
Properties props = new Properties();
props.put("metadata.broker.list", "broker1:9092,broker2:9092 ");
props.put("serializer.class", "kafka.serializer.StringEncoder");
// partion 策略
props.put("partitioner.class", "example.producer.SimplePartitioner");
props.put("request.required.acks", "1");
ProducerConfig config = new ProducerConfig(props);
Producer<String, String> producer = new Producer<String, String>(config);
for (long nEvents = 0; nEvents < events; nEvents++) {
long runtime = new Date().getTime();
String ip = “192.168.2.” + rnd.nextInt(255);
String msg = runtime + “,www.example.com,” + ip;
KeyedMessage<String, String> data = new KeyedMessage<String, String>("page_visits", ip, msg);
producer.send(data);
}
producer.close();
}
}
public class SimplePartitioner implements Partitioner {
public SimplePartitioner (VerifiableProperties props) {
}
public int partition(Object key, int a_numPartitions) {
int partition = 0;
String stringKey = (String) key;
int offset = stringKey.lastIndexOf('.');
if (offset > 0) {
partition = Integer.parseInt( stringKey.substring(offset+1)) % a_numPartitions;
}
return partition;
}
}
消费者代码
public class ConsumerGroupExample {
private final ConsumerConnector consumer;
private final String topic;
private ExecutorService executor;
public ConsumerGroupExample(String a_zookeeper, String a_groupId, String a_topic) {
consumer = kafka.consumer.Consumer.createJavaConsumerConnector(
createConsumerConfig(a_zookeeper, a_groupId));
this.topic = a_topic;
}
public void shutdown() {
if (consumer != null) consumer.shutdown();
if (executor != null) executor.shutdown();
try {
if (!executor.awaitTermination(5000, TimeUnit.MILLISECONDS)) {
System.out.println("Timed out waiting for consumer threads to shut down, exiting uncleanly");
}
} catch (InterruptedException e) {
System.out.println("Interrupted during shutdown, exiting uncleanly");
}
}
public void run(int a_numThreads) {
Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
topicCountMap.put(topic, new Integer(a_numThreads));
Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);
// now launch all the threads
//
executor = Executors.newFixedThreadPool(a_numThreads);
// now create an object to consume the messages
//
int threadNumber = 0;
for (final KafkaStream stream : streams) {
executor.submit(new ConsumerTest(stream, threadNumber));
threadNumber++;
}
}
private static ConsumerConfig createConsumerConfig(String a_zookeeper, String a_groupId) {
Properties props = new Properties();
props.put("zookeeper.connect", a_zookeeper);
props.put("group.id", a_groupId);
props.put("zookeeper.session.timeout.ms", "400");
props.put("zookeeper.sync.time.ms", "200");
props.put("auto.commit.interval.ms", "1000");
return new ConsumerConfig(props);
}
public static void main(String[] args) {
String zooKeeper = args[0];
String groupId = args[1];
String topic = args[2];
int threads = Integer.parseInt(args[3]);
ConsumerGroupExample example = new ConsumerGroupExample(zooKeeper, groupId, topic);
example.run(threads);
try {
Thread.sleep(10000);
} catch (InterruptedException ie) {
}
example.shutdown();
}
}
Ref:
http://www.importnew.com/24677.html
http://lxw1234.com/archives/2015/09/504.htm
http://www.jasongj.com/2015/01/02/Kafka%E6%B7%B1%E5%BA%A6%E8%A7%A3%E6%9E%90/