RabbitMQ简单模式和工作模式
RabbitMQ 是一个消息队列中间件,用于在分布式系统中进行消息传递。在 RabbitMQ 中,有几种工作模式,其中简单模式和工作模式是其中两种基本的模式之一。
-
简单模式(Simple Mode):
- 在简单模式中,有一个生产者(Producer)将消息发送到一个队列(Queue)中,然后有一个消费者(Consumer)从队列中接收并处理消息。
- 这是最基本的消息队列模式,适用于单个生产者和单个消费者的场景。
- 生产者将消息发送到队列,而消费者从队列中接收并处理消息,消息的传递是单向的。
-
工作模式(Work Queue Mode):
- 工作模式也被称为竞争消费者模式。在这种模式下,有多个消费者监听同一个队列,但每条消息只能被其中一个消费者接收和处理。
- 当消息被发送到队列时,它将被发送给下一个空闲的消费者,从而实现消息的分发和并发处理。
- 这种模式对于处理大量工作的情况很有用,可以通过增加消费者的数量来提高消息处理的速度。
在 RabbitMQ 中,简单模式和工作模式的实现通常使用一些基本的概念,包括生产者、消费者、队列和消息。生产者负责发送消息到队列,而消费者则负责从队列中接收和处理消息。
下面是一个使用 RabbitMQ 和 Node.js(使用 amqplib 库)以及 TypeScript 实现工作模式的简单示例。在这个例子中,我们将使用 amqplib 库来连接 RabbitMQ 服务器,并使用 TypeScript 来编写代码。
首先,确保你已经安装了 amqplib 库。可以使用以下命令进行安装:
npm install amqplib
接下来,创建一个生产者和一个消费者的 TypeScript 文件。以下是示例代码:
producer.ts:
import * as amqp from 'amqplib';async function produce() {const connection = await amqp.connect('amqp://localhost');const channel = await connection.createChannel();const queue = 'work_queue';await channel.assertQueue(queue, { durable: true });for (let i = 0; i < 10; i++) {const message = `Message ${i}`;channel.sendToQueue(queue, Buffer.from(message), { persistent: true });console.log(` [x] Sent '${message}'`);}setTimeout(() => {connection.close();process.exit(0);}, 500);
}produce();
consumer.ts:
import * as amqp from 'amqplib';async function consume() {const connection = await amqp.connect('amqp://localhost');const channel = await connection.createChannel();const queue = 'work_queue';await channel.assertQueue(queue, { durable: true });// 设置每次只处理一个消息[平均分配的概念,不会让一个work太忙和太闲]//这一行代码的作用是告诉 RabbitMQ 不要在消费者未确认(ack)之前向其发送新的消息await channel.prefetch(1);console.log(' [*] Waiting for messages. To exit press CTRL+C');await channel.consume(queue, async (msg) => {if (msg !== null) {const message = msg.content.toString();console.log(` [x] Received ${message}`);// Simulate some workawait new Promise(resolve => setTimeout(resolve, 1000));console.log(' [x] Done');channel.ack(msg);}});
}consume();
这个示例中,生产者将消息发送到名为 work_queue 的队列中,而消费者则监听该队列并处理消息。消费者使用 channel.prefetch(1) 来确保一次只接收一个消息,从而实现竞争消费者模式。
记得在运行前启动 RabbitMQ 服务器,并确保 TypeScript 文件已编译成 JavaScript。你可以使用以下命令进行编译:
tsc producer.ts
tsc consumer.ts
然后,分别运行 producer.js 和 consumer.js。这样你就可以在 RabbitMQ 中看到消息的生产和消费过程。
RabbitMQ消息持久化和手动应答
在 RabbitMQ 中,消息持久化和手动应答是两个关键的概念,它们可以帮助确保消息的可靠传递和处理。下面简要介绍这两个概念:
-
消息持久化(Message Durability):
- 默认情况下,RabbitMQ 中的消息是瞬时的,也就是说,如果 RabbitMQ 服务器停止或崩溃,所有未处理的消息都会丢失。
- 通过将消息标记为持久化,你可以确保消息在 RabbitMQ 服务器重启后仍然可用。要实现消息持久化,需要在发送消息时设置消息的
deliveryMode属性为2(persistent)。 - 例如,在生产者端设置消息为持久化:
channel.sendToQueue(queue, Buffer.from(message), { persistent: true });- 在消费者端,你需要确保队列和消息都被声明为持久化:
channel.assertQueue(queue, { durable: true });这样,即使 RabbitMQ 服务器重启,持久化的消息也不会丢失。
-
手动应答(Manual Acknowledgment):
- 默认情况下,RabbitMQ 使用自动应答(auto-acknowledgment)模式,即一旦消息被传递给消费者,RabbitMQ 就将其标记为已处理。
- 在某些情况下,你可能需要更细粒度的控制,以确保消息在被消费者完全处理之后才被标记为已处理。这就是手动应答的用途。
- 在消费者端,需要将
noAck设置为false,表示手动应答模式:
channel.consume(queueName, async (msg: Message | null) => {if (msg) {const data: EmailTask = JSON.parse(msg.content.toString());console.log('Processing mail task:', msg.content.toString());try {//模拟邮件发送await new Promise(resolve => setTimeout(resolve, 1000));console.log(' [x] Done');channel.ack(msg);} catch (error) {console.log('error:', data);// 处理消息失败,判断是否需要进行重试if (canRetry(msg)) {// 重新入队,进行下一次尝试channel.reject(msg, true);} else {// 不进行重试,将消息从队列中移除channel.ack(msg);}}}
}, { noAck: false });//默认false
- 在这种情况下,消费者需要在处理完消息后显式调用
channel.ack(msg)来确认消息已被处理。如果消费者崩溃或在处理消息时发生错误,消息将保持在队列中,直到被明确确认。 - 在 RabbitMQ 中,channel.reject 方法用于拒绝一条消息。它的参数如下channel.reject(msg, requeue);
msg: 要拒绝的消息对象。
requeue: 如果设置为 true,则被拒绝的消息将被重新排队,即重新放回队列。如果设置为 false,则消息将被删除。默认为 true。
综合使用消息持久化和手动应答,可以确保在面对不同情况时,消息的可靠传递和处理。
重试间隔和次数


-
重新投递消息并设置头部信息:
- 在处理消息失败时,将消息重新投递到队列,并设置一个头部信息,例如
x-redelivered-count,用来记录消息的重试次数。 - 在消费者端,根据这个头部信息来判断是否达到重试次数的上限,如果是,则不再重新投递,可能将消息放入死信交换机。
- 在处理消息失败时,将消息重新投递到队列,并设置一个头部信息,例如
-
使用外部存储记录重试次数:
- 每次消息处理失败时,将消息的唯一标识(例如 UUID)和重试次数记录到外部存储中(例如 Redis、Memcache、MySQL)。
- 在消费者端,在每次重新处理时,从外部存储中获取当前重试次数,并判断是否达到重试次数的上限。
-
自定义插件:
- 编写一个 RabbitMQ 插件,实现自定义的消息重试逻辑,包括记录重试次数、判断是否重新投递等。
- 这样可以更灵活地控制消息的处理流程。
需要注意的是,这些方法都是基于 RabbitMQ 不直接提供重试次数限制的情况下的一些自定义实践。在回答中也提到了关于 quorum queues 的更新,以及支持通过策略(policy)来限制重投递次数的可能性。因此,具体的实现方式可能会随着 RabbitMQ 版本的更新而有所变化。
await channel.consume(queueName, async (msg: Message | null) => {if (msg) {const data: EmailTask = JSON.parse(msg.content.toString());let retryCount = msg.properties.headers['x-retry-count'] || 0;console.log('Processing message:', data);console.log('Retry count:', retryCount);try {if (data.to.includes('recipient1@example.com')) {throw new Error('邮件发送失败...');}//发送邮件await new Promise(resolve => setTimeout(resolve, 1000));channel.ack(msg);} catch (error) {console.log('error:', data);// 增加重试次数retryCount++;// 判断是否达到最大重试次数if (retryCount < maxRetryAttempts) {// 重新发送消息到队列channel.sendToQueue(queueName, msg.content, {persistent: true,headers: {'x-retry-count': retryCount,},});} else {// 不进行重试,将消息从队列中移除channel.ack(msg);}}}
});

相关文章:
RabbitMQ简单模式和工作模式
RabbitMQ 是一个消息队列中间件,用于在分布式系统中进行消息传递。在 RabbitMQ 中,有几种工作模式,其中简单模式和工作模式是其中两种基本的模式之一。 简单模式(Simple Mode): 在简单模式中,有…...
c语言实战之贪吃蛇
文章目录 前言效果展示游戏用到的图片游戏思路一览游戏前准备一、贪吃蛇、食物、障碍物节点坐标的结构体二、枚举游戏状态、和贪吃蛇的方向三、维护运行的结构体 游戏开始前的初始化一、学习图形库相关知识二、设置背景三、欢迎界面四、初始化贪吃蛇五、生成障碍物六、生成食物…...
Midjourney图片生成描述词记录(今天一天)
抄别人的描述词 /imagine prompt:https://(你的图片地址).jpg Super handsome boy IP by pop mart , green suit, no hair, bald head, Scenes in spring , pastel color , mockup , fine luster , clean background ,3D render , Soft focus , oc , bl…...
类和对象 第五部分第四小节:赋值运算符重载
C编译器至少给一个类添加4个函数 1.默认构造函数无参,函数体为空 2.默认析构函数无参,函数体为空 3.默认拷贝沟早函数,对属性进行值拷贝 4.赋值运算符“operator”,对属性进行值拷贝 如果类中有属性指向堆区,做赋值操作…...
Django从入门到精通(一)
目录 一、Django环境搭建与命令 1.1、安装 1.2、命令行 创建项目 编写代码 运行 app概念 1.3、Pycharm创建项目 1.4、虚拟环境 创建虚拟环境 - 命令行 介绍 操作 基本问题 Pycharm 项目虚拟环境 django虚拟环境【安装django最新版本】 django虚拟环境【安装指…...
数据库分表分库的原则
什么是数据库分库分表 数据库分表(Table Sharding) 数据库分表是将一个大表按照某种规则拆分成多个小表存储在不同的物理表中的技术。通常,拆分规则是基于某个列的值进行拆分,例如根据用户ID或日期范围等进行拆分。每个小表只包…...
Java技术栈 —— Docker容器
Java技术栈 —— Docker容器 一、什么是Docker?二、如何安装Docker?三、如何使用Docker? 一、什么是Docker? docker的本意是码头工人。 Docker 是一个开源的应用容器引擎,让开发者可以打包他们的应用以及依赖包到一个…...
Mysql大数据量分页优化
前言 之前有看过到mysql大数据量分页情况下性能会很差,但是没有探究过它的原因,今天讲一讲mysql大数据量下偏移量很大,性能很差的问题,并附上解决方式。 原因 将原因前我们先做一个试验,我做试验使用的是mysql5.7.2…...
QT tcp与udp网络通信以及定时器的使用 (7)
QT tcp与udp网络通信以及定时器的使用 文章目录 QT tcp与udp网络通信以及定时器的使用1、QT网络与通信简单介绍2、QT TCP通信1、 服务器的流程2、 客户端的流程3、服务器的编写4、客户端的编写 3、QT UDP通信1、客户端流程2、客户端编写3、UDP广播4、UDP组播 4、定时器的用法1、…...
web架构师编辑器内容-添加自动保存的功能
对于频繁改动的应用,自动保存的功能是一个非常有用的功能,可以避免用户在没有保存的情况下丢失自己保存过的数据。 对于自动保存,一般有两种实现,参考语雀和石墨: 语雀采用的是定时保存的方式,大约在3分半…...
【Redis】关于它为什么快?使用场景?以及使用方式?为何引入多线程?
目录 1.既然redis那么快,为什么不用它做主数据库,只用它做缓存? 2.Redis 一般在什么场合下使用? 3.redis为什么这么快? 4.Redis为什么要引入了多线程? 1.既然redis那么快,为什么不用它做主数据…...
SpringBoot之JWT登录
JWT JSON Web Token(JSON Web令牌) 是一个开放标准(rfc7519),它定义了一种紧凑的、自包含的方式,用于在各方之间以JSON对象安全地传输信息。此信息可以验证和信任,因为它是数字签名的。jwt可以使用秘密〈使用HNAC算法…...
【备战蓝桥杯】——循环结构
🌈个人主页: Aileen_0v0 🔥热门专栏: 华为鸿蒙系统学习|计算机网络|数据结构与算法 💫个人格言:“没有罗马,那就自己创造罗马~” #mermaid-svg-bFHV3Dz5xMe6d3NB {font-family:"trebuchet ms",verdana,arial,sans-serif;font-siz…...
【数据分享】1929-2023年全球站点的逐年平均气温数据(Shp\Excel\免费获取)
气象数据是在各项研究中都经常使用的数据,气象指标包括气温、风速、降水、湿度等指标,其中又以气温指标最为常用!说到气温数据,最详细的气温数据是具体到气象监测站点的气温数据!本次我们为大家带来的就是具体到气象监…...
探索Pyecharts关系图绘制技巧:炫酷效果与创意呈现【第42篇—python:Pyecharts水球图】
文章目录 Pyecharts绘制多种炫酷关系网图引言准备工作代码实战1. 基本关系网图2. 自定义节点样式和边样式3. 关系网图的层级结构4. 添加标签和工具提示5. 动态关系网图6. 高级关系网图 - Les Miserables 示例7. 自定义关系网图布局8. 添加背景图9. 3D 关系网图10. 热力关系网图…...
蓝桥杯-循环节长度
两个整数做除法,有时会产生循环小数,其循环部分称为: 循环节。比如,11/136>0.8461553846153..... 其循环节为[846153] 共有 6 位。下面的方法,可以求出循环节的长度。请仔细阅读代码,并填写划线部分缺少的代码。 注…...
Jython调用openwire库连接activemq转发topic订阅消息到另一个activemq 服务器上 完整代码
以下是一个示例代码,演示如何在Jython中使用OpenWire库连接ActiveMQ,将一个主题(topic)上的订阅消息转发到另一个ActiveMQ服务器上: from org.apache.activemq import * from org.apache.activemq.transport import *…...
面试经典题---30.串联所有单词的子串
30.串联所有单词的子串 我的解法: 滑动窗口: 解法中用到了两个哈希表map1和map2,分别用于记录words中各个单词的出现频数和当前滑动窗口[left, right)中单词的出现频数;外部for循环i从0到len - 1,内部while循环每次会…...
字符串随机生成工具(开源)-Kimen(奇门)
由于最近笔者在开发数据脱敏相关功能,其中一类脱敏需求为能够按照指定的格式随机生成一个字符串来代替原有信息,数据看起来格式需要与原数据相同,如:电话号码,身份证号以及邮箱等。在网上搜索了下,发现没有…...
UE4 CustomDepthMobile流程小记
原生UE opaque材质中获取CustomDepth/CustomStencil会报错 在其Compile中调用的函数中没有看到报错逻辑 材质节点的逻辑都没有什么问题,所以看一下报错 在HLSLMaterialTranslator::Translate中 修改之后 mobile流程的不透明材质可以直接获取SceneTexture::customd…...
ES6从入门到精通:前言
ES6简介 ES6(ECMAScript 2015)是JavaScript语言的重大更新,引入了许多新特性,包括语法糖、新数据类型、模块化支持等,显著提升了开发效率和代码可维护性。 核心知识点概览 变量声明 let 和 const 取代 var…...
ETLCloud可能遇到的问题有哪些?常见坑位解析
数据集成平台ETLCloud,主要用于支持数据的抽取(Extract)、转换(Transform)和加载(Load)过程。提供了一个简洁直观的界面,以便用户可以在不同的数据源之间轻松地进行数据迁移和转换。…...
【C++进阶篇】智能指针
C内存管理终极指南:智能指针从入门到源码剖析 一. 智能指针1.1 auto_ptr1.2 unique_ptr1.3 shared_ptr1.4 make_shared 二. 原理三. shared_ptr循环引用问题三. 线程安全问题四. 内存泄漏4.1 什么是内存泄漏4.2 危害4.3 避免内存泄漏 五. 最后 一. 智能指针 智能指…...
PostgreSQL——环境搭建
一、Linux # 安装 PostgreSQL 15 仓库 sudo dnf install -y https://download.postgresql.org/pub/repos/yum/reporpms/EL-$(rpm -E %{rhel})-x86_64/pgdg-redhat-repo-latest.noarch.rpm# 安装之前先确认是否已经存在PostgreSQL rpm -qa | grep postgres# 如果存在࿰…...
Docker拉取MySQL后数据库连接失败的解决方案
在使用Docker部署MySQL时,拉取并启动容器后,有时可能会遇到数据库连接失败的问题。这种问题可能由多种原因导致,包括配置错误、网络设置问题、权限问题等。本文将分析可能的原因,并提供解决方案。 一、确认MySQL容器的运行状态 …...
《Docker》架构
文章目录 架构模式单机架构应用数据分离架构应用服务器集群架构读写分离/主从分离架构冷热分离架构垂直分库架构微服务架构容器编排架构什么是容器,docker,镜像,k8s 架构模式 单机架构 单机架构其实就是应用服务器和单机服务器都部署在同一…...
【Linux】Linux安装并配置RabbitMQ
目录 1. 安装 Erlang 2. 安装 RabbitMQ 2.1.添加 RabbitMQ 仓库 2.2.安装 RabbitMQ 3.配置 3.1.启动和管理服务 4. 访问管理界面 5.安装问题 6.修改密码 7.修改端口 7.1.找到文件 7.2.修改文件 1. 安装 Erlang 由于 RabbitMQ 是用 Erlang 编写的,需要先安…...
第八部分:阶段项目 6:构建 React 前端应用
现在,是时候将你学到的 React 基础知识付诸实践,构建一个简单的前端应用来模拟与后端 API 的交互了。在这个阶段,你可以先使用模拟数据,或者如果你的后端 API(阶段项目 5)已经搭建好,可以直接连…...
机器学习的数学基础:线性模型
线性模型 线性模型的基本形式为: f ( x ) ω T x b f\left(\boldsymbol{x}\right)\boldsymbol{\omega}^\text{T}\boldsymbol{x}b f(x)ωTxb 回归问题 利用最小二乘法,得到 ω \boldsymbol{\omega} ω和 b b b的参数估计$ \boldsymbol{\hat{\omega}}…...
基于 HTTP 的单向流式通信协议SSE详解
SSE(Server-Sent Events)详解 🧠 什么是 SSE? SSE(Server-Sent Events) 是 HTML5 标准中定义的一种通信机制,它允许服务器主动将事件推送给客户端(浏览器)。与传统的 H…...
