当前位置: 首页 > news >正文

typescript 实现RabbitMQ死信队列和延迟队列 订单10分钟未付归还库存

Manjaro安装RabbitMQ

安装

sudo pacman -S rabbitmq rabbitmqadmin

启动管理模块

sudo rabbitmq-plugins enable rabbitmq_managementsudo rabbitmq-server

管理界面
http://127.0.0.1:15672/
默认用户名和密码都是guest。
要使用 rabbitmqctl 命令添加用户并分配权限,您可以按照以下步骤进行操作:

  1. 添加用户
rabbitmqctl add_user mingcai password

请将 password 替换为您想要设置的实际密码。

  1. 分配权限
rabbitmqctl set_permissions -p / mingcai ".*" ".*" ".*"

这个命令将用户 mingcai 授予对所有虚拟主机的所有资源的读、写和管理权限。如果您只想给予特定权限,请适当调整正则表达式 ".*",以授予适当的权限。例如,如果您只想给予读取权限,可以使用 "^amq\."

  1. 可选步骤:设置用户角色

您可以将用户分配给不同的角色,以便更好地管理权限。例如,您可以将用户添加到 administrator 角色以获取管理员权限:

rabbitmqctl set_user_tags mingcai administrator

这样,用户 mingcai 就被赋予了管理员权限。

请确保您具有适当的权限来执行这些操作,并确保替换示例中的用户名和密码为您自己的实际值。

死信队列

在这里插入图片描述

在这里插入图片描述
标题:利用RabbitMQ死信队列处理消息的三种情况

在消息队列的应用中,处理异常情况和消息的延迟成为了一项重要的任务。RabbitMQ作为一款流行的消息队列服务,提供了死信队列(Dead Letter Exchange)功能,能够有效地处理消息被拒绝、消息过期以及队列达到最大长度等情况。本文将介绍如何利用RabbitMQ的死信队列来处理这三种情况,并提供了TypeScript示例代码。

1. 消息被拒绝

当消费者无法处理某条消息时,可以选择将其标记为“被拒绝”。这种情况下,我们可以配置RabbitMQ,将被拒绝的消息发送到一个死信队列,以后再处理。

// 引入amqplib库
import * as amqp from 'amqplib';// 连接到RabbitMQ服务器
const connection = await amqp.connect('amqp://localhost');// 创建Channel
const channel = await connection.createChannel();// 定义队列
const queueName = 'my_queue';
await channel.assertQueue(queueName, {// 设置死信交换机deadLetterExchange: 'my_dead_letter_exchange'
});// 消费消息
channel.consume(queueName, (msg) => {// 处理消息if (msg) {// 处理失败,拒绝消息并将其重新放回队列// channel.reject(msg, true); // 第二个参数设为 true 表示将消息重新放回队列// 处理失败,拒绝消息channel.reject(msg, false); // 第二个参数设为 false 表示将消息投递到死信队列// or 处理失败,拒绝消息并将其重新放回死信队列channel.nack(msg, false, false); // 第二个参数设为 false 表示不将消息重新放回原队列,第三个参数设为 false 表示不拒绝当前和之前所有未确认的消息}
});

2. 消息过期

有时候我们希望消息在一定时间内被处理,如果超过了这个时间,就认为消息已经过期。RabbitMQ允许我们设置消息的过期时间,并在消息过期后将其发送到死信队列。

// 发布消息
await channel.sendToQueue(queueName, Buffer.from('Hello'), {expiration: '60000' // 设置过期时间为60秒
});

3. 队列达到最大长度

为了避免队列过载,我们可以限制队列的最大长度。当队列达到最大长度时,新的消息将被拒绝,并发送到死信队列。

// 定义队列
await channel.assertQueue(queueName, {maxLength: 100, // 设置最大队列长度为100deadLetterExchange: 'my_dead_letter_exchange'
});

通过以上配置,我们可以利用RabbitMQ的死信队列来处理消息被拒绝、消息过期以及队列达到最大长度等情况,保证消息系统的稳定性和可靠性。

以上是利用TypeScript示例代码演示了如何在RabbitMQ中使用死信队列。希望这篇文章对你有所帮助!

延时队列

什么是延时队列?顾名思义:首先它要具有队列的特性,再给它附加一个延迟消费队列消息的功能,也就是说可以指定队列中的消息在哪个时间点被消费。
延时队列在项目中的应用还是比较多的,尤其像电商类平台:
1、订单成功后,在30分钟内没有支付,自动取消订单
2、外卖平台发送订餐通知,下单成功后60s给用户推送短信。
3、如果订单一直处于某一个未完结状态时,及时处理关单,并退还库存
4、淘宝新建商户一个月内还没上传商品信息,将冻结商铺等

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

npm install amqplib --save
npm install @types/amqplib --save-dev

在这里插入图片描述

总结

在这里插入图片描述

rabbitmqadmin 使用入门

rabbitmqadmin 是 RabbitMQ 的命令行管理工具,可以用于执行各种管理任务,如创建队列、交换机,查看队列状态等。以下是一些基本的用法示例:

export RABBITMQ_SERVER=127.0.0.1
export RABBITMQ_PORT=5672   
export RABBITMQ_USER=mingcai     
export RABBITMQ_PASSWORD=passwordrabbitmqadmin list exchanges
  1. 查看 RabbitMQ 服务器信息
rabbitmqadmin status
  1. 列出所有交换机
rabbitmqadmin list exchanges
  1. 列出所有队列
rabbitmqadmin list queues
  1. 创建一个交换机
rabbitmqadmin declare exchange name=my_exchange type=direct
  1. 创建一个队列
rabbitmqadmin declare queue name=my_queue
  1. 绑定队列到交换机
rabbitmqadmin declare binding source=my_exchange destination=my_queue routing_key=my_routing_key
  1. 发送消息到指定交换机
rabbitmqadmin publish exchange=my_exchange routing_key=my_routing_key payload="Hello, RabbitMQ!"
  1. 获取队列消息
rabbitmqadmin get queue=my_queue

这些命令只是一些基本用法示例,rabbitmqadmin 工具支持更多功能和选项。你可以通过运行 rabbitmqadmin help 命令来获取更详细的帮助信息,或者查看官方文档以了解更多选项和使用方法。

延时3秒和8秒全部代码

// delayProducer.ts
import * as amqp from 'amqplib';async function setupRouting() {const connection = await amqp.connect('amqp://mingcai:password@127.0.0.1');const channel = await connection.createChannel();const exchange = 'routing_exchange';// 定义 dlx-exchangeconst dlxExchangeName = 'dlx-exchange';// 声明交换机await channel.assertExchange(exchange, 'direct', {durable: true});await channel.assertExchange(dlxExchangeName, 'direct', { durable: true });//消息防止丢失const dlxqueueBindings= [{dlxQueueName: 'dlx-3_second_queue', routingKey: 'fast',},{dlxQueueName: 'dlx-8_second_queue', routingKey: 'slow'}];for (const binding of dlxqueueBindings) {// 绑定延迟死信队列await channel.assertQueue(binding.dlxQueueName );//死信交换机和死信队列绑定 Routing key fast 的消息await channel.bindQueue(binding.dlxQueueName, dlxExchangeName, binding.routingKey); // 将 dlx-queue 绑定到死信交换机}// 定义队列和路由键的映射const queueBindings = [{queue: '3_second_queue', routingKey: 'fast', arguments: {'x-message-ttl': 3000, // TTL 设置为 3 秒 消息被拒绝或过期时将重新发布到的交换器的可选名称。'x-dead-letter-exchange': 'dlx-exchange'//消息被拒绝或过期时将重新发布到的交换器的可选名称}},{queue: '8_second_queue', routingKey: 'slow', arguments: {'x-message-ttl': 8000, // TTL 设置为 8 秒 消息被拒绝或过期时将重新发布到的交换器的可选名称。'x-dead-letter-exchange': 'dlx-exchange'//消息被拒绝或过期时将重新发布到的交换器的可选名称}}];// 声明队列,并将队列绑定到交换机上for (const binding of queueBindings) {await channel.assertQueue(binding.queue, {durable: true, arguments: binding.arguments});await channel.bindQueue(binding.queue, exchange, binding.routingKey);}for (let i = 0; i < 10; i++) {await new Promise((resolve) => {setTimeout(() => {resolve(1)}, 1000)})const chinaTime = new Date().toLocaleString('zh-CN', { timeZone: 'Asia/Shanghai' });console.log('当前中国时间:', chinaTime);// 发送消息到交换机,并设置不同的路由键await sendMessage(channel, exchange, 'fast', `[${i}] ${chinaTime} Message for the fast queue`);await sendMessage(channel, exchange, 'slow', `[${i}] ${chinaTime} Message for the slow queue`);}// 关闭连接setTimeout(async () => {await channel.close();await connection.close();}, 10000); //10 秒后关闭连接
}async function sendMessage(channel: amqp.Channel, exchange: string, routingKey: string, message: string) {channel.publish(exchange, routingKey, Buffer.from(message));console.log(`Sent message '${message}' with routing key '${routingKey}'`);
}setupRouting().catch(console.error);
//消费者 dlx-3_second_queue.ts
import * as amqp from 'amqplib';async function setupRouting() {const connection = await amqp.connect('amqp://mingcai:password@127.0.0.1');const channel = await connection.createChannel();let queue = 'dlx-3_second_queue'// 定义队列和路由键的映射await channel.consume(queue, (msg) => {if (msg !== null) {const chinaTime = new Date().toLocaleString('zh-CN', { timeZone: 'Asia/Shanghai' });console.log(`Received message ${chinaTime}'${msg.content.toString()}' from queue '${queue}'`);channel.ack(msg); // 确认消息已被处理}});}setupRouting().catch(console.error);
//dlx-8_second_queue.ts
import * as amqp from 'amqplib';async function setupRouting() {const connection = await amqp.connect('amqp://mingcai:password@127.0.0.1');const channel = await connection.createChannel();let queue = 'dlx-8_second_queue'// 定义队列和路由键的映射await channel.consume(queue, (msg) => {if (msg !== null) {const chinaTime = new Date().toLocaleString('zh-CN', { timeZone: 'Asia/Shanghai' });console.log(`Received message ${chinaTime}'${msg.content.toString()}' from queue '${queue}'`);channel.ack(msg); // 确认消息已被处理}});}setupRouting().catch(console.error);

相关文章:

typescript 实现RabbitMQ死信队列和延迟队列 订单10分钟未付归还库存

Manjaro安装RabbitMQ 安装 sudo pacman -S rabbitmq rabbitmqadmin启动管理模块 sudo rabbitmq-plugins enable rabbitmq_managementsudo rabbitmq-server管理界面 http://127.0.0.1:15672/ 默认用户名和密码都是guest。 要使用 rabbitmqctl 命令添加用户并分配权限&#xf…...

怎样才能把重建大师的空三导进去CC?

导出空三文件xml两者都是通用的&#xff0c;cc和photoscan都可以兼容。 重建大师是一款专为超大规模实景三维数据生产而设计的集群并行处理软件&#xff0c;输入倾斜照片&#xff0c;激光点云&#xff0c;POS信息及像控点&#xff0c;输出高精度彩色网格模型&#xff0c;可一键…...

命令模式(请求与具体实现解耦)

目录 前言 UML plantuml 类图 实战代码 模板 Command Invoker Receiver Client 前言 命令模式解耦了命令请求者&#xff08;Invoker&#xff09;和命令执行者&#xff08;receiver&#xff09;&#xff0c;使得 Invoker 不再直接引用 receiver&#xff0c;而是依赖于…...

开发一款MMOARPG难度到底有多大

开发一款MMOARPG难度到底有多大 MMORPG游戏开发到底有多难&#xff0c;我们按照过去开发的标准&#xff0c;就比如开发一款传奇&#xff0c;那时候哪会用什么别人的引擎&#xff0c;都是自研&#xff0c;从基础图形API开始。我们不考虑美术和策划&#xff0c;就单指程序&#x…...

RTSP应用:实现视频流的实时推送

在实现实时视频流推送的项目中&#xff0c;RTSP&#xff08;Real Time Streaming Protocol&#xff09;协议扮演着核心角色。本文将指导你通过安装FFmpeg软件&#xff0c;下载并编译live555&#xff0c;以及配置ffmpeg进行视频流推送&#xff0c;来实现一个基本的RTSP流媒体服务…...

Java八股文(数据结构)

Java八股文の数据结构 数据结构 数据结构 请解释以下数据结构的概念&#xff1a;链表、栈、队列和树。 链表是一种线性数据结构&#xff0c;由节点组成&#xff0c;每个节点包含了指向下一个节点的指针&#xff1b; 栈是一种后进先出&#xff08;LIFO&#xff09;的数据结构&a…...

ActiveMQ Artemis 系列| High Availability 主备模式(消息复制) 版本2.19.1

一、ActiveMQ Artemis 介绍 Apache ActiveMQ Artemis 是一个高性能的开源消息代理&#xff0c;它完全符合 Java Message Service (JMS) 2.0 规范&#xff0c;并支持多种通信协议&#xff0c;包括 AMQP、MQTT、STOMP 和 OpenWire 等。ActiveMQ Artemis 由 Apache Software Foun…...

QGIS插件系列--WhiteBox Tools

WhiteBox Tools&#xff08;官网机翻&#xff09;: WhiteboxTools是由圭尔夫大学地貌测量和水文地理信息学研究小组&#xff08;GHRG&#xff09;开发的高级地理空间软件包和数据分析平台。该项目始于2017年<>月&#xff0c;并在分析能力方面迅速发展。WhiteboxTools的一…...

SpringMVC设置全局异常处理器

文章目录 背景分析使用ControllerAdvice&#xff08;RestControllerAdvice&#xff09;ExceptionHandler实现全局异常全局异常处理-多个处理器匹配顺序存在一个类中存在不同的类中 对于过滤器和拦截器中的异常&#xff0c;有两种思路可以考虑 背景 在项目中我们有需求做一个全…...

Acwing_795前缀和 【一维前缀和】+【模板】二维前缀和

Acwing_795前缀和 【一维前缀和】 题目&#xff1a; 代码&#xff1a; #include <bits/stdc.h> #define int long long #define INF 0X3f3f3f3f #define endl \n using namespace std; const int N 100010; int arr[N];int n,m; int l,r; signed main(){std::ios::s…...

docker 部署 gitlab-ce 16.9.1

文章目录 [toc]拉取 gitlab-ce 镜像创建 gitlab-ce 持久化目录启停脚本配置配置 gitlab-ce编辑 gitlab-ce 配置文件重启 gitlab-ce配置 root 密码 设置中文 gitlab/gitlab-ce(需要科学上网) 拉取 gitlab-ce 镜像 docker pull gitlab/gitlab-ce:16.9.1-ce.0查看镜像是不是有 Vo…...

29.Python从入门到精通—Python3 面向对象继承 多继承 方法重写 类属性与方法

29.从入门到精通&#xff1a;Python3 面向对象继承 多继承 方法重写 类属性与方法 继承多继承方法重写类属性与方法 继承 在面向对象编程中&#xff0c;继承是指通过继承现有类的属性和方法来创建新类的过程。新类称为子类&#xff08;或派生类&#xff09;&#xff0c;现有类…...

jQuery如何获取元素宽高?

在jQuery中&#xff0c;获取元素的宽和高有多种方法&#xff0c;取决于你是否需要包括边框、内边距或其他额外空间。以下是几种常用的方式&#xff1a; 获取元素内容区域的宽和高&#xff08;不包括边框和内边距&#xff09;&#xff1a; var width $(#yourElement).width(); …...

springdata框架对es集成

什么是spring data框架 Spring Data是一个用于简化数据库、非关系型数据库、索引库访问&#xff0c;并支持云服务的开源框架。其主要目标是使得对数据的访问变得方便快捷&#xff0c;并支持 map-reduce框架和云计算数据服务。Spring Data可以极大的简化JPA(Elasticsearch…)的…...

jvm(虚拟机)运行时数据区域介绍

Java虚拟机&#xff08;JVM&#xff09;运行时数据区域是Java程序在运行过程中使用的内存区域&#xff0c;它主要包括以下几个部分&#xff1a; 程序计数器&#xff08;Program Counter Register&#xff09;&#xff1a; 程序计数器是一块较小的内存区域&#xff0c;是线程私有…...

C++ MFC 只启动一个程序实例 唤醒之前的实例(完整源码)

初级代码游戏的专栏介绍与文章目录-CSDN博客 很多时候我们希望只允许启动一个程序实例&#xff0c;如果再次运行&#xff0c;就唤醒之前的实例。 目录 1 概述 2 相关技术介绍 2.1 互斥对象 2.2 查找窗口 2.3 唤醒窗口 1 概述 技术上并不难&#xff0c;涉及到以下几个技术…...

2024多云管理平台CMP排名看这里!

随着云计算技术的迅猛发展&#xff0c;多云管理平台CMP应运而生。多云管理平台CMP仅能够简化对多个云环境的统一管理&#xff0c;还能提高资源利用效率和降低成本。因此了解多云管理平台CMP品牌是必要的。2024多云管理平台CMP排名看这里&#xff01;仅供参考哈&#xff01; 20…...

MySQL 数据库的日志管理、备份与恢复

一. 数据库备份 1.数据备份的重要性 备份的主要目的是灾难恢复。 在生产环境中&#xff0c;数据的安全性至关重要。 任何数据的丢失都可能产生严重的后果。 造成数据丢失的原因&#xff1a; 程序错误人为,操作错误,运算错误,磁盘故障灾难&#xff08;如火灾、地震&#xff0…...

一、Go开发环境搭建

文章目录 1、开发工具2、开发环境配置3、Hello World4、语法 1、开发工具 https://code.visualstudio.com/download 2、开发环境配置 类比Java的JDK&#xff0c;go的SDK下载&#xff1a;https://studygolang.com/dl 解压&#xff1a; 配置环境变量path&#xff0c;将命令&quo…...

包子凑数(蓝桥杯,闫氏DP分析法)

题目描述&#xff1a; 小明几乎每天早晨都会在一家包子铺吃早餐。 他发现这家包子铺有 N 种蒸笼&#xff0c;其中第 i 种蒸笼恰好能放 Ai 个包子。 每种蒸笼都有非常多笼&#xff0c;可以认为是无限笼。 每当有顾客想买 X 个包子&#xff0c;卖包子的大叔就会迅速选出若干笼…...

浏览器访问 AWS ECS 上部署的 Docker 容器(监听 80 端口)

✅ 一、ECS 服务配置 Dockerfile 确保监听 80 端口 EXPOSE 80 CMD ["nginx", "-g", "daemon off;"]或 EXPOSE 80 CMD ["python3", "-m", "http.server", "80"]任务定义&#xff08;Task Definition&…...

华为云AI开发平台ModelArts

华为云ModelArts&#xff1a;重塑AI开发流程的“智能引擎”与“创新加速器”&#xff01; 在人工智能浪潮席卷全球的2025年&#xff0c;企业拥抱AI的意愿空前高涨&#xff0c;但技术门槛高、流程复杂、资源投入巨大的现实&#xff0c;却让许多创新构想止步于实验室。数据科学家…...

Qt/C++开发监控GB28181系统/取流协议/同时支持udp/tcp被动/tcp主动

一、前言说明 在2011版本的gb28181协议中&#xff0c;拉取视频流只要求udp方式&#xff0c;从2016开始要求新增支持tcp被动和tcp主动两种方式&#xff0c;udp理论上会丢包的&#xff0c;所以实际使用过程可能会出现画面花屏的情况&#xff0c;而tcp肯定不丢包&#xff0c;起码…...

黑马Mybatis

Mybatis 表现层&#xff1a;页面展示 业务层&#xff1a;逻辑处理 持久层&#xff1a;持久数据化保存 在这里插入图片描述 Mybatis快速入门 ![在这里插入图片描述](https://i-blog.csdnimg.cn/direct/6501c2109c4442118ceb6014725e48e4.png //logback.xml <?xml ver…...

基于服务器使用 apt 安装、配置 Nginx

&#x1f9fe; 一、查看可安装的 Nginx 版本 首先&#xff0c;你可以运行以下命令查看可用版本&#xff1a; apt-cache madison nginx-core输出示例&#xff1a; nginx-core | 1.18.0-6ubuntu14.6 | http://archive.ubuntu.com/ubuntu focal-updates/main amd64 Packages ng…...

【JVM】- 内存结构

引言 JVM&#xff1a;Java Virtual Machine 定义&#xff1a;Java虚拟机&#xff0c;Java二进制字节码的运行环境好处&#xff1a; 一次编写&#xff0c;到处运行自动内存管理&#xff0c;垃圾回收的功能数组下标越界检查&#xff08;会抛异常&#xff0c;不会覆盖到其他代码…...

c#开发AI模型对话

AI模型 前面已经介绍了一般AI模型本地部署&#xff0c;直接调用现成的模型数据。这里主要讲述讲接口集成到我们自己的程序中使用方式。 微软提供了ML.NET来开发和使用AI模型&#xff0c;但是目前国内可能使用不多&#xff0c;至少实践例子很少看见。开发训练模型就不介绍了&am…...

蓝桥杯3498 01串的熵

问题描述 对于一个长度为 23333333的 01 串, 如果其信息熵为 11625907.5798&#xff0c; 且 0 出现次数比 1 少, 那么这个 01 串中 0 出现了多少次? #include<iostream> #include<cmath> using namespace std;int n 23333333;int main() {//枚举 0 出现的次数//因…...

React---day11

14.4 react-redux第三方库 提供connect、thunk之类的函数 以获取一个banner数据为例子 store&#xff1a; 我们在使用异步的时候理应是要使用中间件的&#xff0c;但是configureStore 已经自动集成了 redux-thunk&#xff0c;注意action里面要返回函数 import { configureS…...

智能AI电话机器人系统的识别能力现状与发展水平

一、引言 随着人工智能技术的飞速发展&#xff0c;AI电话机器人系统已经从简单的自动应答工具演变为具备复杂交互能力的智能助手。这类系统结合了语音识别、自然语言处理、情感计算和机器学习等多项前沿技术&#xff0c;在客户服务、营销推广、信息查询等领域发挥着越来越重要…...