当kafka最初被创建的时候,它附带一个Scala的生产者和消费者客户端。随着时间的推移,我们逐渐意识到这些API有很多限制。
举个例子,我们有一个“高级”的消费者API,它支持消费组和处理故障恢复,但是不支持许多更复杂的使用场景。我们也有一个“低级”消费者客户端,提供完全控制,但是需要用户去处理故障转移和错误处理。因此,我们开始重新设计这些客户端。以便开辟许多用旧的客户端难以或不可能处理的问题,并建立一组我们可以长期支持的API。
第一阶段在0.8.1中重写Producer API。最近的0.9版本通过引入新的消费者API来完成第二阶段。基于Kafak本身提供的新的组协调协议,新消费者带来以下优点:
清除合并API:新的消费者结合了旧的“简单”和“高级”消费者客户端的功能,提供组协调和低级访问来构建自己的消费策略。
降低依赖性:新的消费者用纯Java来书写,它既在运行时候依赖Scala也没有依赖在Zookeeper上,它可以使你的工程包含更轻的库。
更好的安全性:在kafka的0.9中实现的安全扩展仅由新消费者支持。
新的消费者还添加了一组用于管理消费者进程的容错组协议。以前,此功能使用一个重的Java客户端(通过与Zookeeper大量交互)实现的。这种逻辑的复杂性是的难以在其他语言中构建功能齐全的消费者。随着这个新协议的引入,现在变得更加容易。事实上,我们已经将C客户端移动到此协议。
虽然新的消费者使用重新设计的API和新的协调协议,但是这些概念并不是完全不同。因此熟悉旧的消费者用户不应该对理解它有太多的麻烦。然而,有一些细微的细节,特别是关于组管理和线程模型,需要一些额外的关心。
谨慎起见:在写代码的时候,新的消费者在稳定性方面仍然被认为是Beta版本。我们已经修复了0.9.0分支的几个重要的bug,所以如果你遇到任何问题使用0.9.0.0版本的Kafaka,我们估计你测试它。如果你仍然看到这个问题,请在邮件列表或JIRA上报告。
入门
在进入代码之前,我们先来回忆些基本的概念。在Kafak中,每个topic被以分区的形式划分为一个日志组。生产者写这些日志的尾部,消费者以自己的节奏来读取日志。
kafka通过在一个消费组中并发分区的数量来控制日志的消费。消费组是由相同的组标示的,一组消费者组成。下面显示具有三个分区的单个主题和具有两个成员的使用者组。
主题中的每个分区被精确的指定到消费组中的一个消费者。
在旧消费者中通过依赖Zookeeper来实现组的管理,一个新消费者
使用内置于Kafka本身的组协调协议。对于每个组,选择一个broker来作为组协调器。协调器负责管理组的状态。它的主要工作是,在新成员到达,旧成员离开以及主题的元数据修改的时候调解分区分配。重新分配分区的行为称为重新平衡组。
当组首次初始化的时,消费者通常从每个分区的最早或最晚的偏移量开始读取。然后顺序读取每个分区中的消息。当消费者取得进展时候,它提交它已经成功处理的消息的偏移量。
例如,在下图中,消费者的位置在偏移6,其最后的提交的偏移1.
当分区重新分配给组中的另外一个使用者时,初始位置设置为最后一个已提交的偏移量。如果上面例子中的消费者突然崩溃了,那么接管的组成员将从偏移量1开始消费。在这种情况下,它必须重新处理消息直到崩溃消费者的位置6.
该图还显示了日志中的另外两个重要位置。日志结束偏移量是写入日志的最后一条消息的偏移量。高水印是成功复制到所有日志副本的最后一条消息的偏移量。从消费者的角度来看,主要知道的是,你只能读取高水印。这防止消费者读取稍后可能丢失的未复制数据。
配置和初始化
要开始使用消费者,请将Kafka-clients依赖项添加到项目中,下面提供了maven的内容:
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.9.0.0-cp1</version>
</dependency>
像其他Kafka 客户端一样。消费者是用一个Properties文件构建。
在下面的示例中,我们提供了使用组所需的最少配置。
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "consumer-tutorial");
props.put("key.deserializer", StringDeserializer.class.getName());
props.put("value.deserializer", StringDeserializer.class.getName());
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
与旧的消费者和生产者一样,我们需要为消费者配置一个初始化的代理列表,以便可以发现集群中的其余部分。这不需要集群中的所有服务器--客户端将确定来自此列表中的代理的完整集合的活动代理。我们在这里假设代理正在localhost上运行。消费者还需要被告之如何反序列化消息键和值。最后,要加入消费者组,我们需要配置组ID。我们继续本教程,来介绍更多配置。
主题订阅
要开始消费,你不行首先订阅应用程序需要读取的主题。在下面的实例中,我们订阅了主题“foo”和“bar”。
consumer.subscribe(Arrays.asList(“foo”, “bar”));
在你订阅后,消费者可以与组其他部分进行协调以获取其分区分配。当你开始使用数据时,将自动处理这一点。稍后我们将演示如何使用特定的API手动分配分区,但请记住,不可混合手动和自动分配。
订阅方法不是增量:你必须包括使用的主题的完整列表。你可以随时更改你订阅的主题集,当你订阅时,任何先前的订阅主题将会被新列表取代。
基本轮询循环
消费者需要能够并行的获取数据,可能会获取多个主题的多个分区,这过程中会跨越多个代理。
为此,它是用类似Unix的轮询或Select调用的API样式:一旦注册了主题,所有未来的协调,重新平衡和数据提取都通过在这个时间的循环中的单个轮询调用来驱动。这个实现简单有效,可以处理一个线程的所有IO。
订阅主题后,你需要启动一二循环来获取分配的分区并开始获取数据。听起来很复杂,但是你需要做的是在循环中调用poll消费者负责处理其他的。每次调用poll都会从分配的分区获取一批数据(可能为空)。下面的例子显示一个基本的轮询循环,它在获取记录到达时候打印偏移量和它的值。
try {
while (running) {
ConsumerRecords<String, String> records = consumer.poll(1000);
for (ConsumerRecord<String, String> record : records)
System.out.println(record.offset() + ": " + record.value());
}
} finally {
consumer.close();
}
poll API根据当前offset来获取记录。当手册创建消费组的时候,根据重置策略(通常设置为每个分区的最早或最近偏移量)来设置位置。
一旦消费者开始提交offset,则每个稍后重新平衡的位置重置为最后提交的offset。
传递给poll的参数控制消费者在等待当前位置的记录时,阻塞的最大时间。一旦任何记录可用,消费者立刻返回,但是如果没有可用的记录,它在返回之前等待设置的超时时间超时。
消费者被设计在自己的线程里面运行,在多线程中如果没有做同步,它们是不安全的。在这个例子中,我们使用了一个标示,当应用程序关闭的时候,它可以用来从轮询循环中断。当这个标示被另一个线程设置为false(比如是关闭进程)时候,只要轮询返回并且应用程序处理完记录,循环将中断。
完成后,应该始终关闭消费者。不仅仅是清理在用的socket,它还确保消费者可以警告协调器(coordiator)其离开了组。
这个例子中使用了相对较小的超时来确保当消费者被关闭时候没有太大的延迟。或者,你可以用一个比较长的延迟并通过唤醒API来中断这个循环。
try {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);
for (ConsumerRecord<String, String> record : records)
System.out.println(record.offset() + “: ” + record.value());
}
} catch (WakeupException e) {
// ignore for shutdown
} finally {
consumer.close();
}
以上,我们已经更改超时时间为Long.MAX_VALUE,这基本意味着消费者将无限阻塞,直到可以返回下一个记录。触发关闭线程可以通过调用consumer.wakeup()来中断活动的轮询,而不是像上一个例子中一样设置一个标示。这个API可以安全的从另一线程调用。
注意:如果没有正在进行的活动轮询,则将从下一个调用中抛出异常。这个例子中我们将异常捕获了。
把它们放在一起
在下一个例子中,我们将把所有的这些组合在一起构建一个简单的Runnable任务,它初始化消费者,订阅主题列表,并无限期的执行轮询循环,直到外部关闭。
public class ConsumerLoop implements Runnable {
private final KafkaConsumer<String, String> consumer;
private final List<String> topics;
private final int id;
public ConsumerLoop(int id,
String groupId,
List<String> topics) {
this.id = id;
this.topics = topics;
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put(“group.id”, groupId);
props.put(“key.deserializer”, StringDeserializer.class.getName());
props.put(“value.deserializer”, StringDeserializer.class.getName());
this.consumer = new KafkaConsumer<>(props);
}
@Override
public void run() {
try {
consumer.subscribe(topics);
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);
for (ConsumerRecord<String, String> record : records) {
Map<String, Object> data = new HashMap<>();
data.put("partition", record.partition());
data.put("offset", record.offset());
data.put("value", record.value());
System.out.println(this.id + ": " + data);
}
}
} catch (WakeupException e) {
// ignore for shutdown
} finally {
consumer.close();
}
}
public void shutdown() {
consumer.wakeup();
}
}
为了测试这个例子,你需要一个运行0.9.0.0版本的kafka,和一个要消费字符串数据的主题。将一串字符串数据写入主题的最简单方法是使用kafka-verifiable-producer.sh脚本。为了更有意思,我们还应该确保主题有多个分区,以便不会让一个消费线程空闲。例如,有一个kafka和zookeeper都在localhost上运行,你可以在kafka的根目录执行:
# bin/kafka-topics.sh --create --topic consumer-tutorial --replication-factor 1 --partitions 3 --zookeeper localhost:2181
# bin/kafka-verifiable-producer.sh --topic consumer-tutorial --max-messages 200000 --broker-list localhost:9092
然后我们可以创建一个小型的驱动程序来设置一个拥有三个成员的消费组,所有的成员均订阅我们刚刚创建的相同主题。
public static void main(String[] args) {
int numConsumers = 3;
String groupId = "consumer-tutorial-group"
List<String> topics = Arrays.asList("consumer-tutorial");
ExecutorService executor = Executors.newFixedThreadPool(numConsumers);
final List<ConsumerLoop> consumers = new ArrayList<>();
for (int i = 0; i < numConsumers; i++) {
ConsumerLoop consumer = new ConsumerLoop(i, groupId, topics);
consumers.add(consumer);
executor.submit(consumer);
}
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
for (ConsumerLoop consumer : consumers) {
consumer.shutdown();
}
executor.shutdown();
try {
executor.awaitTermination(5000, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
e.printStackTrace;
}
}
});
}
此例子将三个可运行的消费者提交给executor。每个线程都有一个单独的ID,以便可以看出哪个线程在接收数据。当您停止进程时候,将调用cusumor,这将使用唤醒暂停三个线程,并且等它们关闭。如果你运行这个,你应该看到许多线程的数据,这里是一个运行的示例:
2: {partition=0, offset=928, value=2786}
2: {partition=0, offset=929, value=2789}
1: {partition=2, offset=297, value=891}
2: {partition=0, offset=930, value=2792}
1: {partition=2, offset=298, value=894}
2: {partition=0, offset=931, value=2795}
0: {partition=1, offset=278, value=835}
2: {partition=0, offset=932, value=2798}
0: {partition=1, offset=279, value=838}
1: {partition=2, offset=299, value=897}
1: {partition=2, offset=300, value=900}
1: {partition=2, offset=301, value=903}
1: {partition=2, offset=302, value=906}
1: {partition=2, offset=303, value=909}
1: {partition=2, offset=304, value=912}
0: {partition=1, offset=280, value=841}
2: {partition=0, offset=933, value=2801}
输出显示所有三个分区的消耗。每个分区已经分配给其中一个线程。在每个分区中,您可以看到偏移量在顺序递增。你可以使用Ctrl+C从命令行或通过IDE关闭该进程。
消费者活力
作为消费者做的一部分时,每个消费者被指定为其订阅的主题中分区的子集。这基本上是这些分区上的组锁。只要保持这种锁定,组中的其他成员将不能从它们读取,这样可以避免重复消费。但是如果消费者由于机器或应用程序故障而司机。则需要释放锁,以便可以将分区分配给正常的消费者。
Kafka的组协调协议使用心跳机制来解决这个问题。每次重新平衡后,所有成员开始向群组协调者发送定期心跳。只要协调器继续接受心跳,它就假定成员是健康的。在每个接受到的心跳时候,协调器启动(或重置)定时器。如果定时器到期时候没有收到心跳,则协调器将标记成员死亡,并且向组的其余部分发送信号通知他们应该重新加入,以便可以重新分配分区。定时器的持续时间陈为会话超时,并且在客户端使用session.timeout.ms配置。
props.put(“session.timeout.ms”, “60000”);
会话超时可以确保在应用程序或机器当了或者网络分区从消费者从消费者分离后可以将锁释放出来。但是处理程序失败通常需要一点技巧。因为就算消费者仍然向协调者发送心跳,并不意味着这个程序是健康的。
消费者的轮询循环设计是为了处理这个问题。当您调用poll或其他阻塞API时,所有网络IO都在前台完成。消费者不是用任何后台线程。这意味着当您调用轮询时候,心跳仅发送到协调器。如果应用程序停止轮询(无论是因为处理代码抛出异常还是下游系统奔溃),则不会发送心跳,会话超时将过期,并且组将重新平衡。
唯一的问题是,如果消费者处理消息的时间超过会话超时的时间,则可能会触发虚假的重新平衡。因此,你应该将会话超时设置为足够大,以使这情况不能发生。默认值为30秒,但是其设置为高达几分钟并非不合理。较大会话超时的缺点是协调器需要更长时间才可以检测到真正的用户崩溃。
交付语义
当首次创建消费组时,将根据auto.offset.rest设置定义的策略来设置消费者的初始偏移量。一旦消费者开始处理消息,它根据应用的需要定期地提交偏移。在每次后续重新平衡之后,位置将被设置为组中该分区的最后提交的偏移量。如果消费者在成功处理的消息提交偏移量之前崩溃,则另一个消费者将最终重复该工作。你提交的频率越高,你在崩溃中看到的重复次数越少。
到目前为止的示例中,我们设定启用自动提交策略。当设置enable.auto.commit设置为true(这是默认值)时,消费者将根据使用“auto.commit.interval.ms”配置的时间间隔触发偏移提交。通过减少提交的时间间隔。你可以限制消费者在崩溃的情况下必须重新处理的量。
如果你要使用消费者提交API,首先你要通过 在消费者的配置中设置enable.auto.commit为false来终止自动提交。
props.put(“enable.auto.commit”, “false”);
提交API本身很容易使用,但是重要的一点是如何把它集成到轮询循环中。因此,以下示例包括带有提交详细信息的完整轮询循环。处理提交的最简单的方法是使用同步提交API:
try {
while (running) {
ConsumerRecords<String, String> records = consumer.poll(1000);
for (ConsumerRecord<String, String> record : records)
System.out.println(record.offset() + ": " + record.value());
try {
consumer.commitSync();
} catch (CommitFailedException e) {
// application specific failure handling
}
}
} finally {
consumer.close();
}
在使用没有参数的commitSync API提交上次调用poll时候返回的偏移量。此调用将无限期阻塞,直到成功或出现不可恢复的错误。主要的错误是消息处理时间超过会话超时时间。当这种情况发生时,协调器会从组中剔除消费者,这导致抛出CommitFailedException。您的应用程序应该通过尝试回滚上次成功提交的偏移量后消耗消息引起的任何更改来处理此错误。
通常,应该确保消息成功处理后再提交。如果消费者在一个发送提交之前崩溃了,那么消息将会再次被处理。如果提交策略保障了最后提交的偏移量没有超过当前的位置,那至少有一次传递语义。
通过改变提交策略以保证“当前位置”不会超过“最后的提交偏移量”,如上图所示,你将获得“最多一次”。如果消费者在这个位置上赶上最后提交的偏移量之前崩溃,在“当前位置”和“最后提交位置”
的消息将会丢失,但是你可以确信没有消息将被多次处理。为了实现这个策略,我们只需要改变提交和消息处理的策略。
try {
while (running) {
ConsumerRecords<String, String> records = consumer.poll(1000);
try {
consumer.commitSync();
for (ConsumerRecord<String, String> record : records)
System.out.println(record.offset() + ": " + record.value());
} catch (CommitFailedException e) {
// application specific failure handling
}
}
} finally {
consumer.close();
}
请注意,使用自动提交可以“至少一次”处理,因为消费者保证只对已返回到应用程序的消息提交偏移量。(译者注:如果这个消费返回到应用程序了,在还没开始处理崩溃了,仍然不能满足至少处理一次)。在最糟糕的情况下,可以需要重新处理消息数量收到应用程序在提交间隔(由auto.commit.interval.ms配置)中可以处理消息条数限制。
但是通过使用提交API,你可以更好地控制你愿意接受多少重复的处理。在极端的情况下,可以在每个消息处理后提交偏移量。示例如下:
try {
while (running) {
ConsumerRecords<String, String> records = consumer.poll(1000);
try {
for (ConsumerRecord<String, String> record : records) {
System.out.println(record.offset() + ": " + record.value());
consumer.commitSync(Collections.singletonMap(record.partition(), new OffsetAndMetadata(record.offset() + 1)));
}
} catch (CommitFailedException e) {
// application specific failure handling
}
}
} finally {
consumer.close();
}
在这个例子中,我们在调用commitSync时传递我们想要提交的显示偏移量。已经提交的偏移量始终为应用程序将读取的下一条消息的偏移量。当调用没有参数的commitSync时候,消费者提交返回到应用程序的最后一个偏移量(加一 ),但是我们不能在这里使用,因为它将准许位置领先于我们的实际进度。
显然,对于大多数使用情况,在每个消息处理后提交不是个好主意,因为处理线程必须阻塞从服务器返回的每个提交请求。这将杀死吞吐量。更合理的方法可能是每N个消息后提交,其中N可以被调整以获得更好的性能。
在此示例中,commitSync的参数是从主题分区到OffsetAndMetadata示例的映射。提交API准许你在每次提交时候包含一些额外的元数据。这可以用于记录提交时间,发送它的主机或应用程序需要的任何信息。在这个例子中,我们将它留空。
更合理的策略可能是在完成处理来自每个分区的消息时提交偏移量,而不是对收到的每个消息提交。ConsumerRecords集合提供对其中包含一组分区和每个分区的消息的访问。下面的示例演示了这个策略。
try {
while (running) {
ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);
for (TopicPartition partition : records.partitions()) {
List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
for (ConsumerRecord<String, String> record : partitionRecords)
System.out.println(record.offset() + ": " + record.value());
long lastoffset = partitionRecords.get(partitionRecords.size() - 1).offset();
consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastoffset + 1)));
}
}
} finally {
consumer.close();
}
到目前为止的例子集中在同步提交API,但是消费者也有个异步API,commitAsync。使用异步提交通常会提供更高的吞吐量,因为应用程序可以在提交之前开始处理下一批消息。折中的是,你只能后来发现提交失败。下面示例显示了基本用法:
try {
while (running) {
ConsumerRecords<String, String> records = consumer.poll(1000);
for (ConsumerRecord<String, String> record : records)
System.out.println(record.offset() + ": " + record.value());
consumer.commitAsync(new OffsetCommitCallback() {
@Override
public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets,
Exception exception) {
if (exception != null) {
// application specific failure handling
}
}
});
}
} finally {
consumer.close();
}
注意我们可以提供一个回调对象给commitAsync,当消费者提交完成后(无论成功还是失败),调用。如果你不需要,你也可以调用没有参数的commitAsync。
消费组检查
当消费组处于活动状态时候,可以通过kafka分发包的bin目录中的consumer-groups.sh脚本从命令行检查分区分配和使用进度。
# bin/kafka-consumer-groups.sh –new-consumer –describe –group consumer-tutorial-group –bootstrap-server localhost:9092**** ****
输出可能如下:
GROUP, TOPIC, PARTITION, CURRENT OFFSET, LOG END OFFSET, LAG, OWNER
consumer-tutorial-group, consumer-tutorial, 0, 6667, 6667, 0, consumer-1_/127.0.0.1
consumer-tutorial-group, consumer-tutorial, 1, 6667, 6667, 0, consumer-2_/127.0.0.1
consumer-tutorial-group, consumer-tutorial, 2, 6666, 6666, 0, consumer-3_/127.0.0.1
这显示了消费组中分配的所有分区(消费者实例拥有它)和最后提交的偏移量(此处为“当前偏移量”)。分区的滞后是日志结束偏移和最后提交的偏移之间的差。管理员可以监控此情况,以确保消费组可以跟上生产者。
使用手动分配
如教程开头所述,新消费者不需要消费组的实例,来实现较低级别的访问。这是此API最强的原因之一。较老的“简单”消费者也提供了这个,但它需要你自己做更多错误处理。使用新的消费者,你只需要分配你的读取的分区,然后开始轮询数据。
下面的示例显示如何使用partitionsFor API从主题分配所有的分区。
List<TopicPartition> partitions = new ArrayList<>();
for (PartitionInfo partition : consumer.partitionsFor(topic))
partitions.add(new TopicPartition(topic, partition.partition()));
consumer.assign(partitions);
类似subscribe,对assign的调用必须通过要读取的分区的完整列表。一旦分区被分配,轮询循环将像以前一样工作。
警告,所有的偏移提交通过组协调器。无论它是简单的消费者还是消费组。因此,如果你需要提交偏移量,那么必须将group.id设置为合理的值。以防止与其他的消费者冲突。如果一个简单的消费者尝试使用与活动消费组匹配的组ID提交偏移量,协调器将拒绝该提交(这将导致CommitFailedException)。但是,如果另一个简单的消费者实例也共享相同的组ID,则不会有任何错误。
结论
新的消费者为kafka社区带来许多好处,包括更干净的API,更好的安全性和减少依赖。本教程介绍了它的基本用法。重点介绍poll语义,并使用commitAPI来控制交付语义。有更多细节还未覆盖,但是足以让你开始。虽然消费者仍然处于活跃状态,我们鼓励你尝试,如果遇到任何问题,可以在邮件列表中告诉我们。