6 824 Lab 1 Mapreduce
开始学习大名鼎鼎的MIT 6.824: Distributed Systems课程,我跟的是2016
年的课程,课程的主要内容是读Paper
和做Lab
,使用的语言为Go
。五一假期期间我基本做完了Lab 1,感觉难度还是相当大的。本篇文章是我对Lab 1
的一个总结。
MapReduce
每次读MapReduce论文,都会有新的收获,也自知还有理解不到位的地方。
Execution Overview
- 输入数据被划分为
M
个分片,由map worker
产生的中间key-value pairs
被划分为R
个分片。其中M
的大小取决于GFS
块的大小,R
取决于reduce worker
的个数。所以,整个MapReduce Job
包括M
个map task
和R
个reduce task
。 - 在
map
阶段,map worker
把输出的中间key-value pairs
分割成R
个region
,注意pairs
首先会存储在缓冲区中,然后定期的写入本地磁盘。pairs
在map worker
上的位置信息会发送给master
,由master
通知reduce worker
数据读取位置信息。 - 在
reduce
阶段,当reduce worker
获取了输入数据的位置信息后,通过RPC
读取数据。当reduce worker
获取了所有的相关数据之后,会对它们进行一次排序,排序的目的在于不同key
的pairs
会汇聚到同一个reduce worker
。 - 当所有的
task
完成后,一共会产生R
个输出文件,每个reduce worker
对应一个。一般来说没有必要将这R
个文件合并成一个文件,因为这R
个文件往往会作为下一个MapReduce Job
的输入数据。
Master Data Structures
master
会记录所有map task
和reduce task
的状态(idle
,in-progress
和completed
)信息。同时,master
还会保存那些处于non-idle
状态的task
所关联的worker
的信息。- 对于每个状态为
completed
的map task
,master
会保存该map task
对应的R
个region
的位置信息和大小信息。
Fault Tolerance
Worker Failure
master
通过周期性的ping
所有的worker
来确认worker
是否可用,如果某个worker
崩溃了,那么在该worker
上完成的所有map task
都会回退到idle
状态,因此这些task
会被重新调度到可用的worker
上。同理,如果处于in-progress
的map task
或reduce task
所在的worker
崩溃了,那么这些task
也会被重新调度到可用的worker
上重新执行。- 如果某个
worker
崩溃了,其上处于completed
状态的map task
需要重新被调度执行,这是因为map task
的输出数据是存储于local disk
。相反,如果某个worker
崩溃了,其上处于completed
状态的reduce task
不需要重新被调度执行,因为reduce task
的输出数据是存储在global file system
上的。 - 当
reduce worker
在执行reduce task
时,如果某个map task
对应的worker
由worker A
切换到了worker B
(可能是因为worker A
崩溃了),那么master
会通知该reduce worker
从worker B
读取数据。
Master Failure
master
通过定期的做checkpoints
来保证master
的容错性。- 由于
master
只有一个,所以可以认为master
出故障的概率很小。如果真的出故障了,那么根据需要重新执行MapReduce Job
。
Lab
Lab 1
包括4
个Part
,Part 1 & Part 2
实现Sequential MapReduce
,Part 3 & Part 4
实现Distributed MapReduce
,并且要解决worker failure
。同时,Lab 1
提供了MapReduce
的整个框架,并实现了与核心内容无关的代码。在完成各个Part
之前,我们需要理解Lab 1
实现的两个版本的MapReduce
框架的设计思路。
master
和worker
以goroutine
的形式存在,当worker
可用时,以RPC
的方式向master
注册,master
通过调用schedule()
来实现task
的调度。同时,在Distributed MapReduce
中schedule()
还需要处理worker failure
。schedule()
将会以参数的形式存在于Sequential/Distributed MapReduce
。- 论文中
map task
以块作为分割,而Lab
中的map task
将以文件作为分割。 - 在
map
阶段,对于每个map task
(一个输入文件),master
会至少调用一次doMap()
。同理在reduce
阶段,master
对每个reduce task
至少调用一次doReduce()
。在Sequential MapReduce
中,Sequential()
[master.go
]的核心参数schedule()
遍历了file slice
,根据当前所处的phase
对每个task
调用doMap()
或doReduce()
。在Distributed MapReduce
中,Distributed()
[master.go
]的核心参数schedule()
需要我们去实现(Part 3 & Part 4
)。 - 在完成了所有的
map task
和reduce task
之后,master
会调用merge()
对reduce worker
的输出文件进行合并。 - 最后
master
向每个worker
发送Shutdown RPC
,然后关闭整个应用。
理解完整个Lab
的框架,下面谈谈每个Part
需要注意的地方。
Part 1: Map/Reduce input and output
Part 1
主要的内容是完成doMap()
和doReduce()
,在编码前一定要仔细阅读注释给我们提供的信息。
doMap()
读取输入文件(inFile
)的内容,通过mapF
将其转化为[]KeyValue
。对于每个inFile
,doMap
将产生nReduce
(对应于论文中的R
)个输出文件。同时,数据以JSON
的格式存储至输出文件中。doReduce()
从nMap
(对应于论文中的M
)个map worker
读取输入数据([]KeyValue
),注意输入数据的格式为JSON
。根据key
构建出map[key]values
数据结构,对每个key
(string
)和对应的values
([]string
)调用reduceF()
。doReduce()
产生一个输出文件。
Part 2: Single-worker word count
Part 2
实现经典的WordCount
,需要我们实现mapF
和reduceF
中的逻辑。需要注意的是要按照Lab
提供的切词规范(determined by unicode.IsLetter
)去分词,而不要想当然的用空格作为分隔符。
Part 3: Distributing MapReduce tasks
Part 3
着手实现Distributed MapReduce
,为了模拟真正的分布式环境,master
和worker
之间的通信和同步仅通过RPC
和channel
来实现。Part 3
需要我们实现schedule()
的调度逻辑。
Distributed MapReduce
的实现逻辑在上文已有所提及,Sequential MapReduce
顺序调度每个task
执行,而在Distributed MapReduce
中,schedule()
将task
调度到当前可用的worker
上去执行。- 只有当
map/reduce
阶段所有的task
全部完成后,schedule()
才能返回。这一点在代码中我是通过sync.WaitGroup
来实现。 - 在分布式环境下,
schedule()
通过RPC
调度worker
执行task
,同时输入文件的信息也是作为RPC
参数传递。 - 当前可用的
worker
存储于master
的registerChannel
中。如果当前没有可用的worker
,那么schedule()
会进入阻塞状态,直至出现新的可用的worker
。只有当存在可用的worker
时才允许创建goroutine
执行task
(这一点体现在schedule.go
的line 38
和line 42
)。
Part 4: Handling worker failures
Part 4
需要解决worker failure
,映射到具体的代码实现上,work failure
是指在进行RPC
时,RPC
服务器返回了call failed
信息。根据论文,我们只需要将task
重新分配给另一个可用的worker
,并将上次RPC
失败的输入文件信息作为本次RPC
的参数传递。
需要注意的是,RPC
失败并不一定意味着worker
崩溃了,也有可能是因为网络原因导致worker
不可达,所以有可能导致多个worker
在执行着相同的task
。不过由于task
的幂等性,这并没有什么问题,master
记录着每个task
对应的真正有效的worker
。
Implementation
Lab 1
的代码实现在这里。