kafka 协议分析 (一) 基础篇
kafka 协议分析 (二) Produce API
kafka 协议分析 (三) Fetch API
上一篇文章介绍了如何生产消息,本篇将介绍一下如何从Kafka消费消息
一个最简单的消息消费流程如下图:
- 先检查服务器支持的API版本,并从中选择一个合适的使用。ApiVersions API
- 获取目标Topic的MetaData,主要包括Topic的partition数量和leader地址。Metadata API
- 获取目标Topic的Offset情况(第一个offset和最后一个offset在哪儿)ListOffsets API
- 从kafka获取消息Fetch API
其中第1、2步已经在上一篇文章详细介绍过。
ListOffsets API (Key: 2)
ListOffsets API用来获取Topic当前的offset情况。按照kafka的设计,拉取消息(Fetch API)需要提供拉取消息的目标位置(offset)。所以真正拉取消息之前,需要事先知道topic的最后一个offset和第一个offset是什么。(kafka的消息有超时时间,默认一周消息就会被清理,这意味着topic中现存消费的offset不一定从0开始)
域 | 值 | 描述 |
---|---|---|
Size | 40 | 消息长度 |
Api Key | 1 | Api |
Api Version | 0 | Api版本 |
Correlation ID | 1 | 请求ID,服务器会将这个ID原样返回 |
Replica Id | -1 | follower的broker id,通常设置-1 |
Topic Len | 1 | Topic个数 |
Topic | w1 | Topic名 |
Partition Len | 1 | Partiton个数 |
Partition | 0 | Partition号 |
Timestamp | -2 | 时间戳 (-1最后一个;-2第一个) |
Fetch API (Key: 1):
Fetch API用于从Topic中拉取数据,它需要提供一个起始offset
域 | 值 | 描述 |
---|---|---|
Size | 40 | 消息长度 |
Api Key | 1 | Api |
Api Version | 0 | Api版本 |
Correlation ID | 1 | 请求ID,服务器会将这个ID原样返回 |
Replica Id | -1 | follower的broker id,通常设置-1 |
Max wait Time | 5000 | 请求超时时间毫秒 |
Min bytes | 1 | 最小返回大小 |
Max bytes | 2^30 | 最大返回大小 |
Topic Len | 1 | Topic个数 |
Topic | w1 | Topic名 |
Partition Len | 1 | Partiton个数 |
Partition | 0 | Partition号 |
Offset | 0 | 时间戳 起始offset |