说在前面
写这篇攻略之前,我在想,是不是太小众,有没有必要写。首先 Cloudera 本身只是一种商业化选择,其次数据总线的元数据解决方案不止 Schema Registry 一种(但的确是做的最好的),最后有安全需求的企业本身也少。但是,我还是写下吧,哈哈。
几个概念
Secured Kafka: 经过 Kerberos 安全认证加固的 Kafka 集群,并且使用了自带的 ACL 功能;
Confluent: 一家专门围绕 Kafka 打造数据生态的公司,由 3 名前 LinkedIn 工程师创立,目标是使 Kafka 更易用、更安全、利用 connector 和异构数据简单对接、基于 schema 元数据管理打造数据地图和帮助用户理清数据血缘;(对于大中企业来说,数据血缘关系是非常重要的,可以避免海量数据冗余和理清业务的请体下对数据源进行优先级定义,说白了就是省钱和出故障的时候知道怎么降级和限流,其实还是为了省钱止损)
Tips:Confluent 有自己的 Kafka 发行版,从目前来看和社区、CDH版本兼容性没有问题,意思是说社区的 0.10.x 版本可以正常访问 0.10.x 版本的 Confluent Kafka。特别需要遵循的一个规则是,客户端版本需要小于等于服务端版本,否则会因为并不友好的报错甚至没有报错导致诡异问题,比如无法消费。
Schema Registry:Confluent 公司推出的、使用 Kafka 作为持久化存储的、为 Kafka Topic 服务的数据结构 (Schema) 管理组件(目前支持 Avro Schema),并且可以方便的向前向后兼容历史版本数据。
那么接下来详细说说怎么在已经开启 Kerberos 和 ACL 的 Kafka 集群上整合 Schema Registry;如果有对如何开启 Kerberos 和 ACL 不明白的同学可以参考:Step by Step 实现基于 Cloudera 5.8.2 的企业级安全大数据平台 - Kafka 整合 SASL/Kerberos 认证及 ACL。
那么问题来了,为什么要有 Schema Registry 这个玩意?你还是没搞清的话可以这么看:
你维护的是一个数据流式平台而非物理隔离的一个一个 Kafka 集群,数据源横跨许多团队和部门被使用,比如 A 团队写入的数据,B C D 在用,A 却不知道,B C D 之间也不知道互相用着同一份数据。
- 某天 E 用户告诉我说我要 XXX 数据,需要 YYY 字段,我需要接一份数据进来,你和自信的说,卧槽这个数据我们已经有了,你不要重复接入了,直接消费 ZZZ topic 就好了;所以 Schema Registry 可以极大减轻数据冗余,节约机器和运维成本;
- 如何告诉 B C D 数据格式是啥?要 A 一个一个告诉? 不可能;所以 Schema Registry 可以极大减轻沟通成本;
- 万一哪天 A 对数据格式做了变更,B C D 的作业是不是要极大程度歇菜?所以 Schema Registry 的主动数据兼容可以极大减轻下游变更量和增加系统健壮性;
- 某天 A 的数据流出现了问题,挂了, 你怎么知道是 B C D 受到了影响?所以 Schema Registry 可以把数据血缘告诉你帮助你排障和有条理的组织故障恢复;
- 某天你作为运维想搞点 KPI,想告诉老板我们数据平台承载了多少业务,哪些数据源很核心,因为下游接了什么业务,业务的实际产出是什么?
这样想想,想要数据化、精细化、服务化去运维数据流式平台,是不是需要它?
环境简述
OS: CentOS 7.2
JDK: Oracle 1.8.0_111
CDH: 5.8.2
Kafka: 2.1.0-1.2.1.0.p0.115(对应社区 0.10.0.0)
Kerberos: 5
操作用户: admin
操作路径(pwd):/home/admin
主机角色分布:
- KDC: 172.16.134.8
- Zookeeper Server / Kafka Broker: 172.16.134.2
- Kafka Client: 172.16.129.161
- Schema Registry: 172.16.134.8
开始整合
为了使得整合成功,我们需要做几件事:配置 Schema Registry 使其可以访问 Secured Kafka,保证 Schema Registry 可以有权限新建、写入及访问 Kafka 相关 topic(本文为 _schemas
)。和上一篇一样,我们不启用 TLS/SSL 加密传输,因为吞吐开销还是蛮大的,常识告诉我是 20%。
为了帮助大家理解,以现实场景进行举例,你开了一家银行,你的银行需要足够安全:
- 你的员工在进入银行上班前,必须发放工牌刷牌进入;(Kerberos 认证)
- 每个不同职位的员工的工位不一样,具有不同的职能权限;(ACL)
- 当客户需要办理重要业务的时候,需要窃窃私语,以防他人偷听;(TLS/SSL加密)
这里的银行就是 Kafka,我们的员工是 Schema Registry,客户是 Kafka Clients:
提一下,下文不会阐述如何进行 TLS/SSL 加密。
创建 principal
kadmin
kadmin > addprinc -randkey schema_registry/kafka_user@LWSITE.NET
kadmin > ktadd -k ${CONFLUENT_HOME}/etc/schema-registry/schema_registry_kafka_user.keytab schema_registry/kafka_user@LWSITE.NET
${CONFLUENT_HOME}
是 Confluent 安装包的根目录。
创建客户端访问用的 jass 配置文件
vim ${CONFLUENT_HOME}/etc/schema-registry/jaas_kafka_client.conf
配置如下:
KafkaClient {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
useTicketCache=false
storeKey=true
principal="schema_registry/kafka_user@LWSITE.NET"
keyTab="${CONFLUENT_HOME}/etc/schema-registry/schema_registry_kafka_user.keytab";
};
修改 Schema Registry 配置文件
vim ${CONFLUENT_HOME}/etc/schema-registry/schema-registry.properties
配置如下:
listeners=http://0.0.0.0:8081
kafkastore.bootstrap.servers=SASL_PLAINTEXT://${KAFA_BROKER}:9092
kafkastore.connection.url=${ZK_SERVER}:2181/${CHROOT}
kafkastore.sasl.kerberos.service.name=kafka # 这里千万不要配置错误
kafkastore.group.id=schema-registry-8081
kafkastore.security.protocol=SASL_PLAINTEXT
kafkastore.topic=_schemas
zookeeper.set.acl=false
schema.registry.zk.namespace=${CHROOT}/schema-registry
debug=true
${KAFA_BROKER}
是 Kafka Broker 机器,如果是多台机器,以逗号分隔;
${ZK_SERVER}
是 ZK 主机,如果是多台机器,以逗号分隔;
${CHROOT}
是 Kafka 在 ZK 的根节点;
ACL 授权使得 Schema Registry 可以访问 _schemas
export KAFKA_OPTS="-Djava.security.auth.login.config=${KAFKA_CLIENT_HOME}/jaas.conf -Djava.security.krb5.conf=/etc/krb5.conf -Dsun.security.krb5.debug=true"
kafka-acls --add --allow-host ${SCHEMA_REGISTRY_HOSTNAME} --allow-principal User:schema_registry --producer --topic _schemas --group schema-registry-8081 --authorizer-properties zookeeper.connect=${ZK_SERVER}:2181/${CHROOT}
kafka-acls --add --allow-host ${SCHEMA_REGISTRY_HOSTNAME} --allow-principal User:schema_registry --consumer --topic _schemas --group schema-registry-8081 --authorizer-properties zookeeper.connect=${ZK_SERVER}:2181/${CHROOT}
${KAFKA_CLIENT_HOME}
是 Kafka 客户端根目录;
${SCHEMA_REGISTRY_HOSTNAME}
是 Schema Registry 根目录,一般是 ${CONFLUENT_HOME}/etc/schema-registry
;
其中配置文件 jass.conf 应当如下:
KafkaClient {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
storeKey=true
principal="kafka/kafka_admin@LWSITE.NET"
keyTab="/tmp/kafka_kafka_admin.keytab";
};
Client {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
storeKey=true
principal="kafka/kafka_admin@LWSITE.NET"
keyTab="/tmp/kafka_kafka_admin.keytab";
};
对于 ACL 授权如果有不清楚的地方,还是参考:Step by Step 实现基于 Cloudera 5.8.2 的企业级安全大数据平台 - Kafka 整合 SASL/Kerberos 认证及 ACL。
重启 Schema Registry
开启全兼容模式:
curl -X PUT -H "Content-Type: application/vnd.schemaregistry.v1+json" http://${SCHEMA_REGISTRY_HOSTNAME}:8081/config --data '{"compatibility":"FULL_TRANSITIVE"}'
export KAFKA_OPTS="-Djava.security.auth.login.config=${CONFLUENT_HOME}/etc/schema-registry/jaas.conf -Djava.security.krb5.conf=/etc/krb5.conf -Dsun.security.krb5.debug=true"
${CONFLUENT_HOME}/bin/schema-registry-start -daemon ${CONFLUENT_HOME}/etc/schema-registry/schema-registry.properties >> /home/admin/logs/confluent/schema-registry.log
Q&A
- ACL 授权的时候 遇到
NoAuth for /kafka-acl/Topic
解法:使用 kinit kafka 超级用户的方式进行授权,而不是文中的使用 keytab 的方式;