当前位置: 首页 > article >正文

在nodejs中使用RabbitMQ(六)sharding消息分片

RabbitMQ 的分片插件(rabbitmq_sharding)允许将消息分布到多个队列中,这在消息量很大或处理速度要求高的情况下非常有用。分片功能通过将消息拆分到多个队列中来平衡负载,从而提升消息处理的吞吐量和可靠性。它能够在多个队列之间分配负载,避免单个队列过载。(注:不能单独消费分片消息。消息分片不利于消息顺序区分)

启用消息分片插件。 

rabbitmq-plugins enable rabbitmq_sharding 

示例

通过rabbitmq management添加策略,用于分片消息匹配转发。

或者通过命令添加策略 

CTL set_policy images-shard "queue10" '{"shards-per-node": 3, "routing-key": "sharding"}'

producer.ts

import RabbitMQ from 'amqplib';async function start() {try {const conn = await RabbitMQ.connect("amqp://admin:admin1234@localhost:5672//mirror?heartbeat=60");conn.on("error", function (err1) {if (err1.message !== "Connection closing") {console.error("[AMQP] conn error", err1.message);}});conn.on("close", function () {console.error("[AMQP] reconnecting");return setTimeout(start, 1000);});console.log("[AMQP] connected");let channel = null;try {channel = await conn.createChannel();} catch (err) {console.error("[AMQP]", err);return setTimeout(start, 1000);}const exchangeName = 'exchange_queue10';await channel.assertExchange(exchangeName,'x-modulus-hash',{durable: true,arguments: {'x-modulus': 3 // 分片数量(需与队列分片数匹配)}},);let routeKey = '';for (let i = 0; i < 1000; ++i) {// console.log('message send!', channel.sendToQueue(//   queueName,//   Buffer.from(`发送消息,${i}${Math.ceil(Math.random() * 100000)}`),//   { persistent: true, correlationId: 'ooooooooooooooo' },// 消息持久化,重启后存在//   // (err: any, ok: Replies.Empty)=>{}// ));let num = Math.ceil(Math.random() * 100000);console.log('消息发送是否成功', num, routeKey, channel.publish(exchangeName,`${routeKey}${i}`,Buffer.from(`"发送消息, index:${i}, number:${num}, routeKey:${JSON.stringify(routeKey)}"`),{persistent: true,},));}setTimeout(() => {conn.close();process.exit(0);}, 1000);} catch (err) {console.error("[AMQP]", err);return setTimeout(start, 1000);}
}start();

consumer.ts

import RabbitMQ, { type Replies } from 'amqplib/callback_api';RabbitMQ.connect('amqp://admin:admin1234@localhost:5672//mirror', (err0, conn) => {if (err0) {console.error(err0);return;}conn.createChannel(function (err1, channel) {console.log('[*] waiting...');const exchangeName = 'exchange_queue10';channel.prefetch(32);// for(let i=0;i<3;++i){//   channel.assertQueue(queueName, { durable: true }, () => {//     channel.bindQueue(queueName, exchangeName, `shard_${shardId}`);//   });// }channel.consume(exchangeName, function (msg) {if(msg){console.log(`队列'${exchangeName}'接收到的消息`, msg?.content.toString());// 第二个参数,false拒绝当前消息// 第二个参数,true拒绝小于等于当前消息// 第三个参数,3false从队列中清除// 第三个参数,4true从新在队列中排队channel.nack(msg, false, false);}}, {// noAck: true, // 是否自动确认消息,为true不需要调用channel.ack(msg);noAck: false,arguments: {}}, (err: any, ok: Replies.Empty) => {console.log(err, ok);});});conn.on("error", function (err1) {if (err1.message !== "Connection closing") {console.error("[AMQP] conn error", err1.message);}});conn.on("close", function () {console.error("[AMQP] reconnecting");});
});

相关文章:

在nodejs中使用RabbitMQ(六)sharding消息分片

RabbitMQ 的分片插件&#xff08;rabbitmq_sharding&#xff09;允许将消息分布到多个队列中&#xff0c;这在消息量很大或处理速度要求高的情况下非常有用。分片功能通过将消息拆分到多个队列中来平衡负载&#xff0c;从而提升消息处理的吞吐量和可靠性。它能够在多个队列之间…...

STM32 I2C通信协议说明

目录 背景 I2C协议 数据的有效性 I2C通信开始和停止条件 I2C数据传输 发送 响应 正常情况&#xff1a; 异常情况&#xff1a; 主机结束接收 写寄存器的标准流程 读寄存器的标准流程 仲裁机制 时钟同步 SDA线的仲裁 程序 背景 对单片机的三大通信中的I2C通信进…...

git bash在github的库中上传或更新本地文件

一、将本地文件上传到 GitHub 仓库 1. 创建 GitHub 仓库 如果你还没有在 GitHub 上创建仓库&#xff0c;首先需要创建一个新的仓库&#xff1a; 登录到 GitHub。点击右上角的 按钮&#xff0c;选择 New repository。给你的仓库起个名字&#xff0c;并选择 Public 或 Privat…...

Keysight E5071C (Agilent) 网络分析仪的特性和规格

安捷伦E5071C网络分析仪 Keysight E5071C网络分析仪 Keysight E5071C (Agilent) 网络分析仪的其他特性和规格包括&#xff1a; 宽动态范围&#xff1a;测试端口动态范围 > 123 dB&#xff08;典型值&#xff09; 快速测量速度&#xff1a;41 ms 全 2 端口校准&#xff0c;…...

总结:如何在SpringBoot中使用https协议以及自签证书?

总结&#xff1a;如何在SpringBoot中使用https协议以及自签证书&#xff1f; 前提一&#xff1a;什么是http协议&#xff1f;前提二&#xff1a;什么是https协议&#xff1f;一生成自签证书二 将证书转换为PKCS12格式三 配置SpringBoot&#xff08;1&#xff09;修改配置文件&a…...

Golang学习历程【第七篇 闭包type defer panic recover了解time包】

Golang学习历程【第七篇 闭包&type defer panic recover了解】 1. 闭包1.1 闭包的定义1.2 闭包的特点1.3 闭包的示例 2. 类型(type)2.1 自定义类型2.2 类型示例 3. 延迟执行&#xff08;Defer&#xff09;3.1 defer 的用法3.2 defer 示例 4. 恐慌&#xff08;Panic&#xf…...

基于SSM+uniapp的数学辅导小程序+LW示例参考

1.项目介绍 系统角色&#xff1a;管理员、普通用户功能模块&#xff1a;用户管理、学习中心、知识分类管理、学习周报管理、口算练习管理、试题管理、考试管理、错题本等技术选型&#xff1a;SSM&#xff0c;Vue&#xff08;后端管理web&#xff09;&#xff0c;uniapp等测试环…...

利用AI智能体创建云端文档知识库并集成第三方数据源(上)

许多开发者在管理和集成多种云端的数据源时经常面对各种各样的困难&#xff0c;所以希望能够构建一个聊天机器人来协调这些数据源&#xff0c;针对业务问题并提供全面的答案。本文介绍了一种解决方案&#xff0c;帮助大家开发一个能够从文档和数据库中回答查询的聊天机器人&…...

聚铭网络入围2025年度江苏省政府采购信息安全设备协议供货名单

近日&#xff0c;2025年度江苏省党政机关、事业单位及团体组织信息安全设备框架协议采购项目入围结果公布。聚铭网络凭借自身专业实力和技术优势脱颖而出&#xff0c;成功入围22个分包。 此次采购项目是江苏省政府采购领域级别最高、覆盖面最广的项目之一。从资格评选到后期材料…...

vue+springboot+webtrc+websocket实现双人音视频通话会议

前言 最近一些时间我有研究&#xff0c;如何实现一个视频会议功能&#xff0c;但是找了好多资料都不太理想&#xff0c;最终参考了一个文章 WebRTC实现双端音视频聊天&#xff08;Vue3 SpringBoot&#xff09; 只不过&#xff0c;它的实现效果里面只会播放本地的mp4视频文件&…...

2025年单片机毕业设计选题物联网计算机电气电子通信类

当然&#xff0c;以下是基于物联网技术设计的20个单片机类题目&#xff0c;旨在考察学生在物联网环境下单片机应用、系统设计、数据传输与处理等方面的能力&#xff1a; 基于物联网的智能家居温度湿度控制系统设计&#xff1a;利用单片机和传感器实现室内环境的温湿度监测&…...

堡垒机调用xshell 无反应

安装sso_client 确认db_path.ini xhsell路径 如图调整为本机安装的路径即可。 实战问题&#xff1a; 操作完成之后 Chrome还是无法调用&#xff0c;使用360浏览器没问题。...

python后端调用Deep Seek API

python后端调用Deep Seek API 需要依次下载 ●Ollama ●Deepseek R1 LLM模型 ●嵌入模型nomic-embed-text / bge-m3 ●AnythingLLM 参考教程&#xff1a; Deepseek R1打造本地化RAG知识库:安装部署使用详细教程 手把手教你&#xff1a;deepseek R1基于 AnythingLLM API 调用本地…...

Easy系列PLC 线性变换功能块(模拟量相关功能块汇总)

线性转换函数S_RTR 线性转换函数S_RTR(SCL和ST代码)_线性函数的scl语言如何编写-CSDN博客文章浏览阅读440次。博客介绍了线性转换函数S_RTR,包括其在PLC中的应用,如何与工艺PID组合使用,以及在张力开环控制中的具体实践。还提到了函数的C99兼容性,并提供了S_RTR的功能块源…...

【VB语言】EXCEL中VB宏的应用

【VB语言】EXCEL中VB宏的应用 文章目录 [TOC](文章目录) 前言一、EXCEL-VB1.实验过程2.代码 二、EXCEL-VB 生成.c.h文件1.实验过程2.代码 四、参考资料总结 前言 1.WPS-VB扩展包 提示&#xff1a;以下是本篇文章正文内容&#xff0c;下面案例可供参考 一、EXCEL-VB 1.实验过…...

【人工智能】如何选择合适的大语言模型,是能否提高工作效率的关键!!!

DeepSeek R1入门指南 导读一、提示语差异1.1 指令侧重点不同1.2 语言风格差异1.3 知识运用引导不同 二、挑选原则2.1 模型选择2.2 提示语设计2.3 避免误区 结语 导读 大家好&#xff0c;很高兴又和大家见面啦&#xff01;&#xff01;&#xff01; 在前面的内容中&#xff0c…...

Unity使用反射进行Protobuf(CS/SC)协议,json格式

protobuf生成的协议,有挺多协议的.利用反射生成dto进行伪协议的响应 和 发送请求 应用场景: 请求(CS)_后端先写完了,前端还搞完时,可使用此请求,可自测 响应(SC)_可自行构建一个响应,对数据进行测试 // 请求 使用物品 CS message ReqUseItem{optional Opcodes MessageID1[def…...

初学 mybatis

前言 回顾之前 不使用 mybatis 框架&#xff0c;我们是怎么通过Java 操作数据库的 "jdbc" 前提&#xff1a;使用maven 构建的项目 1 添加 关于jdbc 的依赖&#xff0c;以及辅助操作数据库的 commons-dubli jar包 截取 前后端项目 2 添加配置文件里面内容有&…...

java.lang.IllegalArgumentException: 在请求目标中找到无效字符。有效字符在RFC 7230和RFC 3986中定义

Tomcat 屏蔽错误信息。java.lang.IllegalArgumentException: 在请求目标中找到无效字符。有效字符在RFC 7230和RFC 3986中定义 <h1>HTTP状态 400 - 错误的请求</h1><hr class"line" /><p><b>类型</b> 异常报告</p><p&…...

C语言进阶习题(4结构体)【1】通讯录的实现

目录 1.使用结构体实现通讯录功能2.思路3. 代码实现3.1 test.c3.2 contact.c3.3 contact.h 1.使用结构体实现通讯录功能 主要功能有&#xff1a;显示通讯录信息&#xff0c;增加通讯录中人的信息&#xff0c;删除通讯录中人的信息&#xff0c;查找通信录中信息&#xff0c;修改…...

释放你的元数据:使用 Elasticsearch 的自查询检索器

作者&#xff1a;来自 Elastic Josh Asres 了解如何使用 Elasticsearch 的 “self-quering” 检索器来通过结构化过滤器提高语义搜索的相关性。 在人工智能搜索的世界中&#xff0c;在海量的数据集中高效地找到正确的数据至关重要。传统的基于关键词的搜索在处理涉及自然语言的…...

【Python】如何在 Linux/Windows 系统中设置 PYTHONPATH 环境变量

什么是 PYTHONPATH&#xff1f; PYTHONPATH 是一个环境变量&#xff0c;它告诉 Python 解释器在哪些目录中查找要导入的模块。这对于包含不在标准目录中的自定义模块非常有用。 Linux 系统中设置 PYTHONPATH 环境变量 在 Python 开发环境中&#xff0c;正确设置 PYTHONPATH …...

1.14学习总结

日常刷题单 刷了题目后&#xff0c;对于排序方法更加熟练&#xff0c;手搓代码的速度也得到了提高。 感觉字符串还不熟练&#xff0c;高精度更是云里雾里&#xff0c;上升空间极大。 同时看见今晚有个入门难度的测试&#xff0c;去练了练手&#xff0c;想看看自己是什么成分&…...

【Prometheus】prometheus黑盒监控balckbox全面解析与应用实战

✨✨ 欢迎大家来到景天科技苑✨✨ 🎈🎈 养成好习惯,先赞后看哦~🎈🎈 🏆 作者简介:景天科技苑 🏆《头衔》:大厂架构师,华为云开发者社区专家博主,阿里云开发者社区专家博主,CSDN全栈领域优质创作者,掘金优秀博主,51CTO博客专家等。 🏆《博客》:Python全…...

游戏引擎学习第101天

回顾当前情况 昨天的进度基本上完成了所有内容&#xff0c;但我们还没有进行调试。虽然我们在运行时做的事情大致上是对的&#xff0c;但还是存在一些可能或者确定的bug。正如昨天最后提到的&#xff0c;既然现在时间晚了&#xff0c;就不太适合开始调试&#xff0c;所以今天我…...

海康摄像头IPV6模式,手动,自动,路由公告

海康摄像头DS-2DC7220IW-A 网络设置中的IPv6配置选项。IPv6是互联网协议&#xff08;IP&#xff09;的第六版&#xff0c;用于替代IPv4&#xff0c;提供更多的IP地址和改进的网络功能。图片中的选项允许用户选择如何配置设备的IPv6网络连接&#xff1a; 手动&#xff1a;用户可…...

架构设计系列(二):CI/CD

一、概述 CI/CD 是 持续集成&#xff08;Continuous Integration&#xff09; 和 持续交付/持续部署&#xff08;Continuous Delivery/Continuous Deployment&#xff09; 的缩写&#xff0c;是现代软件开发中的一套核心实践和工具链&#xff0c;旨在提高软件交付的效率、质量…...

执行js生成json文件并动态写入数据

项目中需要执行js后生成一个新的json文件&#xff0c;并在该文件内写入json数据&#xff0c; 示例&#xff1a;生成一个json文件&#xff0c;内含执行这个js的时间戳作为json文件中的数据。 新建一个js文件create.js&#xff0c;js代码如下&#xff1a; const fs require(fs)…...

DDoS技术解析

这里是Themberfue 今天我们不聊别的&#xff0c;我们聊聊著名的网络攻击手段之一的 DDoS&#xff0c;看看其背后的技术细节。 DoS 了解 DDoS 前&#xff0c;先来讲讲 DoS 是什么&#xff0c;此 DoS 而不是 DOS 操作系统啊。1996年9月6日&#xff0c;世界第三古老的网络服务提供…...

28 在可以控制 postgres 服务器, 不知道任何用户名的情况下怎 进入 postgres 服务器

前言 最近有这样的一个需求, 有一个 postgres 服务器 但是 不知道 他的任何的用户名密码, 但是我想要查询这台 postgres 服务器 然后 基于这个需求, 我们看一下 怎么来处理 pg_hba.conf 认证方式修改为 trust 首先将 postgres 服务器的认证方式修改为 trust 这时候 …...