当前位置: 首页 > news >正文

【RabbitMQ】消息分发、事务

消息分发

概念

RabbitMQ队列拥有多个消费者时,队列会把收到的消息分派给不同的消费者。每条消息只会发送给订阅该队列订阅列表里的一个消费者。这种方式非常适合扩展,如果现在负载加重,那么只需要创建更多的消费者来消费处理消息即可。

默认情况下,RabbitMQ是以轮询的方法进行分发的,而不管消费者是否已经消费并且已经确认了该消息。这种方式是不大合理的。试想一下,如果某些消费者消费速度慢,而某些消费者消费速度快,就可能导致某些消费者消息积压,某些消费者空闲,进而应用整体的吞吐量下降。

在工作模式一文中,书写RPC模式的代码时,已经写了一行代码channel.basicQos(1),来限制当前信道上的消费者所能保持的最大未确认消息的数量是1。所以,我们只需要使用此方法来限制每一个消费者的消息数量就可以避免上述情况发生。

比如,消费端调用了channel.basicQos(5),RabbitMQ就会为该消费者计数,发送一条消息计数加一,消费一条消息计数减一。当到达了设定的上限之后,RabbitMQ就不会再向该消费者发送消息了,知道消费者确认了某条消息之后,才会继续发送。

当channel.basicQos(int prefetchCount)中的形参个数为0时,表示的是没有上限。

应用场景

  1. 限流
  2. 非公平分发(负载均衡)

限流

在学习消息分发之前,当消息到达队列之后,如果有对应的消费者存在,那么队列就会一股脑把所有消息全部发送过去,从而造成瞬间压力,进而可能造成服务宕机,产生严重的影响。因此我们就要进行限流,限制消费者接收消息的数量。

限流通过设置prefetchCount参数,同时也必须要设置消息应答方式为手动应答。

spring:rabbitmq:host: 43.138.108.125port: 5672username: adminpassword: adminvirtual-host: mq-springboot-testlistener:simple:acknowledge-mode: manual # 消息确认机制为手动确认prefetch: 5 # 最多拉取5条消息
@Configuration
public class QosConfig {@Bean("qosQueue")public Queue qosQueue() {return QueueBuilder.durable(Constants.QOS_QUEUE).build();}@Bean("qosExchange")public Exchange qosExchange() {return ExchangeBuilder.directExchange(Constants.QOS_EXCHANGE).durable(true).build();}@Bean("qosQueueBind")public Binding qosQueueBind(@Qualifier("qosQueue") Queue queue, @Qualifier("qosExchange") Exchange exchange) {return BindingBuilder.bind(queue).to(exchange).with("qos").noargs();}}
@RestController
@RequestMapping("/qos")
public class QosController {@Resourceprivate RabbitTemplate rabbitTemplate;@RequestMappingpublic void qosQueue() {for (int i = 0; i < 10; i++) {this.rabbitTemplate.convertAndSend(Constants.QOS_EXCHANGE, "qos", "hello qos " + i);System.out.println("第" + i + "次发送消息成功!");}}}
@Configuration
public class QosListener {@RabbitListener(queues = Constants.QOS_QUEUE)public void qosListener(String msg, Channel channel, Message message) throws IOException {System.out.println("接收的消息为:" + msg);// channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);}}

 

启动程序之后,可以看到出现如上结果,明显看到,我们发送了10条信息,但是由于限流的原因,当消费者接收了5条消息之后,并且没有去应答,因此程序就不再继续接收消息,而是等待这5条消息应答之后,才会去继续接收消息。

负载均衡

在有两个消费者的情况下,一个消费者处理任务非常快,一个消费者处理任务非常慢,就会造成一个消费者会一直很忙,而另一个消费者会很闲。这是因为RabbitMQ只是在消息进入队列时进行分派消息,他不考虑消费者未确认消息的数量。我们可以使用prefetch=1的方式来进行设置,告诉RabbitMQ一次只给一个消费者一条消息。在消费者处理并确认该消息之前,都不向其发送新的消息。这样做就可以使得有消息时,所有消费者都处理忙碌的状态。

实现负载均衡功能的代码和实现限流的代码类似,只需要将配置文件中的prefetch修改为1即可。

事务

RabbitMQ也实现了事务机制,允许开发者确保消息的接收和发送是原子性的,要么全部成功,要把全部失败。

@Component
public class RabbitTemplateConfig {@Bean("transactionRabbitTemplate")public RabbitTemplate transactionRabbitTemplate(ConnectionFactory connectionFactory) {RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);rabbitTemplate.setChannelTransacted(true); // 开启事务return rabbitTemplate;}}
@Configuration
public class TransactionConfig {@Bean("transactionQueue")public Queue transactionQueue() {return QueueBuilder.durable(Constants.TRANSACTION_QUEUE).build();}@Bean("transactionExchange")public Exchange transactionExchange() {return ExchangeBuilder.directExchange(Constants.TRANSACTION_EXCHANGE).durable(true).build();}@Bean("transactionQueueBind")public Binding transactionQueueBind(@Qualifier("transactionQueue") Queue queue,@Qualifier("transactionExchange") Exchange exchange) {return BindingBuilder.bind(queue).to(exchange).with("transaction").noargs();}}
@RestController
@RequestMapping("/transaction")
public class TransactionController {@Resource(name = "transactionRabbitTemplate")private RabbitTemplate rabbitTemplate;@Transactional@RequestMappingpublic void transactionQueue() {System.out.println("发送成功");this.rabbitTemplate.convertAndSend(Constants.TRANSACTION_EXCHANGE, "transaction", "hello transaction");int i = 1 / 0;this.rabbitTemplate.convertAndSend(Constants.TRANSACTION_EXCHANGE, "transaction", "hello transaction");}}

RabbitMQ和Redis中的事务相对来说,都是比较简单的,并不和MySQL,包含那么多的性质。因此,在对事务的介绍中,并没有大幅度进行介绍。 

相关文章:

【RabbitMQ】消息分发、事务

消息分发 概念 RabbitMQ队列拥有多个消费者时&#xff0c;队列会把收到的消息分派给不同的消费者。每条消息只会发送给订阅该队列订阅列表里的一个消费者。这种方式非常适合扩展&#xff0c;如果现在负载加重&#xff0c;那么只需要创建更多的消费者来消费处理消息即可。 默…...

mysql mha高可用集群搭建

文章目录 mha集群搭建主从从部署基本环境准备安装mysql主从配置 mha部署故障修复 搭建完成 mha集群搭建 在 MySQL 高可用架构中&#xff0c;MHA&#xff08;Master High Availability&#xff09;通常采用一主多从的架构。 MHA 可以提供主从复制架构的自动 master failover 功…...

如何解决“json schema validation error ”错误? -- HarmonyOS自学6

一. 问题描述 DevEco Studio工程关闭后&#xff0c;再重新打开时&#xff0c;出现了如下错误提示&#xff1a; json schema validation error 原因&#xff1a; index.visual或其他visual文件中的left等字段的值为负数时&#xff0c;不能以”-0.x“开头&#xff0c;否则就会…...

基于Jeecg-boot开发系统--后端篇

背景 Jeecg-boot是一个后台管理系统&#xff0c;其提供能很多基础的功能&#xff0c;我希望在不修改jeecg-boot代码的前提下增加自己的功能。经过几天的折腾终于搞定了。 首先是基于jeecg-boot微服务的方式来扩展的&#xff0c;jeecg-boot微服务本身的搭建过程就不讲了&#x…...

Spring Boot实战:使用@Import进行业务模块自动化装配

案例背景&#xff1a; 假设我们正在开发一个电子商务平台&#xff0c;该平台需要处理大量的订单数据。为了简化订单处理服务的配置&#xff0c;我们可以利用Import注解来自动注册一些常用的工具类和服务组件。 业务场景描述&#xff1a; 我们需要一个服务来处理订单的创建、…...

Golang | Leetcode Golang题解之第415题字符串相加

题目&#xff1a; 题解&#xff1a; func addStrings(num1 string, num2 string) string {add : 0ans : ""for i, j : len(num1) - 1, len(num2) - 1; i > 0 || j > 0 || add ! 0; i, j i - 1, j - 1 {var x, y intif i > 0 {x int(num1[i] - 0)}if j &g…...

5. 数字证书与公钥基础设施

5. 数字证书与公钥基础设施 (1) PKI 的定义、组成及应用 PKI(Public Key Infrastructure,公钥基础设施) 是一个使用公钥技术来提供安全服务的框架。它定义了如何管理和维护公钥,以及如何通过证书来验证公钥的真实性。PKI的核心组成部分包括: 证书颁发机构(CA, Certifica…...

Centos中关闭swap分区,关闭内存交换

概述&#xff1a; Swap 分区是 Linux 系统中扩展物理内存的一种机制。Swap的主要功能是当全部的RAM被占用并需要更多内存时&#xff0c;用磁盘空间代理RAM内存。Swap对虚拟化技术资源损耗非常大&#xff0c;一般虚拟化是不允许开启交换空间的&#xff0c;如果不关闭Swap&…...

leetcode练习 二叉树的最大深度

给定一个二叉树 root &#xff0c;返回其最大深度。 二叉树的 最大深度 是指从根节点到最远叶子节点的最长路径上的节点数。 示例 1&#xff1a; 输入&#xff1a;root [3,9,20,null,null,15,7] 输出&#xff1a;3提示&#xff1a; 树中节点的数量在 [0, 104] 区间内。-100 …...

Scrapy爬虫框架 Items 数据项

在数据抓取和网络爬虫的开发中,Scrapy 框架以其强大的功能和灵活性,成为了开发者首选的工具之一。在一个典型的 Scrapy 项目中,数据项(Items)的定义、Spider 的应用,以及如何使用 ItemLoader 来加载和处理数据,都是开发过程中至关重要的环节。 本教程将分为几个主要部分…...

weblogic CVE-2018-2894 靶场攻略

漏洞描述 Weblogic Web Service Test Page中⼀处任意⽂件上传漏洞&#xff0c;Web Service Test Page 在 "⽣产模式"下默认不开启&#xff0c;所以该漏洞有⼀定限制。 漏洞版本 weblogic 10.3.6.0 weblogic 12.1.3.0 weblogic 12.2.1.2 28 weblogic 12.2.1.3 …...

百易云资产管理运营系统 ticket.edit.php SQL注入漏洞复现

0x01 产品简介 百易云资产管理运营系统,是专门针对企业不动产资产管理和运营需求而设计的一套综合解决方案。该系统能够覆盖资产的全生命周期管理,包括资产的登记、盘点、评估、处置等多个环节,同时提供强大的运营分析功能,帮助企业优化资产配置,提升运营效率。 0x02 漏…...

C++(2)进阶语法

C(2)之进阶语法 Author: Once Day Date: 2024年9月20日 一位热衷于Linux学习和开发的菜鸟&#xff0c;试图谱写一场冒险之旅&#xff0c;也许终点只是一场白日梦… 漫漫长路&#xff0c;有人对你微笑过嘛… 全系列文章可参考专栏: 源码分析_Once-Day的博客-CSDN博客 参考文…...

解决Hive乱码问题

在插入数据后&#xff0c;发现hive乱码 原因&#xff1a;Hive默认将存储表结构的元数据列编码设置为latin1&#xff0c;不支持中文 解决方法&#xff1a;在MySQL中修改对应Hive元数据列的编码 先查看mysql的所有字符集编码 1、先修改my.cnf 代码如下&#xff1a; vim /etc/…...

Streamlit:使用 Python 快速开发 Web 应用

一、简单介绍 Streamlit 是一个开源 Python 库&#xff0c;官网地址&#xff1a; https://streamlit.io/http://StreamlitStreamlit 是一个开源的 Python 框架&#xff0c;旨在为数据科学家和 后端工程师们提供只需几行代码即可创建动态数据应用的功能。 让没有任何前端基础…...

C#基础(11)函数重载

前言 前面我们已经完成了ref和out补充知识点的学习&#xff0c;以及函数参数相关的学习&#xff0c;今天便再次为函数补充一个知识点&#xff1a;函数重载。 函数重载是指在同一个作用域中&#xff0c;可以有多个同名函数&#xff0c;但参数列表不同。它的发展可以追溯到早期…...

堆栈指针寄存器SP的初值是多少?执行PUSH AX命令后,SP的值是多少?执行POP BX后,SP的值是多少?为什么答案给的是200,202,200。

欢迎来到我的技术博客&#xff01; &#x1f389; 这里不仅有满满的编程干货和学习资源&#xff0c;我的某站账号也为你准备了更多实用的技术视频和知识分享。 &#x1f449; 点击关注我的小破站账号&#xff0c;获取更多编程技巧和学习资源&#xff01; 小破站主页 例题 ST…...

python爬虫初体验(二)

在Python中&#xff0c;每个模块都有一个内置的变量 name&#xff0c;用于表示当前模块的名称。当一个Python文件被执行时&#xff0c;Python解释器会首先将该文件作为一个模块导入&#xff0c;并执行其中的代码。此时&#xff0c;__name__的值为模块的名称。 作用 模块可被导…...

细说渗透测试:阶段、流程、工具和自动化开源方案

不知有多少“曾梦想仗剑走天涯”的网络与信息安全从业者&#xff0c;是因为渗透测试的初心而步入这个行业的。不过&#xff0c;您是否对渗透测试及其漏洞扫描的相关概念感到既熟悉又陌生呢&#xff1f;您是否觉得自己还停留在从工作实践中积累的感性认识呢&#xff1f;下面&…...

redis 十大应用场景

Redis 是一个开源的内存数据结构存储系统&#xff0c;广泛应用于各种场景&#xff0c;尤其在高性能、低延迟需求的场景中。以下是 Redis 的一些典型应用场景&#xff1a; 1. 缓存系统 Redis 最常见的用途之一是作为缓存系统&#xff0c;以减少数据库访问的频率&#xff0c;提升…...

操作系统期末版

文章目录 概论处理机管理进程线程处理机调度生产者消费者问题 死锁简介死锁的四个必要条件解决死锁的方法 存储管理链接的三种方式静态链接装入时动态链接运行时链接 装入内存的三种方式绝对装入可重定位装入动态运行时装入 覆盖交换存储管理方式连续分配**分段存储管理方式***…...

【鸿蒙在 ETS (Extendable TypeScript) 中创建多级目录或文件,可以使用鸿蒙的文件系统 API】

鸿蒙在 ETS (Extendable TypeScript) 中创建多级目录或文件&#xff0c;可以使用鸿蒙的文件系统 API。 // 导入需要的模块 import fs from ohos.file.fs;const TAG"Index" Entry Component struct Index {State message: string Hello World;build() {Row() {Colum…...

2480: 2020年06月2级T1:计算矩阵边缘元素之和

题目描述 2020年06月2级第一题题目&#xff1a;计算矩阵边缘元素之和 输入一个整数矩阵&#xff0c;计算位于矩阵边缘的元素之和。所谓矩阵边缘的元素&#xff0c;就是第一行和最后一行的元素以及第一列和最后一列的元素。 输入 第一行分别为矩阵的行数m和列数n&#xff0…...

【CSS-6】深入理解CSS复合选择器:提升样式表的精确性与效率

CSS选择器是前端开发的基石&#xff0c;而复合选择器则是其中最强大且实用的工具之一。本文将全面解析CSS复合选择器的类型、用法、优先级规则以及最佳实践&#xff0c;帮助你编写更高效、更精确的样式表。 1. 什么是复合选择器&#xff1f; 复合选择器是通过组合多个简单选择…...

深入解析Java21核心新特性(虚拟线程,分代 ZGC,记录模式模式匹配增强)

文章目录 前言一、虚拟线程 (Virtual Threads - JEP 444) - 并发的革命1.1 解决的核心问题&#x1f3af;1.2 工作原理与核心机制⚙️1.3 使用详解与最佳实践&#x1f6e0;️1.4 注意事项⚠️1.5 总结 &#x1f4da; 二、分代 ZGC (Generational ZGC - JEP 439) - 低延迟新高度2…...

STM32入门学习之系统时钟配置

1. 时钟就是单片机的心脏。单片机根据时钟频率来控制每个部件的工作&#xff0c;时钟是单片机的脉搏&#xff0c;决定了每条命令运行的速率&#xff0c;没有时钟单片机将停止工作。 如何理解“时钟决定了单片机每条命令运行的速率”&#xff1f; 首先需要去理解单片机中的时…...

附加模块--Qt OpenGL模块功能及架构

一、模块功能&#xff1a; 主要变化 Qt OpenGL 模块的分离&#xff1a; 在 Qt 6 中&#xff0c;原来的 Qt OpenGL 功能被拆分为多个模块 传统的 Qt OpenGL 模块 (QGL*) 已被标记为废弃 新的图形架构&#xff1a; Qt 6 引入了基于 QRhi (Qt Rendering Hardware Interface) 的…...

HarmonyOS:如何在启动框架中初始化HMRouter

应用启动时通常需要执行一系列初始化启动任务&#xff0c;如果将启动任务都放在应用主模块&#xff08;即entry类型的Module&#xff09;的UIAbility组件的onCreate生命周期中&#xff0c;那么只能在主线程中依次执行&#xff0c;不但影响应用的启动速度&#xff0c;而且当启动…...

计算机基础知识(第五篇)

计算机基础知识&#xff08;第五篇&#xff09; 架构演化与维护 软件架构的演化和定义 软件架构的演化和维护就是对架构进行修改和完善的过程&#xff0c;目的就是为了使软件能够适应环境的变化而进行的纠错性修改和完善性修改等&#xff0c;是一个不断迭代的过程&#xff0…...

【飞腾AI加固服务器】全国产化飞腾+昇腾310+PCIe Switch的AI大模型服务器解决方案

以下是全国产化飞腾AI加固服务器采用飞腾昇腾PCIe Switch解决方案&#xff1a; &#x1f5a5;️ 一、硬件架构亮点 ‌国产算力双擎‌ ‌飞腾处理器‌&#xff1a;搭载飞腾FT2000/64核服务器级CPU&#xff08;主频1.8-2.2GHz&#xff09;&#xff0c;支持高并发任务与复杂计算&a…...