当前位置: 首页 > news >正文

go消息队列RabbitMQ - 订阅模式-direct

1.发布订阅

在Fanout模式中,一条消息,会被所有订阅的队列都消费。但是,在某些场景下,我们希望不同的消息被不同的队列消费。这时就要用到Direct类型的Exchange。

在Direct模型下:

  • 队列与交换机的绑定,不能是任意绑定了,而是要指定一个RoutingKey(路由key)
  • 消息的发送方在向 Exchange发送消息时,也必须指定消息的 RoutingKey。
  • Exchange不再把消息交给每一个绑定的队列,而是根据消息的Routing Key进行判断,只有队列的Routingkey与消息的 Routing key完全一致,才会接收到消息

2.绑定

绑定可以采用额外的routing_key参数。为了避免与Channel.Publish参数混淆,我们将其称为binding key。这是我们如何使用键创建绑定的方法:

err = ch.QueueBind(q.Name,    // queue name"black",   // routing key"logs",    // exchangefalse,nil)

 3.直连交换器

direct交换器背后的路由算法很简单——消息进入其binding key与消息的routing key完全匹配的队列。

direct-exchang

绑定了两个队列的direct交换器X。第一个队列绑定键为orange,第二个队列绑定为两个,一个绑定键为black,另一个为green

在这种设置中,使用orange路由键发布到交换器的消息将被路由到队列Q1。路由键为blackgreen的消息将转到Q2。所有其他消息将被丢弃。

4.多重绑定

direct-exchange-multiple

用相同的绑定键绑定多个队列是完全合法的。在我们的示例中,我们可以使用绑定键blackXQ1之间添加绑定。在这种情况下,direct交换器的行为将类似fanout,并将消息广播到所有匹配的队列。带有black路由键的消息将同时传递给Q1Q2

5.发送日志

在日志系统中使用这个模型,发送消息到direct交换器。这样,接收脚本将能够选择其想要接收的日志级别。

先创建一个direct交换器:

err = ch.ExchangeDeclare("logs_direct", // name"direct",      // typetrue,          // durablefalse,         // auto-deletedfalse,         // internalfalse,         // no-waitnil,           // arguments
)

指定routing key发送一条消息:

body := bodyFrom(os.Args)
err = ch.Publish("logs_direct",         // exchangeseverityFrom(os.Args), // routing keyfalse, // mandatoryfalse, // immediateamqp.Publishing{ContentType: "text/plain",Body:        []byte(body),
})

为了简化问题,我们假设“严重性”可以是“info”、“warning”、“error”之一。

6.订阅

为感兴趣的每种严重性(日志级别)创建一个新的绑定。绑定的routing key通过os.Args获取

q, err := ch.QueueDeclare("",    // namefalse, // durablefalse, // delete when unusedtrue,  // exclusivefalse, // no-waitnil,   // arguments
)
failOnError(err, "Failed to declare a queue")if len(os.Args) < 2 {log.Printf("Usage: %s [info] [warning] [error]", os.Args[0])os.Exit(0)
}
// 建立多个绑定关系
for _, s := range os.Args[1:] {log.Printf("Binding queue %s to exchange %s with routing key %s",q.Name, "logs_direct", s)err = ch.QueueBind(q.Name,        // queue names,             // routing key"logs_direct", // exchangefalse,nil)failOnError(err, "Failed to bind a queue")
}

7.完整示例

img

emit_log_direct.go脚本的代码:

package mainimport ("log""os""strings""github.com/streadway/amqp"
)func failOnError(err error, msg string) {if err != nil {log.Fatalf("%s: %s", msg, err)}
}func main() {conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")failOnError(err, "Failed to connect to RabbitMQ")defer conn.Close()ch, err := conn.Channel()failOnError(err, "Failed to open a channel")defer ch.Close()err = ch.ExchangeDeclare("logs_direct", // name"direct",      // typetrue,          // durablefalse,         // auto-deletedfalse,         // internalfalse,         // no-waitnil,           // arguments)failOnError(err, "Failed to declare an exchange")body := bodyFrom(os.Args)err = ch.Publish("logs_direct",         // exchangeseverityFrom(os.Args), // routing keyfalse, // mandatoryfalse, // immediateamqp.Publishing{ContentType: "text/plain",Body:        []byte(body),})failOnError(err, "Failed to publish a message")log.Printf(" [x] Sent %s", body)
}func bodyFrom(args []string) string {var s stringif (len(args) < 3) || os.Args[2] == "" {s = "hello"} else {s = strings.Join(args[2:], " ")}return s
}func severityFrom(args []string) string {var s stringif (len(args) < 2) || os.Args[1] == "" {s = "info"} else {s = os.Args[1]}return s
}

receive_logs_direct.go的代码:

package mainimport ("log""os""github.com/streadway/amqp"
)func failOnError(err error, msg string) {if err != nil {log.Fatalf("%s: %s", msg, err)}
}func main() {conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")failOnError(err, "Failed to connect to RabbitMQ")defer conn.Close()ch, err := conn.Channel()failOnError(err, "Failed to open a channel")defer ch.Close()err = ch.ExchangeDeclare("logs_direct", // name"direct",      // typetrue,          // durablefalse,         // auto-deletedfalse,         // internalfalse,         // no-waitnil,           // arguments)failOnError(err, "Failed to declare an exchange")q, err := ch.QueueDeclare("",    // namefalse, // durablefalse, // delete when unusedtrue,  // exclusivefalse, // no-waitnil,   // arguments)failOnError(err, "Failed to declare a queue")if len(os.Args) < 2 {log.Printf("Usage: %s [info] [warning] [error]", os.Args[0])os.Exit(0)}for _, s := range os.Args[1:] {log.Printf("Binding queue %s to exchange %s with routing key %s",q.Name, "logs_direct", s)err = ch.QueueBind(q.Name,        // queue names,             // routing key"logs_direct", // exchangefalse,nil)failOnError(err, "Failed to bind a queue")}msgs, err := ch.Consume(q.Name, // queue"",     // consumertrue,   // auto ackfalse,  // exclusivefalse,  // no localfalse,  // no waitnil,    // args)failOnError(err, "Failed to register a consumer")forever := make(chan bool)go func() {for d := range msgs {log.Printf(" [x] %s", d.Body)}}()log.Printf(" [*] Waiting for logs. To exit press CTRL+C")<-forever
}

如果你只想将“warning”和“err”(而不是“info”)级别的日志消息保存到文件中,只需打开控制台并输入:

go run receive_logs_direct.go warning error > logs_from_rabbit.log

如果你想在屏幕上查看所有日志消息,请打开一个新终端并执行以下操作:

go run receive_logs_direct.go info warning error
# => [*] Waiting for logs. To exit press CTRL+C

例如,要发出error日志消息,只需输入:

go run emit_log_direct.go error "Run. Run. Or it will explode."
# => [x] Sent 'error':'Run. Run. Or it will explode.'

(这里是(emit_log_direct.go)和(receive_logs_direct.go)的完整源码)

参考文章:

go rabbitmq Routing模式 - 范斯猫 (fansimao.com)

RabbitMQ Go语言客户端教程4——路由 - 范斯猫 (fansimao.com)

 

相关文章:

go消息队列RabbitMQ - 订阅模式-direct

1.发布订阅 在Fanout模式中&#xff0c;一条消息&#xff0c;会被所有订阅的队列都消费。但是&#xff0c;在某些场景下&#xff0c;我们希望不同的消息被不同的队列消费。这时就要用到Direct类型的Exchange。 在Direct模型下&#xff1a; 队列与交换机的绑定&#xff0c;不能…...

PyTorch 2.2 中文官方教程(十八)

开始使用完全分片数据并行&#xff08;FSDP&#xff09; 原文&#xff1a;pytorch.org/tutorials/intermediate/FSDP_tutorial.html 译者&#xff1a;飞龙 协议&#xff1a;CC BY-NC-SA 4.0 作者&#xff1a;Hamid Shojanazeri&#xff0c;Yanli Zhao&#xff0c;Shen Li 注意…...

jenkins部署vue项目

首次加载比较慢、需要等待很长时间 到这个页面算是初始化完成了 输入密码路径为 之前设置的路径 可以在文件中找或者 docker logs jenkins 直接安装推荐插件 正在安装中&#xff01;&#xff01; 安装成功后创建管理员账号(一定要记住这个也是登录账号密码) 这里实例配置直接…...

十一、C++核心编程(2)引用

一、引用的基本使用 作用: 给变量起别名语法: 数据类型 &别名 原名 #include<iostream> #include<string.h> using namespace std;int main() {//引用基本语法//数据类型 &别名 原名int a 10;//创建引用int &b a;cout << "a "…...

numpy学习总结二

单词发音&#xff1a; squeeze 发音&#xff1a;死贵子 concatenation [kɒnˌktəˈneɪʃən] 拼接;串联 threshold [θreʃhəʊld] 死re后的 quantile 拷n太哦 分位数 因果不能改 智慧不能赐 正法不可说 无缘不能度 天雨虽宽不润无根之草&#xff1b;佛法虽广不度无缘之人 …...

3 编辑器(Vim)

1.完成 vimtutor。备注&#xff1a;它在一个 80x24&#xff08;80 列&#xff0c;24 行&#xff09; 终端窗口看起来效果最好。 2.下载我们提供的 vimrc&#xff0c;然后把它保存到 ~/.vimrc。 通读这个注释详细的文件 &#xff08;用 Vim!&#xff09;&#xff0c; 然后观察 …...

C/C++ (stdio.h)标准库详解

cstdio,在C语言中称为stdio.h。该库使用所谓的流与物理设备&#xff08;如键盘、打印机、终端&#xff09;或系统支持的任何其他类型的文件一起操作。 在本文将会通过介绍函数参数&#xff0c;举出实际的简单例子来帮助大家快速上手使用函数。 目录 一、流 二、库函数 1、F…...

深度学习介绍

对于具备完善业务逻辑的任务&#xff0c;大多数情况下&#xff0c;正常的人都可以给出一个符合业务逻辑的应用程序。但是对于一些包含超过人类所能考虑到的逻辑的任务&#xff0c;例如面对如下任务&#xff1a; 编写一个应用程序&#xff0c;接受地理信息、卫星图像和一些历史…...

ywtool dhcp命令

一.dhcp功能介绍 就是通过脚本实现dhcp地址池的增、删、改、查这几个功能日志文件路径: /var/log/ywtools/ywtool-dhcp.log/usr/local/ywtools/config/config.ini中account参数(ywtool dhcp这个命令用的,但是这个命令只能配置1个地址池,所以这里面的参数没什么意义) 二.配置…...

ChatGPT高效提问—基础知识(LM、PLM以及LLM)

ChatGPT高效提问—基础知识&#xff08;LM、PLM以及LLM&#xff09; ​ 了解语言模型&#xff08;language model, LM&#xff09;、预训练语言模型&#xff08;pre-trained language model, PLM&#xff09;和大型语言模型&#xff08;large language model, LLM&#xff09;…...

MongoDB复制集实战及原理分析

文章目录 MongoDB复制集复制集架构三节点复制集模式PSS模式&#xff08;官方推荐模式&#xff09;PSA模式 典型三节点复制集环境搭建复制集注意事项环境准备配置复制集复制集状态查询使用mtools创建复制集安全认证复制集连接方式 复制集成员角色属性一&#xff1a;Priority 0属…...

Java并发之synchronized详解

☆* o(≧▽≦)o *☆嗨~我是小奥&#x1f379; &#x1f4c4;&#x1f4c4;&#x1f4c4;个人博客&#xff1a;小奥的博客 &#x1f4c4;&#x1f4c4;&#x1f4c4;CSDN&#xff1a;个人CSDN &#x1f4d9;&#x1f4d9;&#x1f4d9;Github&#xff1a;传送门 &#x1f4c5;&a…...

Flask 项目自动生成 API 文档的高效实践

Flasgger&#xff0c;作为一款强大的 Flask 扩展&#xff0c;自动从 Flask 应用中提取并生成 OpenAPI 规范文档&#xff0c;配备 SwaggerUI&#xff0c;为开发者提供了一条快捷通道&#xff0c;让 API 的文档编制和交互式测试变得简单易行。Flasgger 的设计原则是简化开发流程&…...

WebChat——一个开源的聊天应用

Web Chat 是开源的聊天系统&#xff0c;支持一键免费部署私人Chat网页的应用程序。 开源地址&#xff1a;https://github.com/loks666/webchat 目录树 TOC &#x1f44b;&#x1f3fb; 开始使用 & 交流&#x1f6f3; 开箱即用 A 使用 Docker 部署B 使用 Docker-compose…...

【Linux系统 01】Vim工具

目录 一、Vim概述 1. 文件打开方式 2. 模式切换 二、命令模式 1. 移动与跳转 2. 复制与粘贴 3. 剪切与撤销 三、编辑模式 1. 插入 2. 替换 四、末行模式 1. 保存与退出 2. 查找与替换 3. 分屏显示 4. 命令执行 一、Vim概述 1. 文件打开方式 vim 文件路径&#…...

Oracle 面试题 | 09.精选Oracle高频面试题

&#x1f90d; 前端开发工程师、技术日更博主、已过CET6 &#x1f368; 阿珊和她的猫_CSDN博客专家、23年度博客之星前端领域TOP1 &#x1f560; 牛客高级专题作者、打造专栏《前端面试必备》 、《2024面试高频手撕题》 &#x1f35a; 蓝桥云课签约作者、上架课程《Vue.js 和 E…...

基于Springboot的校园失物招领网站(有报告)。Javaee项目,springboot项目。

演示视频&#xff1a; 基于Springboot的校园失物招领网站&#xff08;有报告&#xff09;。Javaee项目&#xff0c;springboot项目。 项目介绍&#xff1a; 采用M&#xff08;model&#xff09;V&#xff08;view&#xff09;C&#xff08;controller&#xff09;三层体系结构…...

WPF布局面板

StackPanel StackPanel 是一种常用的布局控件,可以支持水平或垂直排列,但不会换行。当子元素添加到 StackPanel 中时,它们将按照添加的顺序依次排列。默认情况下,StackPanel 的排列方向是垂直的,即子元素将从上到下依次排列。可以使用 Orientation 属性更改排列方向。可以…...

灵活应对:策略模式在软件设计中的应用

策略模式是一种行为型设计模式&#xff0c;它允许定义一系列算法&#xff0c;并将每个算法封装起来&#xff0c;使它们可以互换使用。策略模式让算法的变化独立于使用算法的客户端&#xff0c;使得在不修改原有代码的情况下切换或扩展新的算法成为可能。 使用策略模式的场景包…...

eosio.token 智能合约介绍

一、目的 eosio.token系统合约定义了允许用户为基于EOSIO的区块链创建、发行和管理代币的结构和操作&#xff0c;它演示了一种实现允许创建和管理代币的智能合约的方法。本文详细介绍了eosio.token系统合约并在本地测试链上实际发行了代币进行演示&#xff0c;适用于EOS智能合…...

Flask RESTful 示例

目录 1. 环境准备2. 安装依赖3. 修改main.py4. 运行应用5. API使用示例获取所有任务获取单个任务创建新任务更新任务删除任务 中文乱码问题&#xff1a; 下面创建一个简单的Flask RESTful API示例。首先&#xff0c;我们需要创建环境&#xff0c;安装必要的依赖&#xff0c;然后…...

在四层代理中还原真实客户端ngx_stream_realip_module

一、模块原理与价值 PROXY Protocol 回溯 第三方负载均衡&#xff08;如 HAProxy、AWS NLB、阿里 SLB&#xff09;发起上游连接时&#xff0c;将真实客户端 IP/Port 写入 PROXY Protocol v1/v2 头。Stream 层接收到头部后&#xff0c;ngx_stream_realip_module 从中提取原始信息…...

零基础设计模式——行为型模式 - 责任链模式

第四部分&#xff1a;行为型模式 - 责任链模式 (Chain of Responsibility Pattern) 欢迎来到行为型模式的学习&#xff01;行为型模式关注对象之间的职责分配、算法封装和对象间的交互。我们将学习的第一个行为型模式是责任链模式。 核心思想&#xff1a;使多个对象都有机会处…...

3403. 从盒子中找出字典序最大的字符串 I

3403. 从盒子中找出字典序最大的字符串 I 题目链接&#xff1a;3403. 从盒子中找出字典序最大的字符串 I 代码如下&#xff1a; class Solution { public:string answerString(string word, int numFriends) {if (numFriends 1) {return word;}string res;for (int i 0;i &…...

CMake控制VS2022项目文件分组

我们可以通过 CMake 控制源文件的组织结构,使它们在 VS 解决方案资源管理器中以“组”(Filter)的形式进行分类展示。 🎯 目标 通过 CMake 脚本将 .cpp、.h 等源文件分组显示在 Visual Studio 2022 的解决方案资源管理器中。 ✅ 支持的方法汇总(共4种) 方法描述是否推荐…...

Maven 概述、安装、配置、仓库、私服详解

目录 1、Maven 概述 1.1 Maven 的定义 1.2 Maven 解决的问题 1.3 Maven 的核心特性与优势 2、Maven 安装 2.1 下载 Maven 2.2 安装配置 Maven 2.3 测试安装 2.4 修改 Maven 本地仓库的默认路径 3、Maven 配置 3.1 配置本地仓库 3.2 配置 JDK 3.3 IDEA 配置本地 Ma…...

iview框架主题色的应用

1.下载 less要使用3.0.0以下的版本 npm install less2.7.3 npm install less-loader4.0.52./src/config/theme.js文件 module.exports {yellow: {theme-color: #FDCE04},blue: {theme-color: #547CE7} }在sass中使用theme配置的颜色主题&#xff0c;无需引入&#xff0c;直接可…...

BLEU评分:机器翻译质量评估的黄金标准

BLEU评分&#xff1a;机器翻译质量评估的黄金标准 1. 引言 在自然语言处理(NLP)领域&#xff0c;衡量一个机器翻译模型的性能至关重要。BLEU (Bilingual Evaluation Understudy) 作为一种自动化评估指标&#xff0c;自2002年由IBM的Kishore Papineni等人提出以来&#xff0c;…...

c++第七天 继承与派生2

这一篇文章主要内容是 派生类构造函数与析构函数 在派生类中重写基类成员 以及多继承 第一部分&#xff1a;派生类构造函数与析构函数 当创建一个派生类对象时&#xff0c;基类成员是如何初始化的&#xff1f; 1.当派生类对象创建的时候&#xff0c;基类成员的初始化顺序 …...

人工智能--安全大模型训练计划:基于Fine-tuning + LLM Agent

安全大模型训练计划&#xff1a;基于Fine-tuning LLM Agent 1. 构建高质量安全数据集 目标&#xff1a;为安全大模型创建高质量、去偏、符合伦理的训练数据集&#xff0c;涵盖安全相关任务&#xff08;如有害内容检测、隐私保护、道德推理等&#xff09;。 1.1 数据收集 描…...