开始学习大名鼎鼎的MIT 6.824: Distributed Systems课程,我跟的是2016年的课程,课程的主要内容是读Paper和做Lab,使用的语言为Go。五一假期期间我基本做完了Lab 1,感觉难度还是相当大的。本篇文章是我对Lab 1的一个总结。

MapReduce

每次读MapReduce论文,都会有新的收获,也自知还有理解不到位的地方。

Execution Overview

  • 输入数据被划分为M个分片,由map worker产生的中间key-value pairs被划分为R个分片。其中M的大小取决于GFS块的大小,R取决于reduce worker的个数。所以,整个MapReduce Job包括Mmap taskRreduce task
  • map阶段,map worker把输出的中间key-value pairs分割成Rregion,注意pairs首先会存储在缓冲区中,然后定期的写入本地磁盘。pairsmap worker上的位置信息会发送给master,由master通知reduce worker数据读取位置信息。
  • reduce阶段,当reduce worker获取了输入数据的位置信息后,通过RPC读取数据。当reduce worker获取了所有的相关数据之后,会对它们进行一次排序,排序的目的在于不同keypairs会汇聚到同一个reduce worker
  • 当所有的task完成后,一共会产生R个输出文件,每个reduce worker对应一个。一般来说没有必要将这R个文件合并成一个文件,因为这R个文件往往会作为下一个MapReduce Job的输入数据。

Master Data Structures

  • master会记录所有map taskreduce task的状态(idlein-progresscompleted)信息。同时,master还会保存那些处于non-idle状态的task所关联的worker的信息。
  • 对于每个状态为completedmap taskmaster会保存该map task对应的Rregion的位置信息和大小信息。

Fault Tolerance

Worker Failure

  • master通过周期性的ping所有的worker来确认worker是否可用,如果某个worker崩溃了,那么在该worker上完成的所有map task都会回退到idle状态,因此这些task会被重新调度到可用的worker上。同理,如果处于in-progressmap taskreduce task所在的worker崩溃了,那么这些task也会被重新调度到可用的worker上重新执行。
  • 如果某个worker崩溃了,其上处于completed状态的map task需要重新被调度执行,这是因为map task的输出数据是存储于local disk。相反,如果某个worker崩溃了,其上处于completed状态的reduce task不需要重新被调度执行,因为reduce task的输出数据是存储在global file system上的。
  • reduce worker在执行reduce task时,如果某个map task对应的workerworker A切换到了worker B(可能是因为worker A崩溃了),那么master会通知该reduce workerworker B读取数据。

Master Failure

  • master通过定期的做checkpoints来保证master的容错性。
  • 由于master只有一个,所以可以认为master出故障的概率很小。如果真的出故障了,那么根据需要重新执行MapReduce Job

Lab

Lab 1包括4PartPart 1 & Part 2实现Sequential MapReducePart 3 & Part 4实现Distributed MapReduce,并且要解决worker failure。同时,Lab 1提供了MapReduce的整个框架,并实现了与核心内容无关的代码。在完成各个Part之前,我们需要理解Lab 1实现的两个版本的MapReduce框架的设计思路。

  • masterworkergoroutine的形式存在,当worker可用时,以RPC的方式向master注册,master通过调用schedule()来实现task的调度。同时,在Distributed MapReduceschedule()还需要处理worker failureschedule()将会以参数的形式存在于Sequential/Distributed MapReduce
  • 论文中map task以块作为分割,而Lab中的map task将以文件作为分割。
  • map阶段,对于每个map task(一个输入文件),master会至少调用一次doMap()。同理在reduce阶段,master对每个reduce task至少调用一次doReduce()。在Sequential MapReduce中,Sequential()[master.go]的核心参数schedule()遍历了file slice,根据当前所处的phase对每个task调用doMap()doReduce()。在Distributed MapReduce中,Distributed()[master.go]的核心参数schedule()需要我们去实现(Part 3 & Part 4)。
  • 在完成了所有的map taskreduce task之后,master会调用merge()reduce worker的输出文件进行合并。
  • 最后master向每个worker发送Shutdown RPC,然后关闭整个应用。

理解完整个Lab的框架,下面谈谈每个Part需要注意的地方。

Part 1: Map/Reduce input and output

Part 1主要的内容是完成doMap()doReduce(),在编码前一定要仔细阅读注释给我们提供的信息。

  • doMap()读取输入文件(inFile)的内容,通过mapF将其转化为[]KeyValue。对于每个inFiledoMap将产生nReduce(对应于论文中的R)个输出文件。同时,数据以JSON的格式存储至输出文件中。
  • doReduce()nMap(对应于论文中的M)个map worker读取输入数据([]KeyValue),注意输入数据的格式为JSON。根据key构建出map[key]values数据结构,对每个key(string)和对应的values([]string)调用reduceF()doReduce()产生一个输出文件。

Part 2: Single-worker word count

Part 2实现经典的WordCount,需要我们实现mapFreduceF中的逻辑。需要注意的是要按照Lab提供的切词规范(determined by unicode.IsLetter)去分词,而不要想当然的用空格作为分隔符。

Part 3: Distributing MapReduce tasks

Part 3着手实现Distributed MapReduce,为了模拟真正的分布式环境,masterworker之间的通信和同步仅通过RPCchannel来实现。Part 3需要我们实现schedule()的调度逻辑。

  • Distributed MapReduce的实现逻辑在上文已有所提及,Sequential MapReduce顺序调度每个task执行,而在Distributed MapReduce中,schedule()task调度到当前可用的worker上去执行。
  • 只有当map/reduce阶段所有的task全部完成后,schedule()才能返回。这一点在代码中我是通过sync.WaitGroup来实现。
  • 在分布式环境下,schedule()通过RPC调度worker执行task,同时输入文件的信息也是作为RPC参数传递。
  • 当前可用的worker存储于masterregisterChannel中。如果当前没有可用的worker,那么schedule()会进入阻塞状态,直至出现新的可用的worker。只有当存在可用的worker时才允许创建goroutine执行task(这一点体现在schedule.goline 38line 42)。

Part 4: Handling worker failures

Part 4需要解决worker failure,映射到具体的代码实现上,work failure是指在进行RPC时,RPC服务器返回了call failed信息。根据论文,我们只需要将task重新分配给另一个可用的worker,并将上次RPC失败的输入文件信息作为本次RPC的参数传递。

需要注意的是,RPC失败并不一定意味着worker崩溃了,也有可能是因为网络原因导致worker不可达,所以有可能导致多个worker在执行着相同的task。不过由于task的幂等性,这并没有什么问题,master记录着每个task对应的真正有效的worker

Implementation

Lab 1的代码实现在这里