一、背景
同步业务库的数据到ODS层,之前一直是全量同步数据,主要考虑IO太大,耗时太长,重复拉取同样的数据,现在考虑增量同步的方式实现,同时对库表数据做分区。
二、同步方案
增量同步主要分为两步,第一步,存量数据一次性同步;第二步,在存量数据的基础之上,做增量;后期的每一次同步都是增量同步。以下是具体同步方案:
1)MYSQL存量数据同步
用Sqoop同步表中全部数据到Hive表中;
2)MYSQL增量数据同步
a.根据hive中最大更新时间,用Sqoop提取更新时间为这个时间之后的增量数据;
三、用sqoop实现mysql数据增量同步到数仓
1、存量数据同步,脚本如下:
1)获取表的所有列,把datetime和timestamp类型,统一在java中映射成TIMESTAMP类型,脚本如下:
getcloumns(){
a=''
b=false
cloumns=''
#sqoop eval --connect $conn --username $uname --password $pwd --query 'desc '$table
result=`sqoop eval --connect $conn --username $uname --password $pwd --query 'desc '$table `
for line in $result
do
if [ ${line} == '|' ]; then
continue
fi
if [ ${line} == '---------------------------------------------------------------------------------------------------------' ]; then
b=true
fi
if [ ${b} == 'false' ]; then
continue
fi
if [ ${line} == 'datetime' ] || [ ${line} == 'timestamp' ]; then
if [[ ${cloumns} != '' ]]; then
cloumns=$cloumns','
fi
cloumns=$cloumns$a'=TIMESTAMP'
fi
a=$line
done
echo $cloumns
}
2) 用sqoop import拉取数据,脚本如下:
rcloumns=$(getcloumns)
hive_database="mysql_"$database
hive_table=$table
sqoop import -Dorg.apache.sqoop.splitter.allow_text_splitter=true \
--connect $conn \ #连接地址
--username $uname \
--password $pwd \
--table $table \
--delete-target-dir \ #首次同步,清空目标目录
--fields-terminated-by '\001' \ # 指定列分隔符
--lines-terminated-by "\n" \ # 指定行分隔符
--hive-import \ #直达hive库表,不需要从HDFS映射建表
--hive-overwrite \ #覆盖模式
--create-hive-table \ # sqoop自动建表
--hive-database "$hive_database" \ #指定hive 库
--hive-table "$hive_table" \ #指定hive表名
--m 1 \ #指定map任务个数
--hive-drop-import-delims \ # 删除特殊字符
--null-string '\\N' \ #空字符在hive中转义
--null-non-string '\\N' \ #空字符在hive中转义
--map-column-hive $rcloumns
2、增量数据同步
1)创建增量同步的sqoop job,脚本如下:
a、从hive中获取表的最大更新时间
sql="select max(update_time) from mysql_"$database"."$table";"
if [[ $? -ne 0 ]];then
exit 1
fi
echo $sql
result=`hive -e "$sql"`
if [[ $? -ne 0 ]];then
exit 1
fi
echo "result="$result
last_value=${result:0-19}
echo "last_value="$last_value
b、以上面获取的最大更新时间,作为起点,创建sqoop job,脚本如下:
jobname="myjob_"$database"_"$table
echo $jobname
hive_database="mysql_"$database
warehouse_dir='/user/hive/warehouse/'$hive_database'.db/'
sqoop job \
--create $jobname \
-- import \
--connect $conn \
--username $uname \
--password $pwd \
--table $table \
--fields-terminated-by '\001' \
--lines-terminated-by "\n" \
--input-fields-terminated-by '\001' \
--input-lines-terminated-by "\n" \
--warehouse-dir $warehouse_dir \
--incremental lastmodified \
--check-column update_time \
--merge-key id \
--last-value "$last_value" \
--m 1 \
--hive-drop-import-delims \
--null-string '\\N' \
--null-non-string '\\N' \
--input-null-string '\\N' \
--input-null-non-string '\\N'
c、创建sqoop job之后,就是执行job了,脚本如下:
sqoop job -exec jobName