go并发设计模式runner模式
go并发设计模式runner模式
真正运行的程序不可能是单线程运行的,go语言中最值得骄傲的就是CSP模型了,可以说go语言是CSP模型的实现。
假设现在有一个程序需要实现,这个程序有以下要求:
- 程序可以在分配的时间内完成工作,正常终止;
- 程序没有及时完成工作,“自杀”;
- 接收到操作系统发送的中断事件,程序立刻试图清理状态并停止工作
数据类型设计
程序需要在规定时间内完成工作的最简单方法就是使用goroutine和channel,我们需要一个chan用来接收操作完成的信号,完成任务的函数可能有错误信息返回,因此我们这里定义一个错误类型的通道,用来通知什么时候完成任务以及完成任务的错误信息。
complete chan error
任务执行超时的最简单方法就是使用time包提供的After函数,当指定的时间内没有完成任务那么就出发一下超时通道,因为只需要接收超时的信号,因此只需要定义一个单向接收通道即可
timeout <-chan time.Time
当发生系统中断事件时,程序能立刻清理状态然后清理资源并停止工作,因此我们需要一个信号通道来接收操作系统发送的中断信号,这里我们使用signal包提供的Notify函数来注册信号,当操作系统发送信号时,会通过信号通道发送信号
interrupt chan os.Signal
程序最重要的是能够处理任务,用户需要处理多少任务提前是不能确定的,我们需要一个任务列表,这里我们使用一个切片来保存这些任务。
tasks []func(int)
经过上述设计,我们定义一个Runner结构体,用来保存这些通道和任务列表。
// 并且在操作系统发送中断信号时结束这些任务
type Runner struct {// interrupt channel 用来接收操作系统发送的信号interrupt chan os.Signal// complete channel 用来通知任务已经完成complete chan error// timeout channel 用来通知任务已经超时的接收通道timeout <-chan time.Time// tasks 用来保存任务列表tasks []func(int)
}
错误系统设计
错误系统设计,我们希望在任务执行完成或者超时或者操作系统发送的中断信号时返回错误,因此我们定义两个个错误变量,分别用来保存超时错误,中断错误和正常完成错误。
// ErrTimeout 定义一个超时错误, 会在人物执行超时时被返回
var ErrTimeout = errors.New("received timeout")// ErrInterrupt 定义一个中断错误, 会在收到操作系统事件的时候返回
var ErrInterrupt = errors.New("received interrupt")
数据类型说明
Signal
os.Signal 是一个接口类型,是对不同操作系统上捕获的信号的一个抽象接口,用来从操作系统接收中断事件。
// A Signal represents an operating system signal.
// The usual underlying implementation is operating system-dependent:
// on Unix it is syscall.Signal.
type Signal interface {String() stringSignal() // to distinguish from other Stringers
}
Error
error 是一个接口类型,用来表示错误,所有错误类型都实现了error接口,因此我们可以通过error接口来判断错误类型。
Time
time.Time 是一个结构体类型,用来表示一个时间,包含年月日时分秒纳秒等信息。
type Time struct {// wall and ext encode the wall time seconds, wall time nanoseconds,// and optional monotonic clock reading in nanoseconds.//// From high to low bit position, wall encodes a 1-bit flag (hasMonotonic),// a 33-bit seconds field, and a 30-bit wall time nanoseconds field.// The nanoseconds field is in the range [0, 999999999].// If the hasMonotonic bit is 0, then the 33-bit field must be zero// and the full signed 64-bit wall seconds since Jan 1 year 1 is stored in ext.// If the hasMonotonic bit is 1, then the 33-bit field holds a 33-bit// unsigned wall seconds since Jan 1 year 1885, and ext holds a// signed 64-bit monotonic clock reading, nanoseconds since process start.wall uint64ext int64// loc specifies the Location that should be used to// determine the minute, hour, month, day, and year// that correspond to this Time.// The nil location means UTC.// All UTC times are represented with loc==nil, never loc==&utcLoc.loc *Location
}
方法设计
在go中方法需要示例进行调用,因此我们最后定义一个用来创建Runner实例的New方法,避免用户自行创建实例,导致示例的创建不统一。
名为 New 的工厂函数。这个函数接收一个 time.Duration 类型的值,并返回 Runner 类型的指针。这个函数会创建一个 Runner 类型的值,并初始化每个通道字段。因为 task 字段的零值是 nil,已经满足初始化的要求,所以没有被明确初始化。每个通道字段都有独立的初始化过程
通道 interrupt 被初始化为缓冲区容量为 1 的通道。这可以保证通道至少能接收一个来自语言运行时的 os.Signal 值,确保语言运行时发送这个事件的时候不会被阻塞。如果 goroutine没有准备好接收这个值,这个值就会被丢弃。例如,如果用户反复敲 Ctrl+C 组合键,程序只会在这个通道的缓冲区可用的时候接收事件,其余的所有事件都会被丢弃。
通道 complete 被初始化为无缓冲的通道。当执行任务的 goroutine 完成时,会向这个通道发送一个 error 类型的值或者 nil 值。之后就会等待 main 函数接收这个值。一旦 main 接收了这个 error 值, goroutine 就可以安全地终止了。
最后一个通道 timeout 是用 time 包的 After 函数初始化的。 After 函数返回一个time.Time 类型的通道。语言运行时会在指定的 duration 时间到期之后,向这个通道发送一个 time.Time 的值。
// New 返回一个Runner实例
func New(d time.Duration) *Runner {return &Runner{// 1个缓冲的信号通道interrupt: make(chan os.Signal, 1),// 没有缓冲的信号通道,如果没有接受者那么会阻塞complete: make(chan error),timeout: time.After(d),}
}
Add 方法用来添加任务,因为需要执行的任务前期并不确定有多少,因此Add接收一个名为tasks的可变参数,可变参数可以接受任意数量的值作为传入参数。这个例子里,这些传入的值必须是一个接收一个整数且什么都
不返回的函数。函数执行时的参数 tasks 是一个存储所有这些传入函数值的切片。
// Add 方法用来添加任务,这个任务是一个接收int类型的ID作为参数的函数
func (r *Runner) Add(tasks ...func(int)) {r.tasks = append(r.tasks, tasks...)
}
run 方法会迭代 tasks 切片,并按顺序执行每个函数
func (r *Runner) run() error {for id, task := range r.tasks {if r.gotInterrupt() {return ErrInterrupt}// 执行注册的任务task(id)}return nil
}
gotInterrupt 展示了带 default 分支的 select 语句的经典用法。代码试图从 interrupt 通道去接收信号。一般来说, select 语句在没有任何要接收的数据时会阻塞,不过有了 default 分支就不会阻塞了。 default 分支会将接收 interrupt 通道的阻塞调用转变为非阻塞的。如果 interrupt 通道有中断信号需要接收,就会接收并处理这个中断。如果没有需要接收的信号,就会执行 default 分支。当收到中断信号后,代码会通过调用 Stop 方法来停止接收之后的所有事件。之后函数返回 true。如果没有收到中断信号,在第 99 行该方法会返回 false。本质上,gotInterrupt 方法会让 goroutine 检查中断信号,如果没有发出中断信号,就继续处理工作。
// gotInterrupt 检查是否接收到中断信号
func (r *Runner) gotInterrupt() bool {select {// 如果有中断信号那么返回truecase <-r.interrupt:// 接收到中断信号,停止后续再接收到中断信号signal.Stop(r.interrupt)return true// 没有终端信号返回false,继续执行default:return false}
}
一切步骤都执行完了,现在开始执行任务
// Start 方法用来开始执行任务,并监视通道事件
func (r *Runner) Start() error {// 我们希望接收所有中断信号signal.Notify(r.interrupt, os.Interrupt)// 异步执行任务go func() {r.complete <- r.run()}()select {// 当任务处理完成时该通道会返回case err := <-r.complete:return err// 当任务处理程序运行超时时发出信号case <-r.timeout:return ErrTimeout}
}
将以上代码全部都整合到runner.go文件中
// Package runner 处理任务的运行和声明周期管理
package runnerimport ("errors""os""os/signal""time"
)// Runner 在给定的超时时间内执行一组任务
// 并且在操作系统发送中断信号时结束这些任务
type Runner struct {// interrupt channel 用来接收操作系统发送的信号interrupt chan os.Signal// complete channel 用来通知任务已经完成complete chan error// timeout channel 用来通知任务已经超时timeout <-chan time.Time// tasks 用来保存任务列表tasks []func(int)
}// ErrTimeout 定义一个超时错误, 会在人物执行超时时被返回
var ErrTimeout = errors.New("received timeout")// ErrInterrupt 定义一个中断错误, 会在收到操作系统事件的时候返回
var ErrInterrupt = errors.New("received interrupt")// New 返回一个Runner实例
func New(d time.Duration) *Runner {return &Runner{// 1个缓冲的信号通道interrupt: make(chan os.Signal, 1),// 没有缓冲的信号通道,如果没有接受者那么会阻塞complete: make(chan error),timeout: time.After(d),}
}// Add 方法用来添加任务,这个任务是一个接收int类型的ID作为参数的函数
func (r *Runner) Add(tasks ...func(int)) {r.tasks = append(r.tasks, tasks...)
}// Start 方法用来开始执行任务,并监视通道事件
func (r *Runner) Start() error {// 我们希望接收所有中断信号signal.Notify(r.interrupt, os.Interrupt)// 异步执行任务go func() {r.complete <- r.run()}()select {// 当任务处理完成时该通道会返回case err := <-r.complete:return err// 当任务处理程序运行超时时发出信号case <-r.timeout:return ErrTimeout}}func (r *Runner) run() error {for id, task := range r.tasks {if r.gotInterrupt() {return ErrInterrupt}// 执行注册的任务task(id)}return nil
}// gotInterrupt 检查是否接收到中断信号
func (r *Runner) gotInterrupt() bool {select {// 如果有中断信号那么返回truecase <-r.interrupt:// 接收到中断信号,停止后续再接收到中断信号signal.Stop(r.interrupt)return true// 没有终端信号返回false,继续执行default:return false}
}
在main.go中进行调用
package mainimport ("log""os""time""code/runner"
)// timeout 定义程序执行超时时间,如果超过这个时间还没执行完成会失败退出.
const timeout = 3 * time.Second// 主函数入口
func main() {log.Println("Starting work.")// 调用New创建 runner对象.r := runner.New(timeout)// 向任务队列中添加需要顺序执行的任务r.Add(createTask(), createTask(), createTask())// Run 执行人物,并按照返回错误处理if err := r.Start(); err != nil {switch err {case runner.ErrTimeout:log.Println("Terminating due to timeout.")os.Exit(1)case runner.ErrInterrupt:log.Println("Terminating due to interrupt.")os.Exit(2)}}// 记录执行结果log.Println("Process ended.")
}// createTask 返回一个入参为int的函数
func createTask() func(int) {return func(id int) {log.Printf("Processor - Task #%d.", id)time.Sleep(time.Duration(id) * time.Second)}
}
源码已经放到gitee需要的自行下载:
https://gitee.com/andrewgithub/note_lab/blob/main/example/go/concurrent_mode/runner/runner.go
相关文章:

go并发设计模式runner模式
go并发设计模式runner模式 真正运行的程序不可能是单线程运行的,go语言中最值得骄傲的就是CSP模型了,可以说go语言是CSP模型的实现。 假设现在有一个程序需要实现,这个程序有以下要求: 程序可以在分配的时间内完成工作࿰…...

nn.RNN解析
以下是RNN的计算公式,t时刻的隐藏状态H(t)等于前一时刻隐藏状态H(t-1)乘以参数矩阵,再加t时刻的输入x(t)乘以参数矩阵,最后再通过激活函数,等到t时刻隐藏状态。 下图是输出input和初始化的隐藏状态,当参数batch_first True时候&…...

How to monitor Spring Boot apps with the AppDynamics Java Agent
本文介绍如何使用 AppDynamics Java 代理监视 Azure Spring Apps 中的 Spring Boot 应用程序。 使用 AppDynamics Java 代理可以: 监视应用程序使用环境变量配置 AppDynamics Java 代理 在 AppDynamics 仪表板中检查所有监视数据 How to monitor Spring Boot app…...

Linux学习笔记12 systemd的其他命令
前文已经介绍了systemd在系统初始化中起到的作用和服务的管理和配置。这里补充一下systemd的其他工具和系统进程的管理 前文 Linux学习笔记10 系统启动初始化,服务和进程管理(上)-CSDN博客 Linux学习笔记11 系统启动初始化,服务…...

NGO-CNN-BiGRU-Attention北方苍鹰算法优化卷积双向门控循环单元时间序列预测,含优化前后对比
NGO-CNN-BiGRU-Attention北方苍鹰算法优化卷积双向门控循环单元时间序列预测,含优化前后对比 目录 NGO-CNN-BiGRU-Attention北方苍鹰算法优化卷积双向门控循环单元时间序列预测,含优化前后对比预测效果基本介绍模型描述程序设计参考资料 预测效果 基本介…...
【分布式】分布式缓存
一、什么是分布式缓存 分布式缓存是一种将缓存数据存储在多个节点上的缓存方案。它通过将数据分散存储在多个节点的内存中,以提高系统的读取性能、降低数据库压力和提高系统可扩展性。 二、分布式缓存的优点 优点明细提高性能:分布式缓存可以将数据缓…...

深度学习中的迁移学习:应用与实践
引言 在深度学习领域,迁移学习(Transfer Learning)是一个非常强大且日益流行的概念,它通过将从一个任务中学到的知识应用于另一个任务,能够显著加快模型训练速度并提高其泛化能力。迁移学习在许多实际应用中都得到了广…...

28.UE5实现对话系统
目录 1.对话结构的设计(重点) 2.NPC对话接口的实现 2.1创建类型为pawn的蓝图 2.2创建对话接口 3.对话组件的创建 4.对话的UI设计 4.1UI_对话内容 4.2UI_对话选项 4.3UI_对话选项框 5.对话组件的逻辑实现 通过组件蓝图,也就是下图中的…...

Redis中的分布式锁(步步为营)
分布式锁 概述 分布式锁指的是,所有服务中的所有线程都去获取同一把锁,但只有一个线程可以成功的获得锁,其他没有获得锁的线程必须全部等待,直到持有锁的线程释放锁。 分布式锁是可以跨越多个实例,多个进程的锁 分布…...
CentOS 7安装mysql+JDK+Tomcat完成流程
一.安装mysql 即使是新的linux服务器,也要先验证是否有mysql已经安装,如果有进行卸载原版本,一定要确认是否mysql已不再使用 原安装情况(直接执行命令即可) whereis mysql rpm -qa | grep -i mysql rpm -e perl-DBD-M…...
C++笔记之不同框架中事件循环的核心函数:io_run()、ros_spin()、app_exec()
C笔记之不同框架中事件循环的核心函数:io_run()、ros_spin()、app_exec() code review! 参考笔记 1.qt-C笔记之使用QtConcurrent异步地执行槽函数中的内容,使其不阻塞主界面 2.qt-C笔记之QThread使用 3.qt-C笔记之多线程架构模式:事件信号监…...

C++异常处理
目录 一、异常的概念 二、异常的使用 (1)异常的抛出和捕获 (2)异常的重新抛出 (3)异常安全 (4)异常规范 三、自定义异常体系 四、c标注异常体系 五、异常的优缺点 在之前我们…...

【数据结构】哈希 ---万字详解
unordered系列关联式容器 在C98中,STL提供了底层为红黑树结构的一系列关联式容器,在查询时效率可达到log_2 N,即最差情况下需要比较红黑树的高度次,当树中的节点非常多时,查询效率也不理想。最好 的查询是,…...
4399大数据面试题及参考答案(数据分析和数据开发)
对数据分析的理解 数据分析是一个从数据中提取有价值信息以支持决策的过程。它涵盖了数据收集、清洗、转换、建模和可视化等多个环节。 首先,数据收集是基础。这包括从各种数据源获取数据,例如数据库、文件系统、网络接口等。这些数据源可以是结构化的数据,如关系型数据库中…...

快速理解倒排索引在ElasticSearch中的作用
一.基础概念 定义: 倒排索引是一种数据结构,用来加速文本数据的搜索和检索,和传统的索引方式不同,倒排索引会被每个词汇项与包含该词汇项的文档关联起来,从而去实现快速的全文检索。 举例: 在传统的全文…...

C++趣味编程玩转物联网:基于树莓派Pico控制无源蜂鸣器-实现音符与旋律的结合
无源蜂鸣器是一种多功能的声音输出设备,与有源蜂鸣器相比,它能够通过不同频率的方波生成丰富多样的音调。本项目使用树莓派Pico开发板,通过编程控制无源蜂鸣器播放经典旋律《归来有风》。本文将详细介绍项目实现中的硬件连接、C++代码解析,以及无源蜂鸣器的工作原理。 一、…...
《RuoYi基于SpringBoot+Vue前后端分离的Java快速开发框架学习》系列博客_Part4_三模态融合
系列博客目录 文章目录 系列博客目录目标Step1:之前工作形成子组件Step2:弥补缺失的文本子组件,同时举例如何子组件向父组件传数据Step3:后端代码需要根据上传的文件传给python服务器Step4:python服务器进行分析 目标 实现三模态融合,将文本、图片、音频…...

springboot365高校疫情防控web系统(论文+源码)_kaic
毕 业 设 计(论 文) 题目:高校疫情防控的设计与实现 摘 要 互联网发展至今,无论是其理论还是技术都已经成熟,而且它广泛参与在社会中的方方面面。它让信息都可以通过网络传播,搭配信息管理工具可以很好地为…...

STM32 USART串口数据包
单片机学习! 目录 前言 一、数据包 二、HEX数据包 三、文本数据包 四、HEX数据包和文本数据包优缺点 4.1 HEX数据包 4.2 文本数据包 五、HEX数据包接收 六、文本数据包接收 总结 前言 本文介绍了串口数据包收发的思路和流程。 一、数据包 数据包的作用是把一个个单独…...
【LC】3232. 判断是否可以赢得数字游戏
题目描述: 给你一个 正整数 数组 nums。 Alice 和 Bob 正在玩游戏。在游戏中,Alice 可以从 nums 中选择所有个位数 或 所有两位数,剩余的数字归 Bob 所有。如果 Alice 所选数字之和 严格大于 Bob 的数字之和,则 Alice 获胜。如果…...

python打卡day49
知识点回顾: 通道注意力模块复习空间注意力模块CBAM的定义 作业:尝试对今天的模型检查参数数目,并用tensorboard查看训练过程 import torch import torch.nn as nn# 定义通道注意力 class ChannelAttention(nn.Module):def __init__(self,…...

MongoDB学习和应用(高效的非关系型数据库)
一丶 MongoDB简介 对于社交类软件的功能,我们需要对它的功能特点进行分析: 数据量会随着用户数增大而增大读多写少价值较低非好友看不到其动态信息地理位置的查询… 针对以上特点进行分析各大存储工具: mysql:关系型数据库&am…...
MVC 数据库
MVC 数据库 引言 在软件开发领域,Model-View-Controller(MVC)是一种流行的软件架构模式,它将应用程序分为三个核心组件:模型(Model)、视图(View)和控制器(Controller)。这种模式有助于提高代码的可维护性和可扩展性。本文将深入探讨MVC架构与数据库之间的关系,以…...
土地利用/土地覆盖遥感解译与基于CLUE模型未来变化情景预测;从基础到高级,涵盖ArcGIS数据处理、ENVI遥感解译与CLUE模型情景模拟等
🔍 土地利用/土地覆盖数据是生态、环境和气象等诸多领域模型的关键输入参数。通过遥感影像解译技术,可以精准获取历史或当前任何一个区域的土地利用/土地覆盖情况。这些数据不仅能够用于评估区域生态环境的变化趋势,还能有效评价重大生态工程…...
【C++从零实现Json-Rpc框架】第六弹 —— 服务端模块划分
一、项目背景回顾 前五弹完成了Json-Rpc协议解析、请求处理、客户端调用等基础模块搭建。 本弹重点聚焦于服务端的模块划分与架构设计,提升代码结构的可维护性与扩展性。 二、服务端模块设计目标 高内聚低耦合:各模块职责清晰,便于独立开发…...
C++.OpenGL (14/64)多光源(Multiple Lights)
多光源(Multiple Lights) 多光源渲染技术概览 #mermaid-svg-3L5e5gGn76TNh7Lq {font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;fill:#333;}#mermaid-svg-3L5e5gGn76TNh7Lq .error-icon{fill:#552222;}#mermaid-svg-3L5e5gGn76TNh7Lq .erro…...
MinIO Docker 部署:仅开放一个端口
MinIO Docker 部署:仅开放一个端口 在实际的服务器部署中,出于安全和管理的考虑,我们可能只能开放一个端口。MinIO 是一个高性能的对象存储服务,支持 Docker 部署,但默认情况下它需要两个端口:一个是 API 端口(用于存储和访问数据),另一个是控制台端口(用于管理界面…...
作为测试我们应该关注redis哪些方面
1、功能测试 数据结构操作:验证字符串、列表、哈希、集合和有序的基本操作是否正确 持久化:测试aof和aof持久化机制,确保数据在开启后正确恢复。 事务:检查事务的原子性和回滚机制。 发布订阅:确保消息正确传递。 2、性…...

脑机新手指南(七):OpenBCI_GUI:从环境搭建到数据可视化(上)
一、OpenBCI_GUI 项目概述 (一)项目背景与目标 OpenBCI 是一个开源的脑电信号采集硬件平台,其配套的 OpenBCI_GUI 则是专为该硬件设计的图形化界面工具。对于研究人员、开发者和学生而言,首次接触 OpenBCI 设备时,往…...
根目录0xa0属性对应的Ntfs!_SCB中的FileObject是什么时候被建立的----NTFS源代码分析--重要
根目录0xa0属性对应的Ntfs!_SCB中的FileObject是什么时候被建立的 第一部分: 0: kd> g Breakpoint 9 hit Ntfs!ReadIndexBuffer: f7173886 55 push ebp 0: kd> kc # 00 Ntfs!ReadIndexBuffer 01 Ntfs!FindFirstIndexEntry 02 Ntfs!NtfsUpda…...