C#+redis实现消息队列的发布订阅功能
代码
参考c#+redis stream实现消息队列以及ack机制文章的思路,实现
SubscribeAttribute.cs
using System;namespace DotnetQueue.Attributes
{/// <summary>/// 订阅特性/// </summary>[AttributeUsage(AttributeTargets.Method, Inherited = false)]public class SubscribeAttribute : Attribute{/// <summary>/// 订阅的名称/// </summary>public string Name { get; }/// <summary>/// 构造函数注入/// </summary>/// <param name="name"></param>public SubscribeAttribute(string name){Name = name;}/// <summary>/// 构造函数注入/// </summary>/// <param name="name"></param>public SubscribeAttribute(string name, string groupName, string consumerName){Name = name;GroupName = groupName;ConsumerName = consumerName;}/// <summary>/// 群组名称/// </summary>public string GroupName { get; private set; } = "group01";/// <summary>/// 消费者名称/// </summary>public string ConsumerName { get; private set; } = "consumer01";}
}
新建SubscribeMethod.cs
using System;
using System.Reflection;namespace DotnetQueue
{/// <summary>/// 订阅的方法/// </summary>public class SubscribeMethod{/// <summary>/// 类的类型/// </summary>public Type ClassType { get; set; }/// <summary>/// 方法/// </summary>public MethodInfo MethodInfo { get; set; }}
}
新建DotnetQueueCore.cs
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reflection;
using System.Threading;
using System.Threading.Tasks;
using FreeRedis;
using DotnetQueue.Attributes;namespace DotnetQueue
{/// <summary>/// 消息队列核心/// </summary>public class DotnetQueueCore{private readonly CancellationToken cancellationToken;private readonly IRedisClient redisClient;/// <summary>/// 构造函数注入/// </summary>/// <param name="connectionString">redis链接字符串</param>/// <param name="cancellationToken">取消标识</param>public DotnetQueueCore(string connectionString, CancellationToken cancellationToken){this.cancellationToken = cancellationToken;this.redisClient = new RedisClient(connectionString);}/// <summary>/// 发布消息/// </summary>/// <param name="name"></param>/// <param name="msg"></param>/// <returns></returns>public async Task<bool> Publish(string name, string msg){var msgId = await redisClient.XAddAsync(name, "param", msg);return !string.IsNullOrEmpty(msgId);}/// <summary>/// 订阅监听/// </summary>/// <param name="subscribeTypes"></param>public async Task SubscribeListeners(List<Type> subscribeTypes){var allMethods = GetAllMethods(subscribeTypes);List<Task> tasks = new List<Task>();foreach (var subMethod in allMethods){if (subMethod.ClassType == null){await Task.Delay(TimeSpan.FromSeconds(1));continue;}tasks.Add(Task.Run(async () =>{var method = subMethod.MethodInfo;var parameterInfos = method.GetParameters();var attribute = method.GetCustomAttribute<SubscribeAttribute>();var name = attribute.Name;var groupName = attribute.GroupName;var consumerName = attribute.ConsumerName;var ids = new Dictionary<string, string>();ids.Add(name, ">");while (!cancellationToken.IsCancellationRequested){try{//如果数据存在则不需要执行了,第一次需要执行var exists = await redisClient.ExistsAsync(name);if (!exists){await Task.Delay(TimeSpan.FromSeconds(1));continue;}var info = await redisClient.XInfoGroupsAsync(name);if (info == null || info.Length < 1){//创建群组await redisClient.XGroupCreateAsync(name, groupName, id: "0-0", MkStream: true);}var messages =await redisClient.XReadGroupAsync(groupName, consumerName, 1, 0, noack: false, ids); if (messages == null||messages.Length<=0){await Task.Delay(TimeSpan.FromSeconds(1));continue;}foreach (var message in messages){foreach (var entry in message.entries){var obj = Activator.CreateInstance(method.DeclaringType);if (parameterInfos.Length <= 0){method.Invoke(obj, null);}else{var methodParams = GetStreamEntryValues(entry);method.Invoke(obj, methodParams.ToArray());}//确认消息,如果不加会一直读取第一条await redisClient.XAckAsync(name,groupName, entry.id);}}//await redisClient.XReadAsync(0, name, "0-0");await Task.Delay(TimeSpan.FromSeconds(1));}catch (Exception ex){await Task.Delay(TimeSpan.FromSeconds(1));}}}, cancellationToken));}await Task.WhenAll(tasks);}/// <summary>/// 获取全部的标记的方法/// </summary>/// <param name="types"></param>/// <returns></returns>private List<SubscribeMethod> GetAllMethods(List<Type> types){var result = new List<SubscribeMethod>();foreach (var typeInfo in types){var methods = typeInfo.GetMethods(BindingFlags.NonPublic | BindingFlags.Public | BindingFlags.Instance).Where(m => Attribute.IsDefined(m, typeof(SubscribeAttribute))).ToList();foreach (var method in methods){result.Add(new SubscribeMethod(){MethodInfo = method,ClassType = typeInfo});}}return result;}/// <summary>/// 获取redis stream中的参数/// </summary>/// <param name="msg"></param>/// <returns></returns>private List<object> GetStreamEntryValues(StreamsEntry msg){if (msg == null || msg.fieldValues == null || msg.fieldValues.Length <= 0){return null;}List<object> res = new List<object>();var length = msg.fieldValues.Length;for (int i = 0; i < length; i++){if (msg.fieldValues[i].ToString() == "param"){if ((i + 1) < length){res.Add(msg.fieldValues[i + 1]);}}}return res;}}
}
测试
SubscribeClassTest.cs
using DotnetQueue.Attributes;namespace DotnetQueue.Test;public class SubscribeClassTest
{/// <summary>/// 测试订阅/// </summary>/// <param name="res"></param>/// <returns></returns>[Subscribe("test.wjl.aaa")]public bool SubTest(string res){Console.WriteLine(res);return !string.IsNullOrEmpty(res);}
}
测试方法
private DotnetQueueCorequeueCore = null;
private CancellationTokenSource cancellationToken = null;/// <summary>
/// 测试方法执行之前
/// </summary>
[TestInitialize]
public void Initialize()
{var connectionString = "127.0.0.1:6379,password=,connectTimeout=3000,connectRetry=1,syncTimeout=10000,DefaultDatabase=0";cancellationToken = new CancellationTokenSource();queueCore = new DotnetQueueCore(connectionString, cancellationToken.Token);
}/// <summary>
/// 执行完成之后
/// </summary>
[TestCleanup]
public void Cleanup()
{cancellationToken.Cancel();
}/// <summary>
/// 发布消息测试
/// </summary>
[TestMethod]
public async Task PublishTestMethod()
{var res = await queueCore.Publish("test.wjl.aaa", "{\"age\":1}");Assert.IsTrue(res);res = await queueCore.Publish("test.wjl.aaa", "{\"age\":2}");Assert.IsTrue(res);
}/// <summary>
/// 接受消息测试
/// </summary>
[TestMethod]
public async Task SubscribeListenersTestMethod()
{var types = new List<Type>();types.Add(typeof(SubscribeClassTest));await queueCore.SubscribeListeners(types);Task.Delay(TimeSpan.FromSeconds(30));Assert.IsTrue(true);
}
参考
https://www.cnblogs.com/yanpeng19940119/p/11603865.html
https://github.com/wmowm/InitQ
https://www.cnblogs.com/tibos/p/14944832.html
相关文章:
C#+redis实现消息队列的发布订阅功能
代码 参考c#redis stream实现消息队列以及ack机制文章的思路,实现 SubscribeAttribute.cs using System;namespace DotnetQueue.Attributes {/// <summary>/// 订阅特性/// </summary>[AttributeUsage(AttributeTargets.Method, Inherited false)]pu…...
Docker容器基本操作
容器的基本操作 操作命令(全)命令(简)容器的创建docker container run <image name>docker run <image name>容器的列出(up)docker container lsdocker ps容器的列出(up和exit&…...
从无序到有序:上北智信通过深度数据分析改善会议室资源配置
当前企业普遍面临会议室资源管理难题,预约机制不完善和临时会议多导致资源调度不合理,既有空置又有过度拥挤现象。 针对上述问题,上北智信采用了专业数据分析手段,巧妙融合楼层平面图、环形图、折线图和柱形图等多种可视化工具&a…...
总结:使用JDK原生HttpsURLConnection,封装HttpsUtil工具类,加载自定义证书验证,忽略ssl证书验证
总结:使用JDK原生HttpsURLConnection,封装HttpsUtil工具类,加载自定义证书验证,忽略ssl证书验证 一HttpsUtil工具类二SSLUtil工具类 一HttpsUtil工具类 package com.example.util;import javax.net.ssl.HttpsURLConnection; impo…...
重新定义人机关系边界,Soul以AI社交构建多元社交元宇宙
近年来,AI Native应用的兴起已逐渐成为大众关注的焦点。在此背景下,Soul App的首席技术官陶明在极客公园IF2025创新大会上,发表了一场主题为“人机关系的新边界,Soul如何定义AI社交未来”的演讲。他分享了Soul在人工智能领域内的最新技术进展和战略规划,同时也将Soul社交元宇宙…...
HTTP 参数污染(HPP)详解
1. 什么是 HTTP 参数污染(HPP)? HTTP 参数污染(HTTP Parameter Pollution,简称 HPP)是一种 Web 应用攻击技术,攻击者通过在 HTTP 请求中注入多个相同的参数来绕过安全控制或篡改应用逻辑&#…...
阿里云轻量服务器docker部署nginx
拉取nginx docker镜像 sudo docker pull nginx创建以下挂载目录及文件 用户目录下:conf html logs conf: conf.d nginx.conf html: index.html conf.d: default.confnginx.conf添加文件内容 events {worker_connections 1024; }http {include /etc/ngi…...
(萌新入门)如何从起步阶段开始学习STM32 —— 我应该学习HAL库还是寄存器库?
概念 笔者下面需要介绍的是库寄存器和HAL库两个重要的概念,在各位看完之后,需要决定自己的学习路线到底是学习HAL呢?还是寄存器呢?还是两者都学习呢? 库寄存器 库寄存器就是简单的封装了我们对寄存器的操作…...
Windchill开发-电子仓相关对象信息查询SQL
电子仓相关对象信息查询SQL 一、说明二、数据表信息三、数据表字段说明3.1 HOLDERTOCONTENT3.1.1 对象类型3.1.2 存储类型 3.2 APPLICATIONDATA3.2.1 类别3.2.2 与对象的角色关系3.2.3 存储方式3.2.4 其他字段 3.3 URLDATA3.4 STREAMDATA3.5 FVITEM3.6 FVMOUNT3.6.1 安装状态3.…...
MySQL 数据库定时任务及进阶学习
一、引言 在当今数字化时代,数据管理的高效性和自动化至关重要。MySQL 作为一款广泛应用的开源关系型数据库管理系统,提供了强大的功能来满足各种数据处理需求。其中,定时任务执行功能对于自动化数据操作、维护数据完整性以及优化系统性能具…...
DeepSeek教unity------MessagePack-01
中文:GitCode - 全球开发者的开源社区,开源代码托管平台 MessagePack是C# 的极速 MessagePack 序列化器。它比 MsgPack-Cli 快 10 倍,并且性能超过其他 C# 序列化器。MessagePack for C# 还内置支持 LZ4 压缩——一种极其快速的压缩算法。性能在诸如游戏…...
知识拓展:Python序列化模块 marshal 模块详解
Python marshal 模块学习笔记 1. 简介 marshal 是 Python 的内部序列化格式,主要用于序列化和反序列化 Python 对象。它是 Python 字节码(.pyc文件)使用的序列化格式,比 pickle 更原始和受限,但也更快速和安全。 http…...
leetcode 2684. 矩阵中移动的最大次数
题目如下 数据范围 本题使用常规动态规划就行,不过要注意由于有三个转移的方向,所以我们对dp数组的遍历应该是从上到下 从左到右即按列优先遍历。通过代码 class Solution { public:int maxMoves(vector<vector<int>>& grid) {int …...
机械学习基础-6.更多分类-数据建模与机械智能课程自留
data modeling and machine intelligence - FURTHER CLASSIFICATION 混淆矩阵评估指标:灵敏度和特异度ROC 曲线文字说明部分 AUC:ROC曲线下面积 支持向量机思路补充背景知识点积超平面(HYPERPLANES超平面的法向量到超平面的最小距离数据集与超…...
自动化测试实战
http://8.137.19.140:9090/blog_login.htm 账号: lisi 密码: 123456 上面是系统链接 1. 自动化测试的步骤 1.1 编写Web测试用例 1.2 创建空项目添加依赖 然后我们创建一个新的java项目(使用maven管理),然后引入我们的配置文件:屏幕截图,驱动管理,selenium库 <dependency…...
qt QPlainTextEdit总结
QPlainTextEdit 概述 用途:专为处理纯文本设计,适合大文本编辑和简单文本显示(如日志、代码编辑器)。 特点:相比QTextEdit,轻量高效,支持快速加载和滚动大文件,默认不支持富文本。 …...
AWS SES 邮件服务退信/投诉处理与最佳实践指南
在使用 AWS SES 发送邮件时,合理处理退信和投诉是维护发送声誉的关键。本文将详细介绍 SES 中的退信/投诉处理机制以及相关最佳实践。 一、退信处理机制 © ivwdcwso (ID: u012172506) 1.1 退信类型 在 SES 中,退信分为两种类型: 硬退信(Hard Bounce) 永久性错误,如无效…...
理解WebGPU 中的 GPUAdapter :连接浏览器与 GPU 的桥梁
在 WebGPU 开发中, GPUAdapter 是一个至关重要的对象,它作为浏览器与 GPU 之间的桥梁,为开发者提供了请求 GPU 设备、查询 GPU 特性以及获取适配器信息的能力。本文将详细介绍 GPUAdapter 的核心属性和方法,并通过实际代码…...
rpx和px混用方案
(1)创建一个全局的样式配置文件: // styles/variables.scss :root {// 基础字体大小--font-size-xs: 12px;--font-size-sm: 14px;--font-size-md: 16px;--font-size-lg: 18px;// 响应式间距--spacing-xs: 5px;--spacing-sm: 10px;--spacing-…...
光伏设计软件分类:无人机、Unity3D引擎齐上阵
无人机3D设计 无人机可搭载高分辨率光学相机、激光雷达等测绘设备,对目标区域进行全方位、多角度的航拍作业。通过对采集到的影像数据进行导入处理,运用复杂的图像识别算法与三维重建技术,构建出云端实景3D模型,在实景3D模型中进…...
未来机器人的大脑:如何用神经网络模拟器实现更智能的决策?
编辑:陈萍萍的公主一点人工一点智能 未来机器人的大脑:如何用神经网络模拟器实现更智能的决策?RWM通过双自回归机制有效解决了复合误差、部分可观测性和随机动力学等关键挑战,在不依赖领域特定归纳偏见的条件下实现了卓越的预测准…...
五年级数学知识边界总结思考-下册
目录 一、背景二、过程1.观察物体小学五年级下册“观察物体”知识点详解:由来、作用与意义**一、知识点核心内容****二、知识点的由来:从生活实践到数学抽象****三、知识的作用:解决实际问题的工具****四、学习的意义:培养核心素养…...
深入解析C++中的extern关键字:跨文件共享变量与函数的终极指南
🚀 C extern 关键字深度解析:跨文件编程的终极指南 📅 更新时间:2025年6月5日 🏷️ 标签:C | extern关键字 | 多文件编程 | 链接与声明 | 现代C 文章目录 前言🔥一、extern 是什么?&…...
Hive 存储格式深度解析:从 TextFile 到 ORC,如何选对数据存储方案?
在大数据处理领域,Hive 作为 Hadoop 生态中重要的数据仓库工具,其存储格式的选择直接影响数据存储成本、查询效率和计算资源消耗。面对 TextFile、SequenceFile、Parquet、RCFile、ORC 等多种存储格式,很多开发者常常陷入选择困境。本文将从底…...
图解JavaScript原型:原型链及其分析 | JavaScript图解
忽略该图的细节(如内存地址值没有用二进制) 以下是对该图进一步的理解和总结 1. JS 对象概念的辨析 对象是什么:保存在堆中一块区域,同时在栈中有一块区域保存其在堆中的地址(也就是我们通常说的该变量指向谁&…...
__VUE_PROD_HYDRATION_MISMATCH_DETAILS__ is not explicitly defined.
这个警告表明您在使用Vue的esm-bundler构建版本时,未明确定义编译时特性标志。以下是详细解释和解决方案: 问题原因: 该标志是Vue 3.4引入的编译时特性标志,用于控制生产环境下SSR水合不匹配错误的详细报告1使用esm-bundler…...
Shell 解释器 bash 和 dash 区别
bash 和 dash 都是 Unix/Linux 系统中的 Shell 解释器,但它们在功能、语法和性能上有显著区别。以下是它们的详细对比: 1. 基本区别 特性bash (Bourne-Again SHell)dash (Debian Almquist SHell)来源G…...
LTR-381RGB-01RGB+环境光检测应用场景及客户类型主要有哪些?
RGB环境光检测 功能,在应用场景及客户类型: 1. 可应用的儿童玩具类型 (1) 智能互动玩具 功能:通过检测环境光或物体颜色触发互动(如颜色识别积木、光感音乐盒)。 客户参考: LEGO(乐高&#x…...
Linux信号保存与处理机制详解
Linux信号的保存与处理涉及多个关键机制,以下是详细的总结: 1. 信号的保存 进程描述符(task_struct):每个进程的PCB中包含信号相关信息。 pending信号集:记录已到达但未处理的信号(未决信号&a…...
基于Java的离散数学题库系统设计与实现:附完整源码与论文
JAVASQL离散数学题库管理系统 一、系统概述 本系统采用Java Swing开发桌面应用,结合SQL Server数据库实现离散数学题库的高效管理。系统支持题型分类(选择题、填空题、判断题等)、难度分级、知识点关联,并提供智能组卷、在线测试…...
