当前位置: 首页 > news >正文

golang线程池ants-实现架构

1、总体架构

ants协程池,在使用上有多种方式(使用方式参考这篇文章:golang线程池ants-四种使用方法),但是在实现的核心就一个,如下架构图:

总的来说,就是三个数据结构: Pool、WorkerStack、goWorker以及这三个结构实现的方法,了解了这些,基本上对ants的实现原理就了如指掌了。

2、详细实现

2.1 worker的设计实现

worker结构如下:

type goWorker struct {// pool who owns this worker.pool *Pool// task is a job should be done.task chan func()// lastUsed will be updated when putting a worker back into queue.lastUsed time.Time
}

该结构设计非常简单,三个成员:归属的线程池、执行函数、该worker最后一次运行时间,goWorker结构实现如下接口:

type worker interface {run()finish()lastUsedTime() time.TimeinputFunc(func())inputParam(interface{})
}

核心函数run,该函数从管道task里获取到任务函数,并执行,执行完成后,将此worker放回协程池(此时worker阻塞等待任务到来,调用函数:w.pool.revertWorker(w)放回池子中),以便复用:

func (w *goWorker) run() {w.pool.addRunning(1)go func() {defer func() {if w.pool.addRunning(-1) == 0 && w.pool.IsClosed() {w.pool.once.Do(func() {close(w.pool.allDone)})}w.pool.workerCache.Put(w)if p := recover(); p != nil {if ph := w.pool.options.PanicHandler; ph != nil {ph(p)} else {w.pool.options.Logger.Printf("worker exits from panic: %v\n%s\n", p, debug.Stack())}}// Call Signal() here in case there are goroutines waiting for available workers.w.pool.cond.Signal()}()for f := range w.task {if f == nil {return}f()if ok := w.pool.revertWorker(w); !ok {return}}}()
}

finish函数,调用该函数,代表此worker的生命周期结束:

func (w *goWorker) finish() {w.task <- nil
}

这个时候run函数从遍历task管道中结束,进入defer函数,worker放入workerCache,备用。

inputFunc很容易理解,将任务放入管道,让worker去执行:

func (w *goWorker) inputFunc(fn func()) {w.task <- fn
}

2.2 workerStack结构

type workerStack struct {items  []workerexpiry []worker
}

该结构就两个成员,都为worker的切片,items切片用于存储正常执行的worker,expiry存放过期的worker,workStack结构实现了如下接口:

type workerQueue interface {len() intisEmpty() boolinsert(worker) errordetach() workerrefresh(duration time.Duration) []worker // clean up the stale workers and return themreset()
}

len函数:返回正在运行worker的长度

isEmpty函数:判断是否有正在运行的worker

insert函数:将worker插入切片。

detach函数:获取一个worker。

refresh:更新所有worker,淘汰过期worker。

reset:清除所有worker。

重点看refresh函数:

func (wq *workerStack) refresh(duration time.Duration) []worker {n := wq.len()if n == 0 {return nil}expiryTime := time.Now().Add(-duration)index := wq.binarySearch(0, n-1, expiryTime)wq.expiry = wq.expiry[:0]if index != -1 {wq.expiry = append(wq.expiry, wq.items[:index+1]...)m := copy(wq.items, wq.items[index+1:])for i := m; i < n; i++ {wq.items[i] = nil}wq.items = wq.items[:m]}return wq.expiry
}

 这个函数用于根据给定的时间间隔duration来刷新工作队列中的过期项。主要执行以下步骤:

  1. 获取队列长度:首先,通过调用wq.len()获取工作队列wq中当前元素的数量n。如果队列为空(即n == 0),则直接返回nil,表示没有过期项。

  2. 计算过期时间:通过time.Now().Add(-duration)计算出一个时间点,这个时间点是duration时间之前的时间,即认为是“过期”的时间点。

  3. 二分查找:使用wq.binarySearch(0, n-1, expiryTime)在队列中查找第一个过期项的位置(即第一个最后使用时间早于expiryTime的项)。这个函数返回一个索引,如果找到这样的项,则返回该项的索引;如果没有找到,则返回-1

  4. 清理过期项

    • 首先,清空wq.expiry切片,用它来存储所有过期的项。
    • 如果找到了过期项(即index != -1),则将wq.items中从0index(包含index)的所有项(即所有过期项)追加到wq.expiry中。
    • 然后,使用copy函数将wq.items中从index+1n-1的所有项向前移动,覆盖掉前面的过期项。这里mcopy函数返回的值,表示实际复制的元素数量,即队列中剩余的非过期项的数量。
    • 接下来,遍历wq.items中从mn-1的所有位置,将它们设置为nil。
    • 最后,通过wq.items = wq.items[:m]更新wq.items的长度,去除所有过期的项。
  5. 返回过期项:函数返回wq.expiry,这是一个包含所有被移除的过期项的切片。

需要注意的是,wq.items是一个切片,用于存储工作项;wq.expiry也是一个切片,用于临时存储过期的项。

 2.3 Pool结构

pool结构的定义源码稍作改了一下,之前poolCommon的结构就是Pool的结构,目前最新版本做了一个简单的封装。

type Pool struct {poolCommon
}
type poolCommon struct {// capacity of the pool, a negative value means that the capacity of pool is limitless, an infinite pool is used to// avoid potential issue of endless blocking caused by nested usage of a pool: submitting a task to pool// which submits a new task to the same pool.capacity int32// running is the number of the currently running goroutines.running int32// lock for protecting the worker queue.lock sync.Locker// workers is a slice that store the available workers.workers workerQueue// state is used to notice the pool to closed itself.state int32// cond for waiting to get an idle worker.cond *sync.Cond// done is used to indicate that all workers are done.allDone chan struct{}// once is used to make sure the pool is closed just once.once *sync.Once// workerCache speeds up the obtainment of a usable worker in function:retrieveWorker.workerCache sync.Pool// waiting is the number of goroutines already been blocked on pool.Submit(), protected by pool.lockwaiting int32purgeDone int32purgeCtx  context.ContextstopPurge context.CancelFuncticktockDone int32ticktockCtx  context.ContextstopTicktock context.CancelFuncnow atomic.Valueoptions *Options
}

创建一个线程池:

// NewPool instantiates a Pool with customized options.
func NewPool(size int, options ...Option) (*Pool, error) {if size <= 0 {size = -1}opts := loadOptions(options...)if !opts.DisablePurge {if expiry := opts.ExpiryDuration; expiry < 0 {return nil, ErrInvalidPoolExpiry} else if expiry == 0 {opts.ExpiryDuration = DefaultCleanIntervalTime}}if opts.Logger == nil {opts.Logger = defaultLogger}p := &Pool{poolCommon: poolCommon{capacity: int32(size),allDone:  make(chan struct{}),lock:     syncx.NewSpinLock(),once:     &sync.Once{},options:  opts,}}p.workerCache.New = func() interface{} {return &goWorker{pool: p,task: make(chan func(), workerChanCap),}}if p.options.PreAlloc {if size == -1 {return nil, ErrInvalidPreAllocSize}p.workers = newWorkerQueue(queueTypeLoopQueue, size)} else {p.workers = newWorkerQueue(queueTypeStack, 0)}p.cond = sync.NewCond(p.lock)p.goPurge()p.goTicktock()return p, nil
}

看如下几行代码:

	p.workerCache.New = func() interface{} {return &goWorker{pool: p,task: make(chan func(), workerChanCap),}}

workerCache为sync.Pool类型,sync.Pool是Go语言标准库中提供的一个对象池化的工具,旨在通过复用对象来减少内存分配的频率并降低垃圾回收的开销,从而提高程序的性能。其内部维护了一组可复用的对象。当你需要一个对象时,可以尝试从sync.Pool中获取。如果sync.Pool中有可用的对象,它将返回一个;否则,它会调用你提供的构造函数来创建一个新对象,sync.PoolNew字段是一个可选的函数,用于在池中无可用对象时创建新的对象。

这里这样写即为:当无可用的worker时,则通过New函数创建一个新的worker。

创建workder列表,内部其实就是创建了了一个切片,类型为workerStack,用于管理所有的worker。

p.workers = newWorkerQueue(queueTypeStack, 0)

NewPool函数执行完成后,一个协程池就创建完成了。

协程池创建完成后,需要用来处理任务,如何将任务函数传递到worker去执行呢?看如下函数:

// Submit submits a task to this pool.
//
// Note that you are allowed to call Pool.Submit() from the current Pool.Submit(),
// but what calls for special attention is that you will get blocked with the last
// Pool.Submit() call once the current Pool runs out of its capacity, and to avoid this,
// you should instantiate a Pool with ants.WithNonblocking(true).
func (p *Pool) Submit(task func()) error {if p.IsClosed() {return ErrPoolClosed}w, err := p.retrieveWorker()if w != nil {w.inputFunc(task)}return err
}

函数的入参为一个无返回值、无入参的函数,因此所有需要worker执行的函数都是func()类型,w, err := p.retrieveWorker(),取出一个空闲worker,取出成功后,将任务传递到worker内部:w.inputFunc(task),注意,当线程池中所有worker都忙碌时,inputFunc函数阻塞,一直到有worker空闲。

其他主要的函数,从池中获取worker的函数:

func (p *Pool) retrieveWorker() (w worker, err error) {p.lock.Lock()retry:// First try to fetch the worker from the queue.if w = p.workers.detach(); w != nil {p.lock.Unlock()return}// If the worker queue is empty, and we don't run out of the pool capacity,// then just spawn a new worker goroutine.if capacity := p.Cap(); capacity == -1 || capacity > p.Running() {p.lock.Unlock()w = p.workerCache.Get().(*goWorker)w.run()return}// Bail out early if it's in nonblocking mode or the number of pending callers reaches the maximum limit value.if p.options.Nonblocking || (p.options.MaxBlockingTasks != 0 && p.Waiting() >= p.options.MaxBlockingTasks) {p.lock.Unlock()return nil, ErrPoolOverload}// Otherwise, we'll have to keep them blocked and wait for at least one worker to be put back into pool.p.addWaiting(1)p.cond.Wait() // block and wait for an available workerp.addWaiting(-1)if p.IsClosed() {p.lock.Unlock()return nil, ErrPoolClosed}goto retry
}

这个函数,获取worker有三个逻辑:

  • 当池中有空闲worker,直接获取。
  • 当池中没有空闲worker,从缓存workerCache中取出过期的worker使用,复用资源,降低开销。
  • 等待有worker执行完任务释放。(阻塞情况)

revertWorker,将worker放回池中,以执行下次的任务。 

func (p *Pool) revertWorker(worker *goWorker) bool {if capacity := p.Cap(); (capacity > 0 && p.Running() > capacity) || p.IsClosed() {p.cond.Broadcast()return false}worker.lastUsed = p.nowTime()p.lock.Lock()// To avoid memory leaks, add a double check in the lock scope.// Issue: https://github.com/panjf2000/ants/issues/113if p.IsClosed() {p.lock.Unlock()return false}if err := p.workers.insert(worker); err != nil {p.lock.Unlock()return false}// Notify the invoker stuck in 'retrieveWorker()' of there is an available worker in the worker queue.p.cond.Signal()p.lock.Unlock()return true
}

以上就为ants线程池实现的主要技术细节,希望对各位热爱技术的同学们提供一些些帮助。

3、总结

ants协程池是一个高性能、易用的Go语言协程池库,它通过复用goroutines、自动调度任务、定期清理过期goroutines等方式,帮助开发者更加高效地管理并发任务。无论是处理网络请求、数据处理还是其他需要高并发性能的场景,ants协程池都是一个值得推荐的选择。

相关文章:

golang线程池ants-实现架构

1、总体架构 ants协程池&#xff0c;在使用上有多种方式(使用方式参考这篇文章&#xff1a;golang线程池ants-四种使用方法)&#xff0c;但是在实现的核心就一个&#xff0c;如下架构图&#xff1a; 总的来说&#xff0c;就是三个数据结构&#xff1a; Pool、WorkerStack、goW…...

Mysql面试合集

概念 是一个开源的关系型数据库。 数据库事务及其特性 事务&#xff1a;是一系列的数据库操作&#xff0c;是数据库应用的基本逻辑单位。 事务特性&#xff1a; &#xff08;1&#xff09;原子性&#xff1a;即不可分割性&#xff0c;事务要么全部被执行&#xff0c;要么就…...

Android Gradle 开发与应用 (五): 构建变体与自定义任务

目录 1. 概述 2. 构建变体 2.1 构建变体的概念 2.2 构建类型 2.3 产品风味 2.4 构建变体的使用 3. 自定义任务 3.1 自定义任务的概念 3.2 创建自定义任务 3.3 配置任务依赖 3.4 任务类型 3.5 动态任务 3.6 自定义任务执行顺序 4. 案例 4.1 多渠道打包 4.2 自动…...

Django学习第六天

启动项目命令 python manage.py runserver 取消模态框功能 js实现列表数据删除 第二种实现思路 使用jquery修改模态框标题 编辑页面拿到数据库数据显示默认数据功能实现 想要去数据库中获取数据时&#xff1a;对象/字典 三种不同的数据类型 使用Ajax传入数据实现表单编辑&…...

docker部署mycat,连接上面一篇的一主二从mysql

一、docker下载mycat镜像 查看安装结果 这个名称太长&#xff0c;在安装容器时不方便操作&#xff0c;设置标签为mycat docker tag longhronshens/mycat-docker mycat 二、安装容器 先安装一个&#xff0c;主要目的是获得配置文件 docker run -it -d --name mycat -p 8066:…...

VUE2拖拽组件:vue-draggable-resizable-gorkys

vue-draggable-resizable-gorkys组件基于vue-draggable-resizable进行二次开发, 用于可调整大小和可拖动元素的组件并支持冲突检测、元素吸附、元素对齐、辅助线 安装: npm install --save vue-draggable-resizable-gorkys 全局引用: import Vue from vue import vdr fro…...

容器:stack

以下是关于stack容器的一些总结&#xff1a; stack容器比较简单&#xff0c;主要包括&#xff1a; 1、构造函数&#xff1a;stack [staName] 2、添加、删除元素: push() 、pop() 3、获取栈顶元素&#xff1a;top() 4、获取栈的大小&#xff1a;size() 5、判断栈是否为空&#x…...

跨平台Ribbon UI组件QtitanRibbon全新发布v6.7.0——支持Qt 6.6.3

没有Microsoft在其办公解决方案中提供的界面&#xff0c;就无法想象现代应用程序&#xff0c;这个概念称为Ribbon UI&#xff0c;目前它是使应用程序与时俱进的主要属性。QtitanRibbon是一款遵循Microsoft Ribbon UI Paradigm for Qt技术的Ribbon UI组件&#xff0c;QtitanRibb…...

(6) 深入探索Python-Pandas库的核心数据结构:DataFrame全面解析

目录 前言1. DataFrame 简介2. DataFrame的特点3. DataFrame的创建3.1 使用字典创建DataFrame3.2 使用列表的列表&#xff08;或元组&#xff09;创建DataFrame3.3 使用NumPy数组创建DataFrame3.4 使用Series构成的字典创建DataFrame3.5 使用字典构成的字典创建DataFrame 4. 从…...

在 Azure 云中开始使用适用于 Ubuntu 的 Grafana

介绍 Grafana 是一款开源工具&#xff0c;可用于可视化和分析数据。它特别适合跟踪计算机系统的运行情况。在构建微服务或其他类型的应用程序时&#xff0c;您可能需要分析日志数据、轻松可视化数据或设置特殊警报以接收有关系统中发生的某些事件的通知。 这就是为什么你可能…...

1.Python学习笔记

一、环境配置 1.Python解释器 把程序员用编程语言编写的程序&#xff0c;翻译成计算机可以执行的机器语言 安装&#xff1a; 双击Python3.7.0-选择自定义安装【Customize installation】-勾选配置环境变量 如果没有勾选配置环境变量&#xff0c;输入python就会提示找不到命令…...

中英双语介绍百老汇著名歌剧:《猫》(Cats)和《剧院魅影》(The Phantom of the Opera)

中文版 百老汇著名歌剧 百老汇&#xff08;Broadway&#xff09;是世界著名的剧院区&#xff0c;位于美国纽约市曼哈顿。这里汇集了许多著名的音乐剧和歌剧&#xff0c;吸引了全球各地的观众。以下是两部百老汇的经典音乐剧&#xff1a;《猫》和《剧院魅影》的详细介绍。 1.…...

RpcChannel的调用过程

目录 1. RPC调用方&#xff08;caller&#xff09;的调用(消费)过程 2.在caller下创建文件&#xff1a;calluserservice.cc 3.在src的include下创建文件&#xff1a;mprpcchannel.h 4.在src下创建mprpcchannel.cc 1. RPC调用方&#xff08;caller&#xff09;的调用(消费)过…...

东芝TB6560AHQ/AFG步进电机驱动IC:解锁卓越的电机控制性能

作为一名工程师&#xff0c;一直在寻找可靠且高效的组件来应用于你的项目中。东芝的TB6560AHQ/AFG步进电机驱动IC能够提供精准且多功能的电机控制&#xff0c;完全符合现代应用的高要求&#xff0c;保证高性能和易用性。在这篇文章中&#xff0c;我们将探讨TB6560AHQ/AFG的主要…...

免杀笔记 ----> DLL注入

这段时间我们暂时没什么事情干的话我们就继续更新我们的免杀笔记力&#xff01;&#xff01;&#xff01; &#xff1a;今天我们讲DLL注入 目录 1.DLL注入 2.直接加载DLL&#xff1f; 3.远程线程注入 获取Handle 远程申请内存空间 将我们的CS的DLL加载入内存 创建远程线…...

奇迹MU 骷髅战士在哪

BOSS分布图介绍 我为大家带来各地区怪物分布图。在游戏前期&#xff0c;很多玩家可能会不知道该去哪里寻找怪物&#xff0c;也不知道哪些怪物值得打。如果选择了太强的怪物&#xff0c;弱小的玩家可能会无法抵御攻击。如果选择了低等级的boss&#xff0c;收益可能并不理想。所…...

leetcode力扣_贪心思想

455.分发饼干&#xff08;easy-自己想得出来并写好&#xff09; 假设你是一位很棒的家长&#xff0c;想要给你的孩子们一些小饼干。但是&#xff0c;每个孩子最多只能给一块饼干。对每个孩子 i&#xff0c;都有一个胃口值 g[i]&#xff0c;这是能让孩子们满足胃口的饼干的最小尺…...

Vue中Class数据绑定

Class数据绑定 数据绑定的一个常见需求场景是操作CSS class列表&#xff0c;因为class是attribute&#xff08;属性&#xff09;&#xff0c;我们可以和其他attribute一样使用v-bind 将它们和动态的字符串绑定。但是&#xff0c;在处理比较复杂的绑定时&#xff0c;通过拼接生…...

Python数据分析案例49——基于机器学习的垃圾邮件分类系统构建(朴素贝叶斯,支持向量机)

案例背景 trec06c是非常经典的邮件分类的数据&#xff0c;还是难能可贵的中文数据集。 这个数据集从一堆txt压缩包里面提取出来整理为excel文件还真不容不易&#xff0c;肯定要做一下文本分类。 虽然现在文本分类基本都是深度学习了&#xff0c;但是传统的机器学习也能做。本案…...

贪心算法-以学籍管理系统为例

1.贪心算法介绍 1.算法思路 贪心算法的基本思路是从问题的某一个初始解出发一步一步地进行&#xff0c;根据某个优化测度&#xff0c;每一 步都要确保能获得局部最优解。每一步只考虑一 个数据&#xff0c;其选取应该满足局部优化的条件。若下 一个数据和部分最优解连在一起…...

跨平台终端工具cmatrix:打造震撼的数字雨可视化效果

跨平台终端工具cmatrix&#xff1a;打造震撼的数字雨可视化效果 【免费下载链接】cmatrix Terminal based "The Matrix" like implementation 项目地址: https://gitcode.com/gh_mirrors/cm/cmatrix 你是否曾幻想过在自己的终端中重现《黑客帝国》里令人着迷的…...

SDMatte+边缘精修效果展示:羽毛建模精度、纱布透光过渡、叶片脉络保留

SDMatte边缘精修效果展示&#xff1a;羽毛建模精度、纱布透光过渡、叶片脉络保留 1. 惊艳效果开场 想象一下这样的场景&#xff1a;你需要为一件羽毛饰品拍摄产品图&#xff0c;但无论怎么调整灯光和背景&#xff0c;羽毛边缘总是显得模糊不清&#xff1b;或者当你尝试抠出一…...

OpenClaw更换stepfun/step-3.5-flash模型报错:Unknown model 解决(核心:漏加前缀)

OpenClaw更换stepfun/step-3.5-flash模型报错&#xff1a;Unknown model 解决&#xff08;核心&#xff1a;漏加前缀&#xff09; 摘要&#xff1a;本文聚焦OpenClaw更换stepfun/step-3.5-flash:free模型时&#xff0c;高频报错「Unknown model」的核心解决方法——忘记给主模…...

PEI转染试剂及相关工具在生命科学研究中的应用解析【曼博生物官方代理Polysciences】

摘要&#xff1a;聚乙烯亚胺&#xff08;PEI&#xff09;转染试剂在基因递送、病毒载体生产等领域应用广泛。本文结合Polysciences相关产品体系&#xff0c;对PEI转染、微球技术及神经示踪染料等工具进行系统梳理。 关键词&#xff1a;PEI转染、聚乙烯亚胺、基因转染、HEK293、…...

OpenClaw+nanobot技能开发:从零编写自定义文件处理器

OpenClawnanobot技能开发&#xff1a;从零编写自定义文件处理器 1. 为什么需要自定义文件处理技能 上周我整理项目文档时&#xff0c;遇到了一个典型问题&#xff1a;需要将数百个Markdown文件按照"日期-标题"格式批量重命名。手动操作不仅耗时&#xff0c;还容易出…...

【Mojo+Python混合部署失效真相】:92%开发者忽略的编译期符号冲突、运行时上下文隔离与调试断点丢失问题

第一章&#xff1a;MojoPython混合部署失效真相全景概览Mojo 作为新兴的高性能系统编程语言&#xff0c;设计初衷是与 Python 生态无缝互操作&#xff1b;然而在真实生产部署中&#xff0c;“Mojo Python 混合部署”常出现静默失败、ABI 不兼容、运行时崩溃或性能断崖式下降等…...

在对话中处理生物特征(指纹、虹膜)时,OpenClaw 的识别精度?

关于OpenClaw在生物特征识别上的精度&#xff0c;其实很难给出一个绝对的数字。这倒不是因为技术本身有什么神秘之处&#xff0c;而是因为精度这个指标&#xff0c;在实际应用中常常被误解了。 很多人一提到识别精度&#xff0c;脑子里立刻会冒出一个百分比&#xff0c;比如99.…...

零基础玩转OpenClaw:nanobot镜像可视化控制台入门

零基础玩转OpenClaw&#xff1a;nanobot镜像可视化控制台入门 1. 为什么选择nanobot镜像作为OpenClaw入门 第一次接触OpenClaw时&#xff0c;我被它强大的本地自动化能力所吸引&#xff0c;但很快就被复杂的命令行配置劝退了。直到发现了nanobot这个超轻量级OpenClaw镜像&…...

Nanbeige 4.1-3B赋能微信小程序:打造智能客服对话机器人

Nanbeige 4.1-3B赋能微信小程序&#xff1a;打造智能客服对话机器人 最近在帮一个做电商的朋友琢磨怎么优化他们的客服系统。他们每天要处理大量重复的咨询&#xff0c;比如“什么时候发货”、“怎么退换货”&#xff0c;人工客服忙得团团转&#xff0c;用户还得排队等。这让我…...

告别卡顿闪烁!在Cesium 1.134中集成SOG格式,让400万高斯秒级加载

突破性能瓶颈&#xff1a;Cesium 1.134集成SOG格式实现400万高斯秒级渲染 在三维地理空间可视化领域&#xff0c;Cesium一直是开发者构建高精度场景的首选引擎。但当项目涉及数百万级高斯泼溅数据时&#xff0c;传统加载方式往往导致令人崩溃的卡顿和视角移动时的闪烁问题。最近…...