Go项目-分布式缓存GeeCache
Go项目-分布式缓存GeeCache
GeeCache
完成的功能
- 单机缓存和基于 HTTP 的分布式缓存
- 最近最少访问(Least Recently Used, LRU) 缓存策略
- 使用 Go 锁机制防止缓存击穿
- 使用一致性哈希选择节点,实现负载均衡
LRU 缓存淘汰策略
FIFO:先淘汰缓存中最早添加的记录
LFU:淘汰缓存中访问频率最低的记录,需要维护一个访问频率的表
LRU:最近最少使用,认为如果数据最近被访问过,那么将来被访问的概率也会更高。维护一个队列,如果一条记录被访问,则移动到队列尾端,这样保证队首一定是最近最少访问的数据
package lru
import "container/list"
// LRU cache 结构体
type Cache struct {
maxBytes int64 // 允许使用的最大内存
nbytes int64 // 当前已使用的内存
ll *list.List // cache链表
cache map[string]*list.Element // 查找键值对的字典
OnEvicted func(key string, value Value) // 某条记录被移除时的回调函数
}
// 双向链表节点的数据类型
// 主要目的是为了删除节点后能从字典中删除该键值对
type entry struct {
key string
value Value
}
// 值的类型可以是任意的,定义一个空接口,实现Len()方法返回值的占用空间大小
// Len the number of cache entries
func (c *Cache) Len() int {
return c.ll.Len()
}
type Value interface {
Len() int // 包含一个方法返回值占用的内存大小
}
// 工厂模式,返回实例化的cache
func New(maxBytes int64, onEvicted func(string, Value)) *Cache {
return &Cache{
maxBytes: maxBytes,
ll: list.New(),
cache: make(map[string]*list.Element),
OnEvicted: onEvicted,
}
}
// 查找功能,在字典中进行查找,然后移动到队尾(Front)
func (c *Cache) Get(key string) (value Value, ok bool) {
if ele, ok := c.cache[key]; ok {
c.ll.MoveToFront(ele)
kv := ele.Value.(*entry)
return kv.value, true
}
return
}
// LRU删除策略:从队首(Back)拿到节点,然后将其删除
func (c *Cache) RemoveOldest() {
ele := c.ll.Back()
if ele != nil {
c.ll.Remove(ele)
kv := ele.Value.(*entry)
delete(c.cache, kv.key)
c.nbytes -= int64(len(kv.key)) + int64(kv.value.Len()) // 更新当前已经使用的内存
if c.OnEvicted != nil {
c.OnEvicted(kv.key, kv.value)
}
}
}
// 新增节点/修改节点
func (c *Cache) Add(key string, value Value) {
// 如果在链表中找到则将其更新,同时更新占用的空间大小等,并移动到队列尾端
if ele, ok := c.cache[key]; ok {
c.ll.MoveToFront(ele)
kv := ele.Value.(*entry)
c.nbytes += int64(value.Len()) - int64(kv.value.Len())
kv.value = value
} else { // 如果找不到则直接插入
ele := c.ll.PushFront(&entry{key, value})
c.cache[key] = ele
c.nbytes += int64(len(key)) + int64(value.Len())
}
// 如果占用空间超过了链表的最大空间,则删除掉队首的节点
for c.maxBytes != 0 && c.maxBytes < c.nbytes {
c.RemoveOldest()
}
}
单机并发缓存
多个协程(goroutine)同时读写同一个变量,在并发度较高的情况下,会发生冲突。确保一次只有一个协程(goroutine)可以访问该变量以避免冲突,这称之为 互斥
,互斥锁可以解决这个问题。
当一个协程调用了 Lock()
方法时,其他协程被阻塞了,直到 Unlock()
调用将锁释放。因此被包裹部分的代码就能够避免冲突,实现互斥。
抽象了一个只读数据结构 ByteView
用来表示缓存值:
package geecache
// 只读数据结构用来表示缓存值
type ByteView struct {
b []byte
}
// 返回缓存值的长度
func (v ByteView) Len() int {
return len(v.b)
}
// 返回拷贝从而防止这个值被外部操作修改
func (v ByteView) ByteSlice() []byte {
return cloneBytes(v.b)
}
// 将数据作为一个字符串进行返回
func (v ByteView) String() string {
return string(v.b)
}
func cloneBytes(b []byte) []byte {
c := make([]byte, len(b))
copy(c, b)
return c
}
为 lru.Cache 添加并发特性(加锁):
package geecache
import (
"Go-Projects/GeeCache/lru"
"sync"
)
type cache struct {
mu sync.Mutex
lru *lru.Cache
cacheBytes int64
}
func (c *cache) add(key string, value ByteView) {
c.mu.Lock()
defer c.mu.Unlock()
// 延迟初始化
if c.lru == nil {
c.lru = lru.New(c.cacheBytes, nil)
}
c.lru.Add(key, value)
}
func (c *cache) get(key string) (value ByteView, ok bool) {
c.mu.Lock()
defer c.mu.Unlock()
if c.lru == nil {
return
}
if v, ok := c.lru.Get(key); ok {
return v.(ByteView), ok
}
return
}
Group 是 GeeCache 最核心的数据结构,负责与用户的交互,并且控制缓存值存储和获取的流程。
在缓存不存在时,调用这个函数,得到源数据:
type Getter interface {
Get(key string) ([]byte, error)
}
// 定义函数类型 GetterFunc,并实现 Getter 接口的 Get 方法
type GetterFunc func(key string) ([]byte, error)
func (f GetterFunc) Get(key string) ([]byte, error) {
return f(key)
}
核心Group:
// 缓存的命名空间
type Group struct {
name string // 每个Group拥有一个唯一的名称
getter Getter // 缓存未命中时的回溯
mainCache cache // 并发缓存
}
var (
mu sync.RWMutex
groups = make(map[string]*Group)
)
// 创建Group实例,并且将group的名称存在全局变量中
func NewGroup(name string, cacheBytes int64, getter Getter) *Group {
if getter == nil {
panic("nil Getter")
}
mu.Lock()
defer mu.Unlock()
g := &Group{
name: name,
getter: getter,
mainCache: cache{cacheBytes: cacheBytes},
}
groups[name] = g
return g
}
// 获取指定的group
func GetGroup(name string) *Group {
mu.RLock()
g := groups[name]
mu.RUnlock()
return g
}
Group的Get方法,完成对缓存的查找以及未命中后的回调操作
// 找到缓存值
func (g *Group) Get(key string) (ByteView, error) {
// 如果没有键则报错
if key == "" {
return ByteView{}, fmt.Errorf("key is required")
}
// 从 mainCache 中查找缓存,如果存在则返回缓存值
if v, ok := g.mainCache.get(key); ok {
log.Println("[GeeCache] hit")
return v, nil
}
return g.load(key)
}
// 缓存不存在,则调用 load 方法
func (g *Group) load(key string) (value ByteView, err error) {
return g.getLocally(key)
}
// getLocally 调用用户回调函数 g.getter.Get() 获取源数据,并且将源数据添加到缓存 mainCache 中
func (g *Group) getLocally(key string) (ByteView, error) {
bytes, err := g.getter.Get(key)
if err != nil {
return ByteView{}, err
}
value := ByteView{b: cloneBytes(bytes)}
g.populateCache(key, value)
return value, nil
}
func (g *Group) populateCache(key string, value ByteView) {
g.mainCache.add(key, value)
}
HTTP 服务端
分布式缓存需要实现节点间通信,建立基于 HTTP 的通信机制是比较常见和简单的做法。如果一个节点启动了 HTTP 服务,那么这个节点就可以被其他节点访问。
承载节点间 HTTP 通信的核心数据结构:
package geecache
const defaultBasePath = "/_geecache/"
type HTTPPool struct {
self string // 记录自己的地址,包括主机名/IP 和端口
basePath string // 节点间通讯地址的前缀
}
// 返回HTTP实例
func NewHTTPPool(self string) *HTTPPool {
return &HTTPPool{
self: self,
basePath: defaultBasePath,
}
}
实现最为核心的 ServeHTTP
方法:
// 使用服务器登录
func (p *HTTPPool) Log(format string, v ...interface{}) {
log.Printf("[Server %s] %s", p.self, fmt.Sprintf(format, v...))
}
// 处理HTTP请求
func (p *HTTPPool) ServeHTTP(w http.ResponseWriter, r *http.Request) {
// 判断访问路径的前缀是否是 basePath,不是返回错误
if !strings.HasPrefix(r.URL.Path, p.basePath) {
panic("HTTPPool serving unexpected path: " + r.URL.Path)
}
p.Log("%s %s", r.Method, r.URL.Path)
// /<basepath>/<groupname>/<key> required
parts := strings.SplitN(r.URL.Path[len(p.basePath):], "/", 2)
if len(parts) != 2 {
http.Error(w, "bad request", http.StatusBadRequest)
return
}
groupName := parts[0]
key := parts[1]
// 通过 groupname 得到 group 实例,再使用 group.Get(key) 获取缓存数据
group := GetGroup(groupName)
if group == nil {
http.Error(w, "no such group: "+groupName, http.StatusNotFound)
return
}
view, err := group.Get(key)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
w.Header().Set("Content-Type", "application/octet-stream")
// 将缓存值作为 httpResponse 的 body 返回
w.Write(view.ByteSlice())
}
一致性哈希
一致性哈希算法将 key 映射到 2^32 的空间中,将这个数字首尾相连,形成一个环
package consistenthash
import (
"hash/crc32"
"sort"
"strconv"
)
// 定义了函数类型 Hash,采取依赖注入的方式,允许用于替换成自定义的 Hash 函数
type Hash func(data []byte) uint32
// 一致性哈希算法的主数据结构
type Map struct {
hash Hash
replicas int // 虚拟节点倍数
keys []int // 哈希环
hashMap map[int]string // 虚拟节点与真实节点的映射表
}
// 允许自定义虚拟节点倍数和 Hash 函数
func New(replicas int, fn Hash) *Map {
m := &Map{
replicas: replicas,
hash: fn,
hashMap: make(map[int]string),
}
if m.hash == nil {
m.hash = crc32.ChecksumIEEE
}
return m
}
// 实现添加真实节点/机器的 Add() 方法
func (m *Map) Add(keys ...string) {
for _, key := range keys {
for i := 0; i < m.replicas; i++ {
hash := int(m.hash([]byte(strconv.Itoa(i) + key)))
m.keys = append(m.keys, hash)
m.hashMap[hash] = key
}
}
sort.Ints(m.keys)
}
// 实现选择节点的 Get() 方法
func (m *Map) Get(key string) string {
if len(m.keys) == 0 {
return ""
}
hash := int(m.hash([]byte(key))) // 计算 key 的哈希值
// 顺时针找到第一个匹配的虚拟节点的下标 idx
idx := sort.Search(len(m.keys), func(i int) bool {
return m.keys[i] >= hash
})
return m.hashMap[m.keys[idx%len(m.keys)]]
}
分布式节点
抽象 PeerPicker
package geecache
// PeerPicker 的 PickPeer() 方法用于根据传入的 key 选择相应节点 PeerGetter
type PeerPicker interface {
PickPeer(key string) (peer PeerGetter, ok bool)
}
// 接口 PeerGetter 的 Get() 方法用于从对应 group 查找缓存值
type PeerGetter interface {
Get(group string, key string) ([]byte, error)
}
节点选择与 HTTP 客户端
const (
defaultBasePath = "/_geecache/"
defaultReplicas = 50
)
type HTTPPool struct {
self string // 记录自己的地址,包括主机名/IP 和端口
basePath string // 节点间通讯地址的前缀
mu sync.Mutex // 锁
peers *consistenthash.Map // 新增成员变量 peers,类型是一致性哈希算法的 Map,用来根据具体的 key 选择节点
httpGetters map[string]*httpGetter // 映射远程节点与对应的 httpGetter
}
// 实现 PeerGetter 接口
type httpGetter struct {
baseURL string
}
// 使用 http.Get() 方式获取返回值,并转换为 []bytes 类型
func (h *httpGetter) Get(group string, key string) ([]byte, error) {
u := fmt.Sprintf(
"%v%v/%v",
h.baseURL,
url.QueryEscape(group),
url.QueryEscape(key),
)
res, err := http.Get(u)
if err != nil {
return nil, err
}
defer res.Body.Close()
if res.StatusCode != http.StatusOK {
return nil, fmt.Errorf("server returned: %v", res.Status)
}
bytes, err := ioutil.ReadAll(res.Body)
if err != nil {
return nil, fmt.Errorf("reading response body: %v", err)
}
return bytes, nil
}
实现 PeerPicker 接口
// Set() 方法实例化了一致性哈希算法,并且添加了传入的节点,为每一个节点创建了一个 HTTP 客户端 httpGetter
func (p *HTTPPool) Set(peers ...string) {
p.mu.Lock()
defer p.mu.Unlock()
p.peers = consistenthash.New(defaultReplicas, nil)
p.peers.Add(peers...)
p.httpGetters = make(map[string]*httpGetter, len(peers))
for _, peer := range peers {
p.httpGetters[peer] = &httpGetter{baseURL: peer + p.basePath}
}
}
// PickerPeer() 包装了一致性哈希算法的 Get() 方法,根据具体的 key,选择节点,返回节点对应的 HTTP 客户端
func (p *HTTPPool) PickPeer(key string) (PeerGetter, bool) {
p.mu.Lock()
defer p.mu.Unlock()
if peer := p.peers.Get(key); peer != "" && peer != p.self {
p.Log("Pick peer %s", peer)
return p.httpGetters[peer], true
}
return nil, false
}
修改主方法
// 将 实现了 PeerPicker 接口的 HTTPPool 注入到 Group 中
func (g *Group) RegisterPeers(peers PeerPicker) {
if g.peers != nil {
panic("RegisterPeerPicker called more than once")
}
g.peers = peers
}
// 使用实现了 PeerGetter 接口的 httpGetter 从访问远程节点,获取缓存值
func (g *Group) getFromPeer(peer PeerGetter, key string) (ByteView, error) {
bytes, err := peer.Get(g.name, key)
if err != nil {
return ByteView{}, err
}
return ByteView{b: bytes}, nil
}
// 缓存不存在,则调用 load 方法
// 若非本机节点,则调用 getFromPeer() 从远程获取。若是本机节点或失败,则回退到 getLocally()
func (g *Group) load(key string) (value ByteView, err error) {
if g.peers != nil {
if peer, ok := g.peers.PickPeer(key); ok {
if value, err = g.getFromPeer(peer, key); err == nil {
return value, nil
}
log.Println("[GeeCache] Failed to get from peer", err)
}
}
return g.getLocally(key)
}
防止缓存击穿
并发了 N 个请求,假设对数据库的访问没有做任何限制的,很可能向数据库也发起 N 次请求,容易导致缓存击穿和穿透。针对相同的 key,如何做到只向远端节点发起一次请求呢?
实现了一个名为 singleflight 的 package 来解决这个问题
package singleflight
import "sync"
// call 代表正在进行中,或已经结束的请求。使用 sync.WaitGroup 锁避免重入
type call struct {
wg sync.WaitGroup
val interface{}
err error
}
// Group 是 singleflight 的主数据结构,管理不同 key 的请求(call)
type Group struct {
mu sync.Mutex // protects m
m map[string]*call
}
实现 Do
方法
// 针对相同的 key,无论 Do 被调用多少次,函数 fn 都只会被调用一次,等待 fn 调用结束了,返回返回值或错误。
func (g *Group) Do(key string, fn func() (interface{}, error)) (interface{}, error) {
g.mu.Lock()
if g.m == nil {
g.m = make(map[string]*call)
}
if c, ok := g.m[key]; ok {
g.mu.Unlock()
c.wg.Wait() // 如果请求正在进行中,则等待
return c.val, c.err // 请求结束,返回结果
}
c := new(call)
c.wg.Add(1) // 发起请求前加锁
g.m[key] = c // 添加到 g.m,表明 key 已经有对应的请求在处理
g.mu.Unlock()
c.val, c.err = fn() // 调用 fn,发起请求
c.wg.Done() // 请求结束
g.mu.Lock()
delete(g.m, key) // 更新 g.m
g.mu.Unlock()
return c.val, c.err // 返回结果
}