RabbitMQ---work消息模型
1、work消息模型
工作队列或者竞争消费者模式

在第一篇教程中,我们编写了一个程序,从一个命名队列中发送并接受消息。在这里,我们将创建一个工作队列,在多个工作者之间分配耗时任务。
工作队列,又称任务队列。主要思想就是避免执行资源密集型任务时,必须等待它执行完成。相反我们稍后完成任务,我们将任务封装为消息并将其发送到队列。 在后台运行的工作进程将获取任务并最终执行作业。当你运行许多消费者时,任务将在他们之间共享,但是一个消息只能被一个消费者获取。
这个概念在Web应用程序中特别有用,因为在短的HTTP请求窗口中无法处理复杂的任务。
接下来我们来模拟这个流程:
o P:生产者:任务的发布者
o C1:消费者,领取任务并且完成任务,假设完成速度较快
o C2:消费者2:领取任务并完成任务,假设完成速度慢
面试题:避免消息堆积?
1)采用workqueue,多个消费者监听同一队列。
2)接收到消息以后,而是通过线程池,异步消费。
1.1、生产者
生产者与案例1中的几乎一样:
public class Send {private final static String QUEUE_NAME = "test_work_queue";public static void main(String[] argv) throws Exception {// 获取到连接Connection connection = ConnectionUtil.getConnection();// 获取通道Channel channel = connection.createChannel();// 声明队列channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 循环发布任务for (int i = 0; i < 50; i++) {// 消息内容String message = "task .. " + i;channel.basicPublish("", QUEUE_NAME, null, message.getBytes());System.out.println(" [x] Sent '" + message + "'");}// 关闭通道和连接channel.close();connection.close();}
}
不过这里我们是循环发送50条消息。
1.2、消费者1
// 消费者1
public class Recv {private final static String QUEUE_NAME = "test_work_queue";public static void main(String[] argv) throws Exception {// 获取到连接Connection connection = ConnectionUtil.getConnection();// 获取通道final Channel channel = connection.createChannel();// 声明队列channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 定义队列的消费者DefaultConsumer consumer = new DefaultConsumer(channel) {// 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,byte[] body) throws IOException {// body 即消息体String msg = new String(body);System.out.println(" [消费者1] received : " + msg + "!");try {// 模拟完成任务的耗时:1000msThread.sleep(1000);} catch (InterruptedException e) {}// 手动ACKchannel.basicAck(envelope.getDeliveryTag(), false);}};// 监听队列。channel.basicConsume(QUEUE_NAME, false, consumer);}
}
1.3、消费者2
//消费者2
public class Recv2 {private final static String QUEUE_NAME = "test_work_queue";public static void main(String[] argv) throws Exception {// 获取到连接Connection connection = ConnectionUtil.getConnection();// 获取通道final Channel channel = connection.createChannel();// 声明队列channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 定义队列的消费者DefaultConsumer consumer = new DefaultConsumer(channel) {// 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,byte[] body) throws IOException {// body 即消息体String msg = new String(body);System.out.println(" [消费者2] received : " + msg + "!");try {// 模拟完成任务的耗时:200msThread.sleep(200);} catch (InterruptedException e) {}// 手动ACKchannel.basicAck(envelope.getDeliveryTag(), false);}};// 监听队列。channel.basicConsume(QUEUE_NAME, false, consumer);}
}
与消费者1基本类似,就是没有设置消费耗时时间。
这里是模拟有些消费者快,有些比较慢。
接下来,两个消费者一同启动,然后发送50条消息:


可以发现,两个消费者各自消费了25条消息,而且各不相同,这就实现了任务的分发。
1.4、能者多劳
• 刚才的实现有问题吗?
o 消费者1比消费者2的效率要低,一次任务的耗时较长
o 然而两人最终消费的消息数量是一样的
o 消费者2大量时间处于空闲状态,消费者1一直忙碌
• 现在的状态属于是把任务平均分配,正确的做法应该是消费越快的人,消费的越多。
• 怎么实现呢?
o 我们可以使用basicQos方法和prefetchCount = 1设置。
o 这告诉RabbitMQ一次不要向工作人员发送多于一条消息。
o 或者换句话说,不要向工作人员发送新消息,直到它处理并确认了前一个消息。
o 相反,它会将其分派给不是仍然忙碌的下一个工作人员。

再次测试:


相关文章:
RabbitMQ---work消息模型
1、work消息模型 工作队列或者竞争消费者模式 在第一篇教程中,我们编写了一个程序,从一个命名队列中发送并接受消息。在这里,我们将创建一个工作队列,在多个工作者之间分配耗时任务。 工作队列,又称任务队列。主要思…...
GitRedisNginx合集
目录 文件传下载 Git常用命令 Git工作区中文件的状态 远程仓库操作 分支操作 标签操作 idea中使用git 设置git.exe路径 操作步骤 linux配置jdk 安装tomcat 查看是否启动成功 查看tomcat进程 防火墙操作 开放指定端口并立即生效 安装mysql 修改mysql密码 安装lrzsz软…...
系统架构设计师之缓存技术:Redis与Memcache能力比较
系统架构设计师之缓存技术:Redis与Memcache能力比较...
02.sqlite3学习——嵌入式数据库的基本要求和SQLite3的安装
目录 嵌入式数据库的基本要求和SQLite3的安装 嵌入式数据库的基本要求 常见嵌入式数据库 sqlite3简介 SQLite3编程接口模型 ubuntu 22.04下的SQLite安装 嵌入式数据库的基本要求和SQLite3的安装 嵌入式数据库的基本要求 常见嵌入式数据库 sqlite3简介 SQLite3编程接口模…...
AIGC ChatGPT 按年份进行动态选择的动态图表
动态可视化分析的好处与优势: 1. 提高信息理解性:可视化分析使得大量复杂的数据变得易于理解,通过图表、颜色、形状、尺寸等方式,能够直观地表现不同的数据关系和模式。 2. 加快决策速度:数据可视化可以帮助用户更快…...
分布式—雪花算法生成ID
一、简介 1、雪花算法的组成: 由64个Bit(比特)位组成的long类型的数字 0 | 0000000000 0000000000 0000000000 000000000 | 00000 | 00000 | 000000000000 1个bit:符号位,始终为0。 41个bit:时间戳,精确到毫秒级别&a…...
Python语言实现React框架
迷途小书童的 Note 读完需要 6分钟 速读仅需 2 分钟 1 reactpy 介绍 reactpy 是一个用 Python 语言实现的 ReactJS 框架。它可以让我们使用 Python 的方式来编写 React 的组件,构建用户界面。 reactpy 的目标是想要将 React 的优秀特性带入 Python 领域,…...
Netty入门学习和技术实践
Netty入门学习和技术实践 Netty1.Netty简介2.IO模型3.Netty框架介绍4. Netty实战项目学习5. Netty实际应用场景6.扩展 Netty 1.Netty简介 Netty是由JBOSS提供的一个java开源框架,现为 Github上的独立项目。Netty提供异步的、事件驱动的网络应用程序框架和工具&…...
MySQL详细安装与配置
免安装版的Mysql MySQL关是一种关系数据库管理系统,所使用的 SQL 语言是用于访问数据库的最常用的 标准化语言,其特点为体积小、速度快、总体拥有成本低,尤其是开放源码这一特点,在 Web 应用方面 MySQL 是最好的 RDBMS(Relation…...
裸露土堆识别算法
裸露土堆识别算法首先利用图像处理技术,提取出图像中的土堆区域。裸露土堆识别算法首通过计算土堆中被绿色防尘网覆盖的比例,判断土堆是否裸露。若超过40%的土堆没有被绿色防尘网覆盖,则视为裸露土堆。当我们谈起计算机视觉时,首先…...
说说你对Redux的理解?其工作原理?
文章目录 redux?工作原理如何使用后言 redux? React是用于构建用户界面的,帮助我们解决渲染DOM的过程 而在整个应用中会存在很多个组件,每个组件的state是由自身进行管理,包括组件定义自身的state、组件之间的通信通…...
《基于 Vue 组件库 的 Webpack5 配置》7.路径别名 resolve.alias 和 性能 performance
路径别名 resolve.alias const path require(path);module.exports {resolve: {alias: {"": path.resolve(__dirname, "./src/"),"assets": path.resolve(__dirname, "./src/assets/"),"mixins": path.resolve(__dirname,…...
基于PaddleOCR2.7.0发布WebRest服务测试案例
基于PaddleOCR2.7.0发布WebRest服务测试案例 #WARNING: This is a development server. Do not use it in a production deployment. Use a production WSGI server instead. #警告:这是一个开发服务器。不要在生产部署中使用它。请改用生产WSGI服务器。 输出结果…...
Solidity 合约安全,常见漏洞 (下篇)
Solidity 合约安全,常见漏洞 (下篇) Solidity 合约安全,常见漏洞 (上篇) 不安全的随机数 目前不可能用区块链上的单一交易安全地产生随机数。区块链需要是完全确定的,否则分布式节点将无法达…...
nodejs根据pdf模板填入中文数据并生成新的pdf文件
导入pdf-lib库和fontkit npm install pdf-lib fs npm install pdf-lib/fontkit 具体代码 const { PDFDocument, StandardFonts } require(pdf-lib); const fs require(fs); const fontkit require(pdf-lib/fontkit) let pdfDoc let font async function fillPdfForm(temp…...
UE4与pycharm联合仿真的调试问题及一些仿真经验
文章目录 ue4与pycharm联合仿真的调试问题前言ue4端的debug过程pycharm端 一些仿真经验小结 ue4与pycharm联合仿真的调试问题 前言 因为在实验中我需要用到py代码输出控制信息给到ue4中,并且希望看到py端和ue端分别在运行过程中的输出以及debug调试。所以…...
【数据分析】波士顿矩阵
波士顿矩阵是一种用于分析市场定位和企业发展战略的管理工具。由美国波士顿咨询集团(Boston Consulting Group)于1970年提出,并以该集团命名。 波士顿矩阵主要基于产品生命周期和市场份额两个维度,将企业的产品或业务分为四个象限…...
sizeof和strlen的对比
文章目录 🚩前言🚩sizeof🚩strlen🚩sizeof和strlen对比 🚩前言 很多小白在学习中,经常将sizeof和strlen弄混了。本篇文章,小编讲解一下sizeof和strlen的区别。🤷♂️ 🚩…...
Flutter系列文章-Flutter 插件开发
在本篇文章中,我们将学习如何开发 Flutter 插件,实现 Flutter 与原生平台的交互。我们将详细介绍插件的开发过程,包括如何创建插件项目、实现方法通信、处理异步任务等。最后,我们还将演示如何将插件打包并发布到 Flutter 社区。 …...
基于SpringBoot实现MySQL与Redis的数据最终一致性
问题场景 在并发场景下,MySQL和Redis之间的数据不一致性可能成为一个突出问题。这种不一致性可能由网络延迟、并发写入冲突以及异常情况处理等因素引起,导致MySQL和Redis中的数据在某些时间点不同步或出现不一致的情况。数据一致性问题的级别可以分为三…...
XCTF-web-easyupload
试了试php,php7,pht,phtml等,都没有用 尝试.user.ini 抓包修改将.user.ini修改为jpg图片 在上传一个123.jpg 用蚁剑连接,得到flag...
使用van-uploader 的UI组件,结合vue2如何实现图片上传组件的封装
以下是基于 vant-ui(适配 Vue2 版本 )实现截图中照片上传预览、删除功能,并封装成可复用组件的完整代码,包含样式和逻辑实现,可直接在 Vue2 项目中使用: 1. 封装的图片上传组件 ImageUploader.vue <te…...
Python爬虫(二):爬虫完整流程
爬虫完整流程详解(7大核心步骤实战技巧) 一、爬虫完整工作流程 以下是爬虫开发的完整流程,我将结合具体技术点和实战经验展开说明: 1. 目标分析与前期准备 网站技术分析: 使用浏览器开发者工具(F12&…...
NFT模式:数字资产确权与链游经济系统构建
NFT模式:数字资产确权与链游经济系统构建 ——从技术架构到可持续生态的范式革命 一、确权技术革新:构建可信数字资产基石 1. 区块链底层架构的进化 跨链互操作协议:基于LayerZero协议实现以太坊、Solana等公链资产互通,通过零知…...
MySQL中【正则表达式】用法
MySQL 中正则表达式通过 REGEXP 或 RLIKE 操作符实现(两者等价),用于在 WHERE 子句中进行复杂的字符串模式匹配。以下是核心用法和示例: 一、基础语法 SELECT column_name FROM table_name WHERE column_name REGEXP pattern; …...
python执行测试用例,allure报乱码且未成功生成报告
allure执行测试用例时显示乱码:‘allure’ �����ڲ����ⲿ���Ҳ���ǿ�&am…...
GitFlow 工作模式(详解)
今天再学项目的过程中遇到使用gitflow模式管理代码,因此进行学习并且发布关于gitflow的一些思考 Git与GitFlow模式 我们在写代码的时候通常会进行网上保存,无论是github还是gittee,都是一种基于git去保存代码的形式,这样保存代码…...
【Linux系统】Linux环境变量:系统配置的隐形指挥官
。# Linux系列 文章目录 前言一、环境变量的概念二、常见的环境变量三、环境变量特点及其相关指令3.1 环境变量的全局性3.2、环境变量的生命周期 四、环境变量的组织方式五、C语言对环境变量的操作5.1 设置环境变量:setenv5.2 删除环境变量:unsetenv5.3 遍历所有环境…...
Ubuntu系统多网卡多相机IP设置方法
目录 1、硬件情况 2、如何设置网卡和相机IP 2.1 万兆网卡连接交换机,交换机再连相机 2.1.1 网卡设置 2.1.2 相机设置 2.3 万兆网卡直连相机 1、硬件情况 2个网卡n个相机 电脑系统信息,系统版本:Ubuntu22.04.5 LTS;内核版本…...
node.js的初步学习
那什么是node.js呢? 和JavaScript又是什么关系呢? node.js 提供了 JavaScript的运行环境。当JavaScript作为后端开发语言来说, 需要在node.js的环境上进行当JavaScript作为前端开发语言来说,需要在浏览器的环境上进行 Node.js 可…...
