当前位置: 首页 > news >正文

Golang使用消息队列(RabbitMQ)

最近在使用Golang做了一个网盘项目(类似百度网盘),这个网盘项目有一个功能描述如下:用户会删除一个文件到垃圾回收站,回收站的文件有一个时间期限,比如24h,24h后数据库中记录和oss中文件会被删除,在之前的版本中,可以使用定时任务来检查数据库记录中删除时间来判断是否删除,但是这不是最佳的,因此考虑如何基于RabbitMQ来实现这个功能。

使用RabbitMQ的架构

在这里插入图片描述

代码

因为前端有点麻烦,这里全部使用Golang后端来模拟实现整个架构,包括生产端和消费端。这里有一些细节

  • 注意交换机和队列的绑定,一定要细心
  • 交换机一旦声明了就不能更改,如果要发生一些属性的更改,就要删除原来的内容,重新生成
  • 下列的内容不包含RabbitMQ持久化的内容
package mainimport ("fmt""github.com/streadway/amqp""log""strings"
)func InitRabbitMQ() *amqp.Connection {mq := "amqp"host := "127.0.0.1"port := "5672"user := "root"pwd := "root"dns := strings.Join([]string{mq, "://", user, ":", pwd,"@", host, ":", port, "/"}, "")conn, err := amqp.Dial(dns)if err != nil {log.Fatalf("Failed to connect to RabbitMQ: %v", err)}return conn
}func InitMainExchangeAndQueue(ch *amqp.Channel, userID string) *amqp.Channel {// 队列信息exchangeName := "main_exchange"queueName := fmt.Sprintf("user_queue_%s", userID)messageTTL := int32(300000)// 声明主交换机err := ch.ExchangeDeclare(exchangeName, // 交换机名"direct",     // Exchange typefalse,        // Durablefalse,        // Auto-deletedfalse,        // Internalfalse,        // No-waitnil,          // Arguments)if err != nil {log.Fatalf("Failed to declare an main exchange: %v", err)}// 声明用户队列_, err = ch.QueueDeclare(queueName, // 队列名false,     // Durablefalse,     // Delete when unusedfalse,     // Exclusivefalse,     // No-waitamqp.Table{"x-dead-letter-routing-key": "dead",          // routing-key"x-dead-letter-exchange":    "dead_exchange", // 死信交换机"x-message-ttl":             messageTTL,      // TTL},)if err != nil {log.Fatalf("Failed to declare a queue: %v", err)}// 绑定err = ch.QueueBind(queueName, userID, "main_exchange", false, nil)if err != nil {log.Fatalf("Failed to bind queue to exchange: %v", err)}return ch
}func InitDeadExchangeAndQueue(ch *amqp.Channel) {// 声明死信交换机err := ch.ExchangeDeclare("dead_exchange",amqp.ExchangeDirect,true,false,false,false,nil,)if err != nil {log.Fatalf("Failed to declare an dead exchange: %v", err)}// 声明一个死信队列_, err = ch.QueueDeclare("dead_queue",true,false,false,false,nil)if err != nil {log.Fatalf("Failed to declare a queue: %v", err)}// 绑定err = ch.QueueBind("dead_queue", "dead", "dead_exchange", false, nil)if err != nil {log.Fatalf("Failed to bind queue to exchange: %v", err)}
}func PublishMessage(ch *amqp.Channel, userID, fileID string) {// 用户信息message := fmt.Sprintf("%s|%s", userID, fileID)exchangeName := "main_exchange"// 发布用户消息err := ch.Publish(exchangeName, // ExchangeuserID,       // Routing keyfalse,        // Mandatoryfalse,        // Immediateamqp.Publishing{ContentType: "text/plain",Body:        []byte(message),})if err != nil {log.Fatalf("Failed to publish a message: %v", err)}log.Printf("Message sent to user %s: %s", userID, message)
}func ConsumeTTL(ch *amqp.Channel) {// 声明死信交换机err := ch.ExchangeDeclare("dead_exchange", // 交换机名"direct",        // Exchange typetrue,            // Durablefalse,           // Auto-deletedfalse,           // Internalfalse,           // No-waitnil,             // Arguments)if err != nil {log.Fatalf("Failed to declare a dead letter exchange: %v", err)}// 创建消费者并阻塞等待消费死信队列中的消息megs, err := ch.Consume("dead_queue", // Queue"",           // Consumerfalse,        // Auto-acknowledgefalse,        // Exclusivefalse,        // No-localfalse,        // No-waitnil,          // Args)if err != nil {log.Fatalf("Failed to register a consumer for dead letter queue: %v", err)}// 使用无限循环一直监听fmt.Println("Waiting for message from dead_queue......")for d := range megs {// 实际中,处理消息的逻辑,例如删除文件或其他操作fmt.Println(string(d.Body))// 消费完成后手动确认消息err = d.Ack(false)if err != nil {log.Fatalf("Failed to ack message: %v", err)}}
}func Consume(ch *amqp.Channel, userID string) {// 下面的信息可以通过前后端进行传递queueName := fmt.Sprintf("user_queue_%s", userID)// 消费消息megs, err := ch.Consume(queueName, // Queue"",        // Consumertrue,      // Auto-acknowledgefalse,     // Exclusivefalse,     // No-localfalse,     // No-waitnil,       // Args)if err != nil {log.Fatalf("Failed to register a consumer: %v", err)}// 这里直接是由前端发送过来的API进行触发,所以不用一直阻塞监听d, ok := <-megsif !ok {log.Fatalf("Failed to get message: %v", err)}fmt.Println(string(d.Body))// 消息完成后确认消息err = d.Ack(true)if err != nil {log.Fatalf("Failed to ack message: %v", err)}
}func main() {// 获取客户端client := InitRabbitMQ()defer client.Close()ch, err := client.Channel()if err != nil {log.Fatalf("Failed to open a channel: %v", err)}defer ch.Close()//ConsumeTTL(ch)// 构造dead_exchange及dead_queue// InitDeadExchangeAndQueue(ch)// 假设这是web请求信息//var userID1 = "test-id1"//var fileID1 = "file1"// 构造main_exchange及user_queue//ch = InitMainExchangeAndQueue(ch, userID1)// 针对用户1:假设还消息没有过期时候就被recovery,即在user_queue中就被消费,实际中发布消息的这部分逻辑应当放在前端中//PublishMessage(ch, userID1, fileID1)//time.Sleep(20 * time.Second)// 模拟后端消费消息//Consume(ch, userID1)// 针对用户2:模拟其不被后端消费,过期到死信队列中var userID2 = "test-id2"var fileID2 = "file2"ch = InitMainExchangeAndQueue(ch, userID2)PublishMessage(ch, userID2, fileID2)// 注意这个消息没有被消费,理论上应当被死信队列消费
}

从dead_exchange中消费:
在这里插入图片描述

相关文章:

Golang使用消息队列(RabbitMQ)

最近在使用Golang做了一个网盘项目&#xff08;类似百度网盘&#xff09;&#xff0c;这个网盘项目有一个功能描述如下&#xff1a;用户会删除一个文件到垃圾回收站&#xff0c;回收站的文件有一个时间期限&#xff0c;比如24h&#xff0c;24h后数据库中记录和oss中文件会被删除…...

Apache Spark远程代码执行漏洞(CVE-2023-32007)漏洞复现

漏洞描述 Apache Spark是美国阿帕奇&#xff08;Apache&#xff09;基金会的一款支持非循环数据流和内存计算的大规模数据处理引擎。 Apache Spark 3.4.0之前版本存在命令注入漏洞&#xff0c;该漏洞源于如果ACL启用后&#xff0c;HttpSecurityFilter中的代码路径可以允许通过…...

春秋云镜 :CVE-2020-21650(MyuCMS后台rce)

一、题目 靶标介绍&#xff1a; MyuCMS开源内容管理系统,采用ThinkPHP开发而成的社区商城聚合&#xff0c;插件&#xff0c;模板&#xff0c;轻便快捷容易扩展 其2.2版本中admin.php/config/add方法存在任意命令执行漏洞. 进入题目&#xff1a; exp&#xff1a; url/index.p…...

测试框架pytest教程(7)实现 xunit 风格的setup

pytest支持setup和teardown&#xff0c;对于使用unittest和nose框架的用户来说对这些很熟悉&#xff0c;但是在pytest可以使用功能更强大的fixture来实现固定装置。 模块级别 如果单个模块中有多个测试函数和测试类&#xff0c;您可以选择实现以下固定方法&#xff0c;这些方…...

用队列实现栈

目录 题目题目要求示例 解答方法一、实现思路时间复杂度和空间复杂度代码 方法二、实现思路时间复杂度和空间复杂度代码 方法三、实现思路时间复杂度和空间复杂度代码 总结 题目 用队列实现栈 题目要求 题目链接 示例 解答 方法一、 使用两个队列来实现栈。 实现思路 题…...

Anolis 8.6 下 Redis 7.2.0 集群搭建和配置

Redis 7.2.0 搭建和集群配置 一.Redis 下载与单机部署1.Redis 下载2.虚拟机配置3.Redis 单机源码安装和测试4.Java 单机连接测试1.Pom 依赖2.配置文件3.启动类4.配置类5.单元测试6.测试结果 二.Redis 集群部署1.主从1.从节点配置2.Java 测试 2.哨兵1.哨兵节点配置2.复制一个哨兵…...

综合能源系统(8)——综合能源系统支撑技术

综合能源系统关键技术与典型案例  何泽家&#xff0c;李德智主编 1、大数据技术 1.1、大数据技术概述 大数据是指无法在一定时间范围内用常规软件工具进行捕捉、管理和处理的数据集合&#xff0c;是需要新处理模式才能具有更强的决策力、洞察发现力和流程优化能力的海量、高…...

MySQL5.7数据目录结构

以CentOS7为例&#xff0c;数据目录为/var/lib/mysql/&#xff0c;其内容如下&#xff1a; [rootscentos szc]# ll /var/lib/mysql/ total 122952 -rw-r----- 1 mysql mysql 56 Jan 15 16:02 auto.cnf -rw------- 1 mysql mysql 1680 Jan 15 16:02 ca-key.pem -rw-r…...

Python Opencv实践 - 图像直方图均衡化

import cv2 as cv import numpy as np import matplotlib.pyplot as pltimg cv.imread("../SampleImages/pomeranian.png", cv.IMREAD_COLOR) print(img.shape)#图像直方图计算 #cv.calcHist(images, channels, mask, histSize, ranges, hist, accumulate) #images&…...

GAN:对抗生成网络,前向传播和后巷传播的区别

目录 GAN&#xff1a;对抗生成网络 损失函数 判别器开始波动很大&#xff0c;先调整判别器 生成样本和真实样本的统一&#xff1a;真假难辨​编辑 文字专图片​编辑 头像转表情包​编辑 头像转3D​编辑 后向传播 1. 前向传播&#xff08;forward&#xff09; 2. 反向传播&…...

压力变送器的功能与应用

压力变送器是用于测量气体或者液体等介质压力的设备&#xff0c;能够将压力转化为4 G信号传输到监控平台&#xff0c;工作人员可以在电脑或者手机上登录平台查看监测到的数据&#xff0c;并根据数据制定下一步的计划。 压力变送器的功能&#xff1a; 压力变送器采用了高性能感…...

排序算法:选择排序

选择排序的思想是&#xff1a;双重循环遍历数组&#xff0c;每经过一轮比较&#xff0c;找到最小元素的下标&#xff0c;将其交换至首位。 public static void selectionSort(int[] arr) {int minIndex;for (int i 0; i < arr.length - 1; i) {minIndex i;for (int j i …...

Windows运行Spark所需的Hadoop安装

解压文件 复制bin目录 找到winutils-master文件hadoop对应的bin目录版本 全部复制替换掉hadoop的bin目录文件 复制hadoop.dll文件 将bin目录下的hadoop.dll文件复制到System32目录下 配置环境变量 修改hadoop-env.cmd配置文件 注意jdk装在非C盘则完全没问题&#xff0c;如果装在…...

KusionStack使用文档

下载安装 1. 安装 Kusionup 如果想自定义默认安装版本&#xff0c;可以运行下述命令&#xff08;将最后的 openlatest 替换为你想要默认安装的版本号就就行&#xff09;&#xff1a; curl -s "http://kusion-public.oss-cn-hzfinance.aliyuncs.com/cli/kusionup/script…...

ONLYOFFICE 文档如何与 Alfresco 进行集成

ONLYOFFICE 文档是一款开源办公套件&#xff0c;其是包含文本文档、电子表格、演示文稿、数字表单、PDF 查看器和转换工具的协作性编辑工具。要在 Alfresco 中使用 ONLYOFFICE 协作功能&#xff0c;可以将他们连接集成。阅读本文&#xff0c;了解这如何实现。 关于 ONLYOFFICE…...

PostgreSQL下载路径与安装步骤

PgSQL介绍 PgSQL和MySQL一样是一种关系模型的数据库&#xff0c;全称为PostgreSQL 数据库。 优势&#xff1a;PgSQL是一种可扩展、可靠、可定制的数据库管理系统&#xff0c;具有良好的数据完整性和安全性&#xff0c;支持多种操作系统&#xff0c;包括 Linux、Windows、MacOS …...

如何在PHP中编写条件语句

引言 决策是生活不可缺少的一部分。从平凡的着装决定&#xff0c;到改变人生的工作和家庭决定。在开发中也是如此。要让程序做任何有用的事情&#xff0c;它必须能够对某种输入做出响应。当用户点击网站上的联系人按钮时&#xff0c;他们希望被带到联系人页面。如果什么都没有…...

LLM架构自注意力机制Transformers architecture Attention is all you need

使用Transformers架构构建大型语言模型显著提高了自然语言任务的性能&#xff0c;超过了之前的RNNs&#xff0c;并导致了再生能力的爆炸。 Transformers架构的力量在于其学习句子中所有单词的相关性和上下文的能力。不仅仅是您在这里看到的&#xff0c;与它的邻居每个词相邻&…...

计算机网络 QA

DNS 的解析过程 浏览器缓存。当用户通过浏览器访问某域名时&#xff0c;浏览器首先会在自己的缓存中查找是否有该域名对应的 IP 地址&#xff08;曾经访问过该域名并且没有清空缓存&#xff09;系统缓存。当浏览器缓存中无域名对应的 IP 地址时&#xff0c;会自动检测用户计算机…...

安果天气预报 产品介绍

软件介绍版本号 2.0.5 安果天气预报&#xff1a;全世界覆盖&#xff0c;中国定制 想要查找北京、上海、纽约、东京还是巴黎的天气&#xff1f;一款简约的天气预 报应用为你呈现。专注于为用户提供纯净的天气体验&#xff0c;我们不发送任何打扰的通知。包含空气质量、能见度、…...

智慧医疗能源事业线深度画像分析(上)

引言 医疗行业作为现代社会的关键基础设施,其能源消耗与环境影响正日益受到关注。随着全球"双碳"目标的推进和可持续发展理念的深入,智慧医疗能源事业线应运而生,致力于通过创新技术与管理方案,重构医疗领域的能源使用模式。这一事业线融合了能源管理、可持续发…...

docker详细操作--未完待续

docker介绍 docker官网: Docker&#xff1a;加速容器应用程序开发 harbor官网&#xff1a;Harbor - Harbor 中文 使用docker加速器: Docker镜像极速下载服务 - 毫秒镜像 是什么 Docker 是一种开源的容器化平台&#xff0c;用于将应用程序及其依赖项&#xff08;如库、运行时环…...

VB.net复制Ntag213卡写入UID

本示例使用的发卡器&#xff1a;https://item.taobao.com/item.htm?ftt&id615391857885 一、读取旧Ntag卡的UID和数据 Private Sub Button15_Click(sender As Object, e As EventArgs) Handles Button15.Click轻松读卡技术支持:网站:Dim i, j As IntegerDim cardidhex, …...

UE5 学习系列(三)创建和移动物体

这篇博客是该系列的第三篇&#xff0c;是在之前两篇博客的基础上展开&#xff0c;主要介绍如何在操作界面中创建和拖动物体&#xff0c;这篇博客跟随的视频链接如下&#xff1a; B 站视频&#xff1a;s03-创建和移动物体 如果你不打算开之前的博客并且对UE5 比较熟的话按照以…...

STM32标准库-DMA直接存储器存取

文章目录 一、DMA1.1简介1.2存储器映像1.3DMA框图1.4DMA基本结构1.5DMA请求1.6数据宽度与对齐1.7数据转运DMA1.8ADC扫描模式DMA 二、数据转运DMA2.1接线图2.2代码2.3相关API 一、DMA 1.1简介 DMA&#xff08;Direct Memory Access&#xff09;直接存储器存取 DMA可以提供外设…...

12.找到字符串中所有字母异位词

&#x1f9e0; 题目解析 题目描述&#xff1a; 给定两个字符串 s 和 p&#xff0c;找出 s 中所有 p 的字母异位词的起始索引。 返回的答案以数组形式表示。 字母异位词定义&#xff1a; 若两个字符串包含的字符种类和出现次数完全相同&#xff0c;顺序无所谓&#xff0c;则互为…...

【论文阅读28】-CNN-BiLSTM-Attention-(2024)

本文把滑坡位移序列拆开、筛优质因子&#xff0c;再用 CNN-BiLSTM-Attention 来动态预测每个子序列&#xff0c;最后重构出总位移&#xff0c;预测效果超越传统模型。 文章目录 1 引言2 方法2.1 位移时间序列加性模型2.2 变分模态分解 (VMD) 具体步骤2.3.1 样本熵&#xff08;S…...

【开发技术】.Net使用FFmpeg视频特定帧上绘制内容

目录 一、目的 二、解决方案 2.1 什么是FFmpeg 2.2 FFmpeg主要功能 2.3 使用Xabe.FFmpeg调用FFmpeg功能 2.4 使用 FFmpeg 的 drawbox 滤镜来绘制 ROI 三、总结 一、目的 当前市场上有很多目标检测智能识别的相关算法&#xff0c;当前调用一个医疗行业的AI识别算法后返回…...

return this;返回的是谁

一个审批系统的示例来演示责任链模式的实现。假设公司需要处理不同金额的采购申请&#xff0c;不同级别的经理有不同的审批权限&#xff1a; // 抽象处理者&#xff1a;审批者 abstract class Approver {protected Approver successor; // 下一个处理者// 设置下一个处理者pub…...

推荐 github 项目:GeminiImageApp(图片生成方向,可以做一定的素材)

推荐 github 项目:GeminiImageApp(图片生成方向&#xff0c;可以做一定的素材) 这个项目能干嘛? 使用 gemini 2.0 的 api 和 google 其他的 api 来做衍生处理 简化和优化了文生图和图生图的行为(我的最主要) 并且有一些目标检测和切割(我用不到) 视频和 imagefx 因为没 a…...