Golang使用消息队列(RabbitMQ)
最近在使用Golang做了一个网盘项目(类似百度网盘),这个网盘项目有一个功能描述如下:用户会删除一个文件到垃圾回收站,回收站的文件有一个时间期限,比如24h,24h后数据库中记录和oss中文件会被删除,在之前的版本中,可以使用定时任务来检查数据库记录中删除时间来判断是否删除,但是这不是最佳的,因此考虑如何基于RabbitMQ来实现这个功能。
使用RabbitMQ的架构

代码
因为前端有点麻烦,这里全部使用Golang后端来模拟实现整个架构,包括生产端和消费端。这里有一些细节
- 注意交换机和队列的绑定,一定要细心
- 交换机一旦声明了就不能更改,如果要发生一些属性的更改,就要删除原来的内容,重新生成
- 下列的内容不包含RabbitMQ持久化的内容
package mainimport ("fmt""github.com/streadway/amqp""log""strings"
)func InitRabbitMQ() *amqp.Connection {mq := "amqp"host := "127.0.0.1"port := "5672"user := "root"pwd := "root"dns := strings.Join([]string{mq, "://", user, ":", pwd,"@", host, ":", port, "/"}, "")conn, err := amqp.Dial(dns)if err != nil {log.Fatalf("Failed to connect to RabbitMQ: %v", err)}return conn
}func InitMainExchangeAndQueue(ch *amqp.Channel, userID string) *amqp.Channel {// 队列信息exchangeName := "main_exchange"queueName := fmt.Sprintf("user_queue_%s", userID)messageTTL := int32(300000)// 声明主交换机err := ch.ExchangeDeclare(exchangeName, // 交换机名"direct", // Exchange typefalse, // Durablefalse, // Auto-deletedfalse, // Internalfalse, // No-waitnil, // Arguments)if err != nil {log.Fatalf("Failed to declare an main exchange: %v", err)}// 声明用户队列_, err = ch.QueueDeclare(queueName, // 队列名false, // Durablefalse, // Delete when unusedfalse, // Exclusivefalse, // No-waitamqp.Table{"x-dead-letter-routing-key": "dead", // routing-key"x-dead-letter-exchange": "dead_exchange", // 死信交换机"x-message-ttl": messageTTL, // TTL},)if err != nil {log.Fatalf("Failed to declare a queue: %v", err)}// 绑定err = ch.QueueBind(queueName, userID, "main_exchange", false, nil)if err != nil {log.Fatalf("Failed to bind queue to exchange: %v", err)}return ch
}func InitDeadExchangeAndQueue(ch *amqp.Channel) {// 声明死信交换机err := ch.ExchangeDeclare("dead_exchange",amqp.ExchangeDirect,true,false,false,false,nil,)if err != nil {log.Fatalf("Failed to declare an dead exchange: %v", err)}// 声明一个死信队列_, err = ch.QueueDeclare("dead_queue",true,false,false,false,nil)if err != nil {log.Fatalf("Failed to declare a queue: %v", err)}// 绑定err = ch.QueueBind("dead_queue", "dead", "dead_exchange", false, nil)if err != nil {log.Fatalf("Failed to bind queue to exchange: %v", err)}
}func PublishMessage(ch *amqp.Channel, userID, fileID string) {// 用户信息message := fmt.Sprintf("%s|%s", userID, fileID)exchangeName := "main_exchange"// 发布用户消息err := ch.Publish(exchangeName, // ExchangeuserID, // Routing keyfalse, // Mandatoryfalse, // Immediateamqp.Publishing{ContentType: "text/plain",Body: []byte(message),})if err != nil {log.Fatalf("Failed to publish a message: %v", err)}log.Printf("Message sent to user %s: %s", userID, message)
}func ConsumeTTL(ch *amqp.Channel) {// 声明死信交换机err := ch.ExchangeDeclare("dead_exchange", // 交换机名"direct", // Exchange typetrue, // Durablefalse, // Auto-deletedfalse, // Internalfalse, // No-waitnil, // Arguments)if err != nil {log.Fatalf("Failed to declare a dead letter exchange: %v", err)}// 创建消费者并阻塞等待消费死信队列中的消息megs, err := ch.Consume("dead_queue", // Queue"", // Consumerfalse, // Auto-acknowledgefalse, // Exclusivefalse, // No-localfalse, // No-waitnil, // Args)if err != nil {log.Fatalf("Failed to register a consumer for dead letter queue: %v", err)}// 使用无限循环一直监听fmt.Println("Waiting for message from dead_queue......")for d := range megs {// 实际中,处理消息的逻辑,例如删除文件或其他操作fmt.Println(string(d.Body))// 消费完成后手动确认消息err = d.Ack(false)if err != nil {log.Fatalf("Failed to ack message: %v", err)}}
}func Consume(ch *amqp.Channel, userID string) {// 下面的信息可以通过前后端进行传递queueName := fmt.Sprintf("user_queue_%s", userID)// 消费消息megs, err := ch.Consume(queueName, // Queue"", // Consumertrue, // Auto-acknowledgefalse, // Exclusivefalse, // No-localfalse, // No-waitnil, // Args)if err != nil {log.Fatalf("Failed to register a consumer: %v", err)}// 这里直接是由前端发送过来的API进行触发,所以不用一直阻塞监听d, ok := <-megsif !ok {log.Fatalf("Failed to get message: %v", err)}fmt.Println(string(d.Body))// 消息完成后确认消息err = d.Ack(true)if err != nil {log.Fatalf("Failed to ack message: %v", err)}
}func main() {// 获取客户端client := InitRabbitMQ()defer client.Close()ch, err := client.Channel()if err != nil {log.Fatalf("Failed to open a channel: %v", err)}defer ch.Close()//ConsumeTTL(ch)// 构造dead_exchange及dead_queue// InitDeadExchangeAndQueue(ch)// 假设这是web请求信息//var userID1 = "test-id1"//var fileID1 = "file1"// 构造main_exchange及user_queue//ch = InitMainExchangeAndQueue(ch, userID1)// 针对用户1:假设还消息没有过期时候就被recovery,即在user_queue中就被消费,实际中发布消息的这部分逻辑应当放在前端中//PublishMessage(ch, userID1, fileID1)//time.Sleep(20 * time.Second)// 模拟后端消费消息//Consume(ch, userID1)// 针对用户2:模拟其不被后端消费,过期到死信队列中var userID2 = "test-id2"var fileID2 = "file2"ch = InitMainExchangeAndQueue(ch, userID2)PublishMessage(ch, userID2, fileID2)// 注意这个消息没有被消费,理论上应当被死信队列消费
}
从dead_exchange中消费:

相关文章:
Golang使用消息队列(RabbitMQ)
最近在使用Golang做了一个网盘项目(类似百度网盘),这个网盘项目有一个功能描述如下:用户会删除一个文件到垃圾回收站,回收站的文件有一个时间期限,比如24h,24h后数据库中记录和oss中文件会被删除…...
Apache Spark远程代码执行漏洞(CVE-2023-32007)漏洞复现
漏洞描述 Apache Spark是美国阿帕奇(Apache)基金会的一款支持非循环数据流和内存计算的大规模数据处理引擎。 Apache Spark 3.4.0之前版本存在命令注入漏洞,该漏洞源于如果ACL启用后,HttpSecurityFilter中的代码路径可以允许通过…...
春秋云镜 :CVE-2020-21650(MyuCMS后台rce)
一、题目 靶标介绍: MyuCMS开源内容管理系统,采用ThinkPHP开发而成的社区商城聚合,插件,模板,轻便快捷容易扩展 其2.2版本中admin.php/config/add方法存在任意命令执行漏洞. 进入题目: exp: url/index.p…...
测试框架pytest教程(7)实现 xunit 风格的setup
pytest支持setup和teardown,对于使用unittest和nose框架的用户来说对这些很熟悉,但是在pytest可以使用功能更强大的fixture来实现固定装置。 模块级别 如果单个模块中有多个测试函数和测试类,您可以选择实现以下固定方法,这些方…...
用队列实现栈
目录 题目题目要求示例 解答方法一、实现思路时间复杂度和空间复杂度代码 方法二、实现思路时间复杂度和空间复杂度代码 方法三、实现思路时间复杂度和空间复杂度代码 总结 题目 用队列实现栈 题目要求 题目链接 示例 解答 方法一、 使用两个队列来实现栈。 实现思路 题…...
Anolis 8.6 下 Redis 7.2.0 集群搭建和配置
Redis 7.2.0 搭建和集群配置 一.Redis 下载与单机部署1.Redis 下载2.虚拟机配置3.Redis 单机源码安装和测试4.Java 单机连接测试1.Pom 依赖2.配置文件3.启动类4.配置类5.单元测试6.测试结果 二.Redis 集群部署1.主从1.从节点配置2.Java 测试 2.哨兵1.哨兵节点配置2.复制一个哨兵…...
综合能源系统(8)——综合能源系统支撑技术
综合能源系统关键技术与典型案例 何泽家,李德智主编 1、大数据技术 1.1、大数据技术概述 大数据是指无法在一定时间范围内用常规软件工具进行捕捉、管理和处理的数据集合,是需要新处理模式才能具有更强的决策力、洞察发现力和流程优化能力的海量、高…...
MySQL5.7数据目录结构
以CentOS7为例,数据目录为/var/lib/mysql/,其内容如下: [rootscentos szc]# ll /var/lib/mysql/ total 122952 -rw-r----- 1 mysql mysql 56 Jan 15 16:02 auto.cnf -rw------- 1 mysql mysql 1680 Jan 15 16:02 ca-key.pem -rw-r…...
Python Opencv实践 - 图像直方图均衡化
import cv2 as cv import numpy as np import matplotlib.pyplot as pltimg cv.imread("../SampleImages/pomeranian.png", cv.IMREAD_COLOR) print(img.shape)#图像直方图计算 #cv.calcHist(images, channels, mask, histSize, ranges, hist, accumulate) #images&…...
GAN:对抗生成网络,前向传播和后巷传播的区别
目录 GAN:对抗生成网络 损失函数 判别器开始波动很大,先调整判别器 生成样本和真实样本的统一:真假难辨编辑 文字专图片编辑 头像转表情包编辑 头像转3D编辑 后向传播 1. 前向传播(forward) 2. 反向传播&…...
压力变送器的功能与应用
压力变送器是用于测量气体或者液体等介质压力的设备,能够将压力转化为4 G信号传输到监控平台,工作人员可以在电脑或者手机上登录平台查看监测到的数据,并根据数据制定下一步的计划。 压力变送器的功能: 压力变送器采用了高性能感…...
排序算法:选择排序
选择排序的思想是:双重循环遍历数组,每经过一轮比较,找到最小元素的下标,将其交换至首位。 public static void selectionSort(int[] arr) {int minIndex;for (int i 0; i < arr.length - 1; i) {minIndex i;for (int j i …...
Windows运行Spark所需的Hadoop安装
解压文件 复制bin目录 找到winutils-master文件hadoop对应的bin目录版本 全部复制替换掉hadoop的bin目录文件 复制hadoop.dll文件 将bin目录下的hadoop.dll文件复制到System32目录下 配置环境变量 修改hadoop-env.cmd配置文件 注意jdk装在非C盘则完全没问题,如果装在…...
KusionStack使用文档
下载安装 1. 安装 Kusionup 如果想自定义默认安装版本,可以运行下述命令(将最后的 openlatest 替换为你想要默认安装的版本号就就行): curl -s "http://kusion-public.oss-cn-hzfinance.aliyuncs.com/cli/kusionup/script…...
ONLYOFFICE 文档如何与 Alfresco 进行集成
ONLYOFFICE 文档是一款开源办公套件,其是包含文本文档、电子表格、演示文稿、数字表单、PDF 查看器和转换工具的协作性编辑工具。要在 Alfresco 中使用 ONLYOFFICE 协作功能,可以将他们连接集成。阅读本文,了解这如何实现。 关于 ONLYOFFICE…...
PostgreSQL下载路径与安装步骤
PgSQL介绍 PgSQL和MySQL一样是一种关系模型的数据库,全称为PostgreSQL 数据库。 优势:PgSQL是一种可扩展、可靠、可定制的数据库管理系统,具有良好的数据完整性和安全性,支持多种操作系统,包括 Linux、Windows、MacOS …...
如何在PHP中编写条件语句
引言 决策是生活不可缺少的一部分。从平凡的着装决定,到改变人生的工作和家庭决定。在开发中也是如此。要让程序做任何有用的事情,它必须能够对某种输入做出响应。当用户点击网站上的联系人按钮时,他们希望被带到联系人页面。如果什么都没有…...
LLM架构自注意力机制Transformers architecture Attention is all you need
使用Transformers架构构建大型语言模型显著提高了自然语言任务的性能,超过了之前的RNNs,并导致了再生能力的爆炸。 Transformers架构的力量在于其学习句子中所有单词的相关性和上下文的能力。不仅仅是您在这里看到的,与它的邻居每个词相邻&…...
计算机网络 QA
DNS 的解析过程 浏览器缓存。当用户通过浏览器访问某域名时,浏览器首先会在自己的缓存中查找是否有该域名对应的 IP 地址(曾经访问过该域名并且没有清空缓存)系统缓存。当浏览器缓存中无域名对应的 IP 地址时,会自动检测用户计算机…...
安果天气预报 产品介绍
软件介绍版本号 2.0.5 安果天气预报:全世界覆盖,中国定制 想要查找北京、上海、纽约、东京还是巴黎的天气?一款简约的天气预 报应用为你呈现。专注于为用户提供纯净的天气体验,我们不发送任何打扰的通知。包含空气质量、能见度、…...
高防服务器能够抵御哪些网络攻击呢?
高防服务器作为一种有着高度防御能力的服务器,可以帮助网站应对分布式拒绝服务攻击,有效识别和清理一些恶意的网络流量,为用户提供安全且稳定的网络环境,那么,高防服务器一般都可以抵御哪些网络攻击呢?下面…...
select、poll、epoll 与 Reactor 模式
在高并发网络编程领域,高效处理大量连接和 I/O 事件是系统性能的关键。select、poll、epoll 作为 I/O 多路复用技术的代表,以及基于它们实现的 Reactor 模式,为开发者提供了强大的工具。本文将深入探讨这些技术的底层原理、优缺点。 一、I…...
今日学习:Spring线程池|并发修改异常|链路丢失|登录续期|VIP过期策略|数值类缓存
文章目录 优雅版线程池ThreadPoolTaskExecutor和ThreadPoolTaskExecutor的装饰器并发修改异常并发修改异常简介实现机制设计原因及意义 使用线程池造成的链路丢失问题线程池导致的链路丢失问题发生原因 常见解决方法更好的解决方法设计精妙之处 登录续期登录续期常见实现方式特…...
Java求职者面试指南:Spring、Spring Boot、MyBatis框架与计算机基础问题解析
Java求职者面试指南:Spring、Spring Boot、MyBatis框架与计算机基础问题解析 一、第一轮提问(基础概念问题) 1. 请解释Spring框架的核心容器是什么?它在Spring中起到什么作用? Spring框架的核心容器是IoC容器&#…...
招商蛇口 | 执笔CID,启幕低密生活新境
作为中国城市生长的力量,招商蛇口以“美好生活承载者”为使命,深耕全球111座城市,以央企担当匠造时代理想人居。从深圳湾的开拓基因到西安高新CID的战略落子,招商蛇口始终与城市发展同频共振,以建筑诠释对土地与生活的…...
elementUI点击浏览table所选行数据查看文档
项目场景: table按照要求特定的数据变成按钮可以点击 解决方案: <el-table-columnprop"mlname"label"名称"align"center"width"180"><template slot-scope"scope"><el-buttonv-if&qu…...
6个月Python学习计划 Day 16 - 面向对象编程(OOP)基础
第三周 Day 3 🎯 今日目标 理解类(class)和对象(object)的关系学会定义类的属性、方法和构造函数(init)掌握对象的创建与使用初识封装、继承和多态的基本概念(预告) &a…...
LLaMA-Factory 微调 Qwen2-VL 进行人脸情感识别(二)
在上一篇文章中,我们详细介绍了如何使用LLaMA-Factory框架对Qwen2-VL大模型进行微调,以实现人脸情感识别的功能。本篇文章将聚焦于微调完成后,如何调用这个模型进行人脸情感识别的具体代码实现,包括详细的步骤和注释。 模型调用步骤 环境准备:确保安装了必要的Python库。…...
C# winform教程(二)----checkbox
一、作用 提供一个用户选择或者不选的状态,这是一个可以多选的控件。 二、属性 其实功能大差不差,除了特殊的几个外,与button基本相同,所有说几个独有的 checkbox属性 名称内容含义appearance控件外观可以变成按钮形状checkali…...
深入解析 ReentrantLock:原理、公平锁与非公平锁的较量
ReentrantLock 是 Java 中 java.util.concurrent.locks 包下的一个重要类,用于实现线程同步,支持可重入性,并且可以选择公平锁或非公平锁的实现方式。下面将详细介绍 ReentrantLock 的实现原理以及公平锁和非公平锁的区别。 ReentrantLock 实现原理 基本架构 ReentrantLo…...
