RabbitMQ简单模式和工作模式
RabbitMQ 是一个消息队列中间件,用于在分布式系统中进行消息传递。在 RabbitMQ 中,有几种工作模式,其中简单模式和工作模式是其中两种基本的模式之一。
-
简单模式(Simple Mode):
- 在简单模式中,有一个生产者(Producer)将消息发送到一个队列(Queue)中,然后有一个消费者(Consumer)从队列中接收并处理消息。
- 这是最基本的消息队列模式,适用于单个生产者和单个消费者的场景。
- 生产者将消息发送到队列,而消费者从队列中接收并处理消息,消息的传递是单向的。
-
工作模式(Work Queue Mode):
- 工作模式也被称为竞争消费者模式。在这种模式下,有多个消费者监听同一个队列,但每条消息只能被其中一个消费者接收和处理。
- 当消息被发送到队列时,它将被发送给下一个空闲的消费者,从而实现消息的分发和并发处理。
- 这种模式对于处理大量工作的情况很有用,可以通过增加消费者的数量来提高消息处理的速度。
在 RabbitMQ 中,简单模式和工作模式的实现通常使用一些基本的概念,包括生产者、消费者、队列和消息。生产者负责发送消息到队列,而消费者则负责从队列中接收和处理消息。
下面是一个使用 RabbitMQ 和 Node.js(使用 amqplib
库)以及 TypeScript 实现工作模式的简单示例。在这个例子中,我们将使用 amqplib
库来连接 RabbitMQ 服务器,并使用 TypeScript 来编写代码。
首先,确保你已经安装了 amqplib
库。可以使用以下命令进行安装:
npm install amqplib
接下来,创建一个生产者和一个消费者的 TypeScript 文件。以下是示例代码:
producer.ts:
import * as amqp from 'amqplib';async function produce() {const connection = await amqp.connect('amqp://localhost');const channel = await connection.createChannel();const queue = 'work_queue';await channel.assertQueue(queue, { durable: true });for (let i = 0; i < 10; i++) {const message = `Message ${i}`;channel.sendToQueue(queue, Buffer.from(message), { persistent: true });console.log(` [x] Sent '${message}'`);}setTimeout(() => {connection.close();process.exit(0);}, 500);
}produce();
consumer.ts:
import * as amqp from 'amqplib';async function consume() {const connection = await amqp.connect('amqp://localhost');const channel = await connection.createChannel();const queue = 'work_queue';await channel.assertQueue(queue, { durable: true });// 设置每次只处理一个消息[平均分配的概念,不会让一个work太忙和太闲]//这一行代码的作用是告诉 RabbitMQ 不要在消费者未确认(ack)之前向其发送新的消息await channel.prefetch(1);console.log(' [*] Waiting for messages. To exit press CTRL+C');await channel.consume(queue, async (msg) => {if (msg !== null) {const message = msg.content.toString();console.log(` [x] Received ${message}`);// Simulate some workawait new Promise(resolve => setTimeout(resolve, 1000));console.log(' [x] Done');channel.ack(msg);}});
}consume();
这个示例中,生产者将消息发送到名为 work_queue
的队列中,而消费者则监听该队列并处理消息。消费者使用 channel.prefetch(1)
来确保一次只接收一个消息,从而实现竞争消费者模式。
记得在运行前启动 RabbitMQ 服务器,并确保 TypeScript 文件已编译成 JavaScript。你可以使用以下命令进行编译:
tsc producer.ts
tsc consumer.ts
然后,分别运行 producer.js
和 consumer.js
。这样你就可以在 RabbitMQ 中看到消息的生产和消费过程。
RabbitMQ消息持久化和手动应答
在 RabbitMQ 中,消息持久化和手动应答是两个关键的概念,它们可以帮助确保消息的可靠传递和处理。下面简要介绍这两个概念:
-
消息持久化(Message Durability):
- 默认情况下,RabbitMQ 中的消息是瞬时的,也就是说,如果 RabbitMQ 服务器停止或崩溃,所有未处理的消息都会丢失。
- 通过将消息标记为持久化,你可以确保消息在 RabbitMQ 服务器重启后仍然可用。要实现消息持久化,需要在发送消息时设置消息的
deliveryMode
属性为2
(persistent
)。 - 例如,在生产者端设置消息为持久化:
channel.sendToQueue(queue, Buffer.from(message), { persistent: true });
- 在消费者端,你需要确保队列和消息都被声明为持久化:
channel.assertQueue(queue, { durable: true });
这样,即使 RabbitMQ 服务器重启,持久化的消息也不会丢失。
-
手动应答(Manual Acknowledgment):
- 默认情况下,RabbitMQ 使用自动应答(auto-acknowledgment)模式,即一旦消息被传递给消费者,RabbitMQ 就将其标记为已处理。
- 在某些情况下,你可能需要更细粒度的控制,以确保消息在被消费者完全处理之后才被标记为已处理。这就是手动应答的用途。
- 在消费者端,需要将
noAck
设置为false
,表示手动应答模式:
channel.consume(queueName, async (msg: Message | null) => {if (msg) {const data: EmailTask = JSON.parse(msg.content.toString());console.log('Processing mail task:', msg.content.toString());try {//模拟邮件发送await new Promise(resolve => setTimeout(resolve, 1000));console.log(' [x] Done');channel.ack(msg);} catch (error) {console.log('error:', data);// 处理消息失败,判断是否需要进行重试if (canRetry(msg)) {// 重新入队,进行下一次尝试channel.reject(msg, true);} else {// 不进行重试,将消息从队列中移除channel.ack(msg);}}}
}, { noAck: false });//默认false
- 在这种情况下,消费者需要在处理完消息后显式调用
channel.ack(msg)
来确认消息已被处理。如果消费者崩溃或在处理消息时发生错误,消息将保持在队列中,直到被明确确认。 - 在 RabbitMQ 中,channel.reject 方法用于拒绝一条消息。它的参数如下channel.reject(msg, requeue);
msg: 要拒绝的消息对象。
requeue: 如果设置为 true,则被拒绝的消息将被重新排队,即重新放回队列。如果设置为 false,则消息将被删除。默认为 true。
综合使用消息持久化和手动应答,可以确保在面对不同情况时,消息的可靠传递和处理。
重试间隔和次数
-
重新投递消息并设置头部信息:
- 在处理消息失败时,将消息重新投递到队列,并设置一个头部信息,例如
x-redelivered-count
,用来记录消息的重试次数。 - 在消费者端,根据这个头部信息来判断是否达到重试次数的上限,如果是,则不再重新投递,可能将消息放入死信交换机。
- 在处理消息失败时,将消息重新投递到队列,并设置一个头部信息,例如
-
使用外部存储记录重试次数:
- 每次消息处理失败时,将消息的唯一标识(例如 UUID)和重试次数记录到外部存储中(例如 Redis、Memcache、MySQL)。
- 在消费者端,在每次重新处理时,从外部存储中获取当前重试次数,并判断是否达到重试次数的上限。
-
自定义插件:
- 编写一个 RabbitMQ 插件,实现自定义的消息重试逻辑,包括记录重试次数、判断是否重新投递等。
- 这样可以更灵活地控制消息的处理流程。
需要注意的是,这些方法都是基于 RabbitMQ 不直接提供重试次数限制的情况下的一些自定义实践。在回答中也提到了关于 quorum queues 的更新,以及支持通过策略(policy)来限制重投递次数的可能性。因此,具体的实现方式可能会随着 RabbitMQ 版本的更新而有所变化。
await channel.consume(queueName, async (msg: Message | null) => {if (msg) {const data: EmailTask = JSON.parse(msg.content.toString());let retryCount = msg.properties.headers['x-retry-count'] || 0;console.log('Processing message:', data);console.log('Retry count:', retryCount);try {if (data.to.includes('recipient1@example.com')) {throw new Error('邮件发送失败...');}//发送邮件await new Promise(resolve => setTimeout(resolve, 1000));channel.ack(msg);} catch (error) {console.log('error:', data);// 增加重试次数retryCount++;// 判断是否达到最大重试次数if (retryCount < maxRetryAttempts) {// 重新发送消息到队列channel.sendToQueue(queueName, msg.content, {persistent: true,headers: {'x-retry-count': retryCount,},});} else {// 不进行重试,将消息从队列中移除channel.ack(msg);}}}
});
相关文章:

RabbitMQ简单模式和工作模式
RabbitMQ 是一个消息队列中间件,用于在分布式系统中进行消息传递。在 RabbitMQ 中,有几种工作模式,其中简单模式和工作模式是其中两种基本的模式之一。 简单模式(Simple Mode): 在简单模式中,有…...

c语言实战之贪吃蛇
文章目录 前言效果展示游戏用到的图片游戏思路一览游戏前准备一、贪吃蛇、食物、障碍物节点坐标的结构体二、枚举游戏状态、和贪吃蛇的方向三、维护运行的结构体 游戏开始前的初始化一、学习图形库相关知识二、设置背景三、欢迎界面四、初始化贪吃蛇五、生成障碍物六、生成食物…...
Midjourney图片生成描述词记录(今天一天)
抄别人的描述词 /imagine prompt:https://(你的图片地址).jpg Super handsome boy IP by pop mart , green suit, no hair, bald head, Scenes in spring , pastel color , mockup , fine luster , clean background ,3D render , Soft focus , oc , bl…...

类和对象 第五部分第四小节:赋值运算符重载
C编译器至少给一个类添加4个函数 1.默认构造函数无参,函数体为空 2.默认析构函数无参,函数体为空 3.默认拷贝沟早函数,对属性进行值拷贝 4.赋值运算符“operator”,对属性进行值拷贝 如果类中有属性指向堆区,做赋值操作…...

Django从入门到精通(一)
目录 一、Django环境搭建与命令 1.1、安装 1.2、命令行 创建项目 编写代码 运行 app概念 1.3、Pycharm创建项目 1.4、虚拟环境 创建虚拟环境 - 命令行 介绍 操作 基本问题 Pycharm 项目虚拟环境 django虚拟环境【安装django最新版本】 django虚拟环境【安装指…...
数据库分表分库的原则
什么是数据库分库分表 数据库分表(Table Sharding) 数据库分表是将一个大表按照某种规则拆分成多个小表存储在不同的物理表中的技术。通常,拆分规则是基于某个列的值进行拆分,例如根据用户ID或日期范围等进行拆分。每个小表只包…...

Java技术栈 —— Docker容器
Java技术栈 —— Docker容器 一、什么是Docker?二、如何安装Docker?三、如何使用Docker? 一、什么是Docker? docker的本意是码头工人。 Docker 是一个开源的应用容器引擎,让开发者可以打包他们的应用以及依赖包到一个…...

Mysql大数据量分页优化
前言 之前有看过到mysql大数据量分页情况下性能会很差,但是没有探究过它的原因,今天讲一讲mysql大数据量下偏移量很大,性能很差的问题,并附上解决方式。 原因 将原因前我们先做一个试验,我做试验使用的是mysql5.7.2…...

QT tcp与udp网络通信以及定时器的使用 (7)
QT tcp与udp网络通信以及定时器的使用 文章目录 QT tcp与udp网络通信以及定时器的使用1、QT网络与通信简单介绍2、QT TCP通信1、 服务器的流程2、 客户端的流程3、服务器的编写4、客户端的编写 3、QT UDP通信1、客户端流程2、客户端编写3、UDP广播4、UDP组播 4、定时器的用法1、…...
web架构师编辑器内容-添加自动保存的功能
对于频繁改动的应用,自动保存的功能是一个非常有用的功能,可以避免用户在没有保存的情况下丢失自己保存过的数据。 对于自动保存,一般有两种实现,参考语雀和石墨: 语雀采用的是定时保存的方式,大约在3分半…...

【Redis】关于它为什么快?使用场景?以及使用方式?为何引入多线程?
目录 1.既然redis那么快,为什么不用它做主数据库,只用它做缓存? 2.Redis 一般在什么场合下使用? 3.redis为什么这么快? 4.Redis为什么要引入了多线程? 1.既然redis那么快,为什么不用它做主数据…...

SpringBoot之JWT登录
JWT JSON Web Token(JSON Web令牌) 是一个开放标准(rfc7519),它定义了一种紧凑的、自包含的方式,用于在各方之间以JSON对象安全地传输信息。此信息可以验证和信任,因为它是数字签名的。jwt可以使用秘密〈使用HNAC算法…...

【备战蓝桥杯】——循环结构
🌈个人主页: Aileen_0v0 🔥热门专栏: 华为鸿蒙系统学习|计算机网络|数据结构与算法 💫个人格言:“没有罗马,那就自己创造罗马~” #mermaid-svg-bFHV3Dz5xMe6d3NB {font-family:"trebuchet ms",verdana,arial,sans-serif;font-siz…...

【数据分享】1929-2023年全球站点的逐年平均气温数据(Shp\Excel\免费获取)
气象数据是在各项研究中都经常使用的数据,气象指标包括气温、风速、降水、湿度等指标,其中又以气温指标最为常用!说到气温数据,最详细的气温数据是具体到气象监测站点的气温数据!本次我们为大家带来的就是具体到气象监…...

探索Pyecharts关系图绘制技巧:炫酷效果与创意呈现【第42篇—python:Pyecharts水球图】
文章目录 Pyecharts绘制多种炫酷关系网图引言准备工作代码实战1. 基本关系网图2. 自定义节点样式和边样式3. 关系网图的层级结构4. 添加标签和工具提示5. 动态关系网图6. 高级关系网图 - Les Miserables 示例7. 自定义关系网图布局8. 添加背景图9. 3D 关系网图10. 热力关系网图…...

蓝桥杯-循环节长度
两个整数做除法,有时会产生循环小数,其循环部分称为: 循环节。比如,11/136>0.8461553846153..... 其循环节为[846153] 共有 6 位。下面的方法,可以求出循环节的长度。请仔细阅读代码,并填写划线部分缺少的代码。 注…...
Jython调用openwire库连接activemq转发topic订阅消息到另一个activemq 服务器上 完整代码
以下是一个示例代码,演示如何在Jython中使用OpenWire库连接ActiveMQ,将一个主题(topic)上的订阅消息转发到另一个ActiveMQ服务器上: from org.apache.activemq import * from org.apache.activemq.transport import *…...
面试经典题---30.串联所有单词的子串
30.串联所有单词的子串 我的解法: 滑动窗口: 解法中用到了两个哈希表map1和map2,分别用于记录words中各个单词的出现频数和当前滑动窗口[left, right)中单词的出现频数;外部for循环i从0到len - 1,内部while循环每次会…...
字符串随机生成工具(开源)-Kimen(奇门)
由于最近笔者在开发数据脱敏相关功能,其中一类脱敏需求为能够按照指定的格式随机生成一个字符串来代替原有信息,数据看起来格式需要与原数据相同,如:电话号码,身份证号以及邮箱等。在网上搜索了下,发现没有…...

UE4 CustomDepthMobile流程小记
原生UE opaque材质中获取CustomDepth/CustomStencil会报错 在其Compile中调用的函数中没有看到报错逻辑 材质节点的逻辑都没有什么问题,所以看一下报错 在HLSLMaterialTranslator::Translate中 修改之后 mobile流程的不透明材质可以直接获取SceneTexture::customd…...
设计模式和设计原则回顾
设计模式和设计原则回顾 23种设计模式是设计原则的完美体现,设计原则设计原则是设计模式的理论基石, 设计模式 在经典的设计模式分类中(如《设计模式:可复用面向对象软件的基础》一书中),总共有23种设计模式,分为三大类: 一、创建型模式(5种) 1. 单例模式(Sing…...
DeepSeek 赋能智慧能源:微电网优化调度的智能革新路径
目录 一、智慧能源微电网优化调度概述1.1 智慧能源微电网概念1.2 优化调度的重要性1.3 目前面临的挑战 二、DeepSeek 技术探秘2.1 DeepSeek 技术原理2.2 DeepSeek 独特优势2.3 DeepSeek 在 AI 领域地位 三、DeepSeek 在微电网优化调度中的应用剖析3.1 数据处理与分析3.2 预测与…...

【JavaEE】-- HTTP
1. HTTP是什么? HTTP(全称为"超文本传输协议")是一种应用非常广泛的应用层协议,HTTP是基于TCP协议的一种应用层协议。 应用层协议:是计算机网络协议栈中最高层的协议,它定义了运行在不同主机上…...
AtCoder 第409场初级竞赛 A~E题解
A Conflict 【题目链接】 原题链接:A - Conflict 【考点】 枚举 【题目大意】 找到是否有两人都想要的物品。 【解析】 遍历两端字符串,只有在同时为 o 时输出 Yes 并结束程序,否则输出 No。 【难度】 GESP三级 【代码参考】 #i…...

dedecms 织梦自定义表单留言增加ajax验证码功能
增加ajax功能模块,用户不点击提交按钮,只要输入框失去焦点,就会提前提示验证码是否正确。 一,模板上增加验证码 <input name"vdcode"id"vdcode" placeholder"请输入验证码" type"text&quo…...
MVC 数据库
MVC 数据库 引言 在软件开发领域,Model-View-Controller(MVC)是一种流行的软件架构模式,它将应用程序分为三个核心组件:模型(Model)、视图(View)和控制器(Controller)。这种模式有助于提高代码的可维护性和可扩展性。本文将深入探讨MVC架构与数据库之间的关系,以…...
Qt Http Server模块功能及架构
Qt Http Server 是 Qt 6.0 中引入的一个新模块,它提供了一个轻量级的 HTTP 服务器实现,主要用于构建基于 HTTP 的应用程序和服务。 功能介绍: 主要功能 HTTP服务器功能: 支持 HTTP/1.1 协议 简单的请求/响应处理模型 支持 GET…...
今日科技热点速览
🔥 今日科技热点速览 🎮 任天堂Switch 2 正式发售 任天堂新一代游戏主机 Switch 2 今日正式上线发售,主打更强图形性能与沉浸式体验,支持多模态交互,受到全球玩家热捧 。 🤖 人工智能持续突破 DeepSeek-R1&…...
什么?连接服务器也能可视化显示界面?:基于X11 Forwarding + CentOS + MobaXterm实战指南
文章目录 什么是X11?环境准备实战步骤1️⃣ 服务器端配置(CentOS)2️⃣ 客户端配置(MobaXterm)3️⃣ 验证X11 Forwarding4️⃣ 运行自定义GUI程序(Python示例)5️⃣ 成功效果
Linux --进程控制
本文从以下五个方面来初步认识进程控制: 目录 进程创建 进程终止 进程等待 进程替换 模拟实现一个微型shell 进程创建 在Linux系统中我们可以在一个进程使用系统调用fork()来创建子进程,创建出来的进程就是子进程,原来的进程为父进程。…...