Golang使用消息队列(RabbitMQ)
最近在使用Golang做了一个网盘项目(类似百度网盘),这个网盘项目有一个功能描述如下:用户会删除一个文件到垃圾回收站,回收站的文件有一个时间期限,比如24h,24h后数据库中记录和oss中文件会被删除,在之前的版本中,可以使用定时任务来检查数据库记录中删除时间来判断是否删除,但是这不是最佳的,因此考虑如何基于RabbitMQ来实现这个功能。
使用RabbitMQ的架构

代码
因为前端有点麻烦,这里全部使用Golang后端来模拟实现整个架构,包括生产端和消费端。这里有一些细节
- 注意交换机和队列的绑定,一定要细心
- 交换机一旦声明了就不能更改,如果要发生一些属性的更改,就要删除原来的内容,重新生成
- 下列的内容不包含RabbitMQ持久化的内容
package mainimport ("fmt""github.com/streadway/amqp""log""strings"
)func InitRabbitMQ() *amqp.Connection {mq := "amqp"host := "127.0.0.1"port := "5672"user := "root"pwd := "root"dns := strings.Join([]string{mq, "://", user, ":", pwd,"@", host, ":", port, "/"}, "")conn, err := amqp.Dial(dns)if err != nil {log.Fatalf("Failed to connect to RabbitMQ: %v", err)}return conn
}func InitMainExchangeAndQueue(ch *amqp.Channel, userID string) *amqp.Channel {// 队列信息exchangeName := "main_exchange"queueName := fmt.Sprintf("user_queue_%s", userID)messageTTL := int32(300000)// 声明主交换机err := ch.ExchangeDeclare(exchangeName, // 交换机名"direct", // Exchange typefalse, // Durablefalse, // Auto-deletedfalse, // Internalfalse, // No-waitnil, // Arguments)if err != nil {log.Fatalf("Failed to declare an main exchange: %v", err)}// 声明用户队列_, err = ch.QueueDeclare(queueName, // 队列名false, // Durablefalse, // Delete when unusedfalse, // Exclusivefalse, // No-waitamqp.Table{"x-dead-letter-routing-key": "dead", // routing-key"x-dead-letter-exchange": "dead_exchange", // 死信交换机"x-message-ttl": messageTTL, // TTL},)if err != nil {log.Fatalf("Failed to declare a queue: %v", err)}// 绑定err = ch.QueueBind(queueName, userID, "main_exchange", false, nil)if err != nil {log.Fatalf("Failed to bind queue to exchange: %v", err)}return ch
}func InitDeadExchangeAndQueue(ch *amqp.Channel) {// 声明死信交换机err := ch.ExchangeDeclare("dead_exchange",amqp.ExchangeDirect,true,false,false,false,nil,)if err != nil {log.Fatalf("Failed to declare an dead exchange: %v", err)}// 声明一个死信队列_, err = ch.QueueDeclare("dead_queue",true,false,false,false,nil)if err != nil {log.Fatalf("Failed to declare a queue: %v", err)}// 绑定err = ch.QueueBind("dead_queue", "dead", "dead_exchange", false, nil)if err != nil {log.Fatalf("Failed to bind queue to exchange: %v", err)}
}func PublishMessage(ch *amqp.Channel, userID, fileID string) {// 用户信息message := fmt.Sprintf("%s|%s", userID, fileID)exchangeName := "main_exchange"// 发布用户消息err := ch.Publish(exchangeName, // ExchangeuserID, // Routing keyfalse, // Mandatoryfalse, // Immediateamqp.Publishing{ContentType: "text/plain",Body: []byte(message),})if err != nil {log.Fatalf("Failed to publish a message: %v", err)}log.Printf("Message sent to user %s: %s", userID, message)
}func ConsumeTTL(ch *amqp.Channel) {// 声明死信交换机err := ch.ExchangeDeclare("dead_exchange", // 交换机名"direct", // Exchange typetrue, // Durablefalse, // Auto-deletedfalse, // Internalfalse, // No-waitnil, // Arguments)if err != nil {log.Fatalf("Failed to declare a dead letter exchange: %v", err)}// 创建消费者并阻塞等待消费死信队列中的消息megs, err := ch.Consume("dead_queue", // Queue"", // Consumerfalse, // Auto-acknowledgefalse, // Exclusivefalse, // No-localfalse, // No-waitnil, // Args)if err != nil {log.Fatalf("Failed to register a consumer for dead letter queue: %v", err)}// 使用无限循环一直监听fmt.Println("Waiting for message from dead_queue......")for d := range megs {// 实际中,处理消息的逻辑,例如删除文件或其他操作fmt.Println(string(d.Body))// 消费完成后手动确认消息err = d.Ack(false)if err != nil {log.Fatalf("Failed to ack message: %v", err)}}
}func Consume(ch *amqp.Channel, userID string) {// 下面的信息可以通过前后端进行传递queueName := fmt.Sprintf("user_queue_%s", userID)// 消费消息megs, err := ch.Consume(queueName, // Queue"", // Consumertrue, // Auto-acknowledgefalse, // Exclusivefalse, // No-localfalse, // No-waitnil, // Args)if err != nil {log.Fatalf("Failed to register a consumer: %v", err)}// 这里直接是由前端发送过来的API进行触发,所以不用一直阻塞监听d, ok := <-megsif !ok {log.Fatalf("Failed to get message: %v", err)}fmt.Println(string(d.Body))// 消息完成后确认消息err = d.Ack(true)if err != nil {log.Fatalf("Failed to ack message: %v", err)}
}func main() {// 获取客户端client := InitRabbitMQ()defer client.Close()ch, err := client.Channel()if err != nil {log.Fatalf("Failed to open a channel: %v", err)}defer ch.Close()//ConsumeTTL(ch)// 构造dead_exchange及dead_queue// InitDeadExchangeAndQueue(ch)// 假设这是web请求信息//var userID1 = "test-id1"//var fileID1 = "file1"// 构造main_exchange及user_queue//ch = InitMainExchangeAndQueue(ch, userID1)// 针对用户1:假设还消息没有过期时候就被recovery,即在user_queue中就被消费,实际中发布消息的这部分逻辑应当放在前端中//PublishMessage(ch, userID1, fileID1)//time.Sleep(20 * time.Second)// 模拟后端消费消息//Consume(ch, userID1)// 针对用户2:模拟其不被后端消费,过期到死信队列中var userID2 = "test-id2"var fileID2 = "file2"ch = InitMainExchangeAndQueue(ch, userID2)PublishMessage(ch, userID2, fileID2)// 注意这个消息没有被消费,理论上应当被死信队列消费
}
从dead_exchange中消费:

相关文章:
Golang使用消息队列(RabbitMQ)
最近在使用Golang做了一个网盘项目(类似百度网盘),这个网盘项目有一个功能描述如下:用户会删除一个文件到垃圾回收站,回收站的文件有一个时间期限,比如24h,24h后数据库中记录和oss中文件会被删除…...
Apache Spark远程代码执行漏洞(CVE-2023-32007)漏洞复现
漏洞描述 Apache Spark是美国阿帕奇(Apache)基金会的一款支持非循环数据流和内存计算的大规模数据处理引擎。 Apache Spark 3.4.0之前版本存在命令注入漏洞,该漏洞源于如果ACL启用后,HttpSecurityFilter中的代码路径可以允许通过…...
春秋云镜 :CVE-2020-21650(MyuCMS后台rce)
一、题目 靶标介绍: MyuCMS开源内容管理系统,采用ThinkPHP开发而成的社区商城聚合,插件,模板,轻便快捷容易扩展 其2.2版本中admin.php/config/add方法存在任意命令执行漏洞. 进入题目: exp: url/index.p…...
测试框架pytest教程(7)实现 xunit 风格的setup
pytest支持setup和teardown,对于使用unittest和nose框架的用户来说对这些很熟悉,但是在pytest可以使用功能更强大的fixture来实现固定装置。 模块级别 如果单个模块中有多个测试函数和测试类,您可以选择实现以下固定方法,这些方…...
用队列实现栈
目录 题目题目要求示例 解答方法一、实现思路时间复杂度和空间复杂度代码 方法二、实现思路时间复杂度和空间复杂度代码 方法三、实现思路时间复杂度和空间复杂度代码 总结 题目 用队列实现栈 题目要求 题目链接 示例 解答 方法一、 使用两个队列来实现栈。 实现思路 题…...
Anolis 8.6 下 Redis 7.2.0 集群搭建和配置
Redis 7.2.0 搭建和集群配置 一.Redis 下载与单机部署1.Redis 下载2.虚拟机配置3.Redis 单机源码安装和测试4.Java 单机连接测试1.Pom 依赖2.配置文件3.启动类4.配置类5.单元测试6.测试结果 二.Redis 集群部署1.主从1.从节点配置2.Java 测试 2.哨兵1.哨兵节点配置2.复制一个哨兵…...
综合能源系统(8)——综合能源系统支撑技术
综合能源系统关键技术与典型案例 何泽家,李德智主编 1、大数据技术 1.1、大数据技术概述 大数据是指无法在一定时间范围内用常规软件工具进行捕捉、管理和处理的数据集合,是需要新处理模式才能具有更强的决策力、洞察发现力和流程优化能力的海量、高…...
MySQL5.7数据目录结构
以CentOS7为例,数据目录为/var/lib/mysql/,其内容如下: [rootscentos szc]# ll /var/lib/mysql/ total 122952 -rw-r----- 1 mysql mysql 56 Jan 15 16:02 auto.cnf -rw------- 1 mysql mysql 1680 Jan 15 16:02 ca-key.pem -rw-r…...
Python Opencv实践 - 图像直方图均衡化
import cv2 as cv import numpy as np import matplotlib.pyplot as pltimg cv.imread("../SampleImages/pomeranian.png", cv.IMREAD_COLOR) print(img.shape)#图像直方图计算 #cv.calcHist(images, channels, mask, histSize, ranges, hist, accumulate) #images&…...
GAN:对抗生成网络,前向传播和后巷传播的区别
目录 GAN:对抗生成网络 损失函数 判别器开始波动很大,先调整判别器 生成样本和真实样本的统一:真假难辨编辑 文字专图片编辑 头像转表情包编辑 头像转3D编辑 后向传播 1. 前向传播(forward) 2. 反向传播&…...
压力变送器的功能与应用
压力变送器是用于测量气体或者液体等介质压力的设备,能够将压力转化为4 G信号传输到监控平台,工作人员可以在电脑或者手机上登录平台查看监测到的数据,并根据数据制定下一步的计划。 压力变送器的功能: 压力变送器采用了高性能感…...
排序算法:选择排序
选择排序的思想是:双重循环遍历数组,每经过一轮比较,找到最小元素的下标,将其交换至首位。 public static void selectionSort(int[] arr) {int minIndex;for (int i 0; i < arr.length - 1; i) {minIndex i;for (int j i …...
Windows运行Spark所需的Hadoop安装
解压文件 复制bin目录 找到winutils-master文件hadoop对应的bin目录版本 全部复制替换掉hadoop的bin目录文件 复制hadoop.dll文件 将bin目录下的hadoop.dll文件复制到System32目录下 配置环境变量 修改hadoop-env.cmd配置文件 注意jdk装在非C盘则完全没问题,如果装在…...
KusionStack使用文档
下载安装 1. 安装 Kusionup 如果想自定义默认安装版本,可以运行下述命令(将最后的 openlatest 替换为你想要默认安装的版本号就就行): curl -s "http://kusion-public.oss-cn-hzfinance.aliyuncs.com/cli/kusionup/script…...
ONLYOFFICE 文档如何与 Alfresco 进行集成
ONLYOFFICE 文档是一款开源办公套件,其是包含文本文档、电子表格、演示文稿、数字表单、PDF 查看器和转换工具的协作性编辑工具。要在 Alfresco 中使用 ONLYOFFICE 协作功能,可以将他们连接集成。阅读本文,了解这如何实现。 关于 ONLYOFFICE…...
PostgreSQL下载路径与安装步骤
PgSQL介绍 PgSQL和MySQL一样是一种关系模型的数据库,全称为PostgreSQL 数据库。 优势:PgSQL是一种可扩展、可靠、可定制的数据库管理系统,具有良好的数据完整性和安全性,支持多种操作系统,包括 Linux、Windows、MacOS …...
如何在PHP中编写条件语句
引言 决策是生活不可缺少的一部分。从平凡的着装决定,到改变人生的工作和家庭决定。在开发中也是如此。要让程序做任何有用的事情,它必须能够对某种输入做出响应。当用户点击网站上的联系人按钮时,他们希望被带到联系人页面。如果什么都没有…...
LLM架构自注意力机制Transformers architecture Attention is all you need
使用Transformers架构构建大型语言模型显著提高了自然语言任务的性能,超过了之前的RNNs,并导致了再生能力的爆炸。 Transformers架构的力量在于其学习句子中所有单词的相关性和上下文的能力。不仅仅是您在这里看到的,与它的邻居每个词相邻&…...
计算机网络 QA
DNS 的解析过程 浏览器缓存。当用户通过浏览器访问某域名时,浏览器首先会在自己的缓存中查找是否有该域名对应的 IP 地址(曾经访问过该域名并且没有清空缓存)系统缓存。当浏览器缓存中无域名对应的 IP 地址时,会自动检测用户计算机…...
安果天气预报 产品介绍
软件介绍版本号 2.0.5 安果天气预报:全世界覆盖,中国定制 想要查找北京、上海、纽约、东京还是巴黎的天气?一款简约的天气预 报应用为你呈现。专注于为用户提供纯净的天气体验,我们不发送任何打扰的通知。包含空气质量、能见度、…...
红外图像/红外遥感图像/可见光红外图像对 近红外和可见光成对图像 生成对抗网络的风格迁移,或者图像融合/图像生成/图像转换 可见光遥感生成红外遥感图像,37500对图像数据
红外图像/红外遥感图像/可见光红外图像对 近红外和可见光成对图像 生成对抗网络的风格迁移,或者图像融合/图像生成/图像转换 可见光遥感生成红外遥感图像,37500对图像数据 文章目录**数据集描述:**🧾 项目背景🧰 一、环…...
初次使用 Taotoken 控制台的快速浏览与核心功能导引
🚀 告别海外账号与网络限制!稳定直连全球优质大模型,限时半价接入中。 👉 点击领取海量免费额度 初次使用 Taotoken 控制台的快速浏览与核心功能导引 当你注册并登录 Taotoken 平台后,首先进入的就是用户控制台。这个…...
基于CircuitPython与NeoPixel的智能圣诞树:从硬件搭建到动态灯光算法
1. 项目概述:从零打造一棵会“思考”的圣诞树又到年底了,看着家里那棵年复一年、只会默默发光的传统圣诞树,总觉得少了点“灵魂”。作为一个常年和微控制器、代码打交道的创客,我总琢磨着能不能给节日装饰加点科技感,让…...
FreeRTOS+LwIP 2.2.0实战:tcpip_thread消息队列与定时器如何协同工作?
FreeRTOS与LwIP 2.2.0深度协同:消息队列与定时器的精妙舞步 在嵌入式网络开发中,实时操作系统与轻量级TCP/IP协议栈的协同工作一直是开发者关注的焦点。FreeRTOS作为嵌入式领域广泛使用的实时操作系统,与LwIP这一轻量级TCP/IP协议栈的组合&am…...
从模型到代码:无人驾驶轨迹跟踪算法(Stanley、LQR、PID)的Carsim/Simulink联合仿真实践
1. 无人驾驶轨迹跟踪算法入门指南 第一次接触无人驾驶轨迹跟踪算法时,我被各种专业术语搞得晕头转向。直到真正动手在Carsim和Simulink里搭建仿真环境,才明白这些算法到底是怎么运作的。轨迹跟踪算法的核心任务很简单:让车辆按照预定路线行驶…...
不止图表引用!VSCode+LaTeX完整编译链配置指南(含BibTeX文献处理)
VSCodeLaTeX高效工作流:从交叉引用到文献管理的全栈配置指南 当你第一次在VSCode中尝试用LaTeX撰写学术论文时,是否曾被那些顽固的"??"标记困扰?这些问号背后隐藏着LaTeX编译机制的核心逻辑——交叉引用需要多轮编译才能正确解析…...
推理服务为什么一上模型压缩组合就开始精度雪崩:从量化-剪枝-蒸馏的叠加效应到恢复策略的工程实战
一、精度雪崩的生产现场 🔥 某团队部署 LLaMA-2-7B 推理服务时,为降低显存、提升吞吐,同时对模型做 W4A16 量化、30% 结构化剪枝与层蒸馏。单独测试时,量化版困惑度上升 8%,剪枝版上升 12%,蒸馏版上升 15%。…...
基于金橙子MarkEzd.dll的激光打标二次开发实战:从函数解析到自动化标刻系统构建
1. 金橙子MarkEzd.dll开发入门指南 第一次接触激光打标二次开发的朋友可能会被各种专业术语吓到,但其实只要掌握几个核心概念就能快速上手。MarkEzd.dll是北京金橙子科技为EZCAD2激光打标软件提供的开发接口,相当于给开发者开了一个"后门"&…...
深度神经网络(DNN)百科全书从“深“到“无限深“
一、开篇:深度的奇迹 2012 年 9 月 30 日。 ImageNet 挑战赛的结果在 Florence 公布。所有人都以为冠军会延续过去 3 年的传统——传统计算机视觉方法(SIFT、HOG、SVM)小幅领先。 但那一年,一个叫 AlexNet 的"怪物"出现了。8 层的卷积神经网络,Top-5 错误率 …...
本地大模型部署的Python“翻译官“:llama-cpp-python深度解析
本地大模型部署的Python"翻译官":llama-cpp-python深度解析 【免费下载链接】llama-cpp-python Python bindings for llama.cpp 项目地址: https://gitcode.com/gh_mirrors/ll/llama-cpp-python 你是否曾为云端API的延迟而焦虑?是否担心…...
