go消息队列RabbitMQ - 订阅模式-fanout
1、发布订阅
订阅模式,消息被路由投递给多个队列,一个消息被多个消费者获取。

1) 可以有多个消费者
2) 每个消费者有自己的queue(队列)
3) 每个队列都要绑定到Exchange(交换机)
4) 生产者发送的消息,只能发送到交换机,交换机来决定要发给哪个队列,生产者无法决定。
5) 交换机把消息发送给绑定过的所有队列
6) 队列的消费者都能拿到消息。实现一条消息被多个消费者消费
- 相关场景:邮件群发,群聊天,广播(广告)
2、Exchanges(交换器)
消息产生者将消息放入交换机,交换机发布订阅把消息发送到所有消息队列中,对应消息队列的消费者拿到消息进行消费
有几种交换器类型可用:direct, topic, headers 和 fanout。我们将集中讨论最后一个——fanout。
2.1 创建交换器
创建一个这种类型的交换器,并给它起个名字叫logs:
err = ch.ExchangeDeclare("logs", // name"fanout", // typetrue, // durablefalse, // auto-deletedfalse, // internalfalse, // no-waitnil, // arguments
)
fanout(扇出)交换器非常简单。正如你可能从名称中猜测的那样,它只是将接收到的所有消息广播到它知道的所有队列中。
2.2 临时队列
也是自动删除队列吗,和普通队列在使用上没有什么区别,唯一的区别是,当消费者断开连接时,队列将会被删除。自动删除队列允许的消费者没有限制,也就是说当这个队列上最后一个消费者断开连接才会执行删除。
自动删除队列只需要在声明队列时,设置属性auto-delete标识为true即可。系统声明的随机队列,缺省就是自动删除的。
q, err := ch.QueueDeclare("", // 空字符串作为队列名称false, // 非持久队列false, // delete when unusedtrue, // 独占队列(当前声明队列的连接关闭后即被删除)false, // no-waitnil, // arguments
)
上述方法返回时,生成的队列实例包含RabbitMQ生成的随机队列名称。例如,它可能看起来像amq.gen-JzTY20BRgKO-HjmUJj0wLg。
2.3 交换器与队列绑定

交换器和队列之间的关系称为绑定。
err = ch.QueueBind(q.Name, // queue name"", // routing key"logs", // exchangefalse,nil,
)
从现在开始,logs交换器将会把消息添加到我们的队列中。
2.4 发布消息到交换机
例如,发布到fanout交换器:
body := bodyFrom(os.Args)
err = ch.Publish("logs", // exchange"", // routing keyfalse, // mandatoryfalse, // immediateamqp.Publishing{ContentType: "text/plain",Body: []byte(body),})
3 完整代码

产生日志消息的生产程序与上一教程看起来没有太大不同。最重要的变化是我们现在希望将消息发布到logs交换器,而不是空的消息交换器。发送时,我们需要提供一个routingKey,但是对于fanout型交换器,它的值可以被忽略(传空字符串)。下面是emit_log.go脚本的代码:
package mainimport ("log""os""strings""github.com/streadway/amqp"
)func failOnError(err error, msg string) {if err != nil {log.Fatalf("%s: %s", msg, err)}
}func main() {conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")failOnError(err, "Failed to connect to RabbitMQ")defer conn.Close()ch, err := conn.Channel()failOnError(err, "Failed to open a channel")defer ch.Close()err = ch.ExchangeDeclare("logs", // name"fanout", // typetrue, // durablefalse, // auto-deletedfalse, // internalfalse, // no-waitnil, // arguments)failOnError(err, "Failed to declare an exchange")body := bodyFrom(os.Args)err = ch.Publish("logs", // exchange"", // routing keyfalse, // mandatoryfalse, // immediateamqp.Publishing{ContentType: "text/plain",Body: []byte(body),})failOnError(err, "Failed to publish a message")log.Printf(" [x] Sent %s", body)
}func bodyFrom(args []string) string {var s stringif (len(args) < 2) || os.Args[1] == "" {s = "hello"} else {s = strings.Join(args[1:], " ")}return s
}
(emit_logs.go源码)
如你所见,在建立连接之后,我们声明了交换器。此步骤是必需的,因为禁止发布到不存在的交换器。
如果没有队列绑定到交换器,那么消息将丢失,但这对我们来说是ok的。如果没有消费者在接收,我们可以安全地丢弃该消息。
receive_logs.go的代码:
package mainimport ("log""github.com/streadway/amqp"
)func failOnError(err error, msg string) {if err != nil {log.Fatalf("%s: %s", msg, err)}
}func main() {conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")failOnError(err, "Failed to connect to RabbitMQ")defer conn.Close()ch, err := conn.Channel()failOnError(err, "Failed to open a channel")defer ch.Close()err = ch.ExchangeDeclare("logs", // name"fanout", // typetrue, // durablefalse, // auto-deletedfalse, // internalfalse, // no-waitnil, // arguments)failOnError(err, "Failed to declare an exchange")q, err := ch.QueueDeclare("", // namefalse, // durablefalse, // delete when unusedtrue, // exclusivefalse, // no-waitnil, // arguments)failOnError(err, "Failed to declare a queue")err = ch.QueueBind(q.Name, // queue name"", // routing key"logs", // exchangefalse,nil,)failOnError(err, "Failed to bind a queue")msgs, err := ch.Consume(q.Name, // queue"", // consumertrue, // auto-ackfalse, // exclusivefalse, // no-localfalse, // no-waitnil, // args)failOnError(err, "Failed to register a consumer")forever := make(chan bool)go func() {for d := range msgs {log.Printf(" [x] %s", d.Body)}}()log.Printf(" [*] Waiting for logs. To exit press CTRL+C")<-forever
}
(receive_logs.go源码)
如果要将日志保存到文件,只需打开控制台并输入:
go run receive_logs.go > logs_from_rabbit.log
如果希望在屏幕上查看日志,请切换到一个新的终端并运行:
go run receive_logs.go
当然,要发出日志,请输入:
go run emit_log.go
使用rabbitmqctl list_bindings命令,你可以验证代码是否确实根据需要创建了绑定关系和队列。在运行两个receive_logs.go程序后,你应该看到类似以下内容:
sudo rabbitmqctl list_bindings
# => Listing bindings ...
# => logs exchange amq.gen-JzTY20BRgKO-HjmUJj0wLg queue []
# => logs exchange amq.gen-vso0PVvyiRIL2WoV3i48Yg queue []
# => ...done.
对结果的解释很简单:数据从logs交换器进入了两个由服务器分配名称的队列。这正是我们想要的。
相关文章:
go消息队列RabbitMQ - 订阅模式-fanout
1、发布订阅 订阅模式,消息被路由投递给多个队列,一个消息被多个消费者获取。 1) 可以有多个消费者 2) 每个消费者有自己的queue(队列) 3) 每个队列都要绑定到Exchange(交换机&…...
科普类——5G远程实时操控技术在国内港口应用简介(十三)
在中国,5G远程实时操控技术已经在多个港口得到应用,并且应用规模不断扩大,展现出良好的发展前景。以下是一些具体的应用案例: 上港集团洋山港:上港集团与华为合作,在全球港口首次将F5G技术应用于超远程控制…...
整理:汉诺塔简析
大体上,要解决一个汉诺塔问题,就需要解决两个更简单的汉诺塔问题 以盘子数量 3 的汉诺塔问题为例 要将 3 个盘子从 A 移动到 C,就要: 将两个盘子从 A 移动到 B(子问题 1) 为了解决子问题 1,就…...
Android 8.1 设置--声音中增加通话音量
Android 8.1 设置--声音中增加通话音量 近来收到项目需求,想要在设置--声音中增加一项通话音量调节,具体修改参照如下: /vendor/mediatek/proprietary/packages/apps/MtkSettings/res/values/strings.xml <!-- Sound: Title for the opt…...
C#中使用OpenCvSharp4绘制直线、矩形、圆、文本
C#中使用OpenCvSharp4绘制直线、矩形、圆、文本 继之前的Python中使用Opencv-python库绘制直线、矩形、圆、文本和VC中使用OpenCV绘制直线、矩形、圆和文字,将之前的Python和C示例代码翻译成C#语言,很简单,还是借用OpenCvSharp4库中的Line、…...
Qt程序设计-自定义控件(提升法)
本文讲解如何使用提升法自定义控件,控件有界面。 实现自定义一个开关控件,控件上显示当前的状态。实现过程如下: 1、在创建的项目上,添加qt设计界面类。 2、选择widget 3、更改名称为MySwitchButton 4、添加两个图标资源文件,用来显示开关状态 5、添加两个label,一个…...
计算机网络基础知识笔记
1 HOST和PORT Claude-2-100k: 我来用简单的语言解释一下HOST和PORT这两个网络术语: HOST就是设备的IP地址,比如"127.0.0.1"就是我们本机的IP地址; PORT就是端口号,可以把它简单理解为设备上的门牌号。 举个类比,我们要给某个人发信件,需要知道…...
【iOS ARKit】2D肢体动作捕捉
人体肢体动作捕捉在动漫影视制作、游戏CG 动画、实时模型驱动中有着广泛的应用,利用 ARKit,无须额外的硬件设备即可实现 2D和3D人体一系列关节和骨骼的动态捕捉,由于移动AR 的便携性及低成本,必将促进相关产业的发展。 ARBody Tr…...
MAC word删除空白页
问题:MAC word删除空白页 解决: option删除键...
字面跳动前端面试题:React Hook为什么不能放在if/循环/嵌套函数里面?
答:首先,React Hooks 是为了简化组件逻辑和提高代码可读性而设计的。将 Hook 放在 if/循环/嵌套函数中会破坏它们的封装性和可预测性,使得代码更难维护和理解。同时,这样做也增加了代码的复杂度,可能会导致性能下降和潜…...
【SpringBoot】SpringBoot的web开发
📝个人主页:五敷有你 🔥系列专栏:SpringBoot ⛺️稳重求进,晒太阳 Wbe开发 使用Springboot 1)、创建SpringBoot应用,选中我们需要的模块; 2)、SpringBoot已经默…...
houdini 入门指南-参考自用,内有翻译错误
HOUDINI 18.5列1 GETTING STARTED 入门指南 What’s new in Houdini 18.5 胡迪尼18.5有什么新内容 New features and changes in Houdini 18.5.胡迪尼18.5的新功能和变化。Basics基础The basics of working with Houdini’s user interface.使用胡迪尼用户界面的基本知识。Shel…...
【笔记】SPN和PLMN 运营商网络名称显示
一、业务术语 缩写 全称 释义 CDNR Carrier Display Name Ressource 运营商显示名称资源 PLMN Public Land Mobile Network 公共陆地移动网络。 表示最终显示的网络运营商名字 SPN Service Provider Name SIM卡EF文件6F46。表示服务提供商名字,主要是SIM卡服务 OPL Operator …...
Selenium处理Alert弹窗
页面弹窗有 3 种类型: alert(警告信息) confirm(确认信息) prompt(提示输入) 对于页面出现的 alert 弹窗,Selenium 提供如下方法: 序号 方法/属性 描述 1 ac…...
FCIS 2023:洞悉网络安全新前沿,引领未来安全创新狂潮
在数字化浪潮席卷全球的今天,网络安全问题愈发凸显其重要性。 FCIS 2023网络安全创新大会作为业界瞩目的盛会,不仅汇聚了国际顶尖的网络安全专家,更展示了最前沿的安全技术与研究成果。那么,参与这场大会,我们究竟能学…...
4个最佳的免费全磁盘加密程序,总有一款适合你
全磁盘加密软件加密整个驱动器,而不仅仅是几个文件或文件夹。加密计算机的驱动器可以使你的私人数据免受窥探,即使你的计算机被盗。 你也不仅仅局限于一个硬盘驱动器。闪存驱动器和外部硬盘驱动器等外部设备也可以通过磁盘加密软件进行加密。 注意:Windows和macOS都集成了…...
SQL语句创建数据库
在SQL中,可以使用CREATE DATABASE语句来创建数据库。下面是一个示例: CREATE DATABASE database_name;其中,database_name是要创建的数据库的名称。你可以将其替换为你想要的数据库名称。 请注意,在不同的SQL数据库管理系统中&a…...
【lesson38】让minishell支持重定向
文章目录 minishell支持重定向minishell完整代码 minishell支持重定向 支持重定向的核心逻辑: 1.分析字符串是否含有重定向的符号,并且提取文件名。 #define INPUT_REDIR 0 //输入重定向 #define OUTPUT_REDIR 1 //输出重定向 #define APPEND_REDIR…...
【安装指南】maven下载、安装与配置详细教程
🌼一、概述 maven功能与python的pip类似。 Apache Maven是一个用于软件项目管理和构建的强大工具。它是基于项目对象模型的,用于描述项目的构建配置和依赖关系。以下是一些关键的 Maven 特性和概念: POM(Project Object Model&…...
matplotlib-中文乱码问题解决方案
前言 本文主要解决matplotlib在画图时,出现的中文乱码问题,具体问题示意如下: 下面将针对这个问题直接给出具体的解决步骤。 具体步骤 1、首先去网上下载并安装SimHei字体,其它字体也行,如下 并将它安装在此目录下…...
python打卡day49
知识点回顾: 通道注意力模块复习空间注意力模块CBAM的定义 作业:尝试对今天的模型检查参数数目,并用tensorboard查看训练过程 import torch import torch.nn as nn# 定义通道注意力 class ChannelAttention(nn.Module):def __init__(self,…...
【OSG学习笔记】Day 18: 碰撞检测与物理交互
物理引擎(Physics Engine) 物理引擎 是一种通过计算机模拟物理规律(如力学、碰撞、重力、流体动力学等)的软件工具或库。 它的核心目标是在虚拟环境中逼真地模拟物体的运动和交互,广泛应用于 游戏开发、动画制作、虚…...
java 实现excel文件转pdf | 无水印 | 无限制
文章目录 目录 文章目录 前言 1.项目远程仓库配置 2.pom文件引入相关依赖 3.代码破解 二、Excel转PDF 1.代码实现 2.Aspose.License.xml 授权文件 总结 前言 java处理excel转pdf一直没找到什么好用的免费jar包工具,自己手写的难度,恐怕高级程序员花费一年的事件,也…...
java调用dll出现unsatisfiedLinkError以及JNA和JNI的区别
UnsatisfiedLinkError 在对接硬件设备中,我们会遇到使用 java 调用 dll文件 的情况,此时大概率出现UnsatisfiedLinkError链接错误,原因可能有如下几种 类名错误包名错误方法名参数错误使用 JNI 协议调用,结果 dll 未实现 JNI 协…...
CMake 从 GitHub 下载第三方库并使用
有时我们希望直接使用 GitHub 上的开源库,而不想手动下载、编译和安装。 可以利用 CMake 提供的 FetchContent 模块来实现自动下载、构建和链接第三方库。 FetchContent 命令官方文档✅ 示例代码 我们将以 fmt 这个流行的格式化库为例,演示如何: 使用 FetchContent 从 GitH…...
Springboot社区养老保险系统小程序
一、前言 随着我国经济迅速发展,人们对手机的需求越来越大,各种手机软件也都在被广泛应用,但是对于手机进行数据信息管理,对于手机的各种软件也是备受用户的喜爱,社区养老保险系统小程序被用户普遍使用,为方…...
关键领域软件测试的突围之路:如何破解安全与效率的平衡难题
在数字化浪潮席卷全球的今天,软件系统已成为国家关键领域的核心战斗力。不同于普通商业软件,这些承载着国家安全使命的软件系统面临着前所未有的质量挑战——如何在确保绝对安全的前提下,实现高效测试与快速迭代?这一命题正考验着…...
#Uniapp篇:chrome调试unapp适配
chrome调试设备----使用Android模拟机开发调试移动端页面 Chrome://inspect/#devices MuMu模拟器Edge浏览器:Android原生APP嵌入的H5页面元素定位 chrome://inspect/#devices uniapp单位适配 根路径下 postcss.config.js 需要装这些插件 “postcss”: “^8.5.…...
人工智能(大型语言模型 LLMs)对不同学科的影响以及由此产生的新学习方式
今天是关于AI如何在教学中增强学生的学习体验,我把重要信息标红了。人文学科的价值被低估了 ⬇️ 转型与必要性 人工智能正在深刻地改变教育,这并非炒作,而是已经发生的巨大变革。教育机构和教育者不能忽视它,试图简单地禁止学生使…...
【C++特殊工具与技术】优化内存分配(一):C++中的内存分配
目录 一、C 内存的基本概念 1.1 内存的物理与逻辑结构 1.2 C 程序的内存区域划分 二、栈内存分配 2.1 栈内存的特点 2.2 栈内存分配示例 三、堆内存分配 3.1 new和delete操作符 4.2 内存泄漏与悬空指针问题 4.3 new和delete的重载 四、智能指针…...
