详解Go语言中的Goroutine组(Group)在项目中的使用
背景(Why)
Go语言通过其内置的goroutine和通道(channel)机制,提供了强大的并发支持。goroutine的开销非常低,一个goroutine仅占用几KB的内存,可以轻松创建成千上万个goroutine来处理并发任务。然而,随着并发任务数量的增加,管理goroutine的生命周期、处理错误以及保证资源正确回收变得越来越复杂。例如,我们需要处理以下场景:
- 错误处理困难:如果某个goroutine发生错误或panic,需要有机制捕获这些错误并作出相应处理。
- 资源管理复杂:确保所有goroutine在完成任务后正确回收资源,防止资源泄漏。
- 任务调度不灵活:在多个goroutine之间调度任务,确保高效执行和公平分配。在goroutine执行前后进行必要的操作,如日志记录或环境准备。
- 同步复杂性:确保所有goroutine都在某个时间点前完成,或者在发生重大错误时取消所有未完成的goroutine。
为了解决这些问题,引入了一个Group结构体,提供了一种更高级的方式来管理一组goroutine。
What
定义一个Group结构体来实现goroutine组管理
type Group struct {chs []func(ctx context.Context) error // 保存所有要在组中执行的任务name string // 组名err error // 保存组中发生的第一个错误ctx context.Context // 组的上下文,用于控制任务的执行panicCb func([]byte) bool // 在发生 panic 时调用的回调函数beforeCb func() // 在任务执行之前调用的回调函数panicTimeout time.Duration // 调用 panicCb 之间的时间间隔ch chan func(ctx context.Context) error // 任务通道cancel func() // 取消任务的函数wg sync.WaitGroup // 等待组内所有任务完成errOnce sync.Once // 确保 err 只被设置一次workerOnce sync.Once // 确保 worker 只被启动一次panicTimes int8 // 最大允许 panic 的次数
}
chs []func(ctx context.Context) error:
- 类型:切片,包含多个函数,这些函数接受
context.Context作为参数并返回错误。- 用途:保存所有要在组中执行的任务。
name string:
- 类型:字符串。
- 用途:保存组的名称。
err error:
- 类型:错误。
- 用途:保存组中第一个发生的错误。
ctx context.Context:
- 类型:上下文。
- 用途:控制任务的执行,可以用来取消任务或者设置任务的超时时间。
panicCb func([]byte) bool:
- 类型:函数,接受一个字节切片参数(panic 的堆栈信息)并返回布尔值。
- 用途:当组中的任务发生 panic 时调用的回调函数。
beforeCb func():
- 类型:函数,无参数无返回值。
- 用途:在每个任务执行之前调用的回调函数。
panicTimeout time.Duration:
- 类型:持续时间。
- 用途:两次调用
panicCb之间的时间间隔。如果某个任务频繁地发生panic,而每次panic都调用panicCb,这可能会导致系统性能下降或产生大量日志。通过设置panicTimeout,可以限制panicCb的调用频率,确保在一个指定的时间间隔内不会多次调用panicCb。
ch chan func(ctx context.Context) error:
- 类型:通道,包含函数,这些函数接受
context.Context作为参数并返回错误。- 用途:用于在组内传递任务。
cancel func():
- 类型:函数,无参数无返回值。
- 用途:用于取消组内的所有任务。
wg sync.WaitGroup:
- 类型:等待组。
- 用途:用于等待组内所有任务完成。
errOnce sync.Once:
- 类型:同步 Once。
- 用途:确保
err只被设置一次。
workerOnce sync.Once:
- 类型:同步 Once。
- 用途:确保 worker 只被启动一次。
panicTimes int8:
- 类型:整数(8位)。
- 用途:最大允许的 panic 次数。
创建NewGroup函数
NewGroup函数用于创建一个新的goroutine组实例,初始化相关参数,并设置panic处理回调函数。
func NewGroup(option Option) *Group {log = logger.SLogger("goroutine")name := "default"if len(option.Name) > 0 {name = option.Name}g := &Group{name: name,panicCb: option.PanicCb,panicTimes: option.PanicTimes,panicTimeout: option.PanicTimeout,}//如果 option 中未提供 panicCb,则使用默认的 panicCb 回调函数。这个函数会记录 panic 的信息,并增加 goroutineCrashedVec 指标。if g.panicCb == nil {g.panicCb = func(crashStack []byte) bool {log.Errorf("recover panic: %s", string(crashStack))goroutineCrashedVec.WithLabelValues(name).Inc()return true}}goroutineGroups.Inc()return g
}
3.定义GOMAXPROCS 方法
GOMAXPROCS 函数用于设置并发执行的最大 goroutine 数量。具体来说,它通过创建一个缓冲通道来限制并发执行的 goroutine 数量,并启动相应数量的 goroutine 来处理通道中的任务。
// GOMAXPROCS set max goroutine to work.
func (g *Group) GOMAXPROCS(n int) {if n <= 0 {panic("goroutine: GOMAXPROCS must great than 0")}g.workerOnce.Do(func() { // 确保该逻辑只执行一次g.ch = make(chan func(context.Context) error, n) // 创建缓冲通道,大小为 nfor i := 0; i < n; i++ { // 启动 n 个 goroutine 来处理通道中的任务go func() {for f := range g.ch {g.do(f) // 调用 g.do 方法执行任务}}()}})
}
使用 sync.Once 确保逻辑只执行一次。创建一个缓冲大小为 n 的通道 g.ch,用于存储任务。
启动 n 个 goroutine,循环从通道 g.ch 中获取任务并执行 g.do(f) 方法。每个 goroutine 都会持续从通道中获取任务并执行,直到通道被关闭。
在 for f := range g.ch 这种结构中,如果通道 g.ch 中没有任务,读取操作将会阻塞,直到有新的任务被写入通道。 也就是说开了n个goroutine在g.ch中等待任务发放和执行任务,所以最大并发的goroutine数量为n。某个goroutine从通道 g.ch 中取出的任务 f 不会在另一个 goroutine 的循环中再次出现,每个任务只会被一个 goroutine 处理一次。
4. 定义Go方法
Go方法用于启动一个新的goroutine,并将其添加到组中进行管理。如果Group已经初始化了工作通道,也就是如果有通道 g.ch,则尝试将任务发送到通道,如果通道已满(无法立即发送),则将函数 f 添加到 g.chs 列表中,等待稍后执行。如果没有通道 g.ch,则立即启动一个新的 goroutine 来执行任务。
func (g *Group) Go(f func(ctx context.Context) error) {g.wg.Add(1)goroutineCounterVec.WithLabelValues(g.name).Inc()if g.ch != nil {select {case g.ch <- f:default:g.chs = append(g.chs, f)}return}go g.do(f)
}
使用通道 g.ch 来限制同时运行的 goroutine 数量。当通道已满时,新的任务会被暂存到 g.chs 列表中。如果没有设置并发限制(即 g.ch 为 nil),则每次调用 Go 方法都会立即启动一个新的 goroutine 来执行任务。也就是提供了两种模式可供选择!
4. 定义Wait方法
Wait 方法用于等待所有通过 Go 方法启动的 goroutine 完成执行,并返回第一个非空错误(如果有)。
func (g *Group) Wait() error {if g.ch != nil {for _, f := range g.chs {g.ch <- f}}g.wg.Wait()if g.ch != nil {close(g.ch) // let all receiver exit}if g.cancel != nil {g.cancel()}return g.err
}
Wait 方法的设计确保了所有通过 Go 方法启动的 goroutine 都能够正确完成执行,并清理所有相关的资源。如果有任何 goroutine 返回错误,该方法会返回第一个非空错误。这个方法提供了一种优雅的方式来管理并发任务的生命周期和错误处理。
5. 定义具体执行任务的方法do方法
do方法负责在 goroutine 中执行任务,并处理可能发生的 panic。do方法执行传入的任务f,。如果任务中发生panic,do方法会根据配置的重试次数进行重试,并调用panicCb回调函数。
func (g *Group) do(f func(ctx context.Context) error) {//如果定义了 beforeCb 回调函数,调用它。这可以在每次任务开始前执行一些操作,如初始化工作或记录日志。if g.beforeCb != nil {g.beforeCb()}//初始化上下文ctx := g.ctxif ctx == nil {ctx = context.Background()}//设定重试次数为 g.panicTimes - 1。在 do 方法内部可能会递减该值来控制 panic 的重试逻辑。panicTimes := g.panicTimes - 1var (err error//run 是一个匿名函数,用于执行传入的任务 f(ctx),并在任务完成后进行错误处理和资源清理。run func()start = time.Now())run = func() {//通过 recover 捕获 panic 信息,并将堆栈信息存储在 buf 中,记录错误信息,并根据 panicCb 回调函数的返回值决定是否重试。defer func() {if r := recover(); r != nil {goroutineCrashedVec.WithLabelValues(g.name).Inc()isPanicRetry := truebuf := make([]byte, 4096) //nolint:gomndbuf = buf[:runtime.Stack(buf, false)]if e, ok := r.(error); ok {buf = append([]byte(fmt.Sprintf("%s\n", e.Error())), buf...)}if g.panicCb != nil {isPanicRetry = g.panicCb(buf)}//如果 panicCb 回调函数定义了,调用它,并判断是否继续重试。if isPanicRetry && panicTimes > 0 {panicTimes--if g.panicTimeout > 0 {time.Sleep(g.panicTimeout)}goroutineRecoverVec.WithLabelValues(g.name).Inc()//重试执行函数run()return} else {//如果重试次数用完了,则更新监控指标,记录 panic 发生的次数和恢复的次数。goroutineCounterVec.WithLabelValues(g.name).Dec()goroutineCostVec.WithLabelValues(g.name).Observe(float64(time.Since(start)) / float64(time.Second))goroutineStoppedVec.WithLabelValues(g.name).Inc()}err = fmt.Errorf("goroutine: panic recovered: %s", r)} else {//没有发生panic,则只用记录指标goroutineCounterVec.WithLabelValues(g.name).Dec()goroutineCostVec.WithLabelValues(g.name).Observe(float64(time.Since(start)) / float64(time.Second))goroutineStoppedVec.WithLabelValues(g.name).Inc()}//如果有err,则记录在g实例的字段中if err != nil {g.errOnce.Do(func() {g.err = errif g.cancel != nil {g.cancel()}})}g.wg.Done()}()err = f(ctx)}run()
}
HOW
下面是一个使用Group管理goroutine的示例代码:
func demoFunc(){fmt.Println("finish")
}
func main() {group := NewGroup(Option{Name: "example-group",PanicCb: nil, // 使用默认的panic处理回调PanicTimes: 3, // 最大重试次数PanicTimeout: time.Second * 2, // 重试间隔})// 在这个group中启动5个goroutine执行任务,即增加五个func到group.chfor i := 0; i < 5; i++ {group.Go(func(ctx context.Context) error {// 在这里放入你要执行的函数(任务)demoFunc()return nil})}// 等待所有任务完成if err := group.Wait(); err != nil {fmt.Printf("group execution completed with error: %v\n", err)} else {fmt.Println("group execution completed successfully")}
}
相关文章:
详解Go语言中的Goroutine组(Group)在项目中的使用
背景(Why) Go语言通过其内置的goroutine和通道(channel)机制,提供了强大的并发支持。goroutine的开销非常低,一个goroutine仅占用几KB的内存,可以轻松创建成千上万个goroutine来处理并发任务。然而,随着并…...
Linux桌面环境手动编译安装librime、librime-lua以及ibus-rime,提升中文输入法体验
Linux上的输入法有很多,大体都使用了Fcitx或者iBus作为输入法的引擎。相当于有了一个很不错的“地基”,你可以在这个“地基”上盖上自己的“小别墅”。而rime输入法,就是一个“毛坯别墅”,你可以在rime的基础上,再装修…...
一文入门【NestJs】Providers
Nest学习系列 ✈️一文入门【NestJS】 ✈️一文入门【NestJs】Controllers 控制器 🚩 前言 在NestJS的世界里,理解“Providers”是构建健壮、可维护的后端服务的关键。NestJS,作为Node.js的一个现代框架,采用了Angular的一些核…...
云原生(Cloud native)
云原生(Cloud native) 一 定义 目前比较权威的定义主要来自Pivotal公司和云原生计算基金会(Cloud Native Computing Foundation,简称CNCF)。 1.1 Pivotal 4个要点: DevOps、持续交付、微服务、容器化。六…...
JVM OutOfMemoryError异常模拟
1.Java堆溢出 Java堆用于储存对象实例,我们只要不断地创建对象,并且保证 GC Roots 到对象之间有可达路径来避免垃圾回收机制清除这些对象,那么随着对象数量的增加,总容量触及最大堆的容量限制后就会 产生内存溢出异常。 限制Java …...
架构师机器学习操作 (MLOps) 指南
MLOps 是机器学习操作的缩写,是一组实践和工具,旨在满足工程师构建模型并将其投入生产的特定需求。一些组织从一些自主开发的工具开始,这些工具在每次实验后对数据集进行版本控制,并在每个训练周期后对检查点模型进行版本控制。另…...
【学习笔记】虚幻SkeletalMesh学习(一)基础介绍
文章目录 零、前言一、资源介绍1.1 骨架资源1.2 骨架网格体资源 二、UE4中的定义2.1 骨骼数据2.2 模型网格数据 三、渲染3.1 RenderData的初始化3.2 渲染对象的创建3.3 渲染对象的更新3.3.1 游戏线程的更新(*FSkeletalMeshObjectGPUSkin::Update*)3.3.2 …...
Apache防盗链、网页压缩、网页缓存
目录 网页压缩 类型 示例 动态添加模块操作步骤 重装Apache操作步骤 网页缓存 示例 操作步骤 隐藏版本信息 操作步骤 Apache防盗链 定义 原理 配置防盗链实验环境 实验环境 本地图片盗链示例 操作步骤 防盗链示例 操作步骤 网页压缩 网站的访问速度是由多个…...
LocalAI - 笔记
1.localAI https://localai.io/ 2 使用笔记本电脑搭建本地LLMs大模型环境 使用笔记本电脑搭建本地LLMs大模型环境 - 大模型知识库|大模型训练|开箱即用的企业大模型应用平台|智能体开发|53AI 3LocalAI视频 【LocalAI】(3):超级简单&…...
Windows图形界面(GUI)-SDK-C/C++ - 编辑框(edit)
公开视频 -> 链接点击跳转公开课程博客首页 -> 链接点击跳转博客主页 目录 编辑框(edit) 控件样式 创建控件 初始控件 消息处理 示例代码 编辑框(edit) 控件样式 编辑框(Edit Control)是Windows中最常用的控件之一,用于接收用户…...
区块链学习05-web3中solidity和move语言
Solidity 和 Move 语言的比较:Web3 开发中的两种选择 Solidity 和 Move 都是用于开发区块链平台智能合约的编程语言。它们具有一些相似之处,但也存在一些关键差异。 相似之处: Solidity 和 Move 都是图灵完备语言,这意味着它们可以表达计算…...
web滚动页面到指定位置
方法:scrollTo(x-coord,y-coord) 方法是Web API中Element接口的一部分,但它主要用于Window对象或可滚动的元素(如具有overflow属性为auto或scroll的<div>)。此方法用于将窗口滚动到文档中的特定位置,或者将某个元…...
操作系统真象还原:实现文件写入
14.7 实现文件写入 这是一个网站有所有小节的代码实现,同时也包含了Bochs等文件 本节要实现的 sys_write 是系统调用 write 的内核实现,咱们之前的 write 是个简易版,它是为了临时完成输出打印的功能,不支持文件描述符。如今要让…...
FastAPI 学习之路(四十九)WebSockets(五)修复接口测试中的问题
其实代码没有问题,但是我们忽略了一个问题,就是在正常的开发中,肯定是遇到过这样的情况,我们频繁的有客户端链接,断开连接,需要统一的管理这些链接,那么应该如何管理呢。其实可以声明一个类去管…...
STM32智能楼宇照明系统教程
目录 引言环境准备智能楼宇照明系统基础代码实现:实现智能楼宇照明系统 4.1 数据采集模块 4.2 数据处理与控制模块 4.3 通信与网络系统实现 4.4 用户界面与数据可视化应用场景:楼宇照明管理与优化问题解决方案与优化收尾与总结 1. 引言 智能楼宇照明系…...
【C语言】原码、反码、补码详解 -《码上有道 ! 》
目录 原码、反码、补码详解及其在C语言中的应用一、原码(Sign-Magnitude)1.1 定义与表示1.2 历史来源与作用1.3 示例1.4 C语言示例1.5 代码运行结果 二、反码(Ones Complement)2.1 定义与表示2.2 历史来源与作用2.3 示例2.4 C语言…...
C++找到错误的具体信息
fprintf(stderr, "Errno: %d, Error message: %s\n", errno, strerror(errno));为什么不用printf来打印输出? 使用 fprintf(stderr, …),将错误消息输出到标准错误流 stderr。这种做法是为了将错误信息输出到一个专门用于记录错误的流中&…...
Windows 安装Zookeeper
安装 下载地址:Apache ZooKeeper 我下载的版本:zookeeper-3.4.12 下载后,解压 配置 1、 在D:\zookeeper-3.4.12文件夹中创建一个“data”文件夹和“log”文件夹 2、 复制zoo_sample.cfg,改名:zoo.cfg 修改zoo.c…...
从人工巡检到智能预警:视频AI智能监控技术在水库/河湖/水利防汛抗洪中的应用
一、背景需求分析 近日,我国多省市遭遇连日暴雨,导致水库、湖泊、河道等水域水位暴涨,城市内涝频发。随着夏季汛期的到来,降雨天气频繁,水利安全管理面临严峻挑战。为保障水库安全、预防和减少洪涝灾害,采…...
【轻松拿捏 】Java-static关键字(面试)
Java-static关键字 1. 定义和基本概念 回答要点: 示例回答: 2. static 变量 回答要点: 示例回答: 代码示例: 3. static方法 回答要点: 示例回答: 代码示例: 4. static 代…...
React Native 开发环境搭建(全平台详解)
React Native 开发环境搭建(全平台详解) 在开始使用 React Native 开发移动应用之前,正确设置开发环境是至关重要的一步。本文将为你提供一份全面的指南,涵盖 macOS 和 Windows 平台的配置步骤,如何在 Android 和 iOS…...
FastAPI 教程:从入门到实践
FastAPI 是一个现代、快速(高性能)的 Web 框架,用于构建 API,支持 Python 3.6。它基于标准 Python 类型提示,易于学习且功能强大。以下是一个完整的 FastAPI 入门教程,涵盖从环境搭建到创建并运行一个简单的…...
聊聊 Pulsar:Producer 源码解析
一、前言 Apache Pulsar 是一个企业级的开源分布式消息传递平台,以其高性能、可扩展性和存储计算分离架构在消息队列和流处理领域独树一帜。在 Pulsar 的核心架构中,Producer(生产者) 是连接客户端应用与消息队列的第一步。生产者…...
【CSS position 属性】static、relative、fixed、absolute 、sticky详细介绍,多层嵌套定位示例
文章目录 ★ position 的五种类型及基本用法 ★ 一、position 属性概述 二、position 的五种类型详解(初学者版) 1. static(默认值) 2. relative(相对定位) 3. absolute(绝对定位) 4. fixed(固定定位) 5. sticky(粘性定位) 三、定位元素的层级关系(z-i…...
Matlab | matlab常用命令总结
常用命令 一、 基础操作与环境二、 矩阵与数组操作(核心)三、 绘图与可视化四、 编程与控制流五、 符号计算 (Symbolic Math Toolbox)六、 文件与数据 I/O七、 常用函数类别重要提示这是一份 MATLAB 常用命令和功能的总结,涵盖了基础操作、矩阵运算、绘图、编程和文件处理等…...
Ascend NPU上适配Step-Audio模型
1 概述 1.1 简述 Step-Audio 是业界首个集语音理解与生成控制一体化的产品级开源实时语音对话系统,支持多语言对话(如 中文,英文,日语),语音情感(如 开心,悲伤)&#x…...
(转)什么是DockerCompose?它有什么作用?
一、什么是DockerCompose? DockerCompose可以基于Compose文件帮我们快速的部署分布式应用,而无需手动一个个创建和运行容器。 Compose文件是一个文本文件,通过指令定义集群中的每个容器如何运行。 DockerCompose就是把DockerFile转换成指令去运行。 …...
MySQL用户和授权
开放MySQL白名单 可以通过iptables-save命令确认对应客户端ip是否可以访问MySQL服务: test: # iptables-save | grep 3306 -A mp_srv_whitelist -s 172.16.14.102/32 -p tcp -m tcp --dport 3306 -j ACCEPT -A mp_srv_whitelist -s 172.16.4.16/32 -p tcp -m tcp -…...
Maven 概述、安装、配置、仓库、私服详解
目录 1、Maven 概述 1.1 Maven 的定义 1.2 Maven 解决的问题 1.3 Maven 的核心特性与优势 2、Maven 安装 2.1 下载 Maven 2.2 安装配置 Maven 2.3 测试安装 2.4 修改 Maven 本地仓库的默认路径 3、Maven 配置 3.1 配置本地仓库 3.2 配置 JDK 3.3 IDEA 配置本地 Ma…...
莫兰迪高级灰总结计划简约商务通用PPT模版
莫兰迪高级灰总结计划简约商务通用PPT模版,莫兰迪调色板清新简约工作汇报PPT模版,莫兰迪时尚风极简设计PPT模版,大学生毕业论文答辩PPT模版,莫兰迪配色总结计划简约商务通用PPT模版,莫兰迪商务汇报PPT模版,…...
