【RabbitMQ】消息分发、事务
消息分发
概念
RabbitMQ队列拥有多个消费者时,队列会把收到的消息分派给不同的消费者。每条消息只会发送给订阅该队列订阅列表里的一个消费者。这种方式非常适合扩展,如果现在负载加重,那么只需要创建更多的消费者来消费处理消息即可。
默认情况下,RabbitMQ是以轮询的方法进行分发的,而不管消费者是否已经消费并且已经确认了该消息。这种方式是不大合理的。试想一下,如果某些消费者消费速度慢,而某些消费者消费速度快,就可能导致某些消费者消息积压,某些消费者空闲,进而应用整体的吞吐量下降。
在工作模式一文中,书写RPC模式的代码时,已经写了一行代码channel.basicQos(1),来限制当前信道上的消费者所能保持的最大未确认消息的数量是1。所以,我们只需要使用此方法来限制每一个消费者的消息数量就可以避免上述情况发生。
比如,消费端调用了channel.basicQos(5),RabbitMQ就会为该消费者计数,发送一条消息计数加一,消费一条消息计数减一。当到达了设定的上限之后,RabbitMQ就不会再向该消费者发送消息了,知道消费者确认了某条消息之后,才会继续发送。
当channel.basicQos(int prefetchCount)中的形参个数为0时,表示的是没有上限。
应用场景
- 限流
- 非公平分发(负载均衡)
限流
在学习消息分发之前,当消息到达队列之后,如果有对应的消费者存在,那么队列就会一股脑把所有消息全部发送过去,从而造成瞬间压力,进而可能造成服务宕机,产生严重的影响。因此我们就要进行限流,限制消费者接收消息的数量。
限流通过设置prefetchCount参数,同时也必须要设置消息应答方式为手动应答。
spring:rabbitmq:host: 43.138.108.125port: 5672username: adminpassword: adminvirtual-host: mq-springboot-testlistener:simple:acknowledge-mode: manual # 消息确认机制为手动确认prefetch: 5 # 最多拉取5条消息
@Configuration
public class QosConfig {@Bean("qosQueue")public Queue qosQueue() {return QueueBuilder.durable(Constants.QOS_QUEUE).build();}@Bean("qosExchange")public Exchange qosExchange() {return ExchangeBuilder.directExchange(Constants.QOS_EXCHANGE).durable(true).build();}@Bean("qosQueueBind")public Binding qosQueueBind(@Qualifier("qosQueue") Queue queue, @Qualifier("qosExchange") Exchange exchange) {return BindingBuilder.bind(queue).to(exchange).with("qos").noargs();}}
@RestController
@RequestMapping("/qos")
public class QosController {@Resourceprivate RabbitTemplate rabbitTemplate;@RequestMappingpublic void qosQueue() {for (int i = 0; i < 10; i++) {this.rabbitTemplate.convertAndSend(Constants.QOS_EXCHANGE, "qos", "hello qos " + i);System.out.println("第" + i + "次发送消息成功!");}}}
@Configuration
public class QosListener {@RabbitListener(queues = Constants.QOS_QUEUE)public void qosListener(String msg, Channel channel, Message message) throws IOException {System.out.println("接收的消息为:" + msg);// channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);}}
启动程序之后,可以看到出现如上结果,明显看到,我们发送了10条信息,但是由于限流的原因,当消费者接收了5条消息之后,并且没有去应答,因此程序就不再继续接收消息,而是等待这5条消息应答之后,才会去继续接收消息。
负载均衡
在有两个消费者的情况下,一个消费者处理任务非常快,一个消费者处理任务非常慢,就会造成一个消费者会一直很忙,而另一个消费者会很闲。这是因为RabbitMQ只是在消息进入队列时进行分派消息,他不考虑消费者未确认消息的数量。我们可以使用prefetch=1的方式来进行设置,告诉RabbitMQ一次只给一个消费者一条消息。在消费者处理并确认该消息之前,都不向其发送新的消息。这样做就可以使得有消息时,所有消费者都处理忙碌的状态。
实现负载均衡功能的代码和实现限流的代码类似,只需要将配置文件中的prefetch修改为1即可。
事务
RabbitMQ也实现了事务机制,允许开发者确保消息的接收和发送是原子性的,要么全部成功,要把全部失败。
@Component
public class RabbitTemplateConfig {@Bean("transactionRabbitTemplate")public RabbitTemplate transactionRabbitTemplate(ConnectionFactory connectionFactory) {RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);rabbitTemplate.setChannelTransacted(true); // 开启事务return rabbitTemplate;}}
@Configuration
public class TransactionConfig {@Bean("transactionQueue")public Queue transactionQueue() {return QueueBuilder.durable(Constants.TRANSACTION_QUEUE).build();}@Bean("transactionExchange")public Exchange transactionExchange() {return ExchangeBuilder.directExchange(Constants.TRANSACTION_EXCHANGE).durable(true).build();}@Bean("transactionQueueBind")public Binding transactionQueueBind(@Qualifier("transactionQueue") Queue queue,@Qualifier("transactionExchange") Exchange exchange) {return BindingBuilder.bind(queue).to(exchange).with("transaction").noargs();}}
@RestController
@RequestMapping("/transaction")
public class TransactionController {@Resource(name = "transactionRabbitTemplate")private RabbitTemplate rabbitTemplate;@Transactional@RequestMappingpublic void transactionQueue() {System.out.println("发送成功");this.rabbitTemplate.convertAndSend(Constants.TRANSACTION_EXCHANGE, "transaction", "hello transaction");int i = 1 / 0;this.rabbitTemplate.convertAndSend(Constants.TRANSACTION_EXCHANGE, "transaction", "hello transaction");}}
RabbitMQ和Redis中的事务相对来说,都是比较简单的,并不和MySQL,包含那么多的性质。因此,在对事务的介绍中,并没有大幅度进行介绍。
相关文章:

【RabbitMQ】消息分发、事务
消息分发 概念 RabbitMQ队列拥有多个消费者时,队列会把收到的消息分派给不同的消费者。每条消息只会发送给订阅该队列订阅列表里的一个消费者。这种方式非常适合扩展,如果现在负载加重,那么只需要创建更多的消费者来消费处理消息即可。 默…...
mysql mha高可用集群搭建
文章目录 mha集群搭建主从从部署基本环境准备安装mysql主从配置 mha部署故障修复 搭建完成 mha集群搭建 在 MySQL 高可用架构中,MHA(Master High Availability)通常采用一主多从的架构。 MHA 可以提供主从复制架构的自动 master failover 功…...

如何解决“json schema validation error ”错误? -- HarmonyOS自学6
一. 问题描述 DevEco Studio工程关闭后,再重新打开时,出现了如下错误提示: json schema validation error 原因: index.visual或其他visual文件中的left等字段的值为负数时,不能以”-0.x“开头,否则就会…...
基于Jeecg-boot开发系统--后端篇
背景 Jeecg-boot是一个后台管理系统,其提供能很多基础的功能,我希望在不修改jeecg-boot代码的前提下增加自己的功能。经过几天的折腾终于搞定了。 首先是基于jeecg-boot微服务的方式来扩展的,jeecg-boot微服务本身的搭建过程就不讲了&#x…...
Spring Boot实战:使用@Import进行业务模块自动化装配
案例背景: 假设我们正在开发一个电子商务平台,该平台需要处理大量的订单数据。为了简化订单处理服务的配置,我们可以利用Import注解来自动注册一些常用的工具类和服务组件。 业务场景描述: 我们需要一个服务来处理订单的创建、…...

Golang | Leetcode Golang题解之第415题字符串相加
题目: 题解: func addStrings(num1 string, num2 string) string {add : 0ans : ""for i, j : len(num1) - 1, len(num2) - 1; i > 0 || j > 0 || add ! 0; i, j i - 1, j - 1 {var x, y intif i > 0 {x int(num1[i] - 0)}if j &g…...

5. 数字证书与公钥基础设施
5. 数字证书与公钥基础设施 (1) PKI 的定义、组成及应用 PKI(Public Key Infrastructure,公钥基础设施) 是一个使用公钥技术来提供安全服务的框架。它定义了如何管理和维护公钥,以及如何通过证书来验证公钥的真实性。PKI的核心组成部分包括: 证书颁发机构(CA, Certifica…...

Centos中关闭swap分区,关闭内存交换
概述: Swap 分区是 Linux 系统中扩展物理内存的一种机制。Swap的主要功能是当全部的RAM被占用并需要更多内存时,用磁盘空间代理RAM内存。Swap对虚拟化技术资源损耗非常大,一般虚拟化是不允许开启交换空间的,如果不关闭Swap&…...

leetcode练习 二叉树的最大深度
给定一个二叉树 root ,返回其最大深度。 二叉树的 最大深度 是指从根节点到最远叶子节点的最长路径上的节点数。 示例 1: 输入:root [3,9,20,null,null,15,7] 输出:3提示: 树中节点的数量在 [0, 104] 区间内。-100 …...
Scrapy爬虫框架 Items 数据项
在数据抓取和网络爬虫的开发中,Scrapy 框架以其强大的功能和灵活性,成为了开发者首选的工具之一。在一个典型的 Scrapy 项目中,数据项(Items)的定义、Spider 的应用,以及如何使用 ItemLoader 来加载和处理数据,都是开发过程中至关重要的环节。 本教程将分为几个主要部分…...

weblogic CVE-2018-2894 靶场攻略
漏洞描述 Weblogic Web Service Test Page中⼀处任意⽂件上传漏洞,Web Service Test Page 在 "⽣产模式"下默认不开启,所以该漏洞有⼀定限制。 漏洞版本 weblogic 10.3.6.0 weblogic 12.1.3.0 weblogic 12.2.1.2 28 weblogic 12.2.1.3 …...
百易云资产管理运营系统 ticket.edit.php SQL注入漏洞复现
0x01 产品简介 百易云资产管理运营系统,是专门针对企业不动产资产管理和运营需求而设计的一套综合解决方案。该系统能够覆盖资产的全生命周期管理,包括资产的登记、盘点、评估、处置等多个环节,同时提供强大的运营分析功能,帮助企业优化资产配置,提升运营效率。 0x02 漏…...
C++(2)进阶语法
C(2)之进阶语法 Author: Once Day Date: 2024年9月20日 一位热衷于Linux学习和开发的菜鸟,试图谱写一场冒险之旅,也许终点只是一场白日梦… 漫漫长路,有人对你微笑过嘛… 全系列文章可参考专栏: 源码分析_Once-Day的博客-CSDN博客 参考文…...

解决Hive乱码问题
在插入数据后,发现hive乱码 原因:Hive默认将存储表结构的元数据列编码设置为latin1,不支持中文 解决方法:在MySQL中修改对应Hive元数据列的编码 先查看mysql的所有字符集编码 1、先修改my.cnf 代码如下: vim /etc/…...

Streamlit:使用 Python 快速开发 Web 应用
一、简单介绍 Streamlit 是一个开源 Python 库,官网地址: https://streamlit.io/http://StreamlitStreamlit 是一个开源的 Python 框架,旨在为数据科学家和 后端工程师们提供只需几行代码即可创建动态数据应用的功能。 让没有任何前端基础…...

C#基础(11)函数重载
前言 前面我们已经完成了ref和out补充知识点的学习,以及函数参数相关的学习,今天便再次为函数补充一个知识点:函数重载。 函数重载是指在同一个作用域中,可以有多个同名函数,但参数列表不同。它的发展可以追溯到早期…...
堆栈指针寄存器SP的初值是多少?执行PUSH AX命令后,SP的值是多少?执行POP BX后,SP的值是多少?为什么答案给的是200,202,200。
欢迎来到我的技术博客! 🎉 这里不仅有满满的编程干货和学习资源,我的某站账号也为你准备了更多实用的技术视频和知识分享。 👉 点击关注我的小破站账号,获取更多编程技巧和学习资源! 小破站主页 例题 ST…...
python爬虫初体验(二)
在Python中,每个模块都有一个内置的变量 name,用于表示当前模块的名称。当一个Python文件被执行时,Python解释器会首先将该文件作为一个模块导入,并执行其中的代码。此时,__name__的值为模块的名称。 作用 模块可被导…...

细说渗透测试:阶段、流程、工具和自动化开源方案
不知有多少“曾梦想仗剑走天涯”的网络与信息安全从业者,是因为渗透测试的初心而步入这个行业的。不过,您是否对渗透测试及其漏洞扫描的相关概念感到既熟悉又陌生呢?您是否觉得自己还停留在从工作实践中积累的感性认识呢?下面&…...
redis 十大应用场景
Redis 是一个开源的内存数据结构存储系统,广泛应用于各种场景,尤其在高性能、低延迟需求的场景中。以下是 Redis 的一些典型应用场景: 1. 缓存系统 Redis 最常见的用途之一是作为缓存系统,以减少数据库访问的频率,提升…...
ES6从入门到精通:前言
ES6简介 ES6(ECMAScript 2015)是JavaScript语言的重大更新,引入了许多新特性,包括语法糖、新数据类型、模块化支持等,显著提升了开发效率和代码可维护性。 核心知识点概览 变量声明 let 和 const 取代 var…...
ubuntu搭建nfs服务centos挂载访问
在Ubuntu上设置NFS服务器 在Ubuntu上,你可以使用apt包管理器来安装NFS服务器。打开终端并运行: sudo apt update sudo apt install nfs-kernel-server创建共享目录 创建一个目录用于共享,例如/shared: sudo mkdir /shared sud…...

突破不可导策略的训练难题:零阶优化与强化学习的深度嵌合
强化学习(Reinforcement Learning, RL)是工业领域智能控制的重要方法。它的基本原理是将最优控制问题建模为马尔可夫决策过程,然后使用强化学习的Actor-Critic机制(中文译作“知行互动”机制),逐步迭代求解…...
OkHttp 中实现断点续传 demo
在 OkHttp 中实现断点续传主要通过以下步骤完成,核心是利用 HTTP 协议的 Range 请求头指定下载范围: 实现原理 Range 请求头:向服务器请求文件的特定字节范围(如 Range: bytes1024-) 本地文件记录:保存已…...
linux 下常用变更-8
1、删除普通用户 查询用户初始UID和GIDls -l /home/ ###家目录中查看UID cat /etc/group ###此文件查看GID删除用户1.编辑文件 /etc/passwd 找到对应的行,YW343:x:0:0::/home/YW343:/bin/bash 2.将标红的位置修改为用户对应初始UID和GID: YW3…...

企业如何增强终端安全?
在数字化转型加速的今天,企业的业务运行越来越依赖于终端设备。从员工的笔记本电脑、智能手机,到工厂里的物联网设备、智能传感器,这些终端构成了企业与外部世界连接的 “神经末梢”。然而,随着远程办公的常态化和设备接入的爆炸式…...

使用 SymPy 进行向量和矩阵的高级操作
在科学计算和工程领域,向量和矩阵操作是解决问题的核心技能之一。Python 的 SymPy 库提供了强大的符号计算功能,能够高效地处理向量和矩阵的各种操作。本文将深入探讨如何使用 SymPy 进行向量和矩阵的创建、合并以及维度拓展等操作,并通过具体…...
【无标题】路径问题的革命性重构:基于二维拓扑收缩色动力学模型的零点隧穿理论
路径问题的革命性重构:基于二维拓扑收缩色动力学模型的零点隧穿理论 一、传统路径模型的根本缺陷 在经典正方形路径问题中(图1): mermaid graph LR A((A)) --- B((B)) B --- C((C)) C --- D((D)) D --- A A -.- C[无直接路径] B -…...

(一)单例模式
一、前言 单例模式属于六大创建型模式,即在软件设计过程中,主要关注创建对象的结果,并不关心创建对象的过程及细节。创建型设计模式将类对象的实例化过程进行抽象化接口设计,从而隐藏了类对象的实例是如何被创建的,封装了软件系统使用的具体对象类型。 六大创建型模式包括…...
WebRTC从入门到实践 - 零基础教程
WebRTC从入门到实践 - 零基础教程 目录 WebRTC简介 基础概念 工作原理 开发环境搭建 基础实践 三个实战案例 常见问题解答 1. WebRTC简介 1.1 什么是WebRTC? WebRTC(Web Real-Time Communication)是一个支持网页浏览器进行实时语音…...