Go项目-分布式缓存GeeCache

Go项目-分布式缓存GeeCache

GeeCache

完成的功能

  • 单机缓存和基于 HTTP 的分布式缓存
  • 最近最少访问(Least Recently Used, LRU) 缓存策略
  • 使用 Go 锁机制防止缓存击穿
  • 使用一致性哈希选择节点,实现负载均衡

LRU 缓存淘汰策略

FIFO:先淘汰缓存中最早添加的记录

LFU:淘汰缓存中访问频率最低的记录,需要维护一个访问频率的表

LRU:最近最少使用,认为如果数据最近被访问过,那么将来被访问的概率也会更高。维护一个队列,如果一条记录被访问,则移动到队列尾端,这样保证队首一定是最近最少访问的数据

package lru

import "container/list"

// LRU cache 结构体
type Cache struct {
	maxBytes  int64                         // 允许使用的最大内存
	nbytes    int64                         // 当前已使用的内存
	ll        *list.List                    // cache链表
	cache     map[string]*list.Element      // 查找键值对的字典
	OnEvicted func(key string, value Value) // 某条记录被移除时的回调函数
}

// 双向链表节点的数据类型
// 主要目的是为了删除节点后能从字典中删除该键值对
type entry struct {
	key   string
	value Value
}

// 值的类型可以是任意的,定义一个空接口,实现Len()方法返回值的占用空间大小

// Len the number of cache entries
func (c *Cache) Len() int {
	return c.ll.Len()
}

type Value interface {
	Len() int // 包含一个方法返回值占用的内存大小
}

// 工厂模式,返回实例化的cache
func New(maxBytes int64, onEvicted func(string, Value)) *Cache {
	return &Cache{
		maxBytes:  maxBytes,
		ll:        list.New(),
		cache:     make(map[string]*list.Element),
		OnEvicted: onEvicted,
	}
}

// 查找功能,在字典中进行查找,然后移动到队尾(Front)
func (c *Cache) Get(key string) (value Value, ok bool) {
	if ele, ok := c.cache[key]; ok {
		c.ll.MoveToFront(ele)
		kv := ele.Value.(*entry)
		return kv.value, true
	}
	return
}

// LRU删除策略:从队首(Back)拿到节点,然后将其删除
func (c *Cache) RemoveOldest() {
	ele := c.ll.Back()
	if ele != nil {
		c.ll.Remove(ele)
		kv := ele.Value.(*entry)
		delete(c.cache, kv.key)
		c.nbytes -= int64(len(kv.key)) + int64(kv.value.Len()) // 更新当前已经使用的内存
		if c.OnEvicted != nil {
			c.OnEvicted(kv.key, kv.value)
		}
	}
}

// 新增节点/修改节点
func (c *Cache) Add(key string, value Value) {
	// 如果在链表中找到则将其更新,同时更新占用的空间大小等,并移动到队列尾端
	if ele, ok := c.cache[key]; ok {
		c.ll.MoveToFront(ele)
		kv := ele.Value.(*entry)
		c.nbytes += int64(value.Len()) - int64(kv.value.Len())
		kv.value = value
	} else { // 如果找不到则直接插入
		ele := c.ll.PushFront(&entry{key, value})
		c.cache[key] = ele
		c.nbytes += int64(len(key)) + int64(value.Len())
	}
	// 如果占用空间超过了链表的最大空间,则删除掉队首的节点
	for c.maxBytes != 0 && c.maxBytes < c.nbytes {
		c.RemoveOldest()
	}
}

单机并发缓存

多个协程(goroutine)同时读写同一个变量,在并发度较高的情况下,会发生冲突。确保一次只有一个协程(goroutine)可以访问该变量以避免冲突,这称之为 互斥,互斥锁可以解决这个问题。

当一个协程调用了 Lock() 方法时,其他协程被阻塞了,直到 Unlock()调用将锁释放。因此被包裹部分的代码就能够避免冲突,实现互斥。

抽象了一个只读数据结构 ByteView 用来表示缓存值:

package geecache

// 只读数据结构用来表示缓存值
type ByteView struct {
	b []byte
}

// 返回缓存值的长度
func (v ByteView) Len() int {
	return len(v.b)
}

// 返回拷贝从而防止这个值被外部操作修改
func (v ByteView) ByteSlice() []byte {
	return cloneBytes(v.b)
}

// 将数据作为一个字符串进行返回
func (v ByteView) String() string {
	return string(v.b)
}

func cloneBytes(b []byte) []byte {
	c := make([]byte, len(b))
	copy(c, b)
	return c
}

为 lru.Cache 添加并发特性(加锁):

package geecache

import (
	"Go-Projects/GeeCache/lru"
	"sync"
)

type cache struct {
	mu         sync.Mutex
	lru        *lru.Cache
	cacheBytes int64
}

func (c *cache) add(key string, value ByteView) {
	c.mu.Lock()
	defer c.mu.Unlock()
	// 延迟初始化
	if c.lru == nil {
		c.lru = lru.New(c.cacheBytes, nil)
	}
	c.lru.Add(key, value)
}

func (c *cache) get(key string) (value ByteView, ok bool) {
	c.mu.Lock()
	defer c.mu.Unlock()
	if c.lru == nil {
		return
	}

	if v, ok := c.lru.Get(key); ok {
		return v.(ByteView), ok
	}

	return
}

Group 是 GeeCache 最核心的数据结构,负责与用户的交互,并且控制缓存值存储和获取的流程。

在缓存不存在时,调用这个函数,得到源数据:

type Getter interface {
	Get(key string) ([]byte, error)
}

// 定义函数类型 GetterFunc,并实现 Getter 接口的 Get 方法
type GetterFunc func(key string) ([]byte, error)

func (f GetterFunc) Get(key string) ([]byte, error) {
	return f(key)
}

核心Group:

// 缓存的命名空间
type Group struct {
	name      string // 每个Group拥有一个唯一的名称
	getter    Getter // 缓存未命中时的回溯
	mainCache cache  // 并发缓存
}

var (
	mu     sync.RWMutex
	groups = make(map[string]*Group)
)

// 创建Group实例,并且将group的名称存在全局变量中
func NewGroup(name string, cacheBytes int64, getter Getter) *Group {
	if getter == nil {
		panic("nil Getter")
	}
	mu.Lock()
	defer mu.Unlock()
	g := &Group{
		name:      name,
		getter:    getter,
		mainCache: cache{cacheBytes: cacheBytes},
	}
	groups[name] = g
	return g
}

// 获取指定的group
func GetGroup(name string) *Group {
	mu.RLock()
	g := groups[name]
	mu.RUnlock()
	return g
}

Group的Get方法,完成对缓存的查找以及未命中后的回调操作

// 找到缓存值
func (g *Group) Get(key string) (ByteView, error) {
	// 如果没有键则报错
	if key == "" {
		return ByteView{}, fmt.Errorf("key is required")
	}
	// 从 mainCache 中查找缓存,如果存在则返回缓存值
	if v, ok := g.mainCache.get(key); ok {
		log.Println("[GeeCache] hit")
		return v, nil
	}

	return g.load(key)
}

// 缓存不存在,则调用 load 方法
func (g *Group) load(key string) (value ByteView, err error) {
	return g.getLocally(key)
}

// getLocally 调用用户回调函数 g.getter.Get() 获取源数据,并且将源数据添加到缓存 mainCache 中
func (g *Group) getLocally(key string) (ByteView, error) {
	bytes, err := g.getter.Get(key)
	if err != nil {
		return ByteView{}, err

	}
	value := ByteView{b: cloneBytes(bytes)}
	g.populateCache(key, value)
	return value, nil
}

func (g *Group) populateCache(key string, value ByteView) {
	g.mainCache.add(key, value)
}

HTTP 服务端

分布式缓存需要实现节点间通信,建立基于 HTTP 的通信机制是比较常见和简单的做法。如果一个节点启动了 HTTP 服务,那么这个节点就可以被其他节点访问。

承载节点间 HTTP 通信的核心数据结构:

package geecache

const defaultBasePath = "/_geecache/"

type HTTPPool struct {
	self     string // 记录自己的地址,包括主机名/IP 和端口
	basePath string // 节点间通讯地址的前缀
}

// 返回HTTP实例
func NewHTTPPool(self string) *HTTPPool {
	return &HTTPPool{
		self:     self,
		basePath: defaultBasePath,
	}
}

实现最为核心的 ServeHTTP 方法:

// 使用服务器登录
func (p *HTTPPool) Log(format string, v ...interface{}) {
	log.Printf("[Server %s] %s", p.self, fmt.Sprintf(format, v...))
}

// 处理HTTP请求
func (p *HTTPPool) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	// 判断访问路径的前缀是否是 basePath,不是返回错误
	if !strings.HasPrefix(r.URL.Path, p.basePath) {
		panic("HTTPPool serving unexpected path: " + r.URL.Path)
	}
	p.Log("%s %s", r.Method, r.URL.Path)
	// /<basepath>/<groupname>/<key> required
	parts := strings.SplitN(r.URL.Path[len(p.basePath):], "/", 2)
	if len(parts) != 2 {
		http.Error(w, "bad request", http.StatusBadRequest)
		return
	}

	groupName := parts[0]
	key := parts[1]
	// 通过 groupname 得到 group 实例,再使用 group.Get(key) 获取缓存数据
	group := GetGroup(groupName)
	if group == nil {
		http.Error(w, "no such group: "+groupName, http.StatusNotFound)
		return
	}

	view, err := group.Get(key)
	if err != nil {
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}

	w.Header().Set("Content-Type", "application/octet-stream")
	// 将缓存值作为 httpResponse 的 body 返回
	w.Write(view.ByteSlice())
}

一致性哈希

一致性哈希算法将 key 映射到 2^32 的空间中,将这个数字首尾相连,形成一个环

package consistenthash

import (
	"hash/crc32"
	"sort"
	"strconv"
)

// 定义了函数类型 Hash,采取依赖注入的方式,允许用于替换成自定义的 Hash 函数
type Hash func(data []byte) uint32

// 一致性哈希算法的主数据结构
type Map struct {
	hash     Hash
	replicas int            // 虚拟节点倍数
	keys     []int          // 哈希环
	hashMap  map[int]string // 虚拟节点与真实节点的映射表
}

// 允许自定义虚拟节点倍数和 Hash 函数
func New(replicas int, fn Hash) *Map {
	m := &Map{
		replicas: replicas,
		hash:     fn,
		hashMap:  make(map[int]string),
	}
	if m.hash == nil {
		m.hash = crc32.ChecksumIEEE
	}
	return m
}

// 实现添加真实节点/机器的 Add() 方法
func (m *Map) Add(keys ...string) {
	for _, key := range keys {
		for i := 0; i < m.replicas; i++ {
			hash := int(m.hash([]byte(strconv.Itoa(i) + key)))
			m.keys = append(m.keys, hash)
			m.hashMap[hash] = key
		}
	}
	sort.Ints(m.keys)
}

// 实现选择节点的 Get() 方法
func (m *Map) Get(key string) string {
	if len(m.keys) == 0 {
		return ""
	}

	hash := int(m.hash([]byte(key))) // 计算 key 的哈希值
	// 顺时针找到第一个匹配的虚拟节点的下标 idx
	idx := sort.Search(len(m.keys), func(i int) bool {
		return m.keys[i] >= hash
	})

	return m.hashMap[m.keys[idx%len(m.keys)]]
}

分布式节点

抽象 PeerPicker

package geecache

// PeerPicker 的 PickPeer() 方法用于根据传入的 key 选择相应节点 PeerGetter
type PeerPicker interface {
	PickPeer(key string) (peer PeerGetter, ok bool)
}

// 接口 PeerGetter 的 Get() 方法用于从对应 group 查找缓存值
type PeerGetter interface {
	Get(group string, key string) ([]byte, error)
}

节点选择与 HTTP 客户端

const (
	defaultBasePath = "/_geecache/"
	defaultReplicas = 50
)

type HTTPPool struct {
	self        string                 // 记录自己的地址,包括主机名/IP 和端口
	basePath    string                 // 节点间通讯地址的前缀
	mu          sync.Mutex             // 锁
	peers       *consistenthash.Map    // 新增成员变量 peers,类型是一致性哈希算法的 Map,用来根据具体的 key 选择节点
	httpGetters map[string]*httpGetter // 映射远程节点与对应的 httpGetter
}

// 实现 PeerGetter 接口
type httpGetter struct {
	baseURL string
}
// 使用 http.Get() 方式获取返回值,并转换为 []bytes 类型
func (h *httpGetter) Get(group string, key string) ([]byte, error) {
	u := fmt.Sprintf(
		"%v%v/%v",
		h.baseURL,
		url.QueryEscape(group),
		url.QueryEscape(key),
	)
	res, err := http.Get(u)
	if err != nil {
		return nil, err
	}
	defer res.Body.Close()

	if res.StatusCode != http.StatusOK {
		return nil, fmt.Errorf("server returned: %v", res.Status)
	}

	bytes, err := ioutil.ReadAll(res.Body)
	if err != nil {
		return nil, fmt.Errorf("reading response body: %v", err)
	}

	return bytes, nil
}

实现 PeerPicker 接口

// Set() 方法实例化了一致性哈希算法,并且添加了传入的节点,为每一个节点创建了一个 HTTP 客户端 httpGetter
func (p *HTTPPool) Set(peers ...string) {
	p.mu.Lock()
	defer p.mu.Unlock()
	p.peers = consistenthash.New(defaultReplicas, nil)
	p.peers.Add(peers...)
	p.httpGetters = make(map[string]*httpGetter, len(peers))
	for _, peer := range peers {
		p.httpGetters[peer] = &httpGetter{baseURL: peer + p.basePath}
	}
}

// PickerPeer() 包装了一致性哈希算法的 Get() 方法,根据具体的 key,选择节点,返回节点对应的 HTTP 客户端
func (p *HTTPPool) PickPeer(key string) (PeerGetter, bool) {
	p.mu.Lock()
	defer p.mu.Unlock()
	if peer := p.peers.Get(key); peer != "" && peer != p.self {
		p.Log("Pick peer %s", peer)
		return p.httpGetters[peer], true
	}
	return nil, false
}

修改主方法

// 将 实现了 PeerPicker 接口的 HTTPPool 注入到 Group 中
func (g *Group) RegisterPeers(peers PeerPicker) {
	if g.peers != nil {
		panic("RegisterPeerPicker called more than once")
	}
	g.peers = peers
}

// 使用实现了 PeerGetter 接口的 httpGetter 从访问远程节点,获取缓存值
func (g *Group) getFromPeer(peer PeerGetter, key string) (ByteView, error) {
	bytes, err := peer.Get(g.name, key)
	if err != nil {
		return ByteView{}, err
	}
	return ByteView{b: bytes}, nil
}

// 缓存不存在,则调用 load 方法
// 若非本机节点,则调用 getFromPeer() 从远程获取。若是本机节点或失败,则回退到 getLocally()
func (g *Group) load(key string) (value ByteView, err error) {

	if g.peers != nil {
		if peer, ok := g.peers.PickPeer(key); ok {
			if value, err = g.getFromPeer(peer, key); err == nil {
				return value, nil
			}
			log.Println("[GeeCache] Failed to get from peer", err)
		}
	}

	return g.getLocally(key)
}

防止缓存击穿

并发了 N 个请求,假设对数据库的访问没有做任何限制的,很可能向数据库也发起 N 次请求,容易导致缓存击穿和穿透。针对相同的 key,如何做到只向远端节点发起一次请求呢?

实现了一个名为 singleflight 的 package 来解决这个问题

package singleflight

import "sync"

// call 代表正在进行中,或已经结束的请求。使用 sync.WaitGroup 锁避免重入
type call struct {
	wg  sync.WaitGroup
	val interface{}
	err error
}

// Group 是 singleflight 的主数据结构,管理不同 key 的请求(call)
type Group struct {
	mu sync.Mutex // protects m
	m  map[string]*call
}

实现 Do 方法

// 针对相同的 key,无论 Do 被调用多少次,函数 fn 都只会被调用一次,等待 fn 调用结束了,返回返回值或错误。
func (g *Group) Do(key string, fn func() (interface{}, error)) (interface{}, error) {
	g.mu.Lock()
	if g.m == nil {
		g.m = make(map[string]*call)
	}
	if c, ok := g.m[key]; ok {
		g.mu.Unlock()
		c.wg.Wait()         // 如果请求正在进行中,则等待
		return c.val, c.err // 请求结束,返回结果
	}
	c := new(call)
	c.wg.Add(1)  // 发起请求前加锁
	g.m[key] = c // 添加到 g.m,表明 key 已经有对应的请求在处理
	g.mu.Unlock()

	c.val, c.err = fn() // 调用 fn,发起请求
	c.wg.Done()         // 请求结束

	g.mu.Lock()
	delete(g.m, key) // 更新 g.m
	g.mu.Unlock()

	return c.val, c.err // 返回结果
}

Go项目-分布式缓存GeeCache
https://zhangzhao219.github.io/2022/12/05/Go/Go-Project-Geecache/
作者
Zhang Zhao
发布于
2022年12月5日
许可协议