当前位置: 首页 > news >正文

C#+redis实现消息队列的发布订阅功能

代码

参考c#+redis stream实现消息队列以及ack机制文章的思路,实现
SubscribeAttribute.cs

using System;namespace DotnetQueue.Attributes
{/// <summary>/// 订阅特性/// </summary>[AttributeUsage(AttributeTargets.Method, Inherited = false)]public class SubscribeAttribute : Attribute{/// <summary>/// 订阅的名称/// </summary>public string Name { get; }/// <summary>/// 构造函数注入/// </summary>/// <param name="name"></param>public SubscribeAttribute(string name){Name = name;}/// <summary>/// 构造函数注入/// </summary>/// <param name="name"></param>public SubscribeAttribute(string name, string groupName, string consumerName){Name = name;GroupName = groupName;ConsumerName = consumerName;}/// <summary>/// 群组名称/// </summary>public string GroupName { get; private set; } = "group01";/// <summary>/// 消费者名称/// </summary>public string ConsumerName { get; private set; } = "consumer01";}
}

新建SubscribeMethod.cs

using System;
using System.Reflection;namespace DotnetQueue
{/// <summary>/// 订阅的方法/// </summary>public class SubscribeMethod{/// <summary>/// 类的类型/// </summary>public Type ClassType { get; set; }/// <summary>/// 方法/// </summary>public MethodInfo MethodInfo { get; set; }}
}

新建DotnetQueueCore.cs

using System;
using System.Collections.Generic;
using System.Linq;
using System.Reflection;
using System.Threading;
using System.Threading.Tasks;
using FreeRedis;
using DotnetQueue.Attributes;namespace DotnetQueue
{/// <summary>/// 消息队列核心/// </summary>public class DotnetQueueCore{private readonly CancellationToken cancellationToken;private readonly IRedisClient redisClient;/// <summary>/// 构造函数注入/// </summary>/// <param name="connectionString">redis链接字符串</param>/// <param name="cancellationToken">取消标识</param>public DotnetQueueCore(string connectionString, CancellationToken cancellationToken){this.cancellationToken = cancellationToken;this.redisClient = new RedisClient(connectionString);}/// <summary>/// 发布消息/// </summary>/// <param name="name"></param>/// <param name="msg"></param>/// <returns></returns>public async Task<bool> Publish(string name, string msg){var msgId = await redisClient.XAddAsync(name, "param", msg);return !string.IsNullOrEmpty(msgId);}/// <summary>/// 订阅监听/// </summary>/// <param name="subscribeTypes"></param>public async Task SubscribeListeners(List<Type> subscribeTypes){var allMethods = GetAllMethods(subscribeTypes);List<Task> tasks = new List<Task>();foreach (var subMethod in allMethods){if (subMethod.ClassType == null){await Task.Delay(TimeSpan.FromSeconds(1));continue;}tasks.Add(Task.Run(async () =>{var method = subMethod.MethodInfo;var parameterInfos = method.GetParameters();var attribute = method.GetCustomAttribute<SubscribeAttribute>();var name = attribute.Name;var groupName = attribute.GroupName;var consumerName = attribute.ConsumerName;var ids = new Dictionary<string, string>();ids.Add(name, ">");while (!cancellationToken.IsCancellationRequested){try{//如果数据存在则不需要执行了,第一次需要执行var exists = await redisClient.ExistsAsync(name);if (!exists){await Task.Delay(TimeSpan.FromSeconds(1));continue;}var info = await redisClient.XInfoGroupsAsync(name);if (info == null || info.Length < 1){//创建群组await redisClient.XGroupCreateAsync(name, groupName, id: "0-0", MkStream: true);}var messages =await redisClient.XReadGroupAsync(groupName, consumerName, 1, 0, noack: false, ids); if (messages == null||messages.Length<=0){await Task.Delay(TimeSpan.FromSeconds(1));continue;}foreach (var message in messages){foreach (var entry in message.entries){var obj = Activator.CreateInstance(method.DeclaringType);if (parameterInfos.Length <= 0){method.Invoke(obj, null);}else{var methodParams = GetStreamEntryValues(entry);method.Invoke(obj, methodParams.ToArray());}//确认消息,如果不加会一直读取第一条await redisClient.XAckAsync(name,groupName, entry.id);}}//await redisClient.XReadAsync(0, name, "0-0");await Task.Delay(TimeSpan.FromSeconds(1));}catch (Exception ex){await Task.Delay(TimeSpan.FromSeconds(1));}}}, cancellationToken));}await Task.WhenAll(tasks);}/// <summary>/// 获取全部的标记的方法/// </summary>/// <param name="types"></param>/// <returns></returns>private List<SubscribeMethod> GetAllMethods(List<Type> types){var result = new List<SubscribeMethod>();foreach (var typeInfo in types){var methods = typeInfo.GetMethods(BindingFlags.NonPublic | BindingFlags.Public | BindingFlags.Instance).Where(m => Attribute.IsDefined(m, typeof(SubscribeAttribute))).ToList();foreach (var method in methods){result.Add(new SubscribeMethod(){MethodInfo = method,ClassType = typeInfo});}}return result;}/// <summary>/// 获取redis stream中的参数/// </summary>/// <param name="msg"></param>/// <returns></returns>private List<object> GetStreamEntryValues(StreamsEntry msg){if (msg == null || msg.fieldValues == null || msg.fieldValues.Length <= 0){return null;}List<object> res = new List<object>();var length = msg.fieldValues.Length;for (int i = 0; i < length; i++){if (msg.fieldValues[i].ToString() == "param"){if ((i + 1) < length){res.Add(msg.fieldValues[i + 1]);}}}return res;}}
}

测试

SubscribeClassTest.cs

using DotnetQueue.Attributes;namespace DotnetQueue.Test;public class SubscribeClassTest
{/// <summary>/// 测试订阅/// </summary>/// <param name="res"></param>/// <returns></returns>[Subscribe("test.wjl.aaa")]public bool SubTest(string res){Console.WriteLine(res);return !string.IsNullOrEmpty(res);}
}

测试方法

private DotnetQueueCorequeueCore = null;
private CancellationTokenSource cancellationToken = null;/// <summary>
/// 测试方法执行之前
/// </summary>
[TestInitialize]
public void Initialize()
{var connectionString = "127.0.0.1:6379,password=,connectTimeout=3000,connectRetry=1,syncTimeout=10000,DefaultDatabase=0";cancellationToken = new CancellationTokenSource();queueCore = new DotnetQueueCore(connectionString, cancellationToken.Token);
}/// <summary>
/// 执行完成之后
/// </summary>
[TestCleanup]
public void Cleanup()
{cancellationToken.Cancel();
}/// <summary>
/// 发布消息测试
/// </summary>
[TestMethod]
public async Task PublishTestMethod()
{var res = await queueCore.Publish("test.wjl.aaa", "{\"age\":1}");Assert.IsTrue(res);res = await queueCore.Publish("test.wjl.aaa", "{\"age\":2}");Assert.IsTrue(res);
}/// <summary>
/// 接受消息测试
/// </summary>
[TestMethod]
public async Task SubscribeListenersTestMethod()
{var types = new List<Type>();types.Add(typeof(SubscribeClassTest));await queueCore.SubscribeListeners(types);Task.Delay(TimeSpan.FromSeconds(30));Assert.IsTrue(true);
}

参考

https://www.cnblogs.com/yanpeng19940119/p/11603865.html
https://github.com/wmowm/InitQ
https://www.cnblogs.com/tibos/p/14944832.html

相关文章:

C#+redis实现消息队列的发布订阅功能

代码 参考c#redis stream实现消息队列以及ack机制文章的思路&#xff0c;实现 SubscribeAttribute.cs using System;namespace DotnetQueue.Attributes {/// <summary>/// 订阅特性/// </summary>[AttributeUsage(AttributeTargets.Method, Inherited false)]pu…...

Docker容器基本操作

容器的基本操作 操作命令&#xff08;全&#xff09;命令&#xff08;简&#xff09;容器的创建docker container run <image name>docker run <image name>容器的列出&#xff08;up&#xff09;docker container lsdocker ps容器的列出&#xff08;up和exit&…...

从无序到有序:上北智信通过深度数据分析改善会议室资源配置

当前企业普遍面临会议室资源管理难题&#xff0c;预约机制不完善和临时会议多导致资源调度不合理&#xff0c;既有空置又有过度拥挤现象。 针对上述问题&#xff0c;上北智信采用了专业数据分析手段&#xff0c;巧妙融合楼层平面图、环形图、折线图和柱形图等多种可视化工具&a…...

总结:使用JDK原生HttpsURLConnection,封装HttpsUtil工具类,加载自定义证书验证,忽略ssl证书验证

总结&#xff1a;使用JDK原生HttpsURLConnection&#xff0c;封装HttpsUtil工具类&#xff0c;加载自定义证书验证&#xff0c;忽略ssl证书验证 一HttpsUtil工具类二SSLUtil工具类 一HttpsUtil工具类 package com.example.util;import javax.net.ssl.HttpsURLConnection; impo…...

重新定义人机关系边界,Soul以AI社交构建多元社交元宇宙

近年来,AI Native应用的兴起已逐渐成为大众关注的焦点。在此背景下,Soul App的首席技术官陶明在极客公园IF2025创新大会上,发表了一场主题为“人机关系的新边界,Soul如何定义AI社交未来”的演讲。他分享了Soul在人工智能领域内的最新技术进展和战略规划,同时也将Soul社交元宇宙…...

HTTP 参数污染(HPP)详解

1. 什么是 HTTP 参数污染&#xff08;HPP&#xff09;&#xff1f; HTTP 参数污染&#xff08;HTTP Parameter Pollution&#xff0c;简称 HPP&#xff09;是一种 Web 应用攻击技术&#xff0c;攻击者通过在 HTTP 请求中注入多个相同的参数来绕过安全控制或篡改应用逻辑&#…...

阿里云轻量服务器docker部署nginx

拉取nginx docker镜像 sudo docker pull nginx创建以下挂载目录及文件 用户目录下&#xff1a;conf html logs conf: conf.d nginx.conf html: index.html conf.d: default.confnginx.conf添加文件内容 events {worker_connections 1024; }http {include /etc/ngi…...

(萌新入门)如何从起步阶段开始学习STM32 —— 我应该学习HAL库还是寄存器库?

概念 笔者下面需要介绍的是库寄存器和HAL库两个重要的概念&#xff0c;在各位看完之后&#xff0c;需要决定自己的学习路线到底是学习HAL呢&#xff1f;还是寄存器呢&#xff1f;还是两者都学习呢&#xff1f; 库寄存器 库寄存器就是简单的封装了我们对寄存器的操作&#xf…...

Windchill开发-电子仓相关对象信息查询SQL

电子仓相关对象信息查询SQL 一、说明二、数据表信息三、数据表字段说明3.1 HOLDERTOCONTENT3.1.1 对象类型3.1.2 存储类型 3.2 APPLICATIONDATA3.2.1 类别3.2.2 与对象的角色关系3.2.3 存储方式3.2.4 其他字段 3.3 URLDATA3.4 STREAMDATA3.5 FVITEM3.6 FVMOUNT3.6.1 安装状态3.…...

MySQL 数据库定时任务及进阶学习

一、引言 在当今数字化时代&#xff0c;数据管理的高效性和自动化至关重要。MySQL 作为一款广泛应用的开源关系型数据库管理系统&#xff0c;提供了强大的功能来满足各种数据处理需求。其中&#xff0c;定时任务执行功能对于自动化数据操作、维护数据完整性以及优化系统性能具…...

DeepSeek教unity------MessagePack-01

中文&#xff1a;GitCode - 全球开发者的开源社区,开源代码托管平台 MessagePack是C# 的极速 MessagePack 序列化器。它比 MsgPack-Cli 快 10 倍&#xff0c;并且性能超过其他 C# 序列化器。MessagePack for C# 还内置支持 LZ4 压缩——一种极其快速的压缩算法。性能在诸如游戏…...

知识拓展:Python序列化模块 marshal 模块详解

Python marshal 模块学习笔记 1. 简介 marshal 是 Python 的内部序列化格式&#xff0c;主要用于序列化和反序列化 Python 对象。它是 Python 字节码&#xff08;.pyc文件&#xff09;使用的序列化格式&#xff0c;比 pickle 更原始和受限&#xff0c;但也更快速和安全。 http…...

leetcode 2684. 矩阵中移动的最大次数

题目如下 数据范围 本题使用常规动态规划就行&#xff0c;不过要注意由于有三个转移的方向&#xff0c;所以我们对dp数组的遍历应该是从上到下 从左到右即按列优先遍历。通过代码 class Solution { public:int maxMoves(vector<vector<int>>& grid) {int …...

机械学习基础-6.更多分类-数据建模与机械智能课程自留

data modeling and machine intelligence - FURTHER CLASSIFICATION 混淆矩阵评估指标&#xff1a;灵敏度和特异度ROC 曲线文字说明部分 AUC&#xff1a;ROC曲线下面积 支持向量机思路补充背景知识点积超平面&#xff08;HYPERPLANES超平面的法向量到超平面的最小距离数据集与超…...

自动化测试实战

http://8.137.19.140:9090/blog_login.htm 账号: lisi 密码: 123456 上面是系统链接 1. 自动化测试的步骤 1.1 编写Web测试用例 1.2 创建空项目添加依赖 然后我们创建一个新的java项目(使用maven管理),然后引入我们的配置文件:屏幕截图,驱动管理,selenium库 <dependency…...

qt QPlainTextEdit总结

QPlainTextEdit 概述 用途&#xff1a;专为处理纯文本设计&#xff0c;适合大文本编辑和简单文本显示&#xff08;如日志、代码编辑器&#xff09;。 特点&#xff1a;相比QTextEdit&#xff0c;轻量高效&#xff0c;支持快速加载和滚动大文件&#xff0c;默认不支持富文本。 …...

AWS SES 邮件服务退信/投诉处理与最佳实践指南

在使用 AWS SES 发送邮件时,合理处理退信和投诉是维护发送声誉的关键。本文将详细介绍 SES 中的退信/投诉处理机制以及相关最佳实践。 一、退信处理机制 © ivwdcwso (ID: u012172506) 1.1 退信类型 在 SES 中,退信分为两种类型: 硬退信(Hard Bounce) 永久性错误,如无效…...

理解WebGPU 中的 GPUAdapter :连接浏览器与 GPU 的桥梁

在 WebGPU 开发中&#xff0c; GPUAdapter 是一个至关重要的对象&#xff0c;它作为浏览器与 GPU 之间的桥梁&#xff0c;为开发者提供了请求 GPU 设备、查询 GPU 特性以及获取适配器信息的能力。本文将详细介绍 GPUAdapter 的核心属性和方法&#xff0c;并通过实际代码…...

rpx和px混用方案

&#xff08;1&#xff09;创建一个全局的样式配置文件&#xff1a; // styles/variables.scss :root {// 基础字体大小--font-size-xs: 12px;--font-size-sm: 14px;--font-size-md: 16px;--font-size-lg: 18px;// 响应式间距--spacing-xs: 5px;--spacing-sm: 10px;--spacing-…...

光伏设计软件分类:无人机、Unity3D引擎齐上阵

无人机3D设计 无人机可搭载高分辨率光学相机、激光雷达等测绘设备&#xff0c;对目标区域进行全方位、多角度的航拍作业。通过对采集到的影像数据进行导入处理&#xff0c;运用复杂的图像识别算法与三维重建技术&#xff0c;构建出云端实景3D模型&#xff0c;在实景3D模型中进…...

实测 Taotoken 多模型聚合服务的响应延迟与稳定性观感

&#x1f680; 告别海外账号与网络限制&#xff01;稳定直连全球优质大模型&#xff0c;限时半价接入中。 &#x1f449; 点击领取海量免费额度 实测 Taotoken 多模型聚合服务的响应延迟与稳定性观感 作为一名需要频繁调用大模型 API 的开发者&#xff0c;服务的响应速度和稳定…...

GRANDMICRO有容微推出GM502xx系列时钟缓冲器,专为AI/HPC计算与数据中心设计,全面支持PCIe 6.0/7.0

在人工智能、高性能计算及数据中心技术飞速发展的驱动下&#xff0c;系统对时钟信号的完整性、同步精度与抗干扰能力提出了更为严苛的要求。为应对这一挑战&#xff0c;有容微电子今日正式推出全新一代高性能时钟缓冲器芯片——GM502xx系列。该系列产品专为PCIe 6.0及未来7.0标…...

51单片机通过继电器模块实现智能灯光控制

1. 从点灯到智能控制&#xff1a;51单片机与继电器的完美组合 记得我第一次用51单片机点亮LED时&#xff0c;那种成就感至今难忘。但后来发现&#xff0c;单纯的点灯只是电子世界的"Hello World"&#xff0c;真正的乐趣在于让灯光变得"聪明"起来。这就是为…...

ChatGPT 2023年1月更新解读:模型表现、事实性、数学能力与停止生成按钮

&#x1f525;个人主页&#xff1a;杨利杰YJlio❄️个人专栏&#xff1a;《Sysinternals实战教程》《Windows PowerShell 实战》《WINDOWS教程》《IOS教程》《微信助手》《锤子助手》 《Python》 《Kali Linux》 《那些年未解决的Windows疑难杂症》&#x1f31f; 让复杂的事情更…...

为Claude Code配置Taotoken后端解决访问不稳定与额度不足

&#x1f680; 告别海外账号与网络限制&#xff01;稳定直连全球优质大模型&#xff0c;限时半价接入中。 &#x1f449; 点击领取海量免费额度 为Claude Code配置Taotoken后端解决访问不稳定与额度不足 Claude Code 作为一款高效的编程助手工具&#xff0c;其原生服务在某些地…...

构建企业级AI对话后端:多协议集成与插件化架构实战

1. 项目概述&#xff1a;一个为AI对话而生的企业级后端引擎 如果你正在寻找一个能同时对接OpenAI、Google Gemini&#xff0c;还能无缝集成OneBot机器人协议&#xff0c;并且拥有强大插件扩展能力的AI对话后端&#xff0c;那么Mio-Chat-Backend很可能就是你技术栈里缺失的那块…...

ChatGPT-RetrievalQA数据集解析:用合成数据训练检索模型的实践指南

1. 项目概述与核心问题最近在信息检索和自然语言处理社区里&#xff0c;一个话题讨论得挺热&#xff1a;既然像ChatGPT这样的大语言模型已经能生成相当不错的答案&#xff0c;我们为什么还需要传统的检索模型&#xff1f;更进一步&#xff0c;ChatGPT生成的这些答案&#xff0c…...

ARM Cortex-A9中断控制器架构与多核处理优化

1. ARM Cortex-A9中断控制器架构解析在嵌入式系统设计中&#xff0c;中断控制器作为处理器与外部设备通信的核心枢纽&#xff0c;其性能直接影响系统的实时响应能力。ARM Cortex-A9 MPCore采用的中断控制器架构&#xff0c;通过硬件级的中断管理和分发机制&#xff0c;为多核处…...

ASIC功能验证:基于规范的方法与Specman实战

1. ASIC功能验证的现状与挑战在当今的芯片设计领域&#xff0c;功能验证已成为决定项目成败的关键环节。作为一名从业十余年的验证工程师&#xff0c;我亲眼见证了ASIC设计规模从几十万门级发展到如今的数亿门级&#xff0c;而验证复杂度却呈指数级增长。传统验证方法在面对这种…...

Spring AI Playground:一站式Java AI应用开发与RAG实践指南

1. 项目概述&#xff1a;一个面向未来的AI应用开发沙盒最近在捣鼓AI应用开发&#xff0c;特别是想把大语言模型&#xff08;LLM&#xff09;的能力无缝集成到现有的Java/Spring生态里&#xff0c;发现了一个宝藏级的开源项目&#xff1a;spring-ai-community/spring-ai-playgro…...