6 824 Lab 1 Mapreduce
开始学习大名鼎鼎的MIT 6.824: Distributed Systems课程,我跟的是2016年的课程,课程的主要内容是读Paper和做Lab,使用的语言为Go。五一假期期间我基本做完了Lab 1,感觉难度还是相当大的。本篇文章是我对Lab 1的一个总结。
MapReduce
每次读MapReduce论文,都会有新的收获,也自知还有理解不到位的地方。
Execution Overview
- 输入数据被划分为
M个分片,由map worker产生的中间key-value pairs被划分为R个分片。其中M的大小取决于GFS块的大小,R取决于reduce worker的个数。所以,整个MapReduce Job包括M个map task和R个reduce task。 - 在
map阶段,map worker把输出的中间key-value pairs分割成R个region,注意pairs首先会存储在缓冲区中,然后定期的写入本地磁盘。pairs在map worker上的位置信息会发送给master,由master通知reduce worker数据读取位置信息。 - 在
reduce阶段,当reduce worker获取了输入数据的位置信息后,通过RPC读取数据。当reduce worker获取了所有的相关数据之后,会对它们进行一次排序,排序的目的在于不同key的pairs会汇聚到同一个reduce worker。 - 当所有的
task完成后,一共会产生R个输出文件,每个reduce worker对应一个。一般来说没有必要将这R个文件合并成一个文件,因为这R个文件往往会作为下一个MapReduce Job的输入数据。
Master Data Structures
master会记录所有map task和reduce task的状态(idle,in-progress和completed)信息。同时,master还会保存那些处于non-idle状态的task所关联的worker的信息。- 对于每个状态为
completed的map task,master会保存该map task对应的R个region的位置信息和大小信息。
Fault Tolerance
Worker Failure
master通过周期性的ping所有的worker来确认worker是否可用,如果某个worker崩溃了,那么在该worker上完成的所有map task都会回退到idle状态,因此这些task会被重新调度到可用的worker上。同理,如果处于in-progress的map task或reduce task所在的worker崩溃了,那么这些task也会被重新调度到可用的worker上重新执行。- 如果某个
worker崩溃了,其上处于completed状态的map task需要重新被调度执行,这是因为map task的输出数据是存储于local disk。相反,如果某个worker崩溃了,其上处于completed状态的reduce task不需要重新被调度执行,因为reduce task的输出数据是存储在global file system上的。 - 当
reduce worker在执行reduce task时,如果某个map task对应的worker由worker A切换到了worker B(可能是因为worker A崩溃了),那么master会通知该reduce worker从worker B读取数据。
Master Failure
master通过定期的做checkpoints来保证master的容错性。- 由于
master只有一个,所以可以认为master出故障的概率很小。如果真的出故障了,那么根据需要重新执行MapReduce Job。
Lab
Lab 1包括4个Part,Part 1 & Part 2实现Sequential MapReduce,Part 3 & Part 4实现Distributed MapReduce,并且要解决worker failure。同时,Lab 1提供了MapReduce的整个框架,并实现了与核心内容无关的代码。在完成各个Part之前,我们需要理解Lab 1实现的两个版本的MapReduce框架的设计思路。
master和worker以goroutine的形式存在,当worker可用时,以RPC的方式向master注册,master通过调用schedule()来实现task的调度。同时,在Distributed MapReduce中schedule()还需要处理worker failure。schedule()将会以参数的形式存在于Sequential/Distributed MapReduce。- 论文中
map task以块作为分割,而Lab中的map task将以文件作为分割。 - 在
map阶段,对于每个map task(一个输入文件),master会至少调用一次doMap()。同理在reduce阶段,master对每个reduce task至少调用一次doReduce()。在Sequential MapReduce中,Sequential()[master.go]的核心参数schedule()遍历了file slice,根据当前所处的phase对每个task调用doMap()或doReduce()。在Distributed MapReduce中,Distributed()[master.go]的核心参数schedule()需要我们去实现(Part 3 & Part 4)。 - 在完成了所有的
map task和reduce task之后,master会调用merge()对reduce worker的输出文件进行合并。 - 最后
master向每个worker发送Shutdown RPC,然后关闭整个应用。
理解完整个Lab的框架,下面谈谈每个Part需要注意的地方。
Part 1: Map/Reduce input and output
Part 1主要的内容是完成doMap()和doReduce(),在编码前一定要仔细阅读注释给我们提供的信息。
doMap()读取输入文件(inFile)的内容,通过mapF将其转化为[]KeyValue。对于每个inFile,doMap将产生nReduce(对应于论文中的R)个输出文件。同时,数据以JSON的格式存储至输出文件中。doReduce()从nMap(对应于论文中的M)个map worker读取输入数据([]KeyValue),注意输入数据的格式为JSON。根据key构建出map[key]values数据结构,对每个key(string)和对应的values([]string)调用reduceF()。doReduce()产生一个输出文件。
Part 2: Single-worker word count
Part 2实现经典的WordCount,需要我们实现mapF和reduceF中的逻辑。需要注意的是要按照Lab提供的切词规范(determined by unicode.IsLetter)去分词,而不要想当然的用空格作为分隔符。
Part 3: Distributing MapReduce tasks
Part 3着手实现Distributed MapReduce,为了模拟真正的分布式环境,master和worker之间的通信和同步仅通过RPC和channel来实现。Part 3需要我们实现schedule()的调度逻辑。
Distributed MapReduce的实现逻辑在上文已有所提及,Sequential MapReduce顺序调度每个task执行,而在Distributed MapReduce中,schedule()将task调度到当前可用的worker上去执行。- 只有当
map/reduce阶段所有的task全部完成后,schedule()才能返回。这一点在代码中我是通过sync.WaitGroup来实现。 - 在分布式环境下,
schedule()通过RPC调度worker执行task,同时输入文件的信息也是作为RPC参数传递。 - 当前可用的
worker存储于master的registerChannel中。如果当前没有可用的worker,那么schedule()会进入阻塞状态,直至出现新的可用的worker。只有当存在可用的worker时才允许创建goroutine执行task(这一点体现在schedule.go的line 38和line 42)。
Part 4: Handling worker failures
Part 4需要解决worker failure,映射到具体的代码实现上,work failure是指在进行RPC时,RPC服务器返回了call failed信息。根据论文,我们只需要将task重新分配给另一个可用的worker,并将上次RPC失败的输入文件信息作为本次RPC的参数传递。
需要注意的是,RPC失败并不一定意味着worker崩溃了,也有可能是因为网络原因导致worker不可达,所以有可能导致多个worker在执行着相同的task。不过由于task的幂等性,这并没有什么问题,master记录着每个task对应的真正有效的worker。
Implementation
Lab 1的代码实现在这里。