在Go中迅速使用RabbitMQ
文章目录
- 1 认识
- 1.1 MQ分类
- 1.2 安装
- 1.3 基本流程
- 2 [Work模型](https://www.rabbitmq.com/tutorials/tutorial-two-go#preparation)
- 3 交换机
- 3.1 fanout
- 3.2 direct
- 3.3 [topic](https://www.rabbitmq.com/tutorials/tutorial-five-go)
- 4 Golang创建交换机/队列/Publish/Consume/Bind
- 5 可靠性
- 5.1 生产者可靠性
- 5.2 MQ可靠性
- 5.2.1 Lazy Queue
- 5.3 消费者可靠性
- 5.4 业务幂等性
- 5.4 Golang实现可靠性
- 1. 确保消息生产者的可靠性
- 2. 确保消息队列的可靠性
- 3. 确保消息消费者的可靠性
- 4. 容错处理
- 6 延迟消息
- 6.1 死信交换机
- 6.2 延迟消息插件
- 6.2.1 安装
- 6.2.2 使用
- 6.2.3 应用场景
- 为什么要使用消息队列

1 认识
1.1 MQ分类
-
有Broker
- 重Topic —— 在整个broker中,依据topic来进行消息中转。在重topic的MQ中必然需要topic —— kafka
- 轻Topic —— topic只是一种中转模式 —— rabbitMQ
-
无Broker
1.2 安装
# latest RabbitMQ 3.13
docker run \-e RABBITMQ_DEFAULT_USER=dusong \ #默认账号和密码均为:guest-e RABBITMQ_DEFAULT_PASS=123123 \-d \ #detached mode-v mq-plugins:/plugins \ #插件挂载--rm \--name rabbitmq \-p 5672:5672 \ #消息通信端口-p 15672:15672 \ #管理界面端口rabbitmq:3.13-management
1.3 基本流程

- exchange只能转发消息,不能存储消息
- 通过bind将queue绑定到exchange
2 Work模型
-
多个消费者绑定到一个队列
-
同一个消息只会被一个消费者处理
-
通过设置prefetch来控制消费者预取的消息数量(不设置默认平均平均分配)

err = ch.Qos(1, // prefetch count0, // prefetch sizefalse, // global )
3 交换机
3.1 fanout
fanout类型的交换机会将消息转发给所有绑定到改交换机的队列
3.2 direct

err = ch.ExchangeDeclare("logs_direct", // name"direct", // typetrue, // durablefalse, // auto-deletedfalse, // internalfalse, // no-waitnil, // arguments
)
failOnError(err, "Failed to declare an exchange")ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()body := bodyFrom(os.Args)
err = ch.PublishWithContext(ctx,"logs_direct", // exchange"log", // routing keyfalse, // mandatoryfalse, // immediateamqp.Publishing{ContentType: "text/plain",Body: []byte(body),
})
3.3 topic

4 Golang创建交换机/队列/Publish/Consume/Bind
-
创建交换机
err = ch.ExchangeDeclare("logs_direct", // name"direct", // typetrue, // durablefalse, // auto-deletedfalse, // internalfalse, // no-waitnil, // arguments ) -
创建队列
q, err := ch.QueueDeclare("hello", // namefalse, // durable(是否持久化)false, // delete when unusedfalse, // exclusivefalse, // no-waitnil, // arguments ) -
绑定
err = ch.QueueBind(q.Name, // queue name"log", // routing key"logs_direct", // exchangefalse,nil ) -
发送
body := "this is log" err = ch.PublishWithContext(ctx,"logs_direct", // exchange"log", // routing keyfalse, // mandatoryfalse, // immediateamqp.Publishing{ContentType: "text/plain",Body: []byte(body),}) -
接收
msgs, err := ch.Consume(q.Name, // queue"", // consumertrue, // auto ackfalse, // exclusivefalse, // no localfalse, // no waitnil, // args )
5 可靠性
5.1 生产者可靠性
-
生产者重连
-
生产者确认(ack)
5.2 MQ可靠性
- 交换机/队列持久化
- 消息持久化
5.2.1 Lazy Queue


5.3 消费者可靠性
-
消费者确认机制

5.4 业务幂等性
- 消费者因为保证可靠性可能消费业务多次,因此需要保证业务幂等性
- 给消息加上uuid
- 在业务逻辑上做修改
5.4 Golang实现可靠性
在使用 RabbitMQ 的 Go 应用程序中,要确保消息的可靠性,通常需要从以下几个方面入手:
1. 确保消息生产者的可靠性
消息确认(Publisher Confirms): 开启 RabbitMQ 的发布确认模式。通过调用
Channel.Confirm()方法,让 RabbitMQ 服务器在成功接收并持久化消息后向生产者发送确认。这样可以确保生产者知道消息已被可靠接收。ch.Confirm(false) // 启用发布确认模式 confirm := ch.NotifyPublish(make(chan amqp.Confirmation, 1))// 发布消息 err = ch.Publish(exchange, routingKey, mandatory, immediate, msg) if err != nil {// 处理发布失败的情况 }select { case confirmed := <-confirm:if confirmed.Ack {fmt.Println("消息已确认")} else {fmt.Println("消息未确认")} case <-time.After(time.Second * 5):fmt.Println("消息确认超时") }消息持久化(Message Durability): 将消息标记为持久化,以确保即使 RabbitMQ 服务器重启,消息也不会丢失。通过设置
DeliveryMode为amqp.Persistent来实现:msg := amqp.Publishing{DeliveryMode: amqp.Persistent,ContentType: "text/plain",Body: []byte("Hello, RabbitMQ!"), }2. 确保消息队列的可靠性
队列持久化(Queue Durability): 创建队列时,将其声明为持久化队列。这样即使 RabbitMQ 服务器重启,队列依然存在。
_, err = ch.QueueDeclare("my_queue", // 队列名true, // 是否持久化false, // 是否自动删除false, // 是否排他false, // 是否阻塞nil, // 其他参数 ) if err != nil {log.Fatalf("Failed to declare a queue: %s", err) }3. 确保消息消费者的可靠性
手动确认(Manual Acknowledgment): 消费者手动确认接收到的消息。这样只有在消息成功处理后,RabbitMQ 才会将其从队列中移除。如果消费者没有确认消息且发生故障,RabbitMQ 会将消息重新投递。
msgs, err := ch.Consume("my_queue", // 队列名"", // 消费者标识false, // 自动确认false, // 是否排他false, // 是否阻塞false, // 是否在同一个连接上消费nil, // 其他参数 ) if err != nil {log.Fatalf("Failed to register a consumer: %s", err) }for d := range msgs {// 处理消息fmt.Printf("Received a message: %s", d.Body)// 手动确认d.Ack(false) }QoS(Quality of Service): 设置消费者的 QoS 参数,例如
prefetch_count,确保消费者不会一次处理太多消息,从而导致过载。err = ch.Qos(1, // 每次处理一条消息0, // 消息大小限制(不限制)false, // 是否应用于整个通道 ) if err != nil {log.Fatalf("Failed to set QoS: %s", err) }4. 容错处理
重试机制: 在生产者和消费者中实现重试机制,例如使用带有指数回退的重试逻辑,以应对 RabbitMQ 不可用或网络波动的情况。
死信队列(DLX): 配置死信队列,将处理失败的消息路由到指定的死信队列,方便后续分析和处理。
通过这些措施,可以有效提高使用 RabbitMQ 时的消息可靠性。
6 延迟消息
6.1 死信交换机

6.2 延迟消息插件
6.2.1 安装
-
https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/download/v3.13.0/rabbitmq_delayed_message_exchange-3.13.0.ez
-
将插件放在该目录

-
docker exec -it rabbitmq rabbitmq-plugins enable rabbitmq-delayed-message-exchange
6.2.2 使用
// 3. 声明延迟交换机err = ch.ExchangeDeclare("delay_exchange", // 交换机名称"x-delayed-message", // 交换机类型true, // 是否持久化false, // 是否自动删除false, // 是否内部使用false, // 是否等待amqp.Table{"x-delayed-type": "direct"}, // 交换机类型的设置)failOnError(err, "Failed to declare an exchange")// 4. 发送消息body := "Hello World with delay"err = ch.Publish("delay_exchange", // 交换机名称"routing_key", // 路由键false, // 是否强制发送false, // 是否立即发送amqp.Publishing{ContentType: "text/plain",Body: []byte(body),Headers: amqp.Table{"x-delay": int32(5000), // 延迟时间,单位为毫秒 (5秒延迟)},})
6.2.3 应用场景
- 消息内部维护一个计时器,延迟消息对CPU的消耗较高,适用于延迟时间较短的场景

false, // 是否立即发送
amqp.Publishing{
ContentType: “text/plain”,
Body: []byte(body),
Headers: amqp.Table{
“x-delay”: int32(5000), // 延迟时间,单位为毫秒 (5秒延迟)
},
})
### 6.2.3 应用场景- 消息内部维护一个计时器,延迟消息对CPU的消耗较高,适用于延迟时间较短的场景[外链图片转存中...(img-eA0QMPnx-1725527666228)]
相关文章:
在Go中迅速使用RabbitMQ
文章目录 1 认识1.1 MQ分类1.2 安装1.3 基本流程 2 [Work模型](https://www.rabbitmq.com/tutorials/tutorial-two-go#preparation)3 交换机3.1 fanout3.2 direct3.3 [topic](https://www.rabbitmq.com/tutorials/tutorial-five-go) 4 Golang创建交换机/队列/Publish/Consume/B…...
Windows JDK安装详细教程
一、关于JDK 1.1 简介 Java是一种广泛使用的计算机编程语言,拥有跨平台、面向对象、泛型编程的特性,广泛应用于企业级Web应用开发和移动应用开发。 JDK(Java Development Kit)是用于开发 Java 应用程序的工具包。它由以下几个主要…...
Ribbon负载均衡底层原理
springcloude服务实例与服务实例之间发送请求,首先根据服务名注册到nacos,然后发送请求,nacos可以根据服务名找到对应的服务实例。 SpringCloudRibbon的底层采用了一个拦截器,拦截了openfeign发出的请求,对地址做了修…...
【C语言可变参数函数的使用与原理分析】
文章目录 1 前言2 实例2.1实例程序2.2程序执行结果2.3 程序分析 3 补充4 总结 1 前言 在编程过程中,有时会遇到需要定义参数数量不固定的函数的情况。 C语言提供了一种灵活的解决方案:变参函数。这种函数能够根据实际调用时的需求,接受任意…...
【笔记】Java EE应用开发环境配置(JDK+Maven+Tomcat+MySQL+IDEA)
一、安装JDK17 1.下载JDK17 https://download.oracle.com/java/17/archive/jdk-17.0.7_windows-x64_bin.zip 2.配置环境变量 下载后,解压到本地(目录中最好不要有中文或特殊字符) 打开【控制面板】-【系统和安全】-【系统】-【高级系统…...
一文讲懂扩散模型
一文讲懂扩散模型 扩散模型(Diffusion Models, DM)是近年来在计算机视觉、自然语言处理等领域取得显著进展的一种生成模型。其思想根源可以追溯到非平衡热力学,通过模拟数据的扩散和去噪过程来生成新的样本。以下将详细阐述扩散模型的基本原理…...
学习笔记八:基于Jenkins+k8s+Git+DockerHub等技术链构建企业级DevOps容器云平台
基于Jenkinsk8sGitDockerHub等技术链构建企业级DevOps容器云平台 测试jenkins的CI/CD在Jenkins中安装kubernetes插件安装blueocean插件配置jenkins连接到我们存在的k8s集群配置pod-template添加自己的dockerhub凭据测试通过Jenkins部署应用发布到k8s开发环境、测试环境、生产环…...
科研绘图系列:R语言柱状图分布(histogram plot)
文章目录 介绍加载R包读取数据画图介绍 柱状图(Bar Chart)是一种常用的数据可视化图表,用于展示和比较不同类别或组的数据。它通过在二维平面上绘制一系列垂直或水平的柱子来表示数据的大小,每个柱子的长度或高度代表一个数据点的数值。柱状图非常适合于展示分类数据的分布…...
vue3+ts封装类似于微信消息的组件
组件代码如下: <template><div:class"[voice-message, { sent: isSent, received: !isSent }]":style"{ backgroundColor: backgroundColor }"click"togglePlayback"><!-- isSent为false在左侧,为true在右…...
ES6 reduce方法详解:示例、应用场景与实用技巧
在JavaScript中,reduce 方法是一个非常强大的数组方法,它允许你将数组中的元素归并(reduce)为单个值。reduce 方法执行一个由你提供的reducer函数(归并函数),将其结果汇总为单一的返回值。 一.…...
java后端保存的本地图片通过ip+端口直接访问
直接上代码吧 package com.ydx.emms.datapro.controller;import org.springframework.context.annotation.Configuration; import org.springframework.web.servlet.config.annotation.ResourceHandlerRegistry; import org.springframework.web.servlet.config.annotation.…...
2024 年高教社杯全国大学生数学建模竞赛B题4小问解题思路(第二版)
原文链接:https://www.cnblogs.com/qimoxuan/articles/18399415 问题 1:抽样检测方案设计 详细解题思路: 确定抽样检测目标:企业需要确定一个可接受的次品率上限(标称值),以及在该次品率下&am…...
docker-nginx数据卷挂载
一、案例1-利用Nginx容器部署静态资源 1.1、需求: 创建Nginx容器, 修改nginx容器内的html目录下的index.html文件,查看变化将静态资源部署到nginx的html目录 1.2、修改html目录下的index.html文件,查看变化 因为docker运用得最小化系统环境,解决办法就…...
项目实战 ---- 商用落地视频搜索系统(8)---优化(2)---查询逻辑层优化
目录 背景 技术衡量与方案 一种可实现方案 可实现方案及设计描述 可能存在的问题 一种创新实现方案 方案的改良设计 策略公式 优化的实现 完整代码 代码解释 异常场景的考量 处理方式 运行注意事项 运行结果 结果优化对比与解释 背景 在项目实战 ---- 商用落地…...
山东大学机试试题合集
🍰🍰🍰高分篇已经涵盖了绝大多数的机试考点,由于临近预推免,各校的机试蜂拥而至,我们接下来先更一些各高校机试题合集,算是对前边学习成果的深入学习,也是对我们代码能力的锻炼。加油…...
餐厅食品留样管理系统小程序的设计
管理员账户功能包括:系统首页,个人中心,窗口负责人管理,窗口员工管理,冰柜管理,排班信息管理,留样食品管理,教育宣传管理,系统管理 微信端账号功能包括:系统…...
亚马逊运营:如何提高亚马逊销量和运营效率?
不少亚马逊卖家们为了扩大业务规模和提高销量,会创建多个卖家账户来同时运营多个亚马逊店铺。问题是,这种多店铺运营模式并非没有风险——亚马逊运营的一个重要方面就是账户的健康管理。一旦某个账户出现问题,亚马逊的算法就可能会启动关联检…...
设计模式背后的设计原则和思想
设计模式背后的设计原则和思想是一套指导我们如何设计高质量软件系统的准则和方法论。这些原则和思想不仅有助于提升软件的可维护性、可扩展性和可复用性,还能帮助开发团队更好地应对复杂多变的需求。以下是一些核心的设计原则和思想: 1. 设计原则 设计…...
项目总体框架
一.后端(包装servlet) 使用BaseServlet进行请求的初步处理(利用继承进行执行这个) 在BaseServlet中 处理请求的类型找到对象的方法,并使用注解找到参数名,执行参数自动注入。 package com.csdn.controlle…...
k8s Prometheus
一、部署 Prometheus kubectl create ns kube-ops# 创建 prometheus-cm.yaml apiVersion: v1 kind: ConfigMap metadata:name: prometheus-confignamespace: kube-ops data:prometheus.yml: |global:scrape_interval: 15s # 表示 prometheus 抓取指标数据的频率,默…...
IndexTTS-2-LLM免费体验:基于大语言模型的新一代TTS服务
IndexTTS-2-LLM免费体验:基于大语言模型的新一代TTS服务 1. 引言:语音合成技术的革新 语音合成技术正在经历一场由大语言模型驱动的革命。传统的文本转语音(TTS)系统虽然能够将文字转化为语音,但在自然度和情感表达上始终存在局限。IndexTT…...
GNOME-BOXES虚拟机快速上手:从安装到共享文件全攻略
1. GNOME-BOXES初体验:为什么选择它? 第一次接触GNOME-BOXES是在我需要临时运行一个Windows应用的时候。作为一个长期使用Linux的用户,我一直在寻找一个既轻量又简单的虚拟机方案。试过VirtualBox,也用过VMware,但要么…...
【算法三十八】200. 岛屿数量
200. 岛屿数量 DFS: class Solution {public int numIslands(char[][] grid) {int ans 0;for(int i 0;i<grid.length;i){for(int j 0;j<grid[0].length;j){if(grid[i][j]1){dfs(grid,i,j);ans;}}}return ans;}private void dfs(char[][] grid,int i,int …...
3步快速完成NCM文件转换:免费音频解密工具终极指南
3步快速完成NCM文件转换:免费音频解密工具终极指南 【免费下载链接】NCMconverter NCMconverter将ncm文件转换为mp3或者flac文件 项目地址: https://gitcode.com/gh_mirrors/nc/NCMconverter 你是否遇到过下载的音乐只能在特定平台播放的困扰?NCM…...
Phi-4-reasoning-vision-15B在研发协作中的应用:代码IDE截图理解与问题定位
Phi-4-reasoning-vision-15B在研发协作中的应用:代码IDE截图理解与问题定位 1. 引言:研发协作中的视觉理解需求 在软件开发团队中,工程师们每天都要处理大量代码截图和IDE界面。当遇到问题时,最常见的做法是把报错截图或代码片段…...
小白也能玩转AI配音!Fish Speech 1.5一键部署实战指南
小白也能玩转AI配音!Fish Speech 1.5一键部署实战指南 想让你的文字变成专业级语音吗?Fish Speech 1.5作为一款强大的AI语音合成工具,支持12种语言和声音克隆功能,现在通过CSDN星图镜像,只需简单几步就能快速体验。本…...
Claude Code 权限 / 安全审查调用流程图
Claude Code 权限 / 安全审查调用流程图 这份文档的目标不是“介绍功能”,而是帮助你 复刻 Claude Code 的权限判定链路 到你们自己的手机 Agent 里。 重点回答 4 个问题: 一个 action 从模型产生到真正执行,中间经过了哪些关卡? 哪些地方是 确定性规则,哪些地方会 请求…...
JS中彻底删除JSON对象组成的数组中的元素
在 JS 中,对于某个由 JSON 对象组成的数组,例如:var test [{ "a": "1", "b": "2" }, { "a": "3", "b": "4" }, { "a": "5", "b…...
【JEECG Boot】 JEECG Boot——Online表单 系统性知识体系全解
文章目录JEECG Boot——Online表单一、核心基础认知1.1 官方定义与核心定位1.2 核心价值与解决的痛点1.3 与代码生成器的核心区别1.4 技术栈与运行环境依赖1.5 适用场景与能力边界二、核心架构与底层驱动原理2.1 整体四层架构体系2.2 元数据驱动的核心原理2.3 核心元数据模型与…...
STM32与Linux的无缝协作:通过USB CDC/VCP实现高效数据交互
在现代嵌入式机器人系统中,常见的架构是“双核协同”:一个高性能 Linux 主板(如运行 OpenWrt 的 MT7628 )负责网络、音视频和高级应用;一个实时性更强的 MCU(如 STM32F4/F7)负责电机控制、传感器…...
