6.824分布式lab1准备工作
go race检测
例如一个全局变量,一个项目下来,可能完全不知道是不是引起了多个协程或者线程竞争。
demo:
func main() {
a := 1
go func() {
a = 2
}()
a = 3
fmt.Printf("a: %v\n", a)
time.Sleep(time.Second * 3)
// a: 3, 并没有发现或者提示go func修改了a的值
}
加了-race
之后:
$ go run -race main.go
a: 3
==================
WARNING: DATA RACE
Write at 0x00c0000180a8 by goroutine 7:
main.main.func1()
/home/hqinglau/6.824/src/run/main.go:11 +0x30
Previous write at 0x00c0000180a8 by main goroutine:
main.main()
/home/hqinglau/6.824/src/run/main.go:13 +0xb8
Goroutine 7 (running) created at:
main.main()
/home/hqinglau/6.824/src/run/main.go:10 +0xae
==================
Found 1 data race(s)
exit status 66
以便在测试环境发现问题。
go Plugin
go 1.8出的一个插件功能。
demo:
package main
// pk必须是main
import "fmt"
// 插件模块加载的时候自动初始化
func init() {
fmt.Println("hi")
}
// 调用的
func Hello() {
fmt.Println("hello")
}
之后编译:
go build --buildmode=plugin testplugin.go
生成了.so
文件:
可以控制插件的版本:
go build -o testplugin_v1.so -buildmode=plugin testplugin.go
使用:
package main
import (
"plugin"
)
func main() {
p, err := plugin.Open("testplugin.so")
if err != nil {
panic(err)
}
s, err2 := p.Lookup("Hello")
if err2 != nil {
panic(err2)
}
s.(func())()
// $ go run main.go
// hi
// hello
}
在执行函数之前,执行了init
代码。
lab1准备工作
排序:自带串行化代码。首先wc.go
生成wc.so
文件,然后运行mrsequential.go
。
$ go build -race -buildmode=plugin ../mrapps/wc.go
$ go run -race mrsequential.go wc.so pg*.txt
$ more mr-out-0
A 509
ABOUT 2
ACT 8
ACTRESS 1
ACTUAL 8
ADLER 1
ADVENTURE 12
ADVENTURES 7
AFTER 2
AGREE 16
AGREEMENT 8
AK 8
ALABAMA 1
AN 1
AND 46
ANY 24
ANYTHING 8
ARAT 1
ARTHUR 1
AS 10
ASCII 17
ASHPUTTEL 2
AT 2
wc.go
单词计数用的插件。
package main
//
// a word-count application "plugin" for MapReduce.
//
// go build -buildmode=plugin wc.go
//
import "6.824/mr"
import "unicode"
import "strings"
import "strconv"
// 每个文件调用一次map函数,第一个参数是文件名,第二个参数是全部内容。
// 文件名暂时用不到,忽略。返回值是键值对切片。
func Map(filename string, contents string) []mr.KeyValue {
// function to detect word separators.
ff := func(r rune) bool { return !unicode.IsLetter(r) }
// split contents into an array of words.
words := strings.FieldsFunc(contents, ff)
kva := []mr.KeyValue{}
for _, w := range words {
kv := mr.KeyValue{w, "1"}
kva = append(kva, kv)
}
return kva
}
//type KeyValue struct {
// Key string
// Value string
//}
//
// map产生的每个键,调用一次reduce, values是这个键的所有值
func Reduce(key string, values []string) string {
// return the number of occurrences of this word.
return strconv.Itoa(len(values))
}
至此,只是两个函数,具体的调用过程要看mrsequential.go
。
mrsequential.go
package main
//
// simple sequential MapReduce.
//
// go run mrsequential.go wc.so pg*.txt
//
import "fmt"
import "6.824/mr"
import "plugin"
import "os"
import "log"
import "io/ioutil"
import "sort"
// for sorting by key.
type ByKey []mr.KeyValue
// for sorting by key.
func (a ByKey) Len() int { return len(a) }
func (a ByKey) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
func (a ByKey) Less(i, j int) bool { return a[i].Key < a[j].Key }
func main() {
if len(os.Args) < 3 {
fmt.Fprintf(os.Stderr, "Usage: mrsequential xxx.so inputfiles...\n")
os.Exit(1)
}
// 得到我们插件里定义的map和reduce函数,实现动态加载
mapf, reducef := loadPlugin(os.Args[1])
//
// read each input file,
// pass it to Map,
// accumulate the intermediate Map output.
//
// 读取每一个文件,内容放入mapf里面,也就是map函数里面,得到【单词:1】
// 这样的序列,都放入一个intermediate切片中
intermediate := []mr.KeyValue{}
for _, filename := range os.Args[2:] {
file, err := os.Open(filename)
if err != nil {
log.Fatalf("cannot open %v", filename)
}
content, err := ioutil.ReadAll(file)
if err != nil {
log.Fatalf("cannot read %v", filename)
}
file.Close()
kva := mapf(filename, string(content))
intermediate = append(intermediate, kva...)
}
//
// a big difference from real MapReduce is that all the
// intermediate data is in one place, intermediate[],
// rather than being partitioned into NxM buckets.
//
// 根据单词进行排序
sort.Sort(ByKey(intermediate))
oname := "mr-out-0"
ofile, _ := os.Create(oname)
//
// call Reduce on each distinct key in intermediate[],
// and print the result to mr-out-0.
//
i := 0
for i < len(intermediate) {
j := i + 1
for j < len(intermediate) && intermediate[j].Key == intermediate[i].Key {
j++
}
// 排序后查找,遍历一遍,计数,都放入values里面了,其实这里
// 实现的话都是1,所以reduce查看len就可以了
values := []string{}
for k := i; k < j; k++ {
values = append(values, intermediate[k].Value)
}
// 把这些属于同一个单词的计数放入reduce函数,得到len。
output := reducef(intermediate[i].Key, values)
// 保存到文件
// this is the correct format for each line of Reduce output.
fmt.Fprintf(ofile, "%v %v\n", intermediate[i].Key, output)
i = j
}
ofile.Close()
}
// 加载插件并验证函数的存在,返回函数类型
// load the application Map and Reduce functions
// from a plugin file, e.g. ../mrapps/wc.so
//
func loadPlugin(filename string) (func(string, string) []mr.KeyValue, func(string, []string) string) {
p, err := plugin.Open(filename)
if err != nil {
log.Fatalf("cannot load plugin %v", filename)
}
xmapf, err := p.Lookup("Map")
if err != nil {
log.Fatalf("cannot find Map in %v", filename)
}
mapf := xmapf.(func(string, string) []mr.KeyValue)
xreducef, err := p.Lookup("Reduce")
if err != nil {
log.Fatalf("cannot find Reduce in %v", filename)
}
reducef := xreducef.(func(string, []string) string)
return mapf, reducef
}
按着代码理一遍还是比较清晰的。问题:
go run -race mrsequential.go wc.so pg*.txt
有个通配符啊,咋读的?
答:命令行的通配符机制,不需要做额外的处理,具体咋做看例子。
func main() {
// 测试的话
// $ go build main.go
// $ ./main t*
for _, v := range os.Args[0] {
fmt.Println(v)
}
// 46
// 47
// 109
// 97
// 105
// 110
for _, v := range os.Args[0:] {
fmt.Println(v)
}
// testplugin.go
// testplugin.so
}
总结
OK,到此为止,计数就比较清晰了。目前是串行,下一步就是并行化。