当前位置: 首页 > news >正文

RabbitMQ教程:发布/订阅模式(Publish/Subscribe)(三)

文章目录

  • RabbitMQ教程:发布/订阅模式(Publish/Subscribe)(三)
    • 一、引言
    • 二、简介
    • 三、准备工作
      • 3.1 说明
      • 3.2 生成项目
    • 四、实战
      • 4.1 交换机(Exchanges)
      • 4.2 临时队列(Temporary Queues)
      • 4.3 绑定(Bindings)
      • 4.4 整合代码
        • 发布程序
        • 订阅程序
      • 4.5 验证一:先广播后订阅
      • 4.6 验证二:先订阅后广播
    • 五、结论

RabbitMQ教程:发布/订阅模式(Publish/Subscribe)(三)

一、引言

在快节奏的软件开发世界中,我们经常面临需要将消息发送给多个接收者的场景。例如,在构建日志监控系统、实时通知系统等场景时,我们希望一个事件的发生能够被多个服务同时感知和处理。这时,发布/订阅模式(Publish/Subscribe)就显得尤为重要。在本教程中,我们将通过一个简单的例子来学习如何使用RabbitMQ实现发布/订阅模式。

二、简介

在上一篇教程中,我们学习了如何使用RabbitMQ实现工作队列(Work Queues)。今天,我们将探索工作队列的进阶应用——发布/订阅模式,这是一种允许多个接收者(Subscribers)监听同一个消息通道,并在消息发布时接收通知的机制。发布/订阅模式的核心在于解耦消息的发送者(Publisher)和接收者(Subscribers),发送者不需要知道有哪些接收者,只需要将消息发送到一个交换机(Exchange),而接收者则订阅这个交换机来接收消息。

三、准备工作

3.1 说明

在本教程中,我们将使用RabbitMQ的.NET客户端来创建一个简单的发布/订阅系统。我们将创建一个名为logs的fanout类型的交换机,并将所有日志消息广播给所有订阅了该交换机的队列。

3.2 生成项目

首先,我们需要生成两个项目:

EmitLogApp:用于模拟日志消息的发布者。
ReceiveLogsApp:用于接收并打印日志消息的订阅者。
我们可以使用以下命令来创建这两个项目:

dotnet new console --name EmitLog
cd EmitLog
dotnet add package RabbitMQ.Client
cd ..
dotnet new console --name ReceiveLogs
cd ReceiveLogs
dotnet add package RabbitMQ.Client

这些命令创建了两个新的控制台应用程序,一个用于发送日志消息,另一个用于接收并打印日志消息。

四、实战

4.1 交换机(Exchanges)

在之前的教程中,我们直接将消息发送到队列。现在,我们需要引入交换机(Exchange)的概念。在RabbitMQ中,生产者从不直接向队列发送消息,而是发送到交换机,然后由交换机将消息推送到一个或多个队列。交换机的行为由交换机类型定义。

我们将创建一个名为logsfanout类型的交换机,它将广播所有接收到的消息给所有绑定到它的队列。
在这里插入图片描述

channel.ExchangeDeclare("logs", ExchangeType.Fanout);

4.2 临时队列(Temporary Queues)

在我们的日志系统中,我们希望每个运行的接收者程序都能接收到所有日志消息。因此,我们不需要为队列指定名称,而是让服务器为我们生成一个随机名称。同时,我们希望在消费者断开连接后队列能自动删除。

在.NET客户端中,我们可以创建一个非持久性、独占的、自动删除的队列,并让服务器为我们生成一个名称:

var queueName = channel.QueueDeclare().QueueName;

4.3 绑定(Bindings)

我们已经创建了一个fanout交换机和一个队列。现在,我们需要告诉交换机将消息发送到我们的队列。交换机和队列之间的关系称为绑定(Binding)。

channel.QueueBind(queue: queueName, exchange: "logs", routingKey: string.Empty);

在这里插入图片描述

4.4 整合代码

发布程序
using RabbitMQ.Client;
using System.Text;await PublishMessagesAsync(10);
/// <summary>
/// 发布指定次数的消息到RabbitMQ队列
/// </summary>
/// <param name="loopCount">消息发送的次数</param>
/// <returns>Task对象,表示异步操作</returns>
async Task PublishMessagesAsync(int loopCount)
{// 循环发送指定次数的消息for (int i = 1; i <= loopCount; i++){// 调用SendMessageToQueue方法发送消息,并包含当前迭代次数await SendMessageToQueue($"Iteration {i} - Hello World");// 这里可以添加延迟,如果需要的话// await Task.Delay(1000);}Console.ReadLine();
}
/// <summary>
/// 向RabbitMQ队列发送一条消息
/// </summary>
/// <param name="message">要发送的消息内容</param>
/// <returns>Task对象,表示异步操作</returns>
async Task SendMessageToQueue(string message)
{// 创建连接工厂,并设置RabbitMQ服务器地址为localhostvar factory = new ConnectionFactory { HostName = "localhost" };// 使用异步方式创建连接using var connection = await factory.CreateConnectionAsync();// 使用异步方式创建通道using var channel = await connection.CreateChannelAsync();//声明名为"logs"的fanout类型的交换机await channel.ExchangeDeclareAsync(exchange: "logs", type: ExchangeType.Fanout);// 将消息内容编码为字节数组var body = Encoding.UTF8.GetBytes(message);// 异步发布消息到队列await channel.BasicPublishAsync(exchange: "logs", routingKey: string.Empty, body: body);Console.WriteLine($" [x] Sent {message}");
}
订阅程序
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System.Text;
// 创建连接工厂,并设置RabbitMQ服务器地址为localhos
var factory = new ConnectionFactory { HostName = "localhost" };
// 使用异步方式创建连接
using var connection = await factory.CreateConnectionAsync();
// 使用异步方式创建通道
using var channel = await connection.CreateChannelAsync();// 异步声明一个名为"logs"的fanout类型的交换机
// 交换机会将所有接收到的消息广播给所有绑定到它的队列
await channel.ExchangeDeclareAsync(exchange: "logs",type: ExchangeType.Fanout);// 声明一个由服务器命名的队列,这样每个消费者都会有一个唯一的队列
// 这使得我们可以有多个消费者同时接收消息,而不会相互干扰
QueueDeclareOk queueDeclareResult = await channel.QueueDeclareAsync();
string queueName = queueDeclareResult.QueueName;// 将由服务器创建的队列绑定到"logs"交换机
// 这样,交换机就会将消息发送到这个队列
await channel.QueueBindAsync(queue: queueName, exchange: "logs", routingKey: string.Empty);
// 输出提示信息,表示消费者正在等待日志消息
Console.WriteLine(" [*] Waiting for logs.");// 创建一个异步事件驱动的消费者
var consumer = new AsyncEventingBasicConsumer(channel);// 设置当消费者接收到消息时的事件处理程序
consumer.ReceivedAsync += (model, ea) =>
{// 从接收到的消息中提取消息体并转换为字符串byte[] body = ea.Body.ToArray();var message = Encoding.UTF8.GetString(body);// 打印接收到的消息Console.WriteLine($" [x] {message}");// 返回Task.CompletedTask以满足异步事件处理的签名要求return Task.CompletedTask;
};
// 开始消费指定队列的消息
// 这个调用会告诉RabbitMQ服务器,我们有一个消费者准备好接收"logs"交换机绑定的队列中的消息
await channel.BasicConsumeAsync(queueName, autoAck: true, consumer: consumer);Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine();

4.5 验证一:先广播后订阅

运行EmitLogApp\bin\Debug\net8.0\EmitLogApp.exe,即先发布广播;再运行ReceiveLogsApp\bin\Debug\net8.0\ReceiveLogsApp.exe进行订阅广播。可以发现,没有接收到任何内容。原因是需要先启动订阅者(消费者),再启动广播(发布者/生产者)才可以接收到消息
在这里插入图片描述

4.6 验证二:先订阅后广播

先运行ReceiveLogsApp\bin\Debug\net8.0\ReceiveLogsApp.exe
再运行EmitLogApp\bin\Debug\net8.0\EmitLogApp.exe;可以发现每个消费者(订阅者)都收到了相同信息。
在这里插入图片描述

五、结论

在本教程中,我们深入探讨了RabbitMQ发布/订阅模式的概念和实现。通过构建一个简单的日志系统,我们学习了如何创建fanout类型的交换机,以及如何发送和接收消息。以下是我们从本教程中获得的关键要点:

  1. 解耦发送者和接收者:发布/订阅模式允许发送者和接收者之间没有直接的联系,发送者只需要将消息发送到交换机,而接收者则订阅交换机来接收消息。

  2. 消息广播fanout类型的交换机会将所有接收到的消息广播给所有绑定到它的队列,这对于日志系统、事件通知等场景非常有用。

  3. 临时队列:我们使用了临时队列来接收消息,这样每个订阅者都会有自己的队列,并且在订阅者断开连接后,队列会自动删除。

  4. 动态订阅:订阅者可以随时订阅或取消订阅交换机,这使得系统具有很高的灵活性和动态性。

通过这些机制,我们能够建立一个高效的发布/订阅系统,它不仅能够提高系统的响应速度,还能够在面对各种异常情况时保持消息的可靠性和持久性。这些知识为我们在实际开发中实现复杂的事件驱动架构提供了坚实的基础。

相关文章:

RabbitMQ教程:发布/订阅模式(Publish/Subscribe)(三)

文章目录 RabbitMQ教程&#xff1a;发布/订阅模式&#xff08;Publish/Subscribe&#xff09;&#xff08;三&#xff09;一、引言二、简介三、准备工作3.1 说明3.2 生成项目 四、实战4.1 交换机&#xff08;Exchanges&#xff09;4.2 临时队列&#xff08;Temporary Queues&am…...

服务器被挂马怎么办?——解决服务器被挂马的方法和步骤

服务器被挂马&#xff08;即被植入恶意软件&#xff09;是一个常见的网络安全问题&#xff0c;可能导致数据泄露、服务中断和经济损失。本文将详细介绍如何检测和清除服务器上的恶意软件&#xff0c;并提供实用的代码示例&#xff0c;帮助读者解决服务器被挂马的问题。 一、什…...

Qt 项目架构设计

在开发一个 Qt 项目时&#xff0c;合理的文件夹结构和清晰的构建流程是非常重要的。Qt 项目通常需要管理源代码、UI 文件、资源文件、构建脚本等。下面我会给出一个详细的文件夹结构示例&#xff0c;并解释每个部分的作用及如何设计 Makefile 或使用 Qt 的 qmake 来自动化构建过…...

Elasticsearch:管理和排除 Elasticsearch 内存故障

作者&#xff1a;来自 Elastic Stef Nestor 随着 Elastic Cloud 提供可观察性、安全性和搜索等解决方案&#xff0c;我们将使用 Elastic Cloud 的用户范围从完整的运营团队扩大到包括数据工程师、安全团队和顾问。作为 Elastic 支持代表&#xff0c;我很乐意与各种各样的用户和…...

高级java每日一道面试题-2024年11月07日-Redis篇-Redis有哪些功能?

如果有遗漏,评论区告诉我进行补充 面试官: Redis有哪些功能? 我回答: Redis 是一个开源的、基于键值对的 NoSQL 数据库&#xff0c;以其高性能、丰富的数据结构和多种功能而闻名。在高级 Java 面试中&#xff0c;了解 Redis 的核心功能和高级特性是非常重要的。以下是 Redi…...

实用且免费的 IP 地域查询 API 接口推荐

实用且免费的 IP 地域查询 API 接口推荐 在日常开发中&#xff0c;IP 地域查询是一个常见需求。最近无意间发现一个实用的 IP 地域查询 API&#xff0c;目前是免费的&#xff0c;未来是否收费尚不可知&#xff0c;但在当前情况下非常值得推荐。 API 地址示例&#xff1a; ht…...

STM32学习笔记----SPI协议

STM32的SPI&#xff08;串行外设接口&#xff0c;Serial Peripheral Interface&#xff09;是一种常见的同步串行通信协议&#xff0c;广泛应用于与传感器、显示屏、存储设备等外设的通信。SPI通过主从模式&#xff08;Master/Slave&#xff09;来实现数据交换&#xff0c;其中…...

Ceph的pool有两种类型

Replicated Pool&#xff08;拷贝型Pool&#xff0c;默认&#xff09; 概述&#xff1a; 这是Ceph的默认存储池类型。它通过生成对象的多份拷贝来确保数据的冗余和高可用性。 工作原理&#xff1a; 每个存入的对象&#xff08;Object&#xff09;都会被存储为多个副本&#xf…...

推荐一款流程图和图表绘制工具:WizFlow Flowcharter Pro

WizFlow Flowcharter是一款易于使用、功能丰富的Windows流程图和图表绘制工具。它允许用户使用超过一百种预定义的形状和箭头定义形状“样式”。您可以将自己的样式保存在图表模板中&#xff0c;以建立自己的绘图方法。WizFlow附带了完整的流程图模板&#xff0c;以帮助您入门。…...

设计模式之插件模式

插件模式是一种设计模式,可以让您在不修改现有系统代码的情况下扩展功能,非常适合实现监控软件和交换机配置的解耦。在嵌入式Linux系统中,您可以使用C++实现插件机制,使监控软件能够动态加载交换机型号的配置模块。这种方式允许您通过插件形式快速适配新型号的交换机。 插…...

深度学习基础—Beam search集束搜索

引言 深度学习基础—Seq2Seq模型https://blog.csdn.net/sniper_fandc/article/details/143781223?fromshareblogdetail&sharetypeblogdetail&sharerId143781223&sharereferPC&sharesourcesniper_fandc&sharefromfrom_link 上篇博客讲到&#xff0c;贪心算…...

STM32 串口输出调试信息

软硬件信息 CubeMX version 6.12.1Keil uVision V5.41.0.0 注意 串口有多种&#xff1a; TTL232485 串口的相关知识&#xff1a; 01-【HAL库】STM32实现串口打印&#xff08;printf方式) &#xff0c; 内含 TTL 和 232 区别。 我把 232 串口连进 STM32 串口助手收到的信息…...

任务调度中心-XXL-JOB使用详解

目录 详解 调度中心 执行器 原理 快速入门 源码仓库地址 1.初始化数据库 2.配置调度中心 1.解压源码 2.需改配置文件 3.启动调度中心 3.配置执行器 1.引入pom依赖 2.修改配置文件 3.执行器组件配置 4.部署执行器项目 4.开发第一个任务 BEAN模式&#xff08;类…...

git本地分支推送到远程和远程pull到本地

文章目录 本地分支推送到远程仓库git拉取远程分支到本地 本地分支推送到远程仓库 要将本地分支推送到远程仓库的某个分支&#xff08;可以是同名的分支&#xff0c;也可以是不同名的分支&#xff09;&#xff0c;你可以使用 git push 命令。这里有几种不同的情况&#xff1a; …...

Python_爬虫1_Requests库入门

目录 Requests库 7个主要方法 Requests库的get()方法 Response对象的属性 爬取网页的通用代码框架 理解requests库的异常 HTTP协议及Requests库方法 HTTP协议 HTTP协议采用URL作为定位网络资源的标识。 HTTP协议对资源的操作 理解PATCH和PUT的区别 HTTP协议与Requse…...

安全见闻1-5

涵盖了编程语言、软件程序类型、操作系统、网络通讯、硬件设备、web前后端、脚本语言、病毒种类、服务器程序、人工智能等基本知识&#xff0c;有助于全面了解计算机科学和网络技术的各个方面。 安全见闻1 1.编程语言简要概述 C语言&#xff1a;面向过程&#xff0c;适用于系统…...

STM32 学习笔记-----STM32 的启动过程

STM32 的启动过程是一个精细而系统的流程&#xff0c;它涉及从芯片复位开始&#xff0c;到初始化系统、设置时钟、运行主程序等一系列步骤。下面详细介绍 STM32 启动过程的主要步骤。 1. Boot引脚设定 STM32 系列芯片有多个启动模式&#xff0c;这些模式是通过引脚&#xff0…...

35.3K+ Star!PhotoPrism:一款基于AI的开源照片管理工具

PhotoPrism 简介 PhotoPrism[1] 是一个为去中心化网络设计的AI照片应用,它利用最新技术自动标记和查找图片,实现自动图像分类与本地化部署,你可以在家中、私有服务器或云端运行它。 项目特点 主要特点 浏览所有照片和视频,无需担心RAW转换、重复项或视频格式。 使用强大的…...

网络安全:数字时代的守护盾

在21世纪的今天&#xff0c;互联网已经渗透到我们生活的方方面面&#xff0c;从社交互动、在线购物、远程办公到智能家居&#xff0c;无一不彰显着数字技术的便捷与高效。然而&#xff0c;随着网络空间的日益扩大&#xff0c;网络安全问题也日益凸显&#xff0c;成为了一个不容…...

vue 中监听页面尺寸变化就调用函数

方法一&#xff1a;使用 window.onresize 结合 Vue 实例的生命周期钩子&#xff08;不推荐&#xff0c;存在覆盖风险&#xff09; 虽然可以直接使用原生的 window.onresize 事件来监听窗口大小变化&#xff0c;但这种方式在 Vue 项目中有一些局限性&#xff0c;因为如果在多个…...

uniapp 对接腾讯云IM群组成员管理(增删改查)

UniApp 实战&#xff1a;腾讯云IM群组成员管理&#xff08;增删改查&#xff09; 一、前言 在社交类App开发中&#xff0c;群组成员管理是核心功能之一。本文将基于UniApp框架&#xff0c;结合腾讯云IM SDK&#xff0c;详细讲解如何实现群组成员的增删改查全流程。 权限校验…...

接口测试中缓存处理策略

在接口测试中&#xff0c;缓存处理策略是一个关键环节&#xff0c;直接影响测试结果的准确性和可靠性。合理的缓存处理策略能够确保测试环境的一致性&#xff0c;避免因缓存数据导致的测试偏差。以下是接口测试中常见的缓存处理策略及其详细说明&#xff1a; 一、缓存处理的核…...

Flask RESTful 示例

目录 1. 环境准备2. 安装依赖3. 修改main.py4. 运行应用5. API使用示例获取所有任务获取单个任务创建新任务更新任务删除任务 中文乱码问题&#xff1a; 下面创建一个简单的Flask RESTful API示例。首先&#xff0c;我们需要创建环境&#xff0c;安装必要的依赖&#xff0c;然后…...

高频面试之3Zookeeper

高频面试之3Zookeeper 文章目录 高频面试之3Zookeeper3.1 常用命令3.2 选举机制3.3 Zookeeper符合法则中哪两个&#xff1f;3.4 Zookeeper脑裂3.5 Zookeeper用来干嘛了 3.1 常用命令 ls、get、create、delete、deleteall3.2 选举机制 半数机制&#xff08;过半机制&#xff0…...

postgresql|数据库|只读用户的创建和删除(备忘)

CREATE USER read_only WITH PASSWORD 密码 -- 连接到xxx数据库 \c xxx -- 授予对xxx数据库的只读权限 GRANT CONNECT ON DATABASE xxx TO read_only; GRANT USAGE ON SCHEMA public TO read_only; GRANT SELECT ON ALL TABLES IN SCHEMA public TO read_only; GRANT EXECUTE O…...

C++ 求圆面积的程序(Program to find area of a circle)

给定半径r&#xff0c;求圆的面积。圆的面积应精确到小数点后5位。 例子&#xff1a; 输入&#xff1a;r 5 输出&#xff1a;78.53982 解释&#xff1a;由于面积 PI * r * r 3.14159265358979323846 * 5 * 5 78.53982&#xff0c;因为我们只保留小数点后 5 位数字。 输…...

NLP学习路线图(二十三):长短期记忆网络(LSTM)

在自然语言处理(NLP)领域,我们时刻面临着处理序列数据的核心挑战。无论是理解句子的结构、分析文本的情感,还是实现语言的翻译,都需要模型能够捕捉词语之间依时序产生的复杂依赖关系。传统的神经网络结构在处理这种序列依赖时显得力不从心,而循环神经网络(RNN) 曾被视为…...

Linux 内存管理实战精讲:核心原理与面试常考点全解析

Linux 内存管理实战精讲&#xff1a;核心原理与面试常考点全解析 Linux 内核内存管理是系统设计中最复杂但也最核心的模块之一。它不仅支撑着虚拟内存机制、物理内存分配、进程隔离与资源复用&#xff0c;还直接决定系统运行的性能与稳定性。无论你是嵌入式开发者、内核调试工…...

招商蛇口 | 执笔CID,启幕低密生活新境

作为中国城市生长的力量&#xff0c;招商蛇口以“美好生活承载者”为使命&#xff0c;深耕全球111座城市&#xff0c;以央企担当匠造时代理想人居。从深圳湾的开拓基因到西安高新CID的战略落子&#xff0c;招商蛇口始终与城市发展同频共振&#xff0c;以建筑诠释对土地与生活的…...

uniapp手机号一键登录保姆级教程(包含前端和后端)

目录 前置条件创建uniapp项目并关联uniClound云空间开启一键登录模块并开通一键登录服务编写云函数并上传部署获取手机号流程(第一种) 前端直接调用云函数获取手机号&#xff08;第三种&#xff09;后台调用云函数获取手机号 错误码常见问题 前置条件 手机安装有sim卡手机开启…...