RabbitMQ教程:发布/订阅模式(Publish/Subscribe)(三)
文章目录
- RabbitMQ教程:发布/订阅模式(Publish/Subscribe)(三)
- 一、引言
- 二、简介
- 三、准备工作
- 3.1 说明
- 3.2 生成项目
- 四、实战
- 4.1 交换机(Exchanges)
- 4.2 临时队列(Temporary Queues)
- 4.3 绑定(Bindings)
- 4.4 整合代码
- 发布程序
- 订阅程序
- 4.5 验证一:先广播后订阅
- 4.6 验证二:先订阅后广播
- 五、结论
RabbitMQ教程:发布/订阅模式(Publish/Subscribe)(三)
一、引言
在快节奏的软件开发世界中,我们经常面临需要将消息发送给多个接收者的场景。例如,在构建日志监控系统、实时通知系统等场景时,我们希望一个事件的发生能够被多个服务同时感知和处理。这时,发布/订阅模式(Publish/Subscribe)就显得尤为重要。在本教程中,我们将通过一个简单的例子来学习如何使用RabbitMQ实现发布/订阅模式。
二、简介
在上一篇教程中,我们学习了如何使用RabbitMQ实现工作队列(Work Queues)。今天,我们将探索工作队列的进阶应用——发布/订阅模式,这是一种允许多个接收者(Subscribers)监听同一个消息通道,并在消息发布时接收通知的机制。发布/订阅模式的核心在于解耦消息的发送者(Publisher)和接收者(Subscribers),发送者不需要知道有哪些接收者,只需要将消息发送到一个交换机(Exchange),而接收者则订阅这个交换机来接收消息。
三、准备工作
3.1 说明
在本教程中,我们将使用RabbitMQ的.NET客户端来创建一个简单的发布/订阅系统。我们将创建一个名为logs的fanout类型的交换机,并将所有日志消息广播给所有订阅了该交换机的队列。
3.2 生成项目
首先,我们需要生成两个项目:
EmitLogApp:用于模拟日志消息的发布者。
ReceiveLogsApp:用于接收并打印日志消息的订阅者。
我们可以使用以下命令来创建这两个项目:
dotnet new console --name EmitLog
cd EmitLog
dotnet add package RabbitMQ.Client
cd ..
dotnet new console --name ReceiveLogs
cd ReceiveLogs
dotnet add package RabbitMQ.Client
这些命令创建了两个新的控制台应用程序,一个用于发送日志消息,另一个用于接收并打印日志消息。
四、实战
4.1 交换机(Exchanges)
在之前的教程中,我们直接将消息发送到队列。现在,我们需要引入交换机(Exchange)的概念。在RabbitMQ中,生产者从不直接向队列发送消息,而是发送到交换机,然后由交换机将消息推送到一个或多个队列。交换机的行为由交换机类型定义。
我们将创建一个名为logs的fanout类型的交换机,它将广播所有接收到的消息给所有绑定到它的队列。

channel.ExchangeDeclare("logs", ExchangeType.Fanout);
4.2 临时队列(Temporary Queues)
在我们的日志系统中,我们希望每个运行的接收者程序都能接收到所有日志消息。因此,我们不需要为队列指定名称,而是让服务器为我们生成一个随机名称。同时,我们希望在消费者断开连接后队列能自动删除。
在.NET客户端中,我们可以创建一个非持久性、独占的、自动删除的队列,并让服务器为我们生成一个名称:
var queueName = channel.QueueDeclare().QueueName;
4.3 绑定(Bindings)
我们已经创建了一个fanout交换机和一个队列。现在,我们需要告诉交换机将消息发送到我们的队列。交换机和队列之间的关系称为绑定(Binding)。
channel.QueueBind(queue: queueName, exchange: "logs", routingKey: string.Empty);

4.4 整合代码
发布程序
using RabbitMQ.Client;
using System.Text;await PublishMessagesAsync(10);
/// <summary>
/// 发布指定次数的消息到RabbitMQ队列
/// </summary>
/// <param name="loopCount">消息发送的次数</param>
/// <returns>Task对象,表示异步操作</returns>
async Task PublishMessagesAsync(int loopCount)
{// 循环发送指定次数的消息for (int i = 1; i <= loopCount; i++){// 调用SendMessageToQueue方法发送消息,并包含当前迭代次数await SendMessageToQueue($"Iteration {i} - Hello World");// 这里可以添加延迟,如果需要的话// await Task.Delay(1000);}Console.ReadLine();
}
/// <summary>
/// 向RabbitMQ队列发送一条消息
/// </summary>
/// <param name="message">要发送的消息内容</param>
/// <returns>Task对象,表示异步操作</returns>
async Task SendMessageToQueue(string message)
{// 创建连接工厂,并设置RabbitMQ服务器地址为localhostvar factory = new ConnectionFactory { HostName = "localhost" };// 使用异步方式创建连接using var connection = await factory.CreateConnectionAsync();// 使用异步方式创建通道using var channel = await connection.CreateChannelAsync();//声明名为"logs"的fanout类型的交换机await channel.ExchangeDeclareAsync(exchange: "logs", type: ExchangeType.Fanout);// 将消息内容编码为字节数组var body = Encoding.UTF8.GetBytes(message);// 异步发布消息到队列await channel.BasicPublishAsync(exchange: "logs", routingKey: string.Empty, body: body);Console.WriteLine($" [x] Sent {message}");
}
订阅程序
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System.Text;
// 创建连接工厂,并设置RabbitMQ服务器地址为localhos
var factory = new ConnectionFactory { HostName = "localhost" };
// 使用异步方式创建连接
using var connection = await factory.CreateConnectionAsync();
// 使用异步方式创建通道
using var channel = await connection.CreateChannelAsync();// 异步声明一个名为"logs"的fanout类型的交换机
// 交换机会将所有接收到的消息广播给所有绑定到它的队列
await channel.ExchangeDeclareAsync(exchange: "logs",type: ExchangeType.Fanout);// 声明一个由服务器命名的队列,这样每个消费者都会有一个唯一的队列
// 这使得我们可以有多个消费者同时接收消息,而不会相互干扰
QueueDeclareOk queueDeclareResult = await channel.QueueDeclareAsync();
string queueName = queueDeclareResult.QueueName;// 将由服务器创建的队列绑定到"logs"交换机
// 这样,交换机就会将消息发送到这个队列
await channel.QueueBindAsync(queue: queueName, exchange: "logs", routingKey: string.Empty);
// 输出提示信息,表示消费者正在等待日志消息
Console.WriteLine(" [*] Waiting for logs.");// 创建一个异步事件驱动的消费者
var consumer = new AsyncEventingBasicConsumer(channel);// 设置当消费者接收到消息时的事件处理程序
consumer.ReceivedAsync += (model, ea) =>
{// 从接收到的消息中提取消息体并转换为字符串byte[] body = ea.Body.ToArray();var message = Encoding.UTF8.GetString(body);// 打印接收到的消息Console.WriteLine($" [x] {message}");// 返回Task.CompletedTask以满足异步事件处理的签名要求return Task.CompletedTask;
};
// 开始消费指定队列的消息
// 这个调用会告诉RabbitMQ服务器,我们有一个消费者准备好接收"logs"交换机绑定的队列中的消息
await channel.BasicConsumeAsync(queueName, autoAck: true, consumer: consumer);Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine();
4.5 验证一:先广播后订阅
运行EmitLogApp\bin\Debug\net8.0\EmitLogApp.exe,即先发布广播;再运行ReceiveLogsApp\bin\Debug\net8.0\ReceiveLogsApp.exe进行订阅广播。可以发现,没有接收到任何内容。原因是需要先启动订阅者(消费者),再启动广播(发布者/生产者)才可以接收到消息

4.6 验证二:先订阅后广播
先运行ReceiveLogsApp\bin\Debug\net8.0\ReceiveLogsApp.exe;
再运行EmitLogApp\bin\Debug\net8.0\EmitLogApp.exe;可以发现每个消费者(订阅者)都收到了相同信息。

五、结论
在本教程中,我们深入探讨了RabbitMQ发布/订阅模式的概念和实现。通过构建一个简单的日志系统,我们学习了如何创建fanout类型的交换机,以及如何发送和接收消息。以下是我们从本教程中获得的关键要点:
-
解耦发送者和接收者:发布/订阅模式允许发送者和接收者之间没有直接的联系,发送者只需要将消息发送到交换机,而接收者则订阅交换机来接收消息。
-
消息广播:
fanout类型的交换机会将所有接收到的消息广播给所有绑定到它的队列,这对于日志系统、事件通知等场景非常有用。 -
临时队列:我们使用了临时队列来接收消息,这样每个订阅者都会有自己的队列,并且在订阅者断开连接后,队列会自动删除。
-
动态订阅:订阅者可以随时订阅或取消订阅交换机,这使得系统具有很高的灵活性和动态性。
通过这些机制,我们能够建立一个高效的发布/订阅系统,它不仅能够提高系统的响应速度,还能够在面对各种异常情况时保持消息的可靠性和持久性。这些知识为我们在实际开发中实现复杂的事件驱动架构提供了坚实的基础。
相关文章:
RabbitMQ教程:发布/订阅模式(Publish/Subscribe)(三)
文章目录 RabbitMQ教程:发布/订阅模式(Publish/Subscribe)(三)一、引言二、简介三、准备工作3.1 说明3.2 生成项目 四、实战4.1 交换机(Exchanges)4.2 临时队列(Temporary Queues&am…...
服务器被挂马怎么办?——解决服务器被挂马的方法和步骤
服务器被挂马(即被植入恶意软件)是一个常见的网络安全问题,可能导致数据泄露、服务中断和经济损失。本文将详细介绍如何检测和清除服务器上的恶意软件,并提供实用的代码示例,帮助读者解决服务器被挂马的问题。 一、什…...
Qt 项目架构设计
在开发一个 Qt 项目时,合理的文件夹结构和清晰的构建流程是非常重要的。Qt 项目通常需要管理源代码、UI 文件、资源文件、构建脚本等。下面我会给出一个详细的文件夹结构示例,并解释每个部分的作用及如何设计 Makefile 或使用 Qt 的 qmake 来自动化构建过…...
Elasticsearch:管理和排除 Elasticsearch 内存故障
作者:来自 Elastic Stef Nestor 随着 Elastic Cloud 提供可观察性、安全性和搜索等解决方案,我们将使用 Elastic Cloud 的用户范围从完整的运营团队扩大到包括数据工程师、安全团队和顾问。作为 Elastic 支持代表,我很乐意与各种各样的用户和…...
高级java每日一道面试题-2024年11月07日-Redis篇-Redis有哪些功能?
如果有遗漏,评论区告诉我进行补充 面试官: Redis有哪些功能? 我回答: Redis 是一个开源的、基于键值对的 NoSQL 数据库,以其高性能、丰富的数据结构和多种功能而闻名。在高级 Java 面试中,了解 Redis 的核心功能和高级特性是非常重要的。以下是 Redi…...
实用且免费的 IP 地域查询 API 接口推荐
实用且免费的 IP 地域查询 API 接口推荐 在日常开发中,IP 地域查询是一个常见需求。最近无意间发现一个实用的 IP 地域查询 API,目前是免费的,未来是否收费尚不可知,但在当前情况下非常值得推荐。 API 地址示例: ht…...
STM32学习笔记----SPI协议
STM32的SPI(串行外设接口,Serial Peripheral Interface)是一种常见的同步串行通信协议,广泛应用于与传感器、显示屏、存储设备等外设的通信。SPI通过主从模式(Master/Slave)来实现数据交换,其中…...
Ceph的pool有两种类型
Replicated Pool(拷贝型Pool,默认) 概述: 这是Ceph的默认存储池类型。它通过生成对象的多份拷贝来确保数据的冗余和高可用性。 工作原理: 每个存入的对象(Object)都会被存储为多个副本…...
推荐一款流程图和图表绘制工具:WizFlow Flowcharter Pro
WizFlow Flowcharter是一款易于使用、功能丰富的Windows流程图和图表绘制工具。它允许用户使用超过一百种预定义的形状和箭头定义形状“样式”。您可以将自己的样式保存在图表模板中,以建立自己的绘图方法。WizFlow附带了完整的流程图模板,以帮助您入门。…...
设计模式之插件模式
插件模式是一种设计模式,可以让您在不修改现有系统代码的情况下扩展功能,非常适合实现监控软件和交换机配置的解耦。在嵌入式Linux系统中,您可以使用C++实现插件机制,使监控软件能够动态加载交换机型号的配置模块。这种方式允许您通过插件形式快速适配新型号的交换机。 插…...
深度学习基础—Beam search集束搜索
引言 深度学习基础—Seq2Seq模型https://blog.csdn.net/sniper_fandc/article/details/143781223?fromshareblogdetail&sharetypeblogdetail&sharerId143781223&sharereferPC&sharesourcesniper_fandc&sharefromfrom_link 上篇博客讲到,贪心算…...
STM32 串口输出调试信息
软硬件信息 CubeMX version 6.12.1Keil uVision V5.41.0.0 注意 串口有多种: TTL232485 串口的相关知识: 01-【HAL库】STM32实现串口打印(printf方式) , 内含 TTL 和 232 区别。 我把 232 串口连进 STM32 串口助手收到的信息…...
任务调度中心-XXL-JOB使用详解
目录 详解 调度中心 执行器 原理 快速入门 源码仓库地址 1.初始化数据库 2.配置调度中心 1.解压源码 2.需改配置文件 3.启动调度中心 3.配置执行器 1.引入pom依赖 2.修改配置文件 3.执行器组件配置 4.部署执行器项目 4.开发第一个任务 BEAN模式(类…...
git本地分支推送到远程和远程pull到本地
文章目录 本地分支推送到远程仓库git拉取远程分支到本地 本地分支推送到远程仓库 要将本地分支推送到远程仓库的某个分支(可以是同名的分支,也可以是不同名的分支),你可以使用 git push 命令。这里有几种不同的情况: …...
Python_爬虫1_Requests库入门
目录 Requests库 7个主要方法 Requests库的get()方法 Response对象的属性 爬取网页的通用代码框架 理解requests库的异常 HTTP协议及Requests库方法 HTTP协议 HTTP协议采用URL作为定位网络资源的标识。 HTTP协议对资源的操作 理解PATCH和PUT的区别 HTTP协议与Requse…...
安全见闻1-5
涵盖了编程语言、软件程序类型、操作系统、网络通讯、硬件设备、web前后端、脚本语言、病毒种类、服务器程序、人工智能等基本知识,有助于全面了解计算机科学和网络技术的各个方面。 安全见闻1 1.编程语言简要概述 C语言:面向过程,适用于系统…...
STM32 学习笔记-----STM32 的启动过程
STM32 的启动过程是一个精细而系统的流程,它涉及从芯片复位开始,到初始化系统、设置时钟、运行主程序等一系列步骤。下面详细介绍 STM32 启动过程的主要步骤。 1. Boot引脚设定 STM32 系列芯片有多个启动模式,这些模式是通过引脚࿰…...
35.3K+ Star!PhotoPrism:一款基于AI的开源照片管理工具
PhotoPrism 简介 PhotoPrism[1] 是一个为去中心化网络设计的AI照片应用,它利用最新技术自动标记和查找图片,实现自动图像分类与本地化部署,你可以在家中、私有服务器或云端运行它。 项目特点 主要特点 浏览所有照片和视频,无需担心RAW转换、重复项或视频格式。 使用强大的…...
网络安全:数字时代的守护盾
在21世纪的今天,互联网已经渗透到我们生活的方方面面,从社交互动、在线购物、远程办公到智能家居,无一不彰显着数字技术的便捷与高效。然而,随着网络空间的日益扩大,网络安全问题也日益凸显,成为了一个不容…...
vue 中监听页面尺寸变化就调用函数
方法一:使用 window.onresize 结合 Vue 实例的生命周期钩子(不推荐,存在覆盖风险) 虽然可以直接使用原生的 window.onresize 事件来监听窗口大小变化,但这种方式在 Vue 项目中有一些局限性,因为如果在多个…...
基于算法竞赛的c++编程(28)结构体的进阶应用
结构体的嵌套与复杂数据组织 在C中,结构体可以嵌套使用,形成更复杂的数据结构。例如,可以通过嵌套结构体描述多层级数据关系: struct Address {string city;string street;int zipCode; };struct Employee {string name;int id;…...
7.4.分块查找
一.分块查找的算法思想: 1.实例: 以上述图片的顺序表为例, 该顺序表的数据元素从整体来看是乱序的,但如果把这些数据元素分成一块一块的小区间, 第一个区间[0,1]索引上的数据元素都是小于等于10的, 第二…...
(十)学生端搭建
本次旨在将之前的已完成的部分功能进行拼装到学生端,同时完善学生端的构建。本次工作主要包括: 1.学生端整体界面布局 2.模拟考场与部分个人画像流程的串联 3.整体学生端逻辑 一、学生端 在主界面可以选择自己的用户角色 选择学生则进入学生登录界面…...
基于数字孪生的水厂可视化平台建设:架构与实践
分享大纲: 1、数字孪生水厂可视化平台建设背景 2、数字孪生水厂可视化平台建设架构 3、数字孪生水厂可视化平台建设成效 近几年,数字孪生水厂的建设开展的如火如荼。作为提升水厂管理效率、优化资源的调度手段,基于数字孪生的水厂可视化平台的…...
多模态大语言模型arxiv论文略读(108)
CROME: Cross-Modal Adapters for Efficient Multimodal LLM ➡️ 论文标题:CROME: Cross-Modal Adapters for Efficient Multimodal LLM ➡️ 论文作者:Sayna Ebrahimi, Sercan O. Arik, Tejas Nama, Tomas Pfister ➡️ 研究机构: Google Cloud AI Re…...
使用 Streamlit 构建支持主流大模型与 Ollama 的轻量级统一平台
🎯 使用 Streamlit 构建支持主流大模型与 Ollama 的轻量级统一平台 📌 项目背景 随着大语言模型(LLM)的广泛应用,开发者常面临多个挑战: 各大模型(OpenAI、Claude、Gemini、Ollama)接口风格不统一;缺乏一个统一平台进行模型调用与测试;本地模型 Ollama 的集成与前…...
高效线程安全的单例模式:Python 中的懒加载与自定义初始化参数
高效线程安全的单例模式:Python 中的懒加载与自定义初始化参数 在软件开发中,单例模式(Singleton Pattern)是一种常见的设计模式,确保一个类仅有一个实例,并提供一个全局访问点。在多线程环境下,实现单例模式时需要注意线程安全问题,以防止多个线程同时创建实例,导致…...
用机器学习破解新能源领域的“弃风”难题
音乐发烧友深有体会,玩音乐的本质就是玩电网。火电声音偏暖,水电偏冷,风电偏空旷。至于太阳能发的电,则略显朦胧和单薄。 不知你是否有感觉,近两年家里的音响声音越来越冷,听起来越来越单薄? —…...
20个超级好用的 CSS 动画库
分享 20 个最佳 CSS 动画库。 它们中的大多数将生成纯 CSS 代码,而不需要任何外部库。 1.Animate.css 一个开箱即用型的跨浏览器动画库,可供你在项目中使用。 2.Magic Animations CSS3 一组简单的动画,可以包含在你的网页或应用项目中。 3.An…...
动态 Web 开发技术入门篇
一、HTTP 协议核心 1.1 HTTP 基础 协议全称 :HyperText Transfer Protocol(超文本传输协议) 默认端口 :HTTP 使用 80 端口,HTTPS 使用 443 端口。 请求方法 : GET :用于获取资源,…...
