协程并发下数据汇总:除了互斥锁,还有其他方式吗?
1. 简介
本文介绍了在并发编程中数据汇总的问题,并探讨了在并发环境下使用互斥锁和通道两种方式来保证数据安全性的方法。
首先,通过一个实例,描述了一个并发拉取数据并汇总的案例,并使用互斥锁来确保线程安全。然后,讨论了互斥锁的一些缺点,引出了通道作为一种替代方案,并介绍了通道的基本使用和特性。接下来,通过实例演示了如何使用通道来实现并发下的数据汇总。
最后,引用了etcd中使用通道实现协程并发下数据汇总的例子,展示了通道在实际项目中的应用。
2. 问题引入
在请求处理过程中,经常需要通过RPC接口拉取数据。有时候,由于数据量较大,单个数据拉取操作可能会导致整个请求的处理时间较长。为了加快处理速度,我们通常考虑同时开启多个协程并发地拉取数据。一旦多个协程并发拉取数据后,主协程需要汇总这些协程拉取到的数据,然后再返回结果。在这个过程中,往往涉及对共享资源的并发访问,为了保证线程安全性,通常会使用互斥锁。下面通过一个简单的代码来展示该过程:
package mainimport ("fmt""sync""time"
)type Data struct {ID intName string
}var (// 汇总结果dataList []Data// 互斥锁mutex sync.Mutex
)func fetchData(page int, wg *sync.WaitGroup) {// 模拟RPC接口拉取数据的耗时操作time.Sleep(time.Second)// 假设从RPC接口获取到了一批数据data := Data{ID: page,Name: fmt.Sprintf("Data %d", page),}// 使用互斥锁保护共享数据的并发访问mutex.Lock()defer mutext.Unlock()dataList = append(dataList, data)wg.Done()
}func main() {var wg sync.WaitGroup// 定义需要拉取的数据页数numPages := 10// 启动多个协程并发地拉取数据for i := 1; i <= numPages; i++ {wg.Add(1)go fetchData(i, &wg)}// 等待所有协程完成wg.Wait()// 打印拉取到的数据fmt.Println("Fetched data:")for _, data := range dataList {fmt.Printf("ID: %d, Name: %s\n", data.ID, data.Name)}
}
在上述示例中,我们定义了一个共享的dataList切片用于保存拉取到的数据。每个goroutine通过调用fetchData函数来模拟拉取数据的过程,并使用互斥锁mutex保护dataList的并发访问。主协程使用sync.WaitGroup等待所有协程完成数据拉取任务,然后打印出拉取到的数据。通过并发地拉取数据,并使用互斥锁保证线程安全,我们可以显著提高数据拉取的速度,并且确保数据的正确性和一致性。
回看上述实现,其实是涉及到了多个协程操作同一份数据,有可能导致线程安全的问题,然后这里是通过互斥锁来保证线程安全的。确实,使用互斥锁是可以保证线程安全的,但是也是存在一些缺点的,比如竞争和阻塞,两个协程同时竞争互斥锁时,只有一个协程能够获得锁,而其他协程则会被阻塞,这个就可能导致性能瓶颈,当然在这个场景下问题不大。其次就是代码的复杂性提高了,使用互斥锁需要仔细设计和管理,确保锁的正确获取和释放。这增加了代码的复杂性和维护成本,如果在代码中处理锁的方式不正确,可能会死锁,导致程序无法继续执行。
那我们其实就有疑问,在协程并发下数据汇总的场景,是否存在其他方式,不需要通过使用互斥锁,也能够保证线程安全呢? 其实还真有,Go语言中的channel非常适用于这种情况。通过使用通道,我们可以实现线程安全的数据共享和同步,而无需显式地使用互斥锁。下面我们来了解一下channel。
3. channel的使用
3.1 channel的基本介绍
3.1.1 基本说明
channel在Go语言中是一种特殊的数据结构,用于协程之间的通信和同步。它类似于一个先进先出(FIFO)的队列,用于数据的传输和共享。在并发环境中,可以将数据发送到通道,也可以从通道中接收数据,而这两个操作都是线程安全的。
使用channel的优势在于它提供了内置的同步机制,无需显式地使用互斥锁来处理并发访问。
当一个协程向通道发送数据时,如果通道已满,发送操作会被阻塞,直到有其他协程从通道中接收数据释放空间。同样地,当一个协程从通道接收数据时,如果通道为空,接收操作也会被阻塞,直到有其他协程向通道发送数据。
同时,当多个协程同时访问通道时,Go运行时系统会自动处理协程之间的同步和并发访问的细节,保证数据的正确性和一致性。从而可以放心地在多个协程中使用通道进行数据的发送和接收操作,而不需要额外的锁或同步机制来保证线程安全。
因此,使用channel其实是可以避免常见的并发问题,如竞态条件和死锁,简化了并发编程的复杂性。
3.1.2 基本使用
通过上面对channel的基本介绍,我们已经对channel有了基本的了解,其实可以粗略理解其为一个并发安全的队列。下面来了解下channel的基本语法,从而能够开始使用channel。
channel基本操作分为创建channel,发送数据到channel,接收channel中的数据,以及关闭channel。下面对其进行简单展示:
创建channel,使用make函数创建通道,通道的类型可以根据需要选择,例如int、string等:
ch := make(chan int)
发送数据到channel:使用<-操作符将数据发送到通道中
ch <- data
接收channel中的数据: 使用<-操作符从通道中接收数据
result := <-ch
关闭channel, 使用close函数关闭通道。关闭通道后,仍然可以从通道接收数据,但无法再向通道发送数据
close(ch)
通过上面channel的四个基本操作,便能够实现在不同协程间线程安全得传递数据。最后通过一个例子,完整得展示channel的基本使用。
package mainimport "fmt"func main() {ch := make(chan string) // 创建字符串通道defer close(ch)go func() {ch <- "hello, channel!" // 发送数据到通道}()result := <-ch // 从通道接收数据fmt.Println(result)
}
在这个示例中,我们创建了一个字符串通道ch。然后,在一个单独的协程中,我们向通道发送了字符串"hello, channel!"。最后,主协程从通道中接收数据,并将其打印出来。
通过使用通道,我们可以实现协程之间的数据传输和同步,确保数据的安全共享和线程安全性。通道的使用能够简化并发编程的复杂性,提供一种高效、可靠的方式来处理并发场景下的数据传递。
3.2 使用channel实现汇总数据
下面,我们使用channel来实现并发数据汇总,替换掉之前使用互斥锁来保证线程安全的实现:
package mainimport ("fmt""sync""time"
)type Data struct {ID intName string
}func fetchData(page int, ch chan Data, wg *sync.WaitGroup) {// 模拟 RPC 接口拉取数据的耗时操作time.Sleep(time.Second)// 假设从 RPC 接口获取到了一批数据data := Data{ID: page,Name: fmt.Sprintf("Data %d", page),}ch <- data // 将数据发送到通道wg.Done()
}func main() {var wg sync.WaitGroup// 定义需要拉取的数据页数numPages := 10dataCh := make(chan Data, 10) // 创建用于接收数据的通道// 启动多个协程并发地拉取数据for i := 1; i <= numPages; i++ {wg.Add(1)go fetchData(i, dataCh, &wg)}go func() {wg.Wait()close(dataCh) // 关闭通道,表示数据已经全部发送完成}()// 从通道接收数据并汇总var dataList []Datafor data := range dataCh {dataList = append(dataList, data)}// 打印拉取到的数据fmt.Println("Fetched data:")for _, data := range dataList {fmt.Printf("ID: %d, Name: %s\n", data.ID, data.Name)}
}
在修改后的代码中,我们创建了一个用于接收数据的 dataCh。每个协程通过将数据发送到该channel 来完成数据的汇总。主协程通过从channel接收数据,并将其添加到 dataList 中实现数据的汇总过程。这种方式不需要显式地加锁和解锁,并且避免了互斥锁带来的复杂性和性能问题。
通过使用channel,我们能够以一种更直观、更安全的方式实现协程之间的数据传递和同步。channel在并发编程中起到了关键的作用,简化了并发操作的管理和实现。同时,它提供了内置的同步机制,保证了数据的正确性和一致性,避免了死锁和竞态条件的问题。
3.3 总结
协程间的并发下汇总数据可以归类为协程间的数据传递这个场景。在这个场景中,多个协程并发地拉取数据,然后将数据汇总到一个共享的数据结构中。为了保证数据的正确性和一致性,需要使用某种机制来确保多个协程对共享数据的并发访问是安全的。
在原始的实现中,使用了互斥锁来保护共享数据的并发访问。互斥锁提供了互斥访问的机制,确保同一时间只有一个协程可以访问共享数据,从而避免了数据竞争和不一致性。这种方式在保证线程安全的同时,引入了锁的开销和复杂性。
而使用channel来实现协程间的安全数据传递可以更简洁和高效。每个协程可以将拉取到的数据通过channel发送到主协程,主协程通过接收channel中的数据来进行汇总。channel提供了并发安全的数据传递机制,协程之间的数据传输是同步和有序的。由于channel本身就提供了同步机制,不需要额外的锁和同步操作,能够更简洁地实现协程间的安全数据传递。
因此,如果需要在多个协程间实现数据传递,而且由此可能带来线程安全的问题,此时使用channel来实现是相对比较合适的。
4. 开源项目中的使用
假设我们需要对etcd进行性能测试,此时需要模拟大量并发请求,对etcd进行负载测试,并收集每个请求的执行时间、成功/失败状态等结果数据。然后主协程需要收集每一个请求的结果数据,并进行统计计算,生成相应的性能报告。基于此,能够计算出总请求数、请求成功率、平均执行时间、最慢/最快请求等统计信息,以及错误分布情况和慢速请求的详细信息。
从上面的讲述来看,其实我们可以大概想象出这个模型,多个协程并发执行,然后获取每个请求的结果数据。然后主协程需要收集汇总这些数据,基于此来生成性能报告。这个模型其实也就是我们上面所说的协程并发下的数据汇总,因此通过channel来实现协程间的数据传输,是非常合适的。
下面我们来看看etcd中对应的实现。etcd中存在一个report对象的实现,能够接受一系列的请求数据的结果,然后生成性能报告返回回去。结构体定义如下:
type report struct {results chan Resultstats Stats
}
func (r *report) Results() chan<- Result { return r.results }// Result describes the timings for an operation.
type Result struct {Start time.TimeEnd time.TimeErr error
}func newReport(precision string) *report {r := &report{results: make(chan Result, 16),}return r
}
Result结构体为单个测试的结果,而 report 结构体则用于整个测试过程的报告和统计信息。通过使用 results 通道,可以将每个测试的结果发送到 report 结构体中,以便进行统计和生成报告。
当进行性能压测时,首先通过newReport生成一个report对象,然后启动多个协程同时进行压测请求,每一个请求处理完成之后,便会生成一个处理结果,存储到Result对象当中。然后基于report对象的Results方法获取到对应的channel,将处理结果传输给主协程。
主协程便通过遍历report对象中的results变量对应的channel,汇总计算所有处理结果,基于此便能够生成压测结果和报告。下面来看其具体流程。
首先是创建一个report对象,然后启动多个协程来处理请求,将结果发送到report对象中的results对应的channel中。
// 这里NewReportSample方法,其实是对上面newReport方法的一个封装
r := NewReportSample("%f")
// 这里假设只有一个协程,模拟执行一系列的测试,并将测试结果发送到 Report 对象的 results 通道中。
go func() {start := time.Now()for i := 0; i < 5; i++ {// 不真实进行请求,只是简单获取执行结果,将测试结果进行传输end := start.Add(time.Second)r.Results() <- Result{Start: start, End: end}start = end}r.Results() <- Result{Start: start, End: start.Add(time.Second), Err: fmt.Errorf("oops")}// 假设所有压测请求都执行完成了close(r.Results())
}()
// 主协程 汇总所有的处理结果,然后生成压测报告
stats := <-r.Stats()
以上代码中,r 是通过 NewReportSample("%f") 创建的一个 Report 对象。然后,在一个单独的协程中,执行了一系列的测试,并将测试结果发送到 r.Results() 通道中。
这段代码的作用是模拟执行一系列的测试,并将测试结果发送到 Report 对象的 results 通道中。通过使用 r.Results() 方法返回的通道,可以将测试结果发送到报告对象中进行统计和处理。
接下来,主协程应该不断从 r.Results()方法返回的通道中读取数据,汇总所有的处理结果,从而生成压测报告。这个方法其实是被封装在r.Stas()方法中,具体如下:
func (r *report) Stats() <-chan Stats {// 创建一个channeldonec := make(chan Stats, 1)// 启动一个协程来执行go func() {defer close(donec)r.processResults()s := r.stats.copy()if r.sps != nil {s.TimeSeries = r.sps.getTimeSeries()}// 执行完成的话,将结果返回donec <- s}()// 返回channelreturn donec
}// Stats方法启动的协程中,实际运行的任务
func (r *report) processResults() {st := time.Now()// 遍历r.results方法中channel中的数据,然后执行处理流程for res := range r.results {r.processResult(&res)}// 后续执行一些具体的计算逻辑
}
上述代码是 report 结构体中的两个方法,其中 Stats() 方法返回一个只读的 Stats 通道。这个方法会在一个单独的协程中执行,并处理 results 通道中的测试结果。事实上就是汇总channel中的数据,然后进行一定的处理,然后返回。
5. 总结
本文通过介绍并发编程中的数据汇总问题,提出了使用互斥锁和通道来保证线程安全的方法。互斥锁适用于临界区保护和共享资源的互斥访问,但可能存在死锁和性能瓶颈的问题。相比之下,通道提供了更直观和安全的协程间通信方式,避免了锁的问题,并提供了更灵活的并发模式。
基于以上内容的介绍,大概能够明确下,在数据传递和汇总的场景下,使用channel来实现可能是更为合适的,能够提高代码的可读性和并发安全性。希望以上内容对你有所帮助。
相关文章:
协程并发下数据汇总:除了互斥锁,还有其他方式吗?
1. 简介 本文介绍了在并发编程中数据汇总的问题,并探讨了在并发环境下使用互斥锁和通道两种方式来保证数据安全性的方法。 首先,通过一个实例,描述了一个并发拉取数据并汇总的案例,并使用互斥锁来确保线程安全。然后,…...
5、Ray-Actor模型和并发编程
5、Ray-Actor模型和并发编程 导航 1.简介和背景 2.Ray的基本概念和核心组件 3.分布式任务调度和依赖管理 4.对象存储和数据共享 5.Actor模型和并发编程 6.Ray的高级功能和扩展性 7.使用Ray构建分布式应用程序的案例研究 8.Ray社区和资源 9.核心框架介绍...
HNU-电路与电子学-小班2
第二次讨论 讨论题目: 1、电子秤的电桥电路可以分别用 1 个压控电阻、 2 个压控电阻、 3 个压控电阻、 4 个压控电阻实现吗?试写出每种实现的 U AB 输出表达式,并分析哪种实现电桥 电压的灵敏度(SV/ △ R )高。 …...
二分图匹配算法
匈牙利算法、Hopcroft-Karp算法和Kuhn-Munkres算法是三种常见的二分图匹配算法,它们在实现方式、时间复杂度和适用场景上有所差异。以下是它们的区别和优缺点: 匈牙利算法: 实现方式:匈牙利算法使用深度优先搜索(DFS)来寻找增广路…...
虹科技术 | 虹科EtherCAT增量编码器输入模块数据采集实操测试
1. 背景介绍 编码器是将信号或数据进行编制、转换为可用以通讯、传输和存储的信号形式的设备。编码器把角位移或直线位移转换成电信号,前者称为码盘,后者称为码尺。按照读出方式编码器可以分为接触式和非接触式两种;按照工作原理编码器可分为…...
2023.05.21 学习周报
文章目录 摘要文献阅读1.题目2.背景3.现存问题和解决方法4.方法4.1 Variational mode decomposition (VMD)4.2 Bidirectional LSTM 5.实验5.1 数据标准化5.2 评价指标5.3 实验过程及结果 6.结论和展望 优劣解距离法有限元1.求解一个简单的传热问题2.有限元如何实现 总结 摘要 …...
资深程序员深度体验ChatGPT一周发现竟然....
周一打卡上班,老板凑到我跟前:“小李啊,这周有个新需求交给你做一下,给我们的API管理平台新增一个智能Mock的功能...”。我条件反射般的差点脱口而出:“这个需求做不了..”。不过在千钧一发之间,我想起了最…...
带你深入了解Android Handler的用法
Android中,Handler是一类用于异步消息传递和线程之间通信的基础框架。一个Handler是一个线程的处理器,可以接收消息,并调度运行它们。使用Handler,应用程序可以将处理器与一个线程关联,以将来的时间运行任务。而使用Ha…...
生于零售的亚马逊云科技,如何加速中国跨境电商企业出海?
导读:跨境电商进入精耕细作的新阶段。 作为中国企业出海的重要领域之一,近几年跨境电商行业处在快速发展中。商务部数据显示,2022年中国跨境电商出口达1.55万亿,同比增长11.7%。2023年1-2月,跨境电商进出口总额同比增长…...
兄弟组件传值$on无法接收值
方法一 前提是必须引入EventBus,而且该方法一刷新数据就没了 1.组件A里,点击事件里面使用$emit传入数据 2.组件A里,mounted里面使用$on接收数据,并把数据赋给EventBus EventBus.$on(detail,(data) > { EventBus.senddata d…...
Spring事务及事务传播机制
一.事务的含义:多个操作封装在一起,要么同时执行成功,一旦有一个操作执行失败,那么全部执行失败。这里给大家举个例子:比如A给B转账50元,而B没有收到这50元,此时A转账B这个操作也需要进行回滚,恢复到A给B没…...
npm i 常见问题
需要注意的是,如果你在使用 NPM 安装包的过程中遇到了任何问题,可以尝试使用 --verbose 参数打印更详细的错误信息,以便更好地诊断问题。例如: npm install --verbose 1、vue老项目缺少编译环境安装依赖报错的问题 待下载的项目…...
Prometheus+Grafana监控系统
一、简介 1、Prometheus简介 官网:https://prometheus.io 项目代码:https://github.com/prometheus Prometheus(普罗米修斯)是一个最初在SoundCloud上构建的监控系统。自2012年成为社区开源项目,拥有非常活跃的开发人员…...
基于脉冲神经网络的物体检测
访问【WRITE-BUG数字空间】_[内附完整源码和文档] 研究的意义在于探索脉冲神经网络在目标检测上的应用,目前主流的脉冲神经网络训练算法有直接BP训练、STDP无监督训练和训练好的ANN的转化,虽然训练算法众多,但是SNN仍然没有一套成熟的训练算…...
Rust每日一练(Leetday0010) 子串下标、两数相除、串联子串
目录 28. 找出字符串中第一个匹配项的下标 Find-the-index-of-the-first-occurrence-in-a-string 🌟🌟 29. 两数相除 Divide Two Integers 🌟🌟 30. 串联所有单词的子串 Substring-with-concatenation-of-all-words &#x…...
As ccess 数据库与表的操作
1. Access 数据库设计的一般步骤 . 2. 基本概念:Access 数据库、表、记录、字段 . 3. 使用表设计器创建表 (1)字段名命名规则 不能空格开头、不能用.!()[]、最长 64 个字符 (2)字段类型:文本、数字、日期/时…...
自动化的测试工具
1, 自动化功能测试工具:QTP、selenium 2, 自动化性能测试功能:LoadRunner、jmeter 3, 自动化接口测试工具:Charles、soapUI、LoadRunner、jmeter、postman、 测试工具 4, 测试管理工…...
Host头攻击
转载与:https://blog.csdn.net/weixin_47723270/article/details/129472716 01 HOST头部攻击漏洞知识 Host首部字段是HTTP/1.1新增的,旨在告诉服务器,客户端请求的主机名和端口号,主要用来实现虚拟主机技术。 运用虚拟主机技术&a…...
Android 12.0默认开启无障碍服务权限和打开默认apk无障碍服务
1.概述 在12.0的系统rom定制化开发中,在第三方app开发中,需要开启无障碍服务功能,就不需要在代码中开启无障碍服务了, 为了简便就需要在系统中开启无障碍服务,来实现开启无障碍服务功能 2. 默认开启无障碍服务权限和打开默认apk无障碍服务核心代码 frameworks/base/core…...
怎么成为优秀的软件工程师,而不是优秀的码农?
作为软件行业的从业者,每个人都希望最终成为优秀的软件工程师,而不仅仅是码农。一个码农只关注于编写代码和解决问题,而一个软件工程师则涉及到更广泛的职责和技能。 以下是一些要点,可以帮助你脱颖而出,成为一个优秀…...
7.4.分块查找
一.分块查找的算法思想: 1.实例: 以上述图片的顺序表为例, 该顺序表的数据元素从整体来看是乱序的,但如果把这些数据元素分成一块一块的小区间, 第一个区间[0,1]索引上的数据元素都是小于等于10的, 第二…...
PHP和Node.js哪个更爽?
先说结论,rust完胜。 php:laravel,swoole,webman,最开始在苏宁的时候写了几年php,当时觉得php真的是世界上最好的语言,因为当初活在舒适圈里,不愿意跳出来,就好比当初活在…...
QMC5883L的驱动
简介 本篇文章的代码已经上传到了github上面,开源代码 作为一个电子罗盘模块,我们可以通过I2C从中获取偏航角yaw,相对于六轴陀螺仪的yaw,qmc5883l几乎不会零飘并且成本较低。 参考资料 QMC5883L磁场传感器驱动 QMC5883L磁力计…...
HTML 列表、表格、表单
1 列表标签 作用:布局内容排列整齐的区域 列表分类:无序列表、有序列表、定义列表。 例如: 1.1 无序列表 标签:ul 嵌套 li,ul是无序列表,li是列表条目。 注意事项: ul 标签里面只能包裹 li…...
Psychopy音频的使用
Psychopy音频的使用 本文主要解决以下问题: 指定音频引擎与设备;播放音频文件 本文所使用的环境: Python3.10 numpy2.2.6 psychopy2025.1.1 psychtoolbox3.0.19.14 一、音频配置 Psychopy文档链接为Sound - for audio playback — Psy…...
vue3 定时器-定义全局方法 vue+ts
1.创建ts文件 路径:src/utils/timer.ts 完整代码: import { onUnmounted } from vuetype TimerCallback (...args: any[]) > voidexport function useGlobalTimer() {const timers: Map<number, NodeJS.Timeout> new Map()// 创建定时器con…...
安宝特案例丨Vuzix AR智能眼镜集成专业软件,助力卢森堡医院药房转型,赢得辉瑞创新奖
在Vuzix M400 AR智能眼镜的助力下,卢森堡罗伯特舒曼医院(the Robert Schuman Hospitals, HRS)凭借在无菌制剂生产流程中引入增强现实技术(AR)创新项目,荣获了2024年6月7日由卢森堡医院药剂师协会࿰…...
力扣热题100 k个一组反转链表题解
题目: 代码: func reverseKGroup(head *ListNode, k int) *ListNode {cur : headfor i : 0; i < k; i {if cur nil {return head}cur cur.Next}newHead : reverse(head, cur)head.Next reverseKGroup(cur, k)return newHead }func reverse(start, end *ListNode) *ListN…...
并发编程 - go版
1.并发编程基础概念 进程和线程 A. 进程是程序在操作系统中的一次执行过程,系统进行资源分配和调度的一个独立单位。B. 线程是进程的一个执行实体,是CPU调度和分派的基本单位,它是比进程更小的能独立运行的基本单位。C.一个进程可以创建和撤销多个线程;同一个进程中…...
代码规范和架构【立芯理论一】(2025.06.08)
1、代码规范的目标 代码简洁精炼、美观,可持续性好高效率高复用,可移植性好高内聚,低耦合没有冗余规范性,代码有规可循,可以看出自己当时的思考过程特殊排版,特殊语法,特殊指令,必须…...
