当前位置: 首页 > news >正文

消息队列10:为RabbitMq添加连接池

环境:

  • win11
  • rabbitmq-3.8.17
  • .net 6.0
  • RabbitMQ.Client 6.8.1
  • vs2022

安装RabbitMq环境参照:

  • window下安装rabbitmq
  • linux下安装rabbitmq

问题:rabbitmq的c#客户端没有自带连接池,所以需要手动实现。

简易实现如下:

using RabbitMQ.Client;
using System.Collections.Concurrent;
using System.Text;//测试调用
var channel = await ChannelPool.Default.GetChannelAsync("guid", () =>
{var factory = new ConnectionFactory(){HostName = "localhost",Port = 5672,UserName = "test",Password = "123456",VirtualHost = "/",};return Task.FromResult(factory.CreateConnection());
});
try
{var body = Encoding.UTF8.GetBytes("{\"Name\":\"tom\"}");channel.RawChannel.BasicPublish(exchange: "", routingKey: "test-queue", body: body);
}
finally
{channel.Return();
}#region 连接池
/// <summary>
/// rabbitmq 本身没有提供链接池, 且 IModel 的建立和释放也需要发送请求, 所以建立 connection 轮训机制和 IModel 的缓冲池机制<br/>
/// 参考: <seealso href="https://www.rabbitmq.com/client-libraries/dotnet-api-guide#connection-and-channel-lifespan"/>
/// </summary>
public class ChannelPool
{public static ChannelPool Default = new(8, 50);private int connectionCount;private int channelCountPerConnection;public ChannelPool(int connectionCount = 1, int channelCountPerConnection = 5){if (connectionCount > 0) this.connectionCount = connectionCount;if (channelCountPerConnection > 0) this.channelCountPerConnection = channelCountPerConnection;}public class ChannelItem{public int ConnectionIndex { get; set; }public HostItem CacheHost { get; set; }public IModel RawChannel { get; set; }public void Return() => CacheHost.ChannelPools[ConnectionIndex].Return(this);}public class HostItem{public SemaphoreSlim HostLocker { get; set; }public List<IConnection> Connections { get; set; }public int CurrentConnectionIndex { get; set; }public List<SemaphoreSlim> ConnectionLockers { get; set; }public List<EasyPool<ChannelItem>> ChannelPools { get; set; }}#region EasyPoolpublic sealed class EasyPool<T> : IDisposable where T : class{private readonly ConcurrentBag<T> _pool;private readonly Func<T> _factory;private readonly int _maxCount;public EasyPool(Func<T> factory, int maxCount){_factory = factory;_maxCount = maxCount;_pool = new ConcurrentBag<T>();}public T Get(){if (!_pool.TryTake(out var result)) return _factory();return result;}public bool Return(T item){if (_pool.Count >= _maxCount){if (item is IDisposable disposable) try { disposable.Dispose(); } catch { }return false;}_pool.Add(item);return true;}public void Dispose(){T result;while (_pool.TryTake(out result)){if (result is IDisposable disposable){try { disposable.Dispose(); } catch { }}}}}#endregionprivate readonly Dictionary<string, HostItem> _cacheHosts = new();public async Task<ChannelItem> GetChannelAsync(string key, Func<Task<IConnection>> connectionFactoty){var connectionCount = this.connectionCount;var maxChannelCountPerConnection = this.channelCountPerConnection;//获取 HostItemif (!_cacheHosts.TryGetValue(key, out var cacheHost)){lock (_cacheHosts){if (!_cacheHosts.TryGetValue(key, out cacheHost)){cacheHost = new HostItem{HostLocker = new(1, 1),CurrentConnectionIndex = -1,Connections = new List<IConnection>(connectionCount),ConnectionLockers = new List<SemaphoreSlim>(connectionCount),ChannelPools = new List<EasyPool<ChannelItem>>(connectionCount),};for (int i = 0; i < connectionCount; i++){cacheHost.Connections.Add(null);cacheHost.ConnectionLockers.Add(new(1, 1));var idx = i;cacheHost.ChannelPools.Add(new EasyPool<ChannelItem>(() => new ChannelItem{ConnectionIndex = idx,RawChannel = cacheHost.Connections[idx].CreateModel(),CacheHost = cacheHost}, maxChannelCountPerConnection));}_cacheHosts.Add(key, cacheHost);}}}//轮训得到连接索引await cacheHost.HostLocker.WaitAsync();int connectionIdx;try{connectionIdx = ++cacheHost.CurrentConnectionIndex;if (connectionIdx >= connectionCount) cacheHost.CurrentConnectionIndex = connectionIdx = connectionIdx % connectionCount;}finally{try { cacheHost.HostLocker.Release(); } catch { }}//检查是否初始化链接var conn = cacheHost.Connections[connectionIdx];if (conn == null){var connectionLocker = cacheHost.ConnectionLockers[connectionIdx];await connectionLocker.WaitAsync();try{conn = cacheHost.Connections[connectionIdx];if (conn == null){conn = await connectionFactoty();cacheHost.Connections[connectionIdx] = conn;}}finally{try { connectionLocker.Release(); } catch { }}}//得到 Channelreturn cacheHost.ChannelPools[connectionIdx].Get();}
}
#endregion

相关文章:

消息队列10:为RabbitMq添加连接池

环境&#xff1a; win11rabbitmq-3.8.17.net 6.0RabbitMQ.Client 6.8.1vs2022 安装RabbitMq环境参照&#xff1a; window下安装rabbitmqlinux下安装rabbitmq 问题&#xff1a;rabbitmq的c#客户端没有自带连接池&#xff0c;所以需要手动实现。 简易实现如下&#xff1a; usi…...

在使用 Docker 时,用户可能会遇到各种常见的错误和问题

在使用 Docker 时&#xff0c;用户可能会遇到各种常见的错误和问题。以下是一些需要注意的常见错误及其可能的解决方案&#xff1a; 1. 权限问题 在 Linux 系统上运行 Docker 命令时&#xff0c;可能会遇到权限不足的问题。解决这个问题通常有两种方法&#xff1a; 使用 sud…...

MinIO使用客户端进行桶和对象的管理

MinIO使用客户端进行桶和对象的管理 minio安装完成后&#xff0c;除了自带的webui管理界面&#xff0c;还可以使用官方配套的客户端mc进行管理。除此之外&#xff0c;还可以使用第三方客户端s3browser也可以完成对象和桶的生命周期管理。 1. 官方客户端mc MinIO客户端 mc 命…...

数据库管理-第244期 一次无法switchover的故障处理(20240928)

数据库管理244期 2024-09-28 数据库管理-第244期 一次无法switchover的故障处理&#xff08;20240928&#xff09;1 问题展现2 问题排查与处理2.1 问题12.2 问题2 3 问题分析4 总结 数据库管理-第244期 一次无法switchover的故障处理&#xff08;20240928&#xff09; 作者&…...

太绝了死磕这本大模型神书!

今天给大家推荐一本大模型神书&#xff0c;就是这本&#xff1a;《大语言模型&#xff1a;基础与前沿》 书籍介绍&#xff1a; 本书深入阐述了大语言模型的基本概念和算法、研究前沿以及应用&#xff0c;涵盖大语言模型的广泛主题&#xff0c;从基础到前沿&#xff0c;从方法…...

Kevin‘s notes about Qt---Episode 6 不同类中创建同一对象

问题描述 使用场景 现在在我的Qt界面中需要同时使用采集卡的AI(Analog Input)和AO(Analog Output)功能,均已分别调通,但是像之前一样通过创建两个类,然后分别在两个线程中进行操作的方式并不能实现。 原本写法 头文件 art_ao.h 核心代码如下: #ifndef ART_AO_H #defi…...

YOLOv9改进策略【Conv和Transformer】| AssemFormer 结合卷积与 Transformer 优势,弥补传统方法不足

一、本文介绍 本文记录的是利用AssemFormer优化YOLOv9的目标检测网络模型。传统卷积和池化操作会导致信息丢失和压缩缺陷,且传统的注意力机制通常产生固定维度的注意力图,忽略了背景中的丰富上下文信息。本文的利用AssemFormer改进YOLOv9,以在特征传递和融合过程中增加多尺…...

Git 的安装和配置

Git 是跨平台的&#xff0c;可以在 Windows&#xff0c;Linux、Unix 和 Mac 各几大平台上使用 由于笔者主要是使用 Windows&#xff0c;其他平台下安装 Git 的方法暂且不表&#xff08;可参考廖雪峰老师的博客&#xff1a;安装 Git&#xff09; ‍ Windows 安装 Git 从 Git…...

InternVL 微调实践

任务 follow 教学文档和视频使用QLoRA进行微调模型&#xff0c;复现微调效果&#xff0c;并能成功讲出梗图. 复现过程 参考教程部署&#xff1a;https://github.com/InternLM/Tutorial/blob/camp3/docs/L2/InternVL/joke_readme.md 训练 合并权重&&模型转换 pyth…...

自然语言处理在人工智能领域的发展历程,以及NLP重点模型介绍

大家好&#xff0c;我是微学AI&#xff0c;今天给大家介绍一下自然语言处理在人工智能领域的发展历程&#xff0c;以及NLP重点模型介绍。本文详细介绍了自然语言处理的发展历程&#xff0c;同时深入探讨了各种自然语言处理模型的原理与应用。文章首先回顾了自然语言处理技术的发…...

Replit Agent:AI驱动的全自动化软件开发革命

目录 引言Replit Agent核心功能使用场景与优势最新版本更新处理复杂项目的能力常见问题解决方案支持的编程语言和技术栈与其他AI编程工具的比较结语 引言 在人工智能快速发展的今天&#xff0c;软件开发领域正经历着前所未有的变革。Replit Agent作为AI初创公司Replit推出的…...

SAP调用发起泛微OA流程

SAP调用泛微Servlet接口&#xff0c;发起流程 编写servlet接口&#xff0c;给SAP调用 public class SAPCreateWorkflow extends HttpServlet{private static final long serialVersionUID 1L;public void doPost(HttpServletRequest request, HttpServletResponse response)…...

JAVA毕业设计184—基于Java+Springboot+vue3的企业信用信息管理系统(源代码+数据库)

毕设所有选题&#xff1a; https://blog.csdn.net/2303_76227485/article/details/131104075 基于JavaSpringbootvue3的企业信用信息管理系统(源代码数据库)184 一、系统介绍 本项目前后端分离(可以改为ssm版本)&#xff0c;分为用户、管理员两种角色 1、用户&#xff1a; …...

webshell-HTTP常见特征

一、总体特点 二、蚁剑 数据中可以看到一些明文字符串函数&#xff0c;响应中可以看到响应的明文数据。 ant特征以及对数据base64可以解码 chr类别的会出现大量的chr编码 大量的百分号字符 三、哥斯拉 第一个请求包很大 响应为0 密钥被拆分到数据前后 响应包cookie带&#xf…...

docker简单熟悉

‌Docker 容器和‌虚拟机区别‌ Docker容器与虚拟机的主要区别在于虚拟化层次和资源占用&#xff1a; ‌虚拟化层次‌&#xff1a;Docker容器在操作系统级别进行虚拟化&#xff0c;共享宿主机的内核&#xff1b;而虚拟机在硬件级别进行虚拟化&#xff0c;每个虚拟机都拥有独立…...

《深海迷航》风灵月影修改器进阶教程:揭秘海底无限奥秘

潜入《深海迷航》那神秘莫测的海底世界&#xff0c;风灵月影修改器将成为你探索未知的得力助手。 遵循以下步骤&#xff0c;解锁无尽资源与生存优势&#xff1a; 1.安装与启动&#xff1a; 确保从安全源下载风灵月影修改器并安装完毕。启动游戏后&#xff0c;随即开启修改器&…...

为什么说函数传递参数最好小于四个

有一个建议说时函数传递参数最好不超过四个&#xff0c;原因有一个是参数太多难以维护&#xff0c;另一个重要的原因就是函数传递小于四个参数时候效率会更高&#xff0c;其实这个说法也不全对&#xff0c;在不同的结构下不太一样&#xff0c;也不一定是4 其实那么下面将探究函…...

三维立体自然资源“一张图”

随着信息技术的发展&#xff0c;自然资源管理迎来了新的机遇与挑战。在众多技术中&#xff0c;“三维立体自然资源‘一张图’”的概念尤为引人注目。它不仅代表了地理信息科学领域的最新成果&#xff0c;也为自然资源的有效管理和可持续利用提供了强有力的支持。本文将探讨这一…...

语言的重定向

输入输出重定向是相当有意思的一门技术&#xff0c;比如有的人每个月的收入自动转10%到支付宝&#xff0c;20%进了老婆的账户。这么有效益的事情&#xff0c;基本所有的操作系统都支持&#xff0c;本质上它不是编程语言特性&#xff0c;编程语言只是为了更方便调用操作系统的重…...

Snap 发布新一代 AR 眼镜,有什么特别之处?

Snap 发布新一代 AR 眼镜&#xff0c;有什么特别之处&#xff1f; Snap 简介 新一代的 AR 眼镜特点 Snap 简介 Snap 公司成立于 2010 年&#xff0c;2017 年美国东部时间 3 月 2 日上午 11 时许&#xff0c;在纽交所正式挂牌交易&#xff0c;股票代码为 “SNAP”。其旗下的核…...

地震勘探——干扰波识别、井中地震时距曲线特点

目录 干扰波识别反射波地震勘探的干扰波 井中地震时距曲线特点 干扰波识别 有效波&#xff1a;可以用来解决所提出的地质任务的波&#xff1b;干扰波&#xff1a;所有妨碍辨认、追踪有效波的其他波。 地震勘探中&#xff0c;有效波和干扰波是相对的。例如&#xff0c;在反射波…...

《Qt C++ 与 OpenCV:解锁视频播放程序设计的奥秘》

引言:探索视频播放程序设计之旅 在当今数字化时代,多媒体应用已渗透到我们生活的方方面面,从日常的视频娱乐到专业的视频监控、视频会议系统,视频播放程序作为多媒体应用的核心组成部分,扮演着至关重要的角色。无论是在个人电脑、移动设备还是智能电视等平台上,用户都期望…...

Oracle查询表空间大小

1 查询数据库中所有的表空间以及表空间所占空间的大小 SELECTtablespace_name,sum( bytes ) / 1024 / 1024 FROMdba_data_files GROUP BYtablespace_name; 2 Oracle查询表空间大小及每个表所占空间的大小 SELECTtablespace_name,file_id,file_name,round( bytes / ( 1024 …...

visual studio 2022更改主题为深色

visual studio 2022更改主题为深色 点击visual studio 上方的 工具-> 选项 在选项窗口中&#xff0c;选择 环境 -> 常规 &#xff0c;将其中的颜色主题改成深色 点击确定&#xff0c;更改完成...

Cilium动手实验室: 精通之旅---20.Isovalent Enterprise for Cilium: Zero Trust Visibility

Cilium动手实验室: 精通之旅---20.Isovalent Enterprise for Cilium: Zero Trust Visibility 1. 实验室环境1.1 实验室环境1.2 小测试 2. The Endor System2.1 部署应用2.2 检查现有策略 3. Cilium 策略实体3.1 创建 allow-all 网络策略3.2 在 Hubble CLI 中验证网络策略源3.3 …...

React Native在HarmonyOS 5.0阅读类应用开发中的实践

一、技术选型背景 随着HarmonyOS 5.0对Web兼容层的增强&#xff0c;React Native作为跨平台框架可通过重新编译ArkTS组件实现85%以上的代码复用率。阅读类应用具有UI复杂度低、数据流清晰的特点。 二、核心实现方案 1. 环境配置 &#xff08;1&#xff09;使用React Native…...

Java 加密常用的各种算法及其选择

在数字化时代&#xff0c;数据安全至关重要&#xff0c;Java 作为广泛应用的编程语言&#xff0c;提供了丰富的加密算法来保障数据的保密性、完整性和真实性。了解这些常用加密算法及其适用场景&#xff0c;有助于开发者在不同的业务需求中做出正确的选择。​ 一、对称加密算法…...

【RockeMQ】第2节|RocketMQ快速实战以及核⼼概念详解(二)

升级Dledger高可用集群 一、主从架构的不足与Dledger的定位 主从架构缺陷 数据备份依赖Slave节点&#xff0c;但无自动故障转移能力&#xff0c;Master宕机后需人工切换&#xff0c;期间消息可能无法读取。Slave仅存储数据&#xff0c;无法主动升级为Master响应请求&#xff…...

NLP学习路线图(二十三):长短期记忆网络(LSTM)

在自然语言处理(NLP)领域,我们时刻面临着处理序列数据的核心挑战。无论是理解句子的结构、分析文本的情感,还是实现语言的翻译,都需要模型能够捕捉词语之间依时序产生的复杂依赖关系。传统的神经网络结构在处理这种序列依赖时显得力不从心,而循环神经网络(RNN) 曾被视为…...

智能仓储的未来:自动化、AI与数据分析如何重塑物流中心

当仓库学会“思考”&#xff0c;物流的终极形态正在诞生 想象这样的场景&#xff1a; 凌晨3点&#xff0c;某物流中心灯火通明却空无一人。AGV机器人集群根据实时订单动态规划路径&#xff1b;AI视觉系统在0.1秒内扫描包裹信息&#xff1b;数字孪生平台正模拟次日峰值流量压力…...