golang rabbitMQ 生产者复用channel以及生产者组分发策略
引用的是rabbitMQ官方示例的库:github.com/rabbitmq/amqp091-go
在网络编程中我们知道tcp连接的创建、交互、销毁等相关操作的"代价"都是很高的,所以就要去实现如何复用这些连接,并要做到高效并可靠。
预期效果:
项目初始化构建时可以自定义选择生产者开启多个connection,每个connection可以启动多少个channel【都是全局复用的】,因为rabbitMQ所有的命令都是基本都是通过channel去操作完成的,所以这个channel很重要,也是我们想要复用的重点。
初始化创建完connection和channel后,当生产者需要发送一条消息的时候,我们可以通过一些策略去选择它发送到哪个connection和channel,我这里采用的就是随机选择,也可以采用哈希取模、轮询权重算法等,这个可以根据自身业务来做。
我简单画了一个效果图:

定义RabbitMQ结构体以及Config结构体
type Config struct {Host stringPort intUser stringPassword string
}type RabbitMQ struct {ctx context.Contextn intm *sync.MutexConn *amqp.ConnectionChannel []*amqp.Channel
}实例化RabbitMQ结构体
func (mq *RabbitMQ) New(config Config) (rabbitmq *RabbitMQ) {configString := fmt.Sprintf("amqp://%s:%s@%s:%d/", config.User, config.Password, config.Host, config.Port)conn, err := amqp.Dial(configString)if err != nil {log.Panicf("amqp connect error: %v \n", err)}rabbitmq = &RabbitMQ{ctx: context.Background(),m: &sync.Mutex{},Conn: conn,}return
}一、创建消费者
// ConsumeWithWork rabbitmq消费消息[work模式 channelNums可以设置当前连接开启多少个channel]
func (mq *RabbitMQ) ConsumeWithWork(queueName string, channelNums int) {for i := 0; i < channelNums; i++ {go func(i int) {ch, err := mq.Conn.Channel()if err != nil {log.Panicf("amqp open a channel error: %v \n", err)}q, err := ch.QueueDeclare(queueName, // nametrue, // durablefalse, // delete when unusedfalse, // exclusivefalse, // no-waitnil, // arguments)if err != nil {log.Panicf("amqp declare a queue error: %v \n", err)}err = ch.Qos(1, // prefetch count0, // prefetch sizefalse, // global)if err != nil {log.Panicf("amqp set QoS error: %v \n", err)}msg, err := ch.Consume(q.Name, // queue"", // consumerfalse, // auto-ackfalse, // exclusivefalse, // no-localfalse, // no-waitnil, // args)if err != nil {log.Panicf("amqp register a consumer error: %v \n", err)}log.Printf(" [work-%d] Waiting for messages. To exit press CTRL+C", i)for d := range msg {time.Sleep(2 * time.Second)fmt.Printf("[work-%d] Received a message: %s \n", i, d.Body)err = d.Ack(false)if err != nil {log.Printf("work_one Ack Err: %v", err)}}}(i)}var forever chan struct{}<-forever
}二、创建生产者组
// NewPlusherGroups 创建生产者组
func NewPlusherGroups(config Config, connNums, channelNums int) (plusherGroups map[int]*RabbitMQ) {plusherGroups = make(map[int]*RabbitMQ, connNums)for i := 0; i < connNums; i++ {var rabbitmq *RabbitMQrabbitmq = rabbitmq.New(config)rabbitmq.n = ifor cN := 0; cN < channelNums; cN++ {ch, err := rabbitmq.Conn.Channel()if err != nil {log.Panicf("amqp open a channel error: %v \n", err)}rabbitmq.Channel = append(rabbitmq.Channel, ch)}plusherGroups[i] = rabbitmq}return
}三、将消息随机分发给不同的connection、channel
// SendMessageWithWork 生产者发送消息[work模式+(many conn and many channel)]
func SendMessageWithWork(plusherGroups map[int]*RabbitMQ, queueName, body string) bool {if plusherGroups == nil {log.Panicln("SendMessageWithWork plusherGroups params is nil!")}rand.Seed(time.Now().UnixNano())//获取连接个数connNums := len(plusherGroups)//随机分配一个连接对象randConnIndex := rand.Intn(connNums)//选择随机分配的连接对象conn := plusherGroups[randConnIndex]//既然采用了发布者复用conn、channel的形式那么一定要加锁处理//这里为每个对象的操作进行加锁(非线程安全,不加锁会报错的)//至于在存在并发竞争的情况下会存在一定性能损耗,但是我们配置好适量的conn和channel这个基本可以忽略conn.m.Lock()defer conn.m.Unlock()//获取当前对象的channel个数channelNums := len(conn.Channel)//随机分配一个channel对象randChannelIndex := rand.Intn(channelNums)//选择随机分配的channelch := conn.Channel[randChannelIndex]q, err := ch.QueueDeclare(queueName, // nametrue, // durablefalse, // delete when unusedfalse, // exclusivefalse, // no-waitnil, // arguments)if err != nil {log.Panicf("amqp declare a queue error: %v \n", err)}body = fmt.Sprintf("conn[%d] channel[%d] send message : %s", randConnIndex, randChannelIndex, body)err = ch.PublishWithContext(conn.ctx,"", // exchangeq.Name, // routing keyfalse, // mandatoryfalse,amqp.Publishing{DeliveryMode: amqp.Persistent,ContentType: "text/plain",Body: []byte(body),})if err != nil {log.Panicf("amqp publish a message error: %v \n", err)}return true
}四、main函数调用消费者
package mainimport (rabbitmq "go-test/rabbitmq/package"
)func main() {queueName := "task_queue"config := rabbitmq.Config{Host: "192.168.6.103",Port: 5672,User: "root",Password: "root",}var mq *rabbitmq.RabbitMQmq = mq.New(config)//开启N个消费者mq.ConsumeWithWork(queueName, 3)
}
五、main函数调用生产者组发送消息
package mainimport ("fmt""github.com/gin-gonic/gin"rabbitmq "go-test/rabbitmq/package""net/http""time"
)func main() {var messageNo intqueueName := "task_queue"config := rabbitmq.Config{Host: "192.168.6.103",Port: 5672,User: "root",Password: "root",}//conn连接数connNums := 2//channel连接数channelNums := 3//启动N个不同conn的连接,并且每个连接对应的channel为N个的rabbitmq实例plusherGroup := rabbitmq.NewPlusherGroups(config, connNums, channelNums)e := gin.Default()e.GET("/", func(c *gin.Context) {body := fmt.Sprintf("这是第%d条消息...", messageNo)if rabbitmq.SendMessageWithWork(plusherGroup, queueName, body) == true {messageNo++c.JSON(200, gin.H{"code": 200,"msg": "success",})} else {c.JSON(200, gin.H{"code": 500,"msg": "error",})}})server := &http.Server{Addr: ":18776",Handler: e,ReadTimeout: time.Minute,WriteTimeout: time.Minute,}if err := server.ListenAndServe(); err != nil {panic(any("HttpServer启动失败"))}
}执行流程:
启动消费者进程
可以看到我们用3个协程开启了3个work,也就是对应了3个channel



启动生产者组进程
这里用的gin框架,正常启动

我们可以看到rabbitMQ的控制台中,一共3个连接,1个是消费者进程,另外2个是生产者组进程,这2个正好和我们上面配置的connNums参数匹配

我们可以看到rabbitMQ的控制台中,一共9个channel,3个是消费者进程,另外6个是生产者组进程,这6个正好和我们上面配置的channelNums参数匹配

调用发送消息
ab.exe -n 1000 -c 1000 http://127.0.0.1:18776/我们来看消费者日志打印情况,标红的可以证明我们在发送消息时让生产者根据我们的随机分配策略选择connection和channel


相关文章:
golang rabbitMQ 生产者复用channel以及生产者组分发策略
引用的是rabbitMQ官方示例的库:github.com/rabbitmq/amqp091-go在网络编程中我们知道tcp连接的创建、交互、销毁等相关操作的"代价"都是很高的,所以就要去实现如何复用这些连接,并要做到高效并可靠。预期效果:项目初始化…...
掌握了这项技能的性能测试师,90%都升职加薪了
初入职场的新人该怎么做才能让自己快速成长?在公司一直做着手工测试,如何才能提升自己,避免陷入“只涨年龄不涨经验”的尴尬?做为一名软件测试工程师,我们不得不去面对这些问题,有的人找到了答案࿰…...
linux中crontab定时任务导致磁盘满和云监控未报警的的坑
一个后台开发者,兼职运维工作中,配置linux中crontab定时任务,导致磁盘满和云监控未报警的问题的坑。 1.磁盘满 使用命令 df -h2.问题排查 2.1排查日志 命令 cat /var/log/messages日志文件的默认路径是:/var/log 下面是几个…...
vscode中安装python运行调试环境
在运行代码之前,需要到微软商店下载安装python环境,35m,都是自动的。 1、安装python 的extensions插件。 ctrlshiftx 输入 python 后点击 install 按钮。 2、新建文件夹spider文件夹。 3、在新建文件夹spider下新建文件spider.py源代码。…...
【微服务】微服务架构超强讲解,通俗易懂
微服务架构目录一、微服务架构介绍二、出现和发展三、传统开发模式和微服务的区别四、微服务的具体特征五、面向服务的架构SOA(service oriented architecture)和微服务的区别1、SOA喜欢重用,微服务喜欢重写2、SOA喜欢水平服务,微…...
内核中的竞态产生的原因和解决方法
产生原因: 由于多进程对临界资源的抢占 根本原因: 1、对于单核处理器而言,内核支持抢占就会出现竞态 2、对于多核处理器而言,是核与核的竞态 3、进程与中断间存在竞态 4、arm开发板不会出现中断与中断间的竞态(目前&am…...
【微服务】Elasticsearch文档索引库操作(二)
🚗Es学习第二站~ 🚩Es学习起始站:【微服务】Elasticsearch概述&环境搭建(一) 🚩本文已收录至专栏:微服务探索之旅 👍希望您能有所收获 一.索引库操作 索引库就类似数据库表,mapping映射就类…...
【论文速递】NAACL2022-DEGREE: 一种基于生成的数据高效事件抽取模型
【论文速递】NAACL2022-DEGREE: 一种基于生成的数据高效事件抽取模型 【论文原文】:DEGREE A Data-Efficient Generation-Based Event Extraction Mode 【作者信息】:I-Hung Hsu , Kuan-Hao Huang, Elizabeth Boschee ÿ…...
C++类和对象(下)
✨个人主页: Yohifo 🎉所属专栏: C修行之路 🎊每篇一句: 图片来源 I do not believe in taking the right decision. I take a decision and make it right. 我不相信什么正确的决定。我都是先做决定,然后把…...
Java常见的六种线程池、线程池-四种拒绝策略总结
点个关注,必回关 一、线程池的四种拒绝策略: CallerRunsPolicy - 当触发拒绝策略,只要线程池没有关闭的话,则使用调用线程直接运行任务。 一般并发比较小,性能要求不高,不允许失败。 但是,由于…...
Node=>Express中间件分类 学习4
1.中间件分类 应用级别的中间件路由级别的中间件错误级别的中间件Express 内置的中间件第三方的中间件 通过app.use()或app.get()或app.post()绑定到app实力上的中间件,叫做应用级别的中间件 …...
在阿里当外包,是一种什么工作体验?
上周和在阿里做外包的朋友一起吃饭,朋友吃着吃着,就开启了吐槽模式。 他一边喝酒一边说,自己现在做着这份工作,实在看不到前途。 看他状态不佳,问了才知道,是手上的项目太磨人。 他们现在做的项目&#…...
Vue3快速入门【二】
Vue3快速入门一、传值父传子,子传父v-model二、插槽2.1、匿名插槽2.2、具名插槽2.3、插槽作用域2.4、插槽作用域案例2.4.1、初始布局2.4.2、插槽使用2.4.3、点击编辑按钮获取本行数据(插槽作用域的使用)2.4.4、类型书写优化2.4.5、全局接口抽…...
C++-类和对象(上)
类和对象(上)一,构造函数1,概念2,特性二,析构函数1,概念2,特性三,拷贝构造1,概念2,特性四,运算符重载1,概念2,…...
CAPL(vTESTStudio) - DoIP - TCP接收_04
TCP接收 函数介绍 TcpOpen函数...
联合培养博士经历对于国内就业有优势吗?
2023年国家留学基金委(CSC)申请在即,很多在读博士在关心申报的同时,也对联培经历能否有助于国内就业心中存疑,故此知识人网小编重点解答此问题。之前,我们在“CSC联合培养-国内在读博士出国的绝佳选择”一文…...
测试左移之需求质量
测试左移的由来 缺陷的修复成本逐步升高 下面是质量领域司空见惯的一张图,看图说话,容易得出:大部分缺陷都是早期引入的,同时大部分缺陷都是中晚期发现的,而缺陷发现的越晚,其修复成本就越高。因此&#…...
【数据结构初阶】第三节.顺序表详讲
文章目录 前言 一、顺序表的概念 二、顺序表功能接口概览 三、顺序表基本功能的实现 四、四大功能 1、增加数据 1.1 头插法: 1.2 尾插法 1.3 指定下标插入 2、删除数据 2.1 头删 2.2 尾删 2.3 指定下标删除 2.4 删除首次出现的指定元素 3、查找数据…...
新手小白适合做跨境电商吗?
今天的跨境电商已经逐渐成熟,靠运气赚钱的时代早已过去,馅饼不可能从天上掉下来,尤其是你想做一个没有货源的小白劝你醒醒。做跨境电商真的不容易,要想做,首先要分析自己是否适合做。米贸搜整理了以下资料,…...
Python搭建自己[IP代理池]
IP代理是什么:ip就是访问网页数据服务器位置信息,每一个主机或者网络都有一个自己IP信息为什么要使用代理ip:因为在向互联网发送请求中,网页端会识别客户端是真实用户还是爬虫程序,在今天以互联网为主导的世界中&#…...
树莓派超全系列教程文档--(61)树莓派摄像头高级使用方法
树莓派摄像头高级使用方法 配置通过调谐文件来调整相机行为 使用多个摄像头安装 libcam 和 rpicam-apps依赖关系开发包 文章来源: http://raspberry.dns8844.cn/documentation 原文网址 配置 大多数用例自动工作,无需更改相机配置。但是,一…...
简易版抽奖活动的设计技术方案
1.前言 本技术方案旨在设计一套完整且可靠的抽奖活动逻辑,确保抽奖活动能够公平、公正、公开地进行,同时满足高并发访问、数据安全存储与高效处理等需求,为用户提供流畅的抽奖体验,助力业务顺利开展。本方案将涵盖抽奖活动的整体架构设计、核心流程逻辑、关键功能实现以及…...
盘古信息PCB行业解决方案:以全域场景重构,激活智造新未来
一、破局:PCB行业的时代之问 在数字经济蓬勃发展的浪潮中,PCB(印制电路板)作为 “电子产品之母”,其重要性愈发凸显。随着 5G、人工智能等新兴技术的加速渗透,PCB行业面临着前所未有的挑战与机遇。产品迭代…...
Appium+python自动化(十六)- ADB命令
简介 Android 调试桥(adb)是多种用途的工具,该工具可以帮助你你管理设备或模拟器 的状态。 adb ( Android Debug Bridge)是一个通用命令行工具,其允许您与模拟器实例或连接的 Android 设备进行通信。它可为各种设备操作提供便利,如安装和调试…...
如何在看板中有效管理突发紧急任务
在看板中有效管理突发紧急任务需要:设立专门的紧急任务通道、重新调整任务优先级、保持适度的WIP(Work-in-Progress)弹性、优化任务处理流程、提高团队应对突发情况的敏捷性。其中,设立专门的紧急任务通道尤为重要,这能…...
将对透视变换后的图像使用Otsu进行阈值化,来分离黑色和白色像素。这句话中的Otsu是什么意思?
Otsu 是一种自动阈值化方法,用于将图像分割为前景和背景。它通过最小化图像的类内方差或等价地最大化类间方差来选择最佳阈值。这种方法特别适用于图像的二值化处理,能够自动确定一个阈值,将图像中的像素分为黑色和白色两类。 Otsu 方法的原…...
云原生玩法三问:构建自定义开发环境
云原生玩法三问:构建自定义开发环境 引言 临时运维一个古董项目,无文档,无环境,无交接人,俗称三无。 运行设备的环境老,本地环境版本高,ssh不过去。正好最近对 腾讯出品的云原生 cnb 感兴趣&…...
在QWebEngineView上实现鼠标、触摸等事件捕获的解决方案
这个问题我看其他博主也写了,要么要会员、要么写的乱七八糟。这里我整理一下,把问题说清楚并且给出代码,拿去用就行,照着葫芦画瓢。 问题 在继承QWebEngineView后,重写mousePressEvent或event函数无法捕获鼠标按下事…...
GruntJS-前端自动化任务运行器从入门到实战
Grunt 完全指南:从入门到实战 一、Grunt 是什么? Grunt是一个基于 Node.js 的前端自动化任务运行器,主要用于自动化执行项目开发中重复性高的任务,例如文件压缩、代码编译、语法检查、单元测试、文件合并等。通过配置简洁的任务…...
省略号和可变参数模板
本文主要介绍如何展开可变参数的参数包 1.C语言的va_list展开可变参数 #include <iostream> #include <cstdarg>void printNumbers(int count, ...) {// 声明va_list类型的变量va_list args;// 使用va_start将可变参数写入变量argsva_start(args, count);for (in…...
