当前位置: 首页 > news >正文

【RabbitMQ】golang客户端教程4——路由(使用direct交换器)

路由

在上一教程中,我们构建了一个简单的日志记录系统。我们能够向许多接收者广播日志消息。

在本教程中,我们将向它添加一个特性-我们将使它能够只订阅消息的一个子集。例如,我们将只能将关键错误消息定向到日志文件(以节省磁盘空间),同时仍然能够在控制台上打印所有日志消息。

绑定

在前面的示例中,我们已经在创建绑定。你可能会想起以下代码:

err = ch.QueueBind(q.Name, // queue name"",     // routing key"logs", // exchangefalse,nil)

绑定是交换器和队列之间的关系。这可以简单地理解为:队列对来自此交换器的消息感兴趣。

绑定可以采用额外的routing_key参数。为了避免与Channel.Publish参数混淆,我们将其称为binding key。这是我们如何使用键创建绑定的方法:

err = ch.QueueBind(q.Name,    // queue name"black",   // routing key"logs",    // exchangefalse,nil)

绑定密钥的含义取决于交换器的类型。我们以前使用的fanout交换器只是忽略了这个值。

直连交换器

我们上一个教程中的日志系统将所有消息广播给所有消费者。我们希望扩展这一点,允许根据消息的严重性过滤消息。例如,我们可能希望将日志消息写入磁盘的脚本只接收严重错误,而不会在warning或info日志消息上浪费磁盘空间。

我们使用fanout交换器,这并没有给我们很大的灵活性——它只能进行无脑广播。

我们将使用direct交换器。direct交换器背后的路由算法很简单——消息进入其binding key与消息的routing key完全匹配的队列。

为了说明这一点,请考虑以下设置:
在这里插入图片描述
在此设置中,我们可以看到绑定了两个队列的direct交换器X。第一个队列绑定键为orange,第二个队列绑定为两个,一个绑定键为black,另一个为green

在这种设置中,使用orange路由键发布到交换器的消息将被路由到队列Q1。路由键为blackgreen的消息将转到Q2。所有其他消息将被丢弃。

多重绑定

在这里插入图片描述
用相同的绑定键绑定多个队列是完全合法的。在我们的示例中,我们可以使用绑定键blackXQ1之间添加绑定。在这种情况下,direct交换器的行为将类似fanout,并将消息广播到所有匹配的队列。带有black路由键的消息将同时传递给Q1Q2

发送日志

我们将在日志系统中使用这个模型。我们将发送消息到direct交换器,而不是fanout。我们将提供严重性(译注:通常我们使用日志级别划分日志信息的严重性)作为路由键。这样,接收脚本将能够选择其想要接收的日志级别。让我们首先关注发送日志。

与往常一样,我们需要首先创建一个交换器:

err = ch.ExchangeDeclare("logs_direct", // name"direct",      // typetrue,          // durablefalse,         // auto-deletedfalse,         // internalfalse,         // no-waitnil,           // arguments
)

我们已经准备好发送一条消息:

err = ch.ExchangeDeclare("logs_direct", // name"direct",      // typetrue,          // durablefalse,         // auto-deletedfalse,         // internalfalse,         // no-waitnil,           // arguments
)
failOnError(err, "Failed to declare an exchange")body := bodyFrom(os.Args)
err = ch.Publish("logs_direct",         // exchangeseverityFrom(os.Args), // routing keyfalse, // mandatoryfalse, // immediateamqp.Publishing{ContentType: "text/plain",Body:        []byte(body),
})

为了简化问题,我们假设“严重性”可以是“info”“warning”“error”之一。

订阅

接收消息的工作方式与上一教程一样,但有一个例外——我们将为感兴趣的每种严重性(日志级别)创建一个新的绑定。

q, err := ch.QueueDeclare("",    // namefalse, // durablefalse, // delete when unusedtrue,  // exclusivefalse, // no-waitnil,   // arguments
)
failOnError(err, "Failed to declare a queue")if len(os.Args) < 2 {log.Printf("Usage: %s [info] [warning] [error]", os.Args[0])os.Exit(0)
}
// 建立多个绑定关系
for _, s := range os.Args[1:] {log.Printf("Binding queue %s to exchange %s with routing key %s",q.Name, "logs_direct", s)err = ch.QueueBind(q.Name,        // queue names,             // routing key"logs_direct", // exchangefalse,nil)failOnError(err, "Failed to bind a queue")
}

完整示例

在这里插入图片描述
emit_log_direct.go脚本的代码:

package mainimport ("log""os""strings""github.com/streadway/amqp"
)func failOnError(err error, msg string) {if err != nil {log.Fatalf("%s: %s", msg, err)}
}func main() {conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")failOnError(err, "Failed to connect to RabbitMQ")defer conn.Close()ch, err := conn.Channel()failOnError(err, "Failed to open a channel")defer ch.Close()err = ch.ExchangeDeclare("logs_direct", // name"direct",      // typetrue,          // durablefalse,         // auto-deletedfalse,         // internalfalse,         // no-waitnil,           // arguments)failOnError(err, "Failed to declare an exchange")body := bodyFrom(os.Args)err = ch.Publish("logs_direct",         // exchangeseverityFrom(os.Args), // routing keyfalse, // mandatoryfalse, // immediateamqp.Publishing{ContentType: "text/plain",Body:        []byte(body),})failOnError(err, "Failed to publish a message")log.Printf(" [x] Sent %s", body)
}func bodyFrom(args []string) string {var s stringif (len(args) < 3) || os.Args[2] == "" {s = "hello"} else {s = strings.Join(args[2:], " ")}return s
}func severityFrom(args []string) string {var s stringif (len(args) < 2) || os.Args[1] == "" {s = "info"} else {s = os.Args[1]}return s
}

receive_logs_direct.go的代码:

package mainimport ("log""os""github.com/streadway/amqp"
)func failOnError(err error, msg string) {if err != nil {log.Fatalf("%s: %s", msg, err)}
}func main() {conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")failOnError(err, "Failed to connect to RabbitMQ")defer conn.Close()ch, err := conn.Channel()failOnError(err, "Failed to open a channel")defer ch.Close()err = ch.ExchangeDeclare("logs_direct", // name"direct",      // typetrue,          // durablefalse,         // auto-deletedfalse,         // internalfalse,         // no-waitnil,           // arguments)failOnError(err, "Failed to declare an exchange")q, err := ch.QueueDeclare("",    // namefalse, // durablefalse, // delete when unusedtrue,  // exclusivefalse, // no-waitnil,   // arguments)failOnError(err, "Failed to declare a queue")if len(os.Args) < 2 {log.Printf("Usage: %s [info] [warning] [error]", os.Args[0])os.Exit(0)}for _, s := range os.Args[1:] {log.Printf("Binding queue %s to exchange %s with routing key %s",q.Name, "logs_direct", s)err = ch.QueueBind(q.Name,        // queue names,             // routing key"logs_direct", // exchangefalse,nil)failOnError(err, "Failed to bind a queue")}msgs, err := ch.Consume(q.Name, // queue"",     // consumertrue,   // auto ackfalse,  // exclusivefalse,  // no localfalse,  // no waitnil,    // args)failOnError(err, "Failed to register a consumer")forever := make(chan bool)go func() {for d := range msgs {log.Printf(" [x] %s", d.Body)}}()log.Printf(" [*] Waiting for logs. To exit press CTRL+C")<-forever
}

如果你只想将“warning”和“err”(而不是“info”)级别的日志消息保存到文件中,只需打开控制台并输入:

go run receive_logs_direct.go warning error > logs_from_rabbit.log

如果你想在屏幕上查看所有日志消息,请打开一个新终端并执行以下操作:

go run receive_logs_direct.go info warning error
# => [*] Waiting for logs. To exit press CTRL+C

例如,要发出error日志消息,只需输入:

go run emit_log_direct.go error "Run. Run. Or it will explode."
# => [x] Sent 'error':'Run. Run. Or it will explode.'

源自:https://www.rabbitmq.com/getstarted.html

相关文章:

【RabbitMQ】golang客户端教程4——路由(使用direct交换器)

路由 在上一教程中&#xff0c;我们构建了一个简单的日志记录系统。我们能够向许多接收者广播日志消息。 在本教程中&#xff0c;我们将向它添加一个特性-我们将使它能够只订阅消息的一个子集。例如&#xff0c;我们将只能将关键错误消息定向到日志文件&#xff08;以节省磁盘…...

Shell脚本学习-for循环结构2

案例&#xff1a;通过脚本实现仅sshd、rsyslog、crond、network、sysstat服务在开机时自启动。 Linux系统在开机的服务通常工作在文本模式3级别&#xff0c;因此只需要查找3级别以上的开启的服务即可。查看命令&#xff1a; chkconfig --list |grep 3:on [rootvm1 ~]# chkco…...

vue 老项目 npm install 报错Python,c++等相关错误

​​​ 老项目npm install 下载依赖包报错 解决方法&#xff1a; //下载python 1、 npm install --global --production windows-build-tools//配置环境 &#xff1a; 也可暂时不用配置,能用就不用配置&#xff08;npm config set python "D:\Python27\python.exe&q…...

【c语言初级】c++基础

文章目录 1. C关键字2. 命名空间2.1 命名空间定义2.2 命名空间使用 3. C输入&输出4. 缺省参数4.1 缺省参数概念4.2 缺省参数分类 5. 函数重载5.2 C函数重载的原理--名字修饰采用C语言编译器编译后结果 1. C关键字 C是在C的基础之上&#xff0c;容纳进去了面向对象编程思想…...

idea打开传统eclipse项目

打开传统web项目 1.打开后选择项目文件 2.选择项目结构 3.设置jdk版本 4.导入当前项目模块 5.选择eclipse 6. 设置保存目录 7.右键模块&#xff0c;添加spring和web文件 8. 设置web目录之类的&#xff0c;并且创建打包工具 9.如果有本地lib&#xff0c;添加为库 最后点击应用&…...

全国各城市-财政收入-一般预算收入-各项税收-个人所得税(1999-2020年)

个人所得税是一项反映国家财政状况和个人经济水平的重要数据。通过对全国各城市个人所得税数据的研究&#xff0c;可以提供研究者参考的有益信息。首先&#xff0c;个人所得税数据反映了不同城市居民的收入水平。通过对不同城市的个人所得税数据进行比较&#xff0c;可以了解不…...

【动态网页抓取】 :用Python抓取所有内容的指南

一、说明 您在抓取动态网页内容时是否得到了糟糕的结果&#xff1f;不仅仅是你。对于标准抓取工具来说&#xff0c;爬网动态数据是一项具有挑战性的任务&#xff08;至少可以说&#xff09;。这是因为当发出HTTP请求时&#xff0c;响应程序的某些部分JavaScript在后台运行&…...

Spring Boot数据访问基础知识与JDBC简单实现

目录 Spring Boot数据访问基础知识 Spring Data ORM JDBC JPA JDBC简单实现 步骤1&#xff1a;新建Maven项目&#xff0c;添加依赖 步骤2&#xff1a;配置数据源—让程序可以访问到 步骤3&#xff1a;配置数据源—让IDEA可以访问到 步骤4&#xff1a;添加数据库和表 …...

ubuntu添加万能头文件

ubuntu的C头文件目录为/usr/include 在/usr/include下新建文件夹 bits sudo mkdir bits进入bits&#xff0c;新建stdc.h&#xff0c;并修改权限为744/777 cd bits;sudo touch stdc.h;sudo chmod 777 stdc.h将以下内容粘贴到stdc.h&#xff0c;保存退出 // C includes used …...

聊一聊关于前端语法 ?? 的那些事

当我们在编写前端代码时&#xff0c;语法是非常重要的。正确的语法可以确保我们的代码能够正常运行&#xff0c;并且易于维护和理解。在本文中&#xff0c;我们将探讨一些前端语法的问题&#xff0c;例如空值合并运算符&#xff08;??&#xff09;。 空值合并运算符是ES2020…...

宝塔Linux面板升级“获取更新包失败”怎么解决?

宝塔Linux面板执行升级命令后失败&#xff0c;提示“获取更新包失败&#xff0c;请稍后更新或联系宝塔运维”如何解决&#xff1f;新手站长分享宝塔面板升级失败的解决方法&#xff1a; 宝塔面板升级失败解决方法 1、使用root账户登录到你的云服务器上&#xff0c;宝塔Linux面…...

训练强化学习的经验回放策略:experience replay

经验回放&#xff1a;Experience Replay&#xff08;训练DQN的一种策略&#xff09; 优点&#xff1a;可以重复利用离线经验数据&#xff1b;连续的经验具有相关性&#xff0c;经验回放可以在离线经验BUFFER随机抽样&#xff0c;减少相关性&#xff1b; 超参数&#xff1a;Rep…...

uniapp学习

1 简单的表单校验 <!--uniapp:参考模板和字段生成页面 字段stuNumber 输入框 学号stuName 输入框 学生姓名teacher 输入框 辅导员submitDate 日期选择 填报日期morningTemperature 输入框&#xff08;数字校验一位小数&#xff09; 早上体温noonTemperature 输入框&…...

机器学习深度学习——数值稳定性和模型化参数(详细数学推导)

&#x1f468;‍&#x1f393;作者简介&#xff1a;一位即将上大四&#xff0c;正专攻机器学习的保研er &#x1f30c;上期文章&#xff1a;机器学习&&深度学习——Dropout &#x1f4da;订阅专栏&#xff1a;机器学习&&深度学习 希望文章对你们有所帮助 这一部…...

layui 整合UEditor 百度编辑器

layui 整合UEditor 百度编辑器 第一步&#xff1a;下载百度编辑器并配置好路径 百度编辑器下载地址&#xff1a;http://fex.baidu.com/ueditor/ 第二步&#xff1a;引入百度编辑器 代码如下&#xff1a; <div class"layui-form-item layui-form-text"><…...

1、sparkStreaming概述

1、sparkStreaming概述 1.1 SparkStreaming是什么 它是一个可扩展&#xff0c;高吞吐具有容错性的流式计算框架 吞吐量&#xff1a;单位时间内成功传输数据的数量 之前我们接触的spark-core和spark-sql都是处理属于离线批处理任务&#xff0c;数据一般都是在固定位置上&…...

【Spring Boot】Spring Boot 集成 RocketMQ 实现简单的消息发送和消费

文章目录 前言基本概念消息和主题相关发送普通消息 发送顺序消息RocketMQTemplate的API介绍参考资料&#xff1a; 前言 本文主要有以下内容&#xff1a; 简单消息的发送顺序消息的发送RocketMQTemplate的API介绍 环境搭建&#xff1a; RocketMQ的安装教程&#xff1a;在官网…...

uniapp:图片验证码检验问题处理

图形验证码功能实现 uniapp&#xff1a;解决图形验证码问题及利用arraybuffer二进制转base64格式图片&#xff08;后端传的图片数据形式&#xff1a;x00\x10JFIF\x00\x01\x02\x00…&#xff09;_❆VE❆的博客-CSDN博客 UI稿&#xff1a; 需求&#xff1a;向后端请求验证码图片&…...

将Visio和Excel导出成没有白边的PDF文件

1、VISIO如何无白边导出pdf格式 在使用Latex时&#xff0c;要导入矢量图eps格式。但是VISIO无法输出eps格式&#xff0c;这就需要将其导出为pdf。但是导出pdf时&#xff0c;往往会有大量的白边。VISIO无白边导出pdf格式的方法如下&#xff1a; 1.文件——开发工具——显示sha…...

String类及其工具类

一、String类 1.字符串对象 String str new String("hello");String对象是final修饰的&#xff0c;不可修改的&#xff0c;修改后的字符串对象是另外一个对象&#xff0c;只是修改了引用地址。每次创建都会创建一个新的对象。 2. 字面量 String s "hello&…...

终极Python通达信数据读取指南:5分钟快速入门量化分析

终极Python通达信数据读取指南&#xff1a;5分钟快速入门量化分析 【免费下载链接】mootdx 通达信数据读取的一个简便使用封装 项目地址: https://gitcode.com/GitHub_Trending/mo/mootdx 在金融数据分析和量化交易领域&#xff0c;通达信数据读取一直是Python开发者面临…...

基于PyPortal与CircuitPython的桌面空气质量监测站DIY指南

1. 项目概述&#xff1a;打造你的桌面级空气质量监测站如果你和我一样&#xff0c;对身边的空气质量有点“强迫症”&#xff0c;总想知道窗外空气到底怎么样&#xff0c;但又不想总去翻手机App&#xff0c;那么这个项目就是为你量身定做的。我们将利用一块名为PyPortal的开发板…...

轻量级代码生成模型nanocoder:边缘部署与高效微调实战

1. 项目概述&#xff1a;一个为边缘而生的高效代码生成模型最近在折腾一些边缘设备上的AI应用&#xff0c;比如在树莓派或者Jetson Nano上跑一些轻量级的代码补全工具&#xff0c;发现市面上那些动辄几十亿参数的大模型根本塞不进去&#xff0c;跑起来也慢得让人心焦。就在这个…...

新时代的信息茧房

大家有没有发现&#xff1a;信息爆炸 2.0 时代&#xff0c;获取真知为何反而更难了&#xff1f; 人类正身处信息传播最为便捷的时代。移动互联网的普及与信息技术的迭代升级&#xff0c;让知识获取变得前所未有的低廉易得。迈入 AI 时代后&#xff0c;这一发展进程更是被推至全…...

如何免费解锁Cursor AI Pro功能:终极三步激活指南

如何免费解锁Cursor AI Pro功能&#xff1a;终极三步激活指南 【免费下载链接】cursor-free-vip [Support 0.45]&#xff08;Multi Language 多语言&#xff09;自动注册 Cursor Ai &#xff0c;自动重置机器ID &#xff0c; 免费升级使用Pro 功能: Youve reached your trial r…...

ASO技能库构建指南:从基础原理到实战应用

1. 项目概述&#xff1a;ASO技能库的构建与价值最近在和一些做独立开发者和出海应用推广的朋友聊天&#xff0c;大家普遍提到一个痛点&#xff1a;都知道ASO&#xff08;应用商店优化&#xff09;重要&#xff0c;但真到动手优化自家App的时候&#xff0c;却感觉无从下手。市面…...

【NotebookLM学术写作黄金法则】:20年科研老炮亲授5大避坑指南与3步合规提速法

更多请点击&#xff1a; https://intelliparadigm.com 第一章&#xff1a;NotebookLM学术写作规范的底层逻辑与认知革命 NotebookLM 并非传统意义上的文档编辑器&#xff0c;而是一个以“语义锚点”和“引用可追溯性”为基石的学术协作文本引擎。其底层逻辑颠覆了线性写作范式…...

GD32C10x 标准库 EXTI 驱动源码深度解析

前言 在 GD32C10x 单片机开发中,外部中断 EXTI是实现外设异步响应、按键检测、电平触发等功能的核心外设,几乎所有嵌入式项目都会用到 EXTI。 兆易创新提供的 GD32C10x 标准库中,gd32c10x_exti.c是 EXTI 外设的底层驱动文件,封装了 EXTI 初始化、中断使能、标志位操作、软…...

Netscape 浏览器:互联网时代的先驱者

Netscape 浏览器:互联网时代的先驱者 引言 自互联网诞生以来,浏览器作为连接用户与网络世界的重要工具,见证了互联网的飞速发展。在众多浏览器中,Netscape 浏览器以其创新和引领潮流的特性,成为了互联网时代的先驱者。本文将回顾 Netscape 浏览器的发展历程、技术特点及…...

MIMO AONN架构:量子干涉实现超低功耗光学神经网络

1. MIMO AONN架构的核心价值光学神经网络&#xff08;AONN&#xff09;正在突破传统电子计算的物理极限。在传统电子神经网络中&#xff0c;非线性激活函数需要消耗大量能量进行电子-光子转换&#xff0c;而基于量子干涉的光学非线性机制可以直接在光域实现这一关键操作。我们实…...