Go 每日一库之 singleflight 设计与实现
2023-4-20 08:53:38 Author: Go语言中文网(查看原文) 阅读量:19 收藏

概述

singleflight 提供了一套函数重复调用时的抑制机制,经常用来限制并发访问下的重复请求。

例如一个比较常见的场景是使用 singleflight 来限制同一个缓存 key 的重复请求,避免发生 缓存击穿 时,避免请求全部落到数据库,减少性能影响和宕机风险。

singleflight 示意图

示例

参考 扩展阅读 列表的文章。

内部实现

我们来探究一下 golang.org/x/sync/singleflight 的内部实现,笔者的 Go 版本为 go1.19 linux/amd64。注意:这个包属于扩展包,标准库的包是 internal/singleflight/singleflight.go,因为笔者项目中用到的是扩展包,所以这里以扩展包实现为准, 感兴趣的读者可以研究一下标准库中的包,两者大同小异。

UML

singleflight UML

errGoexit

errGoexit 错误表明用户函数执行过程中,调用了 runtime.Goexit(), runtime.Goexit() 会直接终止程序,并在终止前执行所有 defer.

var errGoexit = errors.New("runtime.Goexit was called")

panicError

panicError 对象实现了标准库内置的错误接口,表示在执行给定函数期间,调用 stack trace 从抛出的 panic 中捕获 (recover) 到了具体的值。

type panicError struct {
 value interface{}
 stack []byte
}
 
func (p *panicError) Error() string {
 return fmt.Sprintf("%v\n\n%s", p.value, p.stack)
}

newPanicError 方法

func newPanicError(v interface{}) error {
 stack := debug.Stack()

 // stack trace 返回的结果中,第一行的格式为 "goroutine N [status]:"
 // 但是发生 panic 时,对应的 goroutine 可能已经不存在了,并且状态已经发生改变 (这时需要删除第一行,避免语义误导性)
 if line := bytes.IndexByte(stack[:], '\n'); line >= 0 {
  stack = stack[line+1:]
 }
 return &panicError{value: v, stack: stack}
}

call 对象

call 对象表示正在执行中或已经执行完的 Do 方法的调用信息,这是一个抽象的数据集合,字段涵盖了调用周期内需要用到的数据。

type call struct {
 wg sync.WaitGroup

 // 这俩字段在调用 WaitGroup 之前写入一次
 // 并且只能在 WaitGroup 完成后读取
 val interface{}
 err error

 // forgotten 表明在 call 执行过程中,调用方 (并发的 goroutine) 是否调用了 Forget 方法
 // 简单来说就是,同一个 key 在获取过程中并发调用了 Forget 方法
 forgotten bool

    // 返回值被其他 goroutine 复用的次数
 dups  int
 // 等待复用结果的 goroutine 列表
 //     goroutine 通过 Result channel 等待获取数据 
 chans []chan<- Result
}

Group 对象

Group 表示一种类型的工作并形成一个命名空间,在该命名空间中可以抑制重复执行的工作单元,简单来说,就是一个逻辑分组,通常根据业务场景来划分,比如 CMS 系统中的首页、专栏、内容页都可以作为单独的分组。

type Group struct {
 // 并发调用时保护 m 字段的互斥锁
 mu sync.Mutex  
 // key 和回调方法的关系映射
 //     采用懒加载初始化
 m  map[string]*call 
}

Result 对象

Result 对象表示 Do 方法返回的结果,可以在 channel 中传递 (一般是一个等待获取数据的 goroutine)。

type Result struct {
 // 返回值支支持各种数据类型
 Val    interface{}
 // 返回错误值
 Err    error
 // 返回值是否被其他 goroutine 复用
 Shared bool
}

Do 方法

Do 方法执行给定的回调函数并返回结果,内部确保一个给定的 key 在同一时间 (多个 goroutine 调用时) 只执行一次,如果出现重复调用,则重复调用方等待原始(第一个)调用完成后,接收并复用相同的结果。第二个返回值 shared 表示是否将返回值 v 赋值给了多个调用者 (返回值是否被复用)。

func (g *Group) Do(key string, fn func() (interface{}, error)(v interface{}, err error, shared bool) {
 // 操作加锁
 g.mu.Lock()
 if g.m == nil {
        // 懒加载
  g.m = make(map[string]*call)
 }
 
 if c, ok := g.m[key]; ok {  // 检测 key 是否存在
        // 除了第 1 个抢到锁的 goroutine,其他的 goroutine 都会执行到这里
  
  c.dups++        // 复用数量+1
  g.mu.Unlock()   // 释放锁
  c.wg.Wait()     // 等待 goroutine 执行完(就是第 1 个抢到锁的 goroutine)
  
  if e, ok := c.err.(*panicError); ok {   // 回调函数抛出了 panic error
   panic(e)
  } else if c.err == errGoexit {  // 回调函数调用了 runtime.Goexit
   runtime.Goexit()
  }
  return c.val, c.err, true
 }
 
 c := new(call)  // 如果没有对应的 key,创建一个新的结果集
 c.wg.Add(1)     // 只有第 1 个抢到锁的 goroutine 可以调用 Add 方法,其他的 goroutine 进入 Wait 等待
 g.m[key] = c    // 创建一个新的 key
 g.mu.Unlock()   // 创建新 key 后,就可以解锁了,其他 goroutine 获得锁之后会进入上面的 if 流程

 g.doCall(c, key, fn)    // 调用 doCall() 执行 fn 函数
 return c.val, c.err, c.dups > 0
}

DoChan 方法

DoChan 方法和 Do 方法功能一样 (同步和异步的区别),但是返回一个只读 channel, 当 channel 准备好时,就开始接收数据,返回的 channel 不能关闭。此外,调用方需要根据返回值中的 Err 字段来处理可能发生的错误。

func (g *Group) DoChan(key string, fn func() (interface{}, error)) <-chan Result {
 // 初始化返回值 channel
 ch := make(chan Result, 1)
 // 操作加锁
 g.mu.Lock()
 if g.m == nil {
        // 懒加载
  g.m = make(map[string]*call)
 }
 
 if c, ok := g.m[key]; ok {  // 检测 key 是否存在
        // 除了第 1 个抢到锁的 goroutine,其他的 goroutine 都会执行到这里

        c.dups++        // 复用数量+1
        c.chans = append(c.chans, ch)   // 追加到等待复用结果的 goroutine 列表
        g.mu.Unlock()   // 释放锁
  return ch
 }
 
 c := &call{chans: []chan<- Result{ch}}  // 如果没有对应的 key,创建一个新的列表
 c.wg.Add(1)     // 只有第 1 个抢到锁的 goroutine 可以调用 Add 方法,其他的 goroutine 进入 Wait 等待
 g.m[key] = c    // 创建一个新的 key
 g.mu.Unlock()   // 创建新 key 后,就可以解锁了,其他 goroutine 获得锁之后会进入上面的 if 流程

 go g.doCall(c, key, fn) // 调用 doCall() 执行 fn 函数

 return ch
}

doCall 方法

doCall 方法负责调用 key 对应的回调方法。

func (g *Group) doCall(c *call, key string, fn func() (interface{}, error)) {
 normalReturn := false
 recovered := false

 // 使用两个 defer 区分 panic 和 runtime.Goexit 
 //    避免了回调函数导致的死锁 (也就是下面的 c.wg.Done() 得不到执行)
 // 详情见: https://golang.org/cl/134395
 defer func() {
  if !normalReturn && !recovered {
   // 说明回调函数内部执行了 runtime.Goexit
   c.err = errGoexit
  }

  c.wg.Done() // 如果这里得不到执行,那么调用方的 c.wg.Wait() 就会永久阻塞
  
  g.mu.Lock()
  defer g.mu.Unlock()
  
  if !c.forgotten { // 如果已经删除过该 key,就不需要重复删除了
   delete(g.m, key)
  }

  // 这里的 panic 都是为了第一个抢到的锁的 goroutine,因为后续的 goroutine 不会执行到 doCall() 方法
  if e, ok := c.err.(*panicError); ok {
   // 为了防止等待的 channels 被永久阻塞,需要确保这种 panic 无法恢复
   if len(c.chans) > 0 {
    go panic(e)
    select {} // 保留这个 goroutine 的调用堆栈,这样它就会出现在 crash dump(这种小技巧值得学习)
   } else {
    panic(e)
   }
  } else if c.err == errGoexit {
   // goexit 正在处理,不需要重复调用
  } else {
   // 正常返回, 依次向等待的 goroutine 发送数据
   for _, ch := range c.chans {    // c.chans 是切片
    ch <- Result{c.val, c.err, c.dups > 0}
   }
  }
 }()

 // 使用一个匿名函数来执行回调函数
 func() {
  defer func() {
   // 如果发生 panic,就创建一个 panic error,由调用方处理
   if !normalReturn {
    // 理想情况下,我们应该根据 stack trace 返回的结果, 确定这是一个 panic 还是一个 runtime.Goexit
    // 能够区分两者的唯一方法是查看 recover 是否阻止了 goroutine 终止
    //     如果 recover 捕获到了错误,说明是 panic
    //     如果 recover 没有捕获到错误,说明是 runtime.Goexit
    // 但是当我们知道这一点时,与 panic 相关的 stack trace 信息已经被丢弃了
    if r := recover(); r != nil {
     c.err = newPanicError(r)
    }
   }
  }()

  c.val, c.err = fn()
  
  // 如果代码执行到这里,说明调用回调函数没有发生 panic
  // 所以可以将变量 normalReturn 设置为 true
  normalReturn = true
 }()
 
 // 如果 normalReturn != true, 说明调用回调函数发生了 panic
 // 同时也说明了,发生的 panic 被捕获 (recover) 到了,而不是直接被 runtime.Goexit 终止程序
 //    如果直接被 runtime.Goexit 终止程序,代码就执行不到这里了,而是会直接去执行 defer
 if !normalReturn {
  recovered = true
 }
}

Forget 方法

Forget 方法用于删除指定的 key, 后续对该 key 调用 Do 方法将执行回调函数,不再复用之前的返回值结果。

func (g *Group) Forget(key string) {
 g.mu.Lock()
 if c, ok := g.m[key]; ok {
  c.forgotten = true
 }
 delete(g.m, key)
 g.mu.Unlock()
}

小结

从应用层面来说,singleflight 是将 多个请求的返回结果相同 + 计算过程次数限制 这一过程抽象出来,将所有细节隐藏在内部实现中,只提供 GETDELETE 两种类型 API, 简化了调用方法。

需要注意的有两点:一是 GROUP 组的划分,通常根据业务模块划分即可,类似缓存的 key 前缀,二是 key 的内存占用,key 没有过期机制, 对应的数据会一直占用内存,对于热门数据没有任何问题,但是对于非热门数据,会增加内存的占用,可以根据数据的大致有效期设计 延迟删除 方案。

从内部实现来说,singleflight 的代码非常简洁,尤其是 3 个 Do* 方法之间的紧密配合,可以让我们学习到很多 channel 使用技巧, 还有 doCall 方法中的 双重 defer 机制、判断 panic 和 runtime.Goexit,都是非常实用的 Go code style 代码。


推荐阅读

福利
我为大家整理了一份从入门到进阶的Go学习资料礼包,包含学习建议:入门看什么,进阶看什么。关注公众号 「polarisxu」,回复 ebook 获取;还可以回复「进群」,和数万 Gopher 交流学习。


文章来源: http://mp.weixin.qq.com/s?__biz=MzAxMTA4Njc0OQ==&mid=2651454283&idx=1&sn=3c40a247c3eccb5e37e9cc91d01ad920&chksm=80bb25b9b7ccacafe5d1f566a998fed045535f9988efba077d752c777416233fabb0619d1991#rd
如有侵权请联系:admin#unsafe.sh