当前位置: 首页 > news >正文

WaitGroup原理分析

背景

在实际业务开发中,我们会遇到以下场景:请求数据库,批量获取1000条数据记录后,处理数据
为了减少因一次批量获取的数据太多,导致的数据库延时增加,我们可以把一次请求拆分成多次请求,并发去处理,当所有的并发请求完成后,再继续处理这些返回的数据
golang中的WaitGroup,就可以帮助我们实现上述的场景

快速入门

背景:开启10个goroutine并发执行,等待所有goroutine执行完成后,当前goroutine打印执行完成

func TestWaitGroup(t *testing.T) {var wg sync.WaitGroupfor i := 0; i < 10; i++ {index := igo func() {wg.Add(1)defer wg.Done()fmt.Println(fmt.Sprintf("%+v 正在执行", index))}()}wg.Wait()fmt.Println("TestWaitGroup method done")
}

源码分析

golang版本:1.18.2

源码路径:src/sync/waitgroup.go

// A WaitGroup waits for a collection of goroutines to finish.
// The main goroutine calls Add to set the number of
// goroutines to wait for. Then each of the goroutines
// runs and calls Done when finished. At the same time,
// Wait can be used to block until all goroutines have finished.
//
// A WaitGroup must not be copied after first use.
// WaitGroup 等待 goroutine 集合完成
// 主 goroutine 调用 Add 设置等待的 goroutine 数量
// 然后每个 goroutine 运行并在完成时调用 Done
// 同时,Wait 可以用来阻塞,直到所有 goroutine 都完成
type WaitGroup struct {noCopy noCopy// 64-bit value: high 32 bits are counter, low 32 bits are waiter count.// 64-bit atomic operations require 64-bit alignment, but 32-bit// compilers only guarantee that 64-bit fields are 32-bit aligned.// For this reason on 32 bit architectures we need to check in state()// if state1 is aligned or not, and dynamically "swap" the field order if// needed.// 64位值:高32位是计数器,低32位是waiter计数// 64位原子操作需要64位对齐,但32位编译器仅保证64位字段是32位对齐的// 因此,在 32 位架构上,我们需要在 state() 中检查 state1 是否对齐,并在需要时动态“交换”字段顺序state1 uint64state2 uint32
}

noCopy:WaitGroup在首次使用后,不能被复制
state1,state2:一共占用12字节,保存了三类信息:4字节保存goroutine计数,4字节保存waiter计数,4字节保存信号量
WaitGroup对外提供了以下三个方法:

// 设置等待的goroutine数量
func (wg *WaitGroup) Add(delta int)
// goroutine执行完成
func (wg *WaitGroup) Done()
// 阻塞等待所有的goroutine都执行完成
func (wg *WaitGroup) Wait()

Add

// state returns pointers to the state and sema fields stored within wg.state*.
func (wg *WaitGroup) state() (statep *uint64, semap *uint32) {if unsafe.Alignof(wg.state1) == 8 || uintptr(unsafe.Pointer(&wg.state1))%8 == 0 {// state1 is 64-bit aligned: nothing to do.return &wg.state1, &wg.state2} else {// state1 is 32-bit aligned but not 64-bit aligned: this means that// (&state1)+4 is 64-bit aligned.state := (*[3]uint32)(unsafe.Pointer(&wg.state1))return (*uint64)(unsafe.Pointer(&state[1])), &state[0]}
}// Add adds delta, which may be negative, to the WaitGroup counter.
// If the counter becomes zero, all goroutines blocked on Wait are released.
// If the counter goes negative, Add panics.
//
// Note that calls with a positive delta that occur when the counter is zero
// must happen before a Wait. Calls with a negative delta, or calls with a
// positive delta that start when the counter is greater than zero, may happen
// at any time.
// Typically this means the calls to Add should execute before the statement
// creating the goroutine or other event to be waited for.
// If a WaitGroup is reused to wait for several independent sets of events,
// new Add calls must happen after all previous Wait calls have returned.
// See the WaitGroup example.
// Add 将 delta(可能为负)添加到 WaitGroup 计数器。
// 如果计数器变为零,则所有在 Wait 上阻塞的 goroutine 都会被释放。
// 如果计数器变为负数,则添加panic。
// 请注意,计数器为零时发生的具有正增量的调用必须在等待之前发生。 
// 具有负增量的调用或在计数器大于零时开始的具有正增量的调用可能随时发生。
// 通常,这意味着对 Add 的调用应该在创建 goroutine 或其他要等待的事件的语句之前执行。
// 如果重用一个 WaitGroup 来等待几个独立的事件集,新的 Add 调用必须在所有先前的 Wait 调用返回后发生。
func (wg *WaitGroup) Add(delta int) {statep, semap := wg.state()if race.Enabled {_ = *statep // trigger nil deref earlyif delta < 0 {// Synchronize decrements with Wait.race.ReleaseMerge(unsafe.Pointer(wg))}race.Disable()defer race.Enable()}// 记录goroutine计数state := atomic.AddUint64(statep, uint64(delta)<<32)// 获取goroutine计数v := int32(state >> 32)// 获取waiter计数w := uint32(state)if race.Enabled && delta > 0 && v == int32(delta) {// The first increment must be synchronized with Wait.// Need to model this as a read, because there can be// several concurrent wg.counter transitions from 0.race.Read(unsafe.Pointer(semap))}// goroutine计数小于0if v < 0 {panic("sync: negative WaitGroup counter")}// w != 0说明已经执行了Wait且还有阻塞等待的goroutine,此时不允许在执行Addif w != 0 && delta > 0 && v == int32(delta) {panic("sync: WaitGroup misuse: Add called concurrently with Wait")}// 存在没有执行完成的goroutine,或者当前没有waiter,直接返回if v > 0 || w == 0 {return}// This goroutine has set counter to 0 when waiters > 0.// Now there can't be concurrent mutations of state:// - Adds must not happen concurrently with Wait,// - Wait does not increment waiters if it sees counter == 0.// Still do a cheap sanity check to detect WaitGroup misuse.// 此时goroutine计数为0,且waiter计数大于0,不然上一步就返回了// 现在以下状态不能同时发生:// 1. 并发调用Add和Wait// 2. 当goroutine计数为0时,Wait不会继续增加waiter计数// 仍然做一个廉价的健全性检查来检测 WaitGroup 的滥用,防止以上情况发生if *statep != state {panic("sync: WaitGroup misuse: Add called concurrently with Wait")}// Reset waiters count to 0.// 重置waiter计数*statep = 0// 唤醒所有的waiterfor ; w != 0; w-- {runtime_Semrelease(semap, false, 0)}
}

delta代表本次需要记录的goroutine计数,可能为负数
64位原子操作需要64位对齐,但32位编译器仅保证64位字段是32位对齐的
当state1是64位对齐时,state1高32位是goroutine计数,低32位是waiter计数
当state1不是64位对齐时,动态“交换”字段顺序
记录goroutine计数的变化delta
如果goroutine计数小于0,则直接panic
如果已经执行了Wait且还有阻塞等待的goroutine,此时不允许在执行Add
如果存在没有执行完成的goroutine,或者当前没有waiter,直接返回
当goroutine计数为0,且waiter计数大于0时,现在以下状态不能同时发生:

并发调用Add和Wait
当goroutine计数为0时,Wait不会继续增加waiter计数

简单校验通过后,重置waiter计数为0,唤醒所有阻塞等待的waiter

Done

// Done decrements the WaitGroup counter by one.
func (wg *WaitGroup) Done() {wg.Add(-1)
}

调用Add,delta = -1,代表goroutine计数-1

Wait

// Wait blocks until the WaitGroup counter is zero.
func (wg *WaitGroup) Wait() {statep, semap := wg.state()if race.Enabled {_ = *statep // trigger nil deref earlyrace.Disable()}for {state := atomic.LoadUint64(statep)// 获取goroutine计数v := int32(state >> 32)// 获取waiter计数w := uint32(state)// goroutine计数为0,不需要等待,直接返回if v == 0 {// Counter is 0, no need to wait.if race.Enabled {race.Enable()race.Acquire(unsafe.Pointer(wg))}return}// Increment waiters count.// waiter计数+1if atomic.CompareAndSwapUint64(statep, state, state+1) {if race.Enabled && w == 0 {// Wait must be synchronized with the first Add.// Need to model this is as a write to race with the read in Add.// As a consequence, can do the write only for the first waiter,// otherwise concurrent Waits will race with each other.race.Write(unsafe.Pointer(semap))}// 阻塞,等待goroutine计数为0后唤醒继续执行runtime_Semacquire(semap)// Wait还没有执行完成,就开始复用WaitGroupif *statep != 0 {panic("sync: WaitGroup is reused before previous Wait has returned")}if race.Enabled {race.Enable()race.Acquire(unsafe.Pointer(wg))}return}}
}

调用state(),保证字段内存对齐
如果goroutine计数为0,不需要等待,直接返回
尝试对waiter计数+1,若失败,则继续下一轮重试
对waiter计数+1成功,则阻塞当前goroutine,等待goroutine计数为0后唤醒继续执行
唤醒继续执行后,简单判断是否存在Wait还没有执行完成,就开始复用WaitGroup的情况,如果有,则panic;如果没有,则直接返回

相关文章:

WaitGroup原理分析

背景 在实际业务开发中&#xff0c;我们会遇到以下场景&#xff1a;请求数据库&#xff0c;批量获取1000条数据记录后&#xff0c;处理数据 为了减少因一次批量获取的数据太多&#xff0c;导致的数据库延时增加&#xff0c;我们可以把一次请求拆分成多次请求&#xff0c;并发去…...

java直播源码:如何使用Java构建一个高效的直播系统

Java直播源码是一种用于实现实时视频流传输的技术&#xff0c;它可以将视频流从一个地方传送到另一个地方。它使用Java语言编写&#xff0c;可以支持多种视频格式&#xff0c;如MPEG、H.264等。 Java直播源码是一种用于实现实时视频流传输的技术&#xff0c;它可以将视频流从一…...

Websocket获取B站直播间弹幕教程——第二篇、解包/拆包

教程一、Websocket获取B站直播间弹幕教程 — 哔哩哔哩直播开放平台 1、封包 我们连接上B站Websocket成功后&#xff0c;要做两件事情&#xff1a; 第一、发送鉴权包。第二、发送心跳包&#xff0c;每30秒一次&#xff0c;维持websocket连接。 这两个包不是直接发送过去&…...

膝关节检测之1设计目标手势与物体交互的动画

原来只用unity自带的IK&#xff0c;发现背部不能动&#xff0c;且手和手指的移动和旋转试了好像没法通过animation实现&#xff08;加入关键帧并修改最终状态的数值后播放没有变化&#xff0c;确定最终关键帧的数值已经改了的&#xff09;。看资料&#xff0c;发现final IK&…...

canvas力导布局

老规矩&#xff0c;先上效果图 <html><head><style>* {margin: 0;padding: 0;}canvas {display: block;width: 100%;height: 100%;background: #000;}</style> </head><body><canvas id"network"></canvas> </…...

【网络安全】「漏洞原理」(二)SQL 注入漏洞之理论讲解

前言 严正声明&#xff1a;本博文所讨论的技术仅用于研究学习&#xff0c;旨在增强读者的信息安全意识&#xff0c;提高信息安全防护技能&#xff0c;严禁用于非法活动。任何个人、团体、组织不得用于非法目的&#xff0c;违法犯罪必将受到法律的严厉制裁。 【点击此处即可获…...

JavaScript中类的学习

一、JavaScript中的类 1.什么是类 类描述了一种代码的组织结构形式&#xff0c;不同的语言中对其实现形式各有差异。JavaScript中的类Class实际是一种描述对象之间引用关系的语法糖。 在Class语法糖出现之前&#xff0c;我们想重用一个功能模块&#xff0c;通常是用一个函数来…...

1600*A. Linova and Kingdom(DFS优先队列贪心)

Problem - 1336A - Codeforces Linova and Kingdom - 洛谷 解析&#xff1a; 开始认为分情况讨论 k 小于等于叶子结点和大于叶子结点的情况&#xff0c;然后选择深度最深的叶子结点和子孙数量最小的结点&#xff0c;但是发现如果把某一个非叶子结点选取&#xff0c;那么其子孙…...

gitlab git lfs的替代软件整理汇总及分析

文章目录 前言替代软件分析git-annexgit-fatgit-symgit-meida 总结 前言 git-lfs科普 Git LFS&#xff08;Large File Storage&#xff09;是一个Git扩展&#xff0c;用于管理大型文件。Git LFS通过将大型文件存储在Git仓库之外&#xff0c;从而加快了Git操作的速度。它使用指…...

IDEA 2023.2.2图文安装教程及下载

IDE 系列的第二个年度更新现已发布&#xff0c;涵盖 IntelliJ IDEA、WebStorm、PyCharm、DataGrip、GoLand、DataSpell 以及 All Products Pack 订阅中包含的其他工具。该版本还包括多项用户体验增强功能&#xff0c;例如 Search Everywhere&#xff08;随处搜索&#xff09;中…...

第六届“中国法研杯”司法人工智能挑战赛

解锁司法科技的未来 “中国法研杯”司法人工智能挑战赛&#xff08;Legal AI Challenge&#xff0c;简称LAIC&#xff09;&#xff0c;是面向法院侧人工智能应用领域唯一权威比赛&#xff0c;大赛愿景是在拥有全球最大规模司法数据的中国&#xff0c;实现法律界、学术界、产业界…...

Springcloud中间件-----分布式搜索引擎 Elasticsearch

该笔记是根据黑马程序员的课来自己写了一遍的,b站有对应教程和资料 第一部分 第二部分 第三部分 预计看完跟着练习5小时足够 1.初识elasticsearch 1.1.了解ES 1.1.1.elasticsearch的作用 elasticsearch是一款非常强大的开源搜索引擎&#xff0c;具备非常多强大功能&#xff…...

基于深度学习的目标检测和语义分割:机器视觉中的最新进展

基于深度学习的目标检测和语义分割是机器视觉领域的两个重要任务&#xff0c;它们在图像处理、自动驾驶、医学影像分析和智能视频监控等应用中发挥着关键作用。以下是这两个领域的最新进展&#xff1a; 目标检测&#xff08;Object Detection&#xff09;&#xff1a; 一阶段检…...

微信小程序报错request:fail -2:net::ERR_FAILED(生成中间证书)

微信小程序报错request:fail -2:net::ERR_FAILED-生成中间证书 前言一、检查网站ssl证书二、生成证书方法1.获取中间证书手动合并1.进入网站&#xff1a;[https://www.myssl.cn/tools/downloadchain.html](https://www.myssl.cn/tools/downloadchain.html)2.点击下一步3.手动合…...

Ubuntu更改时区

sudo apt install tzdata 进行安装时区&#xff0c;有很多时区可供选择。 然后执行:tzselect rootd75c94dcd226:/# date 2023年 10月 11日 星期三 06:25:12 UTC rootd75c94dcd226:/# tzselect Please identify a location so that time zone rules can be set correctly. Ple…...

0144 文件管理

目录 4.文件管理 4.1文件系统基础 4.2目录 4.3文件系统 部分习题 4.文件管理 4.1文件系统基础 4.2目录 4.3文件系统 部分习题 1.UNIX操作系统忠&#xff0c;输入/输出设备视为&#xff08;&#xff09; A.普通文件 B.目录文件 C.索引文件 D.特殊文…...

python psutil库之——获取网络信息(网络接口信息、网络配置信息、以太网接口、ip信息、ip地址信息)

文章目录 使用Python psutil库获取网络信息安装psutil库获取网络连接信息查看所有网络连接过滤特定状态的连接 获取网络接口信息获取网络IO统计信息实例1实例2 总结 使用Python psutil库获取网络信息 Python的psutil库是一个跨平台库&#xff0c;能够方便地获取系统使用情况和…...

uniapp上echarts地图钻取

1: 预期效果 通过切换地图 , 实现地图的钻取效果 2: 实现原理以及核心方法/参数 一开始是想利用更换地图数据的形式进行地图钻取 , 这就意味着我们需要准备全国30多个省份的地图数据 , 由于一开始考虑需要适配小程序端 , 如此多的地图文件增加了程序的体积 , 如果使用接口调…...

scratch保护环境 2023年5月中国电子学会图形化编程 少儿编程 scratch编程等级考试一级真题和答案解析

目录 scratch保护环境 一、题目要求 1、准备工作 2、功能实现 二、案例分析...

RPC分布式网络通信框架项目

文章目录 对比单机聊天服务器、集群聊天服务器以及分布式聊天服务器RPC通信原理使用Protobuf做数据的序列化&#xff0c;相比较于json&#xff0c;有哪些优点&#xff1f;环境配置使用项目代码工程目录vscode远程开发Linux项目muduo网络库编程示例CMake构建项目集成编译环境Lin…...

深入剖析AI大模型:大模型时代的 Prompt 工程全解析

今天聊的内容&#xff0c;我认为是AI开发里面非常重要的内容。它在AI开发里无处不在&#xff0c;当你对 AI 助手说 "用李白的风格写一首关于人工智能的诗"&#xff0c;或者让翻译模型 "将这段合同翻译成商务日语" 时&#xff0c;输入的这句话就是 Prompt。…...

Linux 文件类型,目录与路径,文件与目录管理

文件类型 后面的字符表示文件类型标志 普通文件&#xff1a;-&#xff08;纯文本文件&#xff0c;二进制文件&#xff0c;数据格式文件&#xff09; 如文本文件、图片、程序文件等。 目录文件&#xff1a;d&#xff08;directory&#xff09; 用来存放其他文件或子目录。 设备…...

微信小程序之bind和catch

这两个呢&#xff0c;都是绑定事件用的&#xff0c;具体使用有些小区别。 官方文档&#xff1a; 事件冒泡处理不同 bind&#xff1a;绑定的事件会向上冒泡&#xff0c;即触发当前组件的事件后&#xff0c;还会继续触发父组件的相同事件。例如&#xff0c;有一个子视图绑定了b…...

AI Agent与Agentic AI:原理、应用、挑战与未来展望

文章目录 一、引言二、AI Agent与Agentic AI的兴起2.1 技术契机与生态成熟2.2 Agent的定义与特征2.3 Agent的发展历程 三、AI Agent的核心技术栈解密3.1 感知模块代码示例&#xff1a;使用Python和OpenCV进行图像识别 3.2 认知与决策模块代码示例&#xff1a;使用OpenAI GPT-3进…...

STM32标准库-DMA直接存储器存取

文章目录 一、DMA1.1简介1.2存储器映像1.3DMA框图1.4DMA基本结构1.5DMA请求1.6数据宽度与对齐1.7数据转运DMA1.8ADC扫描模式DMA 二、数据转运DMA2.1接线图2.2代码2.3相关API 一、DMA 1.1简介 DMA&#xff08;Direct Memory Access&#xff09;直接存储器存取 DMA可以提供外设…...

Maven 概述、安装、配置、仓库、私服详解

目录 1、Maven 概述 1.1 Maven 的定义 1.2 Maven 解决的问题 1.3 Maven 的核心特性与优势 2、Maven 安装 2.1 下载 Maven 2.2 安装配置 Maven 2.3 测试安装 2.4 修改 Maven 本地仓库的默认路径 3、Maven 配置 3.1 配置本地仓库 3.2 配置 JDK 3.3 IDEA 配置本地 Ma…...

【学习笔记】erase 删除顺序迭代器后迭代器失效的解决方案

目录 使用 erase 返回值继续迭代使用索引进行遍历 我们知道类似 vector 的顺序迭代器被删除后&#xff0c;迭代器会失效&#xff0c;因为顺序迭代器在内存中是连续存储的&#xff0c;元素删除后&#xff0c;后续元素会前移。 但一些场景中&#xff0c;我们又需要在执行删除操作…...

安卓基础(Java 和 Gradle 版本)

1. 设置项目的 JDK 版本 方法1&#xff1a;通过 Project Structure File → Project Structure... (或按 CtrlAltShiftS) 左侧选择 SDK Location 在 Gradle Settings 部分&#xff0c;设置 Gradle JDK 方法2&#xff1a;通过 Settings File → Settings... (或 CtrlAltS)…...

ArcPy扩展模块的使用(3)

管理工程项目 arcpy.mp模块允许用户管理布局、地图、报表、文件夹连接、视图等工程项目。例如&#xff0c;可以更新、修复或替换图层数据源&#xff0c;修改图层的符号系统&#xff0c;甚至自动在线执行共享要托管在组织中的工程项。 以下代码展示了如何更新图层的数据源&…...

基于 HTTP 的单向流式通信协议SSE详解

SSE&#xff08;Server-Sent Events&#xff09;详解 &#x1f9e0; 什么是 SSE&#xff1f; SSE&#xff08;Server-Sent Events&#xff09; 是 HTML5 标准中定义的一种通信机制&#xff0c;它允许服务器主动将事件推送给客户端&#xff08;浏览器&#xff09;。与传统的 H…...