在Go中迅速使用RabbitMQ
文章目录
- 1 认识
- 1.1 MQ分类
- 1.2 安装
- 1.3 基本流程
- 2 [Work模型](https://www.rabbitmq.com/tutorials/tutorial-two-go#preparation)
- 3 交换机
- 3.1 fanout
- 3.2 direct
- 3.3 [topic](https://www.rabbitmq.com/tutorials/tutorial-five-go)
- 4 Golang创建交换机/队列/Publish/Consume/Bind
- 5 可靠性
- 5.1 生产者可靠性
- 5.2 MQ可靠性
- 5.2.1 Lazy Queue
- 5.3 消费者可靠性
- 5.4 业务幂等性
- 5.4 Golang实现可靠性
- 1. 确保消息生产者的可靠性
- 2. 确保消息队列的可靠性
- 3. 确保消息消费者的可靠性
- 4. 容错处理
- 6 延迟消息
- 6.1 死信交换机
- 6.2 延迟消息插件
- 6.2.1 安装
- 6.2.2 使用
- 6.2.3 应用场景
- 为什么要使用消息队列

1 认识
1.1 MQ分类
-
有Broker
- 重Topic —— 在整个broker中,依据topic来进行消息中转。在重topic的MQ中必然需要topic —— kafka
- 轻Topic —— topic只是一种中转模式 —— rabbitMQ
-
无Broker
1.2 安装
# latest RabbitMQ 3.13
docker run \-e RABBITMQ_DEFAULT_USER=dusong \ #默认账号和密码均为:guest-e RABBITMQ_DEFAULT_PASS=123123 \-d \ #detached mode-v mq-plugins:/plugins \ #插件挂载--rm \--name rabbitmq \-p 5672:5672 \ #消息通信端口-p 15672:15672 \ #管理界面端口rabbitmq:3.13-management
1.3 基本流程

- exchange只能转发消息,不能存储消息
- 通过bind将queue绑定到exchange
2 Work模型
-
多个消费者绑定到一个队列
-
同一个消息只会被一个消费者处理
-
通过设置prefetch来控制消费者预取的消息数量(不设置默认平均平均分配)

err = ch.Qos(1, // prefetch count0, // prefetch sizefalse, // global )
3 交换机
3.1 fanout
fanout类型的交换机会将消息转发给所有绑定到改交换机的队列
3.2 direct

err = ch.ExchangeDeclare("logs_direct", // name"direct", // typetrue, // durablefalse, // auto-deletedfalse, // internalfalse, // no-waitnil, // arguments
)
failOnError(err, "Failed to declare an exchange")ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()body := bodyFrom(os.Args)
err = ch.PublishWithContext(ctx,"logs_direct", // exchange"log", // routing keyfalse, // mandatoryfalse, // immediateamqp.Publishing{ContentType: "text/plain",Body: []byte(body),
})
3.3 topic

4 Golang创建交换机/队列/Publish/Consume/Bind
-
创建交换机
err = ch.ExchangeDeclare("logs_direct", // name"direct", // typetrue, // durablefalse, // auto-deletedfalse, // internalfalse, // no-waitnil, // arguments ) -
创建队列
q, err := ch.QueueDeclare("hello", // namefalse, // durable(是否持久化)false, // delete when unusedfalse, // exclusivefalse, // no-waitnil, // arguments ) -
绑定
err = ch.QueueBind(q.Name, // queue name"log", // routing key"logs_direct", // exchangefalse,nil ) -
发送
body := "this is log" err = ch.PublishWithContext(ctx,"logs_direct", // exchange"log", // routing keyfalse, // mandatoryfalse, // immediateamqp.Publishing{ContentType: "text/plain",Body: []byte(body),}) -
接收
msgs, err := ch.Consume(q.Name, // queue"", // consumertrue, // auto ackfalse, // exclusivefalse, // no localfalse, // no waitnil, // args )
5 可靠性
5.1 生产者可靠性
-
生产者重连
-
生产者确认(ack)
5.2 MQ可靠性
- 交换机/队列持久化
- 消息持久化
5.2.1 Lazy Queue


5.3 消费者可靠性
-
消费者确认机制

5.4 业务幂等性
- 消费者因为保证可靠性可能消费业务多次,因此需要保证业务幂等性
- 给消息加上uuid
- 在业务逻辑上做修改
5.4 Golang实现可靠性
在使用 RabbitMQ 的 Go 应用程序中,要确保消息的可靠性,通常需要从以下几个方面入手:
1. 确保消息生产者的可靠性
消息确认(Publisher Confirms): 开启 RabbitMQ 的发布确认模式。通过调用
Channel.Confirm()方法,让 RabbitMQ 服务器在成功接收并持久化消息后向生产者发送确认。这样可以确保生产者知道消息已被可靠接收。ch.Confirm(false) // 启用发布确认模式 confirm := ch.NotifyPublish(make(chan amqp.Confirmation, 1))// 发布消息 err = ch.Publish(exchange, routingKey, mandatory, immediate, msg) if err != nil {// 处理发布失败的情况 }select { case confirmed := <-confirm:if confirmed.Ack {fmt.Println("消息已确认")} else {fmt.Println("消息未确认")} case <-time.After(time.Second * 5):fmt.Println("消息确认超时") }消息持久化(Message Durability): 将消息标记为持久化,以确保即使 RabbitMQ 服务器重启,消息也不会丢失。通过设置
DeliveryMode为amqp.Persistent来实现:msg := amqp.Publishing{DeliveryMode: amqp.Persistent,ContentType: "text/plain",Body: []byte("Hello, RabbitMQ!"), }2. 确保消息队列的可靠性
队列持久化(Queue Durability): 创建队列时,将其声明为持久化队列。这样即使 RabbitMQ 服务器重启,队列依然存在。
_, err = ch.QueueDeclare("my_queue", // 队列名true, // 是否持久化false, // 是否自动删除false, // 是否排他false, // 是否阻塞nil, // 其他参数 ) if err != nil {log.Fatalf("Failed to declare a queue: %s", err) }3. 确保消息消费者的可靠性
手动确认(Manual Acknowledgment): 消费者手动确认接收到的消息。这样只有在消息成功处理后,RabbitMQ 才会将其从队列中移除。如果消费者没有确认消息且发生故障,RabbitMQ 会将消息重新投递。
msgs, err := ch.Consume("my_queue", // 队列名"", // 消费者标识false, // 自动确认false, // 是否排他false, // 是否阻塞false, // 是否在同一个连接上消费nil, // 其他参数 ) if err != nil {log.Fatalf("Failed to register a consumer: %s", err) }for d := range msgs {// 处理消息fmt.Printf("Received a message: %s", d.Body)// 手动确认d.Ack(false) }QoS(Quality of Service): 设置消费者的 QoS 参数,例如
prefetch_count,确保消费者不会一次处理太多消息,从而导致过载。err = ch.Qos(1, // 每次处理一条消息0, // 消息大小限制(不限制)false, // 是否应用于整个通道 ) if err != nil {log.Fatalf("Failed to set QoS: %s", err) }4. 容错处理
重试机制: 在生产者和消费者中实现重试机制,例如使用带有指数回退的重试逻辑,以应对 RabbitMQ 不可用或网络波动的情况。
死信队列(DLX): 配置死信队列,将处理失败的消息路由到指定的死信队列,方便后续分析和处理。
通过这些措施,可以有效提高使用 RabbitMQ 时的消息可靠性。
6 延迟消息
6.1 死信交换机

6.2 延迟消息插件
6.2.1 安装
-
https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/download/v3.13.0/rabbitmq_delayed_message_exchange-3.13.0.ez
-
将插件放在该目录

-
docker exec -it rabbitmq rabbitmq-plugins enable rabbitmq-delayed-message-exchange
6.2.2 使用
// 3. 声明延迟交换机err = ch.ExchangeDeclare("delay_exchange", // 交换机名称"x-delayed-message", // 交换机类型true, // 是否持久化false, // 是否自动删除false, // 是否内部使用false, // 是否等待amqp.Table{"x-delayed-type": "direct"}, // 交换机类型的设置)failOnError(err, "Failed to declare an exchange")// 4. 发送消息body := "Hello World with delay"err = ch.Publish("delay_exchange", // 交换机名称"routing_key", // 路由键false, // 是否强制发送false, // 是否立即发送amqp.Publishing{ContentType: "text/plain",Body: []byte(body),Headers: amqp.Table{"x-delay": int32(5000), // 延迟时间,单位为毫秒 (5秒延迟)},})
6.2.3 应用场景
- 消息内部维护一个计时器,延迟消息对CPU的消耗较高,适用于延迟时间较短的场景

false, // 是否立即发送
amqp.Publishing{
ContentType: “text/plain”,
Body: []byte(body),
Headers: amqp.Table{
“x-delay”: int32(5000), // 延迟时间,单位为毫秒 (5秒延迟)
},
})
### 6.2.3 应用场景- 消息内部维护一个计时器,延迟消息对CPU的消耗较高,适用于延迟时间较短的场景[外链图片转存中...(img-eA0QMPnx-1725527666228)]
相关文章:
在Go中迅速使用RabbitMQ
文章目录 1 认识1.1 MQ分类1.2 安装1.3 基本流程 2 [Work模型](https://www.rabbitmq.com/tutorials/tutorial-two-go#preparation)3 交换机3.1 fanout3.2 direct3.3 [topic](https://www.rabbitmq.com/tutorials/tutorial-five-go) 4 Golang创建交换机/队列/Publish/Consume/B…...
Windows JDK安装详细教程
一、关于JDK 1.1 简介 Java是一种广泛使用的计算机编程语言,拥有跨平台、面向对象、泛型编程的特性,广泛应用于企业级Web应用开发和移动应用开发。 JDK(Java Development Kit)是用于开发 Java 应用程序的工具包。它由以下几个主要…...
Ribbon负载均衡底层原理
springcloude服务实例与服务实例之间发送请求,首先根据服务名注册到nacos,然后发送请求,nacos可以根据服务名找到对应的服务实例。 SpringCloudRibbon的底层采用了一个拦截器,拦截了openfeign发出的请求,对地址做了修…...
【C语言可变参数函数的使用与原理分析】
文章目录 1 前言2 实例2.1实例程序2.2程序执行结果2.3 程序分析 3 补充4 总结 1 前言 在编程过程中,有时会遇到需要定义参数数量不固定的函数的情况。 C语言提供了一种灵活的解决方案:变参函数。这种函数能够根据实际调用时的需求,接受任意…...
【笔记】Java EE应用开发环境配置(JDK+Maven+Tomcat+MySQL+IDEA)
一、安装JDK17 1.下载JDK17 https://download.oracle.com/java/17/archive/jdk-17.0.7_windows-x64_bin.zip 2.配置环境变量 下载后,解压到本地(目录中最好不要有中文或特殊字符) 打开【控制面板】-【系统和安全】-【系统】-【高级系统…...
一文讲懂扩散模型
一文讲懂扩散模型 扩散模型(Diffusion Models, DM)是近年来在计算机视觉、自然语言处理等领域取得显著进展的一种生成模型。其思想根源可以追溯到非平衡热力学,通过模拟数据的扩散和去噪过程来生成新的样本。以下将详细阐述扩散模型的基本原理…...
学习笔记八:基于Jenkins+k8s+Git+DockerHub等技术链构建企业级DevOps容器云平台
基于Jenkinsk8sGitDockerHub等技术链构建企业级DevOps容器云平台 测试jenkins的CI/CD在Jenkins中安装kubernetes插件安装blueocean插件配置jenkins连接到我们存在的k8s集群配置pod-template添加自己的dockerhub凭据测试通过Jenkins部署应用发布到k8s开发环境、测试环境、生产环…...
科研绘图系列:R语言柱状图分布(histogram plot)
文章目录 介绍加载R包读取数据画图介绍 柱状图(Bar Chart)是一种常用的数据可视化图表,用于展示和比较不同类别或组的数据。它通过在二维平面上绘制一系列垂直或水平的柱子来表示数据的大小,每个柱子的长度或高度代表一个数据点的数值。柱状图非常适合于展示分类数据的分布…...
vue3+ts封装类似于微信消息的组件
组件代码如下: <template><div:class"[voice-message, { sent: isSent, received: !isSent }]":style"{ backgroundColor: backgroundColor }"click"togglePlayback"><!-- isSent为false在左侧,为true在右…...
ES6 reduce方法详解:示例、应用场景与实用技巧
在JavaScript中,reduce 方法是一个非常强大的数组方法,它允许你将数组中的元素归并(reduce)为单个值。reduce 方法执行一个由你提供的reducer函数(归并函数),将其结果汇总为单一的返回值。 一.…...
java后端保存的本地图片通过ip+端口直接访问
直接上代码吧 package com.ydx.emms.datapro.controller;import org.springframework.context.annotation.Configuration; import org.springframework.web.servlet.config.annotation.ResourceHandlerRegistry; import org.springframework.web.servlet.config.annotation.…...
2024 年高教社杯全国大学生数学建模竞赛B题4小问解题思路(第二版)
原文链接:https://www.cnblogs.com/qimoxuan/articles/18399415 问题 1:抽样检测方案设计 详细解题思路: 确定抽样检测目标:企业需要确定一个可接受的次品率上限(标称值),以及在该次品率下&am…...
docker-nginx数据卷挂载
一、案例1-利用Nginx容器部署静态资源 1.1、需求: 创建Nginx容器, 修改nginx容器内的html目录下的index.html文件,查看变化将静态资源部署到nginx的html目录 1.2、修改html目录下的index.html文件,查看变化 因为docker运用得最小化系统环境,解决办法就…...
项目实战 ---- 商用落地视频搜索系统(8)---优化(2)---查询逻辑层优化
目录 背景 技术衡量与方案 一种可实现方案 可实现方案及设计描述 可能存在的问题 一种创新实现方案 方案的改良设计 策略公式 优化的实现 完整代码 代码解释 异常场景的考量 处理方式 运行注意事项 运行结果 结果优化对比与解释 背景 在项目实战 ---- 商用落地…...
山东大学机试试题合集
🍰🍰🍰高分篇已经涵盖了绝大多数的机试考点,由于临近预推免,各校的机试蜂拥而至,我们接下来先更一些各高校机试题合集,算是对前边学习成果的深入学习,也是对我们代码能力的锻炼。加油…...
餐厅食品留样管理系统小程序的设计
管理员账户功能包括:系统首页,个人中心,窗口负责人管理,窗口员工管理,冰柜管理,排班信息管理,留样食品管理,教育宣传管理,系统管理 微信端账号功能包括:系统…...
亚马逊运营:如何提高亚马逊销量和运营效率?
不少亚马逊卖家们为了扩大业务规模和提高销量,会创建多个卖家账户来同时运营多个亚马逊店铺。问题是,这种多店铺运营模式并非没有风险——亚马逊运营的一个重要方面就是账户的健康管理。一旦某个账户出现问题,亚马逊的算法就可能会启动关联检…...
设计模式背后的设计原则和思想
设计模式背后的设计原则和思想是一套指导我们如何设计高质量软件系统的准则和方法论。这些原则和思想不仅有助于提升软件的可维护性、可扩展性和可复用性,还能帮助开发团队更好地应对复杂多变的需求。以下是一些核心的设计原则和思想: 1. 设计原则 设计…...
项目总体框架
一.后端(包装servlet) 使用BaseServlet进行请求的初步处理(利用继承进行执行这个) 在BaseServlet中 处理请求的类型找到对象的方法,并使用注解找到参数名,执行参数自动注入。 package com.csdn.controlle…...
k8s Prometheus
一、部署 Prometheus kubectl create ns kube-ops# 创建 prometheus-cm.yaml apiVersion: v1 kind: ConfigMap metadata:name: prometheus-confignamespace: kube-ops data:prometheus.yml: |global:scrape_interval: 15s # 表示 prometheus 抓取指标数据的频率,默…...
手游刚开服就被攻击怎么办?如何防御DDoS?
开服初期是手游最脆弱的阶段,极易成为DDoS攻击的目标。一旦遭遇攻击,可能导致服务器瘫痪、玩家流失,甚至造成巨大经济损失。本文为开发者提供一套简洁有效的应急与防御方案,帮助快速应对并构建长期防护体系。 一、遭遇攻击的紧急应…...
树莓派超全系列教程文档--(61)树莓派摄像头高级使用方法
树莓派摄像头高级使用方法 配置通过调谐文件来调整相机行为 使用多个摄像头安装 libcam 和 rpicam-apps依赖关系开发包 文章来源: http://raspberry.dns8844.cn/documentation 原文网址 配置 大多数用例自动工作,无需更改相机配置。但是,一…...
【CSS position 属性】static、relative、fixed、absolute 、sticky详细介绍,多层嵌套定位示例
文章目录 ★ position 的五种类型及基本用法 ★ 一、position 属性概述 二、position 的五种类型详解(初学者版) 1. static(默认值) 2. relative(相对定位) 3. absolute(绝对定位) 4. fixed(固定定位) 5. sticky(粘性定位) 三、定位元素的层级关系(z-i…...
Qt Http Server模块功能及架构
Qt Http Server 是 Qt 6.0 中引入的一个新模块,它提供了一个轻量级的 HTTP 服务器实现,主要用于构建基于 HTTP 的应用程序和服务。 功能介绍: 主要功能 HTTP服务器功能: 支持 HTTP/1.1 协议 简单的请求/响应处理模型 支持 GET…...
相机Camera日志分析之三十一:高通Camx HAL十种流程基础分析关键字汇总(后续持续更新中)
【关注我,后续持续新增专题博文,谢谢!!!】 上一篇我们讲了:有对最普通的场景进行各个日志注释讲解,但相机场景太多,日志差异也巨大。后面将展示各种场景下的日志。 通过notepad++打开场景下的日志,通过下列分类关键字搜索,即可清晰的分析不同场景的相机运行流程差异…...
UR 协作机器人「三剑客」:精密轻量担当(UR7e)、全能协作主力(UR12e)、重型任务专家(UR15)
UR协作机器人正以其卓越性能在现代制造业自动化中扮演重要角色。UR7e、UR12e和UR15通过创新技术和精准设计满足了不同行业的多样化需求。其中,UR15以其速度、精度及人工智能准备能力成为自动化领域的重要突破。UR7e和UR12e则在负载规格和市场定位上不断优化…...
React---day11
14.4 react-redux第三方库 提供connect、thunk之类的函数 以获取一个banner数据为例子 store: 我们在使用异步的时候理应是要使用中间件的,但是configureStore 已经自动集成了 redux-thunk,注意action里面要返回函数 import { configureS…...
Xela矩阵三轴触觉传感器的工作原理解析与应用场景
Xela矩阵三轴触觉传感器通过先进技术模拟人类触觉感知,帮助设备实现精确的力测量与位移监测。其核心功能基于磁性三维力测量与空间位移测量,能够捕捉多维触觉信息。该传感器的设计不仅提升了触觉感知的精度,还为机器人、医疗设备和制造业的智…...
comfyui 工作流中 图生视频 如何增加视频的长度到5秒
comfyUI 工作流怎么可以生成更长的视频。除了硬件显存要求之外还有别的方法吗? 在ComfyUI中实现图生视频并延长到5秒,需要结合多个扩展和技巧。以下是完整解决方案: 核心工作流配置(24fps下5秒120帧) #mermaid-svg-yP…...
保姆级【快数学会Android端“动画“】+ 实现补间动画和逐帧动画!!!
目录 补间动画 1.创建资源文件夹 2.设置文件夹类型 3.创建.xml文件 4.样式设计 5.动画设置 6.动画的实现 内容拓展 7.在原基础上继续添加.xml文件 8.xml代码编写 (1)rotate_anim (2)scale_anim (3)translate_anim 9.MainActivity.java代码汇总 10.效果展示 逐帧…...
