当前位置: 首页 > article >正文

在nodejs中使用RabbitMQ(七)实现生产者确认

  • 生产者:批量发送消息(每批10条),每条消息附带唯一 correlationId,并监听确认队列(ackQueue)。

  • 消费者:处理消息后,通过 ackQueue 返回确认消息(携带原 correlationId)。

  • 超时重试:若某批消息在指定时间内未全部确认,未确认的消息会重新加入待发送队列。

producer.ts

import amqp from 'amqplib';async function start() {const connection = await amqp.connect('amqp://admin:admin1234@localhost:5672//mirror?heartbeat=60');const channel = await connection.createChannel();const queue = 'queue11';const ackQueue = 'queue11_ack';await channel.assertQueue(queue, { durable: true });await channel.assertQueue(ackQueue, { durable: true });async function produce(limit: number, data: string[], timeout: number = 10000) {let message = [...data];if (message.length > limit) {message = message.slice(0, limit);} else if (message.length < limit) {limit = message.length;}// 消息确认let cache: Array<{correlationId: string,message: string,isDelete: boolean,}> = new Array(limit).fill(null).map((_, index) => {return {correlationId: Math.random().toString().slice(2, -1),message: message[index],isDelete: false,};});for (let i = 0; i < limit; ++i) {channel.sendToQueue(queue, Buffer.from(cache[i].message), {correlationId: cache[i].correlationId,replyTo: ackQueue});}const consume = await channel.consume(ackQueue, (message) => {if (!message) {console.error('message is null', message);return;}let index = cache.findIndex((item) => item.correlationId === message.properties.correlationId);if (index !== -1) {cache[index].isDelete = true;console.log('confirmed success:', `"${message.content.toString()}"`, cache.every(item => item.isDelete));} else {console.log('confirmed fail:', `"${message.content.toString()}"`, cache, cache.every(item => item.isDelete), message.properties.correlationId);}channel.ack(message);});const sleep = (time: number) => {return new Promise<void>(resolve => setTimeout(() => resolve(), time));}let stop = false;const interval = async () => {await sleep(0);if (cache.every(item => item.isDelete) || stop) {return;} else {await interval();}}await Promise.race([interval(), // 监听本批次消息是否已经处理完成sleep(timeout), // 本批次消息最长处理时间]);stop = true;await channel.cancel(consume.consumerTag);// 没有收到确认的消息返回下一批处理继续处理return cache.filter(item => !item.isDelete).map(item => item.message);}// 发送1000条数据,分100批,每批10个let msg = new Array(100).fill(null).map((_, index) => `${index} message ${Math.random().toString().slice(2, -1)}`);while (msg.length) {let res = await produce(10, msg.slice(0, 10), 6000);msg = [...res, ...msg.slice(10, msg.length)];console.log('完成一批:', msg.length, '发送结果:', res.length, res);}
}start();

consumer.ts

import amqp from 'amqplib';async function produce() {const connection = await amqp.connect('amqp://admin:admin1234@localhost:5672//mirror?heartbeat=60');const channel = await connection.createChannel();const queue = 'queue11';const ackQueue = 'queue11_ack';await channel.assertQueue(queue, { durable: true });await channel.assertQueue(ackQueue, { durable: true });channel.consume(queue, (message) => {if (message) {console.log(message?.content.toString(), message?.properties?.replyTo, message?.properties?.correlationId);// 消息处理完后,向 ackQueue 发送确认消息channel.sendToQueue(ackQueue, message?.content, {// 使用相同的 correlationId 来标识确认消息correlationId: message?.properties?.correlationId,// 将原 replyTo 信息传递回来// replyTo: queue,});// 确认 queue11 中的消息channel.ack(message);} else {console.error('message is null', message);}}, { noAck: false });
}produce();

 

相关文章:

在nodejs中使用RabbitMQ(七)实现生产者确认

生产者&#xff1a;批量发送消息&#xff08;每批10条&#xff09;&#xff0c;每条消息附带唯一 correlationId&#xff0c;并监听确认队列&#xff08;ackQueue&#xff09;。 消费者&#xff1a;处理消息后&#xff0c;通过 ackQueue 返回确认消息&#xff08;携带原 corre…...

vue中Img图片资源require导入时数据没有过来的时候报错了-解决方案

src_views_followOrder_myFollow_index_vue.js:903 Uncaught (in promise) Error: Cannot find module ./undefined-icon.svg 该错误表示在Vue组件或JavaScript文件中找不到名为“undefined-icon.svg”的模块。可能原因是: 1. 路径错误:检查文件路径是否正确,确保文件实际上…...

Java:204 基于springboot零食销售商城的设计与实现

作者主页&#xff1a;舒克日记 简介&#xff1a;Java领域优质创作者、Java项目、学习资料、技术互助 文中获取源码 项目介绍 系统主要分为管理员和用户、商家。 用户可以使用网站首页的登录注册界面进行在线登录注册&#xff0c;并且注册登录后方可使用系统的各种功能以及购物…...

harmonyOS的文件的增、删、读、写相关操作(fs/content)

注意: 操作harmonyOS的文件只能对app沙箱内的文件进行操作 牵扯到两个支持点: fs和content这两个API; 具体的操作方法看下图: 创建文件 //js 引入 import fs from "ohos.files.fs" import featureAbility from "ohos.ability.featureAbility"; // 上下…...

【golang】量化开发学习(一)

均值回归策略简介 均值回归&#xff08;Mean Reversion&#xff09;假设价格会围绕均值波动&#xff0c;当价格偏离均值一定程度后&#xff0c;会回归到均值。 基本逻辑&#xff1a; 计算一段时间内的移动均值&#xff08;如 20 天均线&#xff09;。当当前价格高于均值一定比…...

4090单卡挑战DeepSeek r1 671b:尝试量化后的心得的分享

引言&#xff1a; 最近&#xff0c;DeepSeek-R1在完全开源的背景下&#xff0c;与OpenAI的O1推理模型展开了激烈竞争&#xff0c;引发了广泛关注。为了让更多本地用户能够运行DeepSeek&#xff0c;我们成功将R1 671B参数模型从720GB压缩至131GB&#xff0c;减少了80%&#xff…...

MySQL数据库(八)☞ 我是不是锁神

目录 1 全局锁的应用 2 索引对行锁的影响 3 表锁&#xff08;显式&#xff09;--表级锁 4 元数据锁 MDL(隐式)--表级锁 5 意向锁(Intention)--IS锁 IX锁--表级锁&#xff08;隐式&#xff09; 6 记录锁-(Record)-S锁 X锁 -- 行级锁 7 如何理解select ... lock in share …...

AI法理学与责任归属:技术演进下的法律重构与伦理挑战

文章目录 引言:智能时代的新型法律困境一、AI技术特性对传统法理的冲击1.1 算法黑箱与可解释性悖论1.2 动态学习系统的责任漂移1.3 多智能体协作的责任稀释二、AI法理学的核心争议点2.1 法律主体资格认定2.2 因果关系的技术解构2.3 过错标准的重新定义三、责任归属的实践案例分…...

华象新闻 | 2月20日前谨慎升级 PostgreSQL 版本

各位 PostgreSQL 用户&#xff0c;建议近期进行升级 PostgreSQL 版本。 2月20日计划进行非周期性版本发布 PostgreSQL全球开发团队计划于2025年2月20日进行一次非周期性发布&#xff0c;以解决2025年2月13日更新版本中引入的一个回归问题。 2月13日的更新版本包括了17.3、16.7、…...

【NLP】循环神经网络RNN

目录 一、认识RNN 二、RNN模型分类 三、传统RNN模型 3.1 结构分析 3.2 Pytorch构建RNN模型 3.3 优缺点 一、认识RNN RNN(Recurrent Neural Network)&#xff0c;中文称作循环神经网络&#xff0c;一般以序列数据为输入&#xff0c;通过网络内部的结构设计有效捕捉序列之…...

pnpm, eslint, vue-router4, element-plus, pinia

利用 pnpm 创建 vue3 项目 pnpm 包管理器 - 创建项目 Eslint 配置代码风格(Eslint用于规范纠错&#xff0c;prettier用于美观&#xff09; 在 设置 中配置保存时自动修复 提交前做代码检查 husky是一个 git hooks工具&#xff08;git的钩子工具&#xff0c;可以在特定实际执行特…...

Vue的简单入门 一

声明&#xff1a;本版块根据B站学习&#xff0c;创建的是vue3项目&#xff0c;用的是vue2语法风格&#xff0c;仅供初学者学习。 目录 一、Vue项目的创建 1.已安装15.0或更高版本的Node.js 2.创建项目 二、 简单认识目录结构 三、模块语法中的指令 1.v-html 1.文本插值…...

VMware Workstate 的 Ubuntu18 安装 vmware tools(不安装没法共享)

在共享主机路径后&#xff0c;可以在&#xff1a; /mnt/hgfs/下方找到共享的文件。但没有安装vmware tool时是没法共享的。 如何安装vmware tool&#xff0c;网上版本很多。这里记录一下&#xff1a; VMware Workstation 17 Pro&#xff0c;版本&#xff1a;17.6.0 虚拟机系统…...

深入Flask:如何优雅地处理HTTP请求与响应

哈喽,大家好,我是木头左! 本文将带你深入了解如何在Flask中优雅地处理HTTP请求和响应,让你的应用更加高效、安全和用户友好。 创建一个简单的Flask应用 让从创建一个最简单的Flask应用开始: from flask import Flaskapp = Flask(__name__)@app.route(/) def...

Typescript 【详解】配置文件 tsconfig.json

用于控制 TypeScript 编译器如何将 .ts 文件编译为 .js 文件 可以使用命令生成 npx tsc --init{"compilerOptions": {"target": "ES6","module": "commonjs","strict": true},"include": ["src/…...

GC 基础入门

什么是GC&#xff08;Garbage Collection&#xff09;&#xff1f; 内存管理方式通常分为两种&#xff1a; 手动内存管理&#xff08;Manual Memory Management&#xff09;自动内存管理&#xff08;Garbage Collection, GC&#xff09; 手动内存管理 手动内存管理是指开发…...

坑多多之AC8257 i2c1 rtc-pcf8563

pcf85163 ordering information Ordering information Package Description Version Marking code PCF85163T/1 SO8 ① SOT96-1 PF85163 PCF85163TS/1 TSSOP8 ② SOT505-1 85163 ①plastic small outline package; 8 leads;body width 3.9 mm ②plastic thin…...

UE求职Demo开发日志#32 优化#1 交互逻辑实现接口、提取Bag和Warehouse的父类

1 定义并实现交互接口 接口定义&#xff1a; // Fill out your copyright notice in the Description page of Project Settings.#pragma once#include "CoreMinimal.h" #include "UObject/Interface.h" #include "MyInterActInterface.generated.h…...

自动化测试题

1.什么项目适合做自动化测试&#xff1f; 答&#xff1a;一般来说&#xff0c;适合做自动化测试的项目应该满足以下几个条件&#xff1a; 项目需求稳定&#xff0c;变更不频繁。 项目周期较长&#xff0c;需要反复进行回归测试。 项目功能较复杂&#xff0c;涉及多个模块和…...

vite配置proxy和nginx同步配置反向代理,vite的base含义

vite配置代理是为了在开发环境下联调服务器接口&#xff0c;如果不配置代理&#xff0c;开发时会出现跨域&#xff0c; 会在请求的url的前缀添加标识如/api,代理请求时在rewrite为""&#xff0c;或者rewrite为其他字符串&#xff0c; 项目打包部署后&#xff0c;需要…...

如何在 Mac 上解决 Qt Creator 安装后应用程序无法找到的问题

在安装Qt时&#xff0c;遇到了一些问题&#xff0c;尤其是在Mac上安装Qt后&#xff0c;发现Qt Creator没有出现在应用程序中。通过一些搜索和操作&#xff0c;最终解决了问题。以下是详细的记录和解决方法。 1. 安装Qt后未显示Qt Creator 安装完成Qt后&#xff0c;启动应用程…...

FFmpeg+SDL实现简易视频播放器

参考链接 https://blog.csdn.net/qq_26611129/article/details/98732561 https://www.cnblogs.com/Azion/p/17756274.html https://avmedia.0voice.com/?id49050 https://blog.csdn.net/qq_44825209/article/details/133760652 https://www.cnblogs.com/Azion/p/17525955.htm…...

set_intersection set_union set_difference set_symmetric_difference

std::set_intersection 用于计算两个已排序范围的交集。它将交集的结果写入到指定的输出迭代器中。 std::set_union 用于计算两个已排序范围的并集。它将并集的结果写入到指定的输出迭代器中。 std::set_difference 用于计算两个已排序范围的差集。它将差集的结果写入到指…...

webpack打包优化策略

1. 减少打包体积 减少打包文件的大小是为了提高加载速度&#xff0c;降低网络带宽消耗&#xff0c;提升用户体验。常见的减少打包体积的优化策略包括&#xff1a; 代码分割&#xff08;Code Splitting&#xff09;&#xff1a;将代码拆分成多个小文件&#xff0c;让浏览器按需…...

多线程基础面试题剖析

一、线程的创建方式有几种 创建线程的方式有两种&#xff0c;一种是继承Thread&#xff0c;一种是实现Runable 在这里推荐使用实现Runable接口&#xff0c;因为java是单继承的&#xff0c;一个类继承了Thread将无法继承其他的类&#xff0c;而java可以实现多个接口&#xff0…...

WEB安全--SQL注入--floor报错注入

一、原理&#xff1a; floor()报错注入需要组合count()、rand()、group by()等函数使用&#xff0c;通过一些手段使数据库在处理语句时产生主键重复的报错&#xff0c;从而达到爆出信息的目的 二、内容&#xff1a; ?id-1 or (select 1 from (select count(*),concat(databa…...

resultMap 标签

resultMap 是 MyBatis 框架中用于定义数据库结果集与 Java 对象之间映射关系的核心标签。它的主要作用是解决数据库字段名与 Java 对象属性名不一致的问题&#xff0c;或处理复杂查询&#xff08;如关联查询、嵌套对象、集合映射等&#xff09;时的映射需求。 主要用途&#x…...

17.推荐系统的在线学习与实时更新

接下来就讲解推荐系统的在线学习与实时更新。推荐系统的在线学习和实时更新是为了使推荐系统能够动态地适应用户行为的变化&#xff0c;保持推荐结果的实时性和相关性。以下是详细的介绍和实现方法。 推荐系统的在线学习与实时更新 在线学习的概念 在线学习&#xff08;Onli…...

Android设备 网络安全检测

八、网络与安全机制 6.1 网络框架对比 volley&#xff1a; 功能 基于HttpUrlConnection;封装了UIL图片加载框架&#xff0c;支持图片加载;网络请求的排序、优先级处理缓存;多级别取消请求;Activity和生命周期的联动&#xff08;Activity结束生命周期同时取消所有网络请求 …...

Kotlin 2.1.0 入门教程(二十)扩展

扩展 Kotlin 提供了一种能力&#xff0c;无需继承类或使用像装饰器这样的设计模式&#xff0c;就能为类或接口扩展新的功能。这是通过一种名为扩展的特殊声明来实现的。 例如&#xff0c;你可以为无法修改的第三方库中的类或接口编写新的函数。这些函数可以像原类的方法一样以…...