go的singleFlight学习
Package singleflight provides a duplicate function call suppression mechanism
“golang.org/x/sync/singleflight”
原来底层是 waitGroup,我还以为等待的协程主动让出 cpu 了,没想到 waitGroup.Wait() 阻塞了
doCall 不但返回值是 func 的 val 和 error,而且 doCall 内部也给 chan 写入了一遍
这样外部的同步 Do 和异步 DoChan 都能复用了
当 Do->doCall 执行 fn 发生 panic 时:
对首发请求,直接在 defer 中把 fn 中捕获的 panic 进行回放
对非首发请求,c.wg.Wait() 结束之后,对 c.err 进行断言,判断是否是一个 panic 错误,如是则回放
这样就保证了首发请求和非首发请求都发生了 panic
一个协程对waitGroup进行Add(1)操作后,多个协程都可以监听它的读
package singleflightimport ("bytes""errors""fmt""runtime""runtime/debug""sync"
)// errGoexit indicates the runtime.Goexit was called in
// the user given function.
// 用户给定的函数中,调用了 runtime.Goexit
var errGoexit = errors.New("runtime.Goexit was called")// A panicError is an arbitrary value recovered from a panic
// with the stack trace during the execution of given function.
// 执行给定函数期间,panicError 是一个从 panic 中收到的任意值
// 带有栈追踪
type panicError struct {// value 中存储 errorvalue interface{}stack []byte
}// Error implements error interface.
func (p *panicError) Error() string {return fmt.Sprintf("%v\n\n%s", p.value, p.stack)
}func (p *panicError) Unwrap() error {err, ok := p.value.(error)if !ok {return nil}return err
}func newPanicError(v interface{}) error {stack := debug.Stack()// The first line of the stack trace is of the form "goroutine N [status]:"// but by the time the panic reaches Do the goroutine may no longer exist// and its status will have changed. Trim out the misleading line.// 去掉误导性的信息// 栈帧第一行,是"goroutine N [status]:"的信息if line := bytes.IndexByte(stack[:], '\n'); line >= 0 {stack = stack[line+1:]}return &panicError{value: v, stack: stack}
}// call is an in-flight or completed singleflight.Do call
type call struct {wg sync.WaitGroup// These fields are written once before the WaitGroup is done// and are only read after the WaitGroup is done.// WaitGroup Done 之前,这两个字段只会被写入一次// WaitGroup Done 之后,才能读取val interface{}err error// These fields are read and written with the singleflight// mutex held before the WaitGroup is done, and are read but// not written after the WaitGroup is done.// WaitGroup Done 之前,singleflight mutex 持有它的期间,这些字段被读取和写入// WaitGroup Done 之后,仅用于读取,不再被写入dups intchans []chan<- Result
}// Group represents a class of work and forms a namespace in
// which units of work can be executed with duplicate suppression.
// Group 代表一个 work 类,形成一个 namespace
// 在该命名空间中(in which),可以通过重复抑制(duplicate suppression)来执行工作单元
type Group struct {mu sync.Mutex // protects mm map[string]*call // lazily initialized
}// Result holds the results of Do, so they can be passed
// on a channel.
type Result struct {Val interface{}Err errorShared bool
}// Do executes and returns the results of the given function, making
// sure that only one execution is in-flight for a given key at a
// time. If a duplicate comes in, the duplicate caller waits for the
// original to complete and receives the same results.
// The return value shared indicates whether v was given to multiple callers.
//
// duplicate caller 会等待在 singleFlight 上,等待最开始的任务执行结束
// 返回的值 shared ,指示是否要将结果共享给其他 caller
func (g *Group) Do(key string, fn func() (interface{}, error)) (v interface{}, err error, shared bool) {g.mu.Lock()if g.m == nil {g.m = make(map[string]*call)}if c, ok := g.m[key]; ok {c.dups++g.mu.Unlock()// 等待 waitGroup Donec.wg.Wait()// 此时一定是 waitGroup Done 了// 发生了 panic ,不能吞掉panic错误// 发生了 error if e, ok := c.err.(*panicError); ok {panic(e)} else if c.err == errGoexit {// 优雅地退出 goroutine,防止对上游协程产生干扰runtime.Goexit()}// 返回最终结果return c.val, c.err, true}// 第一次进来的时候,执行这里c := new(call)// waitGroup 计数从 0 -> 1c.wg.Add(1)g.m[key] = cg.mu.Unlock()g.doCall(c, key, fn)return c.val, c.err, c.dups > 0
}// doCall handles the single call for a key.
func (g *Group) doCall(c *call, key string, fn func() (interface{}, error)) {normalReturn := falserecovered := false// use double-defer to distinguish panic from runtime.Goexit,// more details see https://golang.org/cl/134395// double-defer 以区分panic 和runtime.Goexitdefer func() {// the given function invoked runtime.Goexit// 把当前的堆栈给记录下来// normalReturn=true,正常结束// normalReturn=false && recovered == true,panic,需要外部还原panic的堆栈// normalReturn=false && recovered == false,go协程主动退出,需要制造一个errif !normalReturn && !recovered {c.err = errGoexit}g.mu.Lock()defer g.mu.Unlock()c.wg.Done()if g.m[key] == c {delete(g.m, key)}if e, ok := c.err.(*panicError); ok {// In order to prevent the waiting channels from being blocked forever,// needs to ensure that this panic cannot be recovered.if len(c.chans) > 0 {// goroutine的崩溃不会影响主goroutine或其他goroutine。go panic(e)// 能让panic爆出来select {} // Keep this goroutine around so that it will appear in the crash dump.} else {panic(e)}} else if c.err == errGoexit {// Already in the process of goexit, no need to call again} else {// Normal returnfor _, ch := range c.chans {ch <- Result{c.val, c.err, c.dups > 0}}}}()func() {defer func() {if !normalReturn {// Ideally, we would wait to take a stack trace until we've determined// whether this is a panic or a runtime.Goexit.// 理想情况下,会等到确定是否是 panic/runtime.Goexit 后才进行堆栈跟踪//// Unfortunately, the only way we can distinguish the two is to see// whether the recover stopped the goroutine from terminating, and by// the time we know that, the part of the stack trace relevant to the// panic has been discarded.// 不幸的是,我们区分两者的唯一方法是查看 recover 是否阻止了 goroutine 终止// 而当我们知道这一点时,与 panic 相关的堆栈跟踪部分已被丢弃。// 把 recover 拦住之后,返回一个 error ,然后在外部再进行放大,杀人于无形,让外部不知道singleFlightif r := recover(); r != nil {c.err = newPanicError(r)}}}()c.val, c.err = fn()normalReturn = true}()if !normalReturn {recovered = true}
}// Forget tells the singleflight to forget about a key. Future calls
// to Do for this key will call the function rather than waiting for
// an earlier call to complete.
// 如果不想等之前的 singleflight 返回,则在 map[string]*call 中删除之前的 key
func (g *Group) Forget(key string) {g.mu.Lock()delete(g.m, key)g.mu.Unlock()
}
时序分析
首发请求,先在Do中制造call,然后 c.wg.Add(1),然后将其放到map中
c.wg.Add(1)
g.m[key] = c
首发结束时,在doCall的defer中,先 c.wg.Done(),然后将任务从map中移除:delete(g.m, key)
c.wg.Done()
if g.m[key] == c {delete(g.m, key)
}
首发请求和首发结束都在锁的操作下执行。
所以抢到锁的时候,要么是首发请求执行请求的开始,要么是首发请求执行请求的结束
附录
一石激起千层浪
sync.WaitGroup 反向运用
func TestDemo333(t *testing.T) {var wg sync.WaitGroupwg.Add(1)for i := 1; i <= 3; i++ {go func(taskID int) {fmt.Println(i, "before wait")wg.Wait()fmt.Println(i, "wait finish")}(i)}time.Sleep(4 * time.Second)fmt.Println("main before send done")wg.Done() // 在协程结束时,调用Done方法fmt.Println("main after send done")select {}
}
debug.Stack()
属于 runtime/debug 包
用于获取当前程序的堆栈跟踪(stack trace),通常用于调试和错误处理
当调用 stack := debug.Stack() 时,实际上是在获取当前程序的堆栈信息,并将其存储在一个字符串类型的变量 stack 中。
这个字符串包含了程序在调用 debug.Stack() 时的调用栈信息,包括函数名、文件名和行号等。
package mainimport ("fmt""runtime/debug"
)func main() {stack := debug.Stack()fmt.Println(string(stack))
}func functionA() {functionB()
}func functionB() {stack := debug.Stack()fmt.Println(string(stack))
}
示例中定义两个函数 functionA 和 functionB。
在 functionB 中,调用了 debug.Stack() 并打印了堆栈信息。
当运行这个程序时,你会看到类似以下的输出:
goroutine 1 [running]:
main.functionB()/path/to/your/project/main.go:14 +0x8e
main.functionA()/path/to/your/project/main.go:7 +0x56
main.main()/path/to/your/project/main.go:22 +0x4a
runtime.main()/usr/local/go/src/runtime/proc.go:225 +0x235
runtime.goexit()/usr/local/go/src/runtime/asm_amd64.s:1571 +0x1
这个输出显示了程序在调用 debug.Stack() 时的堆栈跟踪信息。
这对于调试程序和查找错误非常有用。
runtime.Goexit()
属于 runtime 包,用于退出当前的 goroutine。
当调用 runtime.Goexit() 时,当前正在执行的 goroutine 会立即终止,但不会对其他 goroutine 产生影响。
runtime.Goexit() 的使用场景通常包括:
(1) 优雅地退出 goroutine:
在某些情况下,可能需要在满足特定条件时退出 goroutine,而不是等待它自然完成。
使用 runtime.Goexit() 可以实现这一点。
(2) 避免 panic 引起的异常退出:
如果 goroutine 中发生了 panic,它会向上传播并可能影响到其他 goroutine。
在这种情况下,使用 runtime.Goexit() 可以优雅地退出当前 goroutine
避免 panic 对其他 goroutine 的影响
(3) 控制 goroutine 的生命周期:
在某些复杂的并发场景中,可能需要手动控制 goroutine 的生命周期。
通过在适当的时候调用 runtime.Goexit(),可以实现这一点。
package mainimport ("fmt""runtime""time"
)var someCondition = truefunc main() {go func() {for {fmt.Println("Running...")time.Sleep(1 * time.Second)if someCondition {fmt.Println("Exiting...")runtime.Goexit()}}}()time.Sleep(5 * time.Second)fmt.Println("Main function finished.")
}
在这个示例中,启动了一个 goroutine,它会每隔一秒钟打印 “Running…”。
当 someCondition 为 true 时,goroutine 会打印 “Exiting…” 并调用 runtime.Goexit() 退出。
主函数在等待 5 秒钟后结束。
过度使用 runtime.Goexit() 可能会导致代码难以理解和维护。
在大多数情况下,使用 channel 和其他同步机制来控制 goroutine 的生命周期是更好的选择。
DoChan的学习
// DoChan is like Do but returns a channel that will receive the
// results when they are ready.
//
// The returned channel will not be closed.
func (g *Group) DoChan(key string, fn func() (interface{}, error)) <-chan Result {ch := make(chan Result, 1)g.mu.Lock()if g.m == nil {g.m = make(map[string]*call)}if c, ok := g.m[key]; ok {c.dups++c.chans = append(c.chans, ch)g.mu.Unlock()return ch}c := &call{chans: []chan<- Result{ch}}c.wg.Add(1)g.m[key] = cg.mu.Unlock()go g.doCall(c, key, fn)return ch
}
相关文章:
go的singleFlight学习
Package singleflight provides a duplicate function call suppression mechanism “golang.org/x/sync/singleflight” 原来底层是 waitGroup,我还以为等待的协程主动让出 cpu 了,没想到 waitGroup.Wait() 阻塞了 doCall 不但返回值是 func 的 val 和…...
高电压技术-冲击高压发生器MATLAB仿真
微❤关注“电气仔推送”获得资料(专享优惠) 冲击电压发生器是产生冲击电压波的装置,用于检验电力设备耐受大气过电压和操作过电压的绝缘性能,冲击电压发生器能产生标准雷电冲击电压波形,雷电冲击电压截波,标准操作冲击…...
【STM32】SysTick系统滴答定时器
1.SysTick简介 CM4内核的处理和CM3一样,内部都包含了一个SysTick定时器,SysTick 是一个24 位的倒计数定时器,当计到0 时 ,将 从RELOAD 寄存器中自动重装载定时初值。只要不把它在SysTick 控制及状态寄存器中的使能位清除…...
编码遵循五大设计原则创建出更加健壮、可维护和可扩展的软件系统
一、单一职责原则(SRP) * 定义:一个类应该只有一个引起它变化的原因。 * 解释:意味着一个类应该专注于做一件事情,当需求发生变化时,只影响到一个类。这有助于降低类间的耦合,使得代码更易于理…...
记录一个问题
问题描述 如果一个物料既在A总成零件号下计算为托盘库,在B总成零件号下计算为箱库,则放于箱库。 A中选择排名第21的递补进托盘库。(也需要判断递补的是否在其他总成零件中为箱库,是的话继续递补判断) 解决思路 为了…...
ONLYOFFICE 8.1版本桌面编辑器测评:重塑办公效率的巅峰之作
在数字化办公日益普及的今天,一款高效、便捷且功能强大的桌面编辑器成为了职场人士不可或缺的工具。ONLYOFFICE 8.1版本桌面编辑器凭借其卓越的性能和丰富的功能,成功吸引了众多用户的目光。今天,我们将对ONLYOFFICE 8.1版本桌面编辑器进行全…...
【shell脚本速成】python安装脚本
文章目录 案例需求应用场景解决问题脚本思路案例代码 🌈你好呀!我是 山顶风景独好 🎈欢迎踏入我的博客世界,能与您在此邂逅,真是缘分使然!😊 🌸愿您在此停留的每一刻,都沐…...
Redis报错:MISCONF Redis is configured to save RDB snapshots
错误提示内容: 2024-06-25 16:30:49 : Connection: Redis_Server > [runCommand] PING 2024-06-25 16:30:49 : Connection: Redis_Server > Response received : -MISCONF Redis is configured to save RDB snapshots, but it is currently not able to pers…...
关于使用绿联 USB-A转RJ45 2.5G网卡提速的解决问题
问题 网络下载速率低 网线是七类网线,外接的USB网卡驱动 我的自带网卡是 I219v 在嵌入了2.5G网络后一直无法到达1.5G以上。 平均测速300~500M 解决方案 更新了USB的网卡驱动 禁用了 I219-V的驱动。测速即可 USB驱动下载地址 https://download.csdn.net/downlo…...
Qt: QPushButton 按钮实现 上图标下文字
效果如下: 实现有如下几种方式: 1. 使用 QPushButton 设置 setStyleSheet 例: ui->recorder->setStyleSheet("QPushButton{"\"border: 1px solid #00d2ff; "\"min-height: 60px; "\"col…...
使用阿里云效API操作流水线
使用阿里云效(Alibaba Cloud DevOps)API操作流水线时,需要注意以下几个方面: 认证与授权 确保你已经获取了正确的访问凭证(AccessKey ID 和 AccessKey Secret),并且这些凭证具有足够的权限来执行…...
使用命令行创建uniapp+TS项目,使用vscode编辑器
一:如果没有pnpm,先安装pnpm 二:使用npx工具和degit工具从 GitHub 上的 dcloudio/uni-preset-vue 仓库克隆一个名为 vite-ts 的分支,到项目中. 执行完上面命令后,去manifest.json添加appid(自己微信小程序的Id),也可不执行直接下一步,执行pnpm install ,再执行pnpm:dev:mp-weix…...
ABC355 Bingo2
分析: 找出其中一行或列或任意对角线被全部标记,即可输出回合数,否则输出-1 如果x%n0,行是x/n,列是n 如果x%n!0,行是x/n1,列是x%n 如果行列或行列n1即为对角线。 标记行列对角线…...
Spring+Vue项目部署
目录 一、需要的资源 二、步骤 1.首先要拥有一个服务器 2.项目准备 vue: 打包: 3.服务器装环境 文件上传 设置application.yml覆盖 添加启动和停止脚本 编辑 安装jdk1.8 安装nginx 安装mysql 报错:「ERR」1273-Unknown collation: utf8m…...
【uml期末复习】统一建模语言大纲
前言: 关于uml的期末复习的常考知识点,可能对你们有帮助😉 目录 第一部分 概念与基础 第一章 面向对象技术 第二章 统一软件过程 第三章 UML概述 第四章 用例图 第五章 类图 第六章 对象图 第七章 顺序图 第八章 协作图 第九章 状态…...
Linux高级IO
高级IO 1.五种IO模型1.1 阻塞IO1.2 非阻塞IO1.3 信号驱动IO1.4 多路复用/多路转接IO1.5 异步IO1.6 小结 2.高级IO重要概念3.非阻塞IO3.1 实现函数NoBlock3.2 轮询方式读取标准输入 4.I/O多路转接之select4.1 理解select执行过程4.2 select的特点4.3 select缺点4.4 实现 5.I/O多…...
go-admin-ui开源后台管理系统华为云部署
1.华为云开通8000与9527端口 2.编译 编译成功 3.发布到远程服务器 4.登陆华为云终端 5.安装Nginx 6.查看服务启动状态 7.添加网站 添加与修改配置www-data 改为 www 自定义日志输出格式 添加网站配置文件go_admin_ui.conf 添加如下内容: location 下的root指向网站文件夹 修…...
点云入门知识
点云的处理任务 场景语义分割 物体的三维表达方法(3D representations): 点云:是由物体表面上许多点数据来表征这个物体。最接近原始传感器数据,且具有丰富的几何信息。 Mesh:用三角形面片和正方形面片拼…...
HTML静态网页成品作业(HTML+CSS+JS)——家乡莆田介绍网页(5个页面)
🎉不定期分享源码,关注不丢失哦 文章目录 一、作品介绍二、作品演示三、代码目录四、网站代码HTML部分代码 五、源码获取 一、作品介绍 🏷️本套采用HTMLCSS,使用Javacsript代码实现图片轮播,共有5个页面。 二、作品…...
#### grpc比http性能高的原因 ####
grpc比http性能高的原因 二进制消息格式:gRPC使用Protobuf(一种有效的二进制消息格式)进行序列化,这种格式在服务器和客户端上的序列化速度非常快,且序列化后的消息体积小,适合带宽有限的场景。 HTTP/2协…...
内存分配函数malloc kmalloc vmalloc
内存分配函数malloc kmalloc vmalloc malloc实现步骤: 1)请求大小调整:首先,malloc 需要调整用户请求的大小,以适应内部数据结构(例如,可能需要存储额外的元数据)。通常,这包括对齐调整,确保分配的内存地址满足特定硬件要求(如对齐到8字节或16字节边界)。 2)空闲…...
Cursor实现用excel数据填充word模版的方法
cursor主页:https://www.cursor.com/ 任务目标:把excel格式的数据里的单元格,按照某一个固定模版填充到word中 文章目录 注意事项逐步生成程序1. 确定格式2. 调试程序 注意事项 直接给一个excel文件和最终呈现的word文件的示例,…...
STM32标准库-DMA直接存储器存取
文章目录 一、DMA1.1简介1.2存储器映像1.3DMA框图1.4DMA基本结构1.5DMA请求1.6数据宽度与对齐1.7数据转运DMA1.8ADC扫描模式DMA 二、数据转运DMA2.1接线图2.2代码2.3相关API 一、DMA 1.1简介 DMA(Direct Memory Access)直接存储器存取 DMA可以提供外设…...
什么是EULA和DPA
文章目录 EULA(End User License Agreement)DPA(Data Protection Agreement)一、定义与背景二、核心内容三、法律效力与责任四、实际应用与意义 EULA(End User License Agreement) 定义: EULA即…...
【碎碎念】宝可梦 Mesh GO : 基于MESH网络的口袋妖怪 宝可梦GO游戏自组网系统
目录 游戏说明《宝可梦 Mesh GO》 —— 局域宝可梦探索Pokmon GO 类游戏核心理念应用场景Mesh 特性 宝可梦玩法融合设计游戏构想要素1. 地图探索(基于物理空间 广播范围)2. 野生宝可梦生成与广播3. 对战系统4. 道具与通信5. 延伸玩法 安全性设计 技术选…...
精益数据分析(97/126):邮件营销与用户参与度的关键指标优化指南
精益数据分析(97/126):邮件营销与用户参与度的关键指标优化指南 在数字化营销时代,邮件列表效度、用户参与度和网站性能等指标往往决定着创业公司的增长成败。今天,我们将深入解析邮件打开率、网站可用性、页面参与时…...
Java求职者面试指南:计算机基础与源码原理深度解析
Java求职者面试指南:计算机基础与源码原理深度解析 第一轮提问:基础概念问题 1. 请解释什么是进程和线程的区别? 面试官:进程是程序的一次执行过程,是系统进行资源分配和调度的基本单位;而线程是进程中的…...
BLEU评分:机器翻译质量评估的黄金标准
BLEU评分:机器翻译质量评估的黄金标准 1. 引言 在自然语言处理(NLP)领域,衡量一个机器翻译模型的性能至关重要。BLEU (Bilingual Evaluation Understudy) 作为一种自动化评估指标,自2002年由IBM的Kishore Papineni等人提出以来,…...
Scrapy-Redis分布式爬虫架构的可扩展性与容错性增强:基于微服务与容器化的解决方案
在大数据时代,海量数据的采集与处理成为企业和研究机构获取信息的关键环节。Scrapy-Redis作为一种经典的分布式爬虫架构,在处理大规模数据抓取任务时展现出强大的能力。然而,随着业务规模的不断扩大和数据抓取需求的日益复杂,传统…...
Kafka主题运维全指南:从基础配置到故障处理
#作者:张桐瑞 文章目录 主题日常管理1. 修改主题分区。2. 修改主题级别参数。3. 变更副本数。4. 修改主题限速。5.主题分区迁移。6. 常见主题错误处理常见错误1:主题删除失败。常见错误2:__consumer_offsets占用太多的磁盘。 主题日常管理 …...
