数据仓库维度模型设计
维度建模基本概念
维度模型是数据仓库领域大师Ralph Kimall所倡导,他的《数据仓库工具箱》,是数据仓库工程领域最流行的数仓建模经典。维度建模以分析决策的需求出发构建模型,构建的数据模型为分析需求服务,因此它重点解决用户如何更快速完成分析需求,同时还有较好的大规模复杂查询的响应性能。
维度建模是专门应用于分析型数据库 数据仓库 数据集市建模的方法。数据集市可以理解为是一种"小型数据仓库"。
事实表
发生在现实世界中的操作型事件,其所产生的可度量数值,存储在事实表中。从最低的粒度级别来看,事实表行对应一个度量事件,反之亦然。
事实表表示对分析主题的度量。比如一次购买行为我们就可以理解为是一个事实。
事实表 |
---|
图中的订单表就是一个事实表,你可以理解他就是在现实中发生的一次操作型事件,我们每完成一个订单,就会在订单中增加一条记录。
事实表的特征:表里没有存放实际的内容,他是一堆主键的集合,这些ID分别能对应到维度表中的一条记录。事实表包含了与各维度表相关联的外键,可与维度表关联。事实表的度量通常是数值类型,且记录数会不断增加,表数据规模迅速增长。
维度表
每个维度表都包含单一的主键列。维度表的主键可以作为与之关联的任何事实表的外键,当然,维度表行的描述环境应与事实表行完全对应。维度表通常比较宽,是扁平型非规范表,包含大量的低粒度的文本属性。
维度表示你要对数据进行分析时所用的一个量,比如你要分析产品销售情况, 你可以选择按类别来进行分析,或按区域来分析。这样的按..分析就构成一个维度。上图中的用户表 商家表 时间表这些都属于维度表,这些表都有一个唯一的主键,然后在表中存放了详细的数据信息。
总的说来,在数据仓库中不需要严格遵守规范化设计原则。因为数据仓库的主导功能就是面向分析,以查询为主,不涉及数据更新操作。事实表的设计是以能够正确记录历史信息为准则,维度表的设计是以能够以合适的角度来聚合主题内容为准则。
维度建模三种模式
星型模型
星形模式(Star Schema)是最常用的维度建模方式。星型模式是以事实表为中心,所有的维度表直接连接在事实表上,像星星一样。
星形模式的维度建模由一个事实表和一组维表成,且具有以下特点:
a. 维表只和事实表关联,维表之间没有关联;
b. 每个维表主键为单列,且该主键放置在事实表中,作为两边连接的外键;
c. 以事实表为核心,维表围绕核心呈星形分布;
星型模型 |
---|
雪花模式
雪花模式(Snowflake Schema)是对星形模式的扩展。雪花模式的维度表可以拥有其他维度表的,虽然这种模型相比星型更规范一些,但是由于这种模型不太容易理解,维护成本比较高,而且性能方面需要关联多层维表,性能也比星型模型要低。所以一般不是很常用。
雪花模型 |
---|
星座模式
星座模式是星型模式延伸而来,星型模式是基于一张事实表的,而星座模式是基于多张事实表的,而且共享维度信息。
前面介绍的两种维度建模方法都是多维表对应单事实表,但在很多时候维度空间内的事实表不止一个,而一个维表也可能被多个事实表用到。在业务发展后期,绝大部分维度建模都采用的是星座模式。
星座模型 |
---|
数据仓库理论知识
为什么要分层
分层的主要原因是在管理数据的时候,能对数据有一个更加清晰的掌控,详细来讲,主要有下面几个原因:
清晰数据结构:
每一个数据分层都有它的作用域,这样我们在使用表的时候能更方便地定位和理解。
数据血缘追踪:
简单来说,我们最终给业务呈现的是一个能直接使用业务表,但是它的来源有很多,如果有一张来源表出问题了,我们希望能够快速准确地定位到问题,并清楚它的危害范围。
减少重复开发:
规范数据分层,开发一些通用的中间层数据,能够减少极大的重复计算。
把复杂问题简单化:
将一个复杂的任务分解成多个步骤来完成,每一层只处理单一的步骤,比较简单和容易理解。而且便于维护数据的准确性,当数据出现问题之后,可以不用修复所有的数据,只需要从有问题的步骤开始修复。
屏蔽原始数据的异常:
屏蔽业务的影响,不必改一次业务就需要重新接入数据
数仓分层思想
数据分层每个企业根据自己的业务需求可以分成不同的层次,但是最基础的分层思想,理论上数据分为三个层,数据运营层 数据仓库层和数据服务层。基于这个基础分层之上添加新的层次,来满足不同的业务需求。
数据运营层(ODS)
Operate data store,操作数据存储,是最接近数据源中数据的一层,数据源中的数据,经过抽取 洗净 传输,也就说传说中的ETL之后,装入本层。本层的数据,总体上大多是按照源头业务系统的分类方式而分类的。
例如:MySQL里面的一张表可以通过sqoop之间抽取到ODS层
ODS层数据的来源方式:
-
业务库
- 经常会使用sqoop来抽取,比如我们每天定时抽取一次。在实时方面,可以考虑用canal监听mysql的binlog,实时接入即可。
-
埋点日志
- 线上系统会打入各种日志,这些日志一般以文件的形式保存,我们可以选择用flume定时抽取,也可以用用spark streaming或者Flink来实时接入,当然,kafka也会是一个关键的角色。
-
消息队列
- 来自ActiveMQ Kafka的数据等
数据仓库层(DW):
Data warehouse,数据仓库层。在这里,从ODS层中获得的数据按照主题建立各种数据模型。例如以研究人的旅游消费为主题的数据集中,便可以结合航空公司的登机出行信息,以及银联系统的刷卡记录,进行结合分析,产生数据集。在这里,我们需要了解四个概念:维(dimension) 事实(Fact) 指标(Index)和粒度( Granularity)。
应用层(ADS):
该层主要是提供数据产品和数据分析使用的数据,一般会存放在ES MySQL等系统中供线上系统使用,也可能会存在Hive或者Druid中供数据分析和数据挖掘使用。
例如:我们经常说的报表数据,或者说那种大宽表,一般就放在这里。
1 ODS 数据准备层
功能
- ODS层是数据仓库准备区,为DWD层提供基础原始数据,可减少对业务系统的影响
建模方式及原则:
- 从业务系统增量抽取 保留时间由业务需求决定 可分表进行周期存储 数据不做清洗转换与业务系统数据模型保持一致 按主题逻辑划分
2 DWD 数据明细层
功能:
- 为DW层提供来源明细数据,提供业务系统细节数据的长期沉淀,为未来分析类需求的扩展提供历史数据支撑
建模方式及原则:
- 数据模型与ODS层一致,不做清洗转换处理 为支持数据重跑可额外增加数据业务日期字段 可按年月日进行分表 用增量ODS层数据和前一天DWD相关表进行merge处理
3 DW(B/S) 数据汇总层
功能:
- 为DW ST层提供细粒度数据,细化成DWB和DWS;
- DWB是根据DWD明细数据进行转换,如维度转代理键 身份证清洗 会员注册来源清晰 字段合并 空值处理 脏数据处理 IP清晰转换 账号余额清洗 资金来源清洗等;
- DWS是根据DWB层数据按各个维度ID进行高粒度汇总聚合,如按交易来源,交易类型进行汇合
建模方式及原则:
- 聚合 汇总增加派生事实;
- 关联其它主题的事实表,DW层可能会跨主题域;
- DWB保持低粒度汇总加工数据,DWS保持高粒度汇总数据;
- 数据模型可能采用反范式设计,合并信息等。
4 DM 数据集市层
功能:
可以是一些宽表,是根据DW层数据按照各种维度或多种维度组合把需要查询的一些事实字段进行汇总统计并作为单独的列进行存储;
满足一些特定查询 数据挖掘应用;
应用集市数据存储
建模方式及原则:
尽量减少数据访问时计算,优化检索;
维度建模,星型模型;
事实拉宽,度量预先计算;
分表存储
5 ST 数据应用层(ADS层)
功能:
ST层面向用户应用和分析需求,包括前端报表 分析图表 KPI 仪表盘 OLAP 专题等分析,面向最终结果用户;
适合作OLAP 报表模型,如ROLAP,MOLAP;
-
根据DW层经过聚合汇总统计后的粗粒度事实表
建模方式及原则:
保持数据量小;
维度建模,星形模型;
各位维度代理键+度量;
增加数据业务日期字段,支持数据重跑;
不分表存储
数仓中表的种类及其概念
一般情况下表分为两个类型,分别维度表和事务表
维度表
维度表,一般是指对应一些业务状态,代码的解释表。也可以称之为码表。比如地区表,订单类型,支付方式,商品分类等等。
维度表可以分为两类:一般维度表和固定维度表
一般维度表的数据是不断增加和变化的
固定维度表的数据是不变的
事实表
事实表分为两类:事务型事实表和周期型事实表
事务型事实表,一般指随着业务发生不断产生的数据。特点是一旦发生不会再变化。 一般比如,交易流水,操作日志,出库入库记录等等。
周期型事实表,一般指随着业务发生不断产生的数据。与事务型不同的是,数据会随着业务周期性的推进而变化。
比如订单,其中订单状态会周期性变化。 再比如,请假 贷款申请,随着批复状态在周期性变化。
数仓中表的同步策略
维度表
可能会有变化的数据可以存储每日全量或者使用拉链表。(比如订单类型,审批状态,商品分类)
事实表
事务型事实表
- 每日增量: 因为数据不会变化,而且数据量巨大,所以每天只同步新增数据即可;每日分区。
周期型事实表
每日全量:首先这类表从数据量的角度,存每日全量的话,数据量太大,冗余也太大。
每日增量:如果用每日增量的话无法反应数据变化;每日新增及变化量可以用,包括了当日的新增和修改。一般来说这个表,足够计算大部分当日数据的。但是这种依然无法解决能够得到某一个历史时间点(时间切片)的切片数据。
拉链表:利用每日新增和变化表,制作一张拉链表,以方便的取到某个时间切片的快照数据。
所以我们需要得到每日新增及变化量。
数据仓库开发
业务系统表结构介绍
订单表itcast_orders
字段名称 | 数据类型 | 字段说明 |
---|---|---|
orderId | bigint(11) | 订单id |
orderNo | varchar(20) | 订单编号 |
userId | bigint(11) | 用户id |
orderStatus | tinyint(4) | 订单状态,-3:用户拒收;-2:未付款的订单;-1:用户取消;0:待发货;1:配送中;2:用户确认收货 |
goodsMoney | decimal(11,2) | 商品金额 |
deliverType | tinyint(4) | 收货方式 |
deliverMoney | decimal(11,2) | 运费 |
totalMoney | decimal(11,2) | 订单金额(包括运费) |
realTotalMoney | decimal(11,2) | 实际订单金额(折扣后金额) |
payType | tinyint(4) | 支付方式,0:未知;1:支付宝,2:微信;3、现金;4、其他 |
payFrom | varchar(20) | 支付来源 |
isPay | tinyint(4) | 是否支付 |
areaId | int(11) | 区域最低一级 |
areaIdPath | varchar(255) | 区域idpath |
userName | varchar(20) | 收件人姓名 |
userAddressId | int(11) | 收件人地址ID |
userAddress | varchar(255) | 收件人地址 |
userPhone | char(20) | 收件人电话 |
orderScore | int(11) | 订单所得积分 |
isInvoice | tinyint(4) | 是否开发票,1:需要;0:不需要 |
invoiceClient | varchar(255) | 发票抬头 |
orderRemarks | varchar(255) | 订单备注 |
orderSrc | tinyint(4) | 订单来源,0:商城;1:微信;2:手机版;3:安卓App4:苹果App;5订餐设备 |
needPay | decimal(11,2) | 需缴费用 |
payRand | int(11) | 货币单位 |
orderType | int(11) | 订单类型 |
isRefund | tinyint(4) | 是否退款 |
isAppraise | tinyint(4) | 是否点评 |
cancelReason | int(11) | 取消原因ID |
rejectReason | int(11) | 用户拒绝原因ID |
rejectOtherReason | varchar(255) | 拒收原因 |
isClosed | tinyint(4) | 是否订单已完结 |
orderunique | varchar(50) | 订单流水号 |
isFromCart | tinyint(1) | 是否来自购物车 0:直接下单 1:购物车 |
receiveTime | varchar(25) | 收货时间 |
deliveryTime | varchar(25) | 发货时间 |
tradeNo | varchar(100) | 在线支付交易流水 |
dataFlag | tinyint(4) | 订单有效标志 |
createTime | varchar(25) | 下单时间 |
settlementId | int(11) | 是否结算,大于0的话则是结算ID |
commissionFee | decimal(11,2) | 订单应收佣金 |
scoreMoney | decimal(11,2) | 积分抵扣金额 |
useScore | int(11) | 花费积分 |
extraJson | text | 额外信息 |
noticeDeliver | tinyint(3) | 提醒发货,0:未提醒;1:已提醒 |
invoiceJson | text | 发票信息 |
lockCashMoney | decimal(11,2) | 锁定提现金额 |
payTime | varchar(25) | 支付时间 |
isBatch | tinyint(4) | 是否拼单 |
totalPayFee | int(11) | 总支付金额 |
modifiedTime | timestamp | 更新时间 |
订单明细表 itcast_order_goods
字段名 | 类型 | 说明 |
---|---|---|
ogId | bigint(11) | 订单明细(商品)id |
orderId | bigint(11) | 订单id |
goodsId | bigint(11) | 商品id |
goodsNum | bigint(11) | 商品数量 |
goodsPrice | decimal(11,2) | 商品价格 |
goodsSpecId | int(11) | 商品规格id |
goodsSpecNames | varchar(500) | 商品规格列表 |
goodsName | varchar(200) | 商品名称 |
goodsImg | varchar(150) | 商品图片 |
extraJson | text | 额外信息 |
goodsType | tinyint(4) | 商品类型 |
commissionRate | decimal(11,2) | 商品佣金比率 |
goodsCode | varchar(20) | 商品编码 |
promotionJson | text | 促销信息 |
createTime | varchar(20) | 创建时间 |
商品信息表 itcast_goods
goodsId | bigint(11) | 商品id |
---|---|---|
goodsSn | varchar(20) | 商品编号 |
productNo | varchar(20) | 商品货号 |
goodsName | varchar(200) | 商品名称 |
goodsImg | varchar(150) | 商品图片 |
shopId | bigint(11) | 门店ID |
goodsType | tinyint(4) | 货物类型 |
marketPrice | decimal(11,2) | 市场价 |
shopPrice | decimal(11,2) | 门店价 |
warnStock | bigint(11) | 预警库存 |
goodsStock | bigint(11) | 商品总库存 |
goodsUnit | char(10) | 单位 |
goodsTips | text | 促销信息 |
isSale | tinyint(4) | 是否上架 0:不上架 1:上架 |
isBest | tinyint(4) | 是否精品 0:否 1:是 |
isHot | tinyint(4) | 是否热销产品 0:否 1:是 |
isNew | tinyint(4) | 是否新品 0:否 1:是 |
isRecom | tinyint(4) | 是否推荐 0:否 1:是 |
goodsCatIdPath | varchar(255) | 商品分类ID路径catId1_catId2_catId3 |
goodsCatId | int(11) | 最后一级商品分类ID |
shopCatId1 | int(11) | 门店商品分类第一级ID |
shopCatId2 | int(11) | 门店商品第二级分类ID |
brandId | int(11) | 品牌ID |
goodsDesc | text | 商品描述 |
goodsStatus | tinyint(4) | 商品状态 -1:违规 0:未审核 1:已审核 |
saleNum | int(11) | 总销售量 |
saleTime | varchar(25) | 上架时间 |
visitNum | int(11) | 访问数 |
appraiseNum | int(11) | 评价书 |
isSpec | tinyint(4) | 是否有规格 0:没有 1:有 |
gallery | text | 商品相册 |
goodsSeoKeywords | varchar(200) | 商品SEO关键字 |
illegalRemarks | varchar(255) | 状态说明 一般用于说明拒绝原因 |
dataFlag | tinyint(4) | 删除标志 -1:删除 1:有效 |
createTime | varchar(25) | 创建时间 |
isFreeShipping | tinyint(4) | |
goodsSerachKeywords | text | 商品搜索关键字 |
店铺表 itcast_shops
字段名 | 字段类型 | 字段说明 |
---|---|---|
shopId | int(11) | 商铺ID,自增 |
shopSn | varchar(20) | |
userId | int(11) | 商铺联系人ID, |
areaIdPath | varchar(255) | |
areaId | int(11) | |
isSelf | tinyint(4) | |
shopName | varchar(100) | 商铺名称, |
shopkeeper | varchar(50) | |
telephone | varchar(20) | 联系人电话, |
shopCompany | varchar(255) | 商家实体名称, |
shopImg | varchar(150) | logo图片, |
shopTel | varchar(40) | 商家联系电话, |
shopQQ | varchar(50) | 联系人QQ, |
shopWangWang | varchar(50) | |
shopAddress | varchar(255) | 商家地址, |
bankId | int(11) | |
bankNo | varchar(20) | |
bankUserName | varchar(50) | |
isInvoice | tinyint(4) | |
invoiceRemarks | varchar(255) | |
serviceStartTime | bigint(20) | 服务开始时间, |
serviceEndTime | bigint(20) | 服务结束时间, |
freight | int(11) | |
shopAtive | tinyint(4) | |
shopStatus | tinyint(4) | 商铺状态, |
statusDesc | varchar(255) | |
dataFlag | tinyint(4) | |
createTime | date | |
shopMoney | decimal(11,2) | |
lockMoney | decimal(11,2) | |
noSettledOrderNum | int(11) | |
noSettledOrderFee | decimal(11,2) | |
paymentMoney | decimal(11,2) | |
bankAreaId | int(11) | |
bankAreaIdPath | varchar(100) | |
applyStatus | tinyint(4) | |
applyDesc | varchar(255) | |
applyTime | datetime | |
applyStep | tinyint(4) | |
shopNotice | varchar(300) | 店铺公告, |
rechargeMoney | decimal(11,2) | 充值金额, |
longitude | decimal(10,7) | |
latitude | decimal(10,7) | |
mapLevel | int(11) | |
BDcode | varchar(16) | 公司管理人员code, |
商品分类表 itcast_goods_cats
字段名 | 字段说明 |
---|---|
catId | 品类ID |
parentId | 父ID |
catName | 分类名称 |
isShow | 是否显示 |
isFloor | 是否显示楼层 |
catSort | 排序号 |
dataFlag | 删除标志 |
createTime | 建立时间 |
commissionRate | 商品佣金比例 |
catImg | |
subTitle | 楼层副标题 |
simpleName | 简写名称 |
seoTitle | 分类SEO标题 |
seoKeywords | 分类SEO关键字 |
seoDes | 分类SEO描述 |
catListTheme | 商品分类列表风格 |
detailTheme | 商品详情风格 |
mobileCatListTheme | 移动端商品分类列表风格 |
mobileDetailTheme | 移动端商品详情风格 |
wechatCatListTheme | 微信端商品分类列表风格 |
wechatDetailTheme | 微信端商品详情风格 |
cat_level | 分类级别,共3级 |
组织结构表 itcast_org
字段名 | 字段说明 |
---|---|
orgId | 组织ID |
parentId | 父ID |
orgName | 组织名称 |
orgLevel | 组织级别1;总部及大区级部门;2:总部下属的各个部门及基部门;3:具体工作部门 |
managerCode | 主管工号 |
isdelete | 删除标志,1:删除;0:有效 |
createTime | 创建时间 |
updateTime | 最后修改时间 |
isShow | 是否显示,0:是;1:否 |
orgType | 组织类型,0:总裁办;1:研发;2:销售;3:运营;4:产品 |
订单退货表 itcast_order_refunds
id | int(11) | 自增ID |
---|---|---|
orderId | int(11) | 订单id |
goodsId | int(11) | 商品id |
refundTo | int(11) | 接收退款用户 |
refundReson | int(11) | 用户申请退款原因ID |
refundOtherReson | varchar(255) | 用户申请退款原因 |
backMoney | decimal(11,2) | 退款金额 |
refundTradeNo | varchar(100) | 退款流水号 |
refundRemark | varchar(500) | 退款备注 |
refundTime | varchar(25) | 退款时间 |
shopRejectReason | varchar(255) | 店铺不同意退款原因 |
refundStatus | tinyint(4) | 退款状态 |
createTime | varchar(25) | 用户申请退款时间 |
用户表 itcast_users
userId | int(11) | 用户id |
---|---|---|
loginName | varchar(20) | 登录名 |
loginSecret | int(11) | 登录凭证 |
loginPwd | varchar(50) | 登录密码 |
userType | tinyint(4) | 用户类型 |
userSex | tinyint(4) | 用户性别 |
userName | varchar(100) | 用户名 |
trueName | varchar(100) | 真实姓名 |
brithday | date | 生日 |
userPhoto | varchar(200) | 用户头像 |
userQQ | varchar(20) | 用户QQ |
userPhone | char(11) | 用户手机号 |
userEmail | varchar(50) | 邮箱 |
userScore | int(11) | 积分 |
userTotalScore | int(11) | 总积分 |
lastIP | varchar(16) | 最后一次登录IP |
lastTime | datetime | 最后一次登录时间 |
userFrom | tinyint(4) | 注册渠道 |
userMoney | decimal(11,2) | 用户余额 |
lockMoney | decimal(11,2) | 锁定余额 |
userStatus | tinyint(4) | 用户状态 |
dataFlag | tinyint(4) | 数据状态 |
createTime | datetime | 创建时间 |
payPwd | varchar(100) | 支付密码 |
rechargeMoney | decimal(11,2) | 重置金额 |
isInform | tinyint(4) | 是否接收通知 |
用户收货地址表 itcast_user_address
addressId | int(11) | 地址id |
---|---|---|
userId | int(11) | 用户id |
userName | varchar(50) | 用户名 |
otherName | varchar(50) | 地址类型 |
userPhone | varchar(20) | 用户联系方式 |
areaIdPath | varchar(255) | 地址id路径 |
areaId | int(11) | 区域ID |
userAddress | varchar(255) | 用户地址 |
isDefault | tinyint(4) | 是否默认地址 |
dataFlag | tinyint(4) | 数据状态 |
createTime | datetime | 创建时间 |
支付方式表 itcast_payments
id | int(11) | 唯一id |
---|---|---|
payCode | varchar(20) | 支付类型码 |
payName | varchar(255) | 支付类型名称 |
payDesc | text | 描述 |
payOrder | int(11) | 显示顺序 |
payConfig | text | 配置 |
enabled | tinyint(4) | 是否启用 |
isOnline | tinyint(4) | 是否在线 |
payFor | varchar(100) |
项目环境初始化
导入mysql模拟数据
2.资料\mysql建表语句\10tables.sql文件上传到linux,登录mysql使用source命令执行该sql文件创建数据库和表
mysql -uroot -p
source /root/sql/10tables.sql;
Hive分层说明
-
分库存放
- ods层
- dw层
- ads层
-
命名规则
ods层表与原始数据库表名称相同
-
dw层表
fact_前缀表示事实表
dim_前缀表示维度表
创建分层数据库:
create database itcast_ods;
create database itcast_dw;
create database itcast_ads;
创建ods层数据表
- hive 分为外部表与内部表,为便于管理,该部分均使用内部表
- 执行
资料\hiveods层建表语句\ods_create_table.sql
数据采集
ods层全量数据抽取
步骤:
1 拖拽组件构建Kettle作业结构图
全量采集配置图 |
---|
2 转换结构图--》配置命名参数
配置转换命名参数 |
---|
3 配置Hive SQL脚本
msck repair table itcast_ods.itcast_orders;
msck repair table itcast_ods.itcast_goods;
msck repair table itcast_ods.itcast_order_goods;
msck repair table itcast_ods.itcast_shops;
msck repair table itcast_ods.itcast_goods_cats;
msck repair table itcast_ods.itcast_org;
msck repair table itcast_ods.itcast_order_refunds;
msck repair table itcast_ods.itcast_users;
msck repair table itcast_ods.itcast_user_address;
msck repair table itcast_ods.itcast_payments;
修复分区 |
---|
4 配置表输入
SELECT
*
FROM itcast_orders
WHERE DATE_FORMAT(createtime, '%Y%m%d') <= '${dt}';
组件图 |
---|
5 配置字段选择指定日期格式,配置parquet格式并设置snappy压缩输出
字段选择 |
---|
文件位置 |
parquet output配置 |
测试
测试数据是否都正确被加载。
select * from itcast_ods.itcast_orders limit 2;
select * from itcast_ods.itcast_goods limit 2;
select * from itcast_ods.itcast_order_goods limit 2;
select * from itcast_ods.itcast_shops limit 2;
select * from itcast_ods.itcast_goods_cats limit 2;
select * from itcast_ods.itcast_org limit 2;
select * from itcast_ods.itcast_order_refunds limit 2;
select * from itcast_ods.itcast_users limit 2;
select * from itcast_ods.itcast_user_address limit 2;
select * from itcast_ods.itcast_payments limit 2;
注意:
1:其中itcast_orders,itcast_order_goods,itcast_order_refunds表是根据时间抽取,其余表进行全量抽取!!
2:注意使用字段选择组件时要注意修改日期格式为UTF8!!,parquet中fields中date类型改为UTF8类型!!
缓慢变化维
什么是缓慢变化维(SCD)
1 缓慢变化维简介
- 缓慢变化维,简称SCD(Slowly Changing Dimensions)
- 一些维度表的数据不是静态的,而是会随着时间而缓慢地变化(这里的缓慢是相对事实表而言,事实表数据变化的速度比维度表快)
- 这种随着时间发生变化的维度称之为缓慢变化维
- 把处理维度表数据历史变化的问题,称为缓慢变化维问题,简称SCD问题
2 举例说明
例如:用根据用户维度,统计不同出生年份的消费金额占比。(80后 90后 00后)。
而期间,用户可能去修改用户数据,例如:将出生日期改成了 1992年。此时,用户维度表就发生了变化。当然这个变化相对事实表的变换要慢。但这个用户维度表的变化,就是缓慢变化维。
用户ID | 用户名 | 出生日期 | 住址 |
---|---|---|---|
114 | 张三 | 1988-09-08 | 北京市朝阳区 |
这个用户的数据不是一直不变,而是有可能发生变化。例如:用户修改了出生日期 或者用户修改了住址。
SCD问题的几种解决方案
以下为解决缓慢变化维问题的几种办法:
保留原始值:指标计算不符合最新维度数据
改写属性值:无法获取到历史状态
增加维度新行:拉链表
增加维度新列:成本太高
-
添加历史表:增加维护难度
对于历史数据会变化的以及还有新增数据的表同步到数仓中我们有两个要求:
1、 数据的历史状态我们要保存,
2、对于新增数据也保存
SCD解决方案 - 保留原始值
某一个属性值绝不会变化。事实表始终按照该原始值进行分组。例如:
- 出生日期的数据,始终按照用户第一次填写的数据为准
SCD解决方案 - 改写属性值
- 对其相应需要重写维度行中的旧值,以当前值替换。因此其始终反映最近的情况
- 当一个维度值的数据源发生变化,并且不需要在维度表中保留变化历史时,通常用新数据来覆盖旧数据。这样的处理使属性所反映的中是最新的赋值。
用户维度表
修改前:
用户ID | 用户名 | 出生日期 | 住址 |
---|---|---|---|
114 | 张三 | 1988-09-08 | 北京市朝阳区 |
修改后:
用户ID | 用户名 | 出生日期 | 住址 |
---|---|---|---|
114 | 张三 | 1992-09-08 | 北京市海淀区 |
- 这种方法有个前提,用户不关心这个数据的变化
- 这样处理,易于实现,但是没有保留历史数据,无法分析历史变化信息
SCD解决方案 - 增加维度新行
数据仓库系统的目标之一是正确地表示历史。典型代表就是拉链表。
保留历史的数据,并插入新的数据。
用户维度表
修改前:
用户ID | 用户名 | 出生日期 | 住址 | |
---|---|---|---|---|
9527 | 114 | 张三 | 1988-09-08 | 北京市朝阳区 |
修改后:
编号 | 用户ID | 用户名 | 出生日期 | 住址 |
---|---|---|---|---|
9527 | 114 | 张三 | 1988-09-08 | 北京市朝阳区 |
9528 | 114 | 张三 | 1992-09-08 | 北京市海淀区 |
SCD解决方案 - 增加维度新列
用不同的字段来保存不同的值,就是在表中增加一个字段,这个字段用来保存变化后的当前值,而原来的值则被称为变化前的值。总的来说,这种方法通过添加字段来保存变化后的痕迹。
用户维度表
修改前:
编号 | 用户ID | 用户名 | 出生日期 | 住址 |
---|---|---|---|---|
9527 | 114 | 张三 | 1988-09-08 | 北京市朝阳区 |
修改后
编号 | 用户ID | 用户名 | 出生日期 | 住址 | 现住址 | |
---|---|---|---|---|---|---|
9527 | 114 | 张三 | 1988-09-08 | 1992-09-08 | 北京市朝阳区 | 北京市海淀区 |
SCD解决方案 - 使用历史表
另外建一个表来保存历史记录,这种方式就是将历史数据与当前数据完全分开来,在维度中只保存当前最新的数据。
用户维度表
编号 | 用户ID | 用户名 | 出生日期 | 住址 |
---|---|---|---|---|
9527 | 114 | 张三 | 1992-09-08 | 北京市海淀区 |
用户维度历史表
编号 | 用户ID | 用户名 | 出生日期 | 住址 |
---|---|---|---|---|
9537 | 114 | 张三 | 1988-09-02 | 北京市朝阳区 |
9527 | 114 | 张三 | 1992-09-08 | 北京市海淀区 |
这种方式的优点是可以同时分析当前及前一次变化的属性值,缺点是只保留了最后一次变化信息。
数据采集-拉链表技术介绍
数据仓库的数据模型设计过程中,经常会遇到这样的需求:
- 表中的部分字段会被update,例如:
- 用户的地址,产品的描述信息,品牌信息等等;
- 需要查看某一个时间点或者时间段的历史快照信息,例如:
- 查看某一个产品在历史某一时间点的状态
- 查看某一个用户在过去某一段时间内,更新过几次等等
- 变化的比例和频率不是很大,例如:
- 总共有1000万的会员,每天新增和发生变化的有10万左右
商品历史快照案例
需求:
有一个商品表:
列名 | 类型 | 说明 |
---|---|---|
goods_id | varchar(50) | 商品编号 |
goods_status | varchar(50) | 商品状态(待审核、待售、在售、已删除) |
createtime | varchar(50) | 商品创建日期 |
modifytime | varchar(50) | 商品修改日期 |
2019年12月20日的数据如下所示:
goods_id | goods_status | createtime | modifytime |
---|---|---|---|
001 | 待审核 | 2019-12-20 | 2019-12-20 |
002 | 待售 | 2019-12-20 | 2019-12-20 |
003 | 在售 | 2019-12-20 | 2019-12-20 |
004 | 已删除 | 2019-12-20 | 2019-12-20 |
商品的状态,会随着时间推移而变化,我们需要将商品的所有变化的历史信息都保存下来。如何实现呢?
方案一:快照每一天的数据到数仓
该方案为:
- 每一天都保存一份全量,将所有数据同步到数仓中
- 很多记录都是重复保存,没有任何变化
12月20日(4条数据)
goods_id | goods_status | createtime | modifytime |
---|---|---|---|
001 | 待审核 | 2019-12-18 | 2019-12-20 |
002 | 待售 | 2019-12-19 | 2019-12-20 |
003 | 在售 | 2019-12-20 | 2019-12-20 |
004 | 已删除 | 2019-12-15 | 2019-12-20 |
12月21日(10条数据)
goods_id | goods_status | createtime | modifytime |
---|---|---|---|
以下为12月20日快照数据 | |||
001 | 待审核 | 2019-12-18 | 2019-12-20 |
002 | 待售 | 2019-12-19 | 2019-12-20 |
003 | 在售 | 2019-12-20 | 2019-12-20 |
004 | 已删除 | 2019-12-15 | 2019-12-20 |
以下为12月21日快照数据 | |||
001 | 待售(从待审核到待售) | 2019-12-18 | 2019-12-21 |
002 | 待售 | 2019-12-19 | 2019-12-20 |
003 | 在售 | 2019-12-20 | 2019-12-20 |
004 | 已删除 | 2019-12-15 | 2019-12-20 |
005(新商品) | 待审核 | 2019-12-21 | 2019-12-21 |
006(新商品) | 待审核 | 2019-12-21 | 2019-12-21 |
12月22日(18条数据)
goods_id | goods_status | createtime | modifytime |
---|---|---|---|
以下为12月20日快照数据 | |||
001 | 待审核 | 2019-12-18 | 2019-12-20 |
002 | 待售 | 2019-12-19 | 2019-12-20 |
003 | 在售 | 2019-12-20 | 2019-12-20 |
004 | 已删除 | 2019-12-15 | 2019-12-20 |
以下为12月21日快照数据 | |||
001 | 待售(从待审核到待售) | 2019-12-18 | 2019-12-21 |
002 | 待售 | 2019-12-19 | 2019-12-20 |
003 | 在售 | 2019-12-20 | 2019-12-20 |
004 | 已删除 | 2019-12-15 | 2019-12-20 |
005 | 待审核 | 2019-12-21 | 2019-12-21 |
006 | 待审核 | 2019-12-21 | 2019-12-21 |
以下为12月22日快照数据 | |||
001 | 待售 | 2019-12-18 | 2019-12-21 |
002 | 待售 | 2019-12-19 | 2019-12-20 |
003 | 已删除(从在售到已删除) | 2019-12-20 | 2019-12-22 |
004 | 待审核 | 2019-12-21 | 2019-12-21 |
005 | 待审核 | 2019-12-21 | 2019-12-21 |
006 | 已删除(从待审核到已删除) | 2019-12-21 | 2019-12-22 |
007 | 待审核 | 2019-12-22 | 2019-12-22 |
008 | 待审核 | 2019-12-22 | 2019-12-22 |
方案一:MySQL到Hive数仓代码实现
MySQL&Hive初始化
1 在MySQL demo库中 创建表
-- 创建数据库
CREATE DATABASE demo DEFAULT CHARACTER SET utf8 COLLATE utf8_general_ci;
-- 创建商品表
create table if not exists `demo`.`t_product`(
goods_id varchar(50), -- 商品编号
goods_status varchar(50), -- 商品状态
createtime varchar(50), -- 商品创建时间
modifytime varchar(50) -- 商品修改时间
);
2 在Hive中 demo库创建表
-- 创建表
create database if not exists `demo`;
-- 创建ods层表
create table if not exists `demo`.`ods_product`(
goods_id string, -- 商品编号
goods_status string, -- 商品状态
createtime string, -- 商品创建时间
modifytime string -- 商品修改时间
)
partitioned by (dt string)
row format delimited fields terminated by ',' stored as TEXTFILE;
-- 创建dw层表
create table if not exists `demo`.`dw_product`(
goods_id string, -- 商品编号
goods_status string, -- 商品状态
createtime string, -- 商品创建时间
modifytime string -- 商品修改时间
)
partitioned by (dt string)
row format delimited fields terminated by ',' stored as TEXTFILE;
增量导入12月20日数据
1 MySQL数据库导入12月20日数据(4条数据)
insert into `demo`.`t_product`(goods_id, goods_status, createtime, modifytime) values
('001', '待审核', '2019-12-18', '2019-12-20'),
('002', '待售', '2019-12-19', '2019-12-20'),
('003', '在售', '2019-12-20', '2019-12-20'),
('004', '已删除', '2019-12-15', '2019-12-20');
mysql图示 |
---|
2 使用Kettle将MySQL数据导出,并导入到分区HDFS位置
Kettle转换流程图 |
---|
创建Hive分区
-- 创建分区
alter table `demo`.`ods_product` add if not exists partition (dt='2019-12-20');
增加分区 |
---|
表输入 |
Hadoop File output |
3 Hive中查询数据
select * from `demo`.`ods_product`
4 数据导入维度表
insert overwrite table `demo`.`dw_product` partition(dt='2019-12-20')
select
goods_id,
goods_status,
createtime,
modifytime
from `demo`.`ods_product` where dt='2019-12-20';
增量导入12月21日数据
1 MySQL数据库导入12月21日数据(6条数据)
UPDATE `demo`.`t_product` SET goods_status = '待售', modifytime = '2019-12-21' WHERE goods_id = '001';
INSERT INTO `demo`.`t_product`(goods_id, goods_status, createtime, modifytime) VALUES
('005', '待审核', '2019-12-21', '2019-12-21'),
('006', '待审核', '2019-12-21', '2019-12-21');
2 运行Kettle转换,导入2019年12月21日数据
执行kettle转换 |
---|
3 Hive查询数据
select * from `demo`.`ods_product` where dt='2019-12-21';
mysql数据展示 |
---|
4 数据导入dw层表
insert overwrite table `demo`.`dw_product` partition(dt='2019-12-21')
select
goods_id,
goods_status,
createtime,
modifytime
from `demo`.`ods_product` where dt='2019-12-21';
最终数据展示 |
---|
增量导入12月22日数据
1 MySQL数据库导入12月22日数据(6条数据)
UPDATE `demo`.`t_product` SET goods_status = '已删除', modifytime = '2019-12-22' WHERE goods_id = '003';
UPDATE `demo`.`t_product` SET goods_status = '已删除', modifytime = '2019-12-22' WHERE goods_id = '006';
INSERT INTO `demo`.`t_product`(goods_id, goods_status, createtime, modifytime) VALUES
('007', '待审核', '2019-12-22', '2019-12-22'),
('008', '待审核', '2019-12-22', '2019-12-22');
2 运行Kettle转换,导入2019年12月22日数据
导入2019年12月22日数据 |
---|
3 Hive查询数据
select * from ods_product where dt='2019-12-22';
hive数据 |
---|
4 数据导入dw层表
insert overwrite table `demo`.`dw_product` partition(dt='2019-12-22')
select
goods_id,
goods_status,
createtime,
modifytime
from `demo`.`ods_product` where dt='2019-12-22';
2019-12-22数据 |
---|
从上述案例,可以看到:
- 表每天保留一份全量,每次全量中会保存很多不变的信息,如果数据量很大的话,对存储是极大的浪费
可以将表设计为拉链表,既能满足反应数据的历史状态,又可以最大限度地节省存储空间
方案二:使用拉链表保存历史快照
拉链表
- 拉链表不存储冗余的数据,只有某行的数据发生变化,才需要保存下来,相比每次全量同步会节省存储空间
- 能够查询到历史快照
- 额外的增加了两列(dw_start_date dw_end_date),为数据行的生命周期
12月20日商品拉链表的数据:
goods_id | goods_status | createtime | modifytime | dw_start_date | dw_end_date |
---|---|---|---|---|---|
001 | 待审核 | 2019-12-18 | 2019-12-20 | 2019-12-20 | 9999-12-31 |
002 | 待售 | 2019-12-19 | 2019-12-20 | 2019-12-20 | 9999-12-31 |
003 | 在售 | 2019-12-20 | 2019-12-20 | 2019-12-20 | 9999-12-31 |
004 | 已删除 | 2019-12-15 | 2019-12-20 | 2019-12-20 | 9999-12-31 |
- 12月20日的数据是全新的数据导入到dw表
- dw_start_date表示某一条数据的生命周期起始时间,即数据从该时间开始有效(即生效日期)
- dw_end_date表示某一条数据的生命周期结束时间,即数据到这一天失效(即失效日期)
- dw_end_date为9999-12-31,表示当前这条数据是最新的数据,数据到9999-12-31才过期
12月21日商品拉链表的数据
goods_id | goods_status | createtime | modifytime | dw_start_date | dw_end_date |
---|---|---|---|---|---|
001 | 待审核 | 2019-12-18 | 2019-12-20 | 2019-12-20 | 2019-12-20 |
002 | 待售 | 2019-12-19 | 2019-12-20 | 2019-12-20 | 9999-12-31 |
003 | 在售 | 2019-12-20 | 2019-12-20 | 2019-12-20 | 9999-12-31 |
004 | 已删除 | 2019-12-15 | 2019-12-20 | 2019-12-20 | 9999-12-31 |
001 | 待售 | 2019-12-18 | 2019-12-21 | 2019-12-21 | 9999-12-31 |
005 | 待审核 | 2019-12-21 | 2019-12-21 | 2019-12-21 | 9999-12-31 |
006 | 待审核 | 2019-12-21 | 2019-12-21 | 2019-12-21 | 9999-12-31 |
- 拉链表中没有存储冗余的数据,只要数据没有变化,无需同步
- 001编号的商品数据的状态发生了变化(从待审核 → 待售),需要将原有的dw_end_date变为2019-12-21,表示待审核状态,在2019/12/20(包含) - 2019/12/21有效
- 001编号新的状态重新保存了一条记录,dw_start_date为2019/12/21,dw_end_date为9999/12/31
12月22日商品拉链表的数据
goods_id | goods_status | createtime | modifytime | dw_start_date | dw_end_date |
---|---|---|---|---|---|
001 | 待审核 | 2019-12-18 | 2019-12-20 | 2019-12-20 | 2019-12-20 |
002 | 待售 | 2019-12-19 | 2019-12-20 | 2019-12-20 | 9999-12-31 |
003 | 在售 | 2019-12-20 | 2019-12-20 | 2019-12-20 | 2019-12-21 |
004 | 已删除 | 2019-12-15 | 2019-12-20 | 2019-12-20 | 9999-12-31 |
001 | 待售 | 2019-12-18 | 2019-12-21 | 2019-12-21 | 9999-12-31 |
005 | 待审核 | 2019-12-21 | 2019-12-21 | 2019-12-21 | 9999-12-31 |
006 | 待审核 | 2019-12-21 | 2019-12-21 | 2019-12-21 | 9999-12-31 |
003 | 已删除 | 2019-12-20 | 2019-12-22 | 2019-12-22 | 9999-12-31 |
007 | 待审核 | 2019-12-22 | 2019-12-22 | 2019-12-22 | 9999-12-31 |
008 | 待审核 | 2019-12-22 | 2019-12-22 | 2019-12-22 | 9999-12-31 |
查询拉链表
1 获取2019-12-20日的历史快照数据
select * from demo.dw_product_2 where dw_start_date <= '2019-12-20' and dw_end_date >= '2019-12-20' order by goods_id;
2 获取最新的商品快照数据
select * from demo.dw_product_2 where dw_end_date = '9999-12-31' order by goods_id;
方案二:拉链表存储历史快照代码实现
操作步骤:
-
在原有dw层表上,添加额外的两列
- 生效日期(dw_start_date)
- 失效日期(dw_end_date)
只同步当天修改的数据到ods层
-
拉链表算法实现
编写SQL处理当天最新的数据
编写SQL处理dw层历史数据,重新计算之前的dw_end_date
拉链表的数据为:当天最新的数据 UNION ALL 历史数据
拉链表的数据为:当天最新的数据 UNION ALL 历史数据
代码实现:
1 MySQL&Hive表初始化
MySQL创建商品表2
-- 创建数据库
CREATE DATABASE demo DEFAULT CHARACTER SET utf8 COLLATE utf8_general_ci;
-- 创建商品表
create table if not exists `demo`.`t_product_2`(
goods_id varchar(50), -- 商品编号
goods_status varchar(50), -- 商品状态
createtime varchar(50), -- 商品创建时间
modifytime varchar(50) -- 商品修改时间
)ENGINE=InnoDB DEFAULT CHARSET=utf8 ;
Hive ODS层建表
-- 创建表
create database if not exists `demo`;
-- 创建ods层表
create table if not exists `demo`.`ods_product_2`(
goods_id string, -- 商品编号
goods_status string, -- 商品状态
createtime string, -- 商品创建时间
modifytime string -- 商品修改时间
)
partitioned by (dt string) --按照天分区
row format delimited fields terminated by ',' stored as TEXTFILE;
Hive dw层创建拉链表
-- 创建拉链表
create table if not exists `demo`.`dw_product_2`(
goods_id string, -- 商品编号
goods_status string, -- 商品状态
createtime string, -- 商品创建时间
modifytime string, -- 商品修改时间
dw_start_date string, -- 生效日期
dw_end_date string -- 失效日期
)
row format delimited fields terminated by ',' stored as TEXTFILE;
全量导入2019年12月20日数据
1 MySQL数据库导入12月20日数据(4条数据)
insert into `demo`.`t_product_2`(goods_id, goods_status, createtime, modifytime) values
('001', '待审核', '2019-12-18', '2019-12-20'),
('002', '待售', '2019-12-19', '2019-12-20'),
('003', '在售', '2019-12-20', '2019-12-20'),
('004', '已删除', '2019-12-15', '2019-12-20');
mysql中数据 |
---|
2 使用Kettle进行全量同步MySQL数据到Hive ods层表
Kettle组件图 |
---|
设置命名参数 |
创建Hive分区
-- 创建分区
alter table `demo`.`ods_product_2` add if not exists partition (dt='${dt}');
hive增加分区 |
---|
表输入
SELECT
*
FROM t_product_2
where modifytime <= '${dt}'
表输入组件 |
---|
Hadoop File Ouput |
3 编写SQL从ods导入dw当天最新的数据
-- 从ods层导入dw当天最新数据
insert overwrite table `demo`.`dw_product_2`
select
goods_id, -- 商品编号
goods_status, -- 商品状态
createtime, -- 商品创建时间
modifytime, -- 商品修改时间
modifytime as dw_start_date, -- 生效日期
'9999-12-31' as dw_end_date -- 失效日期
from
`demo`.`ods_product_2`
where
dt = '2019-12-20';
当天最新的数据 |
---|
增量导入2019年12月21日数据
1 MySQL数据库导入12月21日数据(6条数据)
UPDATE `demo`.`t_product_2` SET goods_status = '待售', modifytime = '2019-12-21' WHERE goods_id = '001';
INSERT INTO `demo`.`t_product_2`(goods_id, goods_status, createtime, modifytime) VALUES
('005', '待审核', '2019-12-21', '2019-12-21'),
('006', '待审核', '2019-12-21', '2019-12-21');
2 使用Kettle开发增量同步MySQL数据到Hive ods层表
Hive创建分区
-- 创建分区
alter table `demo`.`ods_product_2` add if not exists partition (dt='${dt}');
表输入读取MySQL数据
SELECT
*
FROM t_product_2
where modifytime = '${dt}'
ods中数据 |
---|
3 编写SQL处理dw层历史数据,重新计算之前的dw_end_date
-- 重新计算dw层拉链表中的失效时间
select
t1.goods_id, -- 商品编号
t1.goods_status, -- 商品状态
t1.createtime, -- 商品创建时间
t1.modifytime, -- 商品修改时间
t1.dw_start_date, -- 生效日期(生效日期无需重新计算)
case when (t2.goods_id is not null and t1.dw_end_date > '2019-12-21')
then '2019-12-21'
else t1.dw_end_date
end as dw_end_date -- 更新生效日期(需要重新计算)
from
`demo`.`dw_product_2` t1
left join
(select * from `demo`.`ods_product_2` where dt='2019-12-21') t2
on t1.goods_id = t2.goods_id
6 合并当天最新的数据和历史数据到
insert overwrite table `demo`.`dw_product_2`
select
t1.goods_id, -- 商品编号
t1.goods_status, -- 商品状态
t1.createtime, -- 商品创建时间
t1.modifytime, -- 商品修改时间
t1.dw_start_date, -- 生效日期(生效日期无需重新计算)
case when (t2.goods_id is not null and t1.dw_end_date > '2019-12-21')
then '2019-12-21'
else t1.dw_end_date
end as dw_end_date -- 更新生效日期(需要重新计算)
from
`demo`.`dw_product_2` t1
left join
(select * from `demo`.`ods_product_2` where dt='2019-12-21') t2
on t1.goods_id = t2.goods_id
union all
select
goods_id, -- 商品编号
goods_status, -- 商品状态
createtime, -- 商品创建时间
modifytime, -- 商品修改时间
modifytime as dw_start_date, -- 生效日期
'9999-12-31' as dw_end_date -- 失效日期
from
`demo`.`ods_product_2` where dt='2019-12-21'
order by dw_start_date, goods_id;
最终数据 |
---|
数据采集- 商品维度数据加载
使用拉链表解决商品表SCD问题
dw层建表
-- dw层建表
DROP TABLE IF EXISTS `itcast_dw`.`dim_goods`;
CREATE TABLE `itcast_dw`.`dim_goods`(
goodsId bigint,
goodsSn string,
productNo string,
goodsName string,
goodsImg string,
shopId bigint,
goodsType bigint,
marketPrice double,
shopPrice double,
warnStock bigint,
goodsStock bigint,
goodsUnit string,
goodsTips string,
isSale bigint,
isBest bigint,
isHot bigint,
isNew bigint,
isRecom bigint,
goodsCatIdPath string,
goodsCatId bigint,
shopCatId1 bigint,
shopCatId2 bigint,
brandId bigint,
goodsDesc string,
goodsStatus bigint,
saleNum bigint,
saleTime string,
visitNum bigint,
appraiseNum bigint,
isSpec bigint,
gallery string,
goodsSeoKeywords string,
illegalRemarks string,
dataFlag bigint,
createTime string,
isFreeShipping bigint,
goodsSerachKeywords string,
modifyTime string,
dw_start_date string,
dw_end_date string
)
STORED AS PARQUET TBLPROPERTIES('parquet.compression'='SNAPPY');
具体步骤
拉链表设计一共分为以下几个步骤:
1 第一次全量导入
- 所有的ODS数据全部导入到拉链历史记录表中
2 增量导入(某天,举例:2019-09-10)
- 增量导入某天的数据到ODS分区
- 合并历史数据
- 通过连接查询方式更新
全量导入
- 将所有 2019年09月09日以前创建的商品以及修改的数据全部导入到拉链历史记录表中
操作步骤:
1 使用Kettle将20190909以前的数据抽取到ods
SELECT *
FROM itcast_ods.itcast_goods
WHERE DATE_FORMAT(modifyTime, '%Y%m%d') <= '20190909';
2 使用spark sql将全量数据导入到dw层维度表
set spark.sql.shuffle.partitions=1; --shuffle时的分区数,默认是200个
-- 使用spark sql将全量数据导入到dw层维度表
insert overwrite table `itcast_dw`.`dim_goods`
select
goodsId,
goodsSn,
productNo,
goodsName,
goodsImg,
shopId,
goodsType,
marketPrice,
shopPrice,
warnStock,
goodsStock,
goodsUnit,
goodsTips,
isSale,
isBest,
isHot,
isNew,
isRecom,
goodsCatIdPath,
goodsCatId,
shopCatId1,
shopCatId2,
brandId,
goodsDesc,
goodsStatus,
saleNum,
saleTime,
visitNum,
appraiseNum,
isSpec,
gallery,
goodsSeoKeywords,
illegalRemarks,
dataFlag,
createTime,
isFreeShipping,
goodsSerachKeywords,
modifyTime,
case when modifyTime is not null
then from_unixtime(unix_timestamp(modifyTime, 'yyyy-MM-dd HH:mm:ss'),'yyyy-MM-dd')
else from_unixtime(unix_timestamp(createTime, 'yyyy-MM-dd HH:mm:ss'), 'yyyy-MM-dd')
end as dw_start_date,
'9999-12-31' as dw_end_date
from
`itcast_ods`.`itcast_goods` t
where dt='20190909';
增量导入
- 将2019年09月10日创建的 修改的数据全部导入到历史拉链表中
操作步骤:
1 使用Kettle将20190910创建的 或者修改的数据抽取到ods
SELECT *
FROM itcast_goods
WHERE DATE_FORMAT(modifyTime, '%Y%m%d') = '${dt}';
2 编写spark-sql更新历史数据
-- 更新历史数据
select
dw.goodsId,
dw.goodsSn,
dw.productNo,
dw.goodsName,
dw.goodsImg,
dw.shopId,
dw.goodsType,
dw.marketPrice,
dw.shopPrice,
dw.warnStock,
dw.goodsStock,
dw.goodsUnit,
dw.goodsTips,
dw.isSale,
dw.isBest,
dw.isHot,
dw.isNew,
dw.isRecom,
dw.goodsCatIdPath,
dw.goodsCatId,
dw.shopCatId1,
dw.shopCatId2,
dw.brandId,
dw.goodsDesc,
dw.goodsStatus,
dw.saleNum,
dw.saleTime,
dw.visitNum,
dw.appraiseNum,
dw.isSpec,
dw.gallery,
dw.goodsSeoKeywords,
dw.illegalRemarks,
dw.dataFlag,
dw.createTime,
dw.isFreeShipping,
dw.goodsSerachKeywords,
dw.modifyTime,
dw.dw_start_date,
case when dw.dw_end_date = '9999-12-31' and ods.goodsId is not null
then '2019-09-09'
else dw.dw_end_date
end as dw_end_date
from
`itcast_dw`.`dim_goods` dw
left join
(select * from `itcast_ods`.`itcast_goods` where dt='20190910') ods
on dw.goodsId = ods.goodsId ;
3 编写spark-sql获取当日数据
-- 今日数据
select
goodsId,
goodsSn,
productNo,
goodsName,
goodsImg,
shopId,
goodsType,
marketPrice,
shopPrice,
warnStock,
goodsStock,
goodsUnit,
goodsTips,
isSale,
isBest,
isHot,
isNew,
isRecom,
goodsCatIdPath,
goodsCatId,
shopCatId1,
shopCatId2,
brandId,
goodsDesc,
goodsStatus,
saleNum,
saleTime,
visitNum,
appraiseNum,
isSpec,
gallery,
goodsSeoKeywords,
illegalRemarks,
dataFlag,
createTime,
isFreeShipping,
goodsSerachKeywords,
modifyTime,
case when modifyTime is not null
then from_unixtime(unix_timestamp(modifyTime, 'yyyy-MM-dd HH:mm:ss'),'yyyy-MM-dd')
else from_unixtime(unix_timestamp(createTime, 'yyyy-MM-dd HH:mm:ss'), 'yyyy-MM-dd')
end as dw_start_date,
'9999-12-31' as dw_end_date
from
`itcast_ods`.`itcast_goods`
where dt = '20190910';
4 将历史数据 当日数据合并加载到临时表
-- 将历史数据 当日数据合并加载到临时表
drop table if exists `itcast_dw`.`tmp_dim_goods_history`;
create table `itcast_dw`.`tmp_dim_goods_history`
as
select
dw.goodsId,
dw.goodsSn,
dw.productNo,
dw.goodsName,
dw.goodsImg,
dw.shopId,
dw.goodsType,
dw.marketPrice,
dw.shopPrice,
dw.warnStock,
dw.goodsStock,
dw.goodsUnit,
dw.goodsTips,
dw.isSale,
dw.isBest,
dw.isHot,
dw.isNew,
dw.isRecom,
dw.goodsCatIdPath,
dw.goodsCatId,
dw.shopCatId1,
dw.shopCatId2,
dw.brandId,
dw.goodsDesc,
dw.goodsStatus,
dw.saleNum,
dw.saleTime,
dw.visitNum,
dw.appraiseNum,
dw.isSpec,
dw.gallery,
dw.goodsSeoKeywords,
dw.illegalRemarks,
dw.dataFlag,
dw.createTime,
dw.isFreeShipping,
dw.goodsSerachKeywords,
dw.modifyTime,
dw.dw_start_date,
case when dw.dw_end_date >= '9999-12-31' and ods.goodsId is not null
then '2019-09-09'
else dw.dw_end_date
end as dw_end_date
from
`itcast_dw`.`dim_goods` dw
left join
(select * from `itcast_ods`.`itcast_goods` where dt='20190910') ods
on dw.goodsId = ods.goodsId
union all
select
goodsId,
goodsSn,
productNo,
goodsName,
goodsImg,
shopId,
goodsType,
marketPrice,
shopPrice,
warnStock,
goodsStock,
goodsUnit,
goodsTips,
isSale,
isBest,
isHot,
isNew,
isRecom,
goodsCatIdPath,
goodsCatId,
shopCatId1,
shopCatId2,
brandId,
goodsDesc,
goodsStatus,
saleNum,
saleTime,
visitNum,
appraiseNum,
isSpec,
gallery,
goodsSeoKeywords,
illegalRemarks,
dataFlag,
createTime,
isFreeShipping,
goodsSerachKeywords,
modifyTime,
case when modifyTime is not null
then from_unixtime(unix_timestamp(modifyTime, 'yyyy-MM-dd HH:mm:ss'),'yyyy-MM-dd')
else from_unixtime(unix_timestamp(createTime, 'yyyy-MM-dd HH:mm:ss'), 'yyyy-MM-dd')
end as dw_start_date,
'9999-12-31' as dw_end_date
from
`itcast_ods`.`itcast_goods`
where dt = '20190910';
5 将历史数据 当日数据导入到历史拉链表
-- 将历史数据 当日数据导入到历史拉链表
insert overwrite table `itcast_dw`.`dim_goods`
select * from `itcast_dw`.`tmp_dim_goods_history`;
-- 获取2019-09-09日的商品数据
select * from `itcast_dw`.`dim_goods` where dw_start_date <= '2019-09-09' and dw_end_date >= '2019-09-09' limit 10;
--查看对应商品id的历史拉链数据
select * from `itcast_dw`.`dim_goods` where goodsId = 100134;
数据采集-周期性事实表
因为订单表和订单退款表都有状态变化的特点,所以他们作为周期性事实表在进行同步操作也就是采集数据到数仓中时需要我们能记录下订单的状态变化。因次依然使用拉链表来解决这类周期性事实表的同步需求。订单明细表并不会随着时间而变化,所以不需要使用拉链表进行同步。
订单表、订单退款表 拉链表具体实现步骤:
创建dw层订单拉链表、订单退款拉链表
-- 创建dw层订单事实表--带有分区字段
DROP TABLE IF EXISTS itcast_dw.fact_orders;
create table itcast_dw.fact_orders(
orderId bigint,
orderNo string,
shopId bigint,
userId bigint,
orderStatus bigint,
goodsMoney double,
deliverType bigint,
deliverMoney double,
totalMoney double,
realTotalMoney double,
payType bigint,
isPay bigint,
areaId bigint,
userAddressId bigint,
areaIdPath string,
userName string,
userAddress string,
userPhone string,
orderScore bigint,
isInvoice bigint,
invoiceClient string,
orderRemarks string,
orderSrc bigint,
needPay double,
payRand bigint,
orderType bigint,
isRefund bigint,
isAppraise bigint,
cancelReason bigint,
rejectReason bigint,
rejectOtherReason string,
isClosed bigint,
goodsSearchKeys string,
orderunique string,
receiveTime string,
deliveryTime string,
tradeNo string,
dataFlag bigint,
createTime string,
settlementId bigint,
commissionFee double,
scoreMoney double,
useScore bigint,
orderCode string,
extraJson string,
orderCodeTargetId bigint,
noticeDeliver bigint,
invoiceJson string,
lockCashMoney double,
payTime string,
isBatch bigint,
totalPayFee bigint,
modifiedTime string,
dw_start_date string,
dw_end_date string
)
partitioned by (dt string) --按照天分区
STORED AS PARQUET TBLPROPERTIES('parquet.compression'='SNAPPY');
--临时订单表
DROP TABLE IF EXISTS itcast_dw.tmp_fact_orders;
create table itcast_dw.tmp_fact_orders(
orderId bigint,
orderNo string,
shopId bigint,
userId bigint,
orderStatus bigint,
goodsMoney double,
deliverType bigint,
deliverMoney double,
totalMoney double,
realTotalMoney double,
payType bigint,
isPay bigint,
areaId bigint,
userAddressId bigint,
areaIdPath string,
userName string,
userAddress string,
userPhone string,
orderScore bigint,
isInvoice bigint,
invoiceClient string,
orderRemarks string,
orderSrc bigint,
needPay double,
payRand bigint,
orderType bigint,
isRefund bigint,
isAppraise bigint,
cancelReason bigint,
rejectReason bigint,
rejectOtherReason string,
isClosed bigint,
goodsSearchKeys string,
orderunique string,
receiveTime string,
deliveryTime string,
tradeNo string,
dataFlag bigint,
createTime string,
settlementId bigint,
commissionFee double,
scoreMoney double,
useScore bigint,
orderCode string,
extraJson string,
orderCodeTargetId bigint,
noticeDeliver bigint,
invoiceJson string,
lockCashMoney double,
payTime string,
isBatch bigint,
totalPayFee bigint,
modifiedTime string,
dw_start_date string,
dw_end_date string
)
partitioned by (dt string)
STORED AS PARQUET TBLPROPERTIES('parquet.compression'='SNAPPY');
--创建订单退款表--带有分区字段
drop table if exists `itcast_dw`.`fact_order_refunds`;
create table `itcast_dw`.`fact_order_refunds`(
id bigint,
orderId bigint,
goodsId bigint,
refundTo bigint,
refundReson bigint,
refundOtherReson string,
backMoney double,
refundTradeNo string,
refundRemark string,
refundTime string,
shopRejectReason string,
refundStatus bigint,
createTime string,
modifiedTime string,
dw_start_date string,
dw_end_date string
)
partitioned by (dt string) --按照天分区
STORED AS PARQUET TBLPROPERTIES('parquet.compression'='SNAPPY');
--临时表
drop table if exists `itcast_dw`.`tmp_fact_order_refunds`;
create table `itcast_dw`.`tmp_fact_order_refunds`(
id bigint,
orderId bigint,
goodsId bigint,
refundTo bigint,
refundReson bigint,
refundOtherReson string,
backMoney double,
refundTradeNo string,
refundRemark string,
refundTime string,
shopRejectReason string,
refundStatus bigint,
createTime string,
modifiedTime string,
dw_start_date string,
dw_end_date string
)
partitioned by (dt string)
STORED AS PARQUET TBLPROPERTIES('parquet.compression'='SNAPPY');
第一次全量数据导入拉链表
我们开启hive的动态分区,并根据数据的createtime字段进行分区划分,同一天创建的订单放在同一分区!!
#开启动态分区,默认是false
#开启允许所有分区都是动态的,否则必须要有静态分区才能使用
set hive.exec.dynamici.partition=true;
set hive.exec.dynamic.partition.mode=nonstrict;
订单表数据:ods层导入dw层
insert overwrite table itcast_dw.fact_orders
select
orderId ,
orderNo ,
shopId ,
userId ,
orderStatus ,
goodsMoney ,
deliverType ,
deliverMoney ,
totalMoney ,
realTotalMoney ,
payType ,
isPay ,
areaId ,
userAddressId ,
areaIdPath ,
userName ,
userAddress ,
userPhone ,
orderScore ,
isInvoice ,
invoiceClient ,
orderRemarks ,
orderSrc ,
needPay ,
payRand ,
orderType ,
isRefund ,
isAppraise ,
cancelReason ,
rejectReason ,
rejectOtherReason ,
isClosed ,
goodsSearchKeys ,
orderunique ,
receiveTime ,
deliveryTime ,
tradeNo ,
dataFlag ,
createTime ,
settlementId ,
commissionFee ,
scoreMoney ,
useScore ,
orderCode ,
extraJson ,
orderCodeTargetId ,
noticeDeliver ,
invoiceJson ,
lockCashMoney ,
payTime ,
isBatch ,
totalPayFee ,
modifiedTime ,
--增加开始时间
date_format(modifiedTime,'yyyy-MM-dd') as dw_start_date,
--增加结束时间
'9999-12-31' as dw_end_date,
--指定动态分区使用的字段,动态分区的用法:就是查询字段的最后一个字段hive表进行解析然后存入指定分区
--此次数据分区按照订单的创建时间
date_format(createtime,'yyyyMMdd')
from itcast_ods.itcast_orders where dt="20190909";
订单退款表:ods层导入dw层
insert overwrite table itcast_dw.fact_order_refunds
select
id,
orderId,
goodsId,
refundTo,
refundReson,
refundOtherReson,
backMoney,
refundTradeNo,
refundRemark,
refundTime,
shopRejectReason,
refundStatus,
createTime,
modifiedTime,
date_format(modifiedTime,'yyyy-MM-dd') as dw_start_date,
'9999-12-31' as dw_end_date,
--此次数据分区按照订单退款的创建时间
date_format(createTime,'yyyyMMdd')
from itcast_ods.itcast_order_refunds where dt="20190909";
增量数据导入拉链表
kettle抽取增量数据导入ods层
抽取20190910这一天的数据,查询条件为modifiedTime等于20190910这天的订单数据和订单退款数据!! |
---|
表输入组件sql语句:
SELECT *
FROM itcast_orders
WHERE DATE_FORMAT(modifiedTime, '%Y%m%d') = '${dt}';
字段选择组件 |
---|
parquet output组件 |
ods层数据合并到dw层拉链表中
insert overwrite table itcast_dw.tmp_fact_orders
select
dw.orderId ,
dw.orderNo ,
dw.shopId ,
dw.userId ,
dw.orderStatus ,
dw.goodsMoney ,
dw.deliverType ,
dw.deliverMoney ,
dw.totalMoney ,
dw.realTotalMoney ,
dw.payType ,
dw.isPay ,
dw.areaId ,
dw.userAddressId ,
dw.areaIdPath ,
dw.userName ,
dw.userAddress ,
dw.userPhone ,
dw.orderScore ,
dw.isInvoice ,
dw.invoiceClient ,
dw.orderRemarks ,
dw.orderSrc ,
dw.needPay ,
dw.payRand ,
dw.orderType ,
dw.isRefund ,
dw.isAppraise ,
dw.cancelReason ,
dw.rejectReason ,
dw.rejectOtherReason ,
dw.isClosed ,
dw.goodsSearchKeys ,
dw.orderunique ,
dw.receiveTime ,
dw.deliveryTime ,
dw.tradeNo ,
dw.dataFlag ,
dw.createTime ,
dw.settlementId ,
dw.commissionFee ,
dw.scoreMoney ,
dw.useScore ,
dw.orderCode ,
dw.extraJson ,
dw.orderCodeTargetId ,
dw.noticeDeliver ,
dw.invoiceJson ,
dw.lockCashMoney ,
dw.payTime ,
dw.isBatch ,
dw.totalPayFee ,
dw.modifiedTime ,
dw.dw_start_date,
--修改end_date
case when ods.orderid is not null and dw.dw_end_date ='9999-12-31'
then '2019-09-09'
else dw.dw_end_date
end as dw_end_date,
--动态分区需要的字段
dw.dt
from
itcast_dw.fact_orders dw
left join
(select * from itcast_ods.itcast_orders where dt ='20190910') ods
on dw.orderid=ods.orderid
union all
--今天新增数据的插入动作
select
orderId ,
orderNo ,
shopId ,
userId ,
orderStatus ,
goodsMoney ,
deliverType ,
deliverMoney ,
totalMoney ,
realTotalMoney ,
payType ,
isPay ,
areaId ,
userAddressId ,
areaIdPath ,
userName ,
userAddress ,
userPhone ,
orderScore ,
isInvoice ,
invoiceClient ,
orderRemarks ,
orderSrc ,
needPay ,
payRand ,
orderType ,
isRefund ,
isAppraise ,
cancelReason ,
rejectReason ,
rejectOtherReason ,
isClosed ,
goodsSearchKeys ,
orderunique ,
receiveTime ,
deliveryTime ,
tradeNo ,
dataFlag ,
createTime ,
settlementId ,
commissionFee ,
scoreMoney ,
useScore ,
orderCode ,
extraJson ,
orderCodeTargetId ,
noticeDeliver ,
invoiceJson ,
lockCashMoney ,
payTime ,
isBatch ,
totalPayFee ,
modifiedTime ,
--增加开始时间
date_format(modifiedTime,'yyyy-MM-dd') as dw_start_date,
--增加结束时间
'9999-12-31' as dw_end_date,
--指定动态分区使用的字段,动态分区的用法:就是查询字段的最后一个字段hive表进行解析然后存入指定分区
--此次数据分区按照订单的创建时间
date_format(createtime,'yyyyMMdd')
from itcast_ods.itcast_orders where dt="20190910";
--从临时表再插入itcast_dw.fact_orders
insert overwrite table itcast_dw.fact_orders
select
* from
itcast_dw.tmp_fact_orders;
--验证数据查询拉链表数据
select * from itcast_dw.fact_orders limit 5;
--订单退款表增量数据与历史数据合并覆盖插入dw层临时拉链表中
insert overwrite table itcast_dw.tmp_fact_order_refunds
select
dw.id,
dw.orderId,
dw.goodsId,
dw.refundTo,
dw.refundReson,
dw.refundOtherReson,
dw.backMoney,
dw.refundTradeNo,
dw.refundRemark,
dw.refundTime,
dw.shopRejectReason,
dw.refundStatus,
dw.createTime,
dw.modifiedTime,
dw.dw_start_date,
case when ods.id is not null and dw.dw_end_date ='9999-12-31'
then '2019-09-09'
else dw.dw_end_date
end as dw_end_date,
dw.dt
from itcast_dw.fact_order_refunds dw
left join (select * from itcast_ods.itcast_order_refunds where dt="20190910") ods
on dw.id =ods.id
union all
select
id,
orderId,
goodsId,
refundTo,
refundReson,
refundOtherReson,
backMoney,
refundTradeNo,
refundRemark,
refundTime,
shopRejectReason,
refundStatus,
createTime,
modifiedTime,
date_format(modifiedTime,'yyyy-MM-dd') as dw_start_date,
'9999-12-31' as dw_end_date,
date_format(createTime,'yyyyMMdd')
from itcast_ods.itcast_order_refunds where dt="20190910";
--合并数据插入临时表
insert overwrite table itcast_dw.fact_order_refunds
select * from itcast_dw.tmp_fact_order_refunds;
--验证数据
select * from itcast_dw.fact_order_refunds limit 5;
拉链表分区意义
--合并11号数据
insert overwrite table itcast_dw.tmp_fact_orders
select
dw.orderId ,
dw.orderNo ,
dw.shopId ,
dw.userId ,
dw.orderStatus ,
dw.goodsMoney ,
dw.deliverType ,
dw.deliverMoney ,
dw.totalMoney ,
dw.realTotalMoney ,
dw.payType ,
dw.isPay ,
dw.areaId ,
dw.userAddressId ,
dw.areaIdPath ,
dw.userName ,
dw.userAddress ,
dw.userPhone ,
dw.orderScore ,
dw.isInvoice ,
dw.invoiceClient ,
dw.orderRemarks ,
dw.orderSrc ,
dw.needPay ,
dw.payRand ,
dw.orderType ,
dw.isRefund ,
dw.isAppraise ,
dw.cancelReason ,
dw.rejectReason ,
dw.rejectOtherReason ,
dw.isClosed ,
dw.goodsSearchKeys ,
dw.orderunique ,
dw.receiveTime ,
dw.deliveryTime ,
dw.tradeNo ,
dw.dataFlag ,
dw.createTime ,
dw.settlementId ,
dw.commissionFee ,
dw.scoreMoney ,
dw.useScore ,
dw.orderCode ,
dw.extraJson ,
dw.orderCodeTargetId ,
dw.noticeDeliver ,
dw.invoiceJson ,
dw.lockCashMoney ,
dw.payTime ,
dw.isBatch ,
dw.totalPayFee ,
dw.modifiedTime ,
dw.dw_start_date,
--修改end_date
case when ods.orderid is not null and dw.dw_end_date ='9999-12-31'
then '2019-09-10'
else dw.dw_end_date
end as dw_end_date,
--动态分区需要的字段
dw.dt
from
(select * from itcast_dw.fact_orders where dt > '20190801') dw left join
(select * from itcast_ods.itcast_orders where dt ='20190911') ods
on dw.orderid=ods.orderid
union all
--今天新增数据的插入动作
select
orderId ,
orderNo ,
shopId ,
userId ,
orderStatus ,
goodsMoney ,
deliverType ,
deliverMoney ,
totalMoney ,
realTotalMoney ,
payType ,
isPay ,
areaId ,
userAddressId ,
areaIdPath ,
userName ,
userAddress ,
userPhone ,
orderScore ,
isInvoice ,
invoiceClient ,
orderRemarks ,
orderSrc ,
needPay ,
payRand ,
orderType ,
isRefund ,
isAppraise ,
cancelReason ,
rejectReason ,
rejectOtherReason ,
isClosed ,
goodsSearchKeys ,
orderunique ,
receiveTime ,
deliveryTime ,
tradeNo ,
dataFlag ,
createTime ,
settlementId ,
commissionFee ,
scoreMoney ,
useScore ,
orderCode ,
extraJson ,
orderCodeTargetId ,
noticeDeliver ,
invoiceJson ,
lockCashMoney ,
payTime ,
isBatch ,
totalPayFee ,
modifiedTime ,
--增加开始时间
date_format(modifiedTime,'yyyy-MM-dd') as dw_start_date,
--增加结束时间
'9999-12-31' as dw_end_date,
--指定动态分区使用的字段,动态分区的用法:就是查询字段的最后一个字段hive表进行解析然后存入指定分区
--此次数据分区按照订单的创建时间
date_format(createtime,'yyyyMMdd')
from itcast_ods.itcast_orders where dt="20190911";
剩余表增量抽取
使用kettle完成剩余表的增量抽取到ods层 |
---|