Goal:
实现两组数据的合并
input data 1:
place_id \t woeid \t latitude \t longitude \t place_name \t place_type_id \t place_url
Input data 2:
photo_id \t owner \t tags \t date_taken \t place_id \t accuracy
For data join, usually there are two different ways:
- map side join
-
reduce side join
map side join通常要求从 dataset->Map 的数据有序,否则,其时间复杂度将是n的x次方,x为dataset的个数。
因为我们的Input data 2 含有很多不同的dataset, 可能会消耗很长的时间,所以选择使用reduce side join。
Expect mapper output
output data 1:
place_id#0 \t place_type_id \t place_url
output data 2:
place_id#1 \t photo_id \t tags
我们将place_id 作为key,对来源不同的数据组在‘#’后加上数字予以辨别,然后再选取需要的数据从mapper当中进行输出。
Shuffle
我们在shuffle当中进行一个partion:根据place_id作为key当成第一主键,‘#’后的数字标记label为第二主键进行排序和分区。
排序和分区后的数据进入reducer
可以通过
-D mapreduce.partition.keypartitioner.options=-k1,1 \
进行实现
Reducer
此时进入reducer的data应该是排序分区好了的sorted data
Input data 1:
place_id#0 \t place_type_id \t place_url
Input data 2:
place_id#1 \t photo_id \t tags
此时我们期望的output应该是通过place_id 作为key将两组数据连接在一起
Output data:
photo_id \t tags \t place_type_id \t place_url
Code
具体的代码实现如下
Mapper:
#!/usr/bin/python3
import sys
def multi_mapper():
""" This mapper will output different format dependind on input type
If input is place file:
Input format: place_id \t woeid \t latitude \t longitude \t place_name \t place_type_id \t place_url
Output format: place_id#0 \t place_type_id \t place_url
If input is photo file:
Input format: photo_id \t owner \t tags \t date_taken \t place_id \t accuracy
Output format: place_id#1 \t photo_id \t tags
"""
for line in sys.stdin:
parts = line.strip().split("\t")
if len(parts) == 7:
place_id, place_type_id, place_url = parts[0].strip(), parts[5].strip(), parts[6].strip()
if place_type_id == '7' or place_type_id == '22':
print(place_id + "#0\t" + place_type_id + "\t" + place_url)
elif len(parts) == 6:
photo_id, place_id, tags = parts[0].strip(), parts[4].strip(), parts[2].strip()
print(place_id + "#1\t" + photo_id + "\t" + tags)
if __name__ == "__main__":
multi_mapper()
Reducer:
#!/usr/bin/python3
import sys
def read_map_output(file):
""" Return an key-value pair extracted from file (sys.stdin).
Input format: key \t value
Output format: (key, value)
"""
for line in file:
yield line.strip().split('\t', 1)
def combine_place():
""" This reducer run reduce side join
Input format: place_id#0 \t place_type_id \t place_url
place_id#1 \t photo_id \t tags
Ourput format: photo_id \t tags \t place_type_id \t place_url
"""
data = read_map_output(sys.stdin)
current_place_id = ''
current_place_url_and_type_id= 'NULL'
for key, value in data:
# check input is valid
if key == '':
continue
# split key by '#' , get the place_id and a number
key = key.split('#')
# check the key-value pair come from place or come from photo
if key[0] != current_place_id:
if key[1] == '0':
current_place_id = key[0]
current_place_url_and_type_id = value
else:
current_place_id = key[0]
current_place_url_and_type_id = 'NULL'
print(value + '\t' + current_place_url_and_type_id)
else:
print(value + '\t' + current_place_url_and_type_id)
if __name__ == '__main__':
combine_place()