Kafka 简述
本节仅会以扫盲式的模式简述 kafka 的设计, 不会深入分析细节, 如果你已经对 kafka 有所了解, 建议跳至下一节 Kafka Connect. 如果你想详细了解 Kafka 的设计, 推荐: Kafka设计解析(一) - Kafka背景及架构介绍, 本文引用了其中图片; 以及 Kafka 设计提案 (KIP).
上一节提到, Kafka 早期定位一个高吞吐的消息队列, 而如今已经发展成一个分布式的流处理平台. Kafka 最底层的设计十分简单可靠, 并在其基础上扩展了相当多的特性. 这里我们主要使用 Kafka 作为数据传输的管道, 重点关注 Kafka 的可靠性和吞吐量.
基本概念
简单说, Kafka 中分为 3 中角色, Producer, Broker, Consumer, 如果按照 C / S 模型来理解, Producer & Consumer 即 Client, Broker 即 Server. 早期的 Kafka 版本中, Consumer 还依赖于 Zookeeper, 新版的 Consumer 直接与 Broker 通信, 不再依赖 Zookeeper.
Kafka 的数据模型基本沿用了 MQ 的通用概念, 将 Topic 作为数据传输队列, 但加入了 Partition 的概念; 用 Record 表示单条消息, 每个 Record 包含 Key 和 Value, 并且在 Header 中携带 Offset, Timestamp 等信息.
可靠性
作为数据存储和数据传输系统, 可靠性是至关重要的, Kafka 的可靠性主要通过多实例 & 多副本机制实现.
首先, 多个 Broker 采用多实例主从机制, 集群中的 Brokers 会向 Zookeeper 同一路径抢注一个临时节点, 注册成功的即为 Leader, 其余为 Follower.
其次, 对于某一 Topic, 可以配置 Replica 数目, 与 Broker 类似, Topic 中的每个 Partition 也会选出一个 Leader, 实际的读写都在 Leader 节点上进行, Replica 仅仅负责同步, 若同步延迟低于预设值, 称为 ISR (In Sync Replica), 当 Leader 挂掉后, 即可从 ISR 中选出一个新的 Leader, 以保证消息的准确性和服务的高可用性.
这里要说明一点, Kafka 的消息过期策略与该消息是否被消费过无关, 这点与常规 MQ 的设计不同; 在这点上, Kafka 更像一个时序数据库, 可以配置基于时间和基于大小的过期策略, Kafka 会定时清理达到阈值的消息以释放空间; 消息清理策略除了直接删除之外, 还支持 compact 策略, compact 策略是指对于相同 key 的 message, 只保留最新的一份.
吞吐量
首先, 从 Producer 角度来看, Kafka 在磁盘中对 message 的存储采用 append 模式, 这种顺序写入对机械硬盘物理结构十分友好, 能获得极高的写入速度; 从 Consumer 角度来看, 顺序的读取对机械硬盘也极其友好; 额外的, 如果 message 没有加密, Kafka 还支持零拷贝机制, 避免的内核空间到用户空间的数据拷贝;
其次, Kafka 将 Topic 做了水平切分, 将其切分为更细粒度的 Partition, 分散在多台 Broker 上以达到水平扩张的能力, 但相应的, 这种设计舍弃了 Topic 级别的消息严格顺序性, 只能做到 Partition 级别的消息严格顺序性.
Producer
对于 Producer 来说, 在往一个 Topic 投递消息的时候, 可以通过对 message 进行 hash(key) % n 来实现分区的负载均衡, 并且可以保证相同 key 的 message 被投递到相同的 Partition, 以保证严格的顺序性;
此外, Producer 还可以通过配置 ack 策略来实现可靠性和吞吐量的权衡:
- 0 Producer 向 partition leader 送出 message 即认为投递成功, 吞吐量最好, 可靠性最差;
- 1 Partition leader 收到 message 并在本地落盘成功, 向 producer 确认消息投递成功;
- -1 Partition leader 收到 message 并在本地落盘成功, 并且至少有
min.insync.replicas
个 replicas 也收到 message 并在本地落盘成功后, 才向 producer 确认消息投递成功, 可靠性最佳;
Consumer
Kafka 通过 consumer group 概念来实现 Consumer 逻辑上的分组和隔离; 不同 group id 的 consumer 是互不影响的, 这种设计使得可以非常简单的通过新建一个 consumer group 来增加一组订阅者, 而不会影响现有的消费者.
在每个 consumer group 内, 每个 partition 至多只能被一个 consumer 订阅, 即 partition 与 consumer 是多对一的关系, 那就意味着, 如果 partition 数量 < consumer 数量, 将会有 consumer 闲置.
在确定 partition 与 consumer 的绑定关系时, 即 assign 策略, kafka 内置了 3 种算法, RangeAssignor
, RoundRobinAssignor
, StickyAssignor
, 不过多详述.
当 partition 或 consumer 数量发生变化时, 目前版本的 kafka 使用了一种简单粗暴的 rebalance 方案, 即对于每个 consumer group, 任何 partition 或 consumer 的变化都会触发所有 consumer 的 stop, start, assign 过程, 这种策略对于滚动发布或者网络抖动十分不友好. 现在有一个设计提案 KIP-429 正在对此问题进行改进. Kafka Connect 的调度也存在类似的 rebalance 问题, 下文会提到.
Kafka 使用 offset 来表示 consumer 的消费进度, 在一个 consumer group 内, 很显然每个 partition 的 offset 是各自独立的, 用户对 offset 有完全的掌控权, 可以选择使用 high level consumer API, 将 offset 托管给 broker 维护 (早期版本是放在 zookeeper 中, 新版本是存储在 __consumer_offset topic 中), 也可以选择手动维护 offset, 将其存储在外部存储 (如 redis) 中, 每次 poll 时显式指定 offset 参数. 此外, 自动模式下, 还可以选择 auto commit 或 manual commit, 灵活的组合可以实现至少一次, 至多一次, 严格一次 (需配合其他特性, 下文会提到) 消费语义.
基于 partition 与 consumer 多对一关系的设计, 使得 consumer commit 机制在实现上比较容易, 不做过多详述.
Kafka Connect
上一节简述了 kafka 最基本的设计 (对应下图中的 core), 基于这样一个简单可靠的底层系统, conflent 公司以及社区构建了大量优秀组件, 营造了一个繁荣的生态.
下图中是 Confluent 公司以 Kafka 为核心的发行版, 本数据同步系统即使用了图中 Connect API 和 Connectors 组件.
简单的说, Kafka Connect 基于 kafka client 的 Producer API 和 Consumer API 进行封装, 抽象出了 Source Plugin 和 Sink Plugin 编程接口, 并保证了 Exactly Once At lease Once 传输语义, 大大简化了数据传输系统的开发成本. Source Plugin 和 Sink Plugin 在运行时会被实例化成 Connector 和 Task 并由 Kafka Connect 集群调度.
注: Kafka Connect 支持单机模式和集群模式, 并且 debezium 项目中还有一个嵌入式版本的实现, 本文只讨论集群模式.
Exactly Once
这是一个非常严谨的话题, 本节依然只是简述设计, 如果你想深入了解设计和实现细节, 推荐查阅 Kafka 设计提案: Kafka Exactly Once Semantics, 以及这篇博客文章: Kafka设计解析(八) - Exactly Once语义与事务机制原理.
首先解释一下 Exactly Once 语义, 是指一组对 Kafka Partitions 的读写操作, 要么全部成功, 要么全部失败, 并且每个操作严格只执行一次. 也就是说, 如果操作涉及了第三方系统, 比如从 Kafka 消费数据落入 MySQL, 这已经超出了 Kafka Exactly Once 语义涉及的范畴, 需要额外的工作 (比如 Kafka 至少一次消费语义 + MySQL 幂等写入) 来保证系统整体的正确性.
明确了 Exactly Once 语义, 可以将其按照如下三个场景分别讨论:
- 单个 Producer 的写入操作
// todo - 单个 Consumer 的消费操作
// todo - 多个操作, 可能既有 Consumer 的消费又有 Producer 的写入操作
/// todo
Source Plugin & Sink Plugin
基础聊完了, 现在我们来看一看 Kafka Connect 到底给我们抽象出来了什么样的编程接口. 这里我只列举出了个别的 API, 省略了很多生命周期相关的 API, 如果你想详细了解, 可以参考 kafka 源码, 其中还包括一个简易的 FileConnector
的实现.
首先, Kafka Connect 抽象出来了两种调度对象: Connector
和 Task
. 从 API 定义上很容易看出, 由 Connector
负责初始化 Task
, 一个 Connector
可以生成多个相同类型的 Task
.
public abstract class Connector implements Versioned {
...
public abstract Class<? extends Task> taskClass();
public abstract List<Map<String, String>> taskConfigs(int maxTasks);
...
}
public abstract class SourceConnector extends Connector {
...
}
public abstract class SinkConnector extends Connector {
...
}
显而易见, Task 接口就是一个生产者消费者接口, 不过这里的 API 名字要以 Kafka Connector 集群的视角看待, 从 Source 端 poll
Record 送入 Kafka, 从 Kafka 读取 Record put
到 Sink 端 . 当然, 为了效率, 这里的接口是批传输的.
public interface Task {
...
void start(Map<String, String> props);
...
}
public abstract class SourceTask implements Task {
...
public abstract List<SourceRecord> poll() throws InterruptedException;
...
}
public abstract class SinkTask implements Task {
...
public abstract void put(Collection<SinkRecord> records);
...
}
再来关注一下 Record 携带了哪些信息. 这里说明一下, keySchema
和 valueSchema
是可选的 , key
和 value
的具体序列化策略也可以自定义实现, 常用的有支持 Jackson 和 Avro 方式.
从我的使用经验看, 如果是在系统验证环节, 或者业务上吞吐量不是瓶颈时, 建议使用 Jackson 序列化模式, 并开启 schema
, 这会大大简化问题的排查难度. 若比较侧重于吞吐量, 可以选择使用 Avro, 并将 schema
注册到 Schema Registry (你可以在上文的 Confluent Platform 图中找到该组件). 若追求更极致的性能, 可以自定义实现序列化器, 比如使用 protobuf 等.
timestamp
标识了该条消息的时间戳, 可以是消息本身产生的时间, 也可以是消息投递到 kafka 的时间. 这里推荐使用消息投递到 kafka 的时间, 将消息本身产生的时间作为消息中的一个字段存储, 从而更完整的标识消息在各系统中的传输延迟.
public abstract class ConnectRecord<R extends ConnectRecord<R>> {
private final String topic;
private final Integer kafkaPartition;
private final Schema keySchema;
private final Object key;
private final Schema valueSchema;
private final Object value;
private final Long timestamp;
private final Headers headers;
...
}
public class SourceRecord extends ConnectRecord<SourceRecord> {
private final Map<String, ?> sourcePartition;
private final Map<String, ?> sourceOffset;
...
}
public class SinkRecord extends ConnectRecord<SinkRecord> {
private final long kafkaOffset;
private final TimestampType timestampType;
...
}
下面考虑一个问题, 如果 Task 重启了, 怎么恢复之前的状态?
对于 Source Task 来说, 需要额外的状态存储, 即 SourceRecord 中的 sourcePartition
和 sourceOffset
字段, 注意, 这两个字段与 kafka 的 partition 和 offset 观念不是一码事儿, 只是取了相似的名字, 由 Source Connector 的实现者自行决定存储逻辑, Kafka Connect 会确保持久化的记录 Source Task 的状态信息, 以确保在重启后 Source Task 能够恢复状态.
对于 Sink Task 来说, 只要使用由 broker 维护的 consumer 的 offset 信息就可以了.
除此之外, Kafka Connect 还要存储每个 Connector 的配置信息, 以及状态信息 (比如 Task 失败后的异常堆栈等), 这些信息存在哪里? 自然是存在 Kafka 本身的 Topic 最可靠, 所以 Kafka Connect 会要求指定三个 Topic connect-configs
, connect-offsets
, connect-status
分别保存这些信息.
可以看到, Kafka Connect 通过封装向我们提供了一个极其简易的编程接口, 使我们无须过多的关注消息的丢失和重试机制, Kafka Connect 会确保 List<SourceRecord> poll()
生产的数据被严格一次的送往 kafka, 也会保证严格一次的通过 void put(Collection<SinkRecord> records)
向消费者投递. 也即实现了 Kafka 层面的 exactly once 传输语义.
当然, 如前所述, 两侧的系统属于外部系统, 逻辑的正确性要靠开发者自行设计保证. 如果两个接口抛出异常, 则会导致 Task 失败, 需要排查原因后手动重启 Task.
Rebalance
前面在介绍 Consumer Group 时已经提到了 rebalance, Kafka Connect Cluster 也有类似的机制. 每当往 cluster 提交一个新的 Connector / Task, 或者更新一个 Connector / Task, 删除一个 Connector / Task, 都会触发 Cluster 的 rebalance, 进而导致所有 Connector / Task 的重新分配和重启, 如果 Task 的状态恢复代价比较大 (其实 debezium 恢复代价就很大, 后续的篇章中会提到), 那么在滚动发布, 任务提交, 节点扩张或收缩时, 是极不友好的.
幸运的是, Kafka 2.3 已经实现了一个渐进式的 Rebalance 机制, 用于解决这些问题, 可参考 KIP-415.
这里顺便提一下, 既然提到 Kafka Connect 集群模式, 以及 Connector / Task 的分配, 必然涉及到 worker 节点的协调机制. Kafka Connect 已经将所有信息持久化到 Kakfa Topic 中, worker 节点本身已经无状态了, 然后 Kafka Connect 会从所有节点中选出一个 Leader, 负责 Connector / Task 的分配, 这个选举协议是由 Kafka Connect 直接实现的, 并未引入类似 Zookeeper 这种专门的协调系统.
Restful 管理接口
Restful 接口似乎已经成为了一种时尚, 目前只要提到 http 接口或者 web 接口, 几乎都会想到 restful. 但是, 就我的使用体验而言, restful 强调对资源的语义化操作, 强行照搬 restful 的规范来设计业务系统的接口总感觉有一种被 👀光的感觉, 总担心会暴露出内部的敏感信息; 而对于中间件或者说这种基础技术设施的管理接口而言, restful 则显得十分自然.
GET /connectors
POST /connectors
GET /connectors/{name}
GET /connectors/{name}/config
PUT /connectors/{name}/config
PUT /connectors/{name}/pause
POST /connectors/{name}/restart
POST /connectors/{name}/tasks/{taskId}/restart
DELETE /connectors/{name}
Kafka Connect 主要的管理接口即对 Connector / Task 的增删改查, 这里我截取了部分 restful 接口, 即便没有说明, 语义也已经十分明确了. 完整的接口列表请查阅 官方文档.
OK, 能看到这里, 相信你大概已经知道 Kafka 和 Kafka Connect 是什么东西了, 然而很遗憾, 我们使用的 CDH 发行版并不支持 Kafka Connect, 并且我们需要对开源版的 Kafka Connect 做一些功能增强和 bug 修复, 下一篇就是实操环节了, 将展示如何独立部署 Kafka Connect Cluster.