当前位置: 首页 > news >正文

C#+redis实现消息队列的发布订阅功能

代码

参考c#+redis stream实现消息队列以及ack机制文章的思路,实现
SubscribeAttribute.cs

using System;namespace DotnetQueue.Attributes
{/// <summary>/// 订阅特性/// </summary>[AttributeUsage(AttributeTargets.Method, Inherited = false)]public class SubscribeAttribute : Attribute{/// <summary>/// 订阅的名称/// </summary>public string Name { get; }/// <summary>/// 构造函数注入/// </summary>/// <param name="name"></param>public SubscribeAttribute(string name){Name = name;}/// <summary>/// 构造函数注入/// </summary>/// <param name="name"></param>public SubscribeAttribute(string name, string groupName, string consumerName){Name = name;GroupName = groupName;ConsumerName = consumerName;}/// <summary>/// 群组名称/// </summary>public string GroupName { get; private set; } = "group01";/// <summary>/// 消费者名称/// </summary>public string ConsumerName { get; private set; } = "consumer01";}
}

新建SubscribeMethod.cs

using System;
using System.Reflection;namespace DotnetQueue
{/// <summary>/// 订阅的方法/// </summary>public class SubscribeMethod{/// <summary>/// 类的类型/// </summary>public Type ClassType { get; set; }/// <summary>/// 方法/// </summary>public MethodInfo MethodInfo { get; set; }}
}

新建DotnetQueueCore.cs

using System;
using System.Collections.Generic;
using System.Linq;
using System.Reflection;
using System.Threading;
using System.Threading.Tasks;
using FreeRedis;
using DotnetQueue.Attributes;namespace DotnetQueue
{/// <summary>/// 消息队列核心/// </summary>public class DotnetQueueCore{private readonly CancellationToken cancellationToken;private readonly IRedisClient redisClient;/// <summary>/// 构造函数注入/// </summary>/// <param name="connectionString">redis链接字符串</param>/// <param name="cancellationToken">取消标识</param>public DotnetQueueCore(string connectionString, CancellationToken cancellationToken){this.cancellationToken = cancellationToken;this.redisClient = new RedisClient(connectionString);}/// <summary>/// 发布消息/// </summary>/// <param name="name"></param>/// <param name="msg"></param>/// <returns></returns>public async Task<bool> Publish(string name, string msg){var msgId = await redisClient.XAddAsync(name, "param", msg);return !string.IsNullOrEmpty(msgId);}/// <summary>/// 订阅监听/// </summary>/// <param name="subscribeTypes"></param>public async Task SubscribeListeners(List<Type> subscribeTypes){var allMethods = GetAllMethods(subscribeTypes);List<Task> tasks = new List<Task>();foreach (var subMethod in allMethods){if (subMethod.ClassType == null){await Task.Delay(TimeSpan.FromSeconds(1));continue;}tasks.Add(Task.Run(async () =>{var method = subMethod.MethodInfo;var parameterInfos = method.GetParameters();var attribute = method.GetCustomAttribute<SubscribeAttribute>();var name = attribute.Name;var groupName = attribute.GroupName;var consumerName = attribute.ConsumerName;var ids = new Dictionary<string, string>();ids.Add(name, ">");while (!cancellationToken.IsCancellationRequested){try{//如果数据存在则不需要执行了,第一次需要执行var exists = await redisClient.ExistsAsync(name);if (!exists){await Task.Delay(TimeSpan.FromSeconds(1));continue;}var info = await redisClient.XInfoGroupsAsync(name);if (info == null || info.Length < 1){//创建群组await redisClient.XGroupCreateAsync(name, groupName, id: "0-0", MkStream: true);}var messages =await redisClient.XReadGroupAsync(groupName, consumerName, 1, 0, noack: false, ids); if (messages == null||messages.Length<=0){await Task.Delay(TimeSpan.FromSeconds(1));continue;}foreach (var message in messages){foreach (var entry in message.entries){var obj = Activator.CreateInstance(method.DeclaringType);if (parameterInfos.Length <= 0){method.Invoke(obj, null);}else{var methodParams = GetStreamEntryValues(entry);method.Invoke(obj, methodParams.ToArray());}//确认消息,如果不加会一直读取第一条await redisClient.XAckAsync(name,groupName, entry.id);}}//await redisClient.XReadAsync(0, name, "0-0");await Task.Delay(TimeSpan.FromSeconds(1));}catch (Exception ex){await Task.Delay(TimeSpan.FromSeconds(1));}}}, cancellationToken));}await Task.WhenAll(tasks);}/// <summary>/// 获取全部的标记的方法/// </summary>/// <param name="types"></param>/// <returns></returns>private List<SubscribeMethod> GetAllMethods(List<Type> types){var result = new List<SubscribeMethod>();foreach (var typeInfo in types){var methods = typeInfo.GetMethods(BindingFlags.NonPublic | BindingFlags.Public | BindingFlags.Instance).Where(m => Attribute.IsDefined(m, typeof(SubscribeAttribute))).ToList();foreach (var method in methods){result.Add(new SubscribeMethod(){MethodInfo = method,ClassType = typeInfo});}}return result;}/// <summary>/// 获取redis stream中的参数/// </summary>/// <param name="msg"></param>/// <returns></returns>private List<object> GetStreamEntryValues(StreamsEntry msg){if (msg == null || msg.fieldValues == null || msg.fieldValues.Length <= 0){return null;}List<object> res = new List<object>();var length = msg.fieldValues.Length;for (int i = 0; i < length; i++){if (msg.fieldValues[i].ToString() == "param"){if ((i + 1) < length){res.Add(msg.fieldValues[i + 1]);}}}return res;}}
}

测试

SubscribeClassTest.cs

using DotnetQueue.Attributes;namespace DotnetQueue.Test;public class SubscribeClassTest
{/// <summary>/// 测试订阅/// </summary>/// <param name="res"></param>/// <returns></returns>[Subscribe("test.wjl.aaa")]public bool SubTest(string res){Console.WriteLine(res);return !string.IsNullOrEmpty(res);}
}

测试方法

private DotnetQueueCorequeueCore = null;
private CancellationTokenSource cancellationToken = null;/// <summary>
/// 测试方法执行之前
/// </summary>
[TestInitialize]
public void Initialize()
{var connectionString = "127.0.0.1:6379,password=,connectTimeout=3000,connectRetry=1,syncTimeout=10000,DefaultDatabase=0";cancellationToken = new CancellationTokenSource();queueCore = new DotnetQueueCore(connectionString, cancellationToken.Token);
}/// <summary>
/// 执行完成之后
/// </summary>
[TestCleanup]
public void Cleanup()
{cancellationToken.Cancel();
}/// <summary>
/// 发布消息测试
/// </summary>
[TestMethod]
public async Task PublishTestMethod()
{var res = await queueCore.Publish("test.wjl.aaa", "{\"age\":1}");Assert.IsTrue(res);res = await queueCore.Publish("test.wjl.aaa", "{\"age\":2}");Assert.IsTrue(res);
}/// <summary>
/// 接受消息测试
/// </summary>
[TestMethod]
public async Task SubscribeListenersTestMethod()
{var types = new List<Type>();types.Add(typeof(SubscribeClassTest));await queueCore.SubscribeListeners(types);Task.Delay(TimeSpan.FromSeconds(30));Assert.IsTrue(true);
}

参考

https://www.cnblogs.com/yanpeng19940119/p/11603865.html
https://github.com/wmowm/InitQ
https://www.cnblogs.com/tibos/p/14944832.html

相关文章:

C#+redis实现消息队列的发布订阅功能

代码 参考c#redis stream实现消息队列以及ack机制文章的思路&#xff0c;实现 SubscribeAttribute.cs using System;namespace DotnetQueue.Attributes {/// <summary>/// 订阅特性/// </summary>[AttributeUsage(AttributeTargets.Method, Inherited false)]pu…...

Docker容器基本操作

容器的基本操作 操作命令&#xff08;全&#xff09;命令&#xff08;简&#xff09;容器的创建docker container run <image name>docker run <image name>容器的列出&#xff08;up&#xff09;docker container lsdocker ps容器的列出&#xff08;up和exit&…...

从无序到有序:上北智信通过深度数据分析改善会议室资源配置

当前企业普遍面临会议室资源管理难题&#xff0c;预约机制不完善和临时会议多导致资源调度不合理&#xff0c;既有空置又有过度拥挤现象。 针对上述问题&#xff0c;上北智信采用了专业数据分析手段&#xff0c;巧妙融合楼层平面图、环形图、折线图和柱形图等多种可视化工具&a…...

总结:使用JDK原生HttpsURLConnection,封装HttpsUtil工具类,加载自定义证书验证,忽略ssl证书验证

总结&#xff1a;使用JDK原生HttpsURLConnection&#xff0c;封装HttpsUtil工具类&#xff0c;加载自定义证书验证&#xff0c;忽略ssl证书验证 一HttpsUtil工具类二SSLUtil工具类 一HttpsUtil工具类 package com.example.util;import javax.net.ssl.HttpsURLConnection; impo…...

重新定义人机关系边界,Soul以AI社交构建多元社交元宇宙

近年来,AI Native应用的兴起已逐渐成为大众关注的焦点。在此背景下,Soul App的首席技术官陶明在极客公园IF2025创新大会上,发表了一场主题为“人机关系的新边界,Soul如何定义AI社交未来”的演讲。他分享了Soul在人工智能领域内的最新技术进展和战略规划,同时也将Soul社交元宇宙…...

HTTP 参数污染(HPP)详解

1. 什么是 HTTP 参数污染&#xff08;HPP&#xff09;&#xff1f; HTTP 参数污染&#xff08;HTTP Parameter Pollution&#xff0c;简称 HPP&#xff09;是一种 Web 应用攻击技术&#xff0c;攻击者通过在 HTTP 请求中注入多个相同的参数来绕过安全控制或篡改应用逻辑&#…...

阿里云轻量服务器docker部署nginx

拉取nginx docker镜像 sudo docker pull nginx创建以下挂载目录及文件 用户目录下&#xff1a;conf html logs conf: conf.d nginx.conf html: index.html conf.d: default.confnginx.conf添加文件内容 events {worker_connections 1024; }http {include /etc/ngi…...

(萌新入门)如何从起步阶段开始学习STM32 —— 我应该学习HAL库还是寄存器库?

概念 笔者下面需要介绍的是库寄存器和HAL库两个重要的概念&#xff0c;在各位看完之后&#xff0c;需要决定自己的学习路线到底是学习HAL呢&#xff1f;还是寄存器呢&#xff1f;还是两者都学习呢&#xff1f; 库寄存器 库寄存器就是简单的封装了我们对寄存器的操作&#xf…...

Windchill开发-电子仓相关对象信息查询SQL

电子仓相关对象信息查询SQL 一、说明二、数据表信息三、数据表字段说明3.1 HOLDERTOCONTENT3.1.1 对象类型3.1.2 存储类型 3.2 APPLICATIONDATA3.2.1 类别3.2.2 与对象的角色关系3.2.3 存储方式3.2.4 其他字段 3.3 URLDATA3.4 STREAMDATA3.5 FVITEM3.6 FVMOUNT3.6.1 安装状态3.…...

MySQL 数据库定时任务及进阶学习

一、引言 在当今数字化时代&#xff0c;数据管理的高效性和自动化至关重要。MySQL 作为一款广泛应用的开源关系型数据库管理系统&#xff0c;提供了强大的功能来满足各种数据处理需求。其中&#xff0c;定时任务执行功能对于自动化数据操作、维护数据完整性以及优化系统性能具…...

DeepSeek教unity------MessagePack-01

中文&#xff1a;GitCode - 全球开发者的开源社区,开源代码托管平台 MessagePack是C# 的极速 MessagePack 序列化器。它比 MsgPack-Cli 快 10 倍&#xff0c;并且性能超过其他 C# 序列化器。MessagePack for C# 还内置支持 LZ4 压缩——一种极其快速的压缩算法。性能在诸如游戏…...

知识拓展:Python序列化模块 marshal 模块详解

Python marshal 模块学习笔记 1. 简介 marshal 是 Python 的内部序列化格式&#xff0c;主要用于序列化和反序列化 Python 对象。它是 Python 字节码&#xff08;.pyc文件&#xff09;使用的序列化格式&#xff0c;比 pickle 更原始和受限&#xff0c;但也更快速和安全。 http…...

leetcode 2684. 矩阵中移动的最大次数

题目如下 数据范围 本题使用常规动态规划就行&#xff0c;不过要注意由于有三个转移的方向&#xff0c;所以我们对dp数组的遍历应该是从上到下 从左到右即按列优先遍历。通过代码 class Solution { public:int maxMoves(vector<vector<int>>& grid) {int …...

机械学习基础-6.更多分类-数据建模与机械智能课程自留

data modeling and machine intelligence - FURTHER CLASSIFICATION 混淆矩阵评估指标&#xff1a;灵敏度和特异度ROC 曲线文字说明部分 AUC&#xff1a;ROC曲线下面积 支持向量机思路补充背景知识点积超平面&#xff08;HYPERPLANES超平面的法向量到超平面的最小距离数据集与超…...

自动化测试实战

http://8.137.19.140:9090/blog_login.htm 账号: lisi 密码: 123456 上面是系统链接 1. 自动化测试的步骤 1.1 编写Web测试用例 1.2 创建空项目添加依赖 然后我们创建一个新的java项目(使用maven管理),然后引入我们的配置文件:屏幕截图,驱动管理,selenium库 <dependency…...

qt QPlainTextEdit总结

QPlainTextEdit 概述 用途&#xff1a;专为处理纯文本设计&#xff0c;适合大文本编辑和简单文本显示&#xff08;如日志、代码编辑器&#xff09;。 特点&#xff1a;相比QTextEdit&#xff0c;轻量高效&#xff0c;支持快速加载和滚动大文件&#xff0c;默认不支持富文本。 …...

AWS SES 邮件服务退信/投诉处理与最佳实践指南

在使用 AWS SES 发送邮件时,合理处理退信和投诉是维护发送声誉的关键。本文将详细介绍 SES 中的退信/投诉处理机制以及相关最佳实践。 一、退信处理机制 © ivwdcwso (ID: u012172506) 1.1 退信类型 在 SES 中,退信分为两种类型: 硬退信(Hard Bounce) 永久性错误,如无效…...

理解WebGPU 中的 GPUAdapter :连接浏览器与 GPU 的桥梁

在 WebGPU 开发中&#xff0c; GPUAdapter 是一个至关重要的对象&#xff0c;它作为浏览器与 GPU 之间的桥梁&#xff0c;为开发者提供了请求 GPU 设备、查询 GPU 特性以及获取适配器信息的能力。本文将详细介绍 GPUAdapter 的核心属性和方法&#xff0c;并通过实际代码…...

rpx和px混用方案

&#xff08;1&#xff09;创建一个全局的样式配置文件&#xff1a; // styles/variables.scss :root {// 基础字体大小--font-size-xs: 12px;--font-size-sm: 14px;--font-size-md: 16px;--font-size-lg: 18px;// 响应式间距--spacing-xs: 5px;--spacing-sm: 10px;--spacing-…...

光伏设计软件分类:无人机、Unity3D引擎齐上阵

无人机3D设计 无人机可搭载高分辨率光学相机、激光雷达等测绘设备&#xff0c;对目标区域进行全方位、多角度的航拍作业。通过对采集到的影像数据进行导入处理&#xff0c;运用复杂的图像识别算法与三维重建技术&#xff0c;构建出云端实景3D模型&#xff0c;在实景3D模型中进…...

龙虎榜——20250610

上证指数放量收阴线&#xff0c;个股多数下跌&#xff0c;盘中受消息影响大幅波动。 深证指数放量收阴线形成顶分型&#xff0c;指数短线有调整的需求&#xff0c;大概需要一两天。 2025年6月10日龙虎榜行业方向分析 1. 金融科技 代表标的&#xff1a;御银股份、雄帝科技 驱动…...

脑机新手指南(八):OpenBCI_GUI:从环境搭建到数据可视化(下)

一、数据处理与分析实战 &#xff08;一&#xff09;实时滤波与参数调整 基础滤波操作 60Hz 工频滤波&#xff1a;勾选界面右侧 “60Hz” 复选框&#xff0c;可有效抑制电网干扰&#xff08;适用于北美地区&#xff0c;欧洲用户可调整为 50Hz&#xff09;。 平滑处理&…...

对WWDC 2025 Keynote 内容的预测

借助我们以往对苹果公司发展路径的深入研究经验&#xff0c;以及大语言模型的分析能力&#xff0c;我们系统梳理了多年来苹果 WWDC 主题演讲的规律。在 WWDC 2025 即将揭幕之际&#xff0c;我们让 ChatGPT 对今年的 Keynote 内容进行了一个初步预测&#xff0c;聊作存档。等到明…...

Cinnamon修改面板小工具图标

Cinnamon开始菜单-CSDN博客 设置模块都是做好的&#xff0c;比GNOME简单得多&#xff01; 在 applet.js 里增加 const Settings imports.ui.settings;this.settings new Settings.AppletSettings(this, HTYMenusonichy, instance_id); this.settings.bind(menu-icon, menu…...

土地利用/土地覆盖遥感解译与基于CLUE模型未来变化情景预测;从基础到高级,涵盖ArcGIS数据处理、ENVI遥感解译与CLUE模型情景模拟等

&#x1f50d; 土地利用/土地覆盖数据是生态、环境和气象等诸多领域模型的关键输入参数。通过遥感影像解译技术&#xff0c;可以精准获取历史或当前任何一个区域的土地利用/土地覆盖情况。这些数据不仅能够用于评估区域生态环境的变化趋势&#xff0c;还能有效评价重大生态工程…...

Rust 异步编程

Rust 异步编程 引言 Rust 是一种系统编程语言,以其高性能、安全性以及零成本抽象而著称。在多核处理器成为主流的今天,异步编程成为了一种提高应用性能、优化资源利用的有效手段。本文将深入探讨 Rust 异步编程的核心概念、常用库以及最佳实践。 异步编程基础 什么是异步…...

Spring数据访问模块设计

前面我们已经完成了IoC和web模块的设计&#xff0c;聪明的码友立马就知道了&#xff0c;该到数据访问模块了&#xff0c;要不就这俩玩个6啊&#xff0c;查库势在必行&#xff0c;至此&#xff0c;它来了。 一、核心设计理念 1、痛点在哪 应用离不开数据&#xff08;数据库、No…...

Yolov8 目标检测蒸馏学习记录

yolov8系列模型蒸馏基本流程&#xff0c;代码下载&#xff1a;这里本人提交了一个demo:djdll/Yolov8_Distillation: Yolov8轻量化_蒸馏代码实现 在轻量化模型设计中&#xff0c;**知识蒸馏&#xff08;Knowledge Distillation&#xff09;**被广泛应用&#xff0c;作为提升模型…...

JVM 内存结构 详解

内存结构 运行时数据区&#xff1a; Java虚拟机在运行Java程序过程中管理的内存区域。 程序计数器&#xff1a; ​ 线程私有&#xff0c;程序控制流的指示器&#xff0c;分支、循环、跳转、异常处理、线程恢复等基础功能都依赖这个计数器完成。 ​ 每个线程都有一个程序计数…...

IP如何挑?2025年海外专线IP如何购买?

你花了时间和预算买了IP&#xff0c;结果IP质量不佳&#xff0c;项目效率低下不说&#xff0c;还可能带来莫名的网络问题&#xff0c;是不是太闹心了&#xff1f;尤其是在面对海外专线IP时&#xff0c;到底怎么才能买到适合自己的呢&#xff1f;所以&#xff0c;挑IP绝对是个技…...