当前位置: 首页 > news >正文

rabbitmq载在.net中批量消费的问题记录

背景

最近遇到了一个问题,在使用rabbitmq的时候出现了丢消息、消息重复消费等一系列的问题,使用的是.net框架,背景是高并发压力下的mq消费,按理说即使队列中堆了几百条消息,我客户端可以同处理5个消息。

原因是多线程同时处理时导致的内存混乱。

官方文档已经解释的很全面了:https://www.rabbitmq.com/dotnet-api-guide.html

一个简易的单线程消费者

注意如下代码,这只是一个简易的单线程同步的消费者;
每次消费1条消息,消息消费完进行手动ack;

Task.Run(() =>{AutoResetEvent autoResetEvent = new AutoResetEvent(false);ConnectionFactory factory = new ConnectionFactory();// "guest"/"guest" by default, limited to localhost connectionsfactory.UserName = user;factory.Password = pass;factory.VirtualHost = vhost;factory.HostName = hostName;// this name will be shared by all connections instantiated by// this factoryfactory.ClientProvidedName = "app:audit component:event-consumer";IConnection conn = factory.CreateConnection();using (IModel channel = conn .CreateModel()){channel.ExchangeDeclare(exchangeName, ExchangeType.Direct);channel.QueueDeclare(queueName, false, false, false, null);channel.QueueBind(queueName, exchangeName, routingKey, null);consumer.Received += (ch, ea) =>{var body = ea.Body.ToArray();// copy or deserialise the payload// and process the message// ...channel.BasicAck(ea.DeliveryTag, false);};channel.BasicConsume(queue: "my-queue",autoAck: false,consumer: consumer);}ConsoleUtil.WriteLine("mq started");autoResetEvent.WaitOne();ConsoleUtil.WriteLine("mq shutdown");}
});

批量消费

好的,那么我现在想要同时消费5条消息,想要达到并行的效果,需要如何改代码呢?看下面的改动:

改动1:

先看两个概念

  1. prefetchCount(预取计数):

    • prefetchCount 是一个用来限制每个消费者一次性从队列中获取的消息数量的参数。
    • 当你有多个消费者同时连接到同一个队列时,RabbitMQ 可以将消息均匀地分发给这些消费者。
    • 通过设置 prefetchCount,你可以告诉 RabbitMQ 每个消费者一次最多获取多少条消息。
    • 这个参数的目的是确保消息在被消费者处理之前不会全部放到内存中,从而提高系统的稳定性和性能。它有助于避免 一个消费者获取了太多消息而导致其他消费者无法获取任何消息的情况。
  2. concurrentConsumers(并发消费者):

    • concurrentConsumers 是指在同一队列上允许多少个并发消费者。
    • 每个并发消费者都会独立地处理消息,这有助于提高系统的处理能力和吞吐量。
    • 通过增加 concurrentConsumers 数量,你可以增加并发处理消息的能力。
    • 注意,这个参数不同于 prefetchCount,它控制的是同时运行的消费者的数量,而不是单个消费者一次性获取的消息数量。
// 参数1:prefetchSize:可接收消息的大小,如果设置为0,那么表示对消息本身的大小不限制
// 参数2:prefetchCount:处理消息最大的数量。相当于消费者能一次接受的队列大小
// 参数3:global:是不是针对整个 Connection 的,因为一个 Connection 可以有多个 Channel
// global=false:针对的是这个 Channel
// global=ture: 针对的是这个 Connection
channel.BasicQos(0, 5, false);
factory.ConsumerDispatchConcurrency = 5;

好的,这时候我配置了同时处理5条消息,看起来没问题了,但是官网文档有这样一句话:

IModel instance usage by more than one thread simultaneously should be avoided. Application code should maintain a clear notion of thread ownership for IModel instances.

This is a hard requirement for publishers: sharing a channel (an IModel instance) for concurrent publishing will lead to incorrect frame interleaving at the protocol level. Channel instances must not be shared by threads that publish on them.

If more than one thread needs to access a particular IModel instances, the application should enforce mutual exclusion. One way of achieving this is for all users of an IModel to lock the instance itself:

大概意思就是应该避免多个线程同时使用IModel实例,也就是channel对象,如果这么做的后果就是高负载情况下导致内存混乱,有可能你的线程1消费到了线程5本该消费的消息,这听起来后果是很严重的,那么我们应该怎么改动呢?官网也给方案了,就是给channel对象加锁,看下面的代码改动:

改动2

consumer.Received += (ch, ea) =>{	var body = ea.Body.ToArray();// copy or deserialise the payload// and process the message// ...lock (channel){channel.BasicAck(ea.DeliveryTag, false);}
};lock (channel){channel.BasicConsume(queue: "my-queue",autoAck: false,consumer: consumer);
}

异步支持

新增一个配置:

factory.DispatchConsumersAsync = true;

然后修改消费者:

var consumer = new AsyncEventingBasicConsumer(channel);consumer.Received += async (model, ea) =>
{await Task.Run(() =>{var body = ea.Body.ToArray();// copy or deserialise the payload// and process the message// ...lock (channel){channel.BasicAck(ea.DeliveryTag, false);}});
};

相关文章:

rabbitmq载在.net中批量消费的问题记录

背景 最近遇到了一个问题,在使用rabbitmq的时候出现了丢消息、消息重复消费等一系列的问题,使用的是.net框架,背景是高并发压力下的mq消费,按理说即使队列中堆了几百条消息,我客户端可以同处理5个消息。 原因是多线程…...

【RPC 协议】序列化与反序列化 | lua-cjson | lua-protobuf

文章目录 RPC 协议gRPCJSON-RPC 数据序列化与反序列化lua-cjsonlua-protobuf RPC 协议 在分布式计算,远程过程调用(英语:Remote Procedure Call,缩写为 RPC)是一个计算机通信协议。该协议允许运行于一台计算机的程序调…...

Flutter的Timer类

文章目录 一、Timer简介Timer类的详细介绍导入dart:async包创建一个定时器取消定时器定时器的周期性执行注意事项 二、Semantics简介Flutter Semantics 的详细介绍SemanticsNode语义标签和标签形状语义属性自定义语义行为通过语义树导航 一、Timer简介 Flutter的Timer类是Dart…...

Chrome小恐龙快跑小游戏——Python实现

目录 视频演示 代码实现 视频演示 Chrome小恐龙快跑小游戏——Python实现 代码实现 import pygame import os import random pygame.init()# Global Constants SCREEN_HEIGHT 600 SCREEN_WIDTH 1100 game_over False SCREEN pygame.display.set_mode((SCREEN_WIDTH, SCR…...

Web网站服务器

目录 一、什么是Apache? 二、虚拟目录是什么? 三、Apcahe相关配置文件 四、httpd.conf主配置文件的常用配置参数 五、Web网站配置案例 5.1搭建基于用户的个人主页网站 5.2、配置虚拟目录 5.3、配置虚拟主机 5.3.1搭建两个基于IP地址的虚拟主机 5.3.2搭建两个基于域…...

Docker consul 容器服务自动发现和更新

目录 一、什么是服务注册与发现 二、Docker-consul集群 1.Docker-consul consul提供的一些关键特性 2.registrator 3.Consul-template 三、Docker-consul实现过程 以配置nginx负载均衡为例 先配置consul-agent ,有两种模式server和client 四、Docker-cons…...

CentOS 8 执行yum命令报错:Failed to set locale, defaulting to C.UTF-8

今天Docker新搞了一个CentOS镜像,在运行基于该镜像的容器,执行yum命令时,遇到了如下报错: [rootGC Administrator]# yum install -y yum-utils Failed to set locale, defaulting to C.UTF-8 CentOS Linux 8 - AppStream …...

8. 损失函数与反向传播

8.1 损失函数 ① Loss损失函数一方面计算实际输出和目标之间的差距。 ② Loss损失函数另一方面为我们更新输出提供一定的依据。 8.2 L1loss损失函数 ① L1loss数学公式如下图所示,例子如下下图所示。 import torch from torch.nn import L1Loss inputs torch.tens…...

Angular安全专辑之四 —— 避免服务端可能的资源耗尽(NodeJS)

express-rate-limit是一个简单实用的npm包,用于在Express应用程序中实现速率限制。它可以帮助防止DDoS攻击和暴力破解,同时还允许对API端点进行流控。 express-rate-limit及其主要功能 express-rate-limit是Express框架的一个流行中间件,它允许根据IP地址或其他标准轻松地对请求…...

Servlet学习总结(Request请求与转发,Response响应,Servlet生命周期、体系结构、执行流程等...)

Override 是Java中的注解(Annotation),它用于告诉编译器该方法是覆盖(重写)父类中的方法。当我们使用Override注解时,编译器会检查当前方法是否正确地覆盖了父类中的方法,如果没有覆盖成功&…...

雅思写作 三小时浓缩学习顾家北 笔记总结(二)

目录 饥饿网一百句翻译 Using government funds for pollution cleanup work can create a comfortable environment. "Allocating government funds to pollution cleanup work can contribute to the creation of a comfortable environment." Some advertise…...

Element Plus 日期选择器的使用和属性

element plus 日期选择器如果如果没有进行处理 他会返回原有的属性值data格式 如果想要获取选中的日期时间就需要通过以下的代码来实现选中的值 format"YYYY/MM/DD" value-format"YYYY-MM-DD" <el-date-pickerv-model"formInline.date" type&…...

中国五百强企业用泛微为合同加速,提升数字化办公水平

华谊集团借力泛微&#xff0c;融合企业微信、SAP、WPS、电子签章等多种系统&#xff0c;构建了业务集成、场景驱动的全程数字化合同管理平台。 上海华谊&#xff08;集团&#xff09;公司是由上海市政府国有资产监督管理委员会授权&#xff0c;通过资产重组建立的大型化工企业…...

Vue3 QRCode生成

一. 依赖安装 npm install vue-qr --save 二. 引用与使用 引用 <script> // import vueqr from vue-qr vue2的引入 import vueqr from vue-qr/src/packages/vue-qr.vue // vue3的引入 export default {components: {vueqr} } </script> 使用 <template>&…...

2023年8月随笔之有顾忌了

1. 回头看 日更坚持了243天。 读《发布&#xff01;设计与部署稳定的分布式系统》终于更新完成 选读《SQL经典实例》也更新完成 读《高性能MySQL&#xff08;第4版&#xff09;》开更&#xff0c;但目前暂缓 读《SQL学习指南&#xff08;第3版&#xff09;》开更并持续更新…...

正中优配:红筹股是啥意思?

随着我国经济的高速开展&#xff0c;越来越多的人开始参加到股票出资中。其中&#xff0c;红筹股作为一种特别类型的股票&#xff0c;备受一些出资者的关注&#xff0c;但对于一般出资者来说&#xff0c;红筹股详细含义还不是特别清楚。本文将从多个角度探讨红筹股的含义、特征…...

《Linux从练气到飞升》No.19 进程等待

&#x1f57a;作者&#xff1a; 主页 我的专栏C语言从0到1探秘C数据结构从0到1探秘Linux菜鸟刷题集 &#x1f618;欢迎关注&#xff1a;&#x1f44d;点赞&#x1f64c;收藏✍️留言 &#x1f3c7;码字不易&#xff0c;你的&#x1f44d;点赞&#x1f64c;收藏❤️关注对我真的…...

OpenCV

文章目录 OpenCV学习报告读取图片和网络摄像头1.1 图片读取1.2 视频读取1.1.1 读取视频文件1.1.2读取网络摄像头 OpenCV基础功能调整、裁剪图像3.1 调整图像大小3.2 裁剪图像 图像上绘制形状和文本4.1 图像上绘制形状4.2图像上写文字 透视变换图像拼接颜色检测轮廓检测人脸检测…...

hadoop解决数据倾斜的方法

分析&回答 1&#xff0c;如果预聚合不影响最终结果&#xff0c;可以使用conbine&#xff0c;提前对数据聚合&#xff0c;减少数据量。使用combinner合并,combinner是在map阶段,reduce之前的一个中间阶段,在这个阶段可以选择性的把大量的相同key数据先进行一个合并,可以看做…...

打造坚不可摧的代码堡垒 - 搭建GitLab私有仓库完全指南

在现代软件开发中&#xff0c;版本控制是一个不可或缺的环节。GitLab是一个流行的版本控制平台&#xff0c;允许开发团队协同工作并管理他们的代码。在某些情况下&#xff0c;您可能希望将您的代码托管在一个私有仓库中&#xff0c;以确保代码的安全性和机密性。在本文中&#…...

Zygo测试驱动开发实践:如何为解释器编写可靠的测试套件

Zygo测试驱动开发实践&#xff1a;如何为解释器编写可靠的测试套件 【免费下载链接】zygomys Zygo is a Lisp interpreter written in 100% Go. Central use case: dynamically compose Go struct trees in a zygo script, then invoke compiled Go functions on those trees. …...

Magma高可用部署:如何构建企业级可靠网络基础设施

Magma高可用部署&#xff1a;如何构建企业级可靠网络基础设施 【免费下载链接】magma Platform for building access networks and modular network services 项目地址: https://gitcode.com/gh_mirrors/mag/magma Magma是构建接入网络和模块化网络服务的强大平台&#…...

DreamTalk多语言支持深度分析:从中文到德语的语音驱动生成

DreamTalk多语言支持深度分析&#xff1a;从中文到德语的语音驱动生成 【免费下载链接】dreamtalk Official implementations for paper: DreamTalk: When Expressive Talking Head Generation Meets Diffusion Probabilistic Models 项目地址: https://gitcode.com/gh_mirro…...

这份榜单够用!盘点2026年断层领先的的AI论文写作软件

一天写完毕业论文在2026年已不再是天方夜谭。以下是2026年最炸裂、实测能大幅提速的AI论文写作软件&#xff0c;覆盖选题构思、文献综述、数据整理、格式排版等核心场景&#xff0c;帮你高效搞定论文。 一、全流程王者&#xff1a;一站式搞定论文全链路&#xff08;一天定稿首选…...

观察使用 Taotoken Token Plan 套餐后的月度成本变化

&#x1f680; 告别海外账号与网络限制&#xff01;稳定直连全球优质大模型&#xff0c;限时半价接入中。 &#x1f449; 点击领取海量免费额度 观察使用 Taotoken Token Plan 套餐后的月度成本变化 对于个人开发者或小型团队而言&#xff0c;大模型 API 的调用成本是项目预算…...

别再让治具压坏你的板子!手把手教你用TSK-64应力测试仪搞定ICT/FCT应力管控

从应力失控到精准管控&#xff1a;TSK-64测试仪在ICT/FCT产线的实战指南 当产线突然出现批量PCBA功能异常时&#xff0c;多数工程师的第一反应是检查焊接质量或元器件性能&#xff0c;却往往忽略了治具施加的机械应力这个"隐形杀手"。某汽车电子制造商曾因FCT治具压力…...

为你的AI Agent项目选择并接入Taotoken多模型聚合平台

&#x1f680; 告别海外账号与网络限制&#xff01;稳定直连全球优质大模型&#xff0c;限时半价接入中。 &#x1f449; 点击领取海量免费额度 为你的AI Agent项目选择并接入Taotoken多模型聚合平台 当你着手构建一个智能Agent应用时&#xff0c;很快会面临一个现实问题&…...

3步拯救损坏视频!UNTRUNC开源工具让你的珍贵回忆重获新生

3步拯救损坏视频&#xff01;UNTRUNC开源工具让你的珍贵回忆重获新生 【免费下载链接】untrunc Restore a damaged (truncated) mp4, m4v, mov, 3gp video. Provided you have a similar not broken video. 项目地址: https://gitcode.com/gh_mirrors/unt/untrunc 你是否…...

二代壳脱壳新思路:Hook CreateFromRawDexFile捕获原始DEX

1. 为什么“二代壳”让传统脱壳方法集体失效&#xff1f;——从Dex加载链路说起你有没有试过用经典的dumpdex脚本在Android 10设备上跑&#xff0c;结果dump出来的dex文件一打开就是满屏java.lang.ClassNotFoundException&#xff1f;或者用dex2oat反编译出的odex&#xff0c;反…...

紧急更新!Midjourney v6.6对洛可可风格支持突降37%?立即启用这5个兼容性补丁prompt,保住你的商业项目交付期

更多请点击&#xff1a; https://intelliparadigm.com 第一章&#xff1a;Midjourney v6.6洛可可风格兼容性危机全景速览 Midjourney v6.6 发布后&#xff0c;大量用户在生成洛可可&#xff08;Rococo&#xff09;风格图像时遭遇显著退化&#xff1a;繁复卷曲的藤蔓纹样被简化…...