详解Go语言中的Goroutine组(Group)在项目中的使用
背景(Why)
Go语言通过其内置的goroutine和通道(channel)机制,提供了强大的并发支持。goroutine的开销非常低,一个goroutine仅占用几KB的内存,可以轻松创建成千上万个goroutine来处理并发任务。然而,随着并发任务数量的增加,管理goroutine的生命周期、处理错误以及保证资源正确回收变得越来越复杂。例如,我们需要处理以下场景:
- 错误处理困难:如果某个goroutine发生错误或panic,需要有机制捕获这些错误并作出相应处理。
- 资源管理复杂:确保所有goroutine在完成任务后正确回收资源,防止资源泄漏。
- 任务调度不灵活:在多个goroutine之间调度任务,确保高效执行和公平分配。在goroutine执行前后进行必要的操作,如日志记录或环境准备。
- 同步复杂性:确保所有goroutine都在某个时间点前完成,或者在发生重大错误时取消所有未完成的goroutine。
为了解决这些问题,引入了一个Group
结构体,提供了一种更高级的方式来管理一组goroutine。
What
定义一个Group
结构体来实现goroutine组管理
type Group struct {chs []func(ctx context.Context) error // 保存所有要在组中执行的任务name string // 组名err error // 保存组中发生的第一个错误ctx context.Context // 组的上下文,用于控制任务的执行panicCb func([]byte) bool // 在发生 panic 时调用的回调函数beforeCb func() // 在任务执行之前调用的回调函数panicTimeout time.Duration // 调用 panicCb 之间的时间间隔ch chan func(ctx context.Context) error // 任务通道cancel func() // 取消任务的函数wg sync.WaitGroup // 等待组内所有任务完成errOnce sync.Once // 确保 err 只被设置一次workerOnce sync.Once // 确保 worker 只被启动一次panicTimes int8 // 最大允许 panic 的次数
}
chs []func(ctx context.Context) error
:
- 类型:切片,包含多个函数,这些函数接受
context.Context
作为参数并返回错误。- 用途:保存所有要在组中执行的任务。
name string
:
- 类型:字符串。
- 用途:保存组的名称。
err error
:
- 类型:错误。
- 用途:保存组中第一个发生的错误。
ctx context.Context
:
- 类型:上下文。
- 用途:控制任务的执行,可以用来取消任务或者设置任务的超时时间。
panicCb func([]byte) bool
:
- 类型:函数,接受一个字节切片参数(panic 的堆栈信息)并返回布尔值。
- 用途:当组中的任务发生 panic 时调用的回调函数。
beforeCb func()
:
- 类型:函数,无参数无返回值。
- 用途:在每个任务执行之前调用的回调函数。
panicTimeout time.Duration
:
- 类型:持续时间。
- 用途:两次调用
panicCb
之间的时间间隔。如果某个任务频繁地发生panic
,而每次panic
都调用panicCb
,这可能会导致系统性能下降或产生大量日志。通过设置panicTimeout
,可以限制panicCb
的调用频率,确保在一个指定的时间间隔内不会多次调用panicCb
。
ch chan func(ctx context.Context) error
:
- 类型:通道,包含函数,这些函数接受
context.Context
作为参数并返回错误。- 用途:用于在组内传递任务。
cancel func()
:
- 类型:函数,无参数无返回值。
- 用途:用于取消组内的所有任务。
wg sync.WaitGroup
:
- 类型:等待组。
- 用途:用于等待组内所有任务完成。
errOnce sync.Once
:
- 类型:同步 Once。
- 用途:确保
err
只被设置一次。
workerOnce sync.Once
:
- 类型:同步 Once。
- 用途:确保 worker 只被启动一次。
panicTimes int8
:
- 类型:整数(8位)。
- 用途:最大允许的 panic 次数。
创建NewGroup
函数
NewGroup
函数用于创建一个新的goroutine组实例,初始化相关参数,并设置panic处理回调函数。
func NewGroup(option Option) *Group {log = logger.SLogger("goroutine")name := "default"if len(option.Name) > 0 {name = option.Name}g := &Group{name: name,panicCb: option.PanicCb,panicTimes: option.PanicTimes,panicTimeout: option.PanicTimeout,}//如果 option 中未提供 panicCb,则使用默认的 panicCb 回调函数。这个函数会记录 panic 的信息,并增加 goroutineCrashedVec 指标。if g.panicCb == nil {g.panicCb = func(crashStack []byte) bool {log.Errorf("recover panic: %s", string(crashStack))goroutineCrashedVec.WithLabelValues(name).Inc()return true}}goroutineGroups.Inc()return g
}
3.定义GOMAXPROCS
方法
GOMAXPROCS
函数用于设置并发执行的最大 goroutine 数量。具体来说,它通过创建一个缓冲通道来限制并发执行的 goroutine 数量,并启动相应数量的 goroutine 来处理通道中的任务。
// GOMAXPROCS set max goroutine to work.
func (g *Group) GOMAXPROCS(n int) {if n <= 0 {panic("goroutine: GOMAXPROCS must great than 0")}g.workerOnce.Do(func() { // 确保该逻辑只执行一次g.ch = make(chan func(context.Context) error, n) // 创建缓冲通道,大小为 nfor i := 0; i < n; i++ { // 启动 n 个 goroutine 来处理通道中的任务go func() {for f := range g.ch {g.do(f) // 调用 g.do 方法执行任务}}()}})
}
使用 sync.Once
确保逻辑只执行一次。创建一个缓冲大小为 n
的通道 g.ch
,用于存储任务。
启动 n
个 goroutine,循环从通道 g.ch
中获取任务并执行 g.do(f)
方法。每个 goroutine 都会持续从通道中获取任务并执行,直到通道被关闭。
在 for f := range g.ch
这种结构中,如果通道 g.ch
中没有任务,读取操作将会阻塞,直到有新的任务被写入通道。 也就是说开了n个goroutine在g.ch中等待任务发放和执行任务,所以最大并发的goroutine数量为n。某个goroutine从通道 g.ch
中取出的任务 f
不会在另一个 goroutine 的循环中再次出现,每个任务只会被一个 goroutine 处理一次。
4. 定义Go
方法
Go
方法用于启动一个新的goroutine,并将其添加到组中进行管理。如果Group
已经初始化了工作通道,也就是如果有通道 g.ch
,则尝试将任务发送到通道,如果通道已满(无法立即发送),则将函数 f
添加到 g.chs
列表中,等待稍后执行。如果没有通道 g.ch
,则立即启动一个新的 goroutine 来执行任务。
func (g *Group) Go(f func(ctx context.Context) error) {g.wg.Add(1)goroutineCounterVec.WithLabelValues(g.name).Inc()if g.ch != nil {select {case g.ch <- f:default:g.chs = append(g.chs, f)}return}go g.do(f)
}
使用通道 g.ch
来限制同时运行的 goroutine 数量。当通道已满时,新的任务会被暂存到 g.chs
列表中。如果没有设置并发限制(即 g.ch
为 nil
),则每次调用 Go
方法都会立即启动一个新的 goroutine 来执行任务。也就是提供了两种模式可供选择!
4. 定义Wait
方法
Wait
方法用于等待所有通过 Go
方法启动的 goroutine 完成执行,并返回第一个非空错误(如果有)。
func (g *Group) Wait() error {if g.ch != nil {for _, f := range g.chs {g.ch <- f}}g.wg.Wait()if g.ch != nil {close(g.ch) // let all receiver exit}if g.cancel != nil {g.cancel()}return g.err
}
Wait
方法的设计确保了所有通过 Go
方法启动的 goroutine 都能够正确完成执行,并清理所有相关的资源。如果有任何 goroutine 返回错误,该方法会返回第一个非空错误。这个方法提供了一种优雅的方式来管理并发任务的生命周期和错误处理。
5. 定义具体执行任务的方法do
方法
do方法负责在 goroutine 中执行任务,并处理可能发生的 panic。do
方法执行传入的任务f
,。如果任务中发生panic,do
方法会根据配置的重试次数进行重试,并调用panicCb
回调函数。
func (g *Group) do(f func(ctx context.Context) error) {//如果定义了 beforeCb 回调函数,调用它。这可以在每次任务开始前执行一些操作,如初始化工作或记录日志。if g.beforeCb != nil {g.beforeCb()}//初始化上下文ctx := g.ctxif ctx == nil {ctx = context.Background()}//设定重试次数为 g.panicTimes - 1。在 do 方法内部可能会递减该值来控制 panic 的重试逻辑。panicTimes := g.panicTimes - 1var (err error//run 是一个匿名函数,用于执行传入的任务 f(ctx),并在任务完成后进行错误处理和资源清理。run func()start = time.Now())run = func() {//通过 recover 捕获 panic 信息,并将堆栈信息存储在 buf 中,记录错误信息,并根据 panicCb 回调函数的返回值决定是否重试。defer func() {if r := recover(); r != nil {goroutineCrashedVec.WithLabelValues(g.name).Inc()isPanicRetry := truebuf := make([]byte, 4096) //nolint:gomndbuf = buf[:runtime.Stack(buf, false)]if e, ok := r.(error); ok {buf = append([]byte(fmt.Sprintf("%s\n", e.Error())), buf...)}if g.panicCb != nil {isPanicRetry = g.panicCb(buf)}//如果 panicCb 回调函数定义了,调用它,并判断是否继续重试。if isPanicRetry && panicTimes > 0 {panicTimes--if g.panicTimeout > 0 {time.Sleep(g.panicTimeout)}goroutineRecoverVec.WithLabelValues(g.name).Inc()//重试执行函数run()return} else {//如果重试次数用完了,则更新监控指标,记录 panic 发生的次数和恢复的次数。goroutineCounterVec.WithLabelValues(g.name).Dec()goroutineCostVec.WithLabelValues(g.name).Observe(float64(time.Since(start)) / float64(time.Second))goroutineStoppedVec.WithLabelValues(g.name).Inc()}err = fmt.Errorf("goroutine: panic recovered: %s", r)} else {//没有发生panic,则只用记录指标goroutineCounterVec.WithLabelValues(g.name).Dec()goroutineCostVec.WithLabelValues(g.name).Observe(float64(time.Since(start)) / float64(time.Second))goroutineStoppedVec.WithLabelValues(g.name).Inc()}//如果有err,则记录在g实例的字段中if err != nil {g.errOnce.Do(func() {g.err = errif g.cancel != nil {g.cancel()}})}g.wg.Done()}()err = f(ctx)}run()
}
HOW
下面是一个使用Group
管理goroutine的示例代码:
func demoFunc(){fmt.Println("finish")
}
func main() {group := NewGroup(Option{Name: "example-group",PanicCb: nil, // 使用默认的panic处理回调PanicTimes: 3, // 最大重试次数PanicTimeout: time.Second * 2, // 重试间隔})// 在这个group中启动5个goroutine执行任务,即增加五个func到group.chfor i := 0; i < 5; i++ {group.Go(func(ctx context.Context) error {// 在这里放入你要执行的函数(任务)demoFunc()return nil})}// 等待所有任务完成if err := group.Wait(); err != nil {fmt.Printf("group execution completed with error: %v\n", err)} else {fmt.Println("group execution completed successfully")}
}
相关文章:
详解Go语言中的Goroutine组(Group)在项目中的使用
背景(Why) Go语言通过其内置的goroutine和通道(channel)机制,提供了强大的并发支持。goroutine的开销非常低,一个goroutine仅占用几KB的内存,可以轻松创建成千上万个goroutine来处理并发任务。然而,随着并…...

Linux桌面环境手动编译安装librime、librime-lua以及ibus-rime,提升中文输入法体验
Linux上的输入法有很多,大体都使用了Fcitx或者iBus作为输入法的引擎。相当于有了一个很不错的“地基”,你可以在这个“地基”上盖上自己的“小别墅”。而rime输入法,就是一个“毛坯别墅”,你可以在rime的基础上,再装修…...

一文入门【NestJs】Providers
Nest学习系列 ✈️一文入门【NestJS】 ✈️一文入门【NestJs】Controllers 控制器 🚩 前言 在NestJS的世界里,理解“Providers”是构建健壮、可维护的后端服务的关键。NestJS,作为Node.js的一个现代框架,采用了Angular的一些核…...
云原生(Cloud native)
云原生(Cloud native) 一 定义 目前比较权威的定义主要来自Pivotal公司和云原生计算基金会(Cloud Native Computing Foundation,简称CNCF)。 1.1 Pivotal 4个要点: DevOps、持续交付、微服务、容器化。六…...
JVM OutOfMemoryError异常模拟
1.Java堆溢出 Java堆用于储存对象实例,我们只要不断地创建对象,并且保证 GC Roots 到对象之间有可达路径来避免垃圾回收机制清除这些对象,那么随着对象数量的增加,总容量触及最大堆的容量限制后就会 产生内存溢出异常。 限制Java …...

架构师机器学习操作 (MLOps) 指南
MLOps 是机器学习操作的缩写,是一组实践和工具,旨在满足工程师构建模型并将其投入生产的特定需求。一些组织从一些自主开发的工具开始,这些工具在每次实验后对数据集进行版本控制,并在每个训练周期后对检查点模型进行版本控制。另…...

【学习笔记】虚幻SkeletalMesh学习(一)基础介绍
文章目录 零、前言一、资源介绍1.1 骨架资源1.2 骨架网格体资源 二、UE4中的定义2.1 骨骼数据2.2 模型网格数据 三、渲染3.1 RenderData的初始化3.2 渲染对象的创建3.3 渲染对象的更新3.3.1 游戏线程的更新(*FSkeletalMeshObjectGPUSkin::Update*)3.3.2 …...

Apache防盗链、网页压缩、网页缓存
目录 网页压缩 类型 示例 动态添加模块操作步骤 重装Apache操作步骤 网页缓存 示例 操作步骤 隐藏版本信息 操作步骤 Apache防盗链 定义 原理 配置防盗链实验环境 实验环境 本地图片盗链示例 操作步骤 防盗链示例 操作步骤 网页压缩 网站的访问速度是由多个…...
LocalAI - 笔记
1.localAI https://localai.io/ 2 使用笔记本电脑搭建本地LLMs大模型环境 使用笔记本电脑搭建本地LLMs大模型环境 - 大模型知识库|大模型训练|开箱即用的企业大模型应用平台|智能体开发|53AI 3LocalAI视频 【LocalAI】(3):超级简单&…...
Windows图形界面(GUI)-SDK-C/C++ - 编辑框(edit)
公开视频 -> 链接点击跳转公开课程博客首页 -> 链接点击跳转博客主页 目录 编辑框(edit) 控件样式 创建控件 初始控件 消息处理 示例代码 编辑框(edit) 控件样式 编辑框(Edit Control)是Windows中最常用的控件之一,用于接收用户…...

区块链学习05-web3中solidity和move语言
Solidity 和 Move 语言的比较:Web3 开发中的两种选择 Solidity 和 Move 都是用于开发区块链平台智能合约的编程语言。它们具有一些相似之处,但也存在一些关键差异。 相似之处: Solidity 和 Move 都是图灵完备语言,这意味着它们可以表达计算…...
web滚动页面到指定位置
方法:scrollTo(x-coord,y-coord) 方法是Web API中Element接口的一部分,但它主要用于Window对象或可滚动的元素(如具有overflow属性为auto或scroll的<div>)。此方法用于将窗口滚动到文档中的特定位置,或者将某个元…...
操作系统真象还原:实现文件写入
14.7 实现文件写入 这是一个网站有所有小节的代码实现,同时也包含了Bochs等文件 本节要实现的 sys_write 是系统调用 write 的内核实现,咱们之前的 write 是个简易版,它是为了临时完成输出打印的功能,不支持文件描述符。如今要让…...

FastAPI 学习之路(四十九)WebSockets(五)修复接口测试中的问题
其实代码没有问题,但是我们忽略了一个问题,就是在正常的开发中,肯定是遇到过这样的情况,我们频繁的有客户端链接,断开连接,需要统一的管理这些链接,那么应该如何管理呢。其实可以声明一个类去管…...

STM32智能楼宇照明系统教程
目录 引言环境准备智能楼宇照明系统基础代码实现:实现智能楼宇照明系统 4.1 数据采集模块 4.2 数据处理与控制模块 4.3 通信与网络系统实现 4.4 用户界面与数据可视化应用场景:楼宇照明管理与优化问题解决方案与优化收尾与总结 1. 引言 智能楼宇照明系…...
【C语言】原码、反码、补码详解 -《码上有道 ! 》
目录 原码、反码、补码详解及其在C语言中的应用一、原码(Sign-Magnitude)1.1 定义与表示1.2 历史来源与作用1.3 示例1.4 C语言示例1.5 代码运行结果 二、反码(Ones Complement)2.1 定义与表示2.2 历史来源与作用2.3 示例2.4 C语言…...
C++找到错误的具体信息
fprintf(stderr, "Errno: %d, Error message: %s\n", errno, strerror(errno));为什么不用printf来打印输出? 使用 fprintf(stderr, …),将错误消息输出到标准错误流 stderr。这种做法是为了将错误信息输出到一个专门用于记录错误的流中&…...
Windows 安装Zookeeper
安装 下载地址:Apache ZooKeeper 我下载的版本:zookeeper-3.4.12 下载后,解压 配置 1、 在D:\zookeeper-3.4.12文件夹中创建一个“data”文件夹和“log”文件夹 2、 复制zoo_sample.cfg,改名:zoo.cfg 修改zoo.c…...

从人工巡检到智能预警:视频AI智能监控技术在水库/河湖/水利防汛抗洪中的应用
一、背景需求分析 近日,我国多省市遭遇连日暴雨,导致水库、湖泊、河道等水域水位暴涨,城市内涝频发。随着夏季汛期的到来,降雨天气频繁,水利安全管理面临严峻挑战。为保障水库安全、预防和减少洪涝灾害,采…...

【轻松拿捏 】Java-static关键字(面试)
Java-static关键字 1. 定义和基本概念 回答要点: 示例回答: 2. static 变量 回答要点: 示例回答: 代码示例: 3. static方法 回答要点: 示例回答: 代码示例: 4. static 代…...

家政维修平台实战20:权限设计
目录 1 获取工人信息2 搭建工人入口3 权限判断总结 目前我们已经搭建好了基础的用户体系,主要是分成几个表,用户表我们是记录用户的基础信息,包括手机、昵称、头像。而工人和员工各有各的表。那么就有一个问题,不同的角色…...

论文浅尝 | 基于判别指令微调生成式大语言模型的知识图谱补全方法(ISWC2024)
笔记整理:刘治强,浙江大学硕士生,研究方向为知识图谱表示学习,大语言模型 论文链接:http://arxiv.org/abs/2407.16127 发表会议:ISWC 2024 1. 动机 传统的知识图谱补全(KGC)模型通过…...

mysql已经安装,但是通过rpm -q 没有找mysql相关的已安装包
文章目录 现象:mysql已经安装,但是通过rpm -q 没有找mysql相关的已安装包遇到 rpm 命令找不到已经安装的 MySQL 包时,可能是因为以下几个原因:1.MySQL 不是通过 RPM 包安装的2.RPM 数据库损坏3.使用了不同的包名或路径4.使用其他包…...
Python 包管理器 uv 介绍
Python 包管理器 uv 全面介绍 uv 是由 Astral(热门工具 Ruff 的开发者)推出的下一代高性能 Python 包管理器和构建工具,用 Rust 编写。它旨在解决传统工具(如 pip、virtualenv、pip-tools)的性能瓶颈,同时…...

C++使用 new 来创建动态数组
问题: 不能使用变量定义数组大小 原因: 这是因为数组在内存中是连续存储的,编译器需要在编译阶段就确定数组的大小,以便正确地分配内存空间。如果允许使用变量来定义数组的大小,那么编译器就无法在编译时确定数组的大…...
Fabric V2.5 通用溯源系统——增加图片上传与下载功能
fabric-trace项目在发布一年后,部署量已突破1000次,为支持更多场景,现新增支持图片信息上链,本文对图片上传、下载功能代码进行梳理,包含智能合约、后端、前端部分。 一、智能合约修改 为了增加图片信息上链溯源,需要对底层数据结构进行修改,在此对智能合约中的农产品数…...

使用LangGraph和LangSmith构建多智能体人工智能系统
现在,通过组合几个较小的子智能体来创建一个强大的人工智能智能体正成为一种趋势。但这也带来了一些挑战,比如减少幻觉、管理对话流程、在测试期间留意智能体的工作方式、允许人工介入以及评估其性能。你需要进行大量的反复试验。 在这篇博客〔原作者&a…...
tomcat入门
1 tomcat 是什么 apache开发的web服务器可以为java web程序提供运行环境tomcat是一款高效,稳定,易于使用的web服务器tomcathttp服务器Servlet服务器 2 tomcat 目录介绍 -bin #存放tomcat的脚本 -conf #存放tomcat的配置文件 ---catalina.policy #to…...

给网站添加live2d看板娘
给网站添加live2d看板娘 参考文献: stevenjoezhang/live2d-widget: 把萌萌哒的看板娘抱回家 (ノ≧∇≦)ノ | Live2D widget for web platformEikanya/Live2d-model: Live2d model collectionzenghongtu/live2d-model-assets 前言 网站环境如下,文章也主…...

数据结构:递归的种类(Types of Recursion)
目录 尾递归(Tail Recursion) 什么是 Loop(循环)? 复杂度分析 头递归(Head Recursion) 树形递归(Tree Recursion) 线性递归(Linear Recursion)…...