当前位置: 首页 > news >正文

go动态创建/增加channel并处理数据

背景描述

有一个需求,大概可以描述为:有多个websocket连接,因此消息会并发地发送过来,这些消息中有一个标志可以表明是哪个连接发来的消息,但只有收到消息后才能建立channel或写入已有channel,在收消息前无法预先创建channel

解决过程(可直接阅读最终版)

初版:直接写入

因为对数据量错误预估(以为数据量不大),一开始我是用的mysql直接写入,每次收到ws消息立即处理,可测试中发现因数据量过多且都会操作同一行数据,出现了资源竞争,导致死锁。

第二版:增加锁

在发现出现数据竞争后,我第一反应是增加读写锁。读写锁的代码类似以下示例:

package mainimport ("database/sql""fmt""sync"_ "github.com/go-sql-driver/mysql"
)var (db *sql.DBmu sync.RWMutex
)func init() {var err errordb, err = sql.Open("mysql", "username:password@tcp(localhost:3306)/dbname")if err != nil {panic(err)}
}func main() {defer db.Close()// 读取数据go readData()// 写入数据go writeData()// 保持主线程运行select {}
}func readData() {for {mu.RLock()rows, err := db.Query("SELECT * FROM table_name")mu.RUnlock()if err != nil {fmt.Println("Error reading data:", err)continue}defer rows.Close()// 处理查询结果// ...// 睡眠一段时间,模拟读操作的持续性// 请注意,这是一个简单示例,实际应用中可能需要更复杂的逻辑// 或使用定时器进行控制}
}func writeData() {for {mu.Lock()_, err := db.Exec("INSERT INTO table_name (column1, column2) VALUES (?, ?)", value1, value2)mu.Unlock()if err != nil {fmt.Println("Error writing data:", err)continue}// 睡眠一段时间,模拟写操作的持续性// 请注意,这是一个简单示例,实际应用中可能需要更复杂的逻辑// 或使用定时器进行控制}
}

但是代码里对数据库的操作非常频繁且混乱,加了读写锁后经常出现请求很慢的情况,考虑其他方案

第三版 使用事务

使用事务代码忽略,最终发现,因为事务过长,导致出现了重复写的问题,考虑其他方案

第四版 map

通过一个二维的map来存储数据,每当数据存满10条就处理,当然毫不意外的,出现了map的竞争。map也是可以用锁的,但是这里是二维的map,加上两层锁之后使得效率极低,而且依旧有概率出现map竞争导致报错

此外,还可以考虑使用redis设置锁,直接set就行了,但是因为环境不支持redis,此方案弃用

最终版 动态channel

出现以上问题的根本原因是消费太快,其实完全可以把每个ws连接的数据都写到各自的channel里,同时设置每个channel都累积10条再消费,当然还需要一个处理机制,如果超过10s也消费一次。

启动"生产者"、“消费者”

在当前环境中,生产者就是每次从ws中读到数据往动态channel中写入,消费者就是不断获取有哪些channel,以及从channel中读数据,在ws写入时的处理逻辑大概可以简化为如下demo:

package testimport ("context""encoding/json""github.com/gin-gonic/gin""github.com/gorilla/websocket"log "github.com/sirupsen/logrus""net/http""sync"
)// RequestTemplate 请求模板
type RequestTemplate struct {Op   string               `json:"op"`   // 操作Id   int                  `json:"id"`   // 唯一id标识Time string               `json:"time"` // 时间,用秒级时间戳,字符串包裹Data *RequestTemplateData `json:"data"` // 请求数据Code int                  `json:"code"` // 状态码
}// RequestTemplateData 请求中data包含的部分,实际这里是很复杂的结构,之前超时/死锁也是因为这里处理逻辑比较复杂,但是这篇博客的演示重点不是这个,因此简略为id和请求ip
type RequestTemplateData struct {ConnIp string `json:"conn_ip"` // 请求ipId     int    `json:"id"`      // 唯一id标识
}// ConnInfo 具体的连接信息
type ConnInfo struct {Conn      *websocket.Conn    `json:"conn"`   // websocket连接Ctx       context.Context    `json:"ctx"`    // 连接上下文CtxCancel context.CancelFunc `json:"cancel"` // 连接上下文cancel functionIp        string             `json:"ip"`     // 连接的手机端ipId        int                `json:"id"`     // 唯一id标识
}var AllConns = make(map[string]*ConnInfo) //创建字典集合存储连接信息// Start 启动
func Start() {//处理ws的连接http.HandleFunc("/ws", HandleMsg)// //监听7001端口号,作为websocket连接的服务log.Info("Server started on :7001")log.Fatal(http.ListenAndServe(":7001", nil))
}// ChannelStorage channel数据
type ChannelStorage struct {sync.RWMutexchannels map[string]chan *RequestTemplateData
}var ConnRequestData map[int]*RequestTemplateDatavar upgrader = websocket.Upgrader{CheckOrigin: func(r *http.Request) bool {return true},
}// HandleMsg 处理ws连接,每来一个新客户端请求就建立一个新连接
func HandleMsg(w http.ResponseWriter, r *http.Request) {conn, err := upgrader.Upgrade(w, r, nil) // 协议升级,这里也可以直连if err != nil {log.Error(err)return}//获取连接ip,这里是为了区分每个连接connIp := conn.RemoteAddr().String()// 这里是为了后续关闭channelrootCtx := context.Background()ctx, cancel := context.WithCancel(rootCtx)//加入连接AllConns[connIp] = &ConnInfo{Conn:      conn,   // 客户端ws链接对象Ctx:       ctx,    // 连接上下文CtxCancel: cancel, // 取消连接上下文}defer func() {// 如果断开连接,删除数据if AllConns[connIp] != nil {AllConns[connIp].CtxCancel()delete(ConnRequestData, AllConns[connIp].Id)go SetDoneData(AllConns[connIp].Id, conn) // 这里对结束做处理}delete(AllConns, conn.RemoteAddr().String())err = conn.Close()if err != nil {return}log.Error("HandleMsg异常,开始defer处理:", err)if err := recover(); err != nil {log.Error("websocket连接异常,已断开:", err)}}()log.WithFields(log.Fields{"connIp": connIp,}).Info("沙箱已连接")reqCh := &ChannelStorage{}go reqCh.ResultConsumer(ctx) // 这里是消费者//循环读取ws客户端的消息for {// 读取消息_, msg, err := conn.ReadMessage()if err != nil {log.WithFields(log.Fields{"connIp": connIp,}).WithError(err).Error("读取websocket的消息失败")if AllConns[connIp] != nil {delete(ConnRequestData, AllConns[connIp].Id)go SetDoneData(AllConns[connIp].Id, conn) // 连接断开设置状态为结束}// 断开ws连接conn.Close()delete(AllConns, conn.RemoteAddr().String())return}//msg []byte转stringmsgStr := string(msg)log.Info("收到消息为:", msgStr)//反序列化消息为结构体requestData := RequestTemplate{}if err := json.Unmarshal(msg, &requestData); err != nil {conn.WriteJSON(gin.H{"id": "未知", "op": "未知", "error": "cmd通信的请求参数有误,无法json decode"})log.Error("json_decode cmd命令的请求参数时出错:", err)continue}dataInfo := requestData.Data// 这里实际上有很多操作,简写为两种if requestData.Op != "" {switch requestData.Op {// 收到报告case "report":go reqCh.Produce(dataInfo) // "生产者",发送一条消息// 已完成case "done":go CheckDone(dataInfo, conn) // 做完成的处理default:log.Error("未识别的命令:", msgStr)}}}
}

有一个for循环在持续监听ws消息,消费者只启动一次,这里重点就是生产和消费如何实现

“生产者”

“生产者”要做的事就是:
1 每当收到ws消息后,解析,拿到唯一id(这个唯一是指这个连接下的所有上报消息的id都是相同的)
2 判断这个“唯一id”是否已经创建了channel,若创建了则不需要创建,直接写入channel,若未创建则新建channel
以下是生产者的demo:

// GetChannel 获取通道
func (cs *ChannelStorage) GetChannel(key string) chan *RequestTemplateData {cs.RLock()defer cs.RUnlock()return cs.channels[key]
}// CreateChannel 创建通道并存储到 map 中
func (cs *ChannelStorage) CreateChannel(key string) chan *RequestTemplateData {cs.Lock()defer cs.Unlock()if cs.channels == nil {cs.channels = make(map[string]chan *RequestTemplateData, 800)}ch := make(chan *RequestTemplateData, 10)cs.channels[key] = chreturn ch
}// Produce 往上报channel中写数据
func (cs *ChannelStorage) Produce(requestData *RequestTemplateData) {defer func() {if err := recover(); err != nil {log.Info("_____________recover CaseResultAdd error________: ", err)}}()// 创建存储通道的结构体实例chanelKey := strconv.Itoa(requestData.Id)channel := cs.GetChannel(chanelKey)if channel == nil {channel = cs.CreateChannel(chanelKey)}// 直接往channel里面塞if channel != nil {channel <- requestData}
}
消费者

消费者由于只启动一次,但后续可能会有新的channel,因此需要增加一个获取所有连接的方法:
消费者demo:

func (cs *ChannelStorage) ResultConsumer(ctx context.Context) {defer func() {if err := recover(); err != nil {log.Info("_____________recover CaseResultConsumer error________: ", err)}}()for {select {case <-ctx.Done():log.Info("websocket断开连接,消费者协程退出...")returndefault:cs.processAllChannels(ctx)  // 传入 context.Contexttime.Sleep(2 * time.Second) // 控制处理频率}}
}// processAllChannels 获取所有channel
func (cs *ChannelStorage) processAllChannels(ctx context.Context) {cs.RLock()defer cs.RUnlock()var wg sync.WaitGroup // 用于等待所有通道处理完毕for chName, channel := range cs.channels {wg.Add(1)go func(chName string, channel chan *RequestTemplateData) {defer wg.Done()cs.processChannel(chName, channel, ctx)}(chName, channel)}wg.Wait() // 等待所有通道处理完毕
}
func (cs *ChannelStorage) processChannel(chName string, channel chan *RequestTemplateData, ctx context.Context) {const batchSize = 10 // 每次处理的数据量var messages []*RequestTemplateDatatargetMsgOverTime := 10 * time.Second // 超时时间for {select {case caseMsg := <-channel:messages = append(messages, caseMsg) // 将接收到的消息放入 messages 切片中if len(messages) == batchSize {tmpMessages := messagesmessages = nilprocessMessages(tmpMessages)}case <-time.After(targetMsgOverTime):log.Info("Timeout reached. Processing...")if len(messages) > 0 {tmpMessages := messagesmessages = nillog.Info("Processing remaining messages for channel:", chName)processMessages(tmpMessages)}case <-ctx.Done(): // 如果收到上下文取消信号,退出函数log.Info("______________________error__________cancel______")return}}
}func processMessages(messages []*RequestTemplateData) {// 在这里处理消息就是批量的了
}

相关文章:

go动态创建/增加channel并处理数据

背景描述 有一个需求&#xff0c;大概可以描述为&#xff1a;有多个websocket连接&#xff0c;因此消息会并发地发送过来&#xff0c;这些消息中有一个标志可以表明是哪个连接发来的消息&#xff0c;但只有收到消息后才能建立channel或写入已有channel&#xff0c;在收消息前无…...

asp.net成绩查询系统

说明文档 运行前附加数据库.mdf&#xff08;或sql生成数据库&#xff09; 主要技术&#xff1a; 基于asp.net架构和sql server数据库 功能模块&#xff1a; asp.net成绩查询系统 学生功能有查看成绩和修改账号密码等 后台管理员可以进行用户管理 管理员添加管理员查询注…...

Express路由

什么是路由 官方定义&#xff1a;路由确定了应用程序如何响应客户端对特定端点的请求。 路由的使用 一个路由的组成有 请求方法、路径 和 回调函数 组成。 Express中提供了一些列方法&#xff0c;可以很方便的使用路由&#xff0c;使用格式如下&#xff1a; app.<metho…...

在做题中学习(53): 寻找旋转数组中的最小值

153. 寻找旋转排序数组中的最小值 - 力扣&#xff08;LeetCode&#xff09; 解法&#xff1a;O(logn)->很可能就是二分查找 思路&#xff1a;再看看题目要求&#xff0c;可以画出旋转之后数组中元素的大小关系&#xff1a; 首先&#xff0c;数组是具有二段性的(适配二分查…...

C#语言进阶(三) 元组

总目录 C# 语法总目录 元组目录 元组1. 元组元素命名2. 元组的解构3. 元组的比较 元组 元组(tuple)是一组存储值的便捷方式。 元组的目的主要是&#xff0c;不使用out参数而从方法中返回多个值。(匿名类型无法做这个操作)元组能做匿名类型所有操作。 元组是值类型&#xff0…...

实用的Chrome 浏览器命令

Google Chrome 浏览器提供了许多快捷命令和实用功能&#xff0c;可以帮助用户提高效率和改善浏览体验。这里列举了一些非常实用的Chrome浏览器命令&#xff1a; 1. **CtrlT** / **CmdT** - 打开一个新的标签页。 2. **CtrlShiftT** / **CmdShiftT** - 重新打开最后关闭的标签页…...

IDEA远程连接docker服务,windows版docker desktop

1.windows上安装docker desktop docker desktop下载地址&#xff1a;Docker Desktop: The #1 Containerization Tool for Developers | Docker 有的windows系统不支持安装docker desktop 安装完之后我们可以直接打开&#xff0c;可以选择不登录使用 我们用IDEA连接到docker …...

Rust 和 Go 哪个更好?

在讨论 Rust 与 Go 两种编程语言哪种更优秀时&#xff0c;我们将探讨它们在性能、简易性、安全性、功能、规模和并发处理等方面的比较。同时&#xff0c;我们看看它们有什么共同点和根本的差异。现在就来看看这个友好而公平的对比。 Rust 和 Go 都是优秀的选择 首先&#xff…...

【免费Java系列】大家好 ,今天是学习面向对象高级的第八天点赞收藏关注,持续更新作品 !

这是java进阶课面向对象第一天的课程可以坐传送去学习http://t.csdnimg.cn/Lq3io day08-Map集合、Stream流、File类 一、Map集合 同学们&#xff0c;在前面几节课我们已经学习了Map集合的常用方法&#xff0c;以及遍历方式。 下面我们要学习的是Map接口下面的是三个实现类H…...

RPC 失败。curl 16 Error in the HTTP2 framing layer

报错&#xff1a; (base) hh-virtual-machine:~/work$ git clone https://github.com/yangzongzhuan/RuoYi-Vue3.git 正克隆到 RuoYi-Vue3... error: RPC 失败。curl 16 Error in the HTTP2 framing layer fatal: 在引用列表之后应该有一个 flush 包这个错误通常是由于 Git 在…...

(图论)最短路问题合集(包含C,C++,Java,Python,Go)

不存在负权边&#xff1a; 1.朴素dijkstra算法 原题&#xff1a; 思路&#xff1a;&#xff08;依然是贪心的思想&#xff09; 1.初始化距离&#xff1a;dis[1]0&#xff0c;dis[i]INF&#xff08;正无穷&#xff09; 2.循环n次&#xff1a; 找到当前不在s中的dis最小的点&…...

电脑文件批量重命名不求人:快速操作,高效技巧让你轻松搞定

在数字化时代&#xff0c;电脑文件的管理与整理显得尤为重要。当面对大量需要重命名的文件时&#xff0c;一个个手动修改不仅耗时&#xff0c;还容易出错。那么&#xff0c;有没有一种方法可以快速、高效地完成这一任务呢&#xff1f;答案是肯定的&#xff0c;下面就来介绍几种…...

基于springboot的网上点餐系统源码数据库

基于springboot的网上点餐系统源码数据库 随着科学技术的飞速发展&#xff0c;各行各业都在努力与现代先进技术接轨&#xff0c;通过科技手段提高自身的优势&#xff1b;对于网上点餐系统当然也不能排除在外&#xff0c;随着网络技术的不断成熟&#xff0c;带动了网上点餐系统…...

mysql cluster数据库集群介绍、部署及配置

前言: MySQL集群是一个无共享的、分布式节点架构的存储方案,旨在提供容错性和高性能。它由三个主要节点组成:管理节点(MGM)、数据节点和SQL节点。 管理节点(MGM) 定义与用途:管理节点是MySQL Cluster的控制中心,负责管理集群内的其他节点。它提供配置数据,启动和停止…...

uniapp的app端软件更新弹框

1&#xff1a;使用html PLUS实现&#xff1a;地址HTML5 API Reference (html5plus.org)&#xff0c;效果图 2&#xff1a;在app.vue的onLaunch生命周期中&#xff0c;代码如下&#xff1a; onLaunch: function() {let a 0let view new plus.nativeObj.View(maskView, {backg…...

win11 Terminal 部分窗口美化

需求及分析&#xff1a;因为在 cmd、anaconda prompt 窗口中输入命令较多&#xff0c;而命令输入行和输出结果都是同一个颜色&#xff0c;不易阅读&#xff0c;故将需求定性为「美化窗口」。 美化结束后&#xff0c;我在想是否能不安装任何软件&#xff0c;简单地通过调整主题颜…...

开源go实现的iot物联网新基建平台

软件介绍 Magistrala IoT平台是由Abstract Machines公司开发的创新基础设施解决方案&#xff0c;旨在帮助组织和开发者构建安全、可扩展和创新的物联网应用程序。曾经被称为Mainflux的平台&#xff0c;现在已经开源&#xff0c;并在国际物联网领域受到广泛关注。 功能描述 多协…...

24深圳杯ABCD成品论文47页+各小问代码+图表

A题多个火箭残骸的准确定位&#xff1a; A题已经更新完22页完整版论文&#xff0b;高清无水印照片&#xff0b;Python&#xff08;MATLAB&#xff09;代码简单麦麦https://www.jdmm.cc/file/2710544/ 问题1&#xff1a;单个残骸的音爆位置确定 建模思路&#xff1a; 1. 声波传…...

doris经典bug

在部署完登录web页面查看的时候会发现只有一个节点可以读取信息剩余的节点什么也没读取到 在发现问题后&#xff0c;我们去对应的节点去看log日志&#xff0c;发现它自己绑定到前端的地址上了 现在我们已经发现问题了&#xff0c;以下就开始解决问题 重置doris 首先对be进行操…...

贪心算法应用例题

最优装载问题 #include <stdio.h> #include <algorithm>//排序int main() {int data[] { 8,20,5,80,3,420,14,330,70 };//物体重量int max 500;//船容最大总重量int count sizeof(data) / sizeof(data[0]);//物体数量std::sort(data, data count);//排序,排完数…...

(LeetCode 每日一题) 3442. 奇偶频次间的最大差值 I (哈希、字符串)

题目&#xff1a;3442. 奇偶频次间的最大差值 I 思路 &#xff1a;哈希&#xff0c;时间复杂度0(n)。 用哈希表来记录每个字符串中字符的分布情况&#xff0c;哈希表这里用数组即可实现。 C版本&#xff1a; class Solution { public:int maxDifference(string s) {int a[26]…...

微软PowerBI考试 PL300-选择 Power BI 模型框架【附练习数据】

微软PowerBI考试 PL300-选择 Power BI 模型框架 20 多年来&#xff0c;Microsoft 持续对企业商业智能 (BI) 进行大量投资。 Azure Analysis Services (AAS) 和 SQL Server Analysis Services (SSAS) 基于无数企业使用的成熟的 BI 数据建模技术。 同样的技术也是 Power BI 数据…...

前端导出带有合并单元格的列表

// 导出async function exportExcel(fileName "共识调整.xlsx") {// 所有数据const exportData await getAllMainData();// 表头内容let fitstTitleList [];const secondTitleList [];allColumns.value.forEach(column > {if (!column.children) {fitstTitleL…...

Cilium动手实验室: 精通之旅---20.Isovalent Enterprise for Cilium: Zero Trust Visibility

Cilium动手实验室: 精通之旅---20.Isovalent Enterprise for Cilium: Zero Trust Visibility 1. 实验室环境1.1 实验室环境1.2 小测试 2. The Endor System2.1 部署应用2.2 检查现有策略 3. Cilium 策略实体3.1 创建 allow-all 网络策略3.2 在 Hubble CLI 中验证网络策略源3.3 …...

【2025年】解决Burpsuite抓不到https包的问题

环境&#xff1a;windows11 burpsuite:2025.5 在抓取https网站时&#xff0c;burpsuite抓取不到https数据包&#xff0c;只显示&#xff1a; 解决该问题只需如下三个步骤&#xff1a; 1、浏览器中访问 http://burp 2、下载 CA certificate 证书 3、在设置--隐私与安全--…...

数据链路层的主要功能是什么

数据链路层&#xff08;OSI模型第2层&#xff09;的核心功能是在相邻网络节点&#xff08;如交换机、主机&#xff09;间提供可靠的数据帧传输服务&#xff0c;主要职责包括&#xff1a; &#x1f511; 核心功能详解&#xff1a; 帧封装与解封装 封装&#xff1a; 将网络层下发…...

力扣-35.搜索插入位置

题目描述 给定一个排序数组和一个目标值&#xff0c;在数组中找到目标值&#xff0c;并返回其索引。如果目标值不存在于数组中&#xff0c;返回它将会被按顺序插入的位置。 请必须使用时间复杂度为 O(log n) 的算法。 class Solution {public int searchInsert(int[] nums, …...

中医有效性探讨

文章目录 西医是如何发展到以生物化学为药理基础的现代医学&#xff1f;传统医学奠基期&#xff08;远古 - 17 世纪&#xff09;近代医学转型期&#xff08;17 世纪 - 19 世纪末&#xff09;​现代医学成熟期&#xff08;20世纪至今&#xff09; 中医的源远流长和一脉相承远古至…...

打手机检测算法AI智能分析网关V4守护公共/工业/医疗等多场景安全应用

一、方案背景​ 在现代生产与生活场景中&#xff0c;如工厂高危作业区、医院手术室、公共场景等&#xff0c;人员违规打手机的行为潜藏着巨大风险。传统依靠人工巡查的监管方式&#xff0c;存在效率低、覆盖面不足、判断主观性强等问题&#xff0c;难以满足对人员打手机行为精…...

Linux系统部署KES

1、安装准备 1.版本说明V008R006C009B0014 V008&#xff1a;是version产品的大版本。 R006&#xff1a;是release产品特性版本。 C009&#xff1a;是通用版 B0014&#xff1a;是build开发过程中的构建版本2.硬件要求 #安全版和企业版 内存&#xff1a;1GB 以上 硬盘&#xf…...