1.kafka消费组基本概念
kafka消费topic是以group为单位来的,一个group消费一个topic。一个group能容纳多个consumer。consumer消费是以分区(partition)来的,一个consumer可以消费一个或多个partition,一个partition只能被一个consumer消费。(如果一个consumer group中的consumer个数多于topic中的partition的个数,多出来的consumer会闲置(idle),所以如果为了增加消费者能力,只简单增加消费者数量不一定会有用).
消费与分区对应关系
消费者数量小于partition的数量
消费者数量小于partition的数量
消费者数量小于partition的数量
2. consumer group的分区再平衡
每个consumer负责自己对应的分区,但是当group中有consumer退出或者新加入consumer,再或者topic中新增partition,group中的消费者负责的partition都得重新计算,Rebalance 期间consumer不能再消费消息,做rebalance的时候是会影响整个consumer group。
consumer获知自己消费的分区以及group内其他成员信息都是通过向一个叫做Group Coordinator的broker发送心跳来的,不同的group的broker可能不同。只要consumer再给Coordinator发送心跳,就被认为是正常的。触发心跳是通过consumer客户端轮询处理消息来的。如果consumer长时间没有心跳group coordinator就会认为consumer已经挂了,触发rebalance,新版本的java api(kafka_2.11的0.10.2.0已经支持了)支持显示的关闭客户端,这样可以避免有group coordinator因为超时来触发rebalance有此导致消息积压。
分区分配流程:
1.第一个加入group的consumer是consumer的leader(这个consumer奔溃之后会怎么样暂时不清楚)
2.新加入的consumer向group coordinator发送加入请求
3.leader从group coordinator接收消费者列表,然后给每个consumer分配分区
4.leader将重新分配的信息发送给group coordinator,group coordinator再将信息发送给所有的consumer
3.启动一个consumer
使用java api只需要配置 bootstrap.servers, key.deserializer, value.deserializer三个配置就可以。一般还要带上group.id,指定所属的消费组。
Properties properties = new Properties();
properties.put("bootstrap.servers", "127.0.0.1:9092");
properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.put("group.id", "testTopicGroup1");
new KafkaConsumer<String, String>(properties);
4.订阅topic
consumer.subscribe(Collections.singletonList("testTopic"));
可以指定多个topic,可以使用正则表达式:
consumer.subscribe("test.*");
demo:
private volatile boolean shutdown = false;
public void poll(){
Properties properties = new Properties();
properties.put("bootstrap.servers", "120.27.8.221:9092");
properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.put("group.id", "testGroup");
KafkaConsumer consumer = new KafkaConsumer<String, String>(properties);
//关闭轮询
Runtime.getRuntime().addShutdownHook(new Thread(new Runnable(){
@Override
public void run() {
shutdown = true;
}
}));
try{
while (!shutdown){
//开始轮询消息,poll会找到group coordinator,加入consumer group,确认消费的分区,获取消息
//poll会获取本地最大的offset之后的消息,而不是commit到kafka中的offset
ConsumerRecords<String, String> records = consumer.poll(100);
for(ConsumerRecord record:records){
System.out.println("topic = " + record.topic()
+ ", partition = " + record.partition()
+ ", offset = " + record.offset()
+ ", customer = " + record.key()
+ ", country = " + record.value());
}
}
}finally {
//及时关闭消费者
consumer.close();
}
}
5.commit offset
不管什么时候调用poll方法都会获取到还未被消费过的消息,这个实现通过消息的offset来实现的,每个分区的offset的管理是通过consumer自己向一个特殊的topic(__consumer_offsets)提交消息来实现的.
1.自动提交
开启自动提交之后,在每次调用poll获取消息的时候会检查时间查看是否需要提交offset,如果已经到时间之后会提交offset,自动提交的好处是方便,劣势是不能灵活控制,如果间隔期间consumer奔溃,已经处理且未提交的消息会被处理两遍。
自动提交配置:
enable.auto.commit=true ##开启自动提交,默认5s提交一次
auto.commit.interval.ms=1000 ##设置自动提交时间间隔