RabbitMQ教程:发布/订阅模式(Publish/Subscribe)(三)
文章目录
- RabbitMQ教程:发布/订阅模式(Publish/Subscribe)(三)
- 一、引言
- 二、简介
- 三、准备工作
- 3.1 说明
- 3.2 生成项目
- 四、实战
- 4.1 交换机(Exchanges)
- 4.2 临时队列(Temporary Queues)
- 4.3 绑定(Bindings)
- 4.4 整合代码
- 发布程序
- 订阅程序
- 4.5 验证一:先广播后订阅
- 4.6 验证二:先订阅后广播
- 五、结论
RabbitMQ教程:发布/订阅模式(Publish/Subscribe)(三)
一、引言
在快节奏的软件开发世界中,我们经常面临需要将消息发送给多个接收者的场景。例如,在构建日志监控系统、实时通知系统等场景时,我们希望一个事件的发生能够被多个服务同时感知和处理。这时,发布/订阅模式(Publish/Subscribe)就显得尤为重要。在本教程中,我们将通过一个简单的例子来学习如何使用RabbitMQ实现发布/订阅模式。
二、简介
在上一篇教程中,我们学习了如何使用RabbitMQ实现工作队列(Work Queues)。今天,我们将探索工作队列的进阶应用——发布/订阅模式,这是一种允许多个接收者(Subscribers)监听同一个消息通道,并在消息发布时接收通知的机制。发布/订阅模式的核心在于解耦消息的发送者(Publisher)和接收者(Subscribers),发送者不需要知道有哪些接收者,只需要将消息发送到一个交换机(Exchange),而接收者则订阅这个交换机来接收消息。
三、准备工作
3.1 说明
在本教程中,我们将使用RabbitMQ的.NET客户端来创建一个简单的发布/订阅系统。我们将创建一个名为logs的fanout类型的交换机,并将所有日志消息广播给所有订阅了该交换机的队列。
3.2 生成项目
首先,我们需要生成两个项目:
EmitLogApp
:用于模拟日志消息的发布者。
ReceiveLogsApp
:用于接收并打印日志消息的订阅者。
我们可以使用以下命令来创建这两个项目:
dotnet new console --name EmitLog
cd EmitLog
dotnet add package RabbitMQ.Client
cd ..
dotnet new console --name ReceiveLogs
cd ReceiveLogs
dotnet add package RabbitMQ.Client
这些命令创建了两个新的控制台应用程序,一个用于发送日志消息,另一个用于接收并打印日志消息。
四、实战
4.1 交换机(Exchanges)
在之前的教程中,我们直接将消息发送到队列。现在,我们需要引入交换机(Exchange)的概念。在RabbitMQ中,生产者从不直接向队列发送消息,而是发送到交换机,然后由交换机将消息推送到一个或多个队列。交换机的行为由交换机类型定义。
我们将创建一个名为logs
的fanout
类型的交换机,它将广播所有接收到的消息给所有绑定到它的队列。
channel.ExchangeDeclare("logs", ExchangeType.Fanout);
4.2 临时队列(Temporary Queues)
在我们的日志系统中,我们希望每个运行的接收者程序都能接收到所有日志消息。因此,我们不需要为队列指定名称,而是让服务器为我们生成一个随机名称。同时,我们希望在消费者断开连接后队列能自动删除。
在.NET客户端中,我们可以创建一个非持久性、独占的、自动删除的队列,并让服务器为我们生成一个名称:
var queueName = channel.QueueDeclare().QueueName;
4.3 绑定(Bindings)
我们已经创建了一个fanout
交换机和一个队列。现在,我们需要告诉交换机将消息发送到我们的队列。交换机和队列之间的关系称为绑定(Binding)。
channel.QueueBind(queue: queueName, exchange: "logs", routingKey: string.Empty);
4.4 整合代码
发布程序
using RabbitMQ.Client;
using System.Text;await PublishMessagesAsync(10);
/// <summary>
/// 发布指定次数的消息到RabbitMQ队列
/// </summary>
/// <param name="loopCount">消息发送的次数</param>
/// <returns>Task对象,表示异步操作</returns>
async Task PublishMessagesAsync(int loopCount)
{// 循环发送指定次数的消息for (int i = 1; i <= loopCount; i++){// 调用SendMessageToQueue方法发送消息,并包含当前迭代次数await SendMessageToQueue($"Iteration {i} - Hello World");// 这里可以添加延迟,如果需要的话// await Task.Delay(1000);}Console.ReadLine();
}
/// <summary>
/// 向RabbitMQ队列发送一条消息
/// </summary>
/// <param name="message">要发送的消息内容</param>
/// <returns>Task对象,表示异步操作</returns>
async Task SendMessageToQueue(string message)
{// 创建连接工厂,并设置RabbitMQ服务器地址为localhostvar factory = new ConnectionFactory { HostName = "localhost" };// 使用异步方式创建连接using var connection = await factory.CreateConnectionAsync();// 使用异步方式创建通道using var channel = await connection.CreateChannelAsync();//声明名为"logs"的fanout类型的交换机await channel.ExchangeDeclareAsync(exchange: "logs", type: ExchangeType.Fanout);// 将消息内容编码为字节数组var body = Encoding.UTF8.GetBytes(message);// 异步发布消息到队列await channel.BasicPublishAsync(exchange: "logs", routingKey: string.Empty, body: body);Console.WriteLine($" [x] Sent {message}");
}
订阅程序
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System.Text;
// 创建连接工厂,并设置RabbitMQ服务器地址为localhos
var factory = new ConnectionFactory { HostName = "localhost" };
// 使用异步方式创建连接
using var connection = await factory.CreateConnectionAsync();
// 使用异步方式创建通道
using var channel = await connection.CreateChannelAsync();// 异步声明一个名为"logs"的fanout类型的交换机
// 交换机会将所有接收到的消息广播给所有绑定到它的队列
await channel.ExchangeDeclareAsync(exchange: "logs",type: ExchangeType.Fanout);// 声明一个由服务器命名的队列,这样每个消费者都会有一个唯一的队列
// 这使得我们可以有多个消费者同时接收消息,而不会相互干扰
QueueDeclareOk queueDeclareResult = await channel.QueueDeclareAsync();
string queueName = queueDeclareResult.QueueName;// 将由服务器创建的队列绑定到"logs"交换机
// 这样,交换机就会将消息发送到这个队列
await channel.QueueBindAsync(queue: queueName, exchange: "logs", routingKey: string.Empty);
// 输出提示信息,表示消费者正在等待日志消息
Console.WriteLine(" [*] Waiting for logs.");// 创建一个异步事件驱动的消费者
var consumer = new AsyncEventingBasicConsumer(channel);// 设置当消费者接收到消息时的事件处理程序
consumer.ReceivedAsync += (model, ea) =>
{// 从接收到的消息中提取消息体并转换为字符串byte[] body = ea.Body.ToArray();var message = Encoding.UTF8.GetString(body);// 打印接收到的消息Console.WriteLine($" [x] {message}");// 返回Task.CompletedTask以满足异步事件处理的签名要求return Task.CompletedTask;
};
// 开始消费指定队列的消息
// 这个调用会告诉RabbitMQ服务器,我们有一个消费者准备好接收"logs"交换机绑定的队列中的消息
await channel.BasicConsumeAsync(queueName, autoAck: true, consumer: consumer);Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine();
4.5 验证一:先广播后订阅
运行EmitLogApp\bin\Debug\net8.0\EmitLogApp.exe
,即先发布广播;再运行ReceiveLogsApp\bin\Debug\net8.0\ReceiveLogsApp.exe
进行订阅广播。可以发现,没有接收到任何内容。原因是需要先启动订阅者(消费者),再启动广播(发布者/生产者)才可以接收到消息
4.6 验证二:先订阅后广播
先运行ReceiveLogsApp\bin\Debug\net8.0\ReceiveLogsApp.exe
;
再运行EmitLogApp\bin\Debug\net8.0\EmitLogApp.exe
;可以发现每个消费者(订阅者)都收到了相同信息。
五、结论
在本教程中,我们深入探讨了RabbitMQ发布/订阅模式的概念和实现。通过构建一个简单的日志系统,我们学习了如何创建fanout
类型的交换机,以及如何发送和接收消息。以下是我们从本教程中获得的关键要点:
-
解耦发送者和接收者:发布/订阅模式允许发送者和接收者之间没有直接的联系,发送者只需要将消息发送到交换机,而接收者则订阅交换机来接收消息。
-
消息广播:
fanout
类型的交换机会将所有接收到的消息广播给所有绑定到它的队列,这对于日志系统、事件通知等场景非常有用。 -
临时队列:我们使用了临时队列来接收消息,这样每个订阅者都会有自己的队列,并且在订阅者断开连接后,队列会自动删除。
-
动态订阅:订阅者可以随时订阅或取消订阅交换机,这使得系统具有很高的灵活性和动态性。
通过这些机制,我们能够建立一个高效的发布/订阅系统,它不仅能够提高系统的响应速度,还能够在面对各种异常情况时保持消息的可靠性和持久性。这些知识为我们在实际开发中实现复杂的事件驱动架构提供了坚实的基础。
相关文章:

RabbitMQ教程:发布/订阅模式(Publish/Subscribe)(三)
文章目录 RabbitMQ教程:发布/订阅模式(Publish/Subscribe)(三)一、引言二、简介三、准备工作3.1 说明3.2 生成项目 四、实战4.1 交换机(Exchanges)4.2 临时队列(Temporary Queues&am…...
服务器被挂马怎么办?——解决服务器被挂马的方法和步骤
服务器被挂马(即被植入恶意软件)是一个常见的网络安全问题,可能导致数据泄露、服务中断和经济损失。本文将详细介绍如何检测和清除服务器上的恶意软件,并提供实用的代码示例,帮助读者解决服务器被挂马的问题。 一、什…...
Qt 项目架构设计
在开发一个 Qt 项目时,合理的文件夹结构和清晰的构建流程是非常重要的。Qt 项目通常需要管理源代码、UI 文件、资源文件、构建脚本等。下面我会给出一个详细的文件夹结构示例,并解释每个部分的作用及如何设计 Makefile 或使用 Qt 的 qmake 来自动化构建过…...

Elasticsearch:管理和排除 Elasticsearch 内存故障
作者:来自 Elastic Stef Nestor 随着 Elastic Cloud 提供可观察性、安全性和搜索等解决方案,我们将使用 Elastic Cloud 的用户范围从完整的运营团队扩大到包括数据工程师、安全团队和顾问。作为 Elastic 支持代表,我很乐意与各种各样的用户和…...
高级java每日一道面试题-2024年11月07日-Redis篇-Redis有哪些功能?
如果有遗漏,评论区告诉我进行补充 面试官: Redis有哪些功能? 我回答: Redis 是一个开源的、基于键值对的 NoSQL 数据库,以其高性能、丰富的数据结构和多种功能而闻名。在高级 Java 面试中,了解 Redis 的核心功能和高级特性是非常重要的。以下是 Redi…...
实用且免费的 IP 地域查询 API 接口推荐
实用且免费的 IP 地域查询 API 接口推荐 在日常开发中,IP 地域查询是一个常见需求。最近无意间发现一个实用的 IP 地域查询 API,目前是免费的,未来是否收费尚不可知,但在当前情况下非常值得推荐。 API 地址示例: ht…...
STM32学习笔记----SPI协议
STM32的SPI(串行外设接口,Serial Peripheral Interface)是一种常见的同步串行通信协议,广泛应用于与传感器、显示屏、存储设备等外设的通信。SPI通过主从模式(Master/Slave)来实现数据交换,其中…...
Ceph的pool有两种类型
Replicated Pool(拷贝型Pool,默认) 概述: 这是Ceph的默认存储池类型。它通过生成对象的多份拷贝来确保数据的冗余和高可用性。 工作原理: 每个存入的对象(Object)都会被存储为多个副本…...

推荐一款流程图和图表绘制工具:WizFlow Flowcharter Pro
WizFlow Flowcharter是一款易于使用、功能丰富的Windows流程图和图表绘制工具。它允许用户使用超过一百种预定义的形状和箭头定义形状“样式”。您可以将自己的样式保存在图表模板中,以建立自己的绘图方法。WizFlow附带了完整的流程图模板,以帮助您入门。…...
设计模式之插件模式
插件模式是一种设计模式,可以让您在不修改现有系统代码的情况下扩展功能,非常适合实现监控软件和交换机配置的解耦。在嵌入式Linux系统中,您可以使用C++实现插件机制,使监控软件能够动态加载交换机型号的配置模块。这种方式允许您通过插件形式快速适配新型号的交换机。 插…...

深度学习基础—Beam search集束搜索
引言 深度学习基础—Seq2Seq模型https://blog.csdn.net/sniper_fandc/article/details/143781223?fromshareblogdetail&sharetypeblogdetail&sharerId143781223&sharereferPC&sharesourcesniper_fandc&sharefromfrom_link 上篇博客讲到,贪心算…...

STM32 串口输出调试信息
软硬件信息 CubeMX version 6.12.1Keil uVision V5.41.0.0 注意 串口有多种: TTL232485 串口的相关知识: 01-【HAL库】STM32实现串口打印(printf方式) , 内含 TTL 和 232 区别。 我把 232 串口连进 STM32 串口助手收到的信息…...

任务调度中心-XXL-JOB使用详解
目录 详解 调度中心 执行器 原理 快速入门 源码仓库地址 1.初始化数据库 2.配置调度中心 1.解压源码 2.需改配置文件 3.启动调度中心 3.配置执行器 1.引入pom依赖 2.修改配置文件 3.执行器组件配置 4.部署执行器项目 4.开发第一个任务 BEAN模式(类…...
git本地分支推送到远程和远程pull到本地
文章目录 本地分支推送到远程仓库git拉取远程分支到本地 本地分支推送到远程仓库 要将本地分支推送到远程仓库的某个分支(可以是同名的分支,也可以是不同名的分支),你可以使用 git push 命令。这里有几种不同的情况: …...

Python_爬虫1_Requests库入门
目录 Requests库 7个主要方法 Requests库的get()方法 Response对象的属性 爬取网页的通用代码框架 理解requests库的异常 HTTP协议及Requests库方法 HTTP协议 HTTP协议采用URL作为定位网络资源的标识。 HTTP协议对资源的操作 理解PATCH和PUT的区别 HTTP协议与Requse…...

安全见闻1-5
涵盖了编程语言、软件程序类型、操作系统、网络通讯、硬件设备、web前后端、脚本语言、病毒种类、服务器程序、人工智能等基本知识,有助于全面了解计算机科学和网络技术的各个方面。 安全见闻1 1.编程语言简要概述 C语言:面向过程,适用于系统…...
STM32 学习笔记-----STM32 的启动过程
STM32 的启动过程是一个精细而系统的流程,它涉及从芯片复位开始,到初始化系统、设置时钟、运行主程序等一系列步骤。下面详细介绍 STM32 启动过程的主要步骤。 1. Boot引脚设定 STM32 系列芯片有多个启动模式,这些模式是通过引脚࿰…...

35.3K+ Star!PhotoPrism:一款基于AI的开源照片管理工具
PhotoPrism 简介 PhotoPrism[1] 是一个为去中心化网络设计的AI照片应用,它利用最新技术自动标记和查找图片,实现自动图像分类与本地化部署,你可以在家中、私有服务器或云端运行它。 项目特点 主要特点 浏览所有照片和视频,无需担心RAW转换、重复项或视频格式。 使用强大的…...
网络安全:数字时代的守护盾
在21世纪的今天,互联网已经渗透到我们生活的方方面面,从社交互动、在线购物、远程办公到智能家居,无一不彰显着数字技术的便捷与高效。然而,随着网络空间的日益扩大,网络安全问题也日益凸显,成为了一个不容…...
vue 中监听页面尺寸变化就调用函数
方法一:使用 window.onresize 结合 Vue 实例的生命周期钩子(不推荐,存在覆盖风险) 虽然可以直接使用原生的 window.onresize 事件来监听窗口大小变化,但这种方式在 Vue 项目中有一些局限性,因为如果在多个…...

51c自动驾驶~合集58
我自己的原文哦~ https://blog.51cto.com/whaosoft/13967107 #CCA-Attention 全局池化局部保留,CCA-Attention为LLM长文本建模带来突破性进展 琶洲实验室、华南理工大学联合推出关键上下文感知注意力机制(CCA-Attention),…...
AI编程--插件对比分析:CodeRider、GitHub Copilot及其他
AI编程插件对比分析:CodeRider、GitHub Copilot及其他 随着人工智能技术的快速发展,AI编程插件已成为提升开发者生产力的重要工具。CodeRider和GitHub Copilot作为市场上的领先者,分别以其独特的特性和生态系统吸引了大量开发者。本文将从功…...
实现弹窗随键盘上移居中
实现弹窗随键盘上移的核心思路 在Android中,可以通过监听键盘的显示和隐藏事件,动态调整弹窗的位置。关键点在于获取键盘高度,并计算剩余屏幕空间以重新定位弹窗。 // 在Activity或Fragment中设置键盘监听 val rootView findViewById<V…...

Java面试专项一-准备篇
一、企业简历筛选规则 一般企业的简历筛选流程:首先由HR先筛选一部分简历后,在将简历给到对应的项目负责人后再进行下一步的操作。 HR如何筛选简历 例如:Boss直聘(招聘方平台) 直接按照条件进行筛选 例如:…...

SiFli 52把Imagie图片,Font字体资源放在指定位置,编译成指定img.bin和font.bin的问题
分区配置 (ptab.json) img 属性介绍: img 属性指定分区存放的 image 名称,指定的 image 名称必须是当前工程生成的 binary 。 如果 binary 有多个文件,则以 proj_name:binary_name 格式指定文件名, proj_name 为工程 名&…...

基于IDIG-GAN的小样本电机轴承故障诊断
目录 🔍 核心问题 一、IDIG-GAN模型原理 1. 整体架构 2. 核心创新点 (1) 梯度归一化(Gradient Normalization) (2) 判别器梯度间隙正则化(Discriminator Gradient Gap Regularization) (3) 自注意力机制(Self-Attention) 3. 完整损失函数 二…...

Xela矩阵三轴触觉传感器的工作原理解析与应用场景
Xela矩阵三轴触觉传感器通过先进技术模拟人类触觉感知,帮助设备实现精确的力测量与位移监测。其核心功能基于磁性三维力测量与空间位移测量,能够捕捉多维触觉信息。该传感器的设计不仅提升了触觉感知的精度,还为机器人、医疗设备和制造业的智…...

MySQL的pymysql操作
本章是MySQL的最后一章,MySQL到此完结,下一站Hadoop!!! 这章很简单,完整代码在最后,详细讲解之前python课程里面也有,感兴趣的可以往前找一下 一、查询操作 我们需要打开pycharm …...
鸿蒙(HarmonyOS5)实现跳一跳小游戏
下面我将介绍如何使用鸿蒙的ArkUI框架,实现一个简单的跳一跳小游戏。 1. 项目结构 src/main/ets/ ├── MainAbility │ ├── pages │ │ ├── Index.ets // 主页面 │ │ └── GamePage.ets // 游戏页面 │ └── model │ …...
Vue3中的computer和watch
computed的写法 在页面中 <div>{{ calcNumber }}</div>script中 写法1 常用 import { computed, ref } from vue; let price ref(100);const priceAdd () > { //函数方法 price 1price.value ; }//计算属性 let calcNumber computed(() > {return ${p…...