go消息队列RabbitMQ - 订阅模式-direct
1.发布订阅

在Fanout模式中,一条消息,会被所有订阅的队列都消费。但是,在某些场景下,我们希望不同的消息被不同的队列消费。这时就要用到Direct类型的Exchange。
在Direct模型下:
- 队列与交换机的绑定,不能是任意绑定了,而是要指定一个RoutingKey(路由key)
- 消息的发送方在向 Exchange发送消息时,也必须指定消息的 RoutingKey。
- Exchange不再把消息交给每一个绑定的队列,而是根据消息的Routing Key进行判断,只有队列的Routingkey与消息的 Routing key完全一致,才会接收到消息
2.绑定
绑定可以采用额外的routing_key参数。为了避免与Channel.Publish参数混淆,我们将其称为binding key。这是我们如何使用键创建绑定的方法:
err = ch.QueueBind(q.Name, // queue name"black", // routing key"logs", // exchangefalse,nil)
3.直连交换器
direct交换器背后的路由算法很简单——消息进入其binding key与消息的routing key完全匹配的队列。

绑定了两个队列的direct交换器X。第一个队列绑定键为orange,第二个队列绑定为两个,一个绑定键为black,另一个为green。
在这种设置中,使用orange路由键发布到交换器的消息将被路由到队列Q1。路由键为black或green的消息将转到Q2。所有其他消息将被丢弃。
4.多重绑定

用相同的绑定键绑定多个队列是完全合法的。在我们的示例中,我们可以使用绑定键black在X和Q1之间添加绑定。在这种情况下,direct交换器的行为将类似fanout,并将消息广播到所有匹配的队列。带有black路由键的消息将同时传递给Q1和Q2。
5.发送日志
在日志系统中使用这个模型,发送消息到direct交换器。这样,接收脚本将能够选择其想要接收的日志级别。
先创建一个direct交换器:
err = ch.ExchangeDeclare("logs_direct", // name"direct", // typetrue, // durablefalse, // auto-deletedfalse, // internalfalse, // no-waitnil, // arguments
)
指定routing key发送一条消息:
body := bodyFrom(os.Args)
err = ch.Publish("logs_direct", // exchangeseverityFrom(os.Args), // routing keyfalse, // mandatoryfalse, // immediateamqp.Publishing{ContentType: "text/plain",Body: []byte(body),
})
为了简化问题,我们假设“严重性”可以是“info”、“warning”、“error”之一。
6.订阅
为感兴趣的每种严重性(日志级别)创建一个新的绑定。绑定的routing key通过os.Args获取
q, err := ch.QueueDeclare("", // namefalse, // durablefalse, // delete when unusedtrue, // exclusivefalse, // no-waitnil, // arguments
)
failOnError(err, "Failed to declare a queue")if len(os.Args) < 2 {log.Printf("Usage: %s [info] [warning] [error]", os.Args[0])os.Exit(0)
}
// 建立多个绑定关系
for _, s := range os.Args[1:] {log.Printf("Binding queue %s to exchange %s with routing key %s",q.Name, "logs_direct", s)err = ch.QueueBind(q.Name, // queue names, // routing key"logs_direct", // exchangefalse,nil)failOnError(err, "Failed to bind a queue")
}
7.完整示例

emit_log_direct.go脚本的代码:
package mainimport ("log""os""strings""github.com/streadway/amqp"
)func failOnError(err error, msg string) {if err != nil {log.Fatalf("%s: %s", msg, err)}
}func main() {conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")failOnError(err, "Failed to connect to RabbitMQ")defer conn.Close()ch, err := conn.Channel()failOnError(err, "Failed to open a channel")defer ch.Close()err = ch.ExchangeDeclare("logs_direct", // name"direct", // typetrue, // durablefalse, // auto-deletedfalse, // internalfalse, // no-waitnil, // arguments)failOnError(err, "Failed to declare an exchange")body := bodyFrom(os.Args)err = ch.Publish("logs_direct", // exchangeseverityFrom(os.Args), // routing keyfalse, // mandatoryfalse, // immediateamqp.Publishing{ContentType: "text/plain",Body: []byte(body),})failOnError(err, "Failed to publish a message")log.Printf(" [x] Sent %s", body)
}func bodyFrom(args []string) string {var s stringif (len(args) < 3) || os.Args[2] == "" {s = "hello"} else {s = strings.Join(args[2:], " ")}return s
}func severityFrom(args []string) string {var s stringif (len(args) < 2) || os.Args[1] == "" {s = "info"} else {s = os.Args[1]}return s
}
receive_logs_direct.go的代码:
package mainimport ("log""os""github.com/streadway/amqp"
)func failOnError(err error, msg string) {if err != nil {log.Fatalf("%s: %s", msg, err)}
}func main() {conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")failOnError(err, "Failed to connect to RabbitMQ")defer conn.Close()ch, err := conn.Channel()failOnError(err, "Failed to open a channel")defer ch.Close()err = ch.ExchangeDeclare("logs_direct", // name"direct", // typetrue, // durablefalse, // auto-deletedfalse, // internalfalse, // no-waitnil, // arguments)failOnError(err, "Failed to declare an exchange")q, err := ch.QueueDeclare("", // namefalse, // durablefalse, // delete when unusedtrue, // exclusivefalse, // no-waitnil, // arguments)failOnError(err, "Failed to declare a queue")if len(os.Args) < 2 {log.Printf("Usage: %s [info] [warning] [error]", os.Args[0])os.Exit(0)}for _, s := range os.Args[1:] {log.Printf("Binding queue %s to exchange %s with routing key %s",q.Name, "logs_direct", s)err = ch.QueueBind(q.Name, // queue names, // routing key"logs_direct", // exchangefalse,nil)failOnError(err, "Failed to bind a queue")}msgs, err := ch.Consume(q.Name, // queue"", // consumertrue, // auto ackfalse, // exclusivefalse, // no localfalse, // no waitnil, // args)failOnError(err, "Failed to register a consumer")forever := make(chan bool)go func() {for d := range msgs {log.Printf(" [x] %s", d.Body)}}()log.Printf(" [*] Waiting for logs. To exit press CTRL+C")<-forever
}
如果你只想将“warning”和“err”(而不是“info”)级别的日志消息保存到文件中,只需打开控制台并输入:
go run receive_logs_direct.go warning error > logs_from_rabbit.log
如果你想在屏幕上查看所有日志消息,请打开一个新终端并执行以下操作:
go run receive_logs_direct.go info warning error
# => [*] Waiting for logs. To exit press CTRL+C
例如,要发出error日志消息,只需输入:
go run emit_log_direct.go error "Run. Run. Or it will explode."
# => [x] Sent 'error':'Run. Run. Or it will explode.'
(这里是(emit_log_direct.go)和(receive_logs_direct.go)的完整源码)
参考文章:
go rabbitmq Routing模式 - 范斯猫 (fansimao.com)
RabbitMQ Go语言客户端教程4——路由 - 范斯猫 (fansimao.com)
相关文章:
go消息队列RabbitMQ - 订阅模式-direct
1.发布订阅 在Fanout模式中,一条消息,会被所有订阅的队列都消费。但是,在某些场景下,我们希望不同的消息被不同的队列消费。这时就要用到Direct类型的Exchange。 在Direct模型下: 队列与交换机的绑定,不能…...
PyTorch 2.2 中文官方教程(十八)
开始使用完全分片数据并行(FSDP) 原文:pytorch.org/tutorials/intermediate/FSDP_tutorial.html 译者:飞龙 协议:CC BY-NC-SA 4.0 作者:Hamid Shojanazeri,Yanli Zhao,Shen Li 注意…...
jenkins部署vue项目
首次加载比较慢、需要等待很长时间 到这个页面算是初始化完成了 输入密码路径为 之前设置的路径 可以在文件中找或者 docker logs jenkins 直接安装推荐插件 正在安装中!! 安装成功后创建管理员账号(一定要记住这个也是登录账号密码) 这里实例配置直接…...
十一、C++核心编程(2)引用
一、引用的基本使用 作用: 给变量起别名语法: 数据类型 &别名 原名 #include<iostream> #include<string.h> using namespace std;int main() {//引用基本语法//数据类型 &别名 原名int a 10;//创建引用int &b a;cout << "a "…...
numpy学习总结二
单词发音: squeeze 发音:死贵子 concatenation [kɒnˌktəˈneɪʃən] 拼接;串联 threshold [θreʃhəʊld] 死re后的 quantile 拷n太哦 分位数 因果不能改 智慧不能赐 正法不可说 无缘不能度 天雨虽宽不润无根之草;佛法虽广不度无缘之人 …...
3 编辑器(Vim)
1.完成 vimtutor。备注:它在一个 80x24(80 列,24 行) 终端窗口看起来效果最好。 2.下载我们提供的 vimrc,然后把它保存到 ~/.vimrc。 通读这个注释详细的文件 (用 Vim!), 然后观察 …...
C/C++ (stdio.h)标准库详解
cstdio,在C语言中称为stdio.h。该库使用所谓的流与物理设备(如键盘、打印机、终端)或系统支持的任何其他类型的文件一起操作。 在本文将会通过介绍函数参数,举出实际的简单例子来帮助大家快速上手使用函数。 目录 一、流 二、库函数 1、F…...
深度学习介绍
对于具备完善业务逻辑的任务,大多数情况下,正常的人都可以给出一个符合业务逻辑的应用程序。但是对于一些包含超过人类所能考虑到的逻辑的任务,例如面对如下任务: 编写一个应用程序,接受地理信息、卫星图像和一些历史…...
ywtool dhcp命令
一.dhcp功能介绍 就是通过脚本实现dhcp地址池的增、删、改、查这几个功能日志文件路径: /var/log/ywtools/ywtool-dhcp.log/usr/local/ywtools/config/config.ini中account参数(ywtool dhcp这个命令用的,但是这个命令只能配置1个地址池,所以这里面的参数没什么意义) 二.配置…...
ChatGPT高效提问—基础知识(LM、PLM以及LLM)
ChatGPT高效提问—基础知识(LM、PLM以及LLM) 了解语言模型(language model, LM)、预训练语言模型(pre-trained language model, PLM)和大型语言模型(large language model, LLM)…...
MongoDB复制集实战及原理分析
文章目录 MongoDB复制集复制集架构三节点复制集模式PSS模式(官方推荐模式)PSA模式 典型三节点复制集环境搭建复制集注意事项环境准备配置复制集复制集状态查询使用mtools创建复制集安全认证复制集连接方式 复制集成员角色属性一:Priority 0属…...
Java并发之synchronized详解
☆* o(≧▽≦)o *☆嗨~我是小奥🍹 📄📄📄个人博客:小奥的博客 📄📄📄CSDN:个人CSDN 📙📙📙Github:传送门 📅&a…...
Flask 项目自动生成 API 文档的高效实践
Flasgger,作为一款强大的 Flask 扩展,自动从 Flask 应用中提取并生成 OpenAPI 规范文档,配备 SwaggerUI,为开发者提供了一条快捷通道,让 API 的文档编制和交互式测试变得简单易行。Flasgger 的设计原则是简化开发流程&…...
WebChat——一个开源的聊天应用
Web Chat 是开源的聊天系统,支持一键免费部署私人Chat网页的应用程序。 开源地址:https://github.com/loks666/webchat 目录树 TOC 👋🏻 开始使用 & 交流🛳 开箱即用 A 使用 Docker 部署B 使用 Docker-compose…...
【Linux系统 01】Vim工具
目录 一、Vim概述 1. 文件打开方式 2. 模式切换 二、命令模式 1. 移动与跳转 2. 复制与粘贴 3. 剪切与撤销 三、编辑模式 1. 插入 2. 替换 四、末行模式 1. 保存与退出 2. 查找与替换 3. 分屏显示 4. 命令执行 一、Vim概述 1. 文件打开方式 vim 文件路径&#…...
Oracle 面试题 | 09.精选Oracle高频面试题
🤍 前端开发工程师、技术日更博主、已过CET6 🍨 阿珊和她的猫_CSDN博客专家、23年度博客之星前端领域TOP1 🕠 牛客高级专题作者、打造专栏《前端面试必备》 、《2024面试高频手撕题》 🍚 蓝桥云课签约作者、上架课程《Vue.js 和 E…...
基于Springboot的校园失物招领网站(有报告)。Javaee项目,springboot项目。
演示视频: 基于Springboot的校园失物招领网站(有报告)。Javaee项目,springboot项目。 项目介绍: 采用M(model)V(view)C(controller)三层体系结构…...
WPF布局面板
StackPanel StackPanel 是一种常用的布局控件,可以支持水平或垂直排列,但不会换行。当子元素添加到 StackPanel 中时,它们将按照添加的顺序依次排列。默认情况下,StackPanel 的排列方向是垂直的,即子元素将从上到下依次排列。可以使用 Orientation 属性更改排列方向。可以…...
灵活应对:策略模式在软件设计中的应用
策略模式是一种行为型设计模式,它允许定义一系列算法,并将每个算法封装起来,使它们可以互换使用。策略模式让算法的变化独立于使用算法的客户端,使得在不修改原有代码的情况下切换或扩展新的算法成为可能。 使用策略模式的场景包…...
eosio.token 智能合约介绍
一、目的 eosio.token系统合约定义了允许用户为基于EOSIO的区块链创建、发行和管理代币的结构和操作,它演示了一种实现允许创建和管理代币的智能合约的方法。本文详细介绍了eosio.token系统合约并在本地测试链上实际发行了代币进行演示,适用于EOS智能合…...
Vim 调用外部命令学习笔记
Vim 外部命令集成完全指南 文章目录 Vim 外部命令集成完全指南核心概念理解命令语法解析语法对比 常用外部命令详解文本排序与去重文本筛选与搜索高级 grep 搜索技巧文本替换与编辑字符处理高级文本处理编程语言处理其他实用命令 范围操作示例指定行范围处理复合命令示例 实用技…...
MPNet:旋转机械轻量化故障诊断模型详解python代码复现
目录 一、问题背景与挑战 二、MPNet核心架构 2.1 多分支特征融合模块(MBFM) 2.2 残差注意力金字塔模块(RAPM) 2.2.1 空间金字塔注意力(SPA) 2.2.2 金字塔残差块(PRBlock) 2.3 分类器设计 三、关键技术突破 3.1 多尺度特征融合 3.2 轻量化设计策略 3.3 抗噪声…...
阿里云ACP云计算备考笔记 (5)——弹性伸缩
目录 第一章 概述 第二章 弹性伸缩简介 1、弹性伸缩 2、垂直伸缩 3、优势 4、应用场景 ① 无规律的业务量波动 ② 有规律的业务量波动 ③ 无明显业务量波动 ④ 混合型业务 ⑤ 消息通知 ⑥ 生命周期挂钩 ⑦ 自定义方式 ⑧ 滚的升级 5、使用限制 第三章 主要定义 …...
视频字幕质量评估的大规模细粒度基准
大家读完觉得有帮助记得关注和点赞!!! 摘要 视频字幕在文本到视频生成任务中起着至关重要的作用,因为它们的质量直接影响所生成视频的语义连贯性和视觉保真度。尽管大型视觉-语言模型(VLMs)在字幕生成方面…...
C++.OpenGL (10/64)基础光照(Basic Lighting)
基础光照(Basic Lighting) 冯氏光照模型(Phong Lighting Model) #mermaid-svg-GLdskXwWINxNGHso {font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;fill:#333;}#mermaid-svg-GLdskXwWINxNGHso .error-icon{fill:#552222;}#mermaid-svg-GLd…...
C++ 求圆面积的程序(Program to find area of a circle)
给定半径r,求圆的面积。圆的面积应精确到小数点后5位。 例子: 输入:r 5 输出:78.53982 解释:由于面积 PI * r * r 3.14159265358979323846 * 5 * 5 78.53982,因为我们只保留小数点后 5 位数字。 输…...
《基于Apache Flink的流处理》笔记
思维导图 1-3 章 4-7章 8-11 章 参考资料 源码: https://github.com/streaming-with-flink 博客 https://flink.apache.org/bloghttps://www.ververica.com/blog 聚会及会议 https://flink-forward.orghttps://www.meetup.com/topics/apache-flink https://n…...
JS设计模式(4):观察者模式
JS设计模式(4):观察者模式 一、引入 在开发中,我们经常会遇到这样的场景:一个对象的状态变化需要自动通知其他对象,比如: 电商平台中,商品库存变化时需要通知所有订阅该商品的用户;新闻网站中࿰…...
MyBatis中关于缓存的理解
MyBatis缓存 MyBatis系统当中默认定义两级缓存:一级缓存、二级缓存 默认情况下,只有一级缓存开启(sqlSession级别的缓存)二级缓存需要手动开启配置,需要局域namespace级别的缓存 一级缓存(本地缓存&#…...
DiscuzX3.5发帖json api
参考文章:PHP实现独立Discuz站外发帖(直连操作数据库)_discuz 发帖api-CSDN博客 简单改造了一下,适配我自己的需求 有一个站点存在多个采集站,我想通过主站拿标题,采集站拿内容 使用到的sql如下 CREATE TABLE pre_forum_post_…...
