rabbitmq载在.net中批量消费的问题记录
背景
最近遇到了一个问题,在使用rabbitmq的时候出现了丢消息、消息重复消费等一系列的问题,使用的是.net框架,背景是高并发压力下的mq消费,按理说即使队列中堆了几百条消息,我客户端可以同处理5个消息。
原因是多线程同时处理时导致的内存混乱。
官方文档已经解释的很全面了:https://www.rabbitmq.com/dotnet-api-guide.html
一个简易的单线程消费者
注意如下代码,这只是一个简易的单线程同步的消费者;
每次消费1条消息,消息消费完进行手动ack;
Task.Run(() =>{AutoResetEvent autoResetEvent = new AutoResetEvent(false);ConnectionFactory factory = new ConnectionFactory();// "guest"/"guest" by default, limited to localhost connectionsfactory.UserName = user;factory.Password = pass;factory.VirtualHost = vhost;factory.HostName = hostName;// this name will be shared by all connections instantiated by// this factoryfactory.ClientProvidedName = "app:audit component:event-consumer";IConnection conn = factory.CreateConnection();using (IModel channel = conn .CreateModel()){channel.ExchangeDeclare(exchangeName, ExchangeType.Direct);channel.QueueDeclare(queueName, false, false, false, null);channel.QueueBind(queueName, exchangeName, routingKey, null);consumer.Received += (ch, ea) =>{var body = ea.Body.ToArray();// copy or deserialise the payload// and process the message// ...channel.BasicAck(ea.DeliveryTag, false);};channel.BasicConsume(queue: "my-queue",autoAck: false,consumer: consumer);}ConsoleUtil.WriteLine("mq started");autoResetEvent.WaitOne();ConsoleUtil.WriteLine("mq shutdown");}
});
批量消费
好的,那么我现在想要同时消费5条消息,想要达到并行的效果,需要如何改代码呢?看下面的改动:
改动1:
先看两个概念
-
prefetchCount(预取计数):
- prefetchCount 是一个用来限制每个消费者一次性从队列中获取的消息数量的参数。
- 当你有多个消费者同时连接到同一个队列时,RabbitMQ 可以将消息均匀地分发给这些消费者。
- 通过设置 prefetchCount,你可以告诉 RabbitMQ 每个消费者一次最多获取多少条消息。
- 这个参数的目的是确保消息在被消费者处理之前不会全部放到内存中,从而提高系统的稳定性和性能。它有助于避免 一个消费者获取了太多消息而导致其他消费者无法获取任何消息的情况。
-
concurrentConsumers(并发消费者):
- concurrentConsumers 是指在同一队列上允许多少个并发消费者。
- 每个并发消费者都会独立地处理消息,这有助于提高系统的处理能力和吞吐量。
- 通过增加 concurrentConsumers 数量,你可以增加并发处理消息的能力。
- 注意,这个参数不同于 prefetchCount,它控制的是同时运行的消费者的数量,而不是单个消费者一次性获取的消息数量。
// 参数1:prefetchSize:可接收消息的大小,如果设置为0,那么表示对消息本身的大小不限制
// 参数2:prefetchCount:处理消息最大的数量。相当于消费者能一次接受的队列大小
// 参数3:global:是不是针对整个 Connection 的,因为一个 Connection 可以有多个 Channel
// global=false:针对的是这个 Channel
// global=ture: 针对的是这个 Connection
channel.BasicQos(0, 5, false);
factory.ConsumerDispatchConcurrency = 5;
好的,这时候我配置了同时处理5条消息,看起来没问题了,但是官网文档有这样一句话:
IModel instance usage by more than one thread simultaneously should be avoided. Application code should maintain a clear notion of thread ownership for IModel instances.
This is a hard requirement for publishers: sharing a channel (an IModel instance) for concurrent publishing will lead to incorrect frame interleaving at the protocol level. Channel instances must not be shared by threads that publish on them.
If more than one thread needs to access a particular IModel instances, the application should enforce mutual exclusion. One way of achieving this is for all users of an IModel to lock the instance itself:
大概意思就是应该避免多个线程同时使用IModel
实例,也就是channel
对象,如果这么做的后果就是高负载情况下导致内存混乱,有可能你的线程1消费到了线程5本该消费的消息,这听起来后果是很严重的,那么我们应该怎么改动呢?官网也给方案了,就是给channel
对象加锁,看下面的代码改动:
改动2
consumer.Received += (ch, ea) =>{ var body = ea.Body.ToArray();// copy or deserialise the payload// and process the message// ...lock (channel){channel.BasicAck(ea.DeliveryTag, false);}
};lock (channel){channel.BasicConsume(queue: "my-queue",autoAck: false,consumer: consumer);
}
异步支持
新增一个配置:
factory.DispatchConsumersAsync = true;
然后修改消费者:
var consumer = new AsyncEventingBasicConsumer(channel);consumer.Received += async (model, ea) =>
{await Task.Run(() =>{var body = ea.Body.ToArray();// copy or deserialise the payload// and process the message// ...lock (channel){channel.BasicAck(ea.DeliveryTag, false);}});
};
相关文章:

rabbitmq载在.net中批量消费的问题记录
背景 最近遇到了一个问题,在使用rabbitmq的时候出现了丢消息、消息重复消费等一系列的问题,使用的是.net框架,背景是高并发压力下的mq消费,按理说即使队列中堆了几百条消息,我客户端可以同处理5个消息。 原因是多线程…...

【RPC 协议】序列化与反序列化 | lua-cjson | lua-protobuf
文章目录 RPC 协议gRPCJSON-RPC 数据序列化与反序列化lua-cjsonlua-protobuf RPC 协议 在分布式计算,远程过程调用(英语:Remote Procedure Call,缩写为 RPC)是一个计算机通信协议。该协议允许运行于一台计算机的程序调…...

Flutter的Timer类
文章目录 一、Timer简介Timer类的详细介绍导入dart:async包创建一个定时器取消定时器定时器的周期性执行注意事项 二、Semantics简介Flutter Semantics 的详细介绍SemanticsNode语义标签和标签形状语义属性自定义语义行为通过语义树导航 一、Timer简介 Flutter的Timer类是Dart…...

Chrome小恐龙快跑小游戏——Python实现
目录 视频演示 代码实现 视频演示 Chrome小恐龙快跑小游戏——Python实现 代码实现 import pygame import os import random pygame.init()# Global Constants SCREEN_HEIGHT 600 SCREEN_WIDTH 1100 game_over False SCREEN pygame.display.set_mode((SCREEN_WIDTH, SCR…...

Web网站服务器
目录 一、什么是Apache? 二、虚拟目录是什么? 三、Apcahe相关配置文件 四、httpd.conf主配置文件的常用配置参数 五、Web网站配置案例 5.1搭建基于用户的个人主页网站 5.2、配置虚拟目录 5.3、配置虚拟主机 5.3.1搭建两个基于IP地址的虚拟主机 5.3.2搭建两个基于域…...

Docker consul 容器服务自动发现和更新
目录 一、什么是服务注册与发现 二、Docker-consul集群 1.Docker-consul consul提供的一些关键特性 2.registrator 3.Consul-template 三、Docker-consul实现过程 以配置nginx负载均衡为例 先配置consul-agent ,有两种模式server和client 四、Docker-cons…...

CentOS 8 执行yum命令报错:Failed to set locale, defaulting to C.UTF-8
今天Docker新搞了一个CentOS镜像,在运行基于该镜像的容器,执行yum命令时,遇到了如下报错: [rootGC Administrator]# yum install -y yum-utils Failed to set locale, defaulting to C.UTF-8 CentOS Linux 8 - AppStream …...

8. 损失函数与反向传播
8.1 损失函数 ① Loss损失函数一方面计算实际输出和目标之间的差距。 ② Loss损失函数另一方面为我们更新输出提供一定的依据。 8.2 L1loss损失函数 ① L1loss数学公式如下图所示,例子如下下图所示。 import torch from torch.nn import L1Loss inputs torch.tens…...

Angular安全专辑之四 —— 避免服务端可能的资源耗尽(NodeJS)
express-rate-limit是一个简单实用的npm包,用于在Express应用程序中实现速率限制。它可以帮助防止DDoS攻击和暴力破解,同时还允许对API端点进行流控。 express-rate-limit及其主要功能 express-rate-limit是Express框架的一个流行中间件,它允许根据IP地址或其他标准轻松地对请求…...

Servlet学习总结(Request请求与转发,Response响应,Servlet生命周期、体系结构、执行流程等...)
Override 是Java中的注解(Annotation),它用于告诉编译器该方法是覆盖(重写)父类中的方法。当我们使用Override注解时,编译器会检查当前方法是否正确地覆盖了父类中的方法,如果没有覆盖成功&…...

雅思写作 三小时浓缩学习顾家北 笔记总结(二)
目录 饥饿网一百句翻译 Using government funds for pollution cleanup work can create a comfortable environment. "Allocating government funds to pollution cleanup work can contribute to the creation of a comfortable environment." Some advertise…...

Element Plus 日期选择器的使用和属性
element plus 日期选择器如果如果没有进行处理 他会返回原有的属性值data格式 如果想要获取选中的日期时间就需要通过以下的代码来实现选中的值 format"YYYY/MM/DD" value-format"YYYY-MM-DD" <el-date-pickerv-model"formInline.date" type&…...

中国五百强企业用泛微为合同加速,提升数字化办公水平
华谊集团借力泛微,融合企业微信、SAP、WPS、电子签章等多种系统,构建了业务集成、场景驱动的全程数字化合同管理平台。 上海华谊(集团)公司是由上海市政府国有资产监督管理委员会授权,通过资产重组建立的大型化工企业…...

Vue3 QRCode生成
一. 依赖安装 npm install vue-qr --save 二. 引用与使用 引用 <script> // import vueqr from vue-qr vue2的引入 import vueqr from vue-qr/src/packages/vue-qr.vue // vue3的引入 export default {components: {vueqr} } </script> 使用 <template>&…...

2023年8月随笔之有顾忌了
1. 回头看 日更坚持了243天。 读《发布!设计与部署稳定的分布式系统》终于更新完成 选读《SQL经典实例》也更新完成 读《高性能MySQL(第4版)》开更,但目前暂缓 读《SQL学习指南(第3版)》开更并持续更新…...

正中优配:红筹股是啥意思?
随着我国经济的高速开展,越来越多的人开始参加到股票出资中。其中,红筹股作为一种特别类型的股票,备受一些出资者的关注,但对于一般出资者来说,红筹股详细含义还不是特别清楚。本文将从多个角度探讨红筹股的含义、特征…...

《Linux从练气到飞升》No.19 进程等待
🕺作者: 主页 我的专栏C语言从0到1探秘C数据结构从0到1探秘Linux菜鸟刷题集 😘欢迎关注:👍点赞🙌收藏✍️留言 🏇码字不易,你的👍点赞🙌收藏❤️关注对我真的…...

OpenCV
文章目录 OpenCV学习报告读取图片和网络摄像头1.1 图片读取1.2 视频读取1.1.1 读取视频文件1.1.2读取网络摄像头 OpenCV基础功能调整、裁剪图像3.1 调整图像大小3.2 裁剪图像 图像上绘制形状和文本4.1 图像上绘制形状4.2图像上写文字 透视变换图像拼接颜色检测轮廓检测人脸检测…...

hadoop解决数据倾斜的方法
分析&回答 1,如果预聚合不影响最终结果,可以使用conbine,提前对数据聚合,减少数据量。使用combinner合并,combinner是在map阶段,reduce之前的一个中间阶段,在这个阶段可以选择性的把大量的相同key数据先进行一个合并,可以看做…...

打造坚不可摧的代码堡垒 - 搭建GitLab私有仓库完全指南
在现代软件开发中,版本控制是一个不可或缺的环节。GitLab是一个流行的版本控制平台,允许开发团队协同工作并管理他们的代码。在某些情况下,您可能希望将您的代码托管在一个私有仓库中,以确保代码的安全性和机密性。在本文中&#…...

linux把文件压缩/解压成.tar.gz/tar/tgz等格式的命令大全
linux把文件压缩/解压成.tar.gz/tar/tgz等格式的命令大全 linux压缩命令常用的有:tar,tgz,gzip,zip,rar 一,tar(一) tar压缩命令#说明:#举例: (二…...

用户角色权限demo后续出现问题和解决
将demo账号给到理解和蒋老师,测试的时候将登录人账号改了,结果登录不了了,后续还需要分配权限无法更改他人的账号和密码 将用户和权限重新分配(数据库更改,不要学我) 试着登录还是报一样的错,但…...

SpringBoot在IDEA里实现热部署
使用步骤 1.引入依赖 <!--devtools热部署--> <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-devtools</artifactId><optional>true</optional><scope>true</scope><versi…...

浅谈Linux中的mkdir -p
mkdir 是一个用于创建目录(目录树)的 Unix 和 Linux 命令。-p 选项允许创建一个目录和它不存在的父目录。换句话说,-p 选项确保了指定的整个目录路径都会被创建。 基础用法 如果你只是运行 mkdir new_directory,这个命令会尝试在…...

设计模式—职责链模式(Chain of Responsibility)
目录 思维导图 什么是职责链模式? 有什么优点呢? 有什么缺点呢? 什么场景使用呢? 代码展示 ①、职责链模式 ②、加薪代码重构 思维导图 什么是职责链模式? 使多个对象都有机会处理请求,从而避免请…...

vue小测试之拖拽、自定义事件
在开始之前我去复习了一下,clientX、clientY、pageX、pageY的区别,对于不熟悉offsetLeft和offsetTop的也可以在这里去复习一下。 vue拖拽指令之offsetX、clientX、pageX、screenX_wade3po的博客-CSDN博客_vue offset 客户区坐标位置(clientX&…...

时序预测 | MATLAB实现DBN-SVM深度置信网络结合支持向量机时间序列预测(多指标评价)
时序预测 | MATLAB实现DBN-SVM深度置信网络结合支持向量机时间序列预测(多指标评价) 目录 时序预测 | MATLAB实现DBN-SVM深度置信网络结合支持向量机时间序列预测(多指标评价)效果一览基本描述程序设计参考资料 效果一览 基本描述 MATLAB实现DBN-SVM深度置信网络结合支持向量机…...

Python中异步编程是什么意思? - 易智编译EaseEditing
异步编程是一种编程模式,用于处理可能会导致程序等待的操作,例如网络请求、文件读写或长时间的计算任务,而不会阻塞整个程序的执行。 在传统的同步编程中,当程序执行一个耗时的操作时,它会等待该操作完成,…...

【JS真好玩】自动打字机效果
目录 一、前言二、布局分析三、总体样式四、中间部分五、底部5.1 div5.2 label5.3 input 六、JS让它动起来6.1定时器6.2 字符串处理6.2.1 slice6.2.2 splice6.3.3 split 七、总结 一、前言 大家好,今天实现一个自动打字机效果,旨在实现一些网上很小的de…...

宠物赛道,用AI定制宠物头像搞钱项目教程
今天给大家介绍一个非常有趣,而粉丝价值又极高,用AI去定制宠物头像或合照的AI项目。 接触过宠物行业应该知道,获取1位铲屎官到私域,这类用户的价值是极高的,一个宠物粉,是连铲个屎都要花钱的,每…...