Running Python Code on Hadoop

The trick to writing MapReduce code in Python is to use Hadoop Streaming, which passes data between the map and reduce phases through stdin and stdout. We simply read input from Python's sys.stdin and write output to sys.stdout; Hadoop Streaming takes care of everything else. Trust me on this!

First, write the map and reduce programs in Python.
mapper.py:

#!/usr/bin/env python

import sys

# input comes from STDIN (standard input)
for line in sys.stdin:
    # remove leading and trailing whitespace
    line = line.strip()
    # split the line into words
    words = line.split()
    # increase counters
    for word in words:
        # write the results to STDOUT (standard output);
        # what we output here will be the input for the
        # Reduce step, i.e. the input for reducer.py
        #
        # tab-delimited; the trivial word count is 1
        print('%s\t%s' % (word, 1))

reducer.py:

#!/usr/bin/env python

import sys

current_word = None
current_count = 0
word = None

# input comes from STDIN
for line in sys.stdin:
    # remove leading and trailing whitespace
    line = line.strip()

    # parse the input we got from mapper.py
    word, count = line.split('\t', 1)

    # convert count (currently a string) to int
    try:
        count = int(count)
    except ValueError:
        # count was not a number, so silently
        # ignore/discard this line
        continue

    # this IF-switch only works because Hadoop sorts map output
    # by key (here: word) before it is passed to the reducer
    if current_word == word:
        current_count += count
    else:
        if current_word:
            # write result to STDOUT
            print('%s\t%s' % (current_word, current_count))
        current_count = count
        current_word = word

# do not forget to output the last word if needed!
if current_word == word:
    print('%s\t%s' % (current_word, current_count))
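
As an aside, the same reducer can be written more compactly with itertools.groupby, which does the key-change bookkeeping for you. This is an alternative sketch, not part of the job below; it assumes the same tab-delimited input, sorted by key, that Hadoop Streaming delivers to the reducer:

#!/usr/bin/env python

import sys
from itertools import groupby
from operator import itemgetter

def read_pairs(stream):
    # yield (word, count) pairs from tab-delimited lines
    for line in stream:
        yield line.rstrip('\n').split('\t', 1)

# groupby relies on the input being sorted by key, which the
# Hadoop shuffle/sort phase guarantees
for word, group in groupby(read_pairs(sys.stdin), itemgetter(0)):
    try:
        total = sum(int(count) for _, count in group)
    except ValueError:
        # a count was not a number (or a line had no tab); skip this group
        continue
    print('%s\t%d' % (word, total))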

Save the two scripts (I keep mine under /home/hadoop) and make both files executable.
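For example, assuming the scripts were saved as /home/hadoop/mapper.py and /home/hadoop/reducer.py:

chmod +x /home/hadoop/mapper.py /home/hadoop/reducer.py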
There is no need to run on Hadoop right away; first verify the map and reduce code locally (the sort -k1,1 step below simulates the key sort Hadoop performs between the map and reduce phases):

# very basic test
hduser@ubuntu:~$ echo "foo foo quux labs foo bar quux" | /home/hduser/mapper.py
foo     1
foo     1
quux    1
labs    1
foo     1
bar     1
quux    1

hduser@ubuntu:~$ echo "foo foo quux labs foo bar quux" | /home/hduser/mapper.py | sort -k1,1 | /home/hduser/reducer.py
bar     1
foo     3
labs    1
quux    2

# using one of the ebooks as example input
# (see below on where to get the ebooks)
hduser@ubuntu:~$ cat /tmp/gutenberg/20417-8.txt | /home/hduser/mapper.py
The     1
Project 1
Gutenberg       1
EBook   1
of      1
[...]
(you get the idea)

If the local tests pass, the map and reduce logic is sound. Next, download three ebooks to serve as input:
The Outline of Science, Vol. 1 (of 4) by J. Arthur Thomson
The Notebooks of Leonardo Da Vinci
Ulysses by James Joyce
Then upload them to HDFS:
bin/hdfs dfs -copyFromLocal /home/hadoop/gutenberg /user/hadoop/gutenberg
You can check that the upload succeeded with bin/hdfs dfs -ls /user/hadoop/gutenberg:

[hadoop@localhost hadoop-2.7.4]$ bin/hdfs dfs -ls /user/hadoop/gutenberg
Found 3 items
-rw-r--r--   1 hadoop supergroup      21293 2017-09-04 11:41 /user/hadoop/gutenberg/20417
-rw-r--r--   1 hadoop supergroup      23403 2017-09-04 11:41 /user/hadoop/gutenberg/4300
-rw-r--r--   1 hadoop supergroup      22178 2017-09-04 11:41 /user/hadoop/gutenberg/5000
[hadoop@localhost hadoop-2.7.4]$ 

Next, locate the Hadoop Streaming jar that ships with your Hadoop installation. On my machine it is:

/home/hadoop/hadoop-2.7.4/share/hadoop/tools/lib/hadoop-streaming-2.7.4.jar
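
If you are not sure where yours lives, a quick find over the install directory will turn it up (the hadoop-2.7.4 path below matches my install; adjust to yours):

[hadoop@localhost hadoop-2.7.4]$ find /home/hadoop/hadoop-2.7.4 -name "hadoop-streaming*.jar"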

Then run the job with the streaming jar. The -file options ship the local scripts to the cluster along with the job. (Note that the runs below read from /user/test/gutenberg rather than the /user/hadoop/gutenberg path used above; point -input and -output at the HDFS paths you actually used.)

[hadoop@localhost hadoop-2.7.4]$ bin/hadoop jar /home/hadoop/hadoop-2.7.4/share/hadoop/tools/lib/hadoop-streaming-2.7.4.jar -file /home/hadoop/mapper.py    -mapper /home/hadoop/mapper.py -file /home/hadoop/reducer.py   -reducer /home/hadoop/reducer.py -input /user/test/gutenberg/* -output /user/test/gutenberg-output

Alternatively, if the scripts already exist at the same absolute path on every node, the -file options can be omitted:

[hadoop@localhost hadoop-2.7.4]$ bin/hadoop jar /home/hadoop/hadoop-2.7.4/share/hadoop/tools/lib/hadoop-streaming-2.7.4.jar   -mapper /home/hadoop/mapper.py  -reducer /home/hadoop/reducer.py -input /user/test/gutenberg/* -output /user/test/gutenberg-output
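
Because word counting is associative and commutative, the same reducer script can also be passed as a combiner through the standard -combiner option, pre-aggregating counts on the map side and shrinking shuffle traffic. A sketch using the same paths (gutenberg-output-combined is just a fresh output directory name):

[hadoop@localhost hadoop-2.7.4]$ bin/hadoop jar /home/hadoop/hadoop-2.7.4/share/hadoop/tools/lib/hadoop-streaming-2.7.4.jar -file /home/hadoop/mapper.py -mapper /home/hadoop/mapper.py -file /home/hadoop/reducer.py -reducer /home/hadoop/reducer.py -combiner /home/hadoop/reducer.py -input /user/test/gutenberg/* -output /user/test/gutenberg-output-combined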

Submitting the job prints progress output like the following:

[hadoop@localhost hadoop-2.7.4]$ bin/hadoop jar /home/hadoop/hadoop-2.7.4/share/hadoop/tools/lib/hadoop-streaming-2.7.4.jar  -mapper /home/hadoop/mapper.py  -reducer /home/hadoop/reducer.py -input /user/test/gutenberg/* -output /user/test/gutenberg-output3
packageJobJar: [/tmp/hadoop-unjar2137675126277149757/] [] /tmp/streamjob3391053950303419829.jar tmpDir=null
17/09/04 14:33:18 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
17/09/04 14:33:18 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
17/09/04 14:33:19 INFO mapred.FileInputFormat: Total input paths to process : 3
17/09/04 14:33:19 INFO mapreduce.JobSubmitter: number of splits:3
17/09/04 14:33:19 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1504425057280_0005
17/09/04 14:33:20 INFO impl.YarnClientImpl: Submitted application application_1504425057280_0005
17/09/04 14:33:20 INFO mapreduce.Job: The url to track the job: http://localhost:8088/proxy/application_1504425057280_0005/
17/09/04 14:33:20 INFO mapreduce.Job: Running job: job_1504425057280_0005
17/09/04 14:33:31 INFO mapreduce.Job: Job job_1504425057280_0005 running in uber mode : false
17/09/04 14:33:31 INFO mapreduce.Job:  map 0% reduce 0%
17/09/04 14:33:51 INFO mapreduce.Job:  map 100% reduce 0%
17/09/04 14:34:00 INFO mapreduce.Job:  map 100% reduce 100%
17/09/04 14:34:00 INFO mapreduce.Job: Job job_1504425057280_0005 completed successfully
17/09/04 14:34:00 INFO mapreduce.Job: Counters: 49
    File System Counters
        FILE: Number of bytes read=86853
        FILE: Number of bytes written=748071
        FILE: Number of read operations=0
        FILE: Number of large read operations=0
        FILE: Number of write operations=0
        HDFS: Number of bytes read=67169
        HDFS: Number of bytes written=24946
        HDFS: Number of read operations=12
        HDFS: Number of large read operations=0
        HDFS: Number of write operations=2
    Job Counters 
        Launched map tasks=3
        Launched reduce tasks=1
        Data-local map tasks=3
        Total time spent by all maps in occupied slots (ms)=52498
        Total time spent by all reduces in occupied slots (ms)=5713
        Total time spent by all map tasks (ms)=52498
        Total time spent by all reduce tasks (ms)=5713
        Total vcore-milliseconds taken by all map tasks=52498
        Total vcore-milliseconds taken by all reduce tasks=5713
        Total megabyte-milliseconds taken by all map tasks=53757952
        Total megabyte-milliseconds taken by all reduce tasks=5850112
    Map-Reduce Framework
        Map input records=1665
        Map output records=5029
        Map output bytes=76784
        Map output materialized bytes=86865
        Input split bytes=295
        Combine input records=0
        Combine output records=0
        Reduce input groups=998
        Reduce shuffle bytes=86865
        Reduce input records=5029
        Reduce output records=998
        Spilled Records=10058
        Shuffled Maps =3
        Failed Shuffles=0
        Merged Map outputs=3
        GC time elapsed (ms)=847
        CPU time spent (ms)=3130
        Physical memory (bytes) snapshot=712916992
        Virtual memory (bytes) snapshot=8411308032
        Total committed heap usage (bytes)=444870656
    Shuffle Errors
        BAD_ID=0
        CONNECTION=0
        IO_ERROR=0
        WRONG_LENGTH=0
        WRONG_MAP=0
        WRONG_REDUCE=0
    File Input Format Counters 
        Bytes Read=66874
    File Output Format Counters 
        Bytes Written=24946
17/09/04 14:34:00 INFO streaming.StreamJob: Output directory: /user/test/gutenberg-output
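
The run completed. Before printing the results you can list the output directory; a successful job leaves its part files next to an empty _SUCCESS marker (standard HDFS usage, with the output path from the run above):

[hadoop@localhost hadoop-2.7.4]$ bin/hdfs dfs -ls /user/test/gutenberg-output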

The job finished successfully. Now view the word counts themselves:

[hadoop@localhost hadoop-2.7.4]$ bin/hdfs dfs -cat  /user/test/gutenberg-output/part-00000
!   3
""; 6
"-//W3C//DTD    3
"//m.gutenberg.org/ebooks/20417.mobile";    1
"//m.gutenberg.org/ebooks/4300.mobile"; 1
"//m.gutenberg.org/ebooks/5000.mobile"; 1
"/ebooks/suggest/"; 3
"Load   3
"Quixote"</td>  3
"en_US";    6
"http://www.gutenberg.org/ebooks/20417";    1
"http://www.gutenberg.org/ebooks/4300"; 1
"http://www.gutenberg.org/ebooks/5000"; 1
"http://www.w3.org/MarkUp/DTD/xhtml-rdfa-1.dtd">    3
"screen";   3
<Enter>.    3
<enter>">   3
<h>"    3
<s>"    3
...

And there are the word counts. (Tokens like "-//W3C//DTD and the m.gutenberg.org URLs show up because the files I downloaded were evidently the HTML pages for these ebooks rather than the plain-text versions; plain-text inputs give cleaner counts.)
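
To eyeball the most frequent tokens, pipe the output through a numeric sort on the count column (plain coreutils on top of hdfs dfs -cat, nothing Hadoop-specific):

[hadoop@localhost hadoop-2.7.4]$ bin/hdfs dfs -cat /user/test/gutenberg-output/part-00000 | sort -k2,2nr | head -20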
One last note on data sources: the input could just as well be web server logs such as access.log, or data gathered by a crawler (a natural job for Python). Either way, the goal is to see what useful information can be extracted from the source.
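
To sketch the log-analysis idea: a minimal mapper that counts requests per client IP from a common-log-format access.log might look like the following (a hypothetical example; the field layout is an assumption, so adjust the index to your log format). The reducer above can be reused unchanged:

#!/usr/bin/env python
# access_log_mapper.py -- hypothetical sketch, not part of the job above.
# Assumes common log format, where the client IP is the first
# whitespace-separated field, e.g.:
#   127.0.0.1 - - [04/Sep/2017:14:33:18 +0800] "GET / HTTP/1.1" 200 612

import sys

for line in sys.stdin:
    fields = line.split()
    if not fields:
        # skip blank lines
        continue
    # emit <ip, 1>; the word-count reducer aggregates these as before
    print('%s\t%s' % (fields[0], 1))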
References:
The classic tutorial on using Python with Hadoop
Building a Hadoop-based MapReduce log analysis platform with Python
