本文为 Database Data Warehousing Guide 的第四部分。主要涉及数据仓库创建的ETL操作。
第12章 总览抽取、转换、和装载
事实上,还遗漏了传输环节,而且各个部分不是完全独立的。
工具OWB oracle warehouse builder。
第13章 抽取数据
数据仓库抽取的总览
数据仓库抽取即从原系统抽取数据以用于数据仓库环境下。
抽取过程的设计需要考虑下面两个问题:
选择什么抽取方法?
这将影响原系统、传输过程和数据仓库更新所需时间。以什么方式提供数据以备后续使用?
这将影响传输方法,以及对清洗、转换数据的需求。
抽取方法介绍(形式)
抽取方法的选择依赖于原系统和目标数据仓库系统的商业需求。通常而言,由于性能和系统负载的原因,不能在原系统中加入额外的逻辑来实现增量数据抽取。
抽取的方法分为两类:
全部抽取
一个全部抽取的例子是,一个单独表的导出文件或者一个远程SQL语句扫描完整的源表。
增量抽取
有时候,只有从某一预定的事件之后的数据是需要抽取的。为了identify确认这些delta change增量变化数据,必须可以识别所有的从特定事件之后的改变信息。这一信息可以通过源表中加入last-changed时间戳或者使用change table改变表机制实现。当然,使用后者意味着在原系统中加入了额外的抽取逻辑。
很多数据仓库在抽取环节不适用CDC技术。而是将原系统中的所有表抽取到数据仓库或者staging area临时区域,然后将这些表与之前抽取的内容进行对比来识别哪些改变数据。
Oracle 的CDC机制可以抽取和维护这些质量信息。
物理抽取方法(实际方法)
取决于所选取的逻辑抽取方法和源系统的能力和限制,这些抽取的数据可以以两种机制来抽取。数据可以从原系统在线抽取或者从一个离线的结构中抽取。
在线抽取
数据直接从原系统中抽取。抽取进程可以直接和原系统连接已访问源表,或者连接到按预配置方式存储数据(snapshot logs or change tables)的中间系统中。离线抽取
离线抽取中,数据不是直接从原系统中抽取,而是显式地暂存在系统之外。数据已经有了一个存在的结构(如redo logs, archive logs or transportable tablespaces)或者是有一个抽取例程创建。
下面这些结构可供考虑:
- Flat files--普通文件
- Dump files--Oracle转存文件
- Redo and archive logs--信息存储在一个特殊的额外的转存文件中
- Transportable tablespaces--可传输表空间,Oracle推荐在可行的情况下尽量使用这种方法,因为其性能好,可控制性高。
CDC
抽取的一个重要的方法是增量抽取,使用CDC改变数据捕获。
这节,将介绍几种自己实现的改变数据捕获机制。
-
时间戳
某些操作型系统中的表含有timestamp列。如果存在时间戳列,就可以方便地识别最新数据。下例中的查询可以方便地从order表中抽取今天的数据。SELECT * FROM orders WHERE TRUNC(CAST(order_date AS date),'dd') = TO_DATE(SYSDATE,'dd-mon-yyyy');
Partition分割
有些系统可能使用范围分割,使得源表根据日期键分割,这样也可以方便地识别改变数据。Trigger触发器
Oracle推荐使用synchronous Change Data Capture 这一基于触发器的改变数据捕获技术。
抽取例子
使用数据文件进行抽取
大多数数据库系统提供了从内部数据库格式导出或者卸载数据到普通文件的机制,和工具。
当源系统是Oracle数据库时,可以使用下面几种方法来将数据抽取到文件中:
- 使用SQL*Plus
使用select语句抽取有用数据,然后将结果重定向写入到文件中。
SET echo off SET pagesize 0 SPOOL country_city.log
SELECT distinct t1.country_name ||'|'|| t2.cust_city
FROM countries t1, customers t2 WHERE t1.country_id = t2.country_id
AND t1.country_name= 'United States of America';
SPOOL off
这种抽取结束可以被并行化,每个对话查询源数据的不同partition部分。被抽取的文件分开存储在独立文件中,可以通过SQLLoader来并行地载入到数据库中。
与SQLPlus不同的是,使用外部表external table data pump unload功能可以实现透明的并行能力。
使用OCI或者 ProC程序
OCI为Oracle调用接口,如ProC程序。使用Export工具
Export文件既包含数据也包含元数据;一个export文件可以包含一个对象的子集、许多数据库对象、甚至整个数据库schema模式;Export只能抽取数据库对象的子集,而不能抽取复杂查询语句的结果;Export的输出结果只能被Import工具处理。
Oracle提供原生的Export和Import工具来向前兼容,并提供data pump export/import infrastructure进行高性能、可扩展且并行化的抽取。-
使用External Tables外部表
除了使用Export工具你还可以使用外部表来抽取任何Select操作的结果。下面的例子演示了并行抽取一个连接操作的结果到四个文件中。CREATE DIRECTORY def_dir AS '/net/dlsun48/private/hbaer/WORK/FEATURES/et'; DROP TABLE extract_cust; CREATE TABLE extract_cust ORGANIZATION EXTERNAL (TYPE ORACLE_DATAPUMP DEFAULT DIRECTORY def_dir ACCESS PARAMETERS (NOBADFILE NOLOGFILE) LOCATION ('extract_cust1.exp', 'extract_cust2.exp', 'extract_cust3.exp', 'extract_cust4.exp')) PARALLEL 4 REJECT LIMIT UNLIMITED AS SELECT c.*, co.country_name, co.country_subregion, co.country_region FROM customers c, countries co where co.country_id=c.country_id;
通过分布式操作进行抽取
使用分布式查询技术,一个Oracle数据库可以直接查询位于不同原系统中的表,比如另一个数据库或者一个旧的使用Oracle gateway技术连接的系统。这种方法可以吧抽取和传输结合起来。比如使用一个Oracle Net连接和distributed-query,可以用一个SQL语句实现,将employee names 和 department names数据抽取并存储到数据仓库。
CREATE TABLE country_city AS SELECT distinct t1.country_name, t2.cust_city
FROM countries@source_db t1, customers@source_db t2
WHERE t1.country_id = t2.country_id
AND t1.country_name='United States of America';
这个语句创建了一个本地的数据集市country_city,并用countries和customers中的数据进行填充。
这种技术是进行少量数据转移的理想方式。然而,由于数据传输只使用了一个Oracle NET connection,所以其可扩展性受限。对于更大的数据量,基于文件的数据抽取和传输方式可扩展性好,更合适。
第14章 传输数据
数据传输总览
数据仓库环境下,最常见的数据转移需求在于:
- 从源系统传输到暂存数据库或到一个数据仓库数据库
- 从暂存数据库到数据仓库
- 从数据仓库到数据集市
数据传输时ETL过程中最简单的部分,甚至可以被集成到ETL过程中的其他部分,如上一章介绍到的分布式查询技术提供了一种抽取和传输数据的机制。
数据仓库中的传输机制介绍
使用Flat Files普通文件
传输数据最常用的方法是传输flat files普通文件,可以使用FTP或其它任何远程文件系统访问协议。由于源系统和目标系统往往使用的是不同的操作系统和数据库系统,使用flat files往往是在异构系统中交换数据的最简单的方法(不需要太多的转换)。而且,即使是在同构的系统之间传输数据,flat files也往往是最高效、最好管理的数据传输机制。
使用分布式操作
如上一章分布式查询方法的介绍,缺点是耗时,吃资源。
使用Transportable Tablespace可传输表空间
Oracle Transportable Tablespace是在两个Oracle数据库之间传输数据的最快的方法。之前讲到,最可扩展的数据传输机制是移动flat files普通文件,而这些技术需要先进行数据的export,传输之后还要进行import操作。而可传输表空间则直接省去了unload和reload步骤。
使用可传输表空间,Oracle数据文件(包含表数据、索引、几乎所有其它Oracle数据库对象)可以从一个系统传到另一个系统。另外,类似import和export,oracle Transportable Tablespace提供了在传输数据的同时传输元数据的机制。
可传输表空间最常见的应用时staging database --> a data warehouse,和data warehouse --> a data mart的数据传输。
可传输表空间的例子
假设你有一个包含销售数据的数据仓库以及几个每月更新的数据集市。并假设你正要从数据仓库向数据集市中移动数据。
-
step1 将要传输的数据放入自己的表空间
本月的数据必须放入单独的表空间以用于传输。本例中,你有一个表空间ts_temp_sales,其中包含了一份本月的数据。使用create table ... as select语句,本月的数据可以高效地拷贝到这个表空间中。CREATE TABLE temp_jan_sales NOLOGGING TABLESPACE ts_temp_sales AS SELECT * FROM sales WHERE time_id BETWEEN '31-DEC-1999' AND '01-FEB-2000';
然后,将ts_temp_sales表空间设置为只读(表空间只有在没有活动的事务的时候才能被传输,设置为只读强制实现这一要求)。
ALTER TABLESPACE ts_temp_sales READ ONLY;
在可传输表空间操作中,表空间中的所有对象都被传输。多个表可以在同一个表空间中被传输,同时这个表空间中还可能包含索引等其它数据库对象。
本例中,我们创建了一个临时的表空间,如果要传输的数据本来就在一个独立地表空间中,那么该表空间也是可以被传输的而不需要额外创建临时表空间。
-
step 2 导出元数据
我们将用到Export工具来导出描述可传输表空间中的对象的元数据。在本例场景下,导出操作是:EXP TRANSPORT_TABLESPACE=y TABLESPACES=ts_temp_sales FILE=jan_sales.dmp
这一操作生成了一个导出文件jan_sales.dmp,该文件很小,因为只包含了temp_jan_sales表的描述信息,如列名,列类型和其他所有目标Oracle数据库系统访问ts_temp_sales所需要的信息。
step 3 拷贝数据文件和包含元数据的Export文件到目标系统
使用任何普通文件传输机制,拷贝构成ts_temp_sales的数据文件和包含元数据的Export文件jan_sales.dmp到数据集市平台。拷贝完成后,如有需要,表空间ts_temp_sales可以重新被设置为READ WRITE模式。step 4 导入元数据
一旦文件被拷贝到数据集市中,就可以将元数据import到数据集市。
IMP TRANSPORT_TABLESPACE=y DATAFILES='/db/tempjan.f'
TABLESPACES=ts_temp_sales FILE=jan_sales.dmp
现在,你就可以将新数据合并到数据集市中的表里。你有两种方式将temp_sales_jan表中的数据插入到数据集市的sales表中:
方法一:
INSERT /*+ APPEND */ INTO sales SELECT * FROM temp_sales_jan;
方法二:
如果数据集市的sales表本来就是依据月份分割的,那么这个可传输表空间就可以直接成为数据集市中的一个永久部分,而temp_sales_jan 表将成为数据集市sales表的一个分割:
ALTER TABLE sales ADD PARTITION sales_00jan VALUES
LESS THAN (TO_DATE('01-feb-2000','dd-mon-yyyy'));
ALTER TABLE sales EXCHANGE PARTITION sales_00jan
WITH TABLE temp_sales_jan INCLUDING INDEXES WITH VALIDATION;
第15章 装载和转换
本章将帮助你创建和管理数据仓库。
数据加载和转换总览
数据的转换时ETL过程中最复杂的过程,覆盖简单的conversion数据变换,到复杂的scrubbing数据擦洗。许多数据转换工作,但不是所有,是在Oracle数据库中发生的。
本章介绍在Oracle数据库中实现可扩展、高效的数据转换。本章不覆盖所有转换技术,但是却介绍了最基本的方法(more scalability and less programming)以及如何选择这些方法。
转换流程
从结构的角度来分,你可以以2种方式(流程)来转换数据:
- Multistage Data Teansformation多暂存区数据转换
大多数数据仓库的转换逻辑由多个步骤组成。比如,在将新记录插入到sales表中是,可能存在多个独立地转换步骤来处理每个维度的键。
当使用Oracle数据库作为转换引擎是,一个常见的策略是用单独的SQL操作实现每个转换,并创建一个独立地、临时的暂存表。这种load-then-transfer的策略相当于设置了自然的checkpoint,方便监督和重启,缺点是需要额外的时间和空间。
- Pipelined Data Transformation管道化数据传输
将一系列的transform-then-load(此为大多数转换在数据库外进行的情形)或load-then-transform过程,变为transfer-while-loading。
下图演示了这一功能(下文主要介绍之)。
- Staging Area暂存区
对于Load加载而言,速度取决于原始数据从staging area写到目标表中的速度。强烈推荐使用尽量多的物理磁盘来暂存原始数据,以防止读取数据不会成为加载的瓶颈。
一个暂存数据的绝佳地点是Oracle Database File System(DBFS)
装载机制
使用SQL*Loader装载
传输数据最常用的技术时使用flat files普通文件。
SQLLoader用来将数据从普通文件导入到Oracle数据仓库中。在这一过程中,SQLLoader也可以用于进行基本的数据转换。
当使用direct-path SQL*Loader时,基本的数据操作如类型转换和NULL值处理等可以在数据装载时处理。大多数数据仓库出于性能的考虑使用直接路径加载(配合最小日志nologging设置)。
Oracle提供两种类型的插入语句:常规插入(conventional insert)和直接路径插入(direct-path insert),直接路径插入的目的是为了高效地加载大量的数据,它以牺牲部分功能为代价,因此受到很多的限制。直接加载的使用方法是:1)在SQL语句中加append提升;2)并行地执行SQL语句。只有insert inot ... select ... 语句、merge语句和使用OCI直接路径接口的应用程序才可以使用。
下面是装载sales表的控制文件(sh_sales.ctl):
LOAD DATA INFILE sh_sales.dat APPEND INTO TABLE sales
FIELDS TERMINATED BY "|"
(PROD_ID, CUST_ID, TIME_ID, CHANNEL_ID, PROMO_ID, QUANTITY_SOLD, AMOUNT_SOLD)
可以通过下面的命令来执行载入:
$ sqlldr control=sh_sales.ctl direct=true
Username:
Password:
使用external tables外部表装载
另一种处理外部数据资源的方法是使用外部表。Oracle的外部表特性使你可以把外部当做虚拟表,并在无需先被载入到数据哭的情况下,直接并行地进行查询和连接操作。
外部表使得我们可以将装载和转换阶段管道化起来,将转换过程合并到加载过程中。
外部表和普通表的区别在于externally organized tables外部组织的表是只读的,不能对其进行DML操作(UPDATE/INSERT/DELETE)也不能对其索引。
你可以创建一个代表完整的交易事务数据的外部表sales_transactions_ext,并存在外部文件sh_sales.gz
中。产品部门对产品和时间分析很感兴趣。因此我们在sh模式下创建了一个cost事实表。建cost表的数据源和建立sales事实表的数据源相同,但是由于舍弃了多个维度,所以数据相对粗糙,需要进行聚集。
外部表对这种情绪提供了一种解决方案。不像SQL*Loader那样需要在应用聚集函数之前装载数据,外部表运行你将装载和转换结合到一条SQL语句中,也不用暂存数据。
下例演示基本的外部表操作。
CREATE TABLE sales_transactions_ext
(PROD_ID NUMBER, CUST_ID NUMBER,
TIME_ID DATE, CHANNEL_ID NUMBER,
PROMO_ID NUMBER, QUANTITY_SOLD NUMBER,
AMOUNT_SOLD NUMBER(10,2), UNIT_COST NUMBER(10,2),
UNIT_PRICE NUMBER(10,2))
ORGANIZATION external (TYPE oracle_loader
DEFAULT DIRECTORY data_file_dir ACCESS PARAMETERS
(RECORDS DELIMITED BY NEWLINE CHARACTERSET US7ASCII
PREPROCESSOR EXECDIR:'zcat'
BADFILE log_file_dir:'sh_sales.bad_xt'
LOGFILE log_file_dir:'sh_sales.log_xt'
FIELDS TERMINATED BY "|" LDRTRIM
( PROD_ID, CUST_ID,
TIME_ID DATE(10) "YYYY-MM-DD",
CHANNEL_ID, PROMO_ID, QUANTITY_SOLD, AMOUNT_SOLD,
UNIT_COST, UNIT_PRICE))
location ('sh_sales.gz')
)REJECT LIMIT UNLIMITED;
object?directories?对象目录必须已经存在,并指向包含sh_sales.gz和包含bad and log文件的目录。
然后,外部表就可以在数据库中使用了,
INSERT /*+ APPEND */ INTO COSTS
(TIME_ID, PROD_ID, UNIT_COST, UNIT_PRICE)
SELECT TIME_ID, PROD_ID, AVG(UNIT_COST), AVG(amount_sold/quantity_sold)
FROM sales_transactions_ext GROUP BY time_id, prod_id;
使用OCI和Direct-path API装载
在数据库外进行转换和计算,不需要flat file staging。
使用Export/import装载
参见Chapter 13 Extraction in Data Warehouses。
转换机制
使用SQL进行转换
-
CREATE TABLE ... AS SELECT And INSERT /+APPEND/ AS SELECT
或简称CTAS,结合nologging实现高性能。INSERT /*+ APPEND NOLOGGING PARALLEL */ INTO sales SELECT product_id, customer_id, TRUNC(sales_date), 3, promotion_id, quantity, amount FROM sales_activity_direct;
使用update
只在少量数据需要改变的情况下合适。-
使用merge(upsert)
新数据需要插入或者更新。MERGE INTO products t USING products_delta s ON (t.prod_id=s.prod_id) WHEN MATCHED THEN UPDATE SET t.prod_list_price=s.prod_list_price, t.prod_min_price=s.prod_min_price WHEN NOT MATCHED THEN INSERT (prod_id, prod_name, prod_desc, prod_subcategory, prod_subcategory_desc, prod_category, prod_category_desc, prod_status, prod_list_price, prod_min_price) VALUES (s.prod_id, s.prod_name, s.prod_desc, s.prod_subcategory, s.prod_subcategory_desc, s.prod_category, s.prod_category_desc, s.prod_status, s.prod_list_price, s.prod_min_price);
-
使用multitable insert多表插入
经常,需要将外部数据资源依据逻辑属性进行分割,插入到不同的目标对象中,又或者需要从数据仓库扇出数据到多个目标对象。
使用INSERT ...?SELECT,新的语句可以被并行化,并使用direct load mechanism达到更高的性能。INSERT ALL WHEN promotion_id IN (SELECT promo_id FROM promotions) THEN INTO sales VALUES (product_id, customer_id, today, 3, promotion_id, quantity_per_day, amount_per_day) INTO costs VALUES (product_id, today, promotion_id, 3, product_cost, product_price) WHEN num_of_orders > 1 THEN INTO cum_sales_activity VALUES (today, product_id, customer_id, promotion_id, quantity_per_day, amount_per_day, num_of_orders) SELECT TRUNC(s.sales_date) AS today, s.product_id, s.customer_id, s.promotion_id, SUM(s.amount) AS amount_per_day, SUM(s.quantity) quantity_per_day, COUNT(*) num_of_orders, p.prod_min_price*0.8 AS product_cost, p.prod_list_price AS product_price FROM sales_activity_direct s, products p WHERE s.product_id = p.prod_id AND TRUNC(sales_date) = TRUNC(SYSDATE) GROUP BY TRUNC(sales_date), s.product_id, s.customer_id, s.promotion_id, p.prod_min_price*0.8, p.prod_list_price;
使用PL/SQL进行转换
PL/SQL过程可以打开多个游标来读取多个源表的数据。PL/SQL提供了过程化处理的机制,可以将一个转换封装到一个过程中,而表函数在其基础上将结果无缝的衔接起来。TABLE(function())
使用表函数进行转换
表函数提供了管道化并行执行转换的支持(implemented in PL/SQL, C, or Java.)。
表函数是一个产生a set of rows多行数据集,并可以多行数据集为输入的函数。
下图演示了一个进行聚集操作的表函数的例子。
另外,表函数可以在其内部的原子事务中扇出数据,如下图:
- 预先工作:创建数据库对象类型(as object) 、和对应游标包(as record)。
- 编写表函数tf,输入为游标,输出为table of product_t 。
- TABLE(tf())可以被当做表来处理(或者等价于select的结果)。
当使用强类型的游标作为参数时,可以进行并行化(ALTER TABLE products PARALLEL 4;)。另外表函数结合可传输表空间、游标、PIPELINED和PIPE ROW 可以实现增量的处理。
错误记录和处理机制
有两种主要的错误:
Business Rule?Violations业务规则违例
- 使用SQL语句过滤不和规则的数据。
- 识别并分离,如table function中将错误数据导出额外的数据表中。
Data Rule?Violations (Data Errors)数据规则违例
数据错误只能使用PL/SQL,还可以将数据错误记录到特殊的错误表中而运行DML操作继续执行。
-
SQL中处理数据错误
DECLARE errm number default 0; BEGIN FOR crec IN (SELECT product_id, customer_id, TRUNC(sales_date) sd, promotion_id, quantity, amount FROM sales_activity_direct) loop BEGIN INSERT INTO sales VALUES (crec.product_id, crec.customer_id, crec.sd, 3, crec.promotion_id, crec.quantity, crec.amount); exception WHEN others then errm := sqlerrm; INSERT INTO sales_activity_error VALUES (errm, crec.product_id, crec.customer_id, crec.sd, crec.promotion_id, crec.quantity, crec.amount); END; END loop; END; /
-
使用错误日志表来处理数据错误
使用Oracle DBMS_ERRLOG包来创建DML错误记录表sales_activity_errors。
用法:INSERT /*+ APPEND PARALLEL */ INTO sales SELECT product_id, customer_id, TRUNC(sales_date), 3, promotion_id, quantity, amount FROM sales_activity_direct LOG ERRORS INTO sales_activity_errors('load_20040802') REJECT LIMIT UNLIMITED
加载和转换场景
下面是一些典型的装载和转换任务的例子:
键查询场景
比如,假设销售交易数据被装载到零售数据仓库。虽然数据仓库sales表包含product_id列,但是,源表中抽取出来的数据对应的是Uniform Price Codes (UPC)列。
为了执行这一转换,一个关联product_id和Uniform Price Codes (UPC)的?查询表必须被建立,这个表就是我们的product维度表。这一转换可以用下面的CTAS语句实现:
CREATE TABLE temp_sales_step2 NOLOGGING PARALLEL AS SELECT sales_transaction_id,
product.product_id sales_product_id, sales_customer_id, sales_time_id,
sales_channel_id, sales_quantity_sold, sales_dollar_amount
FROM temp_sales_step1, product
WHERE temp_sales_step1.upc_code = product.upc_code;
商业规则违例场景
假设有些数据不合规范,没有UPC列。有以下解决方案:
- 使用CTAS 将违例数据写入额外的一张temp_sales_step1_invalid表总。
- 仍然使用CTAS,使用外连接,将那些upc_code为空的项设为null加入
- 使用insert/*+ APPEND PARALLEL */ first when into... else into .... select同时实现上述两步。
数据错误场景
使用insert/*+ APPEND PARALLEL */ first when into... else into .... select
最后加上错误日志记录。
LOG ERRORS INTO sales_step2_errors('load_20040804')
REJECT LIMIT UNLIMITED;
pivoting旋转场景
假设数据如下:
SELECT * FROM sales_input_table;
PRODUCT_ID CUSTOMER_ID WEEKLY_ST SALES_SUN SALES_MON SALES_TUE SALES_WED SALES_THU SALES_FRI SALES_SAT
---------- ----------- --------- ---------- ---------- ---------- -------------------- ---------- ----------
111 222 01-OCT-00 100 200 300 400 500 600 700
222 333 08-OCT-00 200 300 400 500 600 700 800
333 444 15-OCT-00 300 400 500 600 700 800 900
我们要把它转化为一个典型的关系型数据表sales中,如下:
SELECT prod_id, cust_id, time_id, amount_sold FROM sales;
PROD_ID CUST_ID TIME_ID AMOUNT_SOLD
---------- ---------- --------- -----------
111 222 01-OCT-00 100
111 222 02-OCT-00 200
111 222 03-OCT-00 300
111 222 04-OCT-00 400
111 222 05-OCT-00 500
111 222 06-OCT-00 600
111 222 07-OCT-00 700
222 333 08-OCT-00 200
222 333 09-OCT-00 300
222 333 10-OCT-00 400
222 333 11-OCT-00 500
222 333 12-OCT-00 600
222 333 13-OCT-00 700
222 333 14-OCT-00 800
333 444 15-OCT-00 300
333 444 16-OCT-00 400
333 444 17-OCT-00 500
333 444 18-OCT-00 600
333 444 19-OCT-00 700
333 444 20-OCT-00 800
333 444 21-OCT-00 900
操作的方法是使用insert all into... into... select,代码如下:
INSERT ALL INTO sales (prod_id, cust_id, time_id, amount_sold)
VALUES (product_id, customer_id, weekly_start_date, sales_sun)
INTO sales (prod_id, cust_id, time_id, amount_sold)
VALUES (product_id, customer_id, weekly_start_date+1, sales_mon)
INTO sales (prod_id, cust_id, time_id, amount_sold)
VALUES (product_id, customer_id, weekly_start_date+2, sales_tue)
INTO sales (prod_id, cust_id, time_id, amount_sold)
VALUES (product_id, customer_id, weekly_start_date+3, sales_wed)
INTO sales (prod_id, cust_id, time_id, amount_sold)
VALUES (product_id, customer_id, weekly_start_date+4, sales_thu)
INTO sales (prod_id, cust_id, time_id, amount_sold)
VALUES (product_id, customer_id, weekly_start_date+5, sales_fri)
INTO sales (prod_id, cust_id, time_id, amount_sold)
VALUES (product_id, customer_id, weekly_start_date+6, sales_sat)
SELECT product_id, customer_id, weekly_start_date, sales_sun,
sales_mon, sales_tue, sales_wed, sales_thu, sales_fri, sales_sat
FROM sales_input_table;