消息队列-RabbitMQ:workQueues—工作队列、消息应答机制、RabbitMQ 持久化、不公平分发(能者多劳)
4、Work Queues
Work Queues— 工作队列 (又称任务队列) 的主要思想是避免立即执行资源密集型任务,而不得不等待它完成。我们把任务封装为消息并将其发送到队列,在后台运行的工作进程将弹出任务并最终执行作业。当有多个工作线程时,这些工作线程将一起处理这些任务。
轮训分发消息
在这个案例中我们会启动两个工作线程,一个消息发送线程,我们来看看他们两个工作线程是如何工作的。
1、抽取工具类
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;public class RabbitMqUtils {//得到一个连接的 channelpublic static Channel getChannel() throws Exception {//创建一个连接工厂ConnectionFactory factory = new ConnectionFactory();factory.setHost("42.192.149.23");factory.setUsername("admin");factory.setPassword("123456");Connection connection = factory.newConnection();Channel channel = connection.createChannel();return channel;}
}
2、启动两个工作线程来接受消息
import com.oddfar.utils.RabbitMqUtils;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;public class Worker01 {//定义队列private static final String QUEUE_NAME = "hello";public static void main(String[] args) throws Exception {//利用工具类创建信道Channel channel = RabbitMqUtils.getChannel();//消息接受,实现接口函数DeliverCallback deliverCallback = (consumerTag, delivery) -> {String receivedMessage = new String(delivery.getBody());System.out.println("接收到消息:" + receivedMessage);};//消息被取消CancelCallback cancelCallback = (consumerTag) -> {System.out.println(consumerTag + "消费者取消消费接口回调逻辑");};//消费者消费System.out.println("C1 消费者启动等待消费.................. ");channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);}
}
(1)开启多线程
选中 Allow multiple instances:
(2)启动后
3、启动一个发送消息线程
public class Task01 {public static final String QUEUE_NAME = "hello";public static void main(String[] args) throws Exception {Channel channel = RabbitMqUtils.getChannel();Scanner scanner = new Scanner(System.in);while (scanner.hasNext()) { //hasNext,有下一个,就发送String message = scanner.next();channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); //生产者发送消息,理解各参数意思System.out.println("消息发送完成:" + message);}}
}
启动后
C1、C2,轮询接受到消息
5、消息应答
消费者完成一个任务可能需要一段时间,如果其中一个消费者处理一个长的任务并仅只完成了部分突然它挂掉了,会发生什么情况。
**RabbitMQ 一旦向消费者传递了一条消息,便立即将该消息标记为删除。**在这种情况下,突然有个消费者挂掉了,我们将丢失正在处理的消息,以及后续发送给该消费者的消息,因为它无法接收到。
为了保证消息在发送过程中不丢失,引入消息应答机制,消息应答就是:消费者在接收到消息并且处理该消息之后,告诉 rabbitmq 它已经处理了,rabbitmq 可以把该消息删除了。
1、自动应答
消息发送后立即被认为已经传送成功,这种模式需要在高吞吐量和数据传输安全性方面做权衡,因为这种模式如果消息在接收到之前,消费者那边出现连接或者 channel 关闭,那么消息就丢失了。当然另一方面这种模式消费者那边可以传递过载的消息,没有对传递的消息数量进行限制,当然这样有可能使得消费者这边由于接收太多还来不及处理的消息,导致这些消息的积压,使得内存耗尽,最终这些消费者线程被操作系统杀死,所以这种模式仅适用在消费者可以高效并以某种速率能够处理这些消息的情况下使用。
2、手动消息应答的方法
Channel.basicAck (用于肯定确认):RabbitMQ 已知道该消息成功被处理,可以将其丢弃了。
Channel.basicNack (用于否定确认)
Channel.basicReject (用于否定确认):与 Channel.basicNack 相比少一个参数,不处理该消息了直接拒绝,可以将其丢弃了。
Multiple 的解释:
手动应答的好处是可以批量应答并且减少网络拥堵 。
true 代表批量应答 channel 上未应答的消息
false 同上面相比只会应答 tag=8 的消息
3、消息自动重新入队
如果消费者由于某些原因失去连接 (其通道已关闭,连接已关闭或 TCP 连接丢失),导致消息未发送 ACK 确认,RabbitMQ 将了解到消息未完全处理,并将对其重新排队。如果此时其他消费者可以处理,它将很快将其重新分发给另一个消费者。这样,即使某个消费者偶尔死亡,也可以确保不会丢失任何消息。
4、消息手动应答代码
默认消息采用的是自动应答,所以我们要想实现消息消费过程中不丢失,需要把自动应答改为手动应答。
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
(1)消息生产者
import com.rabbitmq.client.Channel;import java.util.Scanner;/*** 消息生产者,消息在手动应答时是不丢失的,放回队列重新消费。** */
public class Task02 {private static final String TASK_QUEUE_NAME = "ack_queue";public static void main(String[] args) throws Exception {Channel channel = RabbitMqUtils.getChannel();//声明队列channel.queueDeclare(TASK_QUEUE_NAME, false, false, false, null);Scanner sc = new Scanner(System.in);System.out.println("请输入信息");while (sc.hasNext()) {String message = sc.nextLine();//发布消息channel.basicPublish("", TASK_QUEUE_NAME, null, message.getBytes("UTF-8"));System.out.println("生产者发出消息" + message);}}}
(2)睡眠工具类
(3)消费者 01
(4)消费者02:把睡眠时间改成 30 秒
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;public class Work03 {private static final String TASK_QUEUE_NAME = "ack_queue";public static void main(String[] args) throws Exception {Channel channel = RabbitMqUtils.getChannel();System.out.println("C1 等待接收消息处理时间较 短");DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody());try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("接收到消息:" + message);/*** 1.消息标记 tag* 2.是否批量应答未应答消息*/channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);};CancelCallback cancelCallback = (s) -> {System.out.println(s + "消费者取消消费接口回调逻辑");};//采用手动应答boolean autoAck = false;channel.basicConsume(TASK_QUEUE_NAME, autoAck, deliverCallback, cancelCallback);}
}
启动后:
中途停掉消费者02
消息由消费者01接收
理解:在生产者轮询发送消息:cc和dd,给消费者01和消费者02的时候,消费者01处理的消息的时间较短,所以立马就接受到了,但是消费者02处理消息的时间较长,需要30s。如果在消费者02处理消息的时候,将消费者02关闭,那么生产者发送的消息dd,虽然消费者02 无法接收了,但是也不会中途丢失,消息dd会重新返回到队列中,然后再从队列中,发送给消费者01。这就是RabbitMQ的消息应答机制。
6、RabbitMQ 持久化(3步走)
当 RabbitMQ 服务停掉以后,消息生产者发送过来的消息不丢失要如何保障?默认情况下 RabbitMQ 退出或由于某种原因崩溃时,它忽视队列和消息,除非告知它不要这样做。确保消息不会丢失需要做两件事:我们需要将队列和消息都标记为持久化。
1、队列如何实现持久化(第1步)
之前创建的队列都是非持久化的,rabbitmq 如果重启的话,该队列就会被删除掉,如果要队列实现持久化需要在声明队列的时候把 durable 参数设置为持久化。
//让队列持久化
boolean durable = true;
//声明队列
channel.queueDeclare(TASK_QUEUE_NAME, durable, false, false, null);
(1)直接修改声明会报错
(2)在RabbitMQ的Web界面,手动删除队列
(3)将队列持久化
2、消息持久化(第2步)
消息实现持久化需要在消息生产者修改代码
MessageProperties.PERSISTENT_TEXT_PLAIN 添加这个属性。
将消息标记为持久化并不能完全保证不会丢失消息。尽管它告诉 RabbitMQ 将消息保存到磁盘,但是这里依然存在当消息刚准备存储在磁盘的时候但是还没有存储完,消息还在缓存的一个间隔点。此时并没有真正写入磁盘。持久性保证并不强,但是对于我们的简单任务队列而言,这已经绰绰有余了。
7、不公平分发(能者多劳)
问题
在最开始的时候我们学习到 RabbitMQ 分发消息采用的轮训分发,但是在某种场景下这种策略并不是很好,比方说有两个消费者在处理任务,其中有个消费者 1 处理任务的速度非常快,而另外一个消费者 2 处理速度却很慢,这个时候我们还是采用轮训分发的化就会到这处理速度快的这个消费者很大一部分时间处于空闲状态,而处理慢的那个消费者一直在干活,这种分配方式在这种情况下其实就不太好,但是 RabbitMQ 并不知道这种情况它依然很公平的进行分发。
为了避免这种情况,在消费者中消费之前,我们可以设置参数 channel.basicQos(1)
需要看一下basicQos的源码,参数设置的要求
//不公平分发
int prefetchCount = 1;
channel.basicQos(prefetchCount);
//采用手动应答
boolean autoAck = false;
channel.basicConsume(TASK_QUEUE_NAME, autoAck, deliverCallback, cancelCallback);
测试效果:
生产者连续发多几条消息
让处理消息速度快的消费者01,多接收一点消息
消费者02接受的少
时间长,所以少
理解:意思就是如果这个任务我还没有处理完或者我还没有应答你,你先别分配给我,我目前只能处理一个任务,然后 rabbitmq 就会把该任务分配给没有那么忙的那个空闲消费者,当然如果所有的消费者都没有完 成手上任务,队列还在不停的添加新任务,队列有可能就会遇到队列被撑满的情况,这个时候就只能添加新的 worker 或者改变其他存储任务的策略。
消息队列-RabbitMQ:workQueues—工作队列、消息应答机制、RabbitMQ 持久化、不公平分发(能者多劳) 到此完结,笔者归纳、创作不易,大佬们给个3连再起飞吧
相关文章:

消息队列-RabbitMQ:workQueues—工作队列、消息应答机制、RabbitMQ 持久化、不公平分发(能者多劳)
4、Work Queues Work Queues— 工作队列 (又称任务队列) 的主要思想是避免立即执行资源密集型任务,而不得不等待它完成。我们把任务封装为消息并将其发送到队列,在后台运行的工作进程将弹出任务并最终执行作业。当有多个工作线程时,这些工作…...

前端秘法基础式(HTML)(第二卷)
目录 一.表单标签 1.表单域 2.表单控件 2.1input标签 2.2label/select/textarea标签 2.3无语义标签 三.特殊字符 一.表单标签 用来完成与用户的交互,例如登录系统 1.表单域 <form>通过action属性,将用户填写的数据转交给服务器 2.表单控件 2.1input标签 type…...
PTA-统计英文字母和数字字符[2]
本题要求编写程序,输入N个字符,统计其中英文字母、数字字符和其他字符的个数。 输入格式: 输入在第一行中给出正整数N,第二行输入N个字符,最后一个回车表示输入结束,不算在内。 输出格式: 在一行内按照 letter 英…...

Elasticsearch:将 IT 智能和业务 KPI 与 AI 连接起来 - 房间里的大象
作者:Fermi Fang 大象寓言的智慧 在信息技术和商业领导力的交叉点,蒙眼人和大象的古老寓言提供了一个富有洞察力的类比。 这个故事起源于印度次大陆,讲述了六个蒙住眼睛的人第一次遇到大象的故事。 每个人触摸大象的不同部位 —— 侧面、象牙…...

基于芯驰 X9HP PTG4.1 在 yocto 中添加 Linux 应用
1.参考例程并添加应用 1.1 参考例程 (1)查看自带的串口测试例程 uart_test ,查看 bb 文件怎么写的。 1.2 添加 printf-test 应用 (1)在 yocto/meta-semidrive/recipes-bsp/ 目录中 copy 自带例程 uart-test 改名为 …...
【微服务安全】OpenID Connect 简介:现代应用程序的身份验证
OpenID Connect (OIDC) 是一个建立在 OAuth 2.0 之上的开放身份验证协议。它简化了应用程序以一种标准化和可互操作的方式验证用户身份并获取其基本个人资料信息的方式。可以将其视为应用程序“知道你是谁”的一种安全方式,而无需你创建单独的帐户或透露你的密码。 …...

Linux系统中HTTP隧道的搭建与配置步骤:穿越网络的“魔法隧道”
在Linux系统中搭建HTTP隧道,就像是开启了一条穿越网络的“魔法隧道”。这条隧道能让你的数据在网络中自由穿梭,无论是远程办公还是数据同步,都能变得轻松自在。下面,就让我们一起探索如何搭建这条神奇的“魔法隧道”吧!…...

fastApi笔记04-查询参数和字符串校验
额外校验 使用Query可以对查询参数添加校验 from typing import Unionfrom fastapi import FastAPI, Queryapp FastAPI()app.get("/items/") async def read_items(q: Union[str, None] Query(defaultNone, max_length50)):results {"items": [{"…...

笔记 记录
前言 个人记录 官网模版 基于 vue2 示例图...

相机图像质量研究(38)常见问题总结:编解码对成像的影响--呼吸效应
系列文章目录 相机图像质量研究(1)Camera成像流程介绍 相机图像质量研究(2)ISP专用平台调优介绍 相机图像质量研究(3)图像质量测试介绍 相机图像质量研究(4)常见问题总结:光学结构对成像的影响--焦距 相机图像质量研究(5)常见问题总结:光学结构对成…...

MQTT协议-ISO标准下基于发布/订阅范式的消息协议
MQTT(消息队列遥测传输)是ISO 标准(ISO/IEC PRF 20922)下基于发布/订阅范式的消息协议。它工作在 TCP/IP协议族上,是为硬件性能低下的远程设备以及网络状况糟糕的情况下而设计的发布/订阅型消息协议,为此,它需要一个消息中间件 。 MQTT是一个…...

手动实现new操作符
<script>//前置知识// 每一个函数在创建之初就会有一个prototype属性,这个属性指向函数的原型对象// function abc(){// }// abc.prototype--> {constructor: f}// 在JS中任意的对象都有内置的属性叫做[[prototype]]这是一个私有属性,这个私有属…...

【flutter】环境安装
安装flutter sdk 下载sdk flutter sdk就包含dart,所以我们只用安装flutter sdk就可以了。 我们去清华大学开源软件镜像站下载,flutter开发中,版本对不上基本项目就跑步起来,如果是团队协同开发的话,建议统一下载指定版…...
ROUGE-L和SPICE
ROUGE-L ROUGE-L(Recall-Oriented Understudy for Gisting Evaluation - Longest Common Subsequence)是一种用于评估自动文本摘要或机器翻译等自然语言处理任务的评价指标。它基于最长公共子序列(LCS)来计算,主要关注…...

vue3组件通信方式汇总
前言:本文默认读者有JS基础和Vue基础,如果没有这个两个基础,可能阅读比较困难,建议先看下官方文档,当然,也欢迎评论交流😁 通信方式总结 常见搭配形式 一、props(使用频率最高&#…...
备份服务器数据的重要
备份服务器数据的重要 无论您是在运营一个网站、一个业务应用程序还是整个平台,无法定期备份服务器数据都可能将会再次困扰您。这不是一个是否的问题。这是个何时的问题。你们需要将灾难性故障的潜在损害降至最低。 灾难性故障期间最大限度地减少潜在损害的最佳方法…...

基于shp数据制作3DTiles建筑白膜
经纬管网建模系统MagicPipe3D,本地离线参数化构建地下管网、建筑三维模型,输出标准3DTiles服务、Obj模型等格式,支持Cesium、Unreal、Unity、Osg等引擎加载进行三维可视化、语义查询、专题分析。欢迎下载试用:http://www.magic3d.…...

SpringBootWeb学习笔记——12万字详细总结!
0. 写在前面 注:这套笔记是根据黑马程序员B站2023-3-21的视频学习的成果,其中省略了前端基础部分、Maven部分和数据库基础部分,详情可见目录。 注注:目前文章内结尾处多幅图片加载不出来,因为图片还存在本地没被传上来,过段时间再改~ 所有的Spring项目都基于Spring Fra…...

Code Composer Studio (CCS) - 文件比较
Code Composer Studio [CCS] - 文件比较 References 鼠标单击选中一个文件,再同时按住 Ctrl 鼠标左键来选中第二个文件,在其中一个文件上鼠标右击选择 Compare With -> Each Other. References [1] Yongqiang Cheng, https://yongqiang.blog.csdn.n…...
【GIT学习】仓库过大的清理办法
1、.git目录过大 要解决.git目录过大的问题,可以尝试以下方法: 使用git gc命令清理不再需要的缓存。这将帮助减小仓库的大小。 在命令行中输入以下命令: git gc --prunenow --aggressive使用git repack -ad命令来重新打包已经提交的文件。 …...

python打卡day49
知识点回顾: 通道注意力模块复习空间注意力模块CBAM的定义 作业:尝试对今天的模型检查参数数目,并用tensorboard查看训练过程 import torch import torch.nn as nn# 定义通道注意力 class ChannelAttention(nn.Module):def __init__(self,…...

Appium+python自动化(十六)- ADB命令
简介 Android 调试桥(adb)是多种用途的工具,该工具可以帮助你你管理设备或模拟器 的状态。 adb ( Android Debug Bridge)是一个通用命令行工具,其允许您与模拟器实例或连接的 Android 设备进行通信。它可为各种设备操作提供便利,如安装和调试…...
【Java学习笔记】Arrays类
Arrays 类 1. 导入包:import java.util.Arrays 2. 常用方法一览表 方法描述Arrays.toString()返回数组的字符串形式Arrays.sort()排序(自然排序和定制排序)Arrays.binarySearch()通过二分搜索法进行查找(前提:数组是…...

Linux相关概念和易错知识点(42)(TCP的连接管理、可靠性、面临复杂网络的处理)
目录 1.TCP的连接管理机制(1)三次握手①握手过程②对握手过程的理解 (2)四次挥手(3)握手和挥手的触发(4)状态切换①挥手过程中状态的切换②握手过程中状态的切换 2.TCP的可靠性&…...

高频面试之3Zookeeper
高频面试之3Zookeeper 文章目录 高频面试之3Zookeeper3.1 常用命令3.2 选举机制3.3 Zookeeper符合法则中哪两个?3.4 Zookeeper脑裂3.5 Zookeeper用来干嘛了 3.1 常用命令 ls、get、create、delete、deleteall3.2 选举机制 半数机制(过半机制࿰…...

P3 QT项目----记事本(3.8)
3.8 记事本项目总结 项目源码 1.main.cpp #include "widget.h" #include <QApplication> int main(int argc, char *argv[]) {QApplication a(argc, argv);Widget w;w.show();return a.exec(); } 2.widget.cpp #include "widget.h" #include &q…...
第25节 Node.js 断言测试
Node.js的assert模块主要用于编写程序的单元测试时使用,通过断言可以提早发现和排查出错误。 稳定性: 5 - 锁定 这个模块可用于应用的单元测试,通过 require(assert) 可以使用这个模块。 assert.fail(actual, expected, message, operator) 使用参数…...

华为OD机试-食堂供餐-二分法
import java.util.Arrays; import java.util.Scanner;public class DemoTest3 {public static void main(String[] args) {Scanner in new Scanner(System.in);// 注意 hasNext 和 hasNextLine 的区别while (in.hasNextLine()) { // 注意 while 处理多个 caseint a in.nextIn…...
Neo4j 集群管理:原理、技术与最佳实践深度解析
Neo4j 的集群技术是其企业级高可用性、可扩展性和容错能力的核心。通过深入分析官方文档,本文将系统阐述其集群管理的核心原理、关键技术、实用技巧和行业最佳实践。 Neo4j 的 Causal Clustering 架构提供了一个强大而灵活的基石,用于构建高可用、可扩展且一致的图数据库服务…...

pikachu靶场通关笔记22-1 SQL注入05-1-insert注入(报错法)
目录 一、SQL注入 二、insert注入 三、报错型注入 四、updatexml函数 五、源码审计 六、insert渗透实战 1、渗透准备 2、获取数据库名database 3、获取表名table 4、获取列名column 5、获取字段 本系列为通过《pikachu靶场通关笔记》的SQL注入关卡(共10关࿰…...