Datax 按照时间戳的定时增量抽取脚本

使用技术: Datax /crontab /python

大概看了下python的语法,因为需要增量做数据抽取,手上暂时没有趁手的兵器,就先用datax吧。 网上有其他方案https://blog.csdn.net/quadimodo/article/details/82186788,貌似写死的执行时间,没采用这种方法。

1. 前提步骤:

安装datax及配置

datax目录:/home/datax/datax/

脚本目录:/home/datax/jobs

log目录:/home/datax/jobs/log/

2. 编写基于时间戳的执行脚本(python脚本)

python脚本语言,写起来也比较方便快捷,我是使用vim编写的,不能写中文,没再继续研究。

脚本思路:

1) 如果使用datax去做增量,需要根据某个时间去判断,那需要将时间传给实际执行的datax.py脚本中去执行,datax支持脚本变量

  1. 基于时间戳做增量,时间戳是一个参数并且是一个可变量,我的思路是将这个文件放到一个X.record的文件中记录时间,本次执行的时候获得上次执行时间lastExecuteTime和当前时间currentTime,抽取的数据就是介于此二者之间的数据。(此处有个小坑,就是datax脚本里面根据时间选择的时候应该是大于等于lastExecuteTime还是小于等于currentTime,数量大的时候抽取数据是不一样的,建议大于等于lastExecuteTime 且小于currentTime)

3) 定时任务使用linux系统的crontab做定时,时间不能设置过短。


[root@localhost ~]# cat /home/datax/jobTimer/dataxScheduler.py

#encoding="utf-8"

# two args , first: datax config file path, logfile path

import time

import sys

import os

print("going to execute")

configFilePath = sys.argv[1]

logFilePath = sys.argv[2]

lastTimeExecuteRecord = sys.argv[3]

print "=============================================="

print "configFilePath      :", configFilePath

print "configFilePath      :", logFilePath

print "lastTimeExecute File :",lastTimeExecuteRecord

print "=============================================="

lastExecuteTime=""

try:

    fo = open(lastTimeExecuteRecord, "r")

    lastExecuteTime = fo.read()

except IOError:

    lastExecuteTime = '1970-01-01 00:00:00'

print("last time execute time:  " + lastExecuteTime)

currentTime = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())

print("currentTime is        :"+ currentTime)

#os.system("python /home/datax/datax/bin/datax.py " + configFilePath + " --lastTime" +  lastExecuteTime + " --currentTime" + currentTime + " >> " + logFilePath)

script2execute  = "python /home/datax/datax/bin/datax.py %s -p \"-DlastTime='%s' -DcurrentTime='%s'\" >> %s"%( configFilePath, lastExecuteTime, currentTime,logFilePath)

print("to be excute script:"+script2execute)

os.system(script2execute)

print("script execute ending")

# update timestamp to file

fo = open(lastTimeExecuteRecord, "w+")

fo.write(currentTime)

fo.close()

print "ending---",lastTimeExecuteRecord

3. 编写完后修改datax的执行配置json文件


{

"job": {

"setting": {

"speed":{

"byte": 10485760,

"channel": "5"

}

},

"content": [

{

"reader": {

"name":"oraclereader",

"parameter": {

"column": ["...."],  // 自定义

"connection": [

{

"jdbcUrl":["jdbc:oracle:thin:@。。。"], // 自定义

"table": ["table_test"]// 自定义

}

],

"where": " copy_time >= to_date('${lastTime}', 'yyyy-MM-dd HH24:mi:ss') and copy_time< to_date('${currentTime}', 'yyyy-MM-dd HH24:mi:ss')",  // 读取增量数据的逻辑

"password":"***", // 自定义

"username":"***" // 自定义

}

},

"writer": {

"name":"oraclewriter",

"parameter":{

"preSql": [

"delete from ***"

],

"column":[ "。。。"],// 自定义

                       "connection":[

                           {

                               "jdbcUrl":"。。。",// 自定义

                               "table":[

                                   "。。。"// 自定义

                               ]

                           }

                       ],

                       "password":"***",// 自定义

                       "username":"***"// 自定义

                   }

}}]

}}

4. 执行


python /home/datax/jobTimer/dataxScheduler.py \

  '/home/datax/jobs/test_job.json'  \

'/home/datax/jobs/log/test_job.log'  \

'/home/datax/jobTimer/record/test_job.record'

执行过后: test_job.record中文件内容为


[root@xxxx jobs]# cat /home/datax/jobTimer/record/test_job.record

2019-04-18 09:40:01

5. 这个方案的不足之处和改进方案

1) 由于datax没法做数据update,对于不存在update和delete的场景,使用这个方案还可以

2) 有些业务数据表的数据更新时间可能不准确,使用触发器的方式产生新的时间戳会更可靠点

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 196,264评论 5 462
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 82,549评论 2 373
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 143,389评论 0 325
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 52,616评论 1 267
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 61,461评论 5 358
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 46,351评论 1 273
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 36,776评论 3 387
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 35,414评论 0 255
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 39,722评论 1 294
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 34,760评论 2 314
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 36,537评论 1 326
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,381评论 3 315
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 37,787评论 3 300
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,030评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,304评论 1 252
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 41,734评论 2 342
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 40,943评论 2 336

推荐阅读更多精彩内容