一、otter介绍
阿里巴巴B2B公司,因为业务的特性,卖家主要集中在国内,买家主要集中在国外,所以衍生出了杭州和美国异地机房的需求,同时为了提升用户体验,整个机房的架构为双A,两边均可写,由此诞生了otter这样一个产品。
otter第一版本可追溯到04~05年,此次外部开源的版本为第4版,开发时间从2011年7月份一直持续到现在,目前阿里巴巴B2B内部的本地/异地机房的同步需求基本全上了otte4。
目前同步规模:
- 同步数据量6亿
- 文件同步1.5TB(2000w张图片)
- 涉及200+个数据库实例之间的同步
- 80+台机器的集群规模
Otter项目地址:https://github.com/alibaba/otter
Otter文档地址:https://github.com/alibaba/otter/wiki
二、基础概念
Pipeline:从源端到目标端的整个过程描述,主要由一些同步映射过程组成;
Channel:同步通道,单向同步中一个Pipeline组成,在双向同步中有两个Pipeline组成;
DataMediaPair:根据业务表定义映射关系,比如源表和目标表,字段映射,字段组等;
DataMedia: 抽象的数据介质概念,可以理解为数据表/mq队列定义;
DataMediaSource: 抽象的数据介质源信息,补充描述DateMedia;
ColumnPair: 定义字段映射关系;
ColumnGroup: 定义字段映射组;
Node: 处理同步过程的工作节点,对应一个jvm;
基础概念直接的关系如图:
三、架构设计
下图是关于Otter运行原理图:
根据上图里面关键几个元素进行介绍
- db : 数据源以及需要同步到的库
- Canal : 用户获取数据库增量日志
- manager : 配置同步规则设置数据源同步源等
- zookeeper : 协调node进行协调工作
- node : 负责任务处理,即根据任务配置对数据源进行解析并同步到目标数据库的操作。
原理描述:
基于Canal开源产品,获取数据库增量日志数据。
a. 开源链接地址:http://github.com/alibaba/canal
b. 推荐一个讲Canal源码的博客:http://www.tianshouzhi.com/api/tutorials/canal/380-
典型管理系统架构,manager(web管理)+node(工作节点)
a. manager运行时推送同步配置到node节点
b. node节点将同步状态反馈到manager上
基于zookeeper,解决分布式状态调度的,允许多node节点之间协同工作.
流程:
- 定义数据源
- 定义数据介质
- 建立映射规则
- 建立字段映射
- 建立字段组
四、代码结构
工程结构:
包含三部分:Share | Node | Manager。 其中Share是Node和Manager共享的子系统,并不是独立部署的节点。Node和Manager是独立部署的。
Node:一个独立部署的节点,比如两个机房需要做通讯,则每个机房至少要部署一个Node节点(不考虑HA的话),数据同步的过程实际上都发生在Node之间
Manager:管理的节点,逻辑上只有一个(一个Manager管理多个Node节点),如果不考虑HA的话。负责管理同步的数据定义,包括数据源、Channel、PipeLine、数据映射等,各个Node节点从Manager处获取并执行这些信息。另外还有监控等信息。
Manger各个子系统说明:
- biz 对系统数据加载(即初始化时候执行SQL初始化的系统表数据)
- deployer 本地启动
- web 配置管理的webUI
Node各个子系统的说明:
- Common:公共内容定义
- Canal: Canal的封装,Otter采用的是Embed的方式引入Canal(Canal有Embed和独立运行两种模式)
- Deployer:内置Jetty的启动
- etl: S.E.T.L 调度、处理的实现,是Otter最复杂、也是最核心的部分。(Select、Extract、Transform、Load)
SETL过程功能说明如图:
Share各个子系统的说明:
- Common: 公共内容定义
- Arbitrate: 用于Manager与Node之间、Node与Node之间的调度、S.E.T.L几个过程的调度等;
- Communication 数据传输的底层,上层的Pipe、一些调度等都是依赖于Communication的, 简单点说它负责点对点的Event发送和接收
- etl:实际上并不负责ETL的具体实现,只是一些接口&数据结构的定义而已,具体的实现在Node里面。
五、基本操作
这部分操作参考官网:https://github.com/alibaba/otter/wiki
1. Manger和Node的配置和部署
2. 配置任务
六、本地调试环境搭建
1. 代码下载和导入idea
环境搭建:
- 进入$otter_home目录
- 执行:mvn clean install
- 导入maven项目。如果eclipse下报"Missing artifact com.oracle:ojdbc14:jar:10.2.0.3.0",修改{user.dir}/lib/ojdbc14-10.2.0.3.0.jar"为绝对路径,比如"d:/lib/ojdbc14-10.2.0.3.0.jar"
导入idea之后的结构如图:
本地调试环境搭建:
-
- 配置Manager
- 1.1 打开Manger下的deployer模块下的otter.properties文件
-
1.2 修改如下属性:
- 1.3 运行OtterManagerLauncher.java 启动Manager
- 1.4 访问http://localhost:8080,配置集群
- 1.4 访问http://localhost:8080,配置Node,ip为本机ip
-
配置Node
2.1 打开node下的deployer模块下的otter.properties文件
-
2.2 修改属性,这个node要连接去那个mananger,我们是本机调试写本机Ip,或者127.0.0.1
-
2.3 运行OtterLauncher.java时候设置运行参数-Dnid=id,这个id为manager,web界面中配置node对应的id。
如图:
2.4 运行OtterLauncher.java
2.5 启动完成之后可以在manager的web界面看到node已经启动
-
启动
- 3.1 配置同步任务
- 3.2 启动同步任务,修改数据源的表,可以查看输出端的表是否有数据同步过去了。
七、改造支持kafka的代码介绍
修改分为三大部分:
1. manager端的改造
- 修改页面 addDataSource.vm 添加 Kafka 选项;
- dbCheck.js 引用了 Hello.js,这里应用了dwr技术,通过js去调用后端java代码。
配置文件manager中的biz模块下的otter-manager-service.xml文件:
<bean id="dataSourceChecker" class="com.alibaba.otter.manager.biz.utils.DataSourceChecker">
<property name="dataMediaSourceService">
<ref bean="dataMediaSourceService" />
</property>
<property name="dataSourceCreator">
<ref bean="dataSourceCreator" />
</property>
<dwr:remote javascript="Hello">
<dwr:include method="check" />
<dwr:include method="checkMap" />
<dwr:include method="checkNamespaceTables" />
</dwr:remote>
</bean>
dbCheck.js调用DataSourceChecker.java中的方法去校验数据库是否可用,增加kafka的校验方法。
dwr简单教程
editDataSource.vm页面类型选项添加 Kafka 类型
DataSourceList.java中增加kafka数据源处理
addDataMedia.vm页面中的配置kafka的主题时如何验证主题是否存在?
addDataMedia.vm页面中的kafka的主题无法验证是否存在
-
addColumnPair.vm中如果存在kafka的数据源时候,列直接复制mysql表的列。
数据来源:AddColumnPairGroup.java中的execute方法调用buildColumnPairFromDataMedia方法。
在kafka中没有列的概念,要把数据源的列直接赋值给kafka,这一步操作需要在几个类中进行:
com.alibaba.otter.manager.web.home.module.screen.AddColumnPair
com.alibaba.otter.manager.web.home.module.screen.AddColumnPairGroup
- ColumnPairAction.java 中的doSave方法中kafka没有所谓的列名,这里要处理一下进行保存。
- 添加 Kafka 数据源类型
com.alibaba.otter.shared.common.model.config.data.kafka.KafkaDataMedia.java
com.alibaba.otter.shared.common.model.config.data.kafka.KafkaMediaSource.java
到这里,前端的改造基本完成。可以通过manager的web界面新建出kafka的数据源。
2. node端的改造 selector部分
- com.alibaba.otter.node.etl.select.selector.MessageParser类负责解析数据对象解析,将对应canal送出来的Entry对象解析为otter使用的内部对象。这里对配置的数据源做了一个检查:源和目标的库名表名是否是一致。这里需要排除kafka数据源做判断。
3. node端的改造 transform部分
- node中的T部分工作的代码在com.alibaba.otter.node.etl.transform包下。
- 新增com.alibaba.otter.node.etl.transform.transformer.NoStructTransformer.java类处理RowData转到Kafka需要的数据格式。
- 在com.alibaba.otter.node.etl.transform.OtterTransformerFactory类中的lookup方法中增加针对KafkaDataMedia的处理。
- 在node中的etl模块中的otter-node-transform.xml增加NoStructTransformer的注入配置。
4. node端的改造 load部分
- 增加Kafka数据源的方言处理类:com.alibaba.otter.node.etl.common.db.dialect.kafka.KafkaDialect,构建kafka的producer来发送消息。
- com.alibaba.otter.node.etl.load.loader.db.DbLoadAction类中的doCall方法增加对kafka方言的判断和处理。
- kafka分区有序,所以在发送binlog数据时候需要根据数据情况设置好分区的方式,可以通过设定分区字段,然后自定义分区去完成
至此整体改造完成。
可以通过如下步骤进行验证:
- manager启动
- manager中配置mysql=>kafka的任务
- 启动node
- manager中启动同步任务
- mysql中修改数据
- 在kafka的customer中查看是否有消息发送过来