当前位置: 首页 > news >正文

Golang使用消息队列(RabbitMQ)

最近在使用Golang做了一个网盘项目(类似百度网盘),这个网盘项目有一个功能描述如下:用户会删除一个文件到垃圾回收站,回收站的文件有一个时间期限,比如24h,24h后数据库中记录和oss中文件会被删除,在之前的版本中,可以使用定时任务来检查数据库记录中删除时间来判断是否删除,但是这不是最佳的,因此考虑如何基于RabbitMQ来实现这个功能。

使用RabbitMQ的架构

在这里插入图片描述

代码

因为前端有点麻烦,这里全部使用Golang后端来模拟实现整个架构,包括生产端和消费端。这里有一些细节

  • 注意交换机和队列的绑定,一定要细心
  • 交换机一旦声明了就不能更改,如果要发生一些属性的更改,就要删除原来的内容,重新生成
  • 下列的内容不包含RabbitMQ持久化的内容
package mainimport ("fmt""github.com/streadway/amqp""log""strings"
)func InitRabbitMQ() *amqp.Connection {mq := "amqp"host := "127.0.0.1"port := "5672"user := "root"pwd := "root"dns := strings.Join([]string{mq, "://", user, ":", pwd,"@", host, ":", port, "/"}, "")conn, err := amqp.Dial(dns)if err != nil {log.Fatalf("Failed to connect to RabbitMQ: %v", err)}return conn
}func InitMainExchangeAndQueue(ch *amqp.Channel, userID string) *amqp.Channel {// 队列信息exchangeName := "main_exchange"queueName := fmt.Sprintf("user_queue_%s", userID)messageTTL := int32(300000)// 声明主交换机err := ch.ExchangeDeclare(exchangeName, // 交换机名"direct",     // Exchange typefalse,        // Durablefalse,        // Auto-deletedfalse,        // Internalfalse,        // No-waitnil,          // Arguments)if err != nil {log.Fatalf("Failed to declare an main exchange: %v", err)}// 声明用户队列_, err = ch.QueueDeclare(queueName, // 队列名false,     // Durablefalse,     // Delete when unusedfalse,     // Exclusivefalse,     // No-waitamqp.Table{"x-dead-letter-routing-key": "dead",          // routing-key"x-dead-letter-exchange":    "dead_exchange", // 死信交换机"x-message-ttl":             messageTTL,      // TTL},)if err != nil {log.Fatalf("Failed to declare a queue: %v", err)}// 绑定err = ch.QueueBind(queueName, userID, "main_exchange", false, nil)if err != nil {log.Fatalf("Failed to bind queue to exchange: %v", err)}return ch
}func InitDeadExchangeAndQueue(ch *amqp.Channel) {// 声明死信交换机err := ch.ExchangeDeclare("dead_exchange",amqp.ExchangeDirect,true,false,false,false,nil,)if err != nil {log.Fatalf("Failed to declare an dead exchange: %v", err)}// 声明一个死信队列_, err = ch.QueueDeclare("dead_queue",true,false,false,false,nil)if err != nil {log.Fatalf("Failed to declare a queue: %v", err)}// 绑定err = ch.QueueBind("dead_queue", "dead", "dead_exchange", false, nil)if err != nil {log.Fatalf("Failed to bind queue to exchange: %v", err)}
}func PublishMessage(ch *amqp.Channel, userID, fileID string) {// 用户信息message := fmt.Sprintf("%s|%s", userID, fileID)exchangeName := "main_exchange"// 发布用户消息err := ch.Publish(exchangeName, // ExchangeuserID,       // Routing keyfalse,        // Mandatoryfalse,        // Immediateamqp.Publishing{ContentType: "text/plain",Body:        []byte(message),})if err != nil {log.Fatalf("Failed to publish a message: %v", err)}log.Printf("Message sent to user %s: %s", userID, message)
}func ConsumeTTL(ch *amqp.Channel) {// 声明死信交换机err := ch.ExchangeDeclare("dead_exchange", // 交换机名"direct",        // Exchange typetrue,            // Durablefalse,           // Auto-deletedfalse,           // Internalfalse,           // No-waitnil,             // Arguments)if err != nil {log.Fatalf("Failed to declare a dead letter exchange: %v", err)}// 创建消费者并阻塞等待消费死信队列中的消息megs, err := ch.Consume("dead_queue", // Queue"",           // Consumerfalse,        // Auto-acknowledgefalse,        // Exclusivefalse,        // No-localfalse,        // No-waitnil,          // Args)if err != nil {log.Fatalf("Failed to register a consumer for dead letter queue: %v", err)}// 使用无限循环一直监听fmt.Println("Waiting for message from dead_queue......")for d := range megs {// 实际中,处理消息的逻辑,例如删除文件或其他操作fmt.Println(string(d.Body))// 消费完成后手动确认消息err = d.Ack(false)if err != nil {log.Fatalf("Failed to ack message: %v", err)}}
}func Consume(ch *amqp.Channel, userID string) {// 下面的信息可以通过前后端进行传递queueName := fmt.Sprintf("user_queue_%s", userID)// 消费消息megs, err := ch.Consume(queueName, // Queue"",        // Consumertrue,      // Auto-acknowledgefalse,     // Exclusivefalse,     // No-localfalse,     // No-waitnil,       // Args)if err != nil {log.Fatalf("Failed to register a consumer: %v", err)}// 这里直接是由前端发送过来的API进行触发,所以不用一直阻塞监听d, ok := <-megsif !ok {log.Fatalf("Failed to get message: %v", err)}fmt.Println(string(d.Body))// 消息完成后确认消息err = d.Ack(true)if err != nil {log.Fatalf("Failed to ack message: %v", err)}
}func main() {// 获取客户端client := InitRabbitMQ()defer client.Close()ch, err := client.Channel()if err != nil {log.Fatalf("Failed to open a channel: %v", err)}defer ch.Close()//ConsumeTTL(ch)// 构造dead_exchange及dead_queue// InitDeadExchangeAndQueue(ch)// 假设这是web请求信息//var userID1 = "test-id1"//var fileID1 = "file1"// 构造main_exchange及user_queue//ch = InitMainExchangeAndQueue(ch, userID1)// 针对用户1:假设还消息没有过期时候就被recovery,即在user_queue中就被消费,实际中发布消息的这部分逻辑应当放在前端中//PublishMessage(ch, userID1, fileID1)//time.Sleep(20 * time.Second)// 模拟后端消费消息//Consume(ch, userID1)// 针对用户2:模拟其不被后端消费,过期到死信队列中var userID2 = "test-id2"var fileID2 = "file2"ch = InitMainExchangeAndQueue(ch, userID2)PublishMessage(ch, userID2, fileID2)// 注意这个消息没有被消费,理论上应当被死信队列消费
}

从dead_exchange中消费:
在这里插入图片描述

相关文章:

Golang使用消息队列(RabbitMQ)

最近在使用Golang做了一个网盘项目&#xff08;类似百度网盘&#xff09;&#xff0c;这个网盘项目有一个功能描述如下&#xff1a;用户会删除一个文件到垃圾回收站&#xff0c;回收站的文件有一个时间期限&#xff0c;比如24h&#xff0c;24h后数据库中记录和oss中文件会被删除…...

Apache Spark远程代码执行漏洞(CVE-2023-32007)漏洞复现

漏洞描述 Apache Spark是美国阿帕奇&#xff08;Apache&#xff09;基金会的一款支持非循环数据流和内存计算的大规模数据处理引擎。 Apache Spark 3.4.0之前版本存在命令注入漏洞&#xff0c;该漏洞源于如果ACL启用后&#xff0c;HttpSecurityFilter中的代码路径可以允许通过…...

春秋云镜 :CVE-2020-21650(MyuCMS后台rce)

一、题目 靶标介绍&#xff1a; MyuCMS开源内容管理系统,采用ThinkPHP开发而成的社区商城聚合&#xff0c;插件&#xff0c;模板&#xff0c;轻便快捷容易扩展 其2.2版本中admin.php/config/add方法存在任意命令执行漏洞. 进入题目&#xff1a; exp&#xff1a; url/index.p…...

测试框架pytest教程(7)实现 xunit 风格的setup

pytest支持setup和teardown&#xff0c;对于使用unittest和nose框架的用户来说对这些很熟悉&#xff0c;但是在pytest可以使用功能更强大的fixture来实现固定装置。 模块级别 如果单个模块中有多个测试函数和测试类&#xff0c;您可以选择实现以下固定方法&#xff0c;这些方…...

用队列实现栈

目录 题目题目要求示例 解答方法一、实现思路时间复杂度和空间复杂度代码 方法二、实现思路时间复杂度和空间复杂度代码 方法三、实现思路时间复杂度和空间复杂度代码 总结 题目 用队列实现栈 题目要求 题目链接 示例 解答 方法一、 使用两个队列来实现栈。 实现思路 题…...

Anolis 8.6 下 Redis 7.2.0 集群搭建和配置

Redis 7.2.0 搭建和集群配置 一.Redis 下载与单机部署1.Redis 下载2.虚拟机配置3.Redis 单机源码安装和测试4.Java 单机连接测试1.Pom 依赖2.配置文件3.启动类4.配置类5.单元测试6.测试结果 二.Redis 集群部署1.主从1.从节点配置2.Java 测试 2.哨兵1.哨兵节点配置2.复制一个哨兵…...

综合能源系统(8)——综合能源系统支撑技术

综合能源系统关键技术与典型案例  何泽家&#xff0c;李德智主编 1、大数据技术 1.1、大数据技术概述 大数据是指无法在一定时间范围内用常规软件工具进行捕捉、管理和处理的数据集合&#xff0c;是需要新处理模式才能具有更强的决策力、洞察发现力和流程优化能力的海量、高…...

MySQL5.7数据目录结构

以CentOS7为例&#xff0c;数据目录为/var/lib/mysql/&#xff0c;其内容如下&#xff1a; [rootscentos szc]# ll /var/lib/mysql/ total 122952 -rw-r----- 1 mysql mysql 56 Jan 15 16:02 auto.cnf -rw------- 1 mysql mysql 1680 Jan 15 16:02 ca-key.pem -rw-r…...

Python Opencv实践 - 图像直方图均衡化

import cv2 as cv import numpy as np import matplotlib.pyplot as pltimg cv.imread("../SampleImages/pomeranian.png", cv.IMREAD_COLOR) print(img.shape)#图像直方图计算 #cv.calcHist(images, channels, mask, histSize, ranges, hist, accumulate) #images&…...

GAN:对抗生成网络,前向传播和后巷传播的区别

目录 GAN&#xff1a;对抗生成网络 损失函数 判别器开始波动很大&#xff0c;先调整判别器 生成样本和真实样本的统一&#xff1a;真假难辨​编辑 文字专图片​编辑 头像转表情包​编辑 头像转3D​编辑 后向传播 1. 前向传播&#xff08;forward&#xff09; 2. 反向传播&…...

压力变送器的功能与应用

压力变送器是用于测量气体或者液体等介质压力的设备&#xff0c;能够将压力转化为4 G信号传输到监控平台&#xff0c;工作人员可以在电脑或者手机上登录平台查看监测到的数据&#xff0c;并根据数据制定下一步的计划。 压力变送器的功能&#xff1a; 压力变送器采用了高性能感…...

排序算法:选择排序

选择排序的思想是&#xff1a;双重循环遍历数组&#xff0c;每经过一轮比较&#xff0c;找到最小元素的下标&#xff0c;将其交换至首位。 public static void selectionSort(int[] arr) {int minIndex;for (int i 0; i < arr.length - 1; i) {minIndex i;for (int j i …...

Windows运行Spark所需的Hadoop安装

解压文件 复制bin目录 找到winutils-master文件hadoop对应的bin目录版本 全部复制替换掉hadoop的bin目录文件 复制hadoop.dll文件 将bin目录下的hadoop.dll文件复制到System32目录下 配置环境变量 修改hadoop-env.cmd配置文件 注意jdk装在非C盘则完全没问题&#xff0c;如果装在…...

KusionStack使用文档

下载安装 1. 安装 Kusionup 如果想自定义默认安装版本&#xff0c;可以运行下述命令&#xff08;将最后的 openlatest 替换为你想要默认安装的版本号就就行&#xff09;&#xff1a; curl -s "http://kusion-public.oss-cn-hzfinance.aliyuncs.com/cli/kusionup/script…...

ONLYOFFICE 文档如何与 Alfresco 进行集成

ONLYOFFICE 文档是一款开源办公套件&#xff0c;其是包含文本文档、电子表格、演示文稿、数字表单、PDF 查看器和转换工具的协作性编辑工具。要在 Alfresco 中使用 ONLYOFFICE 协作功能&#xff0c;可以将他们连接集成。阅读本文&#xff0c;了解这如何实现。 关于 ONLYOFFICE…...

PostgreSQL下载路径与安装步骤

PgSQL介绍 PgSQL和MySQL一样是一种关系模型的数据库&#xff0c;全称为PostgreSQL 数据库。 优势&#xff1a;PgSQL是一种可扩展、可靠、可定制的数据库管理系统&#xff0c;具有良好的数据完整性和安全性&#xff0c;支持多种操作系统&#xff0c;包括 Linux、Windows、MacOS …...

如何在PHP中编写条件语句

引言 决策是生活不可缺少的一部分。从平凡的着装决定&#xff0c;到改变人生的工作和家庭决定。在开发中也是如此。要让程序做任何有用的事情&#xff0c;它必须能够对某种输入做出响应。当用户点击网站上的联系人按钮时&#xff0c;他们希望被带到联系人页面。如果什么都没有…...

LLM架构自注意力机制Transformers architecture Attention is all you need

使用Transformers架构构建大型语言模型显著提高了自然语言任务的性能&#xff0c;超过了之前的RNNs&#xff0c;并导致了再生能力的爆炸。 Transformers架构的力量在于其学习句子中所有单词的相关性和上下文的能力。不仅仅是您在这里看到的&#xff0c;与它的邻居每个词相邻&…...

计算机网络 QA

DNS 的解析过程 浏览器缓存。当用户通过浏览器访问某域名时&#xff0c;浏览器首先会在自己的缓存中查找是否有该域名对应的 IP 地址&#xff08;曾经访问过该域名并且没有清空缓存&#xff09;系统缓存。当浏览器缓存中无域名对应的 IP 地址时&#xff0c;会自动检测用户计算机…...

安果天气预报 产品介绍

软件介绍版本号 2.0.5 安果天气预报&#xff1a;全世界覆盖&#xff0c;中国定制 想要查找北京、上海、纽约、东京还是巴黎的天气&#xff1f;一款简约的天气预 报应用为你呈现。专注于为用户提供纯净的天气体验&#xff0c;我们不发送任何打扰的通知。包含空气质量、能见度、…...

7.4.分块查找

一.分块查找的算法思想&#xff1a; 1.实例&#xff1a; 以上述图片的顺序表为例&#xff0c; 该顺序表的数据元素从整体来看是乱序的&#xff0c;但如果把这些数据元素分成一块一块的小区间&#xff0c; 第一个区间[0,1]索引上的数据元素都是小于等于10的&#xff0c; 第二…...

springboot 百货中心供应链管理系统小程序

一、前言 随着我国经济迅速发展&#xff0c;人们对手机的需求越来越大&#xff0c;各种手机软件也都在被广泛应用&#xff0c;但是对于手机进行数据信息管理&#xff0c;对于手机的各种软件也是备受用户的喜爱&#xff0c;百货中心供应链管理系统被用户普遍使用&#xff0c;为方…...

FFmpeg 低延迟同屏方案

引言 在实时互动需求激增的当下&#xff0c;无论是在线教育中的师生同屏演示、远程办公的屏幕共享协作&#xff0c;还是游戏直播的画面实时传输&#xff0c;低延迟同屏已成为保障用户体验的核心指标。FFmpeg 作为一款功能强大的多媒体框架&#xff0c;凭借其灵活的编解码、数据…...

oracle与MySQL数据库之间数据同步的技术要点

Oracle与MySQL数据库之间的数据同步是一个涉及多个技术要点的复杂任务。由于Oracle和MySQL的架构差异&#xff0c;它们的数据同步要求既要保持数据的准确性和一致性&#xff0c;又要处理好性能问题。以下是一些主要的技术要点&#xff1a; 数据结构差异 数据类型差异&#xff…...

Qt Http Server模块功能及架构

Qt Http Server 是 Qt 6.0 中引入的一个新模块&#xff0c;它提供了一个轻量级的 HTTP 服务器实现&#xff0c;主要用于构建基于 HTTP 的应用程序和服务。 功能介绍&#xff1a; 主要功能 HTTP服务器功能&#xff1a; 支持 HTTP/1.1 协议 简单的请求/响应处理模型 支持 GET…...

Java入门学习详细版(一)

大家好&#xff0c;Java 学习是一个系统学习的过程&#xff0c;核心原则就是“理论 实践 坚持”&#xff0c;并且需循序渐进&#xff0c;不可过于着急&#xff0c;本篇文章推出的这份详细入门学习资料将带大家从零基础开始&#xff0c;逐步掌握 Java 的核心概念和编程技能。 …...

Spring Cloud Gateway 中自定义验证码接口返回 404 的排查与解决

Spring Cloud Gateway 中自定义验证码接口返回 404 的排查与解决 问题背景 在一个基于 Spring Cloud Gateway WebFlux 构建的微服务项目中&#xff0c;新增了一个本地验证码接口 /code&#xff0c;使用函数式路由&#xff08;RouterFunction&#xff09;和 Hutool 的 Circle…...

CSS设置元素的宽度根据其内容自动调整

width: fit-content 是 CSS 中的一个属性值&#xff0c;用于设置元素的宽度根据其内容自动调整&#xff0c;确保宽度刚好容纳内容而不会超出。 效果对比 默认情况&#xff08;width: auto&#xff09;&#xff1a; 块级元素&#xff08;如 <div>&#xff09;会占满父容器…...

智能AI电话机器人系统的识别能力现状与发展水平

一、引言 随着人工智能技术的飞速发展&#xff0c;AI电话机器人系统已经从简单的自动应答工具演变为具备复杂交互能力的智能助手。这类系统结合了语音识别、自然语言处理、情感计算和机器学习等多项前沿技术&#xff0c;在客户服务、营销推广、信息查询等领域发挥着越来越重要…...

LangChain知识库管理后端接口:数据库操作详解—— 构建本地知识库系统的基础《二》

这段 Python 代码是一个完整的 知识库数据库操作模块&#xff0c;用于对本地知识库系统中的知识库进行增删改查&#xff08;CRUD&#xff09;操作。它基于 SQLAlchemy ORM 框架 和一个自定义的装饰器 with_session 实现数据库会话管理。 &#x1f4d8; 一、整体功能概述 该模块…...