当前位置: 首页 > news >正文

消息队列10:为RabbitMq添加连接池

环境:

  • win11
  • rabbitmq-3.8.17
  • .net 6.0
  • RabbitMQ.Client 6.8.1
  • vs2022

安装RabbitMq环境参照:

  • window下安装rabbitmq
  • linux下安装rabbitmq

问题:rabbitmq的c#客户端没有自带连接池,所以需要手动实现。

简易实现如下:

using RabbitMQ.Client;
using System.Collections.Concurrent;
using System.Text;//测试调用
var channel = await ChannelPool.Default.GetChannelAsync("guid", () =>
{var factory = new ConnectionFactory(){HostName = "localhost",Port = 5672,UserName = "test",Password = "123456",VirtualHost = "/",};return Task.FromResult(factory.CreateConnection());
});
try
{var body = Encoding.UTF8.GetBytes("{\"Name\":\"tom\"}");channel.RawChannel.BasicPublish(exchange: "", routingKey: "test-queue", body: body);
}
finally
{channel.Return();
}#region 连接池
/// <summary>
/// rabbitmq 本身没有提供链接池, 且 IModel 的建立和释放也需要发送请求, 所以建立 connection 轮训机制和 IModel 的缓冲池机制<br/>
/// 参考: <seealso href="https://www.rabbitmq.com/client-libraries/dotnet-api-guide#connection-and-channel-lifespan"/>
/// </summary>
public class ChannelPool
{public static ChannelPool Default = new(8, 50);private int connectionCount;private int channelCountPerConnection;public ChannelPool(int connectionCount = 1, int channelCountPerConnection = 5){if (connectionCount > 0) this.connectionCount = connectionCount;if (channelCountPerConnection > 0) this.channelCountPerConnection = channelCountPerConnection;}public class ChannelItem{public int ConnectionIndex { get; set; }public HostItem CacheHost { get; set; }public IModel RawChannel { get; set; }public void Return() => CacheHost.ChannelPools[ConnectionIndex].Return(this);}public class HostItem{public SemaphoreSlim HostLocker { get; set; }public List<IConnection> Connections { get; set; }public int CurrentConnectionIndex { get; set; }public List<SemaphoreSlim> ConnectionLockers { get; set; }public List<EasyPool<ChannelItem>> ChannelPools { get; set; }}#region EasyPoolpublic sealed class EasyPool<T> : IDisposable where T : class{private readonly ConcurrentBag<T> _pool;private readonly Func<T> _factory;private readonly int _maxCount;public EasyPool(Func<T> factory, int maxCount){_factory = factory;_maxCount = maxCount;_pool = new ConcurrentBag<T>();}public T Get(){if (!_pool.TryTake(out var result)) return _factory();return result;}public bool Return(T item){if (_pool.Count >= _maxCount){if (item is IDisposable disposable) try { disposable.Dispose(); } catch { }return false;}_pool.Add(item);return true;}public void Dispose(){T result;while (_pool.TryTake(out result)){if (result is IDisposable disposable){try { disposable.Dispose(); } catch { }}}}}#endregionprivate readonly Dictionary<string, HostItem> _cacheHosts = new();public async Task<ChannelItem> GetChannelAsync(string key, Func<Task<IConnection>> connectionFactoty){var connectionCount = this.connectionCount;var maxChannelCountPerConnection = this.channelCountPerConnection;//获取 HostItemif (!_cacheHosts.TryGetValue(key, out var cacheHost)){lock (_cacheHosts){if (!_cacheHosts.TryGetValue(key, out cacheHost)){cacheHost = new HostItem{HostLocker = new(1, 1),CurrentConnectionIndex = -1,Connections = new List<IConnection>(connectionCount),ConnectionLockers = new List<SemaphoreSlim>(connectionCount),ChannelPools = new List<EasyPool<ChannelItem>>(connectionCount),};for (int i = 0; i < connectionCount; i++){cacheHost.Connections.Add(null);cacheHost.ConnectionLockers.Add(new(1, 1));var idx = i;cacheHost.ChannelPools.Add(new EasyPool<ChannelItem>(() => new ChannelItem{ConnectionIndex = idx,RawChannel = cacheHost.Connections[idx].CreateModel(),CacheHost = cacheHost}, maxChannelCountPerConnection));}_cacheHosts.Add(key, cacheHost);}}}//轮训得到连接索引await cacheHost.HostLocker.WaitAsync();int connectionIdx;try{connectionIdx = ++cacheHost.CurrentConnectionIndex;if (connectionIdx >= connectionCount) cacheHost.CurrentConnectionIndex = connectionIdx = connectionIdx % connectionCount;}finally{try { cacheHost.HostLocker.Release(); } catch { }}//检查是否初始化链接var conn = cacheHost.Connections[connectionIdx];if (conn == null){var connectionLocker = cacheHost.ConnectionLockers[connectionIdx];await connectionLocker.WaitAsync();try{conn = cacheHost.Connections[connectionIdx];if (conn == null){conn = await connectionFactoty();cacheHost.Connections[connectionIdx] = conn;}}finally{try { connectionLocker.Release(); } catch { }}}//得到 Channelreturn cacheHost.ChannelPools[connectionIdx].Get();}
}
#endregion

相关文章:

消息队列10:为RabbitMq添加连接池

环境&#xff1a; win11rabbitmq-3.8.17.net 6.0RabbitMQ.Client 6.8.1vs2022 安装RabbitMq环境参照&#xff1a; window下安装rabbitmqlinux下安装rabbitmq 问题&#xff1a;rabbitmq的c#客户端没有自带连接池&#xff0c;所以需要手动实现。 简易实现如下&#xff1a; usi…...

在使用 Docker 时,用户可能会遇到各种常见的错误和问题

在使用 Docker 时&#xff0c;用户可能会遇到各种常见的错误和问题。以下是一些需要注意的常见错误及其可能的解决方案&#xff1a; 1. 权限问题 在 Linux 系统上运行 Docker 命令时&#xff0c;可能会遇到权限不足的问题。解决这个问题通常有两种方法&#xff1a; 使用 sud…...

MinIO使用客户端进行桶和对象的管理

MinIO使用客户端进行桶和对象的管理 minio安装完成后&#xff0c;除了自带的webui管理界面&#xff0c;还可以使用官方配套的客户端mc进行管理。除此之外&#xff0c;还可以使用第三方客户端s3browser也可以完成对象和桶的生命周期管理。 1. 官方客户端mc MinIO客户端 mc 命…...

数据库管理-第244期 一次无法switchover的故障处理(20240928)

数据库管理244期 2024-09-28 数据库管理-第244期 一次无法switchover的故障处理&#xff08;20240928&#xff09;1 问题展现2 问题排查与处理2.1 问题12.2 问题2 3 问题分析4 总结 数据库管理-第244期 一次无法switchover的故障处理&#xff08;20240928&#xff09; 作者&…...

太绝了死磕这本大模型神书!

今天给大家推荐一本大模型神书&#xff0c;就是这本&#xff1a;《大语言模型&#xff1a;基础与前沿》 书籍介绍&#xff1a; 本书深入阐述了大语言模型的基本概念和算法、研究前沿以及应用&#xff0c;涵盖大语言模型的广泛主题&#xff0c;从基础到前沿&#xff0c;从方法…...

Kevin‘s notes about Qt---Episode 6 不同类中创建同一对象

问题描述 使用场景 现在在我的Qt界面中需要同时使用采集卡的AI(Analog Input)和AO(Analog Output)功能,均已分别调通,但是像之前一样通过创建两个类,然后分别在两个线程中进行操作的方式并不能实现。 原本写法 头文件 art_ao.h 核心代码如下: #ifndef ART_AO_H #defi…...

YOLOv9改进策略【Conv和Transformer】| AssemFormer 结合卷积与 Transformer 优势,弥补传统方法不足

一、本文介绍 本文记录的是利用AssemFormer优化YOLOv9的目标检测网络模型。传统卷积和池化操作会导致信息丢失和压缩缺陷,且传统的注意力机制通常产生固定维度的注意力图,忽略了背景中的丰富上下文信息。本文的利用AssemFormer改进YOLOv9,以在特征传递和融合过程中增加多尺…...

Git 的安装和配置

Git 是跨平台的&#xff0c;可以在 Windows&#xff0c;Linux、Unix 和 Mac 各几大平台上使用 由于笔者主要是使用 Windows&#xff0c;其他平台下安装 Git 的方法暂且不表&#xff08;可参考廖雪峰老师的博客&#xff1a;安装 Git&#xff09; ‍ Windows 安装 Git 从 Git…...

InternVL 微调实践

任务 follow 教学文档和视频使用QLoRA进行微调模型&#xff0c;复现微调效果&#xff0c;并能成功讲出梗图. 复现过程 参考教程部署&#xff1a;https://github.com/InternLM/Tutorial/blob/camp3/docs/L2/InternVL/joke_readme.md 训练 合并权重&&模型转换 pyth…...

自然语言处理在人工智能领域的发展历程,以及NLP重点模型介绍

大家好&#xff0c;我是微学AI&#xff0c;今天给大家介绍一下自然语言处理在人工智能领域的发展历程&#xff0c;以及NLP重点模型介绍。本文详细介绍了自然语言处理的发展历程&#xff0c;同时深入探讨了各种自然语言处理模型的原理与应用。文章首先回顾了自然语言处理技术的发…...

Replit Agent:AI驱动的全自动化软件开发革命

目录 引言Replit Agent核心功能使用场景与优势最新版本更新处理复杂项目的能力常见问题解决方案支持的编程语言和技术栈与其他AI编程工具的比较结语 引言 在人工智能快速发展的今天&#xff0c;软件开发领域正经历着前所未有的变革。Replit Agent作为AI初创公司Replit推出的…...

SAP调用发起泛微OA流程

SAP调用泛微Servlet接口&#xff0c;发起流程 编写servlet接口&#xff0c;给SAP调用 public class SAPCreateWorkflow extends HttpServlet{private static final long serialVersionUID 1L;public void doPost(HttpServletRequest request, HttpServletResponse response)…...

JAVA毕业设计184—基于Java+Springboot+vue3的企业信用信息管理系统(源代码+数据库)

毕设所有选题&#xff1a; https://blog.csdn.net/2303_76227485/article/details/131104075 基于JavaSpringbootvue3的企业信用信息管理系统(源代码数据库)184 一、系统介绍 本项目前后端分离(可以改为ssm版本)&#xff0c;分为用户、管理员两种角色 1、用户&#xff1a; …...

webshell-HTTP常见特征

一、总体特点 二、蚁剑 数据中可以看到一些明文字符串函数&#xff0c;响应中可以看到响应的明文数据。 ant特征以及对数据base64可以解码 chr类别的会出现大量的chr编码 大量的百分号字符 三、哥斯拉 第一个请求包很大 响应为0 密钥被拆分到数据前后 响应包cookie带&#xf…...

docker简单熟悉

‌Docker 容器和‌虚拟机区别‌ Docker容器与虚拟机的主要区别在于虚拟化层次和资源占用&#xff1a; ‌虚拟化层次‌&#xff1a;Docker容器在操作系统级别进行虚拟化&#xff0c;共享宿主机的内核&#xff1b;而虚拟机在硬件级别进行虚拟化&#xff0c;每个虚拟机都拥有独立…...

《深海迷航》风灵月影修改器进阶教程:揭秘海底无限奥秘

潜入《深海迷航》那神秘莫测的海底世界&#xff0c;风灵月影修改器将成为你探索未知的得力助手。 遵循以下步骤&#xff0c;解锁无尽资源与生存优势&#xff1a; 1.安装与启动&#xff1a; 确保从安全源下载风灵月影修改器并安装完毕。启动游戏后&#xff0c;随即开启修改器&…...

为什么说函数传递参数最好小于四个

有一个建议说时函数传递参数最好不超过四个&#xff0c;原因有一个是参数太多难以维护&#xff0c;另一个重要的原因就是函数传递小于四个参数时候效率会更高&#xff0c;其实这个说法也不全对&#xff0c;在不同的结构下不太一样&#xff0c;也不一定是4 其实那么下面将探究函…...

三维立体自然资源“一张图”

随着信息技术的发展&#xff0c;自然资源管理迎来了新的机遇与挑战。在众多技术中&#xff0c;“三维立体自然资源‘一张图’”的概念尤为引人注目。它不仅代表了地理信息科学领域的最新成果&#xff0c;也为自然资源的有效管理和可持续利用提供了强有力的支持。本文将探讨这一…...

语言的重定向

输入输出重定向是相当有意思的一门技术&#xff0c;比如有的人每个月的收入自动转10%到支付宝&#xff0c;20%进了老婆的账户。这么有效益的事情&#xff0c;基本所有的操作系统都支持&#xff0c;本质上它不是编程语言特性&#xff0c;编程语言只是为了更方便调用操作系统的重…...

Snap 发布新一代 AR 眼镜,有什么特别之处?

Snap 发布新一代 AR 眼镜&#xff0c;有什么特别之处&#xff1f; Snap 简介 新一代的 AR 眼镜特点 Snap 简介 Snap 公司成立于 2010 年&#xff0c;2017 年美国东部时间 3 月 2 日上午 11 时许&#xff0c;在纽交所正式挂牌交易&#xff0c;股票代码为 “SNAP”。其旗下的核…...

Zustand 状态管理库:极简而强大的解决方案

Zustand 是一个轻量级、快速和可扩展的状态管理库&#xff0c;特别适合 React 应用。它以简洁的 API 和高效的性能解决了 Redux 等状态管理方案中的繁琐问题。 核心优势对比 基本使用指南 1. 创建 Store // store.js import create from zustandconst useStore create((set)…...

MongoDB学习和应用(高效的非关系型数据库)

一丶 MongoDB简介 对于社交类软件的功能&#xff0c;我们需要对它的功能特点进行分析&#xff1a; 数据量会随着用户数增大而增大读多写少价值较低非好友看不到其动态信息地理位置的查询… 针对以上特点进行分析各大存储工具&#xff1a; mysql&#xff1a;关系型数据库&am…...

多场景 OkHttpClient 管理器 - Android 网络通信解决方案

下面是一个完整的 Android 实现&#xff0c;展示如何创建和管理多个 OkHttpClient 实例&#xff0c;分别用于长连接、普通 HTTP 请求和文件下载场景。 <?xml version"1.0" encoding"utf-8"?> <LinearLayout xmlns:android"http://schemas…...

生成 Git SSH 证书

&#x1f511; 1. ​​生成 SSH 密钥对​​ 在终端&#xff08;Windows 使用 Git Bash&#xff0c;Mac/Linux 使用 Terminal&#xff09;执行命令&#xff1a; ssh-keygen -t rsa -b 4096 -C "your_emailexample.com" ​​参数说明​​&#xff1a; -t rsa&#x…...

【AI学习】三、AI算法中的向量

在人工智能&#xff08;AI&#xff09;算法中&#xff0c;向量&#xff08;Vector&#xff09;是一种将现实世界中的数据&#xff08;如图像、文本、音频等&#xff09;转化为计算机可处理的数值型特征表示的工具。它是连接人类认知&#xff08;如语义、视觉特征&#xff09;与…...

Typeerror: cannot read properties of undefined (reading ‘XXX‘)

最近需要在离线机器上运行软件&#xff0c;所以得把软件用docker打包起来&#xff0c;大部分功能都没问题&#xff0c;出了一个奇怪的事情。同样的代码&#xff0c;在本机上用vscode可以运行起来&#xff0c;但是打包之后在docker里出现了问题。使用的是dialog组件&#xff0c;…...

【Java学习笔记】BigInteger 和 BigDecimal 类

BigInteger 和 BigDecimal 类 二者共有的常见方法 方法功能add加subtract减multiply乘divide除 注意点&#xff1a;传参类型必须是类对象 一、BigInteger 1. 作用&#xff1a;适合保存比较大的整型数 2. 使用说明 创建BigInteger对象 传入字符串 3. 代码示例 import j…...

pikachu靶场通关笔记19 SQL注入02-字符型注入(GET)

目录 一、SQL注入 二、字符型SQL注入 三、字符型注入与数字型注入 四、源码分析 五、渗透实战 1、渗透准备 2、SQL注入探测 &#xff08;1&#xff09;输入单引号 &#xff08;2&#xff09;万能注入语句 3、获取回显列orderby 4、获取数据库名database 5、获取表名…...

【UE5 C++】通过文件对话框获取选择文件的路径

目录 效果 步骤 源码 效果 步骤 1. 在“xxx.Build.cs”中添加需要使用的模块 &#xff0c;这里主要使用“DesktopPlatform”模块 2. 添加后闭UE编辑器&#xff0c;右键点击 .uproject 文件&#xff0c;选择 "Generate Visual Studio project files"&#xff0c;重…...

在Spring Boot中集成RabbitMQ的完整指南

前言 在现代微服务架构中&#xff0c;消息队列&#xff08;Message Queue&#xff09;是实现异步通信、解耦系统组件的重要工具。RabbitMQ 是一个流行的消息中间件&#xff0c;支持多种消息协议&#xff0c;具有高可靠性和可扩展性。 本博客将详细介绍如何在 Spring Boot 项目…...