当前位置: 首页 > news >正文

typescript 实现RabbitMQ死信队列和延迟队列 订单10分钟未付归还库存

Manjaro安装RabbitMQ

安装

sudo pacman -S rabbitmq rabbitmqadmin

启动管理模块

sudo rabbitmq-plugins enable rabbitmq_managementsudo rabbitmq-server

管理界面
http://127.0.0.1:15672/
默认用户名和密码都是guest。
要使用 rabbitmqctl 命令添加用户并分配权限,您可以按照以下步骤进行操作:

  1. 添加用户
rabbitmqctl add_user mingcai password

请将 password 替换为您想要设置的实际密码。

  1. 分配权限
rabbitmqctl set_permissions -p / mingcai ".*" ".*" ".*"

这个命令将用户 mingcai 授予对所有虚拟主机的所有资源的读、写和管理权限。如果您只想给予特定权限,请适当调整正则表达式 ".*",以授予适当的权限。例如,如果您只想给予读取权限,可以使用 "^amq\."

  1. 可选步骤:设置用户角色

您可以将用户分配给不同的角色,以便更好地管理权限。例如,您可以将用户添加到 administrator 角色以获取管理员权限:

rabbitmqctl set_user_tags mingcai administrator

这样,用户 mingcai 就被赋予了管理员权限。

请确保您具有适当的权限来执行这些操作,并确保替换示例中的用户名和密码为您自己的实际值。

死信队列

在这里插入图片描述

在这里插入图片描述
标题:利用RabbitMQ死信队列处理消息的三种情况

在消息队列的应用中,处理异常情况和消息的延迟成为了一项重要的任务。RabbitMQ作为一款流行的消息队列服务,提供了死信队列(Dead Letter Exchange)功能,能够有效地处理消息被拒绝、消息过期以及队列达到最大长度等情况。本文将介绍如何利用RabbitMQ的死信队列来处理这三种情况,并提供了TypeScript示例代码。

1. 消息被拒绝

当消费者无法处理某条消息时,可以选择将其标记为“被拒绝”。这种情况下,我们可以配置RabbitMQ,将被拒绝的消息发送到一个死信队列,以后再处理。

// 引入amqplib库
import * as amqp from 'amqplib';// 连接到RabbitMQ服务器
const connection = await amqp.connect('amqp://localhost');// 创建Channel
const channel = await connection.createChannel();// 定义队列
const queueName = 'my_queue';
await channel.assertQueue(queueName, {// 设置死信交换机deadLetterExchange: 'my_dead_letter_exchange'
});// 消费消息
channel.consume(queueName, (msg) => {// 处理消息if (msg) {// 处理失败,拒绝消息并将其重新放回队列// channel.reject(msg, true); // 第二个参数设为 true 表示将消息重新放回队列// 处理失败,拒绝消息channel.reject(msg, false); // 第二个参数设为 false 表示将消息投递到死信队列// or 处理失败,拒绝消息并将其重新放回死信队列channel.nack(msg, false, false); // 第二个参数设为 false 表示不将消息重新放回原队列,第三个参数设为 false 表示不拒绝当前和之前所有未确认的消息}
});

2. 消息过期

有时候我们希望消息在一定时间内被处理,如果超过了这个时间,就认为消息已经过期。RabbitMQ允许我们设置消息的过期时间,并在消息过期后将其发送到死信队列。

// 发布消息
await channel.sendToQueue(queueName, Buffer.from('Hello'), {expiration: '60000' // 设置过期时间为60秒
});

3. 队列达到最大长度

为了避免队列过载,我们可以限制队列的最大长度。当队列达到最大长度时,新的消息将被拒绝,并发送到死信队列。

// 定义队列
await channel.assertQueue(queueName, {maxLength: 100, // 设置最大队列长度为100deadLetterExchange: 'my_dead_letter_exchange'
});

通过以上配置,我们可以利用RabbitMQ的死信队列来处理消息被拒绝、消息过期以及队列达到最大长度等情况,保证消息系统的稳定性和可靠性。

以上是利用TypeScript示例代码演示了如何在RabbitMQ中使用死信队列。希望这篇文章对你有所帮助!

延时队列

什么是延时队列?顾名思义:首先它要具有队列的特性,再给它附加一个延迟消费队列消息的功能,也就是说可以指定队列中的消息在哪个时间点被消费。
延时队列在项目中的应用还是比较多的,尤其像电商类平台:
1、订单成功后,在30分钟内没有支付,自动取消订单
2、外卖平台发送订餐通知,下单成功后60s给用户推送短信。
3、如果订单一直处于某一个未完结状态时,及时处理关单,并退还库存
4、淘宝新建商户一个月内还没上传商品信息,将冻结商铺等

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

npm install amqplib --save
npm install @types/amqplib --save-dev

在这里插入图片描述

总结

在这里插入图片描述

rabbitmqadmin 使用入门

rabbitmqadmin 是 RabbitMQ 的命令行管理工具,可以用于执行各种管理任务,如创建队列、交换机,查看队列状态等。以下是一些基本的用法示例:

export RABBITMQ_SERVER=127.0.0.1
export RABBITMQ_PORT=5672   
export RABBITMQ_USER=mingcai     
export RABBITMQ_PASSWORD=passwordrabbitmqadmin list exchanges
  1. 查看 RabbitMQ 服务器信息
rabbitmqadmin status
  1. 列出所有交换机
rabbitmqadmin list exchanges
  1. 列出所有队列
rabbitmqadmin list queues
  1. 创建一个交换机
rabbitmqadmin declare exchange name=my_exchange type=direct
  1. 创建一个队列
rabbitmqadmin declare queue name=my_queue
  1. 绑定队列到交换机
rabbitmqadmin declare binding source=my_exchange destination=my_queue routing_key=my_routing_key
  1. 发送消息到指定交换机
rabbitmqadmin publish exchange=my_exchange routing_key=my_routing_key payload="Hello, RabbitMQ!"
  1. 获取队列消息
rabbitmqadmin get queue=my_queue

这些命令只是一些基本用法示例,rabbitmqadmin 工具支持更多功能和选项。你可以通过运行 rabbitmqadmin help 命令来获取更详细的帮助信息,或者查看官方文档以了解更多选项和使用方法。

延时3秒和8秒全部代码

// delayProducer.ts
import * as amqp from 'amqplib';async function setupRouting() {const connection = await amqp.connect('amqp://mingcai:password@127.0.0.1');const channel = await connection.createChannel();const exchange = 'routing_exchange';// 定义 dlx-exchangeconst dlxExchangeName = 'dlx-exchange';// 声明交换机await channel.assertExchange(exchange, 'direct', {durable: true});await channel.assertExchange(dlxExchangeName, 'direct', { durable: true });//消息防止丢失const dlxqueueBindings= [{dlxQueueName: 'dlx-3_second_queue', routingKey: 'fast',},{dlxQueueName: 'dlx-8_second_queue', routingKey: 'slow'}];for (const binding of dlxqueueBindings) {// 绑定延迟死信队列await channel.assertQueue(binding.dlxQueueName );//死信交换机和死信队列绑定 Routing key fast 的消息await channel.bindQueue(binding.dlxQueueName, dlxExchangeName, binding.routingKey); // 将 dlx-queue 绑定到死信交换机}// 定义队列和路由键的映射const queueBindings = [{queue: '3_second_queue', routingKey: 'fast', arguments: {'x-message-ttl': 3000, // TTL 设置为 3 秒 消息被拒绝或过期时将重新发布到的交换器的可选名称。'x-dead-letter-exchange': 'dlx-exchange'//消息被拒绝或过期时将重新发布到的交换器的可选名称}},{queue: '8_second_queue', routingKey: 'slow', arguments: {'x-message-ttl': 8000, // TTL 设置为 8 秒 消息被拒绝或过期时将重新发布到的交换器的可选名称。'x-dead-letter-exchange': 'dlx-exchange'//消息被拒绝或过期时将重新发布到的交换器的可选名称}}];// 声明队列,并将队列绑定到交换机上for (const binding of queueBindings) {await channel.assertQueue(binding.queue, {durable: true, arguments: binding.arguments});await channel.bindQueue(binding.queue, exchange, binding.routingKey);}for (let i = 0; i < 10; i++) {await new Promise((resolve) => {setTimeout(() => {resolve(1)}, 1000)})const chinaTime = new Date().toLocaleString('zh-CN', { timeZone: 'Asia/Shanghai' });console.log('当前中国时间:', chinaTime);// 发送消息到交换机,并设置不同的路由键await sendMessage(channel, exchange, 'fast', `[${i}] ${chinaTime} Message for the fast queue`);await sendMessage(channel, exchange, 'slow', `[${i}] ${chinaTime} Message for the slow queue`);}// 关闭连接setTimeout(async () => {await channel.close();await connection.close();}, 10000); //10 秒后关闭连接
}async function sendMessage(channel: amqp.Channel, exchange: string, routingKey: string, message: string) {channel.publish(exchange, routingKey, Buffer.from(message));console.log(`Sent message '${message}' with routing key '${routingKey}'`);
}setupRouting().catch(console.error);
//消费者 dlx-3_second_queue.ts
import * as amqp from 'amqplib';async function setupRouting() {const connection = await amqp.connect('amqp://mingcai:password@127.0.0.1');const channel = await connection.createChannel();let queue = 'dlx-3_second_queue'// 定义队列和路由键的映射await channel.consume(queue, (msg) => {if (msg !== null) {const chinaTime = new Date().toLocaleString('zh-CN', { timeZone: 'Asia/Shanghai' });console.log(`Received message ${chinaTime}'${msg.content.toString()}' from queue '${queue}'`);channel.ack(msg); // 确认消息已被处理}});}setupRouting().catch(console.error);
//dlx-8_second_queue.ts
import * as amqp from 'amqplib';async function setupRouting() {const connection = await amqp.connect('amqp://mingcai:password@127.0.0.1');const channel = await connection.createChannel();let queue = 'dlx-8_second_queue'// 定义队列和路由键的映射await channel.consume(queue, (msg) => {if (msg !== null) {const chinaTime = new Date().toLocaleString('zh-CN', { timeZone: 'Asia/Shanghai' });console.log(`Received message ${chinaTime}'${msg.content.toString()}' from queue '${queue}'`);channel.ack(msg); // 确认消息已被处理}});}setupRouting().catch(console.error);

相关文章:

typescript 实现RabbitMQ死信队列和延迟队列 订单10分钟未付归还库存

Manjaro安装RabbitMQ 安装 sudo pacman -S rabbitmq rabbitmqadmin启动管理模块 sudo rabbitmq-plugins enable rabbitmq_managementsudo rabbitmq-server管理界面 http://127.0.0.1:15672/ 默认用户名和密码都是guest。 要使用 rabbitmqctl 命令添加用户并分配权限&#xf…...

怎样才能把重建大师的空三导进去CC?

导出空三文件xml两者都是通用的&#xff0c;cc和photoscan都可以兼容。 重建大师是一款专为超大规模实景三维数据生产而设计的集群并行处理软件&#xff0c;输入倾斜照片&#xff0c;激光点云&#xff0c;POS信息及像控点&#xff0c;输出高精度彩色网格模型&#xff0c;可一键…...

命令模式(请求与具体实现解耦)

目录 前言 UML plantuml 类图 实战代码 模板 Command Invoker Receiver Client 前言 命令模式解耦了命令请求者&#xff08;Invoker&#xff09;和命令执行者&#xff08;receiver&#xff09;&#xff0c;使得 Invoker 不再直接引用 receiver&#xff0c;而是依赖于…...

开发一款MMOARPG难度到底有多大

开发一款MMOARPG难度到底有多大 MMORPG游戏开发到底有多难&#xff0c;我们按照过去开发的标准&#xff0c;就比如开发一款传奇&#xff0c;那时候哪会用什么别人的引擎&#xff0c;都是自研&#xff0c;从基础图形API开始。我们不考虑美术和策划&#xff0c;就单指程序&#x…...

RTSP应用:实现视频流的实时推送

在实现实时视频流推送的项目中&#xff0c;RTSP&#xff08;Real Time Streaming Protocol&#xff09;协议扮演着核心角色。本文将指导你通过安装FFmpeg软件&#xff0c;下载并编译live555&#xff0c;以及配置ffmpeg进行视频流推送&#xff0c;来实现一个基本的RTSP流媒体服务…...

Java八股文(数据结构)

Java八股文の数据结构 数据结构 数据结构 请解释以下数据结构的概念&#xff1a;链表、栈、队列和树。 链表是一种线性数据结构&#xff0c;由节点组成&#xff0c;每个节点包含了指向下一个节点的指针&#xff1b; 栈是一种后进先出&#xff08;LIFO&#xff09;的数据结构&a…...

ActiveMQ Artemis 系列| High Availability 主备模式(消息复制) 版本2.19.1

一、ActiveMQ Artemis 介绍 Apache ActiveMQ Artemis 是一个高性能的开源消息代理&#xff0c;它完全符合 Java Message Service (JMS) 2.0 规范&#xff0c;并支持多种通信协议&#xff0c;包括 AMQP、MQTT、STOMP 和 OpenWire 等。ActiveMQ Artemis 由 Apache Software Foun…...

QGIS插件系列--WhiteBox Tools

WhiteBox Tools&#xff08;官网机翻&#xff09;: WhiteboxTools是由圭尔夫大学地貌测量和水文地理信息学研究小组&#xff08;GHRG&#xff09;开发的高级地理空间软件包和数据分析平台。该项目始于2017年<>月&#xff0c;并在分析能力方面迅速发展。WhiteboxTools的一…...

SpringMVC设置全局异常处理器

文章目录 背景分析使用ControllerAdvice&#xff08;RestControllerAdvice&#xff09;ExceptionHandler实现全局异常全局异常处理-多个处理器匹配顺序存在一个类中存在不同的类中 对于过滤器和拦截器中的异常&#xff0c;有两种思路可以考虑 背景 在项目中我们有需求做一个全…...

Acwing_795前缀和 【一维前缀和】+【模板】二维前缀和

Acwing_795前缀和 【一维前缀和】 题目&#xff1a; 代码&#xff1a; #include <bits/stdc.h> #define int long long #define INF 0X3f3f3f3f #define endl \n using namespace std; const int N 100010; int arr[N];int n,m; int l,r; signed main(){std::ios::s…...

docker 部署 gitlab-ce 16.9.1

文章目录 [toc]拉取 gitlab-ce 镜像创建 gitlab-ce 持久化目录启停脚本配置配置 gitlab-ce编辑 gitlab-ce 配置文件重启 gitlab-ce配置 root 密码 设置中文 gitlab/gitlab-ce(需要科学上网) 拉取 gitlab-ce 镜像 docker pull gitlab/gitlab-ce:16.9.1-ce.0查看镜像是不是有 Vo…...

29.Python从入门到精通—Python3 面向对象继承 多继承 方法重写 类属性与方法

29.从入门到精通&#xff1a;Python3 面向对象继承 多继承 方法重写 类属性与方法 继承多继承方法重写类属性与方法 继承 在面向对象编程中&#xff0c;继承是指通过继承现有类的属性和方法来创建新类的过程。新类称为子类&#xff08;或派生类&#xff09;&#xff0c;现有类…...

jQuery如何获取元素宽高?

在jQuery中&#xff0c;获取元素的宽和高有多种方法&#xff0c;取决于你是否需要包括边框、内边距或其他额外空间。以下是几种常用的方式&#xff1a; 获取元素内容区域的宽和高&#xff08;不包括边框和内边距&#xff09;&#xff1a; var width $(#yourElement).width(); …...

springdata框架对es集成

什么是spring data框架 Spring Data是一个用于简化数据库、非关系型数据库、索引库访问&#xff0c;并支持云服务的开源框架。其主要目标是使得对数据的访问变得方便快捷&#xff0c;并支持 map-reduce框架和云计算数据服务。Spring Data可以极大的简化JPA(Elasticsearch…)的…...

jvm(虚拟机)运行时数据区域介绍

Java虚拟机&#xff08;JVM&#xff09;运行时数据区域是Java程序在运行过程中使用的内存区域&#xff0c;它主要包括以下几个部分&#xff1a; 程序计数器&#xff08;Program Counter Register&#xff09;&#xff1a; 程序计数器是一块较小的内存区域&#xff0c;是线程私有…...

C++ MFC 只启动一个程序实例 唤醒之前的实例(完整源码)

初级代码游戏的专栏介绍与文章目录-CSDN博客 很多时候我们希望只允许启动一个程序实例&#xff0c;如果再次运行&#xff0c;就唤醒之前的实例。 目录 1 概述 2 相关技术介绍 2.1 互斥对象 2.2 查找窗口 2.3 唤醒窗口 1 概述 技术上并不难&#xff0c;涉及到以下几个技术…...

2024多云管理平台CMP排名看这里!

随着云计算技术的迅猛发展&#xff0c;多云管理平台CMP应运而生。多云管理平台CMP仅能够简化对多个云环境的统一管理&#xff0c;还能提高资源利用效率和降低成本。因此了解多云管理平台CMP品牌是必要的。2024多云管理平台CMP排名看这里&#xff01;仅供参考哈&#xff01; 20…...

MySQL 数据库的日志管理、备份与恢复

一. 数据库备份 1.数据备份的重要性 备份的主要目的是灾难恢复。 在生产环境中&#xff0c;数据的安全性至关重要。 任何数据的丢失都可能产生严重的后果。 造成数据丢失的原因&#xff1a; 程序错误人为,操作错误,运算错误,磁盘故障灾难&#xff08;如火灾、地震&#xff0…...

一、Go开发环境搭建

文章目录 1、开发工具2、开发环境配置3、Hello World4、语法 1、开发工具 https://code.visualstudio.com/download 2、开发环境配置 类比Java的JDK&#xff0c;go的SDK下载&#xff1a;https://studygolang.com/dl 解压&#xff1a; 配置环境变量path&#xff0c;将命令&quo…...

包子凑数(蓝桥杯,闫氏DP分析法)

题目描述&#xff1a; 小明几乎每天早晨都会在一家包子铺吃早餐。 他发现这家包子铺有 N 种蒸笼&#xff0c;其中第 i 种蒸笼恰好能放 Ai 个包子。 每种蒸笼都有非常多笼&#xff0c;可以认为是无限笼。 每当有顾客想买 X 个包子&#xff0c;卖包子的大叔就会迅速选出若干笼…...

调用支付宝接口响应40004 SYSTEM_ERROR问题排查

在对接支付宝API的时候&#xff0c;遇到了一些问题&#xff0c;记录一下排查过程。 Body:{"datadigital_fincloud_generalsaas_face_certify_initialize_response":{"msg":"Business Failed","code":"40004","sub_msg…...

RocketMQ延迟消息机制

两种延迟消息 RocketMQ中提供了两种延迟消息机制 指定固定的延迟级别 通过在Message中设定一个MessageDelayLevel参数&#xff0c;对应18个预设的延迟级别指定时间点的延迟级别 通过在Message中设定一个DeliverTimeMS指定一个Long类型表示的具体时间点。到了时间点后&#xf…...

聊聊 Pulsar:Producer 源码解析

一、前言 Apache Pulsar 是一个企业级的开源分布式消息传递平台&#xff0c;以其高性能、可扩展性和存储计算分离架构在消息队列和流处理领域独树一帜。在 Pulsar 的核心架构中&#xff0c;Producer&#xff08;生产者&#xff09; 是连接客户端应用与消息队列的第一步。生产者…...

pam_env.so模块配置解析

在PAM&#xff08;Pluggable Authentication Modules&#xff09;配置中&#xff0c; /etc/pam.d/su 文件相关配置含义如下&#xff1a; 配置解析 auth required pam_env.so1. 字段分解 字段值说明模块类型auth认证类模块&#xff0c;负责验证用户身份&am…...

项目部署到Linux上时遇到的错误(Redis,MySQL,无法正确连接,地址占用问题)

Redis无法正确连接 在运行jar包时出现了这样的错误 查询得知问题核心在于Redis连接失败&#xff0c;具体原因是客户端发送了密码认证请求&#xff0c;但Redis服务器未设置密码 1.为Redis设置密码&#xff08;匹配客户端配置&#xff09; 步骤&#xff1a; 1&#xff09;.修…...

稳定币的深度剖析与展望

一、引言 在当今数字化浪潮席卷全球的时代&#xff0c;加密货币作为一种新兴的金融现象&#xff0c;正以前所未有的速度改变着我们对传统货币和金融体系的认知。然而&#xff0c;加密货币市场的高度波动性却成为了其广泛应用和普及的一大障碍。在这样的背景下&#xff0c;稳定…...

Xen Server服务器释放磁盘空间

disk.sh #!/bin/bashcd /run/sr-mount/e54f0646-ae11-0457-b64f-eba4673b824c # 全部虚拟机物理磁盘文件存储 a$(ls -l | awk {print $NF} | cut -d. -f1) # 使用中的虚拟机物理磁盘文件 b$(xe vm-disk-list --multiple | grep uuid | awk {print $NF})printf "%s\n"…...

A2A JS SDK 完整教程:快速入门指南

目录 什么是 A2A JS SDK?A2A JS 安装与设置A2A JS 核心概念创建你的第一个 A2A JS 代理A2A JS 服务端开发A2A JS 客户端使用A2A JS 高级特性A2A JS 最佳实践A2A JS 故障排除 什么是 A2A JS SDK? A2A JS SDK 是一个专为 JavaScript/TypeScript 开发者设计的强大库&#xff…...

MacOS下Homebrew国内镜像加速指南(2025最新国内镜像加速)

macos brew国内镜像加速方法 brew install 加速formula.jws.json下载慢加速 &#x1f37a; 最新版brew安装慢到怀疑人生&#xff1f;别怕&#xff0c;教你轻松起飞&#xff01; 最近Homebrew更新至最新版&#xff0c;每次执行 brew 命令时都会自动从官方地址 https://formulae.…...

Vue ③-生命周期 || 脚手架

生命周期 思考&#xff1a;什么时候可以发送初始化渲染请求&#xff1f;&#xff08;越早越好&#xff09; 什么时候可以开始操作dom&#xff1f;&#xff08;至少dom得渲染出来&#xff09; Vue生命周期&#xff1a; 一个Vue实例从 创建 到 销毁 的整个过程。 生命周期四个…...