分布式系统-实验-MapReduce

介绍

通过 分布式系统系列文章,我们了解了分布式的一些基本概念,若是写点代码实践一下,那就更好了。先做个简单的实验练练手,还记得 MapReduce 吗?,本次实验中会构建一个 MapReduce 库,主要能熟悉 Go 语言外加了解分布式系统中的容错机制。首先写个一个简单的 MapReduce 程序,再写一个 Master,它不仅能分配任务给 worker 而且能处理 worker 执行错误。接口参考论文描述。

实验环境

不会让你从零开始撸代码啦,还不快 git clone ?

$ git clone git://g.csail.mit.edu/6.824-golabs-2016 6.824
$ cd 6.824
$ ls
Makefile src

MapReduce 代码支持顺序执行和分布式执行。顺序执行意味着 Map 先执行,当所有 Map 任务都完成了再执行 Reduce,这种模式可能效率比较低,但是比较便于调试,毕竟串行。分布式执行启动了很多 worker 线程,他们并行执行 Map 任务,然后执行 Reduce 任务,这种模式效率更高,当然更难实现和调试。

准备:熟悉代码

mapreduce 包提供了一个简单的 MapReduce 顺序执行实现。应用只要调用 Distributed() 方法就可以启动一个任务,但是要调试的时候可能需要调用 Sequential().

mapreduce 的运行流程如下:

  1. 应用层需要提供输入文件,一个 map 函数,一个 reduce 函数,要启动 reduce 任务的数量。

  2. 用这些参数创建一个 master。它会启动一个 RPC 服务器(master_rpc.go),然后等待 worker 注册(Register())。当有待完成的任务时,schedule() 就会将任务分配给 worker,同时也会进行 worker 的错误处理。

  3. master 认为每个输入文件应当交给一个 map 任务处理,然后调用 doMap(),无论直接调用 Sequential() 还是通过 RPC 给 worker 发送 DoTask 消息都会触发这个操作。每当调用 doMap() 时,它都会去读取相应的文件,以文件内容调用 map 函数并且为每个输入文件产生 nReduce 个文件。因此,每个 map 任务最终会产生 #files x nReduce 个文件。

  4. master 接下来会对每个 reduce 任务至少调用一次 doReduce()doReduce() 首先会收集 nReduce 个 map 任务产生的文件,然后在每个文件上执行 reduce 函数,最后产生一个结果文件。

  5. master 会调用 mr.merge() 方法将上一步产生所有结果文件聚合到一个文件中。

所以本次实验就是到填空题,空是:doMap, doReduce,schedule 和 reduce。

其他的方法基本不需要改动,有时间的研究研究有助于理解整体架构。

Part I: Map/Reduce 输入和输出

第一个空 doMap() 函数的功能是读取指定文件的内容,执行 mapF 函数,将结果保存在新的文件中;而 doReuce() 读取 doMap 的输出文件,执行 reduceF 函数,将结果存在磁盘中。

写完了就测试测试,测试文件(test_test.go)已经写好了。串行模式测试可执行:

$ cd 6.824
$ export "GOPATH=$PWD"  
$ cd "$GOPATH/src/mapreduce"
$ setup ggo_v1.5
$ go test -run Sequential mapreduce/...
ok      mapreduce   2.694s

如果你看到的不是 ok,说明还有 bug 哦。在 common.go 将 debugEnbale 设置成 true,然后运行 go test -run Sequential mapreduce/... -v,可以看到更详细的输出:

$ env "GOPATH=$PWD/../../" go test -v -run Sequential mapreduce/...
=== RUN   TestSequentialSingle
master: Starting Map/Reduce task test
Merge: read mrtmp.test-res-0
master: Map/Reduce task completed
--- PASS: TestSequentialSingle (1.34s)
=== RUN   TestSequentialMany
master: Starting Map/Reduce task test
Merge: read mrtmp.test-res-0
Merge: read mrtmp.test-res-1
Merge: read mrtmp.test-res-2
master: Map/Reduce task completed
--- PASS: TestSequentialMany (1.33s)
PASS
ok      mapreduce   2.672s

Part II: 单机词频统计

完成了第一部分,我们可以开始构建自己第一个 MapReduce 系统:词频统计器。没错还是填空题:mapF 和 reduceF,让 wc.go 可以统计出每个单词出现的次数。我们的测试文件里面只有英文,所以一个单词就是连续出现字母,判断一个字母参考标准库 unicode.IsLetter

测试文件是 6.824/src/main/pg-*.txt,不妨先编译试试:

$ cd 6.824
$ export "GOPATH=$PWD"
$ cd "$GOPATH/src/main"
$ go run wc.go master sequential pg-*.txt
# command-line-arguments
./wc.go:14: missing return at end of function
./wc.go:21: missing return at end of function

当然通过不了,毕竟空还没填呢。mapF 的参数是测试文件名和其内容,分割成单词,返回 []mapreduce.KeyValue,KeyValue:单词-频次。轮到 reduceF 函数了,它会针对每个 key(单词) 调用一次,参数是某个单词以及这个单词在所有测试文件中的 mapF 结果。

写好了,便可测试:

$ cd "$GOPATH/src/main"
$ time go run wc.go master sequential pg-*.txt
master: Starting Map/Reduce task wcseq
Merge: read mrtmp.wcseq-res-0
Merge: read mrtmp.wcseq-res-1
Merge: read mrtmp.wcseq-res-2
master: Map/Reduce task completed
14.59user 3.78system 0:14.81elapsed

最终的结果保存在 mrtmp.wcseq 文件中。运行 $ rm mrtmp.* 删除所有的中间数据文件。

运行 sort -n -k2 mrtmp.wcseq | tail -10,如果看到的和下面的一样,说明你写对了。

$ 
he: 34077
was: 37044
that: 37495
I: 44502
in: 46092
a: 60558
to: 74357
of: 79727
and: 93990
the: 154024

亦可直接运行 $sh ./test-wc.sh

小提示: strings.FieldFunc 可以将一个 string 分割成多个部分,strconv 包中有函数可将 string 转换成 int。

Part III: 分布式 MapReduce

MapReduce 让开发者最爽的地方是不需要关心代码是在多台机器并行执行的。但我们现在的实现是 master 把 map 和 reduce 任务一个一个执行。虽然这种实现模式概念上很简单,但是性能并不是很高。接下来我们来实现一个并发的 MapReduce,它会调用多个 worker 线程去执行任务,这样可以更好地利用多核CPU。当然我们的实验不是真署在多台机器上而是用 channel 去模拟分布式计算。

由于是并发,所以需要调度者 master 线程,它负责给 worker 分发任务,而且一直等待直到所有 worker 完成任务。为了让我们的实验更加真实,master 只能通过 RPC 的方式与 worker 通讯。worker 代码(mapreduce/worker.go)已经准备好了,它用于启动 worker。

下一个空是 schedule.go 中的 schedule(),这个方法负责给 worker 分发 map 和 reduce 任务,当所有任务完成后返回。

master.go 中的 run() 方法会先调用 schedule(),然后调用 merge() 把每个 reduce 任务的输出文件整合到一个文件里面。schedule 只需要告诉 worker 输入文件的名字 (mr.files[task]) 和任务 task,worker 自己知道从哪里读取也知道把结果写到哪个文件里面。master 通过 RPC 调用 Worker.DoTask 通知 worker 开始新任务,同时还会在 RPC 参数中包含一个 DoTaskArgs 对象。

当一个 worker 准备完毕可以工作时,它会向 master 发送一个 Register RPC,注册的同时还会把这个 worker 的相关信息放入 mr.registerChannel。所以 schedule 应该通过读取这个 channel 处理新 worker 的注册。

当前正在运行的 job 信息都在 Master 中定义。注意,master 不需要知道 Map 或 Reduce 具体执行的是什么代码;当一个 worker 被 wc.go 创建时就已经携带了 Map 和 Reduce 函数的信息。

运行 $ go test -run TestBasic mapreduce/... 可进行基础测试。

小提示: master 应该并行的发送 RPC 给 worker,这样 worker 可以并发执行任务。可参考 Go RPC 文档。

小提示: master 应该等一个 worker 完成当前任务后马上为它分配一个新任务。等待 master 响应的线程可以用 channel 作为同步工具。Concurrency in Go 有详细的 channel 用法。

小提示: 跟踪 bug 最简单的方法就是在代码加入 debug(),然后执行 go test -run TestBasic mapreduce/... > out,out 就会包含调试信息。最重要的思考你原以为的输出和真正的输出为何不一样。

注:当前的代码试运行在一个 Unix 进程中,而且它能够利用一台机器的多核。如果是要部署在多台机器上,则要修改代码让 worker 通过 TCP 而不是 Unix-domain sockets 通讯。此外还需要一个网络文件系统共享存储。

Part IV: 处理 worker 执行错误

本小节要让你的 master 能够处理任务执行失败的 worker。由于 MapReduce 中 worker 并没有持久状态,所以处理起来相对容易。如果一个 worker 执行失败了,master 向 worker 发送的任何一个 RPC 都可能失败,例如超时。因此,如果失败,master 应该把这个任务指派给另为一个worker。

一个 RPC 失败并不一定代表 worker 失败,有可能是某个 worker 正常运行但 master 无法获取到它的信息。所以可能会出两个 worker 同时执行同一个任务。不过因为每个任务都是幂等的,一个任务被执行两次是没啥影响。

我们假设它不会失败,所以不需要处理 master 失败的情况。让 master 可以容错是相对困难的,因为它保持着持久的状态,当它失败后我们需要恢复它的状态以保证它可以继续工作。

test_test.go 还剩最后两个测试。测有一个 worker 失败的情况和有很多 worker 失败的情况。运行可测试:$ go test -run Failure mapreduce/...

Part V: 反向索引(可选)

挑战性:

词频统计虽然是 MapReduce 最经典的一个应用,但是在大规模数据应用不经常用。试试写个反向索引应用。

反向索引在计算机科学中使用广泛,尤其在文档搜索领域中非常有用。一般来说,一个反向索引就是一个从数据到数据特征的映射。例如,在文档搜索中,这个映射可能就是关键词与文档名称的映射。

main/ii.go 的整体结构跟 wc.go 相似。修改 mapF 和 reduceF 让它们创建反向索引。运行 ii.go 应该输出一个元组列表,每一行的格式如下:

$ go run ii.go master sequential pg-*.txt
$ head -n5 mrtmp.iiseq
A: 16 pg-being_ernest.txt,pg-dorian_gray.txt,pg-dracula.txt,pg-emma.txt,pg-frankenstein.txt,pg-great_expectations.txt,pg-grimm.txt,pg-huckleberry_finn.txt,pg-les_miserables.txt,pg-metamorphosis.txt,pg-moby_dick.txt,pg-sherlock_holmes.txt,pg-tale_of_two_cities.txt,pg-tom_sawyer.txt,pg-ulysses.txt,pg-war_and_peace.txt
ABC: 2 pg-les_miserables.txt,pg-war_and_peace.txt
ABOUT: 2 pg-moby_dick.txt,pg-tom_sawyer.txt
ABRAHAM: 1 pg-dracula.txt
ABSOLUTE: 1 pg-les_miserables.txt

你的代码应该通过 test-ii.sh 的测试:

$ sort -k1,1 mrtmp.iiseq | sort -snk2,2 mrtmp.iiseq | grep -v '16' | tail -10
women: 15 pg-being_ernest.txt,pg-dorian_gray.txt,pg-dracula.txt,pg-emma.txt,pg-frankenstein.txt,pg-great_expectations.txt,pg-huckleberry_finn.txt,pg-les_miserables.txt,pg-metamorphosis.txt,pg-moby_dick.txt,pg-sherlock_holmes.txt,pg-tale_of_two_cities.txt,pg-tom_sawyer.txt,pg-ulysses.txt,pg-war_and_peace.txt
won: 15 pg-being_ernest.txt,pg-dorian_gray.txt,pg-dracula.txt,pg-frankenstein.txt,pg-great_expectations.txt,pg-grimm.txt,pg-huckleberry_finn.txt,pg-les_miserables.txt,pg-metamorphosis.txt,pg-moby_dick.txt,pg-sherlock_holmes.txt,pg-tale_of_two_cities.txt,pg-tom_sawyer.txt,pg-ulysses.txt,pg-war_and_peace.txt
wonderful: 15 pg-being_ernest.txt,pg-dorian_gray.txt,pg-dracula.txt,pg-emma.txt,pg-frankenstein.txt,pg-great_expectations.txt,pg-grimm.txt,pg-huckleberry_finn.txt,pg-les_miserables.txt,pg-moby_dick.txt,pg-sherlock_holmes.txt,pg-tale_of_two_cities.txt,pg-tom_sawyer.txt,pg-ulysses.txt,pg-war_and_peace.txt
words: 15 pg-dorian_gray.txt,pg-dracula.txt,pg-emma.txt,pg-frankenstein.txt,pg-great_expectations.txt,pg-grimm.txt,pg-huckleberry_finn.txt,pg-les_miserables.txt,pg-metamorphosis.txt,pg-moby_dick.txt,pg-sherlock_holmes.txt,pg-tale_of_two_cities.txt,pg-tom_sawyer.txt,pg-ulysses.txt,pg-war_and_peace.txt
worked: 15 pg-dorian_gray.txt,pg-dracula.txt,pg-emma.txt,pg-frankenstein.txt,pg-great_expectations.txt,pg-grimm.txt,pg-huckleberry_finn.txt,pg-les_miserables.txt,pg-metamorphosis.txt,pg-moby_dick.txt,pg-sherlock_holmes.txt,pg-tale_of_two_cities.txt,pg-tom_sawyer.txt,pg-ulysses.txt,pg-war_and_peace.txt
worse: 15 pg-being_ernest.txt,pg-dorian_gray.txt,pg-dracula.txt,pg-emma.txt,pg-frankenstein.txt,pg-great_expectations.txt,pg-grimm.txt,pg-huckleberry_finn.txt,pg-les_miserables.txt,pg-moby_dick.txt,pg-sherlock_holmes.txt,pg-tale_of_two_cities.txt,pg-tom_sawyer.txt,pg-ulysses.txt,pg-war_and_peace.txt
wounded: 15 pg-being_ernest.txt,pg-dorian_gray.txt,pg-dracula.txt,pg-emma.txt,pg-frankenstein.txt,pg-great_expectations.txt,pg-grimm.txt,pg-huckleberry_finn.txt,pg-les_miserables.txt,pg-moby_dick.txt,pg-sherlock_holmes.txt,pg-tale_of_two_cities.txt,pg-tom_sawyer.txt,pg-ulysses.txt,pg-war_and_peace.txt
yes: 15 pg-being_ernest.txt,pg-dorian_gray.txt,pg-dracula.txt,pg-emma.txt,pg-great_expectations.txt,pg-grimm.txt,pg-huckleberry_finn.txt,pg-les_miserables.txt,pg-metamorphosis.txt,pg-moby_dick.txt,pg-sherlock_holmes.txt,pg-tale_of_two_cities.txt,pg-tom_sawyer.txt,pg-ulysses.txt,pg-war_and_peace.txt
younger: 15 pg-being_ernest.txt,pg-dorian_gray.txt,pg-dracula.txt,pg-emma.txt,pg-frankenstein.txt,pg-great_expectations.txt,pg-grimm.txt,pg-huckleberry_finn.txt,pg-les_miserables.txt,pg-moby_dick.txt,pg-sherlock_holmes.txt,pg-tale_of_two_cities.txt,pg-tom_sawyer.txt,pg-ulysses.txt,pg-war_and_peace.txt
yours: 15 pg-being_ernest.txt,pg-dorian_gray.txt,pg-dracula.txt,pg-emma.txt,pg-frankenstein.txt,pg-great_expectations.txt,pg-grimm.txt,pg-huckleberry_finn.txt,pg-les_miserables.txt,pg-moby_dick.txt,pg-sherlock_holmes.txt,pg-tale_of_two_cities.txt,pg-tom_sawyer.txt,pg-ulysses.txt,pg-war_and_peace.txt

通过全部测试

运行 src/main/test-mr.sh 可测试本次实验的所有内容。如果全部通过,可以看到:

$ sh ./test-mr.sh
==> Part I
ok      mapreduce   3.053s

==> Part II
Passed test

==> Part III
ok      mapreduce   1.851s

==> Part IV
ok      mapreduce   10.650s

==> Part V (challenge)
Passed test
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,921评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 87,635评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,393评论 0 338
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,836评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,833评论 5 368
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,685评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,043评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,694评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 42,671评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,670评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,779评论 1 332
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,424评论 4 321
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,027评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,984评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,214评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,108评论 2 351
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,517评论 2 343

推荐阅读更多精彩内容