RabbitMQ教程:发布/订阅模式(Publish/Subscribe)(三)
文章目录
- RabbitMQ教程:发布/订阅模式(Publish/Subscribe)(三)
- 一、引言
- 二、简介
- 三、准备工作
- 3.1 说明
- 3.2 生成项目
- 四、实战
- 4.1 交换机(Exchanges)
- 4.2 临时队列(Temporary Queues)
- 4.3 绑定(Bindings)
- 4.4 整合代码
- 发布程序
- 订阅程序
- 4.5 验证一:先广播后订阅
- 4.6 验证二:先订阅后广播
- 五、结论
RabbitMQ教程:发布/订阅模式(Publish/Subscribe)(三)
一、引言
在快节奏的软件开发世界中,我们经常面临需要将消息发送给多个接收者的场景。例如,在构建日志监控系统、实时通知系统等场景时,我们希望一个事件的发生能够被多个服务同时感知和处理。这时,发布/订阅模式(Publish/Subscribe)就显得尤为重要。在本教程中,我们将通过一个简单的例子来学习如何使用RabbitMQ实现发布/订阅模式。
二、简介
在上一篇教程中,我们学习了如何使用RabbitMQ实现工作队列(Work Queues)。今天,我们将探索工作队列的进阶应用——发布/订阅模式,这是一种允许多个接收者(Subscribers)监听同一个消息通道,并在消息发布时接收通知的机制。发布/订阅模式的核心在于解耦消息的发送者(Publisher)和接收者(Subscribers),发送者不需要知道有哪些接收者,只需要将消息发送到一个交换机(Exchange),而接收者则订阅这个交换机来接收消息。
三、准备工作
3.1 说明
在本教程中,我们将使用RabbitMQ的.NET客户端来创建一个简单的发布/订阅系统。我们将创建一个名为logs的fanout类型的交换机,并将所有日志消息广播给所有订阅了该交换机的队列。
3.2 生成项目
首先,我们需要生成两个项目:
EmitLogApp:用于模拟日志消息的发布者。
ReceiveLogsApp:用于接收并打印日志消息的订阅者。
我们可以使用以下命令来创建这两个项目:
dotnet new console --name EmitLog
cd EmitLog
dotnet add package RabbitMQ.Client
cd ..
dotnet new console --name ReceiveLogs
cd ReceiveLogs
dotnet add package RabbitMQ.Client
这些命令创建了两个新的控制台应用程序,一个用于发送日志消息,另一个用于接收并打印日志消息。
四、实战
4.1 交换机(Exchanges)
在之前的教程中,我们直接将消息发送到队列。现在,我们需要引入交换机(Exchange)的概念。在RabbitMQ中,生产者从不直接向队列发送消息,而是发送到交换机,然后由交换机将消息推送到一个或多个队列。交换机的行为由交换机类型定义。
我们将创建一个名为logs的fanout类型的交换机,它将广播所有接收到的消息给所有绑定到它的队列。

channel.ExchangeDeclare("logs", ExchangeType.Fanout);
4.2 临时队列(Temporary Queues)
在我们的日志系统中,我们希望每个运行的接收者程序都能接收到所有日志消息。因此,我们不需要为队列指定名称,而是让服务器为我们生成一个随机名称。同时,我们希望在消费者断开连接后队列能自动删除。
在.NET客户端中,我们可以创建一个非持久性、独占的、自动删除的队列,并让服务器为我们生成一个名称:
var queueName = channel.QueueDeclare().QueueName;
4.3 绑定(Bindings)
我们已经创建了一个fanout交换机和一个队列。现在,我们需要告诉交换机将消息发送到我们的队列。交换机和队列之间的关系称为绑定(Binding)。
channel.QueueBind(queue: queueName, exchange: "logs", routingKey: string.Empty);

4.4 整合代码
发布程序
using RabbitMQ.Client;
using System.Text;await PublishMessagesAsync(10);
/// <summary>
/// 发布指定次数的消息到RabbitMQ队列
/// </summary>
/// <param name="loopCount">消息发送的次数</param>
/// <returns>Task对象,表示异步操作</returns>
async Task PublishMessagesAsync(int loopCount)
{// 循环发送指定次数的消息for (int i = 1; i <= loopCount; i++){// 调用SendMessageToQueue方法发送消息,并包含当前迭代次数await SendMessageToQueue($"Iteration {i} - Hello World");// 这里可以添加延迟,如果需要的话// await Task.Delay(1000);}Console.ReadLine();
}
/// <summary>
/// 向RabbitMQ队列发送一条消息
/// </summary>
/// <param name="message">要发送的消息内容</param>
/// <returns>Task对象,表示异步操作</returns>
async Task SendMessageToQueue(string message)
{// 创建连接工厂,并设置RabbitMQ服务器地址为localhostvar factory = new ConnectionFactory { HostName = "localhost" };// 使用异步方式创建连接using var connection = await factory.CreateConnectionAsync();// 使用异步方式创建通道using var channel = await connection.CreateChannelAsync();//声明名为"logs"的fanout类型的交换机await channel.ExchangeDeclareAsync(exchange: "logs", type: ExchangeType.Fanout);// 将消息内容编码为字节数组var body = Encoding.UTF8.GetBytes(message);// 异步发布消息到队列await channel.BasicPublishAsync(exchange: "logs", routingKey: string.Empty, body: body);Console.WriteLine($" [x] Sent {message}");
}
订阅程序
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System.Text;
// 创建连接工厂,并设置RabbitMQ服务器地址为localhos
var factory = new ConnectionFactory { HostName = "localhost" };
// 使用异步方式创建连接
using var connection = await factory.CreateConnectionAsync();
// 使用异步方式创建通道
using var channel = await connection.CreateChannelAsync();// 异步声明一个名为"logs"的fanout类型的交换机
// 交换机会将所有接收到的消息广播给所有绑定到它的队列
await channel.ExchangeDeclareAsync(exchange: "logs",type: ExchangeType.Fanout);// 声明一个由服务器命名的队列,这样每个消费者都会有一个唯一的队列
// 这使得我们可以有多个消费者同时接收消息,而不会相互干扰
QueueDeclareOk queueDeclareResult = await channel.QueueDeclareAsync();
string queueName = queueDeclareResult.QueueName;// 将由服务器创建的队列绑定到"logs"交换机
// 这样,交换机就会将消息发送到这个队列
await channel.QueueBindAsync(queue: queueName, exchange: "logs", routingKey: string.Empty);
// 输出提示信息,表示消费者正在等待日志消息
Console.WriteLine(" [*] Waiting for logs.");// 创建一个异步事件驱动的消费者
var consumer = new AsyncEventingBasicConsumer(channel);// 设置当消费者接收到消息时的事件处理程序
consumer.ReceivedAsync += (model, ea) =>
{// 从接收到的消息中提取消息体并转换为字符串byte[] body = ea.Body.ToArray();var message = Encoding.UTF8.GetString(body);// 打印接收到的消息Console.WriteLine($" [x] {message}");// 返回Task.CompletedTask以满足异步事件处理的签名要求return Task.CompletedTask;
};
// 开始消费指定队列的消息
// 这个调用会告诉RabbitMQ服务器,我们有一个消费者准备好接收"logs"交换机绑定的队列中的消息
await channel.BasicConsumeAsync(queueName, autoAck: true, consumer: consumer);Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine();
4.5 验证一:先广播后订阅
运行EmitLogApp\bin\Debug\net8.0\EmitLogApp.exe,即先发布广播;再运行ReceiveLogsApp\bin\Debug\net8.0\ReceiveLogsApp.exe进行订阅广播。可以发现,没有接收到任何内容。原因是需要先启动订阅者(消费者),再启动广播(发布者/生产者)才可以接收到消息

4.6 验证二:先订阅后广播
先运行ReceiveLogsApp\bin\Debug\net8.0\ReceiveLogsApp.exe;
再运行EmitLogApp\bin\Debug\net8.0\EmitLogApp.exe;可以发现每个消费者(订阅者)都收到了相同信息。

五、结论
在本教程中,我们深入探讨了RabbitMQ发布/订阅模式的概念和实现。通过构建一个简单的日志系统,我们学习了如何创建fanout类型的交换机,以及如何发送和接收消息。以下是我们从本教程中获得的关键要点:
-
解耦发送者和接收者:发布/订阅模式允许发送者和接收者之间没有直接的联系,发送者只需要将消息发送到交换机,而接收者则订阅交换机来接收消息。
-
消息广播:
fanout类型的交换机会将所有接收到的消息广播给所有绑定到它的队列,这对于日志系统、事件通知等场景非常有用。 -
临时队列:我们使用了临时队列来接收消息,这样每个订阅者都会有自己的队列,并且在订阅者断开连接后,队列会自动删除。
-
动态订阅:订阅者可以随时订阅或取消订阅交换机,这使得系统具有很高的灵活性和动态性。
通过这些机制,我们能够建立一个高效的发布/订阅系统,它不仅能够提高系统的响应速度,还能够在面对各种异常情况时保持消息的可靠性和持久性。这些知识为我们在实际开发中实现复杂的事件驱动架构提供了坚实的基础。
相关文章:
RabbitMQ教程:发布/订阅模式(Publish/Subscribe)(三)
文章目录 RabbitMQ教程:发布/订阅模式(Publish/Subscribe)(三)一、引言二、简介三、准备工作3.1 说明3.2 生成项目 四、实战4.1 交换机(Exchanges)4.2 临时队列(Temporary Queues&am…...
服务器被挂马怎么办?——解决服务器被挂马的方法和步骤
服务器被挂马(即被植入恶意软件)是一个常见的网络安全问题,可能导致数据泄露、服务中断和经济损失。本文将详细介绍如何检测和清除服务器上的恶意软件,并提供实用的代码示例,帮助读者解决服务器被挂马的问题。 一、什…...
Qt 项目架构设计
在开发一个 Qt 项目时,合理的文件夹结构和清晰的构建流程是非常重要的。Qt 项目通常需要管理源代码、UI 文件、资源文件、构建脚本等。下面我会给出一个详细的文件夹结构示例,并解释每个部分的作用及如何设计 Makefile 或使用 Qt 的 qmake 来自动化构建过…...
Elasticsearch:管理和排除 Elasticsearch 内存故障
作者:来自 Elastic Stef Nestor 随着 Elastic Cloud 提供可观察性、安全性和搜索等解决方案,我们将使用 Elastic Cloud 的用户范围从完整的运营团队扩大到包括数据工程师、安全团队和顾问。作为 Elastic 支持代表,我很乐意与各种各样的用户和…...
高级java每日一道面试题-2024年11月07日-Redis篇-Redis有哪些功能?
如果有遗漏,评论区告诉我进行补充 面试官: Redis有哪些功能? 我回答: Redis 是一个开源的、基于键值对的 NoSQL 数据库,以其高性能、丰富的数据结构和多种功能而闻名。在高级 Java 面试中,了解 Redis 的核心功能和高级特性是非常重要的。以下是 Redi…...
实用且免费的 IP 地域查询 API 接口推荐
实用且免费的 IP 地域查询 API 接口推荐 在日常开发中,IP 地域查询是一个常见需求。最近无意间发现一个实用的 IP 地域查询 API,目前是免费的,未来是否收费尚不可知,但在当前情况下非常值得推荐。 API 地址示例: ht…...
STM32学习笔记----SPI协议
STM32的SPI(串行外设接口,Serial Peripheral Interface)是一种常见的同步串行通信协议,广泛应用于与传感器、显示屏、存储设备等外设的通信。SPI通过主从模式(Master/Slave)来实现数据交换,其中…...
Ceph的pool有两种类型
Replicated Pool(拷贝型Pool,默认) 概述: 这是Ceph的默认存储池类型。它通过生成对象的多份拷贝来确保数据的冗余和高可用性。 工作原理: 每个存入的对象(Object)都会被存储为多个副本…...
推荐一款流程图和图表绘制工具:WizFlow Flowcharter Pro
WizFlow Flowcharter是一款易于使用、功能丰富的Windows流程图和图表绘制工具。它允许用户使用超过一百种预定义的形状和箭头定义形状“样式”。您可以将自己的样式保存在图表模板中,以建立自己的绘图方法。WizFlow附带了完整的流程图模板,以帮助您入门。…...
设计模式之插件模式
插件模式是一种设计模式,可以让您在不修改现有系统代码的情况下扩展功能,非常适合实现监控软件和交换机配置的解耦。在嵌入式Linux系统中,您可以使用C++实现插件机制,使监控软件能够动态加载交换机型号的配置模块。这种方式允许您通过插件形式快速适配新型号的交换机。 插…...
深度学习基础—Beam search集束搜索
引言 深度学习基础—Seq2Seq模型https://blog.csdn.net/sniper_fandc/article/details/143781223?fromshareblogdetail&sharetypeblogdetail&sharerId143781223&sharereferPC&sharesourcesniper_fandc&sharefromfrom_link 上篇博客讲到,贪心算…...
STM32 串口输出调试信息
软硬件信息 CubeMX version 6.12.1Keil uVision V5.41.0.0 注意 串口有多种: TTL232485 串口的相关知识: 01-【HAL库】STM32实现串口打印(printf方式) , 内含 TTL 和 232 区别。 我把 232 串口连进 STM32 串口助手收到的信息…...
任务调度中心-XXL-JOB使用详解
目录 详解 调度中心 执行器 原理 快速入门 源码仓库地址 1.初始化数据库 2.配置调度中心 1.解压源码 2.需改配置文件 3.启动调度中心 3.配置执行器 1.引入pom依赖 2.修改配置文件 3.执行器组件配置 4.部署执行器项目 4.开发第一个任务 BEAN模式(类…...
git本地分支推送到远程和远程pull到本地
文章目录 本地分支推送到远程仓库git拉取远程分支到本地 本地分支推送到远程仓库 要将本地分支推送到远程仓库的某个分支(可以是同名的分支,也可以是不同名的分支),你可以使用 git push 命令。这里有几种不同的情况: …...
Python_爬虫1_Requests库入门
目录 Requests库 7个主要方法 Requests库的get()方法 Response对象的属性 爬取网页的通用代码框架 理解requests库的异常 HTTP协议及Requests库方法 HTTP协议 HTTP协议采用URL作为定位网络资源的标识。 HTTP协议对资源的操作 理解PATCH和PUT的区别 HTTP协议与Requse…...
安全见闻1-5
涵盖了编程语言、软件程序类型、操作系统、网络通讯、硬件设备、web前后端、脚本语言、病毒种类、服务器程序、人工智能等基本知识,有助于全面了解计算机科学和网络技术的各个方面。 安全见闻1 1.编程语言简要概述 C语言:面向过程,适用于系统…...
STM32 学习笔记-----STM32 的启动过程
STM32 的启动过程是一个精细而系统的流程,它涉及从芯片复位开始,到初始化系统、设置时钟、运行主程序等一系列步骤。下面详细介绍 STM32 启动过程的主要步骤。 1. Boot引脚设定 STM32 系列芯片有多个启动模式,这些模式是通过引脚࿰…...
35.3K+ Star!PhotoPrism:一款基于AI的开源照片管理工具
PhotoPrism 简介 PhotoPrism[1] 是一个为去中心化网络设计的AI照片应用,它利用最新技术自动标记和查找图片,实现自动图像分类与本地化部署,你可以在家中、私有服务器或云端运行它。 项目特点 主要特点 浏览所有照片和视频,无需担心RAW转换、重复项或视频格式。 使用强大的…...
网络安全:数字时代的守护盾
在21世纪的今天,互联网已经渗透到我们生活的方方面面,从社交互动、在线购物、远程办公到智能家居,无一不彰显着数字技术的便捷与高效。然而,随着网络空间的日益扩大,网络安全问题也日益凸显,成为了一个不容…...
vue 中监听页面尺寸变化就调用函数
方法一:使用 window.onresize 结合 Vue 实例的生命周期钩子(不推荐,存在覆盖风险) 虽然可以直接使用原生的 window.onresize 事件来监听窗口大小变化,但这种方式在 Vue 项目中有一些局限性,因为如果在多个…...
golang循环变量捕获问题
在 Go 语言中,当在循环中启动协程(goroutine)时,如果在协程闭包中直接引用循环变量,可能会遇到一个常见的陷阱 - 循环变量捕获问题。让我详细解释一下: 问题背景 看这个代码片段: fo…...
Spring AI与Spring Modulith核心技术解析
Spring AI核心架构解析 Spring AI(https://spring.io/projects/spring-ai)作为Spring生态中的AI集成框架,其核心设计理念是通过模块化架构降低AI应用的开发复杂度。与Python生态中的LangChain/LlamaIndex等工具类似,但特别为多语…...
服务器--宝塔命令
一、宝塔面板安装命令 ⚠️ 必须使用 root 用户 或 sudo 权限执行! sudo su - 1. CentOS 系统: yum install -y wget && wget -O install.sh http://download.bt.cn/install/install_6.0.sh && sh install.sh2. Ubuntu / Debian 系统…...
Mysql中select查询语句的执行过程
目录 1、介绍 1.1、组件介绍 1.2、Sql执行顺序 2、执行流程 2.1. 连接与认证 2.2. 查询缓存 2.3. 语法解析(Parser) 2.4、执行sql 1. 预处理(Preprocessor) 2. 查询优化器(Optimizer) 3. 执行器…...
Netty从入门到进阶(二)
二、Netty入门 1. 概述 1.1 Netty是什么 Netty is an asynchronous event-driven network application framework for rapid development of maintainable high performance protocol servers & clients. Netty是一个异步的、基于事件驱动的网络应用框架,用于…...
Mysql8 忘记密码重置,以及问题解决
1.使用免密登录 找到配置MySQL文件,我的文件路径是/etc/mysql/my.cnf,有的人的是/etc/mysql/mysql.cnf 在里最后加入 skip-grant-tables重启MySQL服务 service mysql restartShutting down MySQL… SUCCESS! Starting MySQL… SUCCESS! 重启成功 2.登…...
高防服务器价格高原因分析
高防服务器的价格较高,主要是由于其特殊的防御机制、硬件配置、运营维护等多方面的综合成本。以下从技术、资源和服务三个维度详细解析高防服务器昂贵的原因: 一、硬件与技术投入 大带宽需求 DDoS攻击通过占用大量带宽资源瘫痪目标服务器,因此…...
聚六亚甲基单胍盐酸盐市场深度解析:现状、挑战与机遇
根据 QYResearch 发布的市场报告显示,全球市场规模预计在 2031 年达到 9848 万美元,2025 - 2031 年期间年复合增长率(CAGR)为 3.7%。在竞争格局上,市场集中度较高,2024 年全球前十强厂商占据约 74.0% 的市场…...
文件上传漏洞防御全攻略
要全面防范文件上传漏洞,需构建多层防御体系,结合技术验证、存储隔离与权限控制: 🔒 一、基础防护层 前端校验(仅辅助) 通过JavaScript限制文件后缀名(白名单)和大小,提…...
Xcode 16 集成 cocoapods 报错
基于 Xcode 16 新建工程项目,集成 cocoapods 执行 pod init 报错 ### Error RuntimeError - PBXGroup attempted to initialize an object with unknown ISA PBXFileSystemSynchronizedRootGroup from attributes: {"isa">"PBXFileSystemSynchro…...
