消息队列-RabbitMQ:workQueues—工作队列、消息应答机制、RabbitMQ 持久化、不公平分发(能者多劳)
4、Work Queues
Work Queues— 工作队列 (又称任务队列) 的主要思想是避免立即执行资源密集型任务,而不得不等待它完成。我们把任务封装为消息并将其发送到队列,在后台运行的工作进程将弹出任务并最终执行作业。当有多个工作线程时,这些工作线程将一起处理这些任务。
轮训分发消息
在这个案例中我们会启动两个工作线程,一个消息发送线程,我们来看看他们两个工作线程是如何工作的。

1、抽取工具类

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;public class RabbitMqUtils {//得到一个连接的 channelpublic static Channel getChannel() throws Exception {//创建一个连接工厂ConnectionFactory factory = new ConnectionFactory();factory.setHost("42.192.149.23");factory.setUsername("admin");factory.setPassword("123456");Connection connection = factory.newConnection();Channel channel = connection.createChannel();return channel;}
}
2、启动两个工作线程来接受消息
import com.oddfar.utils.RabbitMqUtils;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;public class Worker01 {//定义队列private static final String QUEUE_NAME = "hello";public static void main(String[] args) throws Exception {//利用工具类创建信道Channel channel = RabbitMqUtils.getChannel();//消息接受,实现接口函数DeliverCallback deliverCallback = (consumerTag, delivery) -> {String receivedMessage = new String(delivery.getBody());System.out.println("接收到消息:" + receivedMessage);};//消息被取消CancelCallback cancelCallback = (consumerTag) -> {System.out.println(consumerTag + "消费者取消消费接口回调逻辑");};//消费者消费System.out.println("C1 消费者启动等待消费.................. ");channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);}
}
(1)开启多线程

选中 Allow multiple instances:

(2)启动后

3、启动一个发送消息线程

public class Task01 {public static final String QUEUE_NAME = "hello";public static void main(String[] args) throws Exception {Channel channel = RabbitMqUtils.getChannel();Scanner scanner = new Scanner(System.in);while (scanner.hasNext()) { //hasNext,有下一个,就发送String message = scanner.next();channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); //生产者发送消息,理解各参数意思System.out.println("消息发送完成:" + message);}}
}
启动后

C1、C2,轮询接受到消息

5、消息应答
消费者完成一个任务可能需要一段时间,如果其中一个消费者处理一个长的任务并仅只完成了部分突然它挂掉了,会发生什么情况。
**RabbitMQ 一旦向消费者传递了一条消息,便立即将该消息标记为删除。**在这种情况下,突然有个消费者挂掉了,我们将丢失正在处理的消息,以及后续发送给该消费者的消息,因为它无法接收到。
为了保证消息在发送过程中不丢失,引入消息应答机制,消息应答就是:消费者在接收到消息并且处理该消息之后,告诉 rabbitmq 它已经处理了,rabbitmq 可以把该消息删除了。
1、自动应答
消息发送后立即被认为已经传送成功,这种模式需要在高吞吐量和数据传输安全性方面做权衡,因为这种模式如果消息在接收到之前,消费者那边出现连接或者 channel 关闭,那么消息就丢失了。当然另一方面这种模式消费者那边可以传递过载的消息,没有对传递的消息数量进行限制,当然这样有可能使得消费者这边由于接收太多还来不及处理的消息,导致这些消息的积压,使得内存耗尽,最终这些消费者线程被操作系统杀死,所以这种模式仅适用在消费者可以高效并以某种速率能够处理这些消息的情况下使用。
2、手动消息应答的方法
Channel.basicAck (用于肯定确认):RabbitMQ 已知道该消息成功被处理,可以将其丢弃了。
Channel.basicNack (用于否定确认)
Channel.basicReject (用于否定确认):与 Channel.basicNack 相比少一个参数,不处理该消息了直接拒绝,可以将其丢弃了。
Multiple 的解释:
手动应答的好处是可以批量应答并且减少网络拥堵 。
true 代表批量应答 channel 上未应答的消息
false 同上面相比只会应答 tag=8 的消息
3、消息自动重新入队
如果消费者由于某些原因失去连接 (其通道已关闭,连接已关闭或 TCP 连接丢失),导致消息未发送 ACK 确认,RabbitMQ 将了解到消息未完全处理,并将对其重新排队。如果此时其他消费者可以处理,它将很快将其重新分发给另一个消费者。这样,即使某个消费者偶尔死亡,也可以确保不会丢失任何消息。
4、消息手动应答代码
默认消息采用的是自动应答,所以我们要想实现消息消费过程中不丢失,需要把自动应答改为手动应答。
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);

(1)消息生产者

import com.rabbitmq.client.Channel;import java.util.Scanner;/*** 消息生产者,消息在手动应答时是不丢失的,放回队列重新消费。** */
public class Task02 {private static final String TASK_QUEUE_NAME = "ack_queue";public static void main(String[] args) throws Exception {Channel channel = RabbitMqUtils.getChannel();//声明队列channel.queueDeclare(TASK_QUEUE_NAME, false, false, false, null);Scanner sc = new Scanner(System.in);System.out.println("请输入信息");while (sc.hasNext()) {String message = sc.nextLine();//发布消息channel.basicPublish("", TASK_QUEUE_NAME, null, message.getBytes("UTF-8"));System.out.println("生产者发出消息" + message);}}}
(2)睡眠工具类

(3)消费者 01


(4)消费者02:把睡眠时间改成 30 秒

import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;public class Work03 {private static final String TASK_QUEUE_NAME = "ack_queue";public static void main(String[] args) throws Exception {Channel channel = RabbitMqUtils.getChannel();System.out.println("C1 等待接收消息处理时间较 短");DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody());try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("接收到消息:" + message);/*** 1.消息标记 tag* 2.是否批量应答未应答消息*/channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);};CancelCallback cancelCallback = (s) -> {System.out.println(s + "消费者取消消费接口回调逻辑");};//采用手动应答boolean autoAck = false;channel.basicConsume(TASK_QUEUE_NAME, autoAck, deliverCallback, cancelCallback);}
}
启动后:

中途停掉消费者02

消息由消费者01接收


理解:在生产者轮询发送消息:cc和dd,给消费者01和消费者02的时候,消费者01处理的消息的时间较短,所以立马就接受到了,但是消费者02处理消息的时间较长,需要30s。如果在消费者02处理消息的时候,将消费者02关闭,那么生产者发送的消息dd,虽然消费者02 无法接收了,但是也不会中途丢失,消息dd会重新返回到队列中,然后再从队列中,发送给消费者01。这就是RabbitMQ的消息应答机制。
6、RabbitMQ 持久化(3步走)
当 RabbitMQ 服务停掉以后,消息生产者发送过来的消息不丢失要如何保障?默认情况下 RabbitMQ 退出或由于某种原因崩溃时,它忽视队列和消息,除非告知它不要这样做。确保消息不会丢失需要做两件事:我们需要将队列和消息都标记为持久化。
1、队列如何实现持久化(第1步)
之前创建的队列都是非持久化的,rabbitmq 如果重启的话,该队列就会被删除掉,如果要队列实现持久化需要在声明队列的时候把 durable 参数设置为持久化。
//让队列持久化
boolean durable = true;
//声明队列
channel.queueDeclare(TASK_QUEUE_NAME, durable, false, false, null);
(1)直接修改声明会报错

(2)在RabbitMQ的Web界面,手动删除队列


(3)将队列持久化

2、消息持久化(第2步)
消息实现持久化需要在消息生产者修改代码
MessageProperties.PERSISTENT_TEXT_PLAIN 添加这个属性。

将消息标记为持久化并不能完全保证不会丢失消息。尽管它告诉 RabbitMQ 将消息保存到磁盘,但是这里依然存在当消息刚准备存储在磁盘的时候但是还没有存储完,消息还在缓存的一个间隔点。此时并没有真正写入磁盘。持久性保证并不强,但是对于我们的简单任务队列而言,这已经绰绰有余了。
7、不公平分发(能者多劳)
问题
在最开始的时候我们学习到 RabbitMQ 分发消息采用的轮训分发,但是在某种场景下这种策略并不是很好,比方说有两个消费者在处理任务,其中有个消费者 1 处理任务的速度非常快,而另外一个消费者 2 处理速度却很慢,这个时候我们还是采用轮训分发的化就会到这处理速度快的这个消费者很大一部分时间处于空闲状态,而处理慢的那个消费者一直在干活,这种分配方式在这种情况下其实就不太好,但是 RabbitMQ 并不知道这种情况它依然很公平的进行分发。
为了避免这种情况,在消费者中消费之前,我们可以设置参数 channel.basicQos(1)
需要看一下basicQos的源码,参数设置的要求
//不公平分发
int prefetchCount = 1;
channel.basicQos(prefetchCount);
//采用手动应答
boolean autoAck = false;
channel.basicConsume(TASK_QUEUE_NAME, autoAck, deliverCallback, cancelCallback);
测试效果:
生产者连续发多几条消息

让处理消息速度快的消费者01,多接收一点消息

消费者02接受的少
时间长,所以少
理解:意思就是如果这个任务我还没有处理完或者我还没有应答你,你先别分配给我,我目前只能处理一个任务,然后 rabbitmq 就会把该任务分配给没有那么忙的那个空闲消费者,当然如果所有的消费者都没有完 成手上任务,队列还在不停的添加新任务,队列有可能就会遇到队列被撑满的情况,这个时候就只能添加新的 worker 或者改变其他存储任务的策略。
消息队列-RabbitMQ:workQueues—工作队列、消息应答机制、RabbitMQ 持久化、不公平分发(能者多劳) 到此完结,笔者归纳、创作不易,大佬们给个3连再起飞吧
相关文章:
消息队列-RabbitMQ:workQueues—工作队列、消息应答机制、RabbitMQ 持久化、不公平分发(能者多劳)
4、Work Queues Work Queues— 工作队列 (又称任务队列) 的主要思想是避免立即执行资源密集型任务,而不得不等待它完成。我们把任务封装为消息并将其发送到队列,在后台运行的工作进程将弹出任务并最终执行作业。当有多个工作线程时,这些工作…...
前端秘法基础式(HTML)(第二卷)
目录 一.表单标签 1.表单域 2.表单控件 2.1input标签 2.2label/select/textarea标签 2.3无语义标签 三.特殊字符 一.表单标签 用来完成与用户的交互,例如登录系统 1.表单域 <form>通过action属性,将用户填写的数据转交给服务器 2.表单控件 2.1input标签 type…...
PTA-统计英文字母和数字字符[2]
本题要求编写程序,输入N个字符,统计其中英文字母、数字字符和其他字符的个数。 输入格式: 输入在第一行中给出正整数N,第二行输入N个字符,最后一个回车表示输入结束,不算在内。 输出格式: 在一行内按照 letter 英…...
Elasticsearch:将 IT 智能和业务 KPI 与 AI 连接起来 - 房间里的大象
作者:Fermi Fang 大象寓言的智慧 在信息技术和商业领导力的交叉点,蒙眼人和大象的古老寓言提供了一个富有洞察力的类比。 这个故事起源于印度次大陆,讲述了六个蒙住眼睛的人第一次遇到大象的故事。 每个人触摸大象的不同部位 —— 侧面、象牙…...
基于芯驰 X9HP PTG4.1 在 yocto 中添加 Linux 应用
1.参考例程并添加应用 1.1 参考例程 (1)查看自带的串口测试例程 uart_test ,查看 bb 文件怎么写的。 1.2 添加 printf-test 应用 (1)在 yocto/meta-semidrive/recipes-bsp/ 目录中 copy 自带例程 uart-test 改名为 …...
【微服务安全】OpenID Connect 简介:现代应用程序的身份验证
OpenID Connect (OIDC) 是一个建立在 OAuth 2.0 之上的开放身份验证协议。它简化了应用程序以一种标准化和可互操作的方式验证用户身份并获取其基本个人资料信息的方式。可以将其视为应用程序“知道你是谁”的一种安全方式,而无需你创建单独的帐户或透露你的密码。 …...
Linux系统中HTTP隧道的搭建与配置步骤:穿越网络的“魔法隧道”
在Linux系统中搭建HTTP隧道,就像是开启了一条穿越网络的“魔法隧道”。这条隧道能让你的数据在网络中自由穿梭,无论是远程办公还是数据同步,都能变得轻松自在。下面,就让我们一起探索如何搭建这条神奇的“魔法隧道”吧!…...
fastApi笔记04-查询参数和字符串校验
额外校验 使用Query可以对查询参数添加校验 from typing import Unionfrom fastapi import FastAPI, Queryapp FastAPI()app.get("/items/") async def read_items(q: Union[str, None] Query(defaultNone, max_length50)):results {"items": [{"…...
笔记 记录
前言 个人记录 官网模版 基于 vue2 示例图...
相机图像质量研究(38)常见问题总结:编解码对成像的影响--呼吸效应
系列文章目录 相机图像质量研究(1)Camera成像流程介绍 相机图像质量研究(2)ISP专用平台调优介绍 相机图像质量研究(3)图像质量测试介绍 相机图像质量研究(4)常见问题总结:光学结构对成像的影响--焦距 相机图像质量研究(5)常见问题总结:光学结构对成…...
MQTT协议-ISO标准下基于发布/订阅范式的消息协议
MQTT(消息队列遥测传输)是ISO 标准(ISO/IEC PRF 20922)下基于发布/订阅范式的消息协议。它工作在 TCP/IP协议族上,是为硬件性能低下的远程设备以及网络状况糟糕的情况下而设计的发布/订阅型消息协议,为此,它需要一个消息中间件 。 MQTT是一个…...
手动实现new操作符
<script>//前置知识// 每一个函数在创建之初就会有一个prototype属性,这个属性指向函数的原型对象// function abc(){// }// abc.prototype--> {constructor: f}// 在JS中任意的对象都有内置的属性叫做[[prototype]]这是一个私有属性,这个私有属…...
【flutter】环境安装
安装flutter sdk 下载sdk flutter sdk就包含dart,所以我们只用安装flutter sdk就可以了。 我们去清华大学开源软件镜像站下载,flutter开发中,版本对不上基本项目就跑步起来,如果是团队协同开发的话,建议统一下载指定版…...
ROUGE-L和SPICE
ROUGE-L ROUGE-L(Recall-Oriented Understudy for Gisting Evaluation - Longest Common Subsequence)是一种用于评估自动文本摘要或机器翻译等自然语言处理任务的评价指标。它基于最长公共子序列(LCS)来计算,主要关注…...
vue3组件通信方式汇总
前言:本文默认读者有JS基础和Vue基础,如果没有这个两个基础,可能阅读比较困难,建议先看下官方文档,当然,也欢迎评论交流😁 通信方式总结 常见搭配形式 一、props(使用频率最高&#…...
备份服务器数据的重要
备份服务器数据的重要 无论您是在运营一个网站、一个业务应用程序还是整个平台,无法定期备份服务器数据都可能将会再次困扰您。这不是一个是否的问题。这是个何时的问题。你们需要将灾难性故障的潜在损害降至最低。 灾难性故障期间最大限度地减少潜在损害的最佳方法…...
基于shp数据制作3DTiles建筑白膜
经纬管网建模系统MagicPipe3D,本地离线参数化构建地下管网、建筑三维模型,输出标准3DTiles服务、Obj模型等格式,支持Cesium、Unreal、Unity、Osg等引擎加载进行三维可视化、语义查询、专题分析。欢迎下载试用:http://www.magic3d.…...
SpringBootWeb学习笔记——12万字详细总结!
0. 写在前面 注:这套笔记是根据黑马程序员B站2023-3-21的视频学习的成果,其中省略了前端基础部分、Maven部分和数据库基础部分,详情可见目录。 注注:目前文章内结尾处多幅图片加载不出来,因为图片还存在本地没被传上来,过段时间再改~ 所有的Spring项目都基于Spring Fra…...
Code Composer Studio (CCS) - 文件比较
Code Composer Studio [CCS] - 文件比较 References 鼠标单击选中一个文件,再同时按住 Ctrl 鼠标左键来选中第二个文件,在其中一个文件上鼠标右击选择 Compare With -> Each Other. References [1] Yongqiang Cheng, https://yongqiang.blog.csdn.n…...
【GIT学习】仓库过大的清理办法
1、.git目录过大 要解决.git目录过大的问题,可以尝试以下方法: 使用git gc命令清理不再需要的缓存。这将帮助减小仓库的大小。 在命令行中输入以下命令: git gc --prunenow --aggressive使用git repack -ad命令来重新打包已经提交的文件。 …...
测试微信模版消息推送
进入“开发接口管理”--“公众平台测试账号”,无需申请公众账号、可在测试账号中体验并测试微信公众平台所有高级接口。 获取access_token: 自定义模版消息: 关注测试号:扫二维码关注测试号。 发送模版消息: import requests da…...
Vue3 + Element Plus + TypeScript中el-transfer穿梭框组件使用详解及示例
使用详解 Element Plus 的 el-transfer 组件是一个强大的穿梭框组件,常用于在两个集合之间进行数据转移,如权限分配、数据选择等场景。下面我将详细介绍其用法并提供一个完整示例。 核心特性与用法 基本属性 v-model:绑定右侧列表的值&…...
SCAU期末笔记 - 数据分析与数据挖掘题库解析
这门怎么题库答案不全啊日 来简单学一下子来 一、选择题(可多选) 将原始数据进行集成、变换、维度规约、数值规约是在以下哪个步骤的任务?(C) A. 频繁模式挖掘 B.分类和预测 C.数据预处理 D.数据流挖掘 A. 频繁模式挖掘:专注于发现数据中…...
【机器视觉】单目测距——运动结构恢复
ps:图是随便找的,为了凑个封面 前言 在前面对光流法进行进一步改进,希望将2D光流推广至3D场景流时,发现2D转3D过程中存在尺度歧义问题,需要补全摄像头拍摄图像中缺失的深度信息,否则解空间不收敛…...
Cloudflare 从 Nginx 到 Pingora:性能、效率与安全的全面升级
在互联网的快速发展中,高性能、高效率和高安全性的网络服务成为了各大互联网基础设施提供商的核心追求。Cloudflare 作为全球领先的互联网安全和基础设施公司,近期做出了一个重大技术决策:弃用长期使用的 Nginx,转而采用其内部开发…...
【AI学习】三、AI算法中的向量
在人工智能(AI)算法中,向量(Vector)是一种将现实世界中的数据(如图像、文本、音频等)转化为计算机可处理的数值型特征表示的工具。它是连接人类认知(如语义、视觉特征)与…...
AI书签管理工具开发全记录(十九):嵌入资源处理
1.前言 📝 在上一篇文章中,我们完成了书签的导入导出功能。本篇文章我们研究如何处理嵌入资源,方便后续将资源打包到一个可执行文件中。 2.embed介绍 🎯 Go 1.16 引入了革命性的 embed 包,彻底改变了静态资源管理的…...
Linux离线(zip方式)安装docker
目录 基础信息操作系统信息docker信息 安装实例安装步骤示例 遇到的问题问题1:修改默认工作路径启动失败问题2 找不到对应组 基础信息 操作系统信息 OS版本:CentOS 7 64位 内核版本:3.10.0 相关命令: uname -rcat /etc/os-rele…...
【电力电子】基于STM32F103C8T6单片机双极性SPWM逆变(硬件篇)
本项目是基于 STM32F103C8T6 微控制器的 SPWM(正弦脉宽调制)电源模块,能够生成可调频率和幅值的正弦波交流电源输出。该项目适用于逆变器、UPS电源、变频器等应用场景。 供电电源 输入电压采集 上图为本设计的电源电路,图中 D1 为二极管, 其目的是防止正负极电源反接, …...
算术操作符与类型转换:从基础到精通
目录 前言:从基础到实践——探索运算符与类型转换的奥秘 算术操作符超级详解 算术操作符:、-、*、/、% 赋值操作符:和复合赋值 单⽬操作符:、--、、- 前言:从基础到实践——探索运算符与类型转换的奥秘 在先前的文…...
