当前位置: 首页 > news >正文

C#+redis实现消息队列的发布订阅功能

代码

参考c#+redis stream实现消息队列以及ack机制文章的思路,实现
SubscribeAttribute.cs

using System;namespace DotnetQueue.Attributes
{/// <summary>/// 订阅特性/// </summary>[AttributeUsage(AttributeTargets.Method, Inherited = false)]public class SubscribeAttribute : Attribute{/// <summary>/// 订阅的名称/// </summary>public string Name { get; }/// <summary>/// 构造函数注入/// </summary>/// <param name="name"></param>public SubscribeAttribute(string name){Name = name;}/// <summary>/// 构造函数注入/// </summary>/// <param name="name"></param>public SubscribeAttribute(string name, string groupName, string consumerName){Name = name;GroupName = groupName;ConsumerName = consumerName;}/// <summary>/// 群组名称/// </summary>public string GroupName { get; private set; } = "group01";/// <summary>/// 消费者名称/// </summary>public string ConsumerName { get; private set; } = "consumer01";}
}

新建SubscribeMethod.cs

using System;
using System.Reflection;namespace DotnetQueue
{/// <summary>/// 订阅的方法/// </summary>public class SubscribeMethod{/// <summary>/// 类的类型/// </summary>public Type ClassType { get; set; }/// <summary>/// 方法/// </summary>public MethodInfo MethodInfo { get; set; }}
}

新建DotnetQueueCore.cs

using System;
using System.Collections.Generic;
using System.Linq;
using System.Reflection;
using System.Threading;
using System.Threading.Tasks;
using FreeRedis;
using DotnetQueue.Attributes;namespace DotnetQueue
{/// <summary>/// 消息队列核心/// </summary>public class DotnetQueueCore{private readonly CancellationToken cancellationToken;private readonly IRedisClient redisClient;/// <summary>/// 构造函数注入/// </summary>/// <param name="connectionString">redis链接字符串</param>/// <param name="cancellationToken">取消标识</param>public DotnetQueueCore(string connectionString, CancellationToken cancellationToken){this.cancellationToken = cancellationToken;this.redisClient = new RedisClient(connectionString);}/// <summary>/// 发布消息/// </summary>/// <param name="name"></param>/// <param name="msg"></param>/// <returns></returns>public async Task<bool> Publish(string name, string msg){var msgId = await redisClient.XAddAsync(name, "param", msg);return !string.IsNullOrEmpty(msgId);}/// <summary>/// 订阅监听/// </summary>/// <param name="subscribeTypes"></param>public async Task SubscribeListeners(List<Type> subscribeTypes){var allMethods = GetAllMethods(subscribeTypes);List<Task> tasks = new List<Task>();foreach (var subMethod in allMethods){if (subMethod.ClassType == null){await Task.Delay(TimeSpan.FromSeconds(1));continue;}tasks.Add(Task.Run(async () =>{var method = subMethod.MethodInfo;var parameterInfos = method.GetParameters();var attribute = method.GetCustomAttribute<SubscribeAttribute>();var name = attribute.Name;var groupName = attribute.GroupName;var consumerName = attribute.ConsumerName;var ids = new Dictionary<string, string>();ids.Add(name, ">");while (!cancellationToken.IsCancellationRequested){try{//如果数据存在则不需要执行了,第一次需要执行var exists = await redisClient.ExistsAsync(name);if (!exists){await Task.Delay(TimeSpan.FromSeconds(1));continue;}var info = await redisClient.XInfoGroupsAsync(name);if (info == null || info.Length < 1){//创建群组await redisClient.XGroupCreateAsync(name, groupName, id: "0-0", MkStream: true);}var messages =await redisClient.XReadGroupAsync(groupName, consumerName, 1, 0, noack: false, ids); if (messages == null||messages.Length<=0){await Task.Delay(TimeSpan.FromSeconds(1));continue;}foreach (var message in messages){foreach (var entry in message.entries){var obj = Activator.CreateInstance(method.DeclaringType);if (parameterInfos.Length <= 0){method.Invoke(obj, null);}else{var methodParams = GetStreamEntryValues(entry);method.Invoke(obj, methodParams.ToArray());}//确认消息,如果不加会一直读取第一条await redisClient.XAckAsync(name,groupName, entry.id);}}//await redisClient.XReadAsync(0, name, "0-0");await Task.Delay(TimeSpan.FromSeconds(1));}catch (Exception ex){await Task.Delay(TimeSpan.FromSeconds(1));}}}, cancellationToken));}await Task.WhenAll(tasks);}/// <summary>/// 获取全部的标记的方法/// </summary>/// <param name="types"></param>/// <returns></returns>private List<SubscribeMethod> GetAllMethods(List<Type> types){var result = new List<SubscribeMethod>();foreach (var typeInfo in types){var methods = typeInfo.GetMethods(BindingFlags.NonPublic | BindingFlags.Public | BindingFlags.Instance).Where(m => Attribute.IsDefined(m, typeof(SubscribeAttribute))).ToList();foreach (var method in methods){result.Add(new SubscribeMethod(){MethodInfo = method,ClassType = typeInfo});}}return result;}/// <summary>/// 获取redis stream中的参数/// </summary>/// <param name="msg"></param>/// <returns></returns>private List<object> GetStreamEntryValues(StreamsEntry msg){if (msg == null || msg.fieldValues == null || msg.fieldValues.Length <= 0){return null;}List<object> res = new List<object>();var length = msg.fieldValues.Length;for (int i = 0; i < length; i++){if (msg.fieldValues[i].ToString() == "param"){if ((i + 1) < length){res.Add(msg.fieldValues[i + 1]);}}}return res;}}
}

测试

SubscribeClassTest.cs

using DotnetQueue.Attributes;namespace DotnetQueue.Test;public class SubscribeClassTest
{/// <summary>/// 测试订阅/// </summary>/// <param name="res"></param>/// <returns></returns>[Subscribe("test.wjl.aaa")]public bool SubTest(string res){Console.WriteLine(res);return !string.IsNullOrEmpty(res);}
}

测试方法

private DotnetQueueCorequeueCore = null;
private CancellationTokenSource cancellationToken = null;/// <summary>
/// 测试方法执行之前
/// </summary>
[TestInitialize]
public void Initialize()
{var connectionString = "127.0.0.1:6379,password=,connectTimeout=3000,connectRetry=1,syncTimeout=10000,DefaultDatabase=0";cancellationToken = new CancellationTokenSource();queueCore = new DotnetQueueCore(connectionString, cancellationToken.Token);
}/// <summary>
/// 执行完成之后
/// </summary>
[TestCleanup]
public void Cleanup()
{cancellationToken.Cancel();
}/// <summary>
/// 发布消息测试
/// </summary>
[TestMethod]
public async Task PublishTestMethod()
{var res = await queueCore.Publish("test.wjl.aaa", "{\"age\":1}");Assert.IsTrue(res);res = await queueCore.Publish("test.wjl.aaa", "{\"age\":2}");Assert.IsTrue(res);
}/// <summary>
/// 接受消息测试
/// </summary>
[TestMethod]
public async Task SubscribeListenersTestMethod()
{var types = new List<Type>();types.Add(typeof(SubscribeClassTest));await queueCore.SubscribeListeners(types);Task.Delay(TimeSpan.FromSeconds(30));Assert.IsTrue(true);
}

参考

https://www.cnblogs.com/yanpeng19940119/p/11603865.html
https://github.com/wmowm/InitQ
https://www.cnblogs.com/tibos/p/14944832.html

相关文章:

C#+redis实现消息队列的发布订阅功能

代码 参考c#redis stream实现消息队列以及ack机制文章的思路&#xff0c;实现 SubscribeAttribute.cs using System;namespace DotnetQueue.Attributes {/// <summary>/// 订阅特性/// </summary>[AttributeUsage(AttributeTargets.Method, Inherited false)]pu…...

Docker容器基本操作

容器的基本操作 操作命令&#xff08;全&#xff09;命令&#xff08;简&#xff09;容器的创建docker container run <image name>docker run <image name>容器的列出&#xff08;up&#xff09;docker container lsdocker ps容器的列出&#xff08;up和exit&…...

从无序到有序:上北智信通过深度数据分析改善会议室资源配置

当前企业普遍面临会议室资源管理难题&#xff0c;预约机制不完善和临时会议多导致资源调度不合理&#xff0c;既有空置又有过度拥挤现象。 针对上述问题&#xff0c;上北智信采用了专业数据分析手段&#xff0c;巧妙融合楼层平面图、环形图、折线图和柱形图等多种可视化工具&a…...

总结:使用JDK原生HttpsURLConnection,封装HttpsUtil工具类,加载自定义证书验证,忽略ssl证书验证

总结&#xff1a;使用JDK原生HttpsURLConnection&#xff0c;封装HttpsUtil工具类&#xff0c;加载自定义证书验证&#xff0c;忽略ssl证书验证 一HttpsUtil工具类二SSLUtil工具类 一HttpsUtil工具类 package com.example.util;import javax.net.ssl.HttpsURLConnection; impo…...

重新定义人机关系边界,Soul以AI社交构建多元社交元宇宙

近年来,AI Native应用的兴起已逐渐成为大众关注的焦点。在此背景下,Soul App的首席技术官陶明在极客公园IF2025创新大会上,发表了一场主题为“人机关系的新边界,Soul如何定义AI社交未来”的演讲。他分享了Soul在人工智能领域内的最新技术进展和战略规划,同时也将Soul社交元宇宙…...

HTTP 参数污染(HPP)详解

1. 什么是 HTTP 参数污染&#xff08;HPP&#xff09;&#xff1f; HTTP 参数污染&#xff08;HTTP Parameter Pollution&#xff0c;简称 HPP&#xff09;是一种 Web 应用攻击技术&#xff0c;攻击者通过在 HTTP 请求中注入多个相同的参数来绕过安全控制或篡改应用逻辑&#…...

阿里云轻量服务器docker部署nginx

拉取nginx docker镜像 sudo docker pull nginx创建以下挂载目录及文件 用户目录下&#xff1a;conf html logs conf: conf.d nginx.conf html: index.html conf.d: default.confnginx.conf添加文件内容 events {worker_connections 1024; }http {include /etc/ngi…...

(萌新入门)如何从起步阶段开始学习STM32 —— 我应该学习HAL库还是寄存器库?

概念 笔者下面需要介绍的是库寄存器和HAL库两个重要的概念&#xff0c;在各位看完之后&#xff0c;需要决定自己的学习路线到底是学习HAL呢&#xff1f;还是寄存器呢&#xff1f;还是两者都学习呢&#xff1f; 库寄存器 库寄存器就是简单的封装了我们对寄存器的操作&#xf…...

Windchill开发-电子仓相关对象信息查询SQL

电子仓相关对象信息查询SQL 一、说明二、数据表信息三、数据表字段说明3.1 HOLDERTOCONTENT3.1.1 对象类型3.1.2 存储类型 3.2 APPLICATIONDATA3.2.1 类别3.2.2 与对象的角色关系3.2.3 存储方式3.2.4 其他字段 3.3 URLDATA3.4 STREAMDATA3.5 FVITEM3.6 FVMOUNT3.6.1 安装状态3.…...

MySQL 数据库定时任务及进阶学习

一、引言 在当今数字化时代&#xff0c;数据管理的高效性和自动化至关重要。MySQL 作为一款广泛应用的开源关系型数据库管理系统&#xff0c;提供了强大的功能来满足各种数据处理需求。其中&#xff0c;定时任务执行功能对于自动化数据操作、维护数据完整性以及优化系统性能具…...

DeepSeek教unity------MessagePack-01

中文&#xff1a;GitCode - 全球开发者的开源社区,开源代码托管平台 MessagePack是C# 的极速 MessagePack 序列化器。它比 MsgPack-Cli 快 10 倍&#xff0c;并且性能超过其他 C# 序列化器。MessagePack for C# 还内置支持 LZ4 压缩——一种极其快速的压缩算法。性能在诸如游戏…...

知识拓展:Python序列化模块 marshal 模块详解

Python marshal 模块学习笔记 1. 简介 marshal 是 Python 的内部序列化格式&#xff0c;主要用于序列化和反序列化 Python 对象。它是 Python 字节码&#xff08;.pyc文件&#xff09;使用的序列化格式&#xff0c;比 pickle 更原始和受限&#xff0c;但也更快速和安全。 http…...

leetcode 2684. 矩阵中移动的最大次数

题目如下 数据范围 本题使用常规动态规划就行&#xff0c;不过要注意由于有三个转移的方向&#xff0c;所以我们对dp数组的遍历应该是从上到下 从左到右即按列优先遍历。通过代码 class Solution { public:int maxMoves(vector<vector<int>>& grid) {int …...

机械学习基础-6.更多分类-数据建模与机械智能课程自留

data modeling and machine intelligence - FURTHER CLASSIFICATION 混淆矩阵评估指标&#xff1a;灵敏度和特异度ROC 曲线文字说明部分 AUC&#xff1a;ROC曲线下面积 支持向量机思路补充背景知识点积超平面&#xff08;HYPERPLANES超平面的法向量到超平面的最小距离数据集与超…...

自动化测试实战

http://8.137.19.140:9090/blog_login.htm 账号: lisi 密码: 123456 上面是系统链接 1. 自动化测试的步骤 1.1 编写Web测试用例 1.2 创建空项目添加依赖 然后我们创建一个新的java项目(使用maven管理),然后引入我们的配置文件:屏幕截图,驱动管理,selenium库 <dependency…...

qt QPlainTextEdit总结

QPlainTextEdit 概述 用途&#xff1a;专为处理纯文本设计&#xff0c;适合大文本编辑和简单文本显示&#xff08;如日志、代码编辑器&#xff09;。 特点&#xff1a;相比QTextEdit&#xff0c;轻量高效&#xff0c;支持快速加载和滚动大文件&#xff0c;默认不支持富文本。 …...

AWS SES 邮件服务退信/投诉处理与最佳实践指南

在使用 AWS SES 发送邮件时,合理处理退信和投诉是维护发送声誉的关键。本文将详细介绍 SES 中的退信/投诉处理机制以及相关最佳实践。 一、退信处理机制 © ivwdcwso (ID: u012172506) 1.1 退信类型 在 SES 中,退信分为两种类型: 硬退信(Hard Bounce) 永久性错误,如无效…...

理解WebGPU 中的 GPUAdapter :连接浏览器与 GPU 的桥梁

在 WebGPU 开发中&#xff0c; GPUAdapter 是一个至关重要的对象&#xff0c;它作为浏览器与 GPU 之间的桥梁&#xff0c;为开发者提供了请求 GPU 设备、查询 GPU 特性以及获取适配器信息的能力。本文将详细介绍 GPUAdapter 的核心属性和方法&#xff0c;并通过实际代码…...

rpx和px混用方案

&#xff08;1&#xff09;创建一个全局的样式配置文件&#xff1a; // styles/variables.scss :root {// 基础字体大小--font-size-xs: 12px;--font-size-sm: 14px;--font-size-md: 16px;--font-size-lg: 18px;// 响应式间距--spacing-xs: 5px;--spacing-sm: 10px;--spacing-…...

光伏设计软件分类:无人机、Unity3D引擎齐上阵

无人机3D设计 无人机可搭载高分辨率光学相机、激光雷达等测绘设备&#xff0c;对目标区域进行全方位、多角度的航拍作业。通过对采集到的影像数据进行导入处理&#xff0c;运用复杂的图像识别算法与三维重建技术&#xff0c;构建出云端实景3D模型&#xff0c;在实景3D模型中进…...

页面渲染流程与性能优化

页面渲染流程与性能优化详解&#xff08;完整版&#xff09; 一、现代浏览器渲染流程&#xff08;详细说明&#xff09; 1. 构建DOM树 浏览器接收到HTML文档后&#xff0c;会逐步解析并构建DOM&#xff08;Document Object Model&#xff09;树。具体过程如下&#xff1a; (…...

第一篇:Agent2Agent (A2A) 协议——协作式人工智能的黎明

AI 领域的快速发展正在催生一个新时代&#xff0c;智能代理&#xff08;agents&#xff09;不再是孤立的个体&#xff0c;而是能够像一个数字团队一样协作。然而&#xff0c;当前 AI 生态系统的碎片化阻碍了这一愿景的实现&#xff0c;导致了“AI 巴别塔问题”——不同代理之间…...

12.找到字符串中所有字母异位词

&#x1f9e0; 题目解析 题目描述&#xff1a; 给定两个字符串 s 和 p&#xff0c;找出 s 中所有 p 的字母异位词的起始索引。 返回的答案以数组形式表示。 字母异位词定义&#xff1a; 若两个字符串包含的字符种类和出现次数完全相同&#xff0c;顺序无所谓&#xff0c;则互为…...

Device Mapper 机制

Device Mapper 机制详解 Device Mapper&#xff08;简称 DM&#xff09;是 Linux 内核中的一套通用块设备映射框架&#xff0c;为 LVM、加密磁盘、RAID 等提供底层支持。本文将详细介绍 Device Mapper 的原理、实现、内核配置、常用工具、操作测试流程&#xff0c;并配以详细的…...

[ACTF2020 新生赛]Include 1(php://filter伪协议)

题目 做法 启动靶机&#xff0c;点进去 点进去 查看URL&#xff0c;有 ?fileflag.php说明存在文件包含&#xff0c;原理是php://filter 协议 当它与包含函数结合时&#xff0c;php://filter流会被当作php文件执行。 用php://filter加编码&#xff0c;能让PHP把文件内容…...

Python竞赛环境搭建全攻略

Python环境搭建竞赛技术文章大纲 竞赛背景与意义 竞赛的目的与价值Python在竞赛中的应用场景环境搭建对竞赛效率的影响 竞赛环境需求分析 常见竞赛类型&#xff08;算法、数据分析、机器学习等&#xff09;不同竞赛对Python版本及库的要求硬件与操作系统的兼容性问题 Pyth…...

LLaMA-Factory 微调 Qwen2-VL 进行人脸情感识别(二)

在上一篇文章中,我们详细介绍了如何使用LLaMA-Factory框架对Qwen2-VL大模型进行微调,以实现人脸情感识别的功能。本篇文章将聚焦于微调完成后,如何调用这个模型进行人脸情感识别的具体代码实现,包括详细的步骤和注释。 模型调用步骤 环境准备:确保安装了必要的Python库。…...

TCP/IP 网络编程 | 服务端 客户端的封装

设计模式 文章目录 设计模式一、socket.h 接口&#xff08;interface&#xff09;二、socket.cpp 实现&#xff08;implementation&#xff09;三、server.cpp 使用封装&#xff08;main 函数&#xff09;四、client.cpp 使用封装&#xff08;main 函数&#xff09;五、退出方法…...

聚六亚甲基单胍盐酸盐市场深度解析:现状、挑战与机遇

根据 QYResearch 发布的市场报告显示&#xff0c;全球市场规模预计在 2031 年达到 9848 万美元&#xff0c;2025 - 2031 年期间年复合增长率&#xff08;CAGR&#xff09;为 3.7%。在竞争格局上&#xff0c;市场集中度较高&#xff0c;2024 年全球前十强厂商占据约 74.0% 的市场…...

6.计算机网络核心知识点精要手册

计算机网络核心知识点精要手册 1.协议基础篇 网络协议三要素 语法&#xff1a;数据与控制信息的结构或格式&#xff0c;如同语言中的语法规则语义&#xff1a;控制信息的具体含义和响应方式&#xff0c;规定通信双方"说什么"同步&#xff1a;事件执行的顺序与时序…...