singleflight
提供了一套函数重复调用时的抑制机制,经常用来限制并发访问下的重复请求。
例如一个比较常见的场景是使用 singleflight
来限制同一个缓存 key
的重复请求,避免发生 缓存击穿
时,避免请求全部落到数据库,减少性能影响和宕机风险。
参考 扩展阅读 列表的文章。
我们来探究一下 golang.org/x/sync/singleflight
的内部实现,笔者的 Go 版本为 go1.19 linux/amd64
。注意:这个包属于扩展包,标准库的包是 internal/singleflight/singleflight.go
,因为笔者项目中用到的是扩展包,所以这里以扩展包实现为准,
感兴趣的读者可以研究一下标准库中的包,两者大同小异。
errGoexit
错误表明用户函数执行过程中,调用了 runtime.Goexit()
, runtime.Goexit() 会直接终止程序,并在终止前执行所有 defer
.
var errGoexit = errors.New("runtime.Goexit was called")
panicError
对象实现了标准库内置的错误接口,表示在执行给定函数期间,调用 stack trace
从抛出的 panic
中捕获 (recover) 到了具体的值。
type panicError struct {
value interface{}
stack []byte
}
func (p *panicError) Error() string {
return fmt.Sprintf("%v\n\n%s", p.value, p.stack)
}
func newPanicError(v interface{}) error {
stack := debug.Stack() // stack trace 返回的结果中,第一行的格式为 "goroutine N [status]:"
// 但是发生 panic 时,对应的 goroutine 可能已经不存在了,并且状态已经发生改变 (这时需要删除第一行,避免语义误导性)
if line := bytes.IndexByte(stack[:], '\n'); line >= 0 {
stack = stack[line+1:]
}
return &panicError{value: v, stack: stack}
}
call
对象表示正在执行中或已经执行完的 Do 方法的调用信息,这是一个抽象的数据集合,字段涵盖了调用周期内需要用到的数据。
type call struct {
wg sync.WaitGroup // 这俩字段在调用 WaitGroup 之前写入一次
// 并且只能在 WaitGroup 完成后读取
val interface{}
err error
// forgotten 表明在 call 执行过程中,调用方 (并发的 goroutine) 是否调用了 Forget 方法
// 简单来说就是,同一个 key 在获取过程中并发调用了 Forget 方法
forgotten bool
// 返回值被其他 goroutine 复用的次数
dups int
// 等待复用结果的 goroutine 列表
// goroutine 通过 Result channel 等待获取数据
chans []chan<- Result
}
Group
表示一种类型的工作并形成一个命名空间,在该命名空间中可以抑制重复执行的工作单元,简单来说,就是一个逻辑分组,通常根据业务场景来划分,比如 CMS 系统中的首页、专栏、内容页都可以作为单独的分组。
type Group struct {
// 并发调用时保护 m 字段的互斥锁
mu sync.Mutex
// key 和回调方法的关系映射
// 采用懒加载初始化
m map[string]*call
}
Result
对象表示 Do 方法返回的结果,可以在 channel
中传递 (一般是一个等待获取数据的 goroutine)。
type Result struct {
// 返回值支支持各种数据类型
Val interface{}
// 返回错误值
Err error
// 返回值是否被其他 goroutine 复用
Shared bool
}
Do
方法执行给定的回调函数并返回结果,内部确保一个给定的 key
在同一时间 (多个 goroutine
调用时) 只执行一次,如果出现重复调用,则重复调用方等待原始(第一个)调用完成后,接收并复用相同的结果。第二个返回值 shared
表示是否将返回值 v 赋值给了多个调用者 (返回值是否被复用)。
func (g *Group) Do(key string, fn func() (interface{}, error)) (v interface{}, err error, shared bool) {
// 操作加锁
g.mu.Lock()
if g.m == nil {
// 懒加载
g.m = make(map[string]*call)
}
if c, ok := g.m[key]; ok { // 检测 key 是否存在
// 除了第 1 个抢到锁的 goroutine,其他的 goroutine 都会执行到这里
c.dups++ // 复用数量+1
g.mu.Unlock() // 释放锁
c.wg.Wait() // 等待 goroutine 执行完(就是第 1 个抢到锁的 goroutine)
if e, ok := c.err.(*panicError); ok { // 回调函数抛出了 panic error
panic(e)
} else if c.err == errGoexit { // 回调函数调用了 runtime.Goexit
runtime.Goexit()
}
return c.val, c.err, true
}
c := new(call) // 如果没有对应的 key,创建一个新的结果集
c.wg.Add(1) // 只有第 1 个抢到锁的 goroutine 可以调用 Add 方法,其他的 goroutine 进入 Wait 等待
g.m[key] = c // 创建一个新的 key
g.mu.Unlock() // 创建新 key 后,就可以解锁了,其他 goroutine 获得锁之后会进入上面的 if 流程 g.doCall(c, key, fn) // 调用 doCall() 执行 fn 函数
return c.val, c.err, c.dups > 0
}
DoChan
方法和 Do
方法功能一样 (同步和异步的区别),但是返回一个只读 channel
, 当 channel 准备好时,就开始接收数据,返回的 channel 不能关闭。此外,调用方需要根据返回值中的 Err
字段来处理可能发生的错误。
func (g *Group) DoChan(key string, fn func() (interface{}, error)) <-chan Result {
// 初始化返回值 channel
ch := make(chan Result, 1)
// 操作加锁
g.mu.Lock()
if g.m == nil {
// 懒加载
g.m = make(map[string]*call)
}
if c, ok := g.m[key]; ok { // 检测 key 是否存在
// 除了第 1 个抢到锁的 goroutine,其他的 goroutine 都会执行到这里 c.dups++ // 复用数量+1
c.chans = append(c.chans, ch) // 追加到等待复用结果的 goroutine 列表
g.mu.Unlock() // 释放锁
return ch
}
c := &call{chans: []chan<- Result{ch}} // 如果没有对应的 key,创建一个新的列表
c.wg.Add(1) // 只有第 1 个抢到锁的 goroutine 可以调用 Add 方法,其他的 goroutine 进入 Wait 等待
g.m[key] = c // 创建一个新的 key
g.mu.Unlock() // 创建新 key 后,就可以解锁了,其他 goroutine 获得锁之后会进入上面的 if 流程
go g.doCall(c, key, fn) // 调用 doCall() 执行 fn 函数
return ch
}
doCall
方法负责调用 key
对应的回调方法。
func (g *Group) doCall(c *call, key string, fn func() (interface{}, error)) {
normalReturn := false
recovered := false // 使用两个 defer 区分 panic 和 runtime.Goexit
// 避免了回调函数导致的死锁 (也就是下面的 c.wg.Done() 得不到执行)
// 详情见: https://golang.org/cl/134395
defer func() {
if !normalReturn && !recovered {
// 说明回调函数内部执行了 runtime.Goexit
c.err = errGoexit
}
c.wg.Done() // 如果这里得不到执行,那么调用方的 c.wg.Wait() 就会永久阻塞
g.mu.Lock()
defer g.mu.Unlock()
if !c.forgotten { // 如果已经删除过该 key,就不需要重复删除了
delete(g.m, key)
}
// 这里的 panic 都是为了第一个抢到的锁的 goroutine,因为后续的 goroutine 不会执行到 doCall() 方法
if e, ok := c.err.(*panicError); ok {
// 为了防止等待的 channels 被永久阻塞,需要确保这种 panic 无法恢复
if len(c.chans) > 0 {
go panic(e)
select {} // 保留这个 goroutine 的调用堆栈,这样它就会出现在 crash dump(这种小技巧值得学习)
} else {
panic(e)
}
} else if c.err == errGoexit {
// goexit 正在处理,不需要重复调用
} else {
// 正常返回, 依次向等待的 goroutine 发送数据
for _, ch := range c.chans { // c.chans 是切片
ch <- Result{c.val, c.err, c.dups > 0}
}
}
}()
// 使用一个匿名函数来执行回调函数
func() {
defer func() {
// 如果发生 panic,就创建一个 panic error,由调用方处理
if !normalReturn {
// 理想情况下,我们应该根据 stack trace 返回的结果, 确定这是一个 panic 还是一个 runtime.Goexit
// 能够区分两者的唯一方法是查看 recover 是否阻止了 goroutine 终止
// 如果 recover 捕获到了错误,说明是 panic
// 如果 recover 没有捕获到错误,说明是 runtime.Goexit
// 但是当我们知道这一点时,与 panic 相关的 stack trace 信息已经被丢弃了
if r := recover(); r != nil {
c.err = newPanicError(r)
}
}
}()
c.val, c.err = fn()
// 如果代码执行到这里,说明调用回调函数没有发生 panic
// 所以可以将变量 normalReturn 设置为 true
normalReturn = true
}()
// 如果 normalReturn != true, 说明调用回调函数发生了 panic
// 同时也说明了,发生的 panic 被捕获 (recover) 到了,而不是直接被 runtime.Goexit 终止程序
// 如果直接被 runtime.Goexit 终止程序,代码就执行不到这里了,而是会直接去执行 defer
if !normalReturn {
recovered = true
}
}
Forget
方法用于删除指定的 key
, 后续对该 key 调用 Do 方法将执行回调函数,不再复用之前的返回值结果。
func (g *Group) Forget(key string) {
g.mu.Lock()
if c, ok := g.m[key]; ok {
c.forgotten = true
}
delete(g.m, key)
g.mu.Unlock()
}
从应用层面来说,singleflight
是将 多个请求的返回结果相同 + 计算过程次数限制 这一过程抽象出来,将所有细节隐藏在内部实现中,只提供 GET
和 DELETE
两种类型 API, 简化了调用方法。
需要注意的有两点:一是 GROUP
组的划分,通常根据业务模块划分即可,类似缓存的 key
前缀,二是 key
的内存占用,key
没有过期机制,
对应的数据会一直占用内存,对于热门数据没有任何问题,但是对于非热门数据,会增加内存的占用,可以根据数据的大致有效期设计 延迟删除
方案。
从内部实现来说,singleflight
的代码非常简洁,尤其是 3 个 Do*
方法之间的紧密配合,可以让我们学习到很多 channel
使用技巧,
还有 doCall
方法中的 双重 defer 机制、判断 panic 和 runtime.Goexit,都是非常实用的 Go code style
代码。
推荐阅读