一、项目介绍
Adventure
Works Cycles是一家生产和销售自行车及其附带产品的大型跨国制造公司,业务遍布全球。亚太市场则是其重点发展地区,其中中国市场更是核心区域。
这家公司主要有下面四个产品线:
● Adventure Works Cycles生产的自行车;
● 自行车部件,例如车轮,踏板或制动组件;
● 从供应商处购买的自行车服装,用于转售给Adventure Works Cycles的客户;
● 从供应商处购买的自行车配件,用于转售给Adventure Works Cycles的客户
二、分析目的
随着公司业务规模逐渐扩大的情况,公司需要增强数据化管理,满足业务同事实现自主分析的需求,从而实现对市场的快速判断。因此需要于数据部分进行沟通分析业务指标需求,构建可视化看板,并自动更新
1、通过Python对数据进行加工,从整体、地域、时间等角度,对销量、销售额、客单价等指标进行分析,搭建可视化看板。
2、通过linux服务器部署代码,实现数据的每日自动更新。
三、项目过程
● 数据观察
● 指标搭建
● 代码加工
● 数据自动更新
● 搭建可视化看板
1、数据观察
MySQL数据库中包含多张信息表,但我们只需要用到以下三张表:
(1) ods_sales_orders:订单明细表(2) ods_customers:用户信息表
(3) dim_date_df:日期维度表
2、指标搭建
从当日、昨日、当月、当季、当年日期维度角度,对销售金额、销量、同比等数据进行分析;
3、代码加工
(1) dw_order_by_day:每日聚合表
从MySQL数据库销售订单表,按照日期将每日信息进行聚合重组,最后计算销售额、订单数、每日环比等指标。部分Python代码如下:
# 按日聚合订单表
def sum_amount_order(adventure_conn_read):
"""sum_amount_order:销量订单聚合表
具体:读取ods_sales_orders(订单明细表),根据create_date聚合,求总销量/订单量/客单价"""
try:
# 读取ods_sales_orders表数据
sum_amount_order = pd.read_sql_query("select * from ods_sales_orders ",
con=adventure_conn_read)
# 聚合。根据create_date分组求总和单价 及 客户数量
sum_amount_order = sum_amount_order.groupby(by='create_date').agg(
{'unit_price': sum, 'customer_key': pd.Series.nunique}).reset_index()
# 修改列名
sum_amount_order.rename(columns={'unit_price': 'sum_amount',
'customer_key': 'sum_order'},
inplace=True)
# 计算客单价
sum_amount_order['amount_div_order'] = \
sum_amount_order['sum_amount'] / sum_amount_order['sum_order']
# 修改列名
sum_amount_order.rename(columns = {'customer_key': 'order_counts',
"unit_price": "sum_amount"}, inplace=True)
return sum_amount_order
# 如果错误,则填写错误日志
except Exception as e:
logger.info("sum_amount_order异常,报错信息:{}".format(e))
# 生成目标销售额和销量
def add_order_goal(sum_amount_order):
"""add_order_goal:生成目标金额及目标销量
具体:利用空列表及循环生成对应随机值,与销量订单聚合表合并形成sum_amount_order_goal(销量订单聚合目标表)"""
try:
# 定义两个列表,一个为每日销售额目标、一个为每日销量目标
sum_amount_goal_list = []
sum_order_goal_list = []
# 获取sum_amount_order中的create_date
create_date_list = list(sum_amount_order['create_date'])
for i in create_date_list:
# 生成一个在[0.85,1.1]随机数
a = random.uniform(0.85, 1.1)
b = random.uniform(0.85, 1.1)
# 生成每日目标销售额和目标销量
amount_goal = list(sum_amount_order[sum_amount_order['create_date'] == i]
['sum_amount'])[0] * a # 对应日期下生成总金额(sum_amount)*a的列
order_goal = list(sum_amount_order[sum_amount_order['create_date'] == i]
['sum_order'])[0] * b # 对应日期下生成总订单数(sum_order)*b的列
sum_amount_goal_list.append(amount_goal) # 将生成的目标值加入空列表
sum_order_goal_list.append(order_goal)
# 合并日聚合订单表、目标销售额表和目标销量表,生成销售聚合表
sum_amount_order_goal = pd.concat([sum_amount_order, pd.DataFrame(
{'sum_amount_goal': sum_amount_goal_list, 'sum_order_goal':
sum_order_goal_list})], axis=1)
# 计算完成率
sum_amount_order_goal["compliance_rate"] = sum_amount_order_goal["sum_amount"] / sum_amount_order_goal[
"sum_amount_goal"]
return sum_amount_order_goal
except Exception as e:
logger.info("add_order_goal异常,报错信息:{}".format(e))
# 读取日期维度表
def date_data(adventure_conn_tosql):
"""读取dim_date_df日期维度表"""
try:
date_sql = """
select create_date,
is_current_year,
is_last_year,
is_yesterday,
is_today,
is_21_day,
is_current_month,
is_current_quarter
from dim_date_df"""
date_info = pd.read_sql_query(date_sql, con=adventure_conn_tosql)
return date_info
except Exception as e:
logger.info("date_data异常,报错信息:{}".format(e))
# 合并销售聚合目标表和日期维度表
def merge_data(sum_amount_order_goal, date_info):
"""参数解释:sum_amount_order_goal销量订单聚合目标表,
date_info日期维度表(来自date_data函数)
输出:amount_order_by_day销量订单聚合目标及日期维度表
"""
try:
sum_amount_order_goal['create_date'] = sum_amount_order_goal['create_date']. \
apply(lambda x: x.strftime('%Y-%m-%d')) # 转化create_date格式为标准日期格式
amount_order_by_day = pd.merge(sum_amount_order_goal, date_info,
on='create_date', how='inner') # 通过主键create_date连接日期维度
return amount_order_by_day
except Exception as e:
logger.info("merge_data异常,报错信息:{}".format(e))
# 新表储存到数据库
def save_to_mysql(amount_order_by_day, adventure_conn_tosql):
"""将amount_order_by_day数据追加到数据库dw_order_by_day(每日环比表)当中"""
try:
amount_order_by_day['amount_diff'] = amount_order_by_day['sum_amount'].pct_change().fillna(0) # pct_change()表示当前元素与先前元素的相差百分比,默认竖向,例:前面元素x,当前元素y,公式 result = (y-x)/x
amount_order_by_day.to_sql('dw_order_by_day_{}'.format(MY_NAME), con=adventure_conn_tosql,
if_exists='replace', index=False) # 追加数据至dw_order_by_day
except Exception as e:
logger.info("save_to_mysql异常,报错信息:{}".format(e))
(2) dw_amount_diff:数据同比表
读取dw_order_by_day数据,按照今天、昨天、当月、当季度、今年、去年等日期维度进行聚合重组,再分别与去年的数据进行比较。部分python代码如下:
def diff(stage, indictor):
"""stage:日期维度的判断,如:is_today 内有[0,1]
indictor:需取值字段,如:sum_amount(总金额),sum_order(总订单量)
输出:当前时间维度下总和,去年同期总和"""
try:
# 求当前日期维度stage下的indictor总和
current_stage_indictor = dw_order_by_day[dw_order_by_day
[stage] == 1][indictor].sum()
# 取出当前日期维度下的前年对应日期列表
before_stage_list = list(dw_order_by_day[dw_order_by_day[stage] == 1]
['create_date'] - datetime.timedelta(days=366))
# 求当前日期维度下的前一年对应indictor总和
before_stage_indictor = dw_order_by_day[dw_order_by_day['create_date']
.isin(before_stage_list)][indictor].sum()
return current_stage_indictor, before_stage_indictor
except Exception as e:
logger.info("diff异常,报错信息:{}".format(e))
def delete_table():
try:
pd.read_sql_query("Truncate table dw_amount_diff_{}".format(STUNDENT_NAME), con=adventure_conn_tosql)
except: # 因为删除插入更新操作没有返回值,程序会抛出ResourceClosedError,并终止程序。使用try捕捉此异常。
print('继续')
logger.info("继续运行")
if __name__ == "__main__":
'''目的:生成dw_amount_diff当日维度表(按当天/昨天/当月/当季/当年的同比)'''
"""各阶段的金额"""
today_amount, before_year_today_amount = diff('is_today', 'sum_amount')
yesterday_amount, before_year_yesterday_amount = diff('is_yesterday', 'sum_amount')
month_amount, before_year_month_amount = diff('is_current_month', 'sum_amount')
quarter_amount, before_year_quarter_amount = diff('is_current_quarter', 'sum_amount')
year_amount, before_year_year_amount = diff('is_current_year', 'sum_amount')
"""各阶段的订单数"""
today_order, before_year_today_order = diff('is_today', 'sum_order')
yesterday_order, before_year_yesterday_order = diff('is_yesterday', 'sum_order')
month_order, before_year_month_order = diff('is_current_month', 'sum_order')
quarter_order, before_year_quarter_order = diff('is_current_quarter', 'sum_order')
year_order, before_year_year_order = diff('is_current_year', 'sum_order')
'''同比增长或同比下降(均与去年对比):总金额/订单量/客单价,当日/昨日/当月/当季/当年/'''
try:
amount_dic = {'today_diff': [today_amount / before_year_today_amount - 1,
today_order / before_year_today_order - 1,
(today_amount / today_order) / (before_year_today_amount /
before_year_today_order) - 1],
'yesterday_diff': [yesterday_amount / before_year_yesterday_amount - 1,
yesterday_order / before_year_yesterday_order - 1,
(yesterday_amount / yesterday_order) / (before_year_yesterday_amount /
before_year_yesterday_order) - 1],
'month_diff': [month_amount / before_year_month_amount - 1,
month_order / before_year_month_order - 1,
(month_amount / month_order) / (before_year_month_amount /
before_year_month_order) - 1],
'quarter_diff': [quarter_amount / before_year_quarter_amount - 1,
quarter_order / before_year_quarter_order - 1,
(quarter_amount / quarter_order) / (before_year_quarter_amount /
before_year_quarter_order) - 1],
'year_diff': [year_amount / before_year_year_amount - 1,
year_order / before_year_year_order - 1,
(year_amount / year_order) / (before_year_year_amount /
before_year_year_order) - 1],
'flag': ['amount', 'order', 'avg']} # 做符号简称,横向提取数据方便
amount_diff = pd.DataFrame(amount_dic)
amount_diff["create_date"] = today_date.strftime("%Y-%m-%d")
logger.info("""准备储存的数据为:""")
logger.info(amount_diff.head())
logger.info("""删除数据""")
delete_table()
logger.info("""数据存储""")
"""每次存储的时候,使用replace方式,便可替换原有数据"""
amount_diff.to_sql('dw_amount_diff_{}'.format(MY_NAME), con=adventure_conn_tosql,
if_exists='replace', index=False) # 存储为当日维度表
logger.info('成功生成dw_amount_diff当日维度表')
except ZeroDivisionError as e:
logger.info("请检查dw_order_by_day表是否有数据,错误信息:{}".format(e))
(3) dw_orders_info:销售信息表
从MySQL数据源读取订单明细表、客户信息表、日期维度表,将订单明细表与客户信息表通过"customer_key"字段合并,再与日期维度表进行合并。以下是部分python脚本代码:
# 数据表
def read_date(adventure_conn_read):
try:
# 订单明细表
sql_1 = "SELECT * from ods_sales_orders"
ods_sales_orders = pd.read_sql(sql_1,adventure_conn_read)
# 客户信息表
sql_2 = "SELECT customer_key,gender,chinese_territory,chinese_province,chinese_city from ods_customer"
ods_customer = pd.read_sql(sql_2,adventure_conn_read).drop_duplicates("customer_key")
# 日期维度表
sql_3 = "SELECT create_date, is_today, is_yesterday, is_21_day, is_current_month, is_current_quarter, is_current_year, is_last_year from dim_date_df"
dim_date_df = pd.read_sql(sql_3,adventure_conn_read)
return ods_sales_orders, ods_customer, dim_date_df
except Exception as e:
logger.info("read_date报错信息:{}".format(e))
# 合并订单明细表和客户信息表
def merge1(ods_sales_orders, ods_customer):
try:
ods_sales_orders['customer_key']=ods_sales_orders['customer_key'].apply(int)
ods_customer['customer_key'] = ods_customer['customer_key'].apply(int)
df1 = pd.merge(ods_sales_orders, ods_customer, on = "customer_key")
return df1
except Exception as e:
logger.info("merge1报错信息:{}".format(e))
# 合并df1和日期维度表
def merge2(df1, dim_date_df):
try:
df1["create_date"] = pd.to_datetime(df1["create_date"])
dim_date_df["create_date"] = pd.to_datetime(dim_date_df["create_date"])
dw_orders_info = pd.merge(df1, dim_date_df, on = "create_date")
# dw_orders_info["create_date"] = dw_orders_info["create_date"].apply(lambda x:x.strftime("%Y-%m-%d"))
return dw_orders_info
except Exception as e:
logger.info("merge2报错信息:{}".format(e))
# 保存至sql
def save_tosql(dw_orders_info,adventure_conn_tosql):
try:
dw_orders_info.to_sql("dw_orders_info_{}".format(MY_NAME),con=adventure_conn_tosql, if_exists = 'replace', index=False)
except Exception as e:
logger.info("save_tosql报错信息{}".format(e))
4、数据自动更新
为了保证数据及时更新,通过Python的schedule模块、os模块以及Linux后台功能实现自动更新。
(1) 创建脚本
通过schedule模块定时执行:
在每天早上的7点,7点20分以及7点40分分别对三个表进行更新。
# 设置定时执行代码
if __name__ == '__main__':
schedule.every().day.at('07:00').do(job1)
schedule.every().day.at('07:20').do(job2)
schedule.every().day.at('07:40').do(job3)
通过os模块与命令窗口交互:
os.system(
"/home/anaconda3/bin/python3 /home/frog005/adventure_chen/dw_order_by_day.py >> /home/frog005/adventure_chen/chen_logs/dw_order_by_day_schedule.log 2>&1 &")
(2) 在Linux上部署脚本
在linux服务器上,加入nohup即可使退出终端,脚本依然执行,&可令脚本自动挂在后台执行。
nohup python schedule_job_chen.py > schedule_chen.log 2>&1 &
5、搭建可视化看板
在完成上述步骤后,使用Power BI与MySQL数据库链接,制作可视化看板。共三类:详细销售数据页面、销售趋势页面、总体销售情况页面。
可视化报表地址:请点击
(1) 详细销售数据页面
共分为今日、昨日、本月、本季度、本年度五个部分。
(2) 销售趋势页面
(3) 总体销售情况页面