当前位置: 首页 > article >正文

VEC系列-RabbitMQ 入门笔记

消息队列(MQ)对于开发者来说是一个经常听到的词汇,但在实际开发中,大多数人并不会真正用到它。网上已经有很多关于 MQ 概述和原理的详细讲解,官网文档和技术博客也都介绍得很深入,因此,我在这里就不再赘述。

我一直认为,学习一项技术不仅要知道它是什么,更重要的是知道怎么用,以及在哪些场景下应该用。所以这篇文章主要就是站在一个新手的角度进行描述以及实现MQ的实际运用。

使用MQ的常见情景

  1. 系统解耦:比如电商系统,订单系统 → 库存系统 → 物流系统 订单系统发送“新订单”消息到 MQ,库存系统和物流系统各自订阅处理。即使库存系统或物流系统短暂不可用,消息仍然可以暂存,系统整体不会受影响。这一方面说实话不是架构师也没必要太过关注,毕竟系统的底层普通开发也没这个资格去搭建。只是用于了解,不要因为这段话阻拦学习的脚步。

  2. 流量削峰,降低并发:这个比较好理解,也是最能遇到的情况。用户请求先进入 MQ 队列,由后台的消费端按照数据库的最大承载能力逐步处理请求。确保数据库不会被瞬间压垮,提高系统稳定性。还是电商系统常用些。

  3. 异步任务处理:邮件、短信、推送通知,日志处理等。

理论上MQ能做的不止这些,抛砖引玉,一起深入学习吧。

对MQ进行拆分理解

MQ里常说生产者,消费者等。我会通过简单的例子来描述:

  • 生产者:一个游戏,我是GM,我要发送公告,玩家分为普通玩家和VIP玩家等。在这里,发布公告的人就是消息的生产者。应该很好理解嗷?

  • 交换器:如上述,有普通玩家和VIP玩家等,我的公告在普通玩家面前必然是拽的很啊,但是VIP玩家面前还是要舔下的……那么我会发布一条给普通玩家的消息,和一条给VIP玩家的消息。交换器的作用在我看来就是消息的承载体,类似一条运输船,负责把消息运输给玩家们。产生消息的地方很多,但是交换器不用关心是谁发布了消息,他只承载你的消息。

  • 队列:如上述,有了运输船。那么队列有点像是码头了。普通玩家进普通码头,VIP玩家进黄金码头。各自码头停泊各自的船。总不会在普通码头取出黄金码头的货哦?

补充:交换器是有类型的:Direct(直连交换器)Fanout(扇形交换器)Topic(主题交换器)Headers(头交换器)

概念不多说了。比较常用的是Direct,Fanout

Direct:通过路由键进行匹配,运输船是一艘,但是分为普通区和VIP区,玩家凭借船票(路由键)进行取货(取消息)

Fanout:只要是是绑定了某个交换器的队列都能进行取货。玩家进普通码头就拿普通货,进黄金码头拿黄金货。当然这是举例子,玩家的队列还是要看你如何分配的。

  • 消费者:说了这么多,玩家就是消费者嗷。

MQ代码演示 

最新代码是通过 事件总线 来跨方法传递信息和触发动作。通过发布和订阅事件,模块之间能够解耦通信,使得事件的发布和处理不再依赖于直接调用方法的方式,而是通过事件总线进行跨模块、跨方法的异步传递。这种方式提高了系统的灵活性和扩展性,同时保持了模块之间的松耦合。

长代码警告,有兴趣可以fork仓库进行实际操练 VerEasy.Core

必要的知识点大致如此,通过代码+注释的形式来演示更好理解。

我这里是NETCore项目,所以还是接口的形式方便依赖注入。

接口部分代码

    public interface IRabbitMQPersistentConnection{/// <summary>/// 是否已经连接:判断MQ是否是连接状态/// </summary>bool IsConnected { get; }/// <summary>/// 尝试连接:断连重连方法/// </summary>/// <returns></returns>Task<bool> TryConnectAsync();/// <summary>/// 唯一通道:发布通道可以随时关闭,消费通道需要保持打开状态,否则无法进行消费。/// </summary>IChannel Channel { get; }/// <summary>/// 唯一连接:同理,一个连接可以有N个通道,无需建立过多连接。/// </summary>IConnection Connection { get; }/// <summary>/// 释放/// </summary>/// <returns></returns>Task DisposeAsync();/// <summary>/// 发布:发布消息/// </summary>/// <param name="msg"></param>/// <param name="exChangeName"></param>/// <param name="routeKey"></param>/// <param name="type"></param>/// <returns></returns>Task PublishAsync(string msg, string exChangeName = "VECLOG", string routeKey = "", string type = ExchangeType.Fanout);/// <summary>/// 订阅:订阅队列。/// </summary>/// <returns></returns>Task SubscribeAsync(string exChangeName = "VECLOG", string routeKey = "", string type = ExchangeType.Fanout);}

 接口实现部分代码

    public class RabbitMQPersistentConnection : IRabbitMQPersistentConnection{//构造函数注入,获取MQ的地址账号密码端口,如果不传就用我默认配置的。public RabbitMQPersistentConnection(IConnectionFactory? connectionFactory = null, int retryCount = 5){_connectionFactory = connectionFactory ?? new ConnectionFactory{HostName = Appsettings.AppStr("RabbitMQ:Connection:HostName"),UserName = Appsettings.AppStr("RabbitMQ:Connection:UserName"),Password = Appsettings.AppStr("RabbitMQ:Connection:PassWord"),Port = Appsettings.AppStr("RabbitMQ:Connection:Port").ObjToInt()};//使用Policy进行重连,这个是重连次数=5_retryCount = retryCount;}//私有变量,获取连接成功时创建的Mq通道。private IChannel _channel = default!;public IChannel Channel{get{return _channel;}}/// <summary>/// RabbitMQ 连接工厂/// </summary>private readonly IConnectionFactory _connectionFactory;/// <summary>/// 私有变量 RabbitMQ 连接上下文/// </summary>private IConnection _connection = default!;/// <summary>/// 重连次数/// </summary>private readonly int _retryCount;/// <summary>/// 标志是否已释放/// </summary>private bool _disposed;/// <summary>/// 是否有效连接/// </summary>public bool IsConnected{get{return _connection != null && _connection.IsOpen && !_disposed;}}public IConnection Connection{get{return _connection;}}/// <summary>/// 手动释放/// </summary>/// <returns></returns>public async Task DisposeAsync(){if (_disposed) return;_disposed = true;try{await _connection.DisposeAsync();}catch (IOException ex){Console.WriteLine(ex.Message);}}/// <summary>/// 重连机制/// </summary>/// <returns></returns>public async Task<bool> TryConnectAsync(){var policy = Policy.Handle<SocketException>()//捕获连接异常.Or<BrokerUnreachableException>()//无法连接异常.WaitAndRetryAsync(_retryCount, x =>TimeSpan.FromSeconds(Math.Pow(2, x)), (ex, time) =>{//日志});try{await policy.ExecuteAsync(async () =>{//重建连接【赋值给私有化变量,通过get同步给接口里的Connection和Channel】_connection = await _connectionFactory.CreateConnectionAsync();_channel = await _connection.CreateChannelAsync();});//如果连接成功if (IsConnected){// 连接成功后,注册连接关闭、异常、阻塞的事件处理程序_connection.ConnectionShutdownAsync += OnConnectionShutdownAsync;_connection.CallbackExceptionAsync += OnCallbackExceptionAsync;_connection.ConnectionBlockedAsync += OnConnectionBlockedAsync;return true;}else{return false;}}catch (Exception ex){Console.WriteLine($"重连失败,最终抛出异常: {ex.Message}");return false;}}private async Task OnConnectionShutdownAsync(object sender, ShutdownEventArgs e){if (_disposed) return;Console.WriteLine("RabbitMQ连接关闭,正在尝试重连...");await TryConnectAsync();}private async Task OnCallbackExceptionAsync(object sender, CallbackExceptionEventArgs e){if (_disposed) return;Console.WriteLine($"RabbitMQ连接出现异常,正在尝试重连... 异常信息: {e.Exception.Message}");await TryConnectAsync();}private async Task OnConnectionBlockedAsync(object sender, ConnectionBlockedEventArgs e){if (_disposed) return;Console.WriteLine("RabbitMQ连接被阻塞,正在尝试重连...");await TryConnectAsync();}//发布消息public async Task PublishAsync(string msg, string exChangeName = "VECLOG", string routeKey = "", string type = ExchangeType.Fanout){//判断是否连接状态,没有连接就重连if (!IsConnected){await TryConnectAsync();}//创建通道,因为是发布消息,通道不用常打开,所以使用了USINGusing var channel = await _connection.CreateChannelAsync();//【ExchangeDeclareAsync】声明交换机,exchange:交换机名称,type:交换机类型await channel.ExchangeDeclareAsync(exchange: exChangeName, type: type);//msg就是消息,需要传递Byte[]var body = Encoding.UTF8.GetBytes(msg);//启动消息持久化,我的项目里使用MQ来进行公告的推送,使用的Fanout类型交换机,故此消息保持持久化。var properties = new BasicProperties(){Persistent = true,};//发布消息await channel.BasicPublishAsync(exchange: exChangeName,routingKey: routeKey,mandatory: false,basicProperties: properties,body: body);}//订阅消息public async Task SubscribeAsync(string exChangeName = "VECLOG", string routeKey = "", string type = ExchangeType.Fanout){if (!IsConnected){await TryConnectAsync();}//【queue】队列string queueName = string.IsNullOrWhiteSpace(routeKey) ? exChangeName : routeKey;//【durable】持久化队列,MQ服务器不会删除它。QueueDeclareOk queueDeclareResult = await Channel.QueueDeclareAsync(queue: queueName,durable: true,exclusive: false,autoDelete: false);//根据queue,exchange,routingKey 对 交换机和队列进行绑定,如果是Fanout类型不需要routeKey。await Channel.QueueBindAsync(queue: queueName, exchange: exChangeName, routingKey: routeKey);//创建消费者var consumer = new AsyncEventingBasicConsumer(Channel);//消费者消费后执行方法consumer.ReceivedAsync += async (model, ea) =>{byte[] body = ea.Body.ToArray();var message = Encoding.UTF8.GetString(body);//确认消息已被消费,这样后续该消息就不会被该队列继续消费到了。await Channel.BasicAckAsync(ea.DeliveryTag, multiple: false);};//启动消费者队列,将消费者和队列绑定await Channel.BasicConsumeAsync(queueName, autoAck: false, consumer: consumer);}}

 

MQ服务注入

            if (Appsettings.AppStr("RabbitMQ:Enable").ObjToBool()){services.AddSingleton<IRabbitMQPersistentConnection>(x =>{var connectionFactory = new ConnectionFactory(){HostName = Appsettings.AppStr("RabbitMQ:Connection:HostName"),UserName = Appsettings.AppStr("RabbitMQ:Connection:UserName"),Password = Appsettings.AppStr("RabbitMQ:Connection:PassWord"),Port = Appsettings.AppStr("RabbitMQ:Connection:Port").ObjToInt()};var mq = new RabbitMQPersistentConnection(connectionFactory);return mq;});}

 


我在注入各种服务时,添加了一些日志进行输出,效果如下:

相关文章:

VEC系列-RabbitMQ 入门笔记

消息队列&#xff08;MQ&#xff09;对于开发者来说是一个经常听到的词汇&#xff0c;但在实际开发中&#xff0c;大多数人并不会真正用到它。网上已经有很多关于 MQ 概述和原理的详细讲解&#xff0c;官网文档和技术博客也都介绍得很深入&#xff0c;因此&#xff0c;我在这里…...

第5章 使用OSSEC进行监控(网络安全防御实战--蓝军武器库)

网络安全防御实战--蓝军武器库是2020年出版的&#xff0c;已经过去3年时间了&#xff0c;最近利用闲暇时间&#xff0c;抓紧吸收&#xff0c;总的来说&#xff0c;第5章开始进入主机安全&#xff08;HIDS&#xff09;领域了&#xff0c;2022年的时候有幸做过终端安全一段时间&a…...

安装IK分词器;IK分词器配置扩展词库:配置扩展字典-扩展词,配置扩展停止词字典-停用词

安装IK分词器&#xff1b;IK分词器配置扩展词库&#xff1a;配置扩展字典-扩展词&#xff0c;配置扩展停止词字典-停用词 安装IK分词器IK分词配置扩展词库配置扩展字典-扩展词配置停止词字典-停用词测试配置字典前配置字典后 本文 ElasticSearch 版本为&#xff1a;7.17.9&…...

cursor+deepseek实现完整的俄罗斯方块小游戏

<!DOCTYPE html> <html lang"zh-CN"> <head><meta charset"UTF-8"><title>俄罗斯方块</title><style>body {margin: 0;display: flex;justify-content: center;align-items: center;height: 100vh;background: …...

Oracle 数据库基础入门(五):限制查询与范式三约定深度解析

在 Oracle 数据库的学习进程中&#xff0c;限制查询与范式三约定是两个极为重要的概念。限制查询帮助我们精准获取特定范围的数据&#xff0c;而范式三约定则为数据库设计提供了科学的指导框架。对于 Java 全栈开发者而言&#xff0c;掌握这些知识不仅有助于高效地从数据库中提…...

pgsql行列转换

目录 一、造测试数据 二、行转列 1.函数定义 2.语法 3.示例 三、列转行 1.函数定义 2.语法 3.示例 一、造测试数据 create table test ( id int, json1 varchar, json2 varchar );insert into test values(1,111,{111}); insert into test values(2,111,222,{111,22…...

Nginx 开启Baise认证

开启Baise认证 需要再站点Server配置中添加一下配置&#xff0c;添加htpasswd文件 server{auth_basic "HTTP Basic Authentication";auth_basic_user_file /etc/nginx/htpasswd;# 其他配置信息... }如果你的 Linux 服务器没有安装 htpasswd 工具&#xff0c;可以通…...

Android 多用户相关

Android 多用户相关 本文主要记录下android 多用户相关的adb 命令操作. 1: 获取用户列表 命令: adb shell pm list users 输出如下: Users:UserInfo{0:机主:c13} running默认只有一个用户, id为0 &#xff0c;用户状态为运行 2: 创建新用户 命令&#xff1a; adb shell …...

基于python实现的疫情数据可视化分析系统

基于python实现的疫情数据可视化分析系统 开发语言:Python 数据库&#xff1a;MySQL所用到的知识&#xff1a;Django框架工具&#xff1a;pycharm、Navicat 系统功能实现 总体设计 系统实现 系统功能模块 系统首页可以查看首页、疫情信息、核酸检测、新闻资讯、个人中心、后…...

计算机毕业设计SpringBoot+Vue.js陕西民俗网(源码+文档+PPT+讲解)

温馨提示&#xff1a;文末有 CSDN 平台官方提供的学长联系方式的名片&#xff01; 温馨提示&#xff1a;文末有 CSDN 平台官方提供的学长联系方式的名片&#xff01; 温馨提示&#xff1a;文末有 CSDN 平台官方提供的学长联系方式的名片&#xff01; 作者简介&#xff1a;Java领…...

Win7重装不翻车!ISO镜像安全下载渠道+BIOS设置避雷手册

一、写在前面&#xff1a;为什么你需要这份教程&#xff1f; 当电脑频繁蓝屏、系统崩溃甚至无法开机时&#xff0c;重装系统可能是最后的救命稻草。但市面上的教程往往存在三大痛点&#xff1a; ⚠️ 镜像来源不明导致系统被植入后门 ⚠️ 启动盘制作失败反复折腾 ⚠️ 操作失…...

[项目]基于FreeRTOS的STM32四轴飞行器: 四.LED控制

基于FreeRTOS的STM32四轴飞行器: 四.LED控制 一.配置Com层二.编写驱动 一.配置Com层 先在Com_Config.h中定义灯位置的枚举类型&#xff1a; 之后定义Led的结构体&#xff1a; 定义飞行器状态&#xff1a; 在Com_Config.c中初始化四个灯&#xff1a; 在Com_Config.h外部声明…...

macos查询pip默认镜像地址

在 macOS 系统中&#xff0c;查询 pip 的默认镜像地址可以通过以下几种方法&#xff1a; 方法 1&#xff1a;直接通过 pip config list 命令查询 运行以下命令查看当前 pip 的配置&#xff08;包括镜像地址&#xff09;&#xff1a; pip config list 如果输出中包含 global…...

计算机毕业设计SpringBoot+Vue.js青年公寓服务平台(源码+文档+PPT+讲解)

温馨提示&#xff1a;文末有 CSDN 平台官方提供的学长联系方式的名片&#xff01; 温馨提示&#xff1a;文末有 CSDN 平台官方提供的学长联系方式的名片&#xff01; 温馨提示&#xff1a;文末有 CSDN 平台官方提供的学长联系方式的名片&#xff01; 作者简介&#xff1a;Java领…...

【虚拟仿真】Unity3D中实现激光/射线的发射/折射/反射的效果(3D版)

推荐阅读 CSDN主页GitHub开源地址Unity3D插件分享QQ群:398291828小红书小破站大家好,我是佛系工程师☆恬静的小魔龙☆,不定时更新Unity开发技巧,觉得有用记得一键三连哦。...

flutter环境最新踩坑

## Flutter 开发常见问题排查与解决 ### 1. 项目初始化与依赖问题 bash # 清理项目 flutter clean # 获取依赖 flutter pub get # 详细日志运行 flutter run -v ### 2. 网络和下载问题 - 网络慢可能导致依赖下载卡住 - 使用 -v 参数可查看详细日志 - 检查网络连接 - 可以尝…...

使用 Spring Boot 实现前后端分离的海康威视 SDK 视频监控

使用 Spring Boot 实现前后端分离的海康威视 SDK 视频监控系统&#xff0c;可以分为以下几个步骤&#xff1a; 1. 系统架构设计 前端&#xff1a;使用 Vue.js、React 或 Angular 等前端框架实现用户界面。后端&#xff1a;使用 Spring Boot 提供 RESTful API&#xff0c;负责与…...

VScode 中文符号出现黄色方框的解决方法

VScode 中文符号出现黄色方框的解决方法 我的vscode的python多行注释中会将中文字符用黄色方框框处&#xff1a; 只需要打开设置搜索unicode&#xff0c;然后将这一项的勾选取消掉就可以了&#xff1a; 取消之后的效果如下&#xff1a; 另一种情况&#xff1a;中文显示出现黄色…...

⭐算法OJ⭐跳跃游戏【贪心算法】(C++实现)Jump Game 系列 I,II

既股票买卖系列之后的第二组贪心算法题目&#xff1a;跳跃游戏系列。这一篇介绍的两个问题&#xff0c;其输入均为一个数组&#xff0c;每个元素表示在该位置可以跳跃的最大长度。 55. Jump Game You are given an integer array nums. You are initially positioned at the …...

阿里云MaxCompute面试题汇总及参考答案

目录 简述 MaxCompute 的核心功能及适用场景,与传统数据仓库的区别 解释 MaxCompute 分层架构设计原则,与传统数仓分层有何异同 MaxCompute 的存储架构如何实现高可用与扩展性 解析伏羲(Fuxi)分布式调度系统工作原理 盘古(Pangu)分布式存储系统数据分片策略 计算与存…...

JCRQ1河马算法+四模型对比!HO-CNN-GRU-Attention系列四模型多变量时序预测

JCRQ1河马算法四模型对比&#xff01;HO-CNN-GRU-Attention系列四模型多变量时序预测 目录 JCRQ1河马算法四模型对比&#xff01;HO-CNN-GRU-Attention系列四模型多变量时序预测预测效果基本介绍程序设计参考资料 预测效果 基本介绍 基于HO-CNN-GRU-Attention、CNN-GRU-Attent…...

探索低空经济,无人机及载人直升机低空应用技术详解

探索低空经济时&#xff0c;无人机及载人直升机低空应用技术是核心要素。以下是对这两类技术的详细解析&#xff1a; 一、无人机低空应用技术 1. 飞行控制技术 无人机需要强大的飞行控制系统&#xff0c;这涉及传感器融合、飞行器稳定性控制、自动化飞行和紧急情况下的自动避…...

Python:简单的爬虫程序,从web页面爬取图片与标题并保存MySQL

文章目录 一、环境说明二、基本思路三、代码 一、环境说明 python 版本&#xff1a;3.10 MySQL版本&#xff1a;8 二、基本思路 首先&#xff0c;我们需要查看网页源代码 通过html源码&#xff0c;确定我们要抓取的内容所在标签的特点 然后&#xff0c;利用BeautifulSoup进…...

GStreamer —— 2.3、Windows下Qt加载GStreamer库后运行 - “教程3:动态管道“(附:完整源码)

运行效果&#xff08;音频&#xff09; 简介 上一个教程演示了GStreamer 概念。本教程中的管在它设置为 playing 状态之前完全构建。这没关系。如果 我们没有采取进一步的行动&#xff0c;数据会到达 pipeline 的 pipeline 和 pipeline 将生成错误消息并停止。但 我们将采取进一…...

【Java数据结构】前K个高频单词

前K个高频单词 692. 前K个高频单词 - 力扣&#xff08;LeetCode&#xff09; 解决这个问题我们先得知道每个单词出现的次数&#xff0c;用map存储下来&#xff0c;然后将出现次数最多的通过建立小根堆解决top-K问题 &#xff0c;重点是top-K的求取。 1.建立map 首先我们可以…...

Ubuntu20.04本地配置IsaacLab 4.5.0的训练环境(一)

Ubuntu20.04本地配置IsaacLab 4.5.0的训练环境&#xff08;一&#xff09; 配置conda虚拟环境&#xff08;对于这一步&#xff0c;个人感觉跟在配置IsaacLab那一节的./isaaclab.sh --install同样要执行这一步&#xff0c;建议先不执行&#xff09;配置IsaacSim配置IsaacLab 写在…...

第二次CCF-CSP认证(含C++源码)

第二次CCF-CSP认证 第一道&#xff08;easy&#xff09;思路及AC代码 第二道&#xff08;easy&#xff09;基本思路及AC代码 第三道&#xff08;mid&#xff09;基本思路及AC代码solution 1 (模拟)solution 2&#xff08;KMP&#xff09; 第一道&#xff08;easy&#xff09; 题…...

前端多角色权限页面(同浏览器同时登录)数据互串解决

项目是使用vue3写的 问题说明 现在的问题是&#xff0c;在同个浏览器打开两个标签页&#xff08;都是登录页面&#xff09;&#xff0c;A标签页先登录A的账号&#xff0c;然后B标签页登录B账号。我的登录信息&#xff08;userInfo和token、权限等都是存放在localStorage中的&…...

常见面试问题:MVC模式

MVC&#xff08;Model-View-Controller&#xff09;是一种分层架构设计模式&#xff0c;核心思想是通过职责分离提升代码的可维护性和扩展性。它的三个组件分工如下&#xff1a; 1. Model&#xff08;模型&#xff09; 职责&#xff1a;管理数据和业务逻辑&#xff0c;与数据库…...

【项目】视频点播

一、项目介绍 1. 对视频点播系统的认识 搭建视频共享点播服务器&#xff0c;可以让所有人通过浏览器访问服务器&#xff0c;实现视频的上传查看&#xff0c;以及管理并播放的功能。主要是完成服务器端的程序业务功能的实现以及前端访问界面 html 的编写&#xff0c;能够支持客…...