Design Amazon's sales rank by category
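1. Outline use cases and constraints
Use cases:
- The service calculates the best-selling products by category for the past week and the past month
- Users mainly view the best-seller lists for recent weeks
Assumptions and constraints:
- Traffic is not evenly distributed
- A product can belong to several categories at once
- Categories are flat: a single level with no parent-child relationships
- Rankings must be updated hourly
- 100 million products
- 1,000 categories
- 1 billion transactions per month
- 100 billion read requests per month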
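Capacity estimation:
Size of a single transaction record:
- created_at: 5 bytes
- product_id: 8 bytes
- category_id: 4 bytes
- seller_id: 8 bytes
- buyer_id: 8 bytes
- quantity: 4 bytes
- total_price: 5 bytes
Total: about 40 bytes (the fields sum to 42 bytes)
Assuming roughly 2.5 million seconds per month, this gives about 40 GB of new transaction data per month, 400 transactions per second, and 40,000 read requests per second.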
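The arithmetic behind these figures can be checked with a short back-of-the-envelope script. This is only a rough sketch: the approximation of a month as 2.5 million seconds and all variable names are assumptions introduced here for illustration.

```python
# Back-of-the-envelope check of the capacity estimates.
# Assumption: one month is approximated as 2.5 million seconds.
SECONDS_PER_MONTH = 2_500_000

# Field sizes in bytes, as listed for a single transaction record.
FIELD_BYTES = {
    "created_at": 5,
    "product_id": 8,
    "category_id": 4,
    "seller_id": 8,
    "buyer_id": 8,
    "quantity": 4,
    "total_price": 5,
}

record_bytes = sum(FIELD_BYTES.values())  # exact sum of the listed fields
rounded_record_bytes = 40                 # rounded figure used for the estimate

transactions_per_month = 1_000_000_000    # 1 billion
reads_per_month = 100_000_000_000         # 100 billion

storage_gb_per_month = rounded_record_bytes * transactions_per_month / 1e9
transactions_per_second = transactions_per_month / SECONDS_PER_MONTH
reads_per_second = reads_per_month / SECONDS_PER_MONTH

print(record_bytes, storage_gb_per_month,
      transactions_per_second, reads_per_second)
```

These are averages. Because traffic is uneven, peak load can be well above them.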
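2. Create a high-level design
3. Design core components
Use case: calculate last week's best sellers by category
Assume the raw transaction log is tab-delimited, with entries like the following: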
timestamp product_id category_id qty total_price seller_id buyer_id
t1 product1 category1 2 20.00 1 1
t2 product1 category2 2 20.00 2 2
t2 product1 category2 1 10.00 2 3
t3 product2 category1 3 7.00 3 4
t4 product3 category2 7 2.00 4 5
t5 product4 category1 1 5.00 5 6
...
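As an illustration of the log format, one line could be parsed into a typed record as sketched below. The `Transaction` class and `parse_transaction` helper are hypothetical names, and the sketch assumes seven tab-separated fields in the column order shown above.

```python
from dataclasses import dataclass
from decimal import Decimal


@dataclass(frozen=True)
class Transaction:
    """One row of the raw transaction log (hypothetical record type)."""
    timestamp: str
    product_id: str
    category_id: str
    quantity: int
    total_price: Decimal
    seller_id: str
    buyer_id: str


def parse_transaction(line: str) -> Transaction:
    """Parse one tab-delimited log line into a Transaction."""
    fields = line.rstrip("\n").split("\t")
    if len(fields) != 7:
        raise ValueError(f"expected 7 fields, got {len(fields)}")
    (timestamp, product_id, category_id, quantity,
     total_price, seller_id, buyer_id) = fields
    # Decimal avoids floating-point rounding on currency amounts.
    return Transaction(timestamp, product_id, category_id, int(quantity),
                       Decimal(total_price), seller_id, buyer_id)


record = parse_transaction("t1\tproduct1\tcategory1\t2\t20.00\t1\t1")
print(record.category_id, record.quantity, record.total_price)
```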
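The sales rank service can extract the raw transaction data, process it with MapReduce, and write the results to the sales_rank table in the database.
The MapReduce job has two steps:
- Transform the data into the form (category, product_id), sum(quantity)
- Perform a distributed sort: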
from mrjob.job import MRJob
from mrjob.step import MRStep


class SalesRanker(MRJob):

    def within_past_week(self, timestamp):
        """Return True if timestamp is within past week, False otherwise."""
        ...

    def mapper(self, _, line):
        """Parse each log line, extract and transform relevant lines.

        Emit key value pairs of the form:

        (category1, product1), 2
        (category2, product1), 2
        (category2, product1), 1
        (category1, product2), 3
        (category2, product3), 7
        (category1, product4), 1
        """
        timestamp, product_id, category_id, quantity, total_price, seller_id, \
            buyer_id = line.split('\t')
        if self.within_past_week(timestamp):
            # Cast to int so the reducer sums numbers rather than strings.
            yield (category_id, product_id), int(quantity)

    def reducer(self, key, values):
        """Sum values for each key.

        (category1, product1), 2
        (category2, product1), 3
        (category1, product2), 3
        (category2, product3), 7
        (category1, product4), 1
        """
        yield key, sum(values)

    def mapper_sort(self, key, value):
        """Construct key to ensure proper sorting.

        Transform key and value to the form:

        (category1, 2), product1
        (category2, 3), product1
        (category1, 3), product2
        (category2, 7), product3
        (category1, 1), product4

        The shuffle/sort step of MapReduce will then do a
        distributed sort on the keys, resulting in:

        (category1, 1), product4
        (category1, 2), product1
        (category1, 3), product2
        (category2, 3), product1
        (category2, 7), product3
        """
        # Note: the shuffle compares serialized keys, so multi-digit quantities
        # may need zero-padding or a numeric comparator to sort as numbers.
        category_id, product_id = key
        quantity = value
        yield (category_id, quantity), product_id

    def reducer_identity(self, key, values):
        """Pass each sorted record through unchanged."""
        for value in values:
            yield key, value

    def steps(self):
        """Run the map and reduce steps."""
        return [
            MRStep(mapper=self.mapper,
                   reducer=self.reducer),
            MRStep(mapper=self.mapper_sort,
                   reducer=self.reducer_identity),
        ]


if __name__ == '__main__':
    SalesRanker.run()
The output is as follows:
(category1, 1), product4
(category1, 2), product1
(category1, 3), product2
(category2, 3), product1
(category2, 7), product3
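To make the two steps concrete without a cluster or the mrjob library, the following pure-Python sketch mimics them on the sample rows. It is an illustration rather than the job itself: the function names are hypothetical, the timestamp filter is skipped, and a local `sorted` call stands in for the distributed shuffle/sort.

```python
from collections import defaultdict

# Sample rows from the raw transaction log:
# (timestamp, product_id, category_id, quantity)
ROWS = [
    ("t1", "product1", "category1", 2),
    ("t2", "product1", "category2", 2),
    ("t2", "product1", "category2", 1),
    ("t3", "product2", "category1", 3),
    ("t4", "product3", "category2", 7),
    ("t5", "product4", "category1", 1),
]


def aggregate(rows):
    """Step 1: sum quantity per (category_id, product_id)."""
    totals = defaultdict(int)
    for _timestamp, product_id, category_id, quantity in rows:
        totals[(category_id, product_id)] += quantity
    return dict(totals)


def sort_by_category_and_quantity(totals):
    """Step 2: re-key as (category_id, total) and sort on that key."""
    rekeyed = [((category_id, total), product_id)
               for (category_id, product_id), total in totals.items()]
    return sorted(rekeyed)


ranked = sort_by_category_and_quantity(aggregate(ROWS))
for key, product_id in ranked:
    print(key, product_id)
```

The sort is ascending within each category, so a best-seller list would read each category's entries from the end, or sort on the negated quantity.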
The sales_rank results table has the following structure:
id int NOT NULL AUTO_INCREMENT
category_id int NOT NULL
total_sold int NOT NULL
product_id int NOT NULL
PRIMARY KEY(id)
FOREIGN KEY(category_id) REFERENCES Categories(id)
FOREIGN KEY(product_id) REFERENCES Products(id)
A composite index on id, category_id, and product_id can be created to speed up lookups.
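As a rough sketch of how this table might be queried, the example below builds it in an in-memory SQLite database and fetches the top sellers for one category. Several things here are assumptions made for illustration: SQLite stands in for the production database, the foreign keys are omitted because the Categories and Products tables are not defined here, the `top_products` helper is hypothetical, and the index on (category_id, total_sold) is chosen to suit this particular query rather than taken from the text.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE sales_rank (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        category_id INTEGER NOT NULL,
        total_sold INTEGER NOT NULL,
        product_id INTEGER NOT NULL
    );
    -- Assumed index for the "top sellers in one category" query.
    CREATE INDEX idx_sales_rank_category_sold
        ON sales_rank (category_id, total_sold);
""")

# Rows mirror the MapReduce output: (category_id, total_sold, product_id).
conn.executemany(
    "INSERT INTO sales_rank (category_id, total_sold, product_id) "
    "VALUES (?, ?, ?)",
    [(1, 2, 1), (2, 3, 1), (1, 3, 2), (2, 7, 3), (1, 1, 4)],
)


def top_products(conn, category_id, limit=10):
    """Return (product_id, total_sold) for a category, best sellers first."""
    cursor = conn.execute(
        "SELECT product_id, total_sold FROM sales_rank "
        "WHERE category_id = ? ORDER BY total_sold DESC LIMIT ?",
        (category_id, limit),
    )
    return cursor.fetchall()


print(top_products(conn, category_id=1, limit=2))
```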
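Use case: user views the best-seller lists for the past few weeks
These requests are mainly queries against the sales_rank table. For hot categories, results can be served from a cache to improve read throughput.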
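One common way to apply such a cache is the cache-aside pattern, sketched below with an in-memory dictionary. This is a minimal illustration under stated assumptions: `TTLCache`, `get_sales_rank`, and `fake_db_lookup` are hypothetical names, the one-hour expiry is chosen to match the hourly ranking refresh, and a real deployment would more likely use a shared cache service than a per-process dictionary.

```python
import time


class TTLCache:
    """Tiny in-memory cache whose entries expire after ttl_seconds."""

    def __init__(self, ttl_seconds, clock=time.monotonic):
        self.ttl_seconds = ttl_seconds
        self.clock = clock
        self._entries = {}  # key -> (expires_at, value)

    def get(self, key):
        """Return the cached value, or None if missing or expired."""
        entry = self._entries.get(key)
        if entry is None or entry[0] <= self.clock():
            self._entries.pop(key, None)
            return None
        return entry[1]

    def set(self, key, value):
        self._entries[key] = (self.clock() + self.ttl_seconds, value)


def get_sales_rank(cache, category_id, load_from_db):
    """Cache-aside read: try the cache, fall back to the database on a miss."""
    ranking = cache.get(category_id)
    if ranking is None:
        ranking = load_from_db(category_id)
        cache.set(category_id, ranking)
    return ranking


db_calls = []


def fake_db_lookup(category_id):
    """Stand-in for the sales_rank query; records each call it receives."""
    db_calls.append(category_id)
    return [("product2", 3), ("product1", 2), ("product4", 1)]


cache = TTLCache(ttl_seconds=3600)  # assumed to match the hourly refresh
first = get_sales_rank(cache, "category1", fake_db_lookup)
second = get_sales_rank(cache, "category1", fake_db_lookup)
print(first == second, len(db_calls))
```

The second call is served from the cache, so the database stand-in is queried only once until the entry expires.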