当前位置: 首页 > news >正文

go消息队列RabbitMQ - 订阅模式-fanout

1、发布订阅 

订阅模式,消息被路由投递给多个队列,一个消息被多个消费者获取。

1) 可以有多个消费者
2) 每个消费者有自己的queue(队列)
3) 每个队列都要绑定到Exchange(交换机)
4) 生产者发送的消息,只能发送到交换机,交换机来决定要发给哪个队列,生产者无法决定。
5) 交换机把消息发送给绑定过的所有队列
6) 队列的消费者都能拿到消息。实现一条消息被多个消费者消费

  • 相关场景:邮件群发,群聊天,广播(广告)

2、Exchanges(交换器)

消息产生者将消息放入交换机,交换机发布订阅把消息发送到所有消息队列中,对应消息队列的消费者拿到消息进行消费

有几种交换器类型可用:directtopicheaders 和 fanout。我们将集中讨论最后一个——fanout

2.1 创建交换器

创建一个这种类型的交换器,并给它起个名字叫logs

err = ch.ExchangeDeclare("logs",   // name"fanout", // typetrue,     // durablefalse,    // auto-deletedfalse,    // internalfalse,    // no-waitnil,      // arguments
)

fanout(扇出)交换器非常简单。正如你可能从名称中猜测的那样,它只是将接收到的所有消息广播到它知道的所有队列中。 

2.2 临时队列

也是自动删除队列吗,和普通队列在使用上没有什么区别,唯一的区别是,当消费者断开连接时,队列将会被删除。自动删除队列允许的消费者没有限制,也就是说当这个队列上最后一个消费者断开连接才会执行删除。

自动删除队列只需要在声明队列时,设置属性auto-delete标识为true即可。系统声明的随机队列,缺省就是自动删除的。

q, err := ch.QueueDeclare("",    // 空字符串作为队列名称false, // 非持久队列false, // delete when unusedtrue,  // 独占队列(当前声明队列的连接关闭后即被删除)false, // no-waitnil,   // arguments
)

上述方法返回时,生成的队列实例包含RabbitMQ生成的随机队列名称。例如,它可能看起来像amq.gen-JzTY20BRgKO-HjmUJj0wLg

2.3 交换器与队列绑定

 

交换器和队列之间的关系称为绑定

err = ch.QueueBind(q.Name, // queue name"",     // routing key"logs", // exchangefalse,nil,
)

从现在开始,logs交换器将会把消息添加到我们的队列中。

2.4 发布消息到交换机

例如,发布到fanout交换器:

body := bodyFrom(os.Args)
err = ch.Publish("logs", // exchange"",     // routing keyfalse,  // mandatoryfalse,  // immediateamqp.Publishing{ContentType: "text/plain",Body:        []byte(body),})

 3 完整代码

产生日志消息的生产程序与上一教程看起来没有太大不同。最重要的变化是我们现在希望将消息发布到logs交换器,而不是空的消息交换器。发送时,我们需要提供一个routingKey,但是对于fanout型交换器,它的值可以被忽略(传空字符串)。下面是emit_log.go脚本的代码:

package mainimport ("log""os""strings""github.com/streadway/amqp"
)func failOnError(err error, msg string) {if err != nil {log.Fatalf("%s: %s", msg, err)}
}func main() {conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")failOnError(err, "Failed to connect to RabbitMQ")defer conn.Close()ch, err := conn.Channel()failOnError(err, "Failed to open a channel")defer ch.Close()err = ch.ExchangeDeclare("logs",   // name"fanout", // typetrue,     // durablefalse,    // auto-deletedfalse,    // internalfalse,    // no-waitnil,      // arguments)failOnError(err, "Failed to declare an exchange")body := bodyFrom(os.Args)err = ch.Publish("logs", // exchange"",     // routing keyfalse,  // mandatoryfalse,  // immediateamqp.Publishing{ContentType: "text/plain",Body:        []byte(body),})failOnError(err, "Failed to publish a message")log.Printf(" [x] Sent %s", body)
}func bodyFrom(args []string) string {var s stringif (len(args) < 2) || os.Args[1] == "" {s = "hello"} else {s = strings.Join(args[1:], " ")}return s
}

(emit_logs.go源码)

如你所见,在建立连接之后,我们声明了交换器。此步骤是必需的,因为禁止发布到不存在的交换器。

如果没有队列绑定到交换器,那么消息将丢失,但这对我们来说是ok的。如果没有消费者在接收,我们可以安全地丢弃该消息。

receive_logs.go的代码:

package mainimport ("log""github.com/streadway/amqp"
)func failOnError(err error, msg string) {if err != nil {log.Fatalf("%s: %s", msg, err)}
}func main() {conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")failOnError(err, "Failed to connect to RabbitMQ")defer conn.Close()ch, err := conn.Channel()failOnError(err, "Failed to open a channel")defer ch.Close()err = ch.ExchangeDeclare("logs",   // name"fanout", // typetrue,     // durablefalse,    // auto-deletedfalse,    // internalfalse,    // no-waitnil,      // arguments)failOnError(err, "Failed to declare an exchange")q, err := ch.QueueDeclare("",    // namefalse, // durablefalse, // delete when unusedtrue,  // exclusivefalse, // no-waitnil,   // arguments)failOnError(err, "Failed to declare a queue")err = ch.QueueBind(q.Name, // queue name"",     // routing key"logs", // exchangefalse,nil,)failOnError(err, "Failed to bind a queue")msgs, err := ch.Consume(q.Name, // queue"",     // consumertrue,   // auto-ackfalse,  // exclusivefalse,  // no-localfalse,  // no-waitnil,    // args)failOnError(err, "Failed to register a consumer")forever := make(chan bool)go func() {for d := range msgs {log.Printf(" [x] %s", d.Body)}}()log.Printf(" [*] Waiting for logs. To exit press CTRL+C")<-forever
}

(receive_logs.go源码)

如果要将日志保存到文件,只需打开控制台并输入:

go run receive_logs.go > logs_from_rabbit.log

如果希望在屏幕上查看日志,请切换到一个新的终端并运行:

go run receive_logs.go

当然,要发出日志,请输入:

go run emit_log.go

使用rabbitmqctl list_bindings命令,你可以验证代码是否确实根据需要创建了绑定关系和队列。在运行两个receive_logs.go程序后,你应该看到类似以下内容:

sudo rabbitmqctl list_bindings
# => Listing bindings ...
# => logs    exchange        amq.gen-JzTY20BRgKO-HjmUJj0wLg  queue           []
# => logs    exchange        amq.gen-vso0PVvyiRIL2WoV3i48Yg  queue           []
# => ...done.

对结果的解释很简单:数据从logs交换器进入了两个由服务器分配名称的队列。这正是我们想要的。

 

 

相关文章:

go消息队列RabbitMQ - 订阅模式-fanout

1、发布订阅 订阅模式&#xff0c;消息被路由投递给多个队列&#xff0c;一个消息被多个消费者获取。 1&#xff09; 可以有多个消费者 2&#xff09; 每个消费者有自己的queue&#xff08;队列&#xff09; 3&#xff09; 每个队列都要绑定到Exchange&#xff08;交换机&…...

科普类——5G远程实时操控技术在国内港口应用简介(十三)

在中国&#xff0c;5G远程实时操控技术已经在多个港口得到应用&#xff0c;并且应用规模不断扩大&#xff0c;展现出良好的发展前景。以下是一些具体的应用案例&#xff1a; 上港集团洋山港&#xff1a;上港集团与华为合作&#xff0c;在全球港口首次将F5G技术应用于超远程控制…...

整理:汉诺塔简析

大体上&#xff0c;要解决一个汉诺塔问题&#xff0c;就需要解决两个更简单的汉诺塔问题 以盘子数量 3 的汉诺塔问题为例 要将 3 个盘子从 A 移动到 C&#xff0c;就要&#xff1a; 将两个盘子从 A 移动到 B&#xff08;子问题 1&#xff09; 为了解决子问题 1&#xff0c;就…...

Android 8.1 设置--声音中增加通话音量

Android 8.1 设置--声音中增加通话音量 近来收到项目需求&#xff0c;想要在设置--声音中增加一项通话音量调节&#xff0c;具体修改参照如下&#xff1a; /vendor/mediatek/proprietary/packages/apps/MtkSettings/res/values/strings.xml <!-- Sound: Title for the opt…...

C#中使用OpenCvSharp4绘制直线、矩形、圆、文本

C#中使用OpenCvSharp4绘制直线、矩形、圆、文本 继之前的Python中使用Opencv-python库绘制直线、矩形、圆、文本和VC中使用OpenCV绘制直线、矩形、圆和文字&#xff0c;将之前的Python和C示例代码翻译成C#语言&#xff0c;很简单&#xff0c;还是借用OpenCvSharp4库中的Line、…...

Qt程序设计-自定义控件(提升法)

本文讲解如何使用提升法自定义控件,控件有界面。 实现自定义一个开关控件,控件上显示当前的状态。实现过程如下: 1、在创建的项目上,添加qt设计界面类。 2、选择widget 3、更改名称为MySwitchButton 4、添加两个图标资源文件,用来显示开关状态 5、添加两个label,一个…...

计算机网络基础知识笔记

1 HOST和PORT Claude-2-100k: 我来用简单的语言解释一下HOST和PORT这两个网络术语&#xff1a; HOST就是设备的IP地址,比如"127.0.0.1"就是我们本机的IP地址&#xff1b; PORT就是端口号,可以把它简单理解为设备上的门牌号。 举个类比,我们要给某个人发信件,需要知道…...

【iOS ARKit】2D肢体动作捕捉

人体肢体动作捕捉在动漫影视制作、游戏CG 动画、实时模型驱动中有着广泛的应用&#xff0c;利用 ARKit&#xff0c;无须额外的硬件设备即可实现 2D和3D人体一系列关节和骨骼的动态捕捉&#xff0c;由于移动AR 的便携性及低成本&#xff0c;必将促进相关产业的发展。 ARBody Tr…...

MAC word删除空白页

问题&#xff1a;MAC word删除空白页 解决&#xff1a; option删除键...

字面跳动前端面试题:React Hook为什么不能放在if/循环/嵌套函数里面?

答&#xff1a;首先&#xff0c;React Hooks 是为了简化组件逻辑和提高代码可读性而设计的。将 Hook 放在 if/循环/嵌套函数中会破坏它们的封装性和可预测性&#xff0c;使得代码更难维护和理解。同时&#xff0c;这样做也增加了代码的复杂度&#xff0c;可能会导致性能下降和潜…...

【SpringBoot】SpringBoot的web开发

&#x1f4dd;个人主页&#xff1a;五敷有你 &#x1f525;系列专栏&#xff1a;SpringBoot ⛺️稳重求进&#xff0c;晒太阳 Wbe开发 使用Springboot 1&#xff09;、创建SpringBoot应用&#xff0c;选中我们需要的模块&#xff1b; 2&#xff09;、SpringBoot已经默…...

houdini 入门指南-参考自用,内有翻译错误

HOUDINI 18.5列1 GETTING STARTED 入门指南 What’s new in Houdini 18.5 胡迪尼18.5有什么新内容 New features and changes in Houdini 18.5.胡迪尼18.5的新功能和变化。Basics基础The basics of working with Houdini’s user interface.使用胡迪尼用户界面的基本知识。Shel…...

【笔记】SPN和PLMN 运营商网络名称显示

一、业务术语 缩写 全称 释义 CDNR Carrier Display Name Ressource 运营商显示名称资源 PLMN Public Land Mobile Network 公共陆地移动网络。 表示最终显示的网络运营商名字 SPN Service Provider Name SIM卡EF文件6F46。表示服务提供商名字,主要是SIM卡服务 OPL Operator …...

Selenium处理Alert弹窗

页面弹窗有 3 种类型&#xff1a; alert&#xff08;警告信息&#xff09; confirm&#xff08;确认信息&#xff09; prompt&#xff08;提示输入&#xff09; 对于页面出现的 alert 弹窗&#xff0c;Selenium 提供如下方法&#xff1a; 序号 方法/属性 描述 1 ac…...

FCIS 2023:洞悉网络安全新前沿,引领未来安全创新狂潮

在数字化浪潮席卷全球的今天&#xff0c;网络安全问题愈发凸显其重要性。 FCIS 2023网络安全创新大会作为业界瞩目的盛会&#xff0c;不仅汇聚了国际顶尖的网络安全专家&#xff0c;更展示了最前沿的安全技术与研究成果。那么&#xff0c;参与这场大会&#xff0c;我们究竟能学…...

4个最佳的免费全磁盘加密程序,总有一款适合你

全磁盘加密软件加密整个驱动器,而不仅仅是几个文件或文件夹。加密计算机的驱动器可以使你的私人数据免受窥探,即使你的计算机被盗。 你也不仅仅局限于一个硬盘驱动器。闪存驱动器和外部硬盘驱动器等外部设备也可以通过磁盘加密软件进行加密。 注意:Windows和macOS都集成了…...

SQL语句创建数据库

在SQL中&#xff0c;可以使用CREATE DATABASE语句来创建数据库。下面是一个示例&#xff1a; CREATE DATABASE database_name;其中&#xff0c;database_name是要创建的数据库的名称。你可以将其替换为你想要的数据库名称。 请注意&#xff0c;在不同的SQL数据库管理系统中&a…...

【lesson38】让minishell支持重定向

文章目录 minishell支持重定向minishell完整代码 minishell支持重定向 支持重定向的核心逻辑&#xff1a; 1.分析字符串是否含有重定向的符号&#xff0c;并且提取文件名。 #define INPUT_REDIR 0 //输入重定向 #define OUTPUT_REDIR 1 //输出重定向 #define APPEND_REDIR…...

【安装指南】maven下载、安装与配置详细教程

&#x1f33c;一、概述 maven功能与python的pip类似。 Apache Maven是一个用于软件项目管理和构建的强大工具。它是基于项目对象模型的&#xff0c;用于描述项目的构建配置和依赖关系。以下是一些关键的 Maven 特性和概念&#xff1a; POM&#xff08;Project Object Model&…...

matplotlib-中文乱码问题解决方案

前言 本文主要解决matplotlib在画图时&#xff0c;出现的中文乱码问题&#xff0c;具体问题示意如下&#xff1a; 下面将针对这个问题直接给出具体的解决步骤。 具体步骤 1、首先去网上下载并安装SimHei字体&#xff0c;其它字体也行&#xff0c;如下 并将它安装在此目录下…...

c/c++的opencv椒盐噪声

在 C/C 中实现椒盐噪声 椒盐噪声&#xff08;Salt-and-Pepper Noise&#xff09;&#xff0c;也称为脉冲噪声&#xff08;Impulse Noise&#xff09;&#xff0c;是数字图像中常见的一种噪声类型。它的特点是在图像中随机出现纯白色&#xff08;盐&#xff09;或纯黑色&#x…...

Redis7 新增数据结构深度解析:ListPack 的革新与优化

Redis 作为高性能的键值存储系统&#xff0c;其核心优势之一在于丰富的数据结构。随着版本迭代&#xff0c;Redis 不断优化现有结构并引入新特性。在 Redis 7.0 中&#xff0c;ListPack 作为新一代序列化格式正式登场&#xff0c;替代了传统的 ZipList&#xff08;压缩列表&…...

JavaScript性能优化实战的技术文-——仙盟创梦IDE

JavaScript性能优化实战技术文章大纲 性能优化的核心原则 减少代码执行时间降低内存消耗优化网络请求提升用户体验 代码层面的优化 减少全局变量使用&#xff0c;避免命名冲突和内存泄漏使用节流&#xff08;throttle&#xff09;和防抖&#xff08;debounce&#xff09;优…...

PH热榜 | 2025-05-29

1. Tapflow 2.0 标语&#xff1a;将你的文档转化为可销售的指导手册、操作手册和工作流程。 介绍&#xff1a;Tapflow 2.0将各类知识&#xff08;包括人工智能、设计、开发、营销等&#xff09;转化为有条理且可销售的产品。现在你可以导入文件&#xff0c;让人工智能快速为你…...

【Node.js】部署与运维

个人主页&#xff1a;Guiat 归属专栏&#xff1a;node.js 文章目录 1. Node.js 部署概述1.1 部署的核心要素1.2 Node.js 部署架构全景 2. 传统服务器部署2.1 Linux 服务器环境准备系统更新与基础软件安装创建应用用户 2.2 应用部署脚本2.3 环境变量管理2.4 Nginx 反向代理配置2…...

数据分析案例-基于红米和华为手机的用户评论分析

&#x1f935;‍♂️ 个人主页&#xff1a;艾派森的个人主页 ✍&#x1f3fb;作者简介&#xff1a;Python学习者 &#x1f40b; 希望大家多多支持&#xff0c;我们一起进步&#xff01;&#x1f604; 如果文章对你有帮助的话&#xff0c; 欢迎评论 &#x1f4ac;点赞&#x1f4…...

4.1.1 Spark SQL概述

Spark SQL是Apache Spark的一个模块&#xff0c;专门用于处理结构化数据。它引入了DataFrame这一编程抽象&#xff0c;DataFrame是带有Schema信息的分布式数据集合&#xff0c;类似于关系型数据库中的表。用户可以通过SQL、DataFrames API和Datasets API三种方式操作结构化数据…...

从“被动养老”到“主动健康管理”:平台如何重构代际关系?

在老龄化与数字化交织的背景下&#xff0c;代际关系的重构已成为破解养老难题的关键。 传统家庭养老模式中&#xff0c;代际互动多表现为单向的“赡养-被赡养”关系。 而智慧养老平台的介入&#xff0c;通过技术赋能、资源整合与情感连接&#xff0c;正在推动代际关系向“协作…...

WEB3—— 简易NFT铸造平台(ERC-721)-入门项目推荐

3. 简易NFT铸造平台&#xff08;ERC-721&#xff09; 目标&#xff1a;用户可以免费铸造一个 NFT&#xff0c;展示在前端页面。 内容&#xff1a; 编写 ERC-721 合约&#xff0c;每个地址可铸造一个 NFT。 提供 API&#xff1a; POST /mint&#xff1a;铸造 NFT&#xff08;调…...

CentOS 7.0重置root密码

文章目录 版本&#xff1a;CentOS 7.0内核版本&#xff1a;CentOS Linux, with Linux 3.10.0-123.el7.x86_64 服务器重启后&#xff0c;等待进入上述页面&#xff0c;按⬆⬇键&#xff0c;中断正常启动。在此页面按E&#xff0c;进入编辑模式 继续按⬇&#xff0c;找到linux16…...