RabbitMQ教程:路由(Routing)(四)
文章目录
- RabbitMQ教程:路由(Routing)(四)
- 一、引言
- 二、基本概念
- 2.1 路由与绑定
- 2.2 Direct交换机
- 2.3 多绑定
- 2.4 发送日志
- 2.5 订阅
- 三、整合代码
- 3.1 EmitLogDirectApp.cs
- 3.2 ReceiveLogsDirectApp.cs
- 3.3 推送所有和接收error、warning级别日志
- 3.4 推送和接收error级别日志
- 四、结论
RabbitMQ教程:路由(Routing)(四)
一、引言
在之前的教程中,我们构建了一个简单的日志系统,该系统能够将日志消息广播给多个接收者。在本教程中,我们将扩展这个系统,增加一个功能:只订阅消息的一个子集。例如,我们可能只想将关键错误消息定向到日志文件(以节省磁盘空间),同时仍然能够在控制台上打印所有日志消息。
二、基本概念
2.1 路由与绑定
在之前的示例中,我们已经创建了绑定。绑定是交换机和队列之间的关系,可以简单地理解为:队列对来自这个交换机的消息感兴趣。
channel.QueueBind(queue: queueName,exchange: "logs",routingKey: string.Empty);
绑定可以带一个额外的routingKey
参数,为了避免与BasicPublish
参数混淆,我们将其称为binding key
。下面是如何创建带有键的绑定:
channel.QueueBind(queue: queueName,exchange: "direct_logs",routingKey: "black");
绑定键的含义取决于交换机类型。我们之前使用的fanout
交换机,简单地忽略了它的值。
2.2 Direct交换机
我们的日志系统从上一个教程中广播所有消息给所有消费者。我们希望扩展这一点,允许根据消息的严重性过滤消息。例如,我们可能希望写入磁盘日志消息的脚本只接收关键错误,而不是浪费磁盘空间在警告或信息日志消息上。
我们之前使用的fanout
交换机没有给我们提供太多灵活性——它只能进行无脑广播。
我们将改用direct
交换机。direct
交换机背后的路由算法很简单——消息会被路由到binding key
与消息的routing key
完全匹配的队列。
2.3 多绑定
将多个队列绑定到同一个绑定键是完全合法的。在我们的示例中,我们可以在X
和Q1
之间添加一个绑定键为black
的绑定。在这种情况下,direct
交换机会表现得像fanout
一样,将消息广播给所有匹配的队列。带有路由键black
的消息将被传递给Q1
和Q2
。
2.4 发送日志
我们将为我们的日志系统使用这个模型。我们将消息发送到一个direct
交换机,并提供日志严重性作为routing key
。这样,接收脚本将能够选择它想要接收的严重性。
首先,我们需要创建一个交换机:
channel.ExchangeDeclare(exchange: "direct_logs", type: ExchangeType.Direct);
然后我们可以发送消息:
var body = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(exchange: "direct_logs",routingKey: severity,basicProperties: null,body: body);
为了简化,我们假设’severity’可以是info
、warning
或error
中的一个。
2.5 订阅
接收消息将与上一个教程中的工作方式相同,唯一的区别是我们将为每个我们感兴趣的严重性创建一个新的绑定。
var queueName = channel.QueueDeclare().QueueName;foreach(var severity in args)
{channel.QueueBind(queue: queueName,exchange: "direct_logs",routingKey: severity);
}
三、整合代码
3.1 EmitLogDirectApp.cs
using RabbitMQ.Client;
using System.Text;// 从外部传递循环次数,例如10或20
int loopCount = 10; // 可以根据需要修改循环次数
await SendLogsAsync(loopCount);// 等待用户按下回车键退出程序
Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine();//发送日志消息到RabbitMQ的direct类型的交换机
async Task SendLogsAsync(int loopCount)
{// 创建连接工厂,并设置RabbitMQ服务器地址为localhostvar factory = new ConnectionFactory { HostName = "localhost" };// 使用异步方式创建连接using var connection = await factory.CreateConnectionAsync();// 使用异步方式创建通道using var channel = await connection.CreateChannelAsync();// 声明名为"direct_logs"的direct类型的交换机string ExchangeName = "direct_logs";await channel.ExchangeDeclareAsync(exchange: ExchangeName, type: ExchangeType.Direct);// 定义可能的严重性级别string[] severities = { "info", "warning", "error" };// 创建Random实例用于生成随机数Random random = new Random();// 循环发送指定次数的消息for (int i = 0; i < loopCount; i++){// 随机选择一个严重性级别string severity = severities[random.Next(severities.Length)];// 构建消息内容,包含循环次数string message = $"Iteration {i + 1} - Hello World!";// 将消息内容编码为字节数组var body = Encoding.UTF8.GetBytes(message);// 异步发布消息到交换机,使用严重性级别作为路由键await channel.BasicPublishAsync(exchange: ExchangeName, routingKey: severity, body: body);// 打印消息发送成功的信息Console.WriteLine($" [x] Sent '{severity}':'{message}'");}
}
3.2 ReceiveLogsDirectApp.cs
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System.Text;
using System.Threading.Tasks;// 创建连接工厂,并设置RabbitMQ服务器地址为localhostvar factory = new ConnectionFactory { HostName = "localhost" };// 使用异步方式创建连接using var connection = await factory.CreateConnectionAsync();
// 使用异步方式创建通道
using var channel = await connection.CreateChannelAsync();// 声明名为"direct_logs"的direct类型的交换机
string ExchangeName = "direct_logs";
await channel.ExchangeDeclareAsync(exchange: ExchangeName, type: ExchangeType.Direct);// 声明一个由服务器命名的队列
var queueDeclareResult = await channel.QueueDeclareAsync();
string queueName = queueDeclareResult.QueueName;// 设定感兴趣的日志级别
//string[] severities = { "info","warning", "error" };
//string[] severities = { "warning", "error" };
string[] severities = { "error" };// 为每个感兴趣的日志级别创建绑定
foreach (string severity in severities)
{await channel.QueueBindAsync(queue: queueName, exchange: ExchangeName, routingKey: severity);
}Console.WriteLine(" [*] Waiting for messages."); // 提示信息,表示消费者正在等待消息// 创建一个异步事件驱动的消费者
var consumer = new AsyncEventingBasicConsumer(channel);
consumer.ReceivedAsync += (model, ea) =>
{// 从接收到的消息中提取消息体并转换为字符串var body = ea.Body.ToArray();var message = Encoding.UTF8.GetString(body);var routingKey = ea.RoutingKey; // 获取路由键,即消息的严重性级别Console.WriteLine($" [x] Received '{routingKey}':'{message}'"); // 打印接收到的消息return Task.CompletedTask; // 返回Task.CompletedTask以满足异步事件处理的签名要求
};// 开始消费指定队列的消息
await channel.BasicConsumeAsync(queueName, autoAck: true, consumer: consumer);Console.WriteLine(" Press [enter] to exit."); // 提示信息,等待用户按下回车键退出程序
Console.ReadLine();
3.3 推送所有和接收error、warning级别日志
配置EmitLogDirectApp.cs
推送所有日志级别数据。
配置ReceiveLogsDirectApp
接收error和warning级别数据。
3.4 推送和接收error级别日志
配置EmitLogDirectApp.cs
推送所有日志级别数据。
配置ReceiveLogsDirectApp
接收error级别数据。
四、结论
在本教程中,我们深入探讨了RabbitMQ路由模式的概念和实现。通过构建一个可以根据消息属性(如严重性级别)路由消息的日志系统,我们学习了如何创建direct
类型的交换机,以及如何根据路由键将消息发送到特定的队列。以下是我们从本教程中获得的关键要点:
-
路由灵活性:通过使用
direct
交换机,我们实现了基于路由键的消息路由,这允许我们灵活地控制消息的流向,而不是简单地广播给所有订阅者。 -
消息过滤:我们可以根据消息的属性(如严重性级别)来过滤消息,确保只有相关的消费者接收到消息,从而节省资源并提高效率。
-
绑定键与路由键:我们学习了如何使用绑定键和路由键来控制消息的路由,使得消息可以根据特定的键值被路由到对应的队列。
-
多绑定:我们了解了如何将多个队列绑定到同一个绑定键,从而实现消息的多播分发。
-
实际应用:通过这个教程,我们掌握了如何在实际应用中实现消息的精细化控制,这对于构建复杂的事件驱动架构和微服务架构至关重要。
通过这些机制,我们能够建立一个既高效又灵活的路由系统,它不仅能够提高系统的响应速度,还能够在面对各种异常情况时保持消息的可靠性和持久性。
相关文章:

RabbitMQ教程:路由(Routing)(四)
文章目录 RabbitMQ教程:路由(Routing)(四)一、引言二、基本概念2.1 路由与绑定2.2 Direct交换机2.3 多绑定2.4 发送日志2.5 订阅 三、整合代码3.1 EmitLogDirectApp.cs3.2 ReceiveLogsDirectApp.cs3.3 推送所有和接收e…...

华为Ensp模拟器配置RIP路由协议
目录 RIP路由详解:另一种视角解读 1. RIP简介:轻松理解基础概念 2. RIP的核心机制:距离向量的魅力 3. RIP的实用与局限 RIP配置实验 实验图 编辑 PC的ip配置 RIP配置步骤 测试 结语:RIP的今天与明天 RIP路由详解&…...

3. langgraph中的react agent使用 (在react agent添加系统提示)
环境准备 确保你已经安装了以下库: langchainlangchain_openailanggraph 你可以使用以下命令进行安装: pip install langchain langchain_openai langgraph代码实现 1. 初始化模型 首先,我们需要初始化智谱AI的聊天模型。 from langch…...

(02)ES6教程——Map、Set、Reflect、Proxy、字符串、数值、对象、数组、函数
目录 前言 一、Map Maps 和 Objects 的区别 Map的迭代 forEach() Map对象的操作 二、Set Set 中的特殊值 三、Reflect 四、Proxy 五、字符串 六、数值 七、对象 八、数组 九、函数 参考文献 前言 一、Map Map 对象保存键值对。任何值(对象或者原始值) 都可以…...

【快速解决】kafka崩了,重启之后,想继续消费,怎么做?
目录 一、怎么寻找我们关心的主题在崩溃之前消费到了哪里? 1、一个问题: 2、查看消费者消费主题__consumer_offsets 3、一个重要前提:消费时要提交offset 二、指定 Offset 消费 假如遇到kafka崩了,你重启kafka之后࿰…...

C++ 的发展
目录 C 的发展总结:编辑 1. C 的早期发展(1979-1985) 2. C 标准化过程(1985-1998) 3. C 标准演化(2003-2011) 4. C11(2011年) 5. C14(2014年…...

RabbitMQ 高级特性——延迟队列
文章目录 前言延迟队列延迟队列的概念TTL 死信队列模拟延迟队列设置队列的 TTL设置消息的 TTL 延迟队列插件安装并且启动插件服务使用插件实现延迟功能 前言 前面我们学习了 TTL 和死信队列,当队列中的消息达到了过期时间之后,那么这个消息就会被死信交…...

EAC(Estimate at Completion)和ETC(Estimate to Complete)
EAC 预计完工成本ETC 预计尚需成本Estimate at CompletionEstimate to Complete完成预估完工时尚需成本估算 EAC ETC ACETC EAC – AC 预测项目总成本,包含了到目前为止实际发生的成本(AC)和预计将发生的成本。如果EAC大于BAC…...

【React】状态管理之Zustand
🌈个人主页: 鑫宝Code 🔥热门专栏: 闲话杂谈| 炫酷HTML | JavaScript基础 💫个人格言: "如无必要,勿增实体" 文章目录 状态管理之Zustand引言1. Zustand 的核心特点1.1 简单直观的 API1.2 无需 Provi…...

Vue3打包自动生成版本JSON文件,添加系统版本检查,实现系统自动更新提示
实现该功能一共有三步。废话不多说,直接上代码!!! 第一步:打包时自动生成版本信息的js文件,versionUpdate.js import fs from fs; import path from path; import { ElMessageBox } from element-plus; i…...

海量数据有限内存系列问题解决方案
1. 排序问题 有限数据充足内存:内存中有十万整数,对所有数据进行排序。 内部排序即可 单节点海量数据有限内存:某台机器有一个文件,文件中包含六十亿整数,一个整数一行,可用内存1G,对所有数据…...

FFmpeg 4.3 音视频-多路H265监控录放C++开发十四,总结编码过程,从摄像头获得数据后,转成AVFrame,然后再次转成AVPacket,
也就是将摄像头采集到的YUV 的数据换成 AVFrame,然后再次转成 AVPacket,那么这AVPakcet数据要怎么办呢?分为三种情况: 一种是将AVPacket存储成h264文件,由于h264编码器在将avframe变成avpacket的时候就是按照h264的格…...

内容占位符:Kinetic Loader HTML+CSS 使用CSS制作三角形原理
内容占位符 前言 随着我们对HTML和CSS3的学习逐渐深入,相信大家都已经掌握了网页制作的基础知识,包括如何使用HTML标记构建网页结构,以及如何运用CSS样式美化页面。为了进一步巩固和熟练这些技能,今天我们一起来完成一个有趣且实…...

麒麟nginx配置
一、配置负载均衡 配置麒麟的yum源 vim /etc/yum.repos.d/kylin_aarch64.repo Copy 删除原来内容,写入如下yum源 [ks10-adv-os] name Kylin Linux Advanced Server 10 - Os baseurl http://update.cs2c.com.cn:8080/NS/V10/V10SP2/os/adv/lic/base/aarch64/ …...

如何在 Ubuntu 上安装 Emby 媒体服务器
Emby 是一个开源的媒体服务器解决方案,它能让你整理、流媒体播放和分享你的个人媒体收藏,包括电影、音乐、电视节目和照片。Emby 帮你集中多媒体内容,让你无论在家还是在外都能轻松访问。它还支持转码,让你能够播放各种格式的内容…...

Mac上详细配置java开发环境和软件(更新中)
文章目录 概要JDK的配置JDK下载安装配置JDK环境变量文件 Idea的安装Mysql安装和配置Navicat Premium16.1安装安装Vscode安装和配置Maven配置本地仓库配置阿里云私服Idea集成Maven 概要 这里使用的是M3型片 14.6版本的Mac 用到的资源放在网盘 链接: https://pan.baidu.com/s/17…...

jmeter常用配置元件介绍总结之定时器
系列文章目录 安装jmeter jmeter常用配置元件介绍总结之定时器 5.定时器5.1.固定定时器5.2.统一随机定时器5.3.Precise Throughput Timer5.4.Constant Throughput Timer5.5.Synchronizing Timer5.6.泊松随机定时器5.7.高斯随机定时器 5.定时器 5.1.固定定时器 固定定时器Cons…...

Spring——提前编译
提前编译:AOT AOT概述 JIT与AOT的区别 JIT和AOT 这个名词是指两种不同的编译方式,这两种编译方式的主要区别在于是否在“运行时”进行编译 (1)JIT, Just-in-time,动态(即时)编译,边运行边编译࿱…...

乐理的学习(音程)
二度,三度,六度,七度的大n度都是直接的音名到音名,如#A到#G的,这样为大n度 而这个基础上向内收,收半音为小n度,在小n度再收,为减n度 在大n度的基础上再向外扩半音,为增…...

【网络】数据链路层协议——以太网,ARP协议
> 作者:დ旧言~ > 座右铭:松树千年终是朽,槿花一日自为荣。 > 目标:了解什么是以太网协议和ARP协议。 > 毒鸡汤:有些事情,总是不明白,所以我不会坚持。早安! > 专栏选自…...

Linux分区、挂载、配额、逻辑卷、RAID、系统综合状态查看
分区与挂载 fdisk fdisk 命令是一个用于磁盘分区管理的命令行工具,可以用来创建、删除、调整分区等操作。常用的 fdisk 命令选项包括: fdisk -l:列出系统中的所有磁盘分区信息。 fdisk /dev/sdX:打开指定磁盘进行分区操作。 n&…...

3D Gaussian Splatting 代码层理解之Part1
2023 年初,来自蔚蓝海岸大学和 马克斯普朗克学会的作者发表了一篇题为“用于实时现场渲染的 3D 高斯泼溅”的论文。该论文提出了实时神经渲染的重大进步,超越了NeRF等以前方法的实用性。高斯泼溅不仅减少了延迟,而且达到或超过了 NeRF 的渲染质量,在神经渲染领域掀起了一场…...

Qt小知识-Q_GLOBAL_STATIC
你还在为创建全局静态对象烦恼嘛,它来了!它来了! qt5提供了两个宏定义Q_GLOBAL_STATIC和Q_GLOBAL_STATIC_WITH_ARGS来实现。可以创建一个全局静态对象,对象在第一次使用时初始化自身,这意味着它不会增加应用程序或库的…...

【SpringBoot】使用过滤器进行XSS防御
在Spring Boot中,我们可以使用注解的方式来进行XSS防御。注解是一种轻量级的防御手段,它可以在方法或字段级别对输入进行校验,从而防止XSS攻击。 而想对全局的请求都进行XSS防御可以使用servlet中的过滤器或者spring mvc中的拦截器ÿ…...

创建vue插件,发布npm
开发步骤:1.创建一个vue项目,2.开发一个组件。 3.注册成插件。 4.vite和package.json配置。5.发布到npm 1.创建一个vue项目 npm create vuelatest 生成了vue项目之后,得到了以下结构。 在src下创建个plugins目录。用于存放开发的…...

【Android Compose原创组件】可拖动滚动条的完美实现
项目背景 我在使用安卓Compose开发自己的【JK管理器】的过程中,很多地方都需要使用滚动条,在Github上也有实现的比较好,但是大多都是基于View(我要的是Compose啊)。 在研究Android 官方示例项目 nowinandroid 中&…...

【模块一】kubernetes容器编排进阶实战之资源管理核心概念
kubernetes 资源管理核心概念 k8s的设计理念—分层架构 CRI-container runtime interface-容器运行接口 CNI-container network interface-容器网络接口 CSI-container storage interface-容器存储接口 k8s的设计理念—API设计原则 https://www.kubernetes.org.cn/kubernete…...

用Python设置PowerPoint幻灯片背景
使用Python自动化处理Office文档,如PowerPoint演示文稿,是提高效率和创造力的重要手段。设置PowerPoint幻灯片背景不仅能够增强演示文稿的视觉吸引力,还能帮助传达特定的情感或信息,使观众更加投入。通过编程方式批量修改幻灯片背…...

Restful API接⼝简介及为什么要进⾏接⼝压测
一、RESTful API简介 在现代Web开发中,RESTful API已经成为一种标准的设计模式,用于构建和交互网络应用程序。本文将详细介绍RESTful API的基本概念、特点以及如何使用它来设计高效的API接口。 1. 基于协议 HTTP 或 HTTPS RESTful API通常使用HTTP&am…...

[pyspark] pyspark中如何修改列名字
使用 .withColumnRenamed 来重命名,直接看demo: from pyspark.sql import SparkSessionspark SparkSession.builder.appName("example").getOrCreate()data [("Alice", 1, 200),("Bob", 2, 300),("Charlie",…...