使用Python编写MapRecuce代码的技巧就在于我们使用了Hadoop streaming来帮助我们在map和reduce之间传递数据通过stdin和stdout,我们仅仅使用Python的sys.stdin来输入数据,使用Python的sys.stdout来输出数据,其他的streaming都会帮我们做好。别不信这一点!
首先创建map和reduce处理程序,用python实现。
mapper.py:
#!/usr/bin/env python
import sys
# input comes from STDIN (standard input)
for line in sys.stdin:
# remove leading and trailing whitespace
line = line.strip()
# split the line into words
words = line.split()
# increase counters
for word in words:
# write the results to STDOUT (standard output);
# what we output here will be the input for the
# Reduce step, i.e. the input for reducer.py
#
# tab-delimited; the trivial word count is 1
print '%s\t%s' % (word, 1)
recucer.py
#!/usr/bin/env python
from operator import itemgetter
import sys
current_word = None
current_count = 0
word = None
# input comes from STDIN
for line in sys.stdin:
# remove leading and trailing whitespace
line = line.strip()
# parse the input we got from mapper.py
word, count = line.split('\t', 1)
# convert count (currently a string) to int
try:
count = int(count)
except ValueError:
# count was not a number, so silently
# ignore/discard this line
continue
# this IF-switch only works because Hadoop sorts map output
# by key (here: word) before it is passed to the reducer
if current_word == word:
current_count += count
else:
if current_word:
# write result to STDOUT
print '%s\t%s' % (current_word, current_count)
current_count = count
current_word = word
# do not forget to output the last word if needed!
if current_word == word:
print '%s\t%s' % (current_word, current_count)
保存,比如我保存在本地的home/hadoop文件夹下面,并且要给这两个文件可执行权限
不用急着在hadoop上验证,可以先在本地验证map和reduce代码的正确性:
# very basic test
hduser@ubuntu:~$ echo "foo foo quux labs foo bar quux" | /home/hduser/mapper.py
foo 1
foo 1
quux 1
labs 1
foo 1
bar 1
quux 1
hduser@ubuntu:~$ echo "foo foo quux labs foo bar quux" | /home/hduser/mapper.py | sort -k1,1 | /home/hduser/reducer.py
bar 1
foo 3
labs 1
quux 2
# using one of the ebooks as example input
# (see below on where to get the ebooks)
hduser@ubuntu:~$ cat /tmp/gutenberg/20417-8.txt | /home/hduser/mapper.py
The 1
Project 1
Gutenberg 1
EBook 1
of 1
[...]
(you get the idea)
如果本地验证通过,说明map和reduce的处理逻辑没有问题。然后可以下载三个文本文档作为输入源:
The Outline of Science, Vol. 1 (of 4) by J. Arthur Thomson
The Notebooks of Leonardo Da Vinci
Ulysses by James Joyce
然后把这三个文本上传到hdfs文件系统上:
bin/hdfs dfs -copyFromLocal /home/hadoop/gutenberg /user/hadoop/gutenberg
可以用bin/hdfs dfs -ls /user/hadoop/gutenberg来检测是否上传成功:
[hadoop@localhost hadoop-2.7.4]$ bin/hdfs dfs -ls /user/hadoop/gutenberg
Found 3 items
-rw-r--r-- 1 hadoop supergroup 21293 2017-09-04 11:41 /user/hadoop/gutenberg/20417
-rw-r--r-- 1 hadoop supergroup 23403 2017-09-04 11:41 /user/hadoop/gutenberg/4300
-rw-r--r-- 1 hadoop supergroup 22178 2017-09-04 11:41 /user/hadoop/gutenberg/5000
[hadoop@localhost hadoop-2.7.4]$
然后要找到自己本地环境中安装hadoop中的stream jar包,我本地的stream包是
/home/hadoop/hadoop-2.7.4/share/hadoop/tools/lib/hadoop-streaming-2.7.4.jar
然后用此streaming的jar文件来运行就可以:
[hadoop@localhost hadoop-2.7.4]$ bin/hadoop jar /home/hadoop/hadoop-2.7.4/share/hadoop/tools/lib/hadoop-streaming-2.7.4.jar -file /home/hadoop/mapper.py -mapper /home/hadoop/mapper.py -file /home/hadoop/reducer.py -reducer /home/hadoop/reducer.py -input /user/test/gutenberg/* -output /user/test/gutenberg-output
或者:
[hadoop@localhost hadoop-2.7.4]$ bin/hadoop jar /home/hadoop/hadoop-2.7.4/share/hadoop/tools/lib/hadoop-streaming-2.7.4.jar -mapper /home/hadoop/mapper.py -reducer /home/hadoop/reducer.py -input /user/test/gutenberg/* -output /user/test/gutenberg-output
可以查看运行结果:
[hadoop@localhost hadoop-2.7.4]$ bin/hadoop jar /home/hadoop/hadoop-2.7.4/share/hadoop/tools/lib/hadoop-streaming-2.7.4.jar -mapper /home/hadoop/mapper.py -reducer /home/hadoop/reducer.py -input /user/test/gutenberg/* -output /user/test/gutenberg-output3
packageJobJar: [/tmp/hadoop-unjar2137675126277149757/] [] /tmp/streamjob3391053950303419829.jar tmpDir=null
17/09/04 14:33:18 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
17/09/04 14:33:18 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
17/09/04 14:33:19 INFO mapred.FileInputFormat: Total input paths to process : 3
17/09/04 14:33:19 INFO mapreduce.JobSubmitter: number of splits:3
17/09/04 14:33:19 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1504425057280_0005
17/09/04 14:33:20 INFO impl.YarnClientImpl: Submitted application application_1504425057280_0005
17/09/04 14:33:20 INFO mapreduce.Job: The url to track the job: http://localhost:8088/proxy/application_1504425057280_0005/
17/09/04 14:33:20 INFO mapreduce.Job: Running job: job_1504425057280_0005
17/09/04 14:33:31 INFO mapreduce.Job: Job job_1504425057280_0005 running in uber mode : false
17/09/04 14:33:31 INFO mapreduce.Job: map 0% reduce 0%
17/09/04 14:33:51 INFO mapreduce.Job: map 100% reduce 0%
17/09/04 14:34:00 INFO mapreduce.Job: map 100% reduce 100%
17/09/04 14:34:00 INFO mapreduce.Job: Job job_1504425057280_0005 completed successfully
17/09/04 14:34:00 INFO mapreduce.Job: Counters: 49
File System Counters
FILE: Number of bytes read=86853
FILE: Number of bytes written=748071
FILE: Number of read operations=0
FILE: Number of large read operations=0
FILE: Number of write operations=0
HDFS: Number of bytes read=67169
HDFS: Number of bytes written=24946
HDFS: Number of read operations=12
HDFS: Number of large read operations=0
HDFS: Number of write operations=2
Job Counters
Launched map tasks=3
Launched reduce tasks=1
Data-local map tasks=3
Total time spent by all maps in occupied slots (ms)=52498
Total time spent by all reduces in occupied slots (ms)=5713
Total time spent by all map tasks (ms)=52498
Total time spent by all reduce tasks (ms)=5713
Total vcore-milliseconds taken by all map tasks=52498
Total vcore-milliseconds taken by all reduce tasks=5713
Total megabyte-milliseconds taken by all map tasks=53757952
Total megabyte-milliseconds taken by all reduce tasks=5850112
Map-Reduce Framework
Map input records=1665
Map output records=5029
Map output bytes=76784
Map output materialized bytes=86865
Input split bytes=295
Combine input records=0
Combine output records=0
Reduce input groups=998
Reduce shuffle bytes=86865
Reduce input records=5029
Reduce output records=998
Spilled Records=10058
Shuffled Maps =3
Failed Shuffles=0
Merged Map outputs=3
GC time elapsed (ms)=847
CPU time spent (ms)=3130
Physical memory (bytes) snapshot=712916992
Virtual memory (bytes) snapshot=8411308032
Total committed heap usage (bytes)=444870656
Shuffle Errors
BAD_ID=0
CONNECTION=0
IO_ERROR=0
WRONG_LENGTH=0
WRONG_MAP=0
WRONG_REDUCE=0
File Input Format Counters
Bytes Read=66874
File Output Format Counters
Bytes Written=24946
17/09/04 14:34:00 INFO streaming.StreamJob: Output directory: /user/test/gutenberg-output
可以发现运行成功,然后可以查看运行结果:
[hadoop@localhost hadoop-2.7.4]$ bin/hdfs dfs -cat /user/test/gutenberg-output/part-00000
! 3
""; 6
"-//W3C//DTD 3
"//m.gutenberg.org/ebooks/20417.mobile"; 1
"//m.gutenberg.org/ebooks/4300.mobile"; 1
"//m.gutenberg.org/ebooks/5000.mobile"; 1
"/ebooks/suggest/"; 3
"Load 3
"Quixote"</td> 3
"en_US"; 6
"http://www.gutenberg.org/ebooks/20417"; 1
"http://www.gutenberg.org/ebooks/4300"; 1
"http://www.gutenberg.org/ebooks/5000"; 1
"http://www.w3.org/MarkUp/DTD/xhtml-rdfa-1.dtd"> 3
"screen"; 3
<Enter>. 3
<enter>"> 3
<h>" 3
<s>" 3
...
可以看到结果了。
感谢:数据来源:可以是web日志,比如access.log,可以是爬虫爬来的数据,就要用python来做了。从数据来源中看能不能获取一些有用信息。
参考:
最经典的python在hadoop上使用教程
使用 python 构建基于 hadoop 的 mapreduce 日志分析平台