MIT-6.824 Distributed Systems-LEC 2 RPC and Threads
MIT-6.824(Spring 2022)LEC 2 RPC and Threads
Go快速入门
How do Go channels work? How does Go make sure they are synchronized between the many possible goroutines?
https://golang.org/src/runtime/chan.go
At a high level, a chan is a struct holding a buffer and a lock. Sending on a channel involves acquiring the lock, waiting (perhaps releasing the CPU) until some thread is receiving, and handing off the message. Receiving involves acquiring the lock and waiting for a sender. You could implement your own channels with Go sync.Mutex and sync.Cond.
LEC 2
为什么使用Go?
- 对线程和RPC有很好的支持(更适合分布式编程)
- 垃圾收集器,不需要用户自己释放内存
- 简单易学
- 自带编译器,不是 Python 那样的解释型语言
线程
在一个进程中并行运行多个线程
线程原语:开启线程、退出线程(隐式)、停止线程(挂在一边不懂)、恢复线程
为什么需要线程?
支持并发
- 输入/输出并发
- 多核并行
- 方便(例如定期执行后台活动等)
数量可以不考虑,按照需求创建线程即可
线程编程挑战
- 竞争情况(同时对某一个变量进行写操作)
- 可能大多数情况运行都很好,但是确实在某些条件下得不到想要的结果
- 解决的两种方法
- 避免共享变量(channels)go推荐使用
- 使用锁(mutex)
- 协调问题:一个线程必须等待另一个线程完成后才能继续进行
- channels
- condition variables
- 死锁问题:两边都在等待对方
Go应对挑战的机制
channels和condition variables
- 如果不共享内存,只想让线程互相进行通信,则应该使用channels
- 如果需要共享内存,应该使用锁和condition variables
条件变量和channel实例
分配条件变量并且和锁关联,不满足条件进入睡眠状态,并释放关联的锁。
在goroutine运行的最后唤醒睡眠状态的线程,重新进行判断
package main
import "sync"
import "time"
import "math/rand"
func main() {
rand.Seed(time.Now().UnixNano())
count := 0
finished := 0
var mu sync.Mutex
cond := sync.NewCond(&mu)
for i := 0; i < 10; i++ {
go func() {
vote := requestVote()
mu.Lock()
defer mu.Unlock()
if vote {
count++
}
finished++
cond.Broadcast()
}()
}
mu.Lock()
for count < 5 && finished != 10 {
cond.Wait()
}
if count >= 5 {
println("received 5+ votes!")
} else {
println("lost")
}
mu.Unlock()
}
func requestVote() bool {
time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)
return rand.Int() % 2 == 0
}
package main
import "time"
import "math/rand"
func main() {
rand.Seed(time.Now().UnixNano())
count := 0
ch := make(chan bool)
for i := 0; i < 10; i++ {
go func() {
ch <- requestVote()
}()
}
for i := 0; i < 10; i++ {
v := <-ch
if v {
count += 1
}
}
if count >= 5 {
println("received 5+ votes!")
} else {
println("lost")
}
}
func requestVote() bool {
time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)
return rand.Int()%2 == 0
}
go tour 爬虫练习
package main
import (
"fmt"
"sync"
)
//
// Several solutions to the crawler exercise from the Go tutorial
// https://tour.golang.org/concurrency/10
//
//
// Serial crawler
//
func Serial(url string, fetcher Fetcher, fetched map[string]bool) {
if fetched[url] {
return
}
fetched[url] = true
urls, err := fetcher.Fetch(url)
if err != nil {
return
}
for _, u := range urls {
Serial(u, fetcher, fetched)
}
return
}
//
// Concurrent crawler with shared state and Mutex
//
type fetchState struct {
mu sync.Mutex
fetched map[string]bool
}
func ConcurrentMutex(url string, fetcher Fetcher, f *fetchState) {
f.mu.Lock()
already := f.fetched[url]
f.fetched[url] = true
f.mu.Unlock()
if already {
return
}
urls, err := fetcher.Fetch(url)
if err != nil {
return
}
var done sync.WaitGroup
for _, u := range urls {
done.Add(1)
go func(u string) {
defer done.Done()
ConcurrentMutex(u, fetcher, f)
}(u)
}
done.Wait()
return
}
func makeState() *fetchState {
f := &fetchState{}
f.fetched = make(map[string]bool)
return f
}
//
// Concurrent crawler with channels
//
func worker(url string, ch chan []string, fetcher Fetcher) {
urls, err := fetcher.Fetch(url)
if err != nil {
ch <- []string{}
} else {
ch <- urls
}
}
func coordinator(ch chan []string, fetcher Fetcher) {
n := 1
fetched := make(map[string]bool)
for urls := range ch {
for _, u := range urls {
if fetched[u] == false {
fetched[u] = true
n += 1
go worker(u, ch, fetcher)
}
}
n -= 1
if n == 0 {
break
}
}
}
func ConcurrentChannel(url string, fetcher Fetcher) {
ch := make(chan []string)
go func() {
ch <- []string{url}
}()
coordinator(ch, fetcher)
}
//
// main
//
func main() {
fmt.Printf("=== Serial===\n")
Serial("http://golang.org/", fetcher, make(map[string]bool))
fmt.Printf("=== ConcurrentMutex ===\n")
ConcurrentMutex("http://golang.org/", fetcher, makeState())
fmt.Printf("=== ConcurrentChannel ===\n")
ConcurrentChannel("http://golang.org/", fetcher)
}
//
// Fetcher
//
type Fetcher interface {
// Fetch returns a slice of URLs found on the page.
Fetch(url string) (urls []string, err error)
}
// fakeFetcher is Fetcher that returns canned results.
type fakeFetcher map[string]*fakeResult
type fakeResult struct {
body string
urls []string
}
func (f fakeFetcher) Fetch(url string) ([]string, error) {
if res, ok := f[url]; ok {
fmt.Printf("found: %s\n", url)
return res.urls, nil
}
fmt.Printf("missing: %s\n", url)
return nil, fmt.Errorf("not found: %s", url)
}
// fetcher is a populated fakeFetcher.
var fetcher = fakeFetcher{
"http://golang.org/": &fakeResult{
"The Go Programming Language",
[]string{
"http://golang.org/pkg/",
"http://golang.org/cmd/",
},
},
"http://golang.org/pkg/": &fakeResult{
"Packages",
[]string{
"http://golang.org/",
"http://golang.org/cmd/",
"http://golang.org/pkg/fmt/",
"http://golang.org/pkg/os/",
},
},
"http://golang.org/pkg/fmt/": &fakeResult{
"Package fmt",
[]string{
"http://golang.org/",
"http://golang.org/pkg/",
},
},
"http://golang.org/pkg/os/": &fakeResult{
"Package os",
[]string{
"http://golang.org/",
"http://golang.org/pkg/",
},
},
}
RPC-远程过程调用
RPC:在客户端上调用在服务器端实现的函数-传递参数并返回结果
实际过程:
- 在客户端上调用stub过程:构建一个消息,包括调用哪个函数,函数的参数,参数类型等等。
- 通过网络发送给服务器上对应的stub
- 在服务器上调用函数
- 返回给服务器的stub
- 返回给客户端的stub(这个期间一直在等待)
- 返回结果
示例
package main
import (
"fmt"
"log"
"net"
"net/rpc"
"sync"
)
//
// Common RPC request/reply definitions
//
type PutArgs struct {
Key string
Value string
}
type PutReply struct {
}
type GetArgs struct {
Key string
}
type GetReply struct {
Value string
}
//
// Client
//
func connect() *rpc.Client {
client, err := rpc.Dial("tcp", ":1234")
if err != nil {
log.Fatal("dialing:", err)
}
return client
}
func get(key string) string {
client := connect()
args := GetArgs{"subject"}
reply := GetReply{}
err := client.Call("KV.Get", &args, &reply)
if err != nil {
log.Fatal("error:", err)
}
client.Close()
return reply.Value
}
func put(key string, val string) {
client := connect()
args := PutArgs{"subject", "6.824"}
reply := PutReply{}
err := client.Call("KV.Put", &args, &reply)
if err != nil {
log.Fatal("error:", err)
}
client.Close()
}
//
// Server
//
type KV struct {
mu sync.Mutex
data map[string]string
}
func server() {
kv := new(KV)
kv.data = map[string]string{}
rpcs := rpc.NewServer()
rpcs.Register(kv)
l, e := net.Listen("tcp", ":1234")
if e != nil {
log.Fatal("listen error:", e)
}
go func() {
for {
conn, err := l.Accept()
if err == nil {
go rpcs.ServeConn(conn)
} else {
break
}
}
l.Close()
}()
}
func (kv *KV) Get(args *GetArgs, reply *GetReply) error {
kv.mu.Lock()
defer kv.mu.Unlock()
reply.Value = kv.data[args.Key]
return nil
}
func (kv *KV) Put(args *PutArgs, reply *PutReply) error {
kv.mu.Lock()
defer kv.mu.Unlock()
kv.data[args.Key] = args.Value
return nil
}
//
// main
//
func main() {
server()
put("subject", "6.824")
fmt.Printf("Put(subject, 6.824) done\n")
fmt.Printf("get(subject) -> %s\n", get("subject"))
}
RPC失败
- 至少一次:失败后(没有接到服务器的响应)会自动重试
- 有可能多次执行
- 最多一次:服务器端实现过滤重复,确保最多只能执行一次(Go的RPC实现)
- 正好一次:很难实现
MIT-6.824 Distributed Systems-LEC 2 RPC and Threads
https://zhangzhao219.github.io/2022/12/15/6.824/Distributed-Systems-MIT-6.824-LEC-2/