C#+redis实现消息队列的发布订阅功能
代码
参考c#+redis stream实现消息队列以及ack机制文章的思路,实现
SubscribeAttribute.cs
using System;namespace DotnetQueue.Attributes
{/// <summary>/// 订阅特性/// </summary>[AttributeUsage(AttributeTargets.Method, Inherited = false)]public class SubscribeAttribute : Attribute{/// <summary>/// 订阅的名称/// </summary>public string Name { get; }/// <summary>/// 构造函数注入/// </summary>/// <param name="name"></param>public SubscribeAttribute(string name){Name = name;}/// <summary>/// 构造函数注入/// </summary>/// <param name="name"></param>public SubscribeAttribute(string name, string groupName, string consumerName){Name = name;GroupName = groupName;ConsumerName = consumerName;}/// <summary>/// 群组名称/// </summary>public string GroupName { get; private set; } = "group01";/// <summary>/// 消费者名称/// </summary>public string ConsumerName { get; private set; } = "consumer01";}
}
新建SubscribeMethod.cs
using System;
using System.Reflection;namespace DotnetQueue
{/// <summary>/// 订阅的方法/// </summary>public class SubscribeMethod{/// <summary>/// 类的类型/// </summary>public Type ClassType { get; set; }/// <summary>/// 方法/// </summary>public MethodInfo MethodInfo { get; set; }}
}
新建DotnetQueueCore.cs
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reflection;
using System.Threading;
using System.Threading.Tasks;
using FreeRedis;
using DotnetQueue.Attributes;namespace DotnetQueue
{/// <summary>/// 消息队列核心/// </summary>public class DotnetQueueCore{private readonly CancellationToken cancellationToken;private readonly IRedisClient redisClient;/// <summary>/// 构造函数注入/// </summary>/// <param name="connectionString">redis链接字符串</param>/// <param name="cancellationToken">取消标识</param>public DotnetQueueCore(string connectionString, CancellationToken cancellationToken){this.cancellationToken = cancellationToken;this.redisClient = new RedisClient(connectionString);}/// <summary>/// 发布消息/// </summary>/// <param name="name"></param>/// <param name="msg"></param>/// <returns></returns>public async Task<bool> Publish(string name, string msg){var msgId = await redisClient.XAddAsync(name, "param", msg);return !string.IsNullOrEmpty(msgId);}/// <summary>/// 订阅监听/// </summary>/// <param name="subscribeTypes"></param>public async Task SubscribeListeners(List<Type> subscribeTypes){var allMethods = GetAllMethods(subscribeTypes);List<Task> tasks = new List<Task>();foreach (var subMethod in allMethods){if (subMethod.ClassType == null){await Task.Delay(TimeSpan.FromSeconds(1));continue;}tasks.Add(Task.Run(async () =>{var method = subMethod.MethodInfo;var parameterInfos = method.GetParameters();var attribute = method.GetCustomAttribute<SubscribeAttribute>();var name = attribute.Name;var groupName = attribute.GroupName;var consumerName = attribute.ConsumerName;var ids = new Dictionary<string, string>();ids.Add(name, ">");while (!cancellationToken.IsCancellationRequested){try{//如果数据存在则不需要执行了,第一次需要执行var exists = await redisClient.ExistsAsync(name);if (!exists){await Task.Delay(TimeSpan.FromSeconds(1));continue;}var info = await redisClient.XInfoGroupsAsync(name);if (info == null || info.Length < 1){//创建群组await redisClient.XGroupCreateAsync(name, groupName, id: "0-0", MkStream: true);}var messages =await redisClient.XReadGroupAsync(groupName, consumerName, 1, 0, noack: false, ids); if (messages == null||messages.Length<=0){await Task.Delay(TimeSpan.FromSeconds(1));continue;}foreach (var message in messages){foreach (var entry in message.entries){var obj = Activator.CreateInstance(method.DeclaringType);if (parameterInfos.Length <= 0){method.Invoke(obj, null);}else{var methodParams = GetStreamEntryValues(entry);method.Invoke(obj, methodParams.ToArray());}//确认消息,如果不加会一直读取第一条await redisClient.XAckAsync(name,groupName, entry.id);}}//await redisClient.XReadAsync(0, name, "0-0");await Task.Delay(TimeSpan.FromSeconds(1));}catch (Exception ex){await Task.Delay(TimeSpan.FromSeconds(1));}}}, cancellationToken));}await Task.WhenAll(tasks);}/// <summary>/// 获取全部的标记的方法/// </summary>/// <param name="types"></param>/// <returns></returns>private List<SubscribeMethod> GetAllMethods(List<Type> types){var result = new List<SubscribeMethod>();foreach (var typeInfo in types){var methods = typeInfo.GetMethods(BindingFlags.NonPublic | BindingFlags.Public | BindingFlags.Instance).Where(m => Attribute.IsDefined(m, typeof(SubscribeAttribute))).ToList();foreach (var method in methods){result.Add(new SubscribeMethod(){MethodInfo = method,ClassType = typeInfo});}}return result;}/// <summary>/// 获取redis stream中的参数/// </summary>/// <param name="msg"></param>/// <returns></returns>private List<object> GetStreamEntryValues(StreamsEntry msg){if (msg == null || msg.fieldValues == null || msg.fieldValues.Length <= 0){return null;}List<object> res = new List<object>();var length = msg.fieldValues.Length;for (int i = 0; i < length; i++){if (msg.fieldValues[i].ToString() == "param"){if ((i + 1) < length){res.Add(msg.fieldValues[i + 1]);}}}return res;}}
}
测试
SubscribeClassTest.cs
using DotnetQueue.Attributes;namespace DotnetQueue.Test;public class SubscribeClassTest
{/// <summary>/// 测试订阅/// </summary>/// <param name="res"></param>/// <returns></returns>[Subscribe("test.wjl.aaa")]public bool SubTest(string res){Console.WriteLine(res);return !string.IsNullOrEmpty(res);}
}
测试方法
private DotnetQueueCorequeueCore = null;
private CancellationTokenSource cancellationToken = null;/// <summary>
/// 测试方法执行之前
/// </summary>
[TestInitialize]
public void Initialize()
{var connectionString = "127.0.0.1:6379,password=,connectTimeout=3000,connectRetry=1,syncTimeout=10000,DefaultDatabase=0";cancellationToken = new CancellationTokenSource();queueCore = new DotnetQueueCore(connectionString, cancellationToken.Token);
}/// <summary>
/// 执行完成之后
/// </summary>
[TestCleanup]
public void Cleanup()
{cancellationToken.Cancel();
}/// <summary>
/// 发布消息测试
/// </summary>
[TestMethod]
public async Task PublishTestMethod()
{var res = await queueCore.Publish("test.wjl.aaa", "{\"age\":1}");Assert.IsTrue(res);res = await queueCore.Publish("test.wjl.aaa", "{\"age\":2}");Assert.IsTrue(res);
}/// <summary>
/// 接受消息测试
/// </summary>
[TestMethod]
public async Task SubscribeListenersTestMethod()
{var types = new List<Type>();types.Add(typeof(SubscribeClassTest));await queueCore.SubscribeListeners(types);Task.Delay(TimeSpan.FromSeconds(30));Assert.IsTrue(true);
}
参考
https://www.cnblogs.com/yanpeng19940119/p/11603865.html
https://github.com/wmowm/InitQ
https://www.cnblogs.com/tibos/p/14944832.html
相关文章:
C#+redis实现消息队列的发布订阅功能
代码 参考c#redis stream实现消息队列以及ack机制文章的思路,实现 SubscribeAttribute.cs using System;namespace DotnetQueue.Attributes {/// <summary>/// 订阅特性/// </summary>[AttributeUsage(AttributeTargets.Method, Inherited false)]pu…...
Docker容器基本操作
容器的基本操作 操作命令(全)命令(简)容器的创建docker container run <image name>docker run <image name>容器的列出(up)docker container lsdocker ps容器的列出(up和exit&…...
从无序到有序:上北智信通过深度数据分析改善会议室资源配置
当前企业普遍面临会议室资源管理难题,预约机制不完善和临时会议多导致资源调度不合理,既有空置又有过度拥挤现象。 针对上述问题,上北智信采用了专业数据分析手段,巧妙融合楼层平面图、环形图、折线图和柱形图等多种可视化工具&a…...
总结:使用JDK原生HttpsURLConnection,封装HttpsUtil工具类,加载自定义证书验证,忽略ssl证书验证
总结:使用JDK原生HttpsURLConnection,封装HttpsUtil工具类,加载自定义证书验证,忽略ssl证书验证 一HttpsUtil工具类二SSLUtil工具类 一HttpsUtil工具类 package com.example.util;import javax.net.ssl.HttpsURLConnection; impo…...
重新定义人机关系边界,Soul以AI社交构建多元社交元宇宙
近年来,AI Native应用的兴起已逐渐成为大众关注的焦点。在此背景下,Soul App的首席技术官陶明在极客公园IF2025创新大会上,发表了一场主题为“人机关系的新边界,Soul如何定义AI社交未来”的演讲。他分享了Soul在人工智能领域内的最新技术进展和战略规划,同时也将Soul社交元宇宙…...
HTTP 参数污染(HPP)详解
1. 什么是 HTTP 参数污染(HPP)? HTTP 参数污染(HTTP Parameter Pollution,简称 HPP)是一种 Web 应用攻击技术,攻击者通过在 HTTP 请求中注入多个相同的参数来绕过安全控制或篡改应用逻辑&#…...
阿里云轻量服务器docker部署nginx
拉取nginx docker镜像 sudo docker pull nginx创建以下挂载目录及文件 用户目录下:conf html logs conf: conf.d nginx.conf html: index.html conf.d: default.confnginx.conf添加文件内容 events {worker_connections 1024; }http {include /etc/ngi…...
(萌新入门)如何从起步阶段开始学习STM32 —— 我应该学习HAL库还是寄存器库?
概念 笔者下面需要介绍的是库寄存器和HAL库两个重要的概念,在各位看完之后,需要决定自己的学习路线到底是学习HAL呢?还是寄存器呢?还是两者都学习呢? 库寄存器 库寄存器就是简单的封装了我们对寄存器的操作…...
Windchill开发-电子仓相关对象信息查询SQL
电子仓相关对象信息查询SQL 一、说明二、数据表信息三、数据表字段说明3.1 HOLDERTOCONTENT3.1.1 对象类型3.1.2 存储类型 3.2 APPLICATIONDATA3.2.1 类别3.2.2 与对象的角色关系3.2.3 存储方式3.2.4 其他字段 3.3 URLDATA3.4 STREAMDATA3.5 FVITEM3.6 FVMOUNT3.6.1 安装状态3.…...
MySQL 数据库定时任务及进阶学习
一、引言 在当今数字化时代,数据管理的高效性和自动化至关重要。MySQL 作为一款广泛应用的开源关系型数据库管理系统,提供了强大的功能来满足各种数据处理需求。其中,定时任务执行功能对于自动化数据操作、维护数据完整性以及优化系统性能具…...
DeepSeek教unity------MessagePack-01
中文:GitCode - 全球开发者的开源社区,开源代码托管平台 MessagePack是C# 的极速 MessagePack 序列化器。它比 MsgPack-Cli 快 10 倍,并且性能超过其他 C# 序列化器。MessagePack for C# 还内置支持 LZ4 压缩——一种极其快速的压缩算法。性能在诸如游戏…...
知识拓展:Python序列化模块 marshal 模块详解
Python marshal 模块学习笔记 1. 简介 marshal 是 Python 的内部序列化格式,主要用于序列化和反序列化 Python 对象。它是 Python 字节码(.pyc文件)使用的序列化格式,比 pickle 更原始和受限,但也更快速和安全。 http…...
leetcode 2684. 矩阵中移动的最大次数
题目如下 数据范围 本题使用常规动态规划就行,不过要注意由于有三个转移的方向,所以我们对dp数组的遍历应该是从上到下 从左到右即按列优先遍历。通过代码 class Solution { public:int maxMoves(vector<vector<int>>& grid) {int …...
机械学习基础-6.更多分类-数据建模与机械智能课程自留
data modeling and machine intelligence - FURTHER CLASSIFICATION 混淆矩阵评估指标:灵敏度和特异度ROC 曲线文字说明部分 AUC:ROC曲线下面积 支持向量机思路补充背景知识点积超平面(HYPERPLANES超平面的法向量到超平面的最小距离数据集与超…...
自动化测试实战
http://8.137.19.140:9090/blog_login.htm 账号: lisi 密码: 123456 上面是系统链接 1. 自动化测试的步骤 1.1 编写Web测试用例 1.2 创建空项目添加依赖 然后我们创建一个新的java项目(使用maven管理),然后引入我们的配置文件:屏幕截图,驱动管理,selenium库 <dependency…...
qt QPlainTextEdit总结
QPlainTextEdit 概述 用途:专为处理纯文本设计,适合大文本编辑和简单文本显示(如日志、代码编辑器)。 特点:相比QTextEdit,轻量高效,支持快速加载和滚动大文件,默认不支持富文本。 …...
AWS SES 邮件服务退信/投诉处理与最佳实践指南
在使用 AWS SES 发送邮件时,合理处理退信和投诉是维护发送声誉的关键。本文将详细介绍 SES 中的退信/投诉处理机制以及相关最佳实践。 一、退信处理机制 © ivwdcwso (ID: u012172506) 1.1 退信类型 在 SES 中,退信分为两种类型: 硬退信(Hard Bounce) 永久性错误,如无效…...
理解WebGPU 中的 GPUAdapter :连接浏览器与 GPU 的桥梁
在 WebGPU 开发中, GPUAdapter 是一个至关重要的对象,它作为浏览器与 GPU 之间的桥梁,为开发者提供了请求 GPU 设备、查询 GPU 特性以及获取适配器信息的能力。本文将详细介绍 GPUAdapter 的核心属性和方法,并通过实际代码…...
rpx和px混用方案
(1)创建一个全局的样式配置文件: // styles/variables.scss :root {// 基础字体大小--font-size-xs: 12px;--font-size-sm: 14px;--font-size-md: 16px;--font-size-lg: 18px;// 响应式间距--spacing-xs: 5px;--spacing-sm: 10px;--spacing-…...
光伏设计软件分类:无人机、Unity3D引擎齐上阵
无人机3D设计 无人机可搭载高分辨率光学相机、激光雷达等测绘设备,对目标区域进行全方位、多角度的航拍作业。通过对采集到的影像数据进行导入处理,运用复杂的图像识别算法与三维重建技术,构建出云端实景3D模型,在实景3D模型中进…...
UE5 学习系列(二)用户操作界面及介绍
这篇博客是 UE5 学习系列博客的第二篇,在第一篇的基础上展开这篇内容。博客参考的 B 站视频资料和第一篇的链接如下: 【Note】:如果你已经完成安装等操作,可以只执行第一篇博客中 2. 新建一个空白游戏项目 章节操作,重…...
Python爬虫实战:研究MechanicalSoup库相关技术
一、MechanicalSoup 库概述 1.1 库简介 MechanicalSoup 是一个 Python 库,专为自动化交互网站而设计。它结合了 requests 的 HTTP 请求能力和 BeautifulSoup 的 HTML 解析能力,提供了直观的 API,让我们可以像人类用户一样浏览网页、填写表单和提交请求。 1.2 主要功能特点…...
Linux 文件类型,目录与路径,文件与目录管理
文件类型 后面的字符表示文件类型标志 普通文件:-(纯文本文件,二进制文件,数据格式文件) 如文本文件、图片、程序文件等。 目录文件:d(directory) 用来存放其他文件或子目录。 设备…...
反向工程与模型迁移:打造未来商品详情API的可持续创新体系
在电商行业蓬勃发展的当下,商品详情API作为连接电商平台与开发者、商家及用户的关键纽带,其重要性日益凸显。传统商品详情API主要聚焦于商品基本信息(如名称、价格、库存等)的获取与展示,已难以满足市场对个性化、智能…...
【人工智能】神经网络的优化器optimizer(二):Adagrad自适应学习率优化器
一.自适应梯度算法Adagrad概述 Adagrad(Adaptive Gradient Algorithm)是一种自适应学习率的优化算法,由Duchi等人在2011年提出。其核心思想是针对不同参数自动调整学习率,适合处理稀疏数据和不同参数梯度差异较大的场景。Adagrad通…...
MFC内存泄露
1、泄露代码示例 void X::SetApplicationBtn() {CMFCRibbonApplicationButton* pBtn GetApplicationButton();// 获取 Ribbon Bar 指针// 创建自定义按钮CCustomRibbonAppButton* pCustomButton new CCustomRibbonAppButton();pCustomButton->SetImage(IDB_BITMAP_Jdp26)…...
在 Nginx Stream 层“改写”MQTT ngx_stream_mqtt_filter_module
1、为什么要修改 CONNECT 报文? 多租户隔离:自动为接入设备追加租户前缀,后端按 ClientID 拆分队列。零代码鉴权:将入站用户名替换为 OAuth Access-Token,后端 Broker 统一校验。灰度发布:根据 IP/地理位写…...
【C++从零实现Json-Rpc框架】第六弹 —— 服务端模块划分
一、项目背景回顾 前五弹完成了Json-Rpc协议解析、请求处理、客户端调用等基础模块搭建。 本弹重点聚焦于服务端的模块划分与架构设计,提升代码结构的可维护性与扩展性。 二、服务端模块设计目标 高内聚低耦合:各模块职责清晰,便于独立开发…...
MySQL用户和授权
开放MySQL白名单 可以通过iptables-save命令确认对应客户端ip是否可以访问MySQL服务: test: # iptables-save | grep 3306 -A mp_srv_whitelist -s 172.16.14.102/32 -p tcp -m tcp --dport 3306 -j ACCEPT -A mp_srv_whitelist -s 172.16.4.16/32 -p tcp -m tcp -…...
AI,如何重构理解、匹配与决策?
AI 时代,我们如何理解消费? 作者|王彬 封面|Unplash 人们通过信息理解世界。 曾几何时,PC 与移动互联网重塑了人们的购物路径:信息变得唾手可得,商品决策变得高度依赖内容。 但 AI 时代的来…...
