RabbitMQ教程:路由(Routing)(四)
文章目录
- RabbitMQ教程:路由(Routing)(四)
- 一、引言
- 二、基本概念
- 2.1 路由与绑定
- 2.2 Direct交换机
- 2.3 多绑定
- 2.4 发送日志
- 2.5 订阅
- 三、整合代码
- 3.1 EmitLogDirectApp.cs
- 3.2 ReceiveLogsDirectApp.cs
- 3.3 推送所有和接收error、warning级别日志
- 3.4 推送和接收error级别日志
- 四、结论
RabbitMQ教程:路由(Routing)(四)
一、引言
在之前的教程中,我们构建了一个简单的日志系统,该系统能够将日志消息广播给多个接收者。在本教程中,我们将扩展这个系统,增加一个功能:只订阅消息的一个子集。例如,我们可能只想将关键错误消息定向到日志文件(以节省磁盘空间),同时仍然能够在控制台上打印所有日志消息。
二、基本概念
2.1 路由与绑定
在之前的示例中,我们已经创建了绑定。绑定是交换机和队列之间的关系,可以简单地理解为:队列对来自这个交换机的消息感兴趣。
channel.QueueBind(queue: queueName,exchange: "logs",routingKey: string.Empty);
绑定可以带一个额外的routingKey参数,为了避免与BasicPublish参数混淆,我们将其称为binding key。下面是如何创建带有键的绑定:
channel.QueueBind(queue: queueName,exchange: "direct_logs",routingKey: "black");
绑定键的含义取决于交换机类型。我们之前使用的fanout交换机,简单地忽略了它的值。
2.2 Direct交换机
我们的日志系统从上一个教程中广播所有消息给所有消费者。我们希望扩展这一点,允许根据消息的严重性过滤消息。例如,我们可能希望写入磁盘日志消息的脚本只接收关键错误,而不是浪费磁盘空间在警告或信息日志消息上。
我们之前使用的fanout交换机没有给我们提供太多灵活性——它只能进行无脑广播。
我们将改用direct交换机。direct交换机背后的路由算法很简单——消息会被路由到binding key与消息的routing key完全匹配的队列。

2.3 多绑定
将多个队列绑定到同一个绑定键是完全合法的。在我们的示例中,我们可以在X和Q1之间添加一个绑定键为black的绑定。在这种情况下,direct交换机会表现得像fanout一样,将消息广播给所有匹配的队列。带有路由键black的消息将被传递给Q1和Q2。

2.4 发送日志
我们将为我们的日志系统使用这个模型。我们将消息发送到一个direct交换机,并提供日志严重性作为routing key。这样,接收脚本将能够选择它想要接收的严重性。
首先,我们需要创建一个交换机:
channel.ExchangeDeclare(exchange: "direct_logs", type: ExchangeType.Direct);
然后我们可以发送消息:
var body = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(exchange: "direct_logs",routingKey: severity,basicProperties: null,body: body);
为了简化,我们假设’severity’可以是info、warning或error中的一个。
2.5 订阅
接收消息将与上一个教程中的工作方式相同,唯一的区别是我们将为每个我们感兴趣的严重性创建一个新的绑定。
var queueName = channel.QueueDeclare().QueueName;foreach(var severity in args)
{channel.QueueBind(queue: queueName,exchange: "direct_logs",routingKey: severity);
}
三、整合代码
3.1 EmitLogDirectApp.cs
using RabbitMQ.Client;
using System.Text;// 从外部传递循环次数,例如10或20
int loopCount = 10; // 可以根据需要修改循环次数
await SendLogsAsync(loopCount);// 等待用户按下回车键退出程序
Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine();//发送日志消息到RabbitMQ的direct类型的交换机
async Task SendLogsAsync(int loopCount)
{// 创建连接工厂,并设置RabbitMQ服务器地址为localhostvar factory = new ConnectionFactory { HostName = "localhost" };// 使用异步方式创建连接using var connection = await factory.CreateConnectionAsync();// 使用异步方式创建通道using var channel = await connection.CreateChannelAsync();// 声明名为"direct_logs"的direct类型的交换机string ExchangeName = "direct_logs";await channel.ExchangeDeclareAsync(exchange: ExchangeName, type: ExchangeType.Direct);// 定义可能的严重性级别string[] severities = { "info", "warning", "error" };// 创建Random实例用于生成随机数Random random = new Random();// 循环发送指定次数的消息for (int i = 0; i < loopCount; i++){// 随机选择一个严重性级别string severity = severities[random.Next(severities.Length)];// 构建消息内容,包含循环次数string message = $"Iteration {i + 1} - Hello World!";// 将消息内容编码为字节数组var body = Encoding.UTF8.GetBytes(message);// 异步发布消息到交换机,使用严重性级别作为路由键await channel.BasicPublishAsync(exchange: ExchangeName, routingKey: severity, body: body);// 打印消息发送成功的信息Console.WriteLine($" [x] Sent '{severity}':'{message}'");}
}
3.2 ReceiveLogsDirectApp.cs
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System.Text;
using System.Threading.Tasks;// 创建连接工厂,并设置RabbitMQ服务器地址为localhostvar factory = new ConnectionFactory { HostName = "localhost" };// 使用异步方式创建连接using var connection = await factory.CreateConnectionAsync();
// 使用异步方式创建通道
using var channel = await connection.CreateChannelAsync();// 声明名为"direct_logs"的direct类型的交换机
string ExchangeName = "direct_logs";
await channel.ExchangeDeclareAsync(exchange: ExchangeName, type: ExchangeType.Direct);// 声明一个由服务器命名的队列
var queueDeclareResult = await channel.QueueDeclareAsync();
string queueName = queueDeclareResult.QueueName;// 设定感兴趣的日志级别
//string[] severities = { "info","warning", "error" };
//string[] severities = { "warning", "error" };
string[] severities = { "error" };// 为每个感兴趣的日志级别创建绑定
foreach (string severity in severities)
{await channel.QueueBindAsync(queue: queueName, exchange: ExchangeName, routingKey: severity);
}Console.WriteLine(" [*] Waiting for messages."); // 提示信息,表示消费者正在等待消息// 创建一个异步事件驱动的消费者
var consumer = new AsyncEventingBasicConsumer(channel);
consumer.ReceivedAsync += (model, ea) =>
{// 从接收到的消息中提取消息体并转换为字符串var body = ea.Body.ToArray();var message = Encoding.UTF8.GetString(body);var routingKey = ea.RoutingKey; // 获取路由键,即消息的严重性级别Console.WriteLine($" [x] Received '{routingKey}':'{message}'"); // 打印接收到的消息return Task.CompletedTask; // 返回Task.CompletedTask以满足异步事件处理的签名要求
};// 开始消费指定队列的消息
await channel.BasicConsumeAsync(queueName, autoAck: true, consumer: consumer);Console.WriteLine(" Press [enter] to exit."); // 提示信息,等待用户按下回车键退出程序
Console.ReadLine();

3.3 推送所有和接收error、warning级别日志
配置EmitLogDirectApp.cs推送所有日志级别数据。
配置ReceiveLogsDirectApp接收error和warning级别数据。

3.4 推送和接收error级别日志
配置EmitLogDirectApp.cs推送所有日志级别数据。
配置ReceiveLogsDirectApp接收error级别数据。

四、结论
在本教程中,我们深入探讨了RabbitMQ路由模式的概念和实现。通过构建一个可以根据消息属性(如严重性级别)路由消息的日志系统,我们学习了如何创建direct类型的交换机,以及如何根据路由键将消息发送到特定的队列。以下是我们从本教程中获得的关键要点:
-
路由灵活性:通过使用
direct交换机,我们实现了基于路由键的消息路由,这允许我们灵活地控制消息的流向,而不是简单地广播给所有订阅者。 -
消息过滤:我们可以根据消息的属性(如严重性级别)来过滤消息,确保只有相关的消费者接收到消息,从而节省资源并提高效率。
-
绑定键与路由键:我们学习了如何使用绑定键和路由键来控制消息的路由,使得消息可以根据特定的键值被路由到对应的队列。
-
多绑定:我们了解了如何将多个队列绑定到同一个绑定键,从而实现消息的多播分发。
-
实际应用:通过这个教程,我们掌握了如何在实际应用中实现消息的精细化控制,这对于构建复杂的事件驱动架构和微服务架构至关重要。
通过这些机制,我们能够建立一个既高效又灵活的路由系统,它不仅能够提高系统的响应速度,还能够在面对各种异常情况时保持消息的可靠性和持久性。
相关文章:
RabbitMQ教程:路由(Routing)(四)
文章目录 RabbitMQ教程:路由(Routing)(四)一、引言二、基本概念2.1 路由与绑定2.2 Direct交换机2.3 多绑定2.4 发送日志2.5 订阅 三、整合代码3.1 EmitLogDirectApp.cs3.2 ReceiveLogsDirectApp.cs3.3 推送所有和接收e…...
华为Ensp模拟器配置RIP路由协议
目录 RIP路由详解:另一种视角解读 1. RIP简介:轻松理解基础概念 2. RIP的核心机制:距离向量的魅力 3. RIP的实用与局限 RIP配置实验 实验图 编辑 PC的ip配置 RIP配置步骤 测试 结语:RIP的今天与明天 RIP路由详解&…...
3. langgraph中的react agent使用 (在react agent添加系统提示)
环境准备 确保你已经安装了以下库: langchainlangchain_openailanggraph 你可以使用以下命令进行安装: pip install langchain langchain_openai langgraph代码实现 1. 初始化模型 首先,我们需要初始化智谱AI的聊天模型。 from langch…...
(02)ES6教程——Map、Set、Reflect、Proxy、字符串、数值、对象、数组、函数
目录 前言 一、Map Maps 和 Objects 的区别 Map的迭代 forEach() Map对象的操作 二、Set Set 中的特殊值 三、Reflect 四、Proxy 五、字符串 六、数值 七、对象 八、数组 九、函数 参考文献 前言 一、Map Map 对象保存键值对。任何值(对象或者原始值) 都可以…...
【快速解决】kafka崩了,重启之后,想继续消费,怎么做?
目录 一、怎么寻找我们关心的主题在崩溃之前消费到了哪里? 1、一个问题: 2、查看消费者消费主题__consumer_offsets 3、一个重要前提:消费时要提交offset 二、指定 Offset 消费 假如遇到kafka崩了,你重启kafka之后࿰…...
C++ 的发展
目录 C 的发展总结:编辑 1. C 的早期发展(1979-1985) 2. C 标准化过程(1985-1998) 3. C 标准演化(2003-2011) 4. C11(2011年) 5. C14(2014年…...
RabbitMQ 高级特性——延迟队列
文章目录 前言延迟队列延迟队列的概念TTL 死信队列模拟延迟队列设置队列的 TTL设置消息的 TTL 延迟队列插件安装并且启动插件服务使用插件实现延迟功能 前言 前面我们学习了 TTL 和死信队列,当队列中的消息达到了过期时间之后,那么这个消息就会被死信交…...
EAC(Estimate at Completion)和ETC(Estimate to Complete)
EAC 预计完工成本ETC 预计尚需成本Estimate at CompletionEstimate to Complete完成预估完工时尚需成本估算 EAC ETC ACETC EAC – AC 预测项目总成本,包含了到目前为止实际发生的成本(AC)和预计将发生的成本。如果EAC大于BAC…...
【React】状态管理之Zustand
🌈个人主页: 鑫宝Code 🔥热门专栏: 闲话杂谈| 炫酷HTML | JavaScript基础 💫个人格言: "如无必要,勿增实体" 文章目录 状态管理之Zustand引言1. Zustand 的核心特点1.1 简单直观的 API1.2 无需 Provi…...
Vue3打包自动生成版本JSON文件,添加系统版本检查,实现系统自动更新提示
实现该功能一共有三步。废话不多说,直接上代码!!! 第一步:打包时自动生成版本信息的js文件,versionUpdate.js import fs from fs; import path from path; import { ElMessageBox } from element-plus; i…...
海量数据有限内存系列问题解决方案
1. 排序问题 有限数据充足内存:内存中有十万整数,对所有数据进行排序。 内部排序即可 单节点海量数据有限内存:某台机器有一个文件,文件中包含六十亿整数,一个整数一行,可用内存1G,对所有数据…...
FFmpeg 4.3 音视频-多路H265监控录放C++开发十四,总结编码过程,从摄像头获得数据后,转成AVFrame,然后再次转成AVPacket,
也就是将摄像头采集到的YUV 的数据换成 AVFrame,然后再次转成 AVPacket,那么这AVPakcet数据要怎么办呢?分为三种情况: 一种是将AVPacket存储成h264文件,由于h264编码器在将avframe变成avpacket的时候就是按照h264的格…...
内容占位符:Kinetic Loader HTML+CSS 使用CSS制作三角形原理
内容占位符 前言 随着我们对HTML和CSS3的学习逐渐深入,相信大家都已经掌握了网页制作的基础知识,包括如何使用HTML标记构建网页结构,以及如何运用CSS样式美化页面。为了进一步巩固和熟练这些技能,今天我们一起来完成一个有趣且实…...
麒麟nginx配置
一、配置负载均衡 配置麒麟的yum源 vim /etc/yum.repos.d/kylin_aarch64.repo Copy 删除原来内容,写入如下yum源 [ks10-adv-os] name Kylin Linux Advanced Server 10 - Os baseurl http://update.cs2c.com.cn:8080/NS/V10/V10SP2/os/adv/lic/base/aarch64/ …...
如何在 Ubuntu 上安装 Emby 媒体服务器
Emby 是一个开源的媒体服务器解决方案,它能让你整理、流媒体播放和分享你的个人媒体收藏,包括电影、音乐、电视节目和照片。Emby 帮你集中多媒体内容,让你无论在家还是在外都能轻松访问。它还支持转码,让你能够播放各种格式的内容…...
Mac上详细配置java开发环境和软件(更新中)
文章目录 概要JDK的配置JDK下载安装配置JDK环境变量文件 Idea的安装Mysql安装和配置Navicat Premium16.1安装安装Vscode安装和配置Maven配置本地仓库配置阿里云私服Idea集成Maven 概要 这里使用的是M3型片 14.6版本的Mac 用到的资源放在网盘 链接: https://pan.baidu.com/s/17…...
jmeter常用配置元件介绍总结之定时器
系列文章目录 安装jmeter jmeter常用配置元件介绍总结之定时器 5.定时器5.1.固定定时器5.2.统一随机定时器5.3.Precise Throughput Timer5.4.Constant Throughput Timer5.5.Synchronizing Timer5.6.泊松随机定时器5.7.高斯随机定时器 5.定时器 5.1.固定定时器 固定定时器Cons…...
Spring——提前编译
提前编译:AOT AOT概述 JIT与AOT的区别 JIT和AOT 这个名词是指两种不同的编译方式,这两种编译方式的主要区别在于是否在“运行时”进行编译 (1)JIT, Just-in-time,动态(即时)编译,边运行边编译࿱…...
乐理的学习(音程)
二度,三度,六度,七度的大n度都是直接的音名到音名,如#A到#G的,这样为大n度 而这个基础上向内收,收半音为小n度,在小n度再收,为减n度 在大n度的基础上再向外扩半音,为增…...
【网络】数据链路层协议——以太网,ARP协议
> 作者:დ旧言~ > 座右铭:松树千年终是朽,槿花一日自为荣。 > 目标:了解什么是以太网协议和ARP协议。 > 毒鸡汤:有些事情,总是不明白,所以我不会坚持。早安! > 专栏选自…...
G-Helper:让华硕笔记本性能释放的轻量级硬件控制工具
G-Helper:让华硕笔记本性能释放的轻量级硬件控制工具 【免费下载链接】g-helper Lightweight Armoury Crate alternative for Asus laptops. Control tool for ROG Zephyrus G14, G15, G16, M16, Flow X13, Flow X16, TUF, Strix, Scar and other models 项目地址…...
Display Driver Uninstaller完全指南:解决显卡驱动残留的系统级清理方案
Display Driver Uninstaller完全指南:解决显卡驱动残留的系统级清理方案 【免费下载链接】display-drivers-uninstaller Display Driver Uninstaller (DDU) a driver removal utility / cleaner utility 项目地址: https://gitcode.com/gh_mirrors/di/display-dri…...
别再让地图‘飘’了!深入浅出解析Cesium中GCJ-02、BD-09坐标偏移原理与DVGIS库实战
解密国内地图坐标系:从原理到实战解决Cesium中的“飘移”问题 你是否曾在Cesium中加载不同来源的地图数据时,发现明明标注的是同一个位置,却出现了明显的偏移?这种“飘移”现象背后,隐藏着国内地图坐标系复杂的加密体系…...
工业数据采集避坑指南:Java+Utgard实现OPC DA高可靠通信的3个关键技巧
工业数据采集避坑指南:JavaUtgard实现OPC DA高可靠通信的3个关键技巧 在工业自动化领域,OPC DA(OLE for Process Control Data Access)协议作为连接工业设备和信息系统的桥梁,其稳定性直接关系到生产数据的完整性和实时…...
C++ STL 容器线程安全的边界条件
C STL容器线程安全的边界条件探析 在多线程编程中,C标准模板库(STL)容器的高效使用一直是开发者关注的焦点。尽管STL容器在设计上并未原生支持线程安全,但其性能优势使得开发者仍需在并发环境中谨慎使用。理解STL容器线程安全的边…...
Rustup工具链管理深度解析:多版本Rust环境实战指南
Rustup工具链管理深度解析:多版本Rust环境实战指南 【免费下载链接】rustup The Rust toolchain installer 项目地址: https://gitcode.com/gh_mirrors/ru/rustup Rustup作为Rust语言的官方工具链管理器,为开发者提供了稳定、测试版和夜间版多版本…...
cv_unet_image-colorization效果展示:看AI如何为历史照片智能上色
cv_unet_image-colorization效果展示:看AI如何为历史照片智能上色 1. 引言:让历史重现色彩的魅力 黑白照片承载着珍贵的记忆,但缺乏色彩总让人感觉少了些什么。想象一下,如果能将祖辈的老照片恢复成彩色,看到他们当年…...
MCU内存管理实战:用__attribute__控制变量在Flash/RAM中的存放位置
MCU内存管理实战:用__attribute__控制变量在Flash/RAM中的存放位置 引言:嵌入式开发中的内存困局 在Cortex-M系列MCU开发中,我们常常面临这样的矛盾:一方面,片上Flash和RAM资源极其有限(尤其是成本敏感型产…...
Element UI表格进阶:手把手教你自定义el-table展开按钮样式与排序功能
Element UI表格深度定制:从展开按钮到排序逻辑的全方位改造指南 在企业级前端开发中,数据表格的交互体验直接影响用户操作效率。Element UI的el-table组件虽然提供了开箱即用的功能,但面对复杂业务场景时,默认配置往往难以满足个性…...
多核系统RingBuff通信机制与实现原理
多核系统RingBuff通信机制深度解析1. 核间通信基础架构1.1 共享内存通信原理在多核处理器系统中,主核与从核之间的通信通常采用共享内存机制。这种设计通过以下核心组件实现:共享内存区域:预先分配的可被多个核访问的物理内存空间核间中断&am…...
