typescript 实现RabbitMQ死信队列和延迟队列 订单10分钟未付归还库存
Manjaro安装RabbitMQ
安装
sudo pacman -S rabbitmq rabbitmqadmin
启动管理模块
sudo rabbitmq-plugins enable rabbitmq_managementsudo rabbitmq-server
管理界面
http://127.0.0.1:15672/
默认用户名和密码都是guest。
要使用 rabbitmqctl 命令添加用户并分配权限,您可以按照以下步骤进行操作:
- 添加用户:
rabbitmqctl add_user mingcai password
请将 password 替换为您想要设置的实际密码。
- 分配权限:
rabbitmqctl set_permissions -p / mingcai ".*" ".*" ".*"
这个命令将用户 mingcai 授予对所有虚拟主机的所有资源的读、写和管理权限。如果您只想给予特定权限,请适当调整正则表达式 ".*",以授予适当的权限。例如,如果您只想给予读取权限,可以使用 "^amq\."。
- 可选步骤:设置用户角色:
您可以将用户分配给不同的角色,以便更好地管理权限。例如,您可以将用户添加到 administrator 角色以获取管理员权限:
rabbitmqctl set_user_tags mingcai administrator
这样,用户 mingcai 就被赋予了管理员权限。
请确保您具有适当的权限来执行这些操作,并确保替换示例中的用户名和密码为您自己的实际值。
死信队列


标题:利用RabbitMQ死信队列处理消息的三种情况
在消息队列的应用中,处理异常情况和消息的延迟成为了一项重要的任务。RabbitMQ作为一款流行的消息队列服务,提供了死信队列(Dead Letter Exchange)功能,能够有效地处理消息被拒绝、消息过期以及队列达到最大长度等情况。本文将介绍如何利用RabbitMQ的死信队列来处理这三种情况,并提供了TypeScript示例代码。
1. 消息被拒绝
当消费者无法处理某条消息时,可以选择将其标记为“被拒绝”。这种情况下,我们可以配置RabbitMQ,将被拒绝的消息发送到一个死信队列,以后再处理。
// 引入amqplib库
import * as amqp from 'amqplib';// 连接到RabbitMQ服务器
const connection = await amqp.connect('amqp://localhost');// 创建Channel
const channel = await connection.createChannel();// 定义队列
const queueName = 'my_queue';
await channel.assertQueue(queueName, {// 设置死信交换机deadLetterExchange: 'my_dead_letter_exchange'
});// 消费消息
channel.consume(queueName, (msg) => {// 处理消息if (msg) {// 处理失败,拒绝消息并将其重新放回队列// channel.reject(msg, true); // 第二个参数设为 true 表示将消息重新放回队列// 处理失败,拒绝消息channel.reject(msg, false); // 第二个参数设为 false 表示将消息投递到死信队列// or 处理失败,拒绝消息并将其重新放回死信队列channel.nack(msg, false, false); // 第二个参数设为 false 表示不将消息重新放回原队列,第三个参数设为 false 表示不拒绝当前和之前所有未确认的消息}
});
2. 消息过期
有时候我们希望消息在一定时间内被处理,如果超过了这个时间,就认为消息已经过期。RabbitMQ允许我们设置消息的过期时间,并在消息过期后将其发送到死信队列。
// 发布消息
await channel.sendToQueue(queueName, Buffer.from('Hello'), {expiration: '60000' // 设置过期时间为60秒
});
3. 队列达到最大长度
为了避免队列过载,我们可以限制队列的最大长度。当队列达到最大长度时,新的消息将被拒绝,并发送到死信队列。
// 定义队列
await channel.assertQueue(queueName, {maxLength: 100, // 设置最大队列长度为100deadLetterExchange: 'my_dead_letter_exchange'
});
通过以上配置,我们可以利用RabbitMQ的死信队列来处理消息被拒绝、消息过期以及队列达到最大长度等情况,保证消息系统的稳定性和可靠性。
以上是利用TypeScript示例代码演示了如何在RabbitMQ中使用死信队列。希望这篇文章对你有所帮助!
延时队列
什么是延时队列?顾名思义:首先它要具有队列的特性,再给它附加一个延迟消费队列消息的功能,也就是说可以指定队列中的消息在哪个时间点被消费。
延时队列在项目中的应用还是比较多的,尤其像电商类平台:
1、订单成功后,在30分钟内没有支付,自动取消订单
2、外卖平台发送订餐通知,下单成功后60s给用户推送短信。
3、如果订单一直处于某一个未完结状态时,及时处理关单,并退还库存
4、淘宝新建商户一个月内还没上传商品信息,将冻结商铺等



npm install amqplib --save
npm install @types/amqplib --save-dev

总结

rabbitmqadmin 使用入门
rabbitmqadmin 是 RabbitMQ 的命令行管理工具,可以用于执行各种管理任务,如创建队列、交换机,查看队列状态等。以下是一些基本的用法示例:
export RABBITMQ_SERVER=127.0.0.1
export RABBITMQ_PORT=5672
export RABBITMQ_USER=mingcai
export RABBITMQ_PASSWORD=passwordrabbitmqadmin list exchanges
- 查看 RabbitMQ 服务器信息:
rabbitmqadmin status
- 列出所有交换机:
rabbitmqadmin list exchanges
- 列出所有队列:
rabbitmqadmin list queues
- 创建一个交换机:
rabbitmqadmin declare exchange name=my_exchange type=direct
- 创建一个队列:
rabbitmqadmin declare queue name=my_queue
- 绑定队列到交换机:
rabbitmqadmin declare binding source=my_exchange destination=my_queue routing_key=my_routing_key
- 发送消息到指定交换机:
rabbitmqadmin publish exchange=my_exchange routing_key=my_routing_key payload="Hello, RabbitMQ!"
- 获取队列消息:
rabbitmqadmin get queue=my_queue
这些命令只是一些基本用法示例,rabbitmqadmin 工具支持更多功能和选项。你可以通过运行 rabbitmqadmin help 命令来获取更详细的帮助信息,或者查看官方文档以了解更多选项和使用方法。
延时3秒和8秒全部代码
// delayProducer.ts
import * as amqp from 'amqplib';async function setupRouting() {const connection = await amqp.connect('amqp://mingcai:password@127.0.0.1');const channel = await connection.createChannel();const exchange = 'routing_exchange';// 定义 dlx-exchangeconst dlxExchangeName = 'dlx-exchange';// 声明交换机await channel.assertExchange(exchange, 'direct', {durable: true});await channel.assertExchange(dlxExchangeName, 'direct', { durable: true });//消息防止丢失const dlxqueueBindings= [{dlxQueueName: 'dlx-3_second_queue', routingKey: 'fast',},{dlxQueueName: 'dlx-8_second_queue', routingKey: 'slow'}];for (const binding of dlxqueueBindings) {// 绑定延迟死信队列await channel.assertQueue(binding.dlxQueueName );//死信交换机和死信队列绑定 Routing key fast 的消息await channel.bindQueue(binding.dlxQueueName, dlxExchangeName, binding.routingKey); // 将 dlx-queue 绑定到死信交换机}// 定义队列和路由键的映射const queueBindings = [{queue: '3_second_queue', routingKey: 'fast', arguments: {'x-message-ttl': 3000, // TTL 设置为 3 秒 消息被拒绝或过期时将重新发布到的交换器的可选名称。'x-dead-letter-exchange': 'dlx-exchange'//消息被拒绝或过期时将重新发布到的交换器的可选名称}},{queue: '8_second_queue', routingKey: 'slow', arguments: {'x-message-ttl': 8000, // TTL 设置为 8 秒 消息被拒绝或过期时将重新发布到的交换器的可选名称。'x-dead-letter-exchange': 'dlx-exchange'//消息被拒绝或过期时将重新发布到的交换器的可选名称}}];// 声明队列,并将队列绑定到交换机上for (const binding of queueBindings) {await channel.assertQueue(binding.queue, {durable: true, arguments: binding.arguments});await channel.bindQueue(binding.queue, exchange, binding.routingKey);}for (let i = 0; i < 10; i++) {await new Promise((resolve) => {setTimeout(() => {resolve(1)}, 1000)})const chinaTime = new Date().toLocaleString('zh-CN', { timeZone: 'Asia/Shanghai' });console.log('当前中国时间:', chinaTime);// 发送消息到交换机,并设置不同的路由键await sendMessage(channel, exchange, 'fast', `[${i}] ${chinaTime} Message for the fast queue`);await sendMessage(channel, exchange, 'slow', `[${i}] ${chinaTime} Message for the slow queue`);}// 关闭连接setTimeout(async () => {await channel.close();await connection.close();}, 10000); // 在 10 秒后关闭连接
}async function sendMessage(channel: amqp.Channel, exchange: string, routingKey: string, message: string) {channel.publish(exchange, routingKey, Buffer.from(message));console.log(`Sent message '${message}' with routing key '${routingKey}'`);
}setupRouting().catch(console.error);
//消费者 dlx-3_second_queue.ts
import * as amqp from 'amqplib';async function setupRouting() {const connection = await amqp.connect('amqp://mingcai:password@127.0.0.1');const channel = await connection.createChannel();let queue = 'dlx-3_second_queue'// 定义队列和路由键的映射await channel.consume(queue, (msg) => {if (msg !== null) {const chinaTime = new Date().toLocaleString('zh-CN', { timeZone: 'Asia/Shanghai' });console.log(`Received message ${chinaTime}'${msg.content.toString()}' from queue '${queue}'`);channel.ack(msg); // 确认消息已被处理}});}setupRouting().catch(console.error);
//dlx-8_second_queue.ts
import * as amqp from 'amqplib';async function setupRouting() {const connection = await amqp.connect('amqp://mingcai:password@127.0.0.1');const channel = await connection.createChannel();let queue = 'dlx-8_second_queue'// 定义队列和路由键的映射await channel.consume(queue, (msg) => {if (msg !== null) {const chinaTime = new Date().toLocaleString('zh-CN', { timeZone: 'Asia/Shanghai' });console.log(`Received message ${chinaTime}'${msg.content.toString()}' from queue '${queue}'`);channel.ack(msg); // 确认消息已被处理}});}setupRouting().catch(console.error);相关文章:
typescript 实现RabbitMQ死信队列和延迟队列 订单10分钟未付归还库存
Manjaro安装RabbitMQ 安装 sudo pacman -S rabbitmq rabbitmqadmin启动管理模块 sudo rabbitmq-plugins enable rabbitmq_managementsudo rabbitmq-server管理界面 http://127.0.0.1:15672/ 默认用户名和密码都是guest。 要使用 rabbitmqctl 命令添加用户并分配权限…...
怎样才能把重建大师的空三导进去CC?
导出空三文件xml两者都是通用的,cc和photoscan都可以兼容。 重建大师是一款专为超大规模实景三维数据生产而设计的集群并行处理软件,输入倾斜照片,激光点云,POS信息及像控点,输出高精度彩色网格模型,可一键…...
命令模式(请求与具体实现解耦)
目录 前言 UML plantuml 类图 实战代码 模板 Command Invoker Receiver Client 前言 命令模式解耦了命令请求者(Invoker)和命令执行者(receiver),使得 Invoker 不再直接引用 receiver,而是依赖于…...
开发一款MMOARPG难度到底有多大
开发一款MMOARPG难度到底有多大 MMORPG游戏开发到底有多难,我们按照过去开发的标准,就比如开发一款传奇,那时候哪会用什么别人的引擎,都是自研,从基础图形API开始。我们不考虑美术和策划,就单指程序&#x…...
RTSP应用:实现视频流的实时推送
在实现实时视频流推送的项目中,RTSP(Real Time Streaming Protocol)协议扮演着核心角色。本文将指导你通过安装FFmpeg软件,下载并编译live555,以及配置ffmpeg进行视频流推送,来实现一个基本的RTSP流媒体服务…...
Java八股文(数据结构)
Java八股文の数据结构 数据结构 数据结构 请解释以下数据结构的概念:链表、栈、队列和树。 链表是一种线性数据结构,由节点组成,每个节点包含了指向下一个节点的指针; 栈是一种后进先出(LIFO)的数据结构&a…...
ActiveMQ Artemis 系列| High Availability 主备模式(消息复制) 版本2.19.1
一、ActiveMQ Artemis 介绍 Apache ActiveMQ Artemis 是一个高性能的开源消息代理,它完全符合 Java Message Service (JMS) 2.0 规范,并支持多种通信协议,包括 AMQP、MQTT、STOMP 和 OpenWire 等。ActiveMQ Artemis 由 Apache Software Foun…...
QGIS插件系列--WhiteBox Tools
WhiteBox Tools(官网机翻): WhiteboxTools是由圭尔夫大学地貌测量和水文地理信息学研究小组(GHRG)开发的高级地理空间软件包和数据分析平台。该项目始于2017年<>月,并在分析能力方面迅速发展。WhiteboxTools的一…...
SpringMVC设置全局异常处理器
文章目录 背景分析使用ControllerAdvice(RestControllerAdvice)ExceptionHandler实现全局异常全局异常处理-多个处理器匹配顺序存在一个类中存在不同的类中 对于过滤器和拦截器中的异常,有两种思路可以考虑 背景 在项目中我们有需求做一个全…...
Acwing_795前缀和 【一维前缀和】+【模板】二维前缀和
Acwing_795前缀和 【一维前缀和】 题目: 代码: #include <bits/stdc.h> #define int long long #define INF 0X3f3f3f3f #define endl \n using namespace std; const int N 100010; int arr[N];int n,m; int l,r; signed main(){std::ios::s…...
docker 部署 gitlab-ce 16.9.1
文章目录 [toc]拉取 gitlab-ce 镜像创建 gitlab-ce 持久化目录启停脚本配置配置 gitlab-ce编辑 gitlab-ce 配置文件重启 gitlab-ce配置 root 密码 设置中文 gitlab/gitlab-ce(需要科学上网) 拉取 gitlab-ce 镜像 docker pull gitlab/gitlab-ce:16.9.1-ce.0查看镜像是不是有 Vo…...
29.Python从入门到精通—Python3 面向对象继承 多继承 方法重写 类属性与方法
29.从入门到精通:Python3 面向对象继承 多继承 方法重写 类属性与方法 继承多继承方法重写类属性与方法 继承 在面向对象编程中,继承是指通过继承现有类的属性和方法来创建新类的过程。新类称为子类(或派生类),现有类…...
jQuery如何获取元素宽高?
在jQuery中,获取元素的宽和高有多种方法,取决于你是否需要包括边框、内边距或其他额外空间。以下是几种常用的方式: 获取元素内容区域的宽和高(不包括边框和内边距): var width $(#yourElement).width(); …...
springdata框架对es集成
什么是spring data框架 Spring Data是一个用于简化数据库、非关系型数据库、索引库访问,并支持云服务的开源框架。其主要目标是使得对数据的访问变得方便快捷,并支持 map-reduce框架和云计算数据服务。Spring Data可以极大的简化JPA(Elasticsearch…)的…...
jvm(虚拟机)运行时数据区域介绍
Java虚拟机(JVM)运行时数据区域是Java程序在运行过程中使用的内存区域,它主要包括以下几个部分: 程序计数器(Program Counter Register): 程序计数器是一块较小的内存区域,是线程私有…...
C++ MFC 只启动一个程序实例 唤醒之前的实例(完整源码)
初级代码游戏的专栏介绍与文章目录-CSDN博客 很多时候我们希望只允许启动一个程序实例,如果再次运行,就唤醒之前的实例。 目录 1 概述 2 相关技术介绍 2.1 互斥对象 2.2 查找窗口 2.3 唤醒窗口 1 概述 技术上并不难,涉及到以下几个技术…...
2024多云管理平台CMP排名看这里!
随着云计算技术的迅猛发展,多云管理平台CMP应运而生。多云管理平台CMP仅能够简化对多个云环境的统一管理,还能提高资源利用效率和降低成本。因此了解多云管理平台CMP品牌是必要的。2024多云管理平台CMP排名看这里!仅供参考哈! 20…...
MySQL 数据库的日志管理、备份与恢复
一. 数据库备份 1.数据备份的重要性 备份的主要目的是灾难恢复。 在生产环境中,数据的安全性至关重要。 任何数据的丢失都可能产生严重的后果。 造成数据丢失的原因: 程序错误人为,操作错误,运算错误,磁盘故障灾难(如火灾、地震࿰…...
一、Go开发环境搭建
文章目录 1、开发工具2、开发环境配置3、Hello World4、语法 1、开发工具 https://code.visualstudio.com/download 2、开发环境配置 类比Java的JDK,go的SDK下载:https://studygolang.com/dl 解压: 配置环境变量path,将命令&quo…...
包子凑数(蓝桥杯,闫氏DP分析法)
题目描述: 小明几乎每天早晨都会在一家包子铺吃早餐。 他发现这家包子铺有 N 种蒸笼,其中第 i 种蒸笼恰好能放 Ai 个包子。 每种蒸笼都有非常多笼,可以认为是无限笼。 每当有顾客想买 X 个包子,卖包子的大叔就会迅速选出若干笼…...
ESP32/ESP8266嵌入式IoT工具库:轻量、可靠、生产就绪
1. 项目概述esp-iot-utils是面向 ESP32 和 ESP8266 平台的轻量级、生产就绪型嵌入式 IoT 工具集。它并非功能堆砌的“大而全”框架,而是以工程师视角提炼出高频、重复、易出错的底层任务——网络通信、结构化数据解析、时间同步、配置持久化与系统状态管理——并封装…...
教你把歌曲原声调小的5个技巧!简单又好用 赶紧收藏
在日常生活中,调整歌曲原声调小是非常常见的音频处理需求。比如在剪辑视频时,可能需要降低背景音乐的音量以突出旁白;或者在制作播客时,需要平衡人声与背景音的比例;还有在手机上听音乐时,某些歌曲突然出现…...
Kangaroo运动控制器Packet Serial通信协议详解
1. Kangaroo运动控制器底层通信技术解析 Kangaroo运动控制器是由RoboClaw系列厂商推出的专用闭环步进/伺服电机驱动模块,其核心价值在于将复杂的PID调节、电流环控制、位置反馈处理等算法固化于硬件中,使上位机仅需通过精简的串行协议即可完成高精度运动…...
Linux服务器上Ollama离线安装全攻略(附systemd服务配置)
Linux服务器上Ollama离线安装全攻略(附systemd服务配置) 在企业内网或实验室环境中,离线部署AI工具往往面临诸多挑战。本文将手把手带你完成Ollama在Linux服务器上的完整离线安装流程,特别针对无外网访问权限的场景优化࿰…...
基于LabVIEW的车床主体振动检测系统:CSV数据导入、滤波分析与时频域可视化
温馨提示:文末有联系方式系统概述 本系统采用LabVIEW平台开发,专为车床主体结构振动特性评估而设计,具备高精度数据采集与智能分析能力,适用于设备健康状态诊断与工艺优化场景。数据导入与预处理 系统支持标准CSV格式振动数据文件…...
图像处理算法资料(FPGA Verilog): RGB2GRAY、阈值分割、滤波、边缘检测等算...
图像处理算法资料( FPGA Verilog) 分别有RGB2GRAY、阈值分割(二值化)、均值滤波、中值滤波、sobel边缘检测、膨胀、腐蚀、开闭运算。 各个模块的结构与上图的顶层模块结构一致,通过模块之间的组合串联组成 ISP 顶层模块。 使用vivado软件&…...
QGIS进阶指南:动态标注与条件表达式高级应用
1. 动态标注的核心价值与应用场景 当你面对一个包含上千条建筑数据的地图图层时,传统静态标注会显得力不从心——商场和医院用相同字体显示,重要地标淹没在普通建筑中,数据更新后还得手动调整样式。这就是动态标注技术大显身手的时候了。 动态…...
ESP8266轻量协程调度器:零栈LeanTask与确定性多任务设计
1. 项目概述ESP8266Scheduler 是一个专为 ESP8266 平台设计的协作式多任务调度器(Co-operative Multitasking Scheduler),其核心目标是在资源受限的 Wi-Fi SoC 上实现轻量、确定、可预测的任务并发执行模型,同时避免传统抢占式 RT…...
Adafruit MAX44009库详解:超低功耗环境光传感器驱动与工程实践
1. 项目概述Adafruit MAX44009 库是专为 Analog Devices(原 Maxim Integrated)推出的 MAX44009 环境光传感器设计的 Arduino 兼容驱动库。该库封装了 IC 通信、寄存器配置、自动量程切换、中断管理及光照度(lux)换算等底层逻辑&am…...
罗技鼠标宏压枪脚本:绝地求生精准射击的终极解决方案
罗技鼠标宏压枪脚本:绝地求生精准射击的终极解决方案 【免费下载链接】logitech-pubg PUBG no recoil script for Logitech gaming mouse / 绝地求生 罗技 鼠标宏 项目地址: https://gitcode.com/gh_mirrors/lo/logitech-pubg 还在为《绝地求生》中的后坐力控…...
