ETL工具--datax

datax是什么

  • 阿里开源的ETL工具(github地址:https://github.com/alibaba/DataX
    ),ETL是描述从数据源读取数据,经过转换,再加载到目的数据源的过程,而datax是对这一过程的实现,采用framework+plugin框架模式。
    image.png
比对 ETL datax 功能
数据抽取 Extract Reader-plugin 从数据源读取数据,传输到framework
转换 transport Framework 对数据进行转换、清洗、并发、流量控制
数据写入 load Writer-Plugin 从framework读取数据,写入目标数据源

为什么选择datax

  • 可靠稳定,性能强,市场广泛使用,经得起时间和市场的考验
  • 活跃的社区,完善的使用文档
  • 上手容易,配置简单,学习成本低
  • 插件支持的数据源覆盖范围广
  • 提供自定义插件扩展功能,可根据需求自主开发插件
  • 完善的运行日志打印与监控,能够迅速通过日志分析、定位问题,例如总体的运行情况日志如下
任务启动时刻                    : 2019-09-17 10:44:56
任务结束时刻                    : 2019-09-17 10:45:18
任务总计耗时                    :                 22s
任务平均流量                    :          492.72KB/s
记录写入速度                    :           8594rec/s
读出记录总数                    :              171895
读写失败总数                    :                   0

datax的运行机制

image.png
  • job :数据同步的作业,是datax运行最小业务单元
  • task:任务,job拆分出来的最小执行单元
  • taskGroup:任务组,管理一组task的集合
  • jobContainer:任务容器,用于任务拆分、调度,日志打印等工作
  • taskGroupContainer:任务执行的容器

如何使用datax

$ python datax.py {YOUR_JOB.json}
  • datax.py是datax工具提供的python脚本,目录{datax目录/bin}
  • {YOUR_JOB.json} 是datax作业(job)的配置文件,示例如下
{
#全局配置
    "core":{
        "transport":{
            "channel":{
                "speed":{
                    "channel": 2, #job任务通道数,控制并发的线程数
                    "record":-1, #限制数据传输的记录数
                    "byte":-1, #限制数据传输的流量大小
                    "batchSize":2048 #限制批量读取的size
                }
            }
        }
    },
#任务配置
    "job": {
        "content": [
            {
                  "reader": {
                    "name": "",#插件名称
                    
                    "parameter": {
                        "connection": [#连接信息
                            {
                                "jdbcUrl": [""],
                                "querySql": [
                                    ""
                                ],
                                "table": [""]
                            }
                        ],
                         "column": [],
                         "splitPk":"",#分片键,
                         "where":"",#查询限制条件
                        "password": "",
                        "username": "",
                    }
                },
                "writer": {
                    "name": "",
                    "parameter": {
                           "column": [],
                           "connection": [
                            {
                                
                                "jdbcUrl": "",
                                "table": [""]
                            }
                        ],
                        
                        "password": "",
                        "username": ""
                    }
                }
            }
        ],
        "setting": {
            "speed": {
                "channel":5,
                "record":1000,
            }, 
        "errorLimit": {#脏数据阈值配置
                "record":2,
                "percentage": 0.02
            }
        }
    }
}

datax的性能调优

datax性能影响因素
  • 服务器性能:内存、存储,IO

  • 网络环境:宽带大小、网络稳定性

  • 配置文件参数的优化

    • datax脚本运行时的内存大小配置
      python datax.py  --jvm '-Xms1G -Xmx1G'  {YOUR_JOB.json}
      
    • 调整job任务的限速、限流及并发线程数
     "speed":{
                  "channel": 2, #job任务通道数,控制并发的线程数
                    "record":-1, #限制数据传输的记录数
                    "byte":-1, #限制数据传输的流量大小
                    "batchSize":2048 #限制批量读取的size
                }
     注:
     channel:并发数,默认为5,即5个并发,每次可执行task数为5
     例:channel配置为20个并发,就需要4个taskGroup,如果作业有100个 
     task,那么每个group管理25个task。
     byte:限流,在带宽允许条件下合理配置,-1为不限制,往往会出现带宽占用 
     过高的问题。
    

案例分析

问题:数据库A的t_a表数据(275w数据量)同步到数据库B的t_b表,迁移逻辑:
image.png

表结构如下:

CREATE TABLE `t_a` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `phone` varchar(11) NOT NULL,
  `nick_name` varchar(45) DEFAULT NULL,
  `user_name` varchar(45) DEFAULT NULL,
  `sex` tinyint(2) DEFAULT NULL,
  `age` int(4) DEFAULT NULL,
  `created_user` varchar(45) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin DEFAULT NULL,
  `created_date` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP,
  `modified_user` varchar(45) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin DEFAULT NULL,
  `modified_date` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

CREATE TABLE `t_b` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `mobile` varchar(11) NOT NULL,
  `nick_name` varchar(45) DEFAULT NULL,
  `user_name` varchar(45) DEFAULT NULL,
  `sex` tinyint(2) DEFAULT NULL,
  `age` int(4) DEFAULT NULL,
  `created_user` varchar(45) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin DEFAULT NULL,
  `created_date` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP,
  `modified_user` varchar(45) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin DEFAULT NULL,
  `modified_date` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
实现方案
  • 基础datax脚本:

{
    "job": {
        "content": [
            {
                  "reader": {
                    "name": "mysqlreader",
                    "parameter": {
                        "connection": [
                            {
                                "jdbcUrl": ["A"],
                                "querySql": [
                                    "SELECT id,phone,nick_name,user_name,sex,age,created_user,created_date,modified_user,modified_date from t_a"
                                ]
                            }
                        ],
                        "password": "",
                        "username": ""
                    }
                },
                "writer": {
                    "name": "mysqlwriter",
                    "parameter": {
                           "column": ["id","mobile","nick_name","user_name","sex","age","created_user","created_date","modified_user","modified_date"],
                           "connection": [
                            {
                                
                                "jdbcUrl": "B",
                                "table": ["t_b"]
                            }
                        ],
                        "password": "",
                        "username": ""
                    }
                }
            }
        ]
    }
}
  • 执行结果总体情况:
任务启动时刻                    : 2019-09-18 14:38:12
任务结束时刻                    : 2019-09-18 14:41:53
任务总计耗时                    :                221s
任务平均流量                    :          251.45KB/s
记录写入速度                    :          12501rec/s
读出记录总数                    :             2750323
读写失败总数                    :                   0
  • 对datax进行优化,开启多线程模式,channel配置必须与splitPk结合使用才能生效

{
    "job": {
        "content": [
            {
                  "reader": {
                    "name": "mysqlreader",
                    "parameter": {
                        "column": ["id","mobile","nickname","username","gender","20 as age"],
                        "connection": [
                            {
                                "jdbcUrl": [""],
                                "table": ["tmp_member_all"]
                            }
                        ],
                        "splitPk":"id",
                        "password": "",
                        "username": ""
                    }
                },
                "writer": {
                    "name": "mysqlwriter",
                    "parameter": {
                           "column": ["id","phone","nick_name","user_name","sex","age"],
                           "connection": [
                            {
                                
                                "jdbcUrl": "A",
                                "table": ["t_b"]
                            }
                        ],
                        "password": "",
                        "username": ""
                    }
                }
            }
        ],
        "setting": {
            "speed": {
                "channel":5
            }
        }
    }
}

任务执行的总体情况

任务启动时刻                    : 2019-09-18 15:02:58
任务结束时刻                    : 2019-09-18 15:03:59
任务总计耗时                    :                 61s
任务平均流量                    :          921.97KB/s
记录写入速度                    :          45838rec/s
读出记录总数                    :             2750323
读写失败总数                    :                   0
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 196,099评论 5 462
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 82,473评论 2 373
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 143,229评论 0 325
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 52,570评论 1 267
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 61,427评论 5 358
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 46,335评论 1 273
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 36,737评论 3 386
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 35,392评论 0 254
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 39,693评论 1 294
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 34,730评论 2 312
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 36,512评论 1 326
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,349评论 3 314
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 37,750评论 3 299
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,017评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,290评论 1 251
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 41,706评论 2 342
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 40,904评论 2 335

推荐阅读更多精彩内容