C# 中使用 MassTransit
在生产环境中使用 MassTransit 时,通常需要进行详细的配置,包括设置连接字符串、配置队列、配置消费者、处理重试和错误队列等。以下是一个完整的示例,展示了如何在 ASP.NET Core 应用程序中配置 MassTransit,包括请求/响应模式和事件模式,并使用依赖注入。
1. 安装必要的 NuGet 包
首先,确保安装了以下 NuGet 包:
MassTransitMassTransit.RabbitMQMassTransit.AspNetCoreMassTransit.ExtensionsDependencyInjectionMicrosoft.Extensions.Hosting
web端服务代码
<Project Sdk="Microsoft.NET.Sdk.Web"><PropertyGroup><TargetFramework>net6.0</TargetFramework><Nullable>enable</Nullable><ImplicitUsings>enable</ImplicitUsings></PropertyGroup><ItemGroup><PackageReference Include="MassTransit.Extensions.DependencyInjection" Version="7.3.1" /><PackageReference Include="MassTransit.Redis" Version="8.3.4" /><PackageReference Include="Swashbuckle.AspNetCore" Version="7.2.0" /><PackageReference Include="MassTransit.RabbitMQ" Version="8.3.4" /></ItemGroup><ItemGroup><ProjectReference Include="..\AppDto\AppDto.csproj" /></ItemGroup></Project>
using Microsoft.OpenApi.Models;
using MassTransit;
using WebApp_MssTransit.Filters;var builder = WebApplication.CreateBuilder(args);// Add services to the container.
builder.Services.AddMvc(opt =>
{opt.Filters.Add<ExceptionFilter>();
});
builder.Services.AddSwaggerGen(c =>
{c.SwaggerDoc("v1", new OpenApiInfo { Title = "My API", Version = "v1" });
});builder.Services.AddMassTransit(x =>
{// 使用内存//x.UsingInMemory();// 使用RabbitMqx.UsingRabbitMq((context, config) =>{config.Host("rabbitmq://localhost:5672", host =>{host.Username("admin");host.Password("admin");});});
});var app = builder.Build();// Configure the HTTP request pipeline.app.UseHttpsRedirection();
app.UseRouting();app.UseSwagger();
app.UseSwaggerUI(c => c.SwaggerEndpoint("v1/swagger.json", "My API V1"));app.UseEndpoints(endpoints =>
{endpoints.MapControllers();
});app.Run();
控制器里面
using AppDto;
using MassTransit;
using Microsoft.AspNetCore.Mvc;
using System.Text.Json;
using System.Text.Json.Serialization;namespace WebApp_MssTransit.Controllers;[ApiController]
[Route("[controller]")]
public class PublishController : ControllerBase
{private readonly ILogger<PublishController> _logger;private readonly IBus _bus;public PublishController(ILogger<PublishController> logger, IBus bus){_logger = logger;_bus = bus;}[HttpPost(template : nameof(PublishAsync))]public async Task PublishAsync(){var msg = new OrderMessage(){Id = Guid.NewGuid(),Name = "Phone",CreationTime = DateTime.Now};await _bus.Publish(msg);_logger.LogInformation($"Publish message :{JsonSerializer.Serialize(msg)}");}[HttpPost(template: nameof(SendAsync))]public async Task SendAsync(){var msg = new OrderMessage(){Id = Guid.NewGuid(),Name = "Phone",CreationTime = DateTime.Now};await _bus.Send(msg);_logger.LogInformation($"Send message :{JsonSerializer.Serialize(msg)}");}[HttpPost(template: nameof(RequestAirAsync))]public async Task<AirResponse> RequestAirAsync(){var airRequest = new AirRequest{CreationTime = DateTime.Now,Identity = DateTime.Now.Ticks};var esp = await _bus.Request<AirRequest, AirResponse>(airRequest);_logger.LogInformation($"{DateTime.Now.ToStringTime()} Request Response :{JsonSerializer.Serialize(esp)}");var espMsg = esp.Message;_logger.LogInformation($"{espMsg.CreationTime.ToStringTime()} Request Response data");return espMsg;}
}
信息公共类型,这里没有定义接口,直接共享AppDto
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;namespace AppDto;public class AirRequest
{public long Identity { get; set; }public DateTime CreationTime { get; set; }
}
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;namespace AppDto;public class AirResponse
{public long Identity { get; set; }public DateTime CreationTime { get; set; }
}
namespace AppDto;public class OrderMessage
{public Guid Id { get; init; }public string Name { get; set; }public DateTime CreationTime { get; set; }
}
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;namespace AppDto;public static class DateTimeExtant
{public static string ToStringTime(this DateTime time){return time.ToString("yyyy-MM-dd HH:mm:ss.fffff");}
}
接收消息处理端
<Project Sdk="Microsoft.NET.Sdk"><PropertyGroup><OutputType>Exe</OutputType><TargetFramework>net6.0</TargetFramework><ImplicitUsings>enable</ImplicitUsings><Nullable>enable</Nullable></PropertyGroup><ItemGroup><PackageReference Include="MassTransit.Extensions.DependencyInjection" Version="7.3.1" /><PackageReference Include="MassTransit.RabbitMQ" Version="8.3.4" /><PackageReference Include="MassTransit.Redis" Version="8.3.4" /><PackageReference Include="Microsoft.Extensions.Hosting" Version="6.0.1" /></ItemGroup><ItemGroup><ProjectReference Include="..\AppDto\AppDto.csproj" /></ItemGroup></Project>
using MassTransit;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using MassTransit.RedisIntegration;
using ConsoleApp_Producer;
using MassTransit.Metadata;
using MassTransit.Util;
using System.Reflection;
using MassTransit.Internals;
using AppDto;
using ConsoleApp_consumer;Console.WriteLine("Hello, World!");var builder = Microsoft.Extensions.Hosting.Host.CreateDefaultBuilder(args);builder.ConfigureServices((hostContext, services) =>{services.AddMassTransit(x =>{// 通过扫描程序集注册消费者//x.AddConsumers(typeof(OrderConsumer).Assembly);// 通过类型单个注册消费者x.AddConsumer<OrderConsumer>();x.AddConsumer<AirHandle>();//x.SetKebabCaseEndpointNameFormatter();// 通过泛型单个注册消费者//x.AddConsumer<OrderEtoConsumer, OrderEtoConsumerDefinition>();// 通过指定命名空间注册消费者// x.AddConsumersFromNamespaceContaining<OrderEtoConsumer>();// 使用内存队列// x.UsingInMemory();x.UsingRabbitMq((context, config) =>{config.Host("rabbitmq://localhost:5672", hostconfig =>{hostconfig.Username("admin");hostconfig.Password("admin");});config.ConfigureEndpoints(context);});});});var host = builder.Build();
host.Run();
using AppDto;
using MassTransit;
using Microsoft.Extensions.Logging;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Text.Json;
using System.Threading.Tasks;namespace ConsoleApp_Producer;public class OrderConsumer : IConsumer<OrderMessage>
{private readonly ILogger<OrderConsumer> _logger;public OrderConsumer(ILogger<OrderConsumer> logger){_logger = logger;}public Task Consume(ConsumeContext<OrderMessage> context){_logger.LogInformation($"{DateTime.Now.ToStringTime()}收到消息:{JsonSerializer.Serialize(context.Message)},\n\t{context.Message.CreationTime.ToStringTime()} 消息时间");return Task.CompletedTask;}
}
using AppDto;
using ConsoleApp_Producer;
using MassTransit;
using Microsoft.Extensions.Logging;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;namespace ConsoleApp_consumer;public class AirHandle : IConsumer<AirRequest>
{private readonly ILogger<AirHandle> _logger;public AirHandle(ILogger<AirHandle> logger){_logger = logger;}public async Task Consume(ConsumeContext<AirRequest> context){_logger.LogInformation($"{DateTime.Now.ToStringTime()}Receive Data :{context.Message.Identity}");_logger.LogInformation($"{context.Message.CreationTime.ToStringTime()} message data");var resp = new AirResponse{Identity = context.Message.Identity,CreationTime = DateTime.Now,};await context.RespondAsync(resp);_logger.LogInformation($"{DateTime.Now.ToStringTime()} Respond Data {resp}");}
}
7. 运行应用程序
运行应用程序后,你可以通过发送 HTTP 请求来发布消息和发送请求。例如,使用 Postman 或 cURL:
- 发布消息:
### 8. 处理错误和重试
在生产环境中,处理错误和重试是很重要的。你可以在 MassTransit 配置中启用重试和死信队列:
x.UseInMemoryOutbox();
x.UseMessageRetry(r => r.Interval(3, TimeSpan.FromSeconds(10)));
x.UseDelayedRedelivery(r => r.Intervals(TimeSpan.FromSeconds(10), TimeSpan.FromSeconds(30)));
x.UseFault tolerantExceptionStrategies();
9. 监控和日志
为了监控和调试,可以配置日志记录和监控工具。MassTransit 支持多种日志记录框架,如 Serilog 或 NLog。
10. 安全性和认证
在生产环境中,确保 RabbitMQ 连接是安全的,使用合适的认证和授权机制。可以配置 TLS 和其他安全选项。
通过以上步骤,你可以在生产环境中配置和使用 MassTransit,实现请求/响应和事件处理模式,并利用依赖注入进行服务管理和生命周期管理。
相关文章:
C# 中使用 MassTransit
在生产环境中使用 MassTransit 时,通常需要进行详细的配置,包括设置连接字符串、配置队列、配置消费者、处理重试和错误队列等。以下是一个完整的示例,展示了如何在 ASP.NET Core 应用程序中配置 MassTransit,包括请求/响应模式和…...
网络编程 实现联网 b+Tree
网络编程是客户端和服务器之间通信的基础,也是现代应用开发中不可或缺的技能。在 Unity 中实现网络功能,需要结合计算机网络原理、数据结构与算法,以及网络协议的实际应用。以下是对这一块内容的详细介绍,包括每个涉及到的知识点&…...
zentao ubuntu上安装
#下载ZenTaoPMS-21.2-zbox_amd64.tar.gz(https://www.zentao.net/downloads.html) https://dl.zentao.net/zentao/21.2/ZenTaoPMS-21.2-zbox_amd64.tar.gzcd /opt tar -zxvf ZenTaoPMS-21.2-zbox_amd64.tar.gz#启动 /opt/zbox/zbox start /opt/zbox/zbox…...
Java 网络原理 ①-IO多路复用 || 自定义协议 || XML || JSON
这里是Themberfue 在学习完简单的网络编程后,我们将更加深入网络的学习——HTTP协议、TCP协议、UDP协议、IP协议........... IO多路复用 ✨在上一节基于 TCP 协议 编写应用层代码时,我们通过一个线程处理连接的申请,随后通过多线程或者线程…...
Bash Shell知识合集
1. chmod命令 创建一个bash shell脚本 hello.sh ~script $ touch hello.sh脚本创建完成后并不能直接执行,我们要用chmod命令授予它可执行的权限: ~script $ chmod 755 hello.sh授权后的脚本可以直接执行: ~script $ ./hello.sh2.指定运行…...
从0入门自主空中机器人-1【课程介绍】
关于本课程: 本次课程是一套面向对自主空中机器人感兴趣的学生、爱好者、相关从业人员的免费课程,包含了从硬件组装、机载电脑环境设置、代码部署、实机实验等全套详细流程,带你从0开始,组装属于自己的自主无人机,并让…...
Doris使用注意点
自己学习过程中整理,非官方 dws等最后用于查询的表可以考虑使用row存储加快查询,即用空间换时间duplicate key的选择要考虑最常查询使用适当使用bloomfilter 加速查询适当使用aggregate 模式降低max,avg,min之类的计算并加快查询…...
Mybatis插件better-mybatis-generator的下载与使用
1.下载 找到设置 插件 搜索better-mybatis-generator 下载并且重启IDEA 2.连接数据库 点击测试连接 连接成功如下图 3.使用插件 选择对应的表 右击选择 注意:mysql8.0驱动一定要勾上mysql_8 其他地方不要动 然后实体类 mapper xml就都生成好了 mapper里有默认增删…...
uniapp小程序实现弹幕不重叠
uniapp小程序实现弹幕不重叠 1、在父组件中引入弹幕组件 <template><!-- 弹幕 --><barrage ref"barrage" class"barrage-content" reloadDanmu"reloadDanmu"></barrage> </template> <script>import barr…...
快速排序学习优化
首先,上图。 ‘’’ cpp int partSort(int *a ,int left,int right) {int keyi left; //做左侧基准while(left<right){while(left<right && a[right]>a[keyi]){right--;}while(left<right && a[left]<a[keyi]){left;}swap(a[left…...
微信流量主挑战:三天25用户!功能未完善?(新纪元4)
🎉【小程序上线第三天!突破25用户大关!】🎉 嘿,大家好!今天是我们小程序上线的第三天,我们的用户量已经突破了25个!昨天还是16个,今天一觉醒来竟然有25个!这涨…...
jetson 无显示器配置WIFI
我使用的 jetpack 版本是 6.1,发现自带 NetworkManager 软件包,此软件包包含一个守护程序、一个命令行界面(nmcli)和一个基于 curses 的界面(nmtui)。 可以使用 nmcli 命令配置wifi,nmcli 示例…...
SpringCloudAlibaba实战入门之路由网关Gateway断言(十二)
上一节课中我们初步讲解了网关的基本概念、基本功能,并且带大家实战体验了一下网关的初步效果,这节课我们继续学习关于网关的一些更高级有用功能,比如本篇文章的断言。 一、网关主要组成部分 上图中是核心的流程图,最主要的就是Route、Predicates 和 Filters 作用于特定路…...
【ES6复习笔记】ES6的模块化(18)
模块化的概念 模块化是指将一个复杂的系统分解为多个模块,每个模块完成一个特定的功能,模块之间通过接口进行通信。模块化的目的是提高代码的可读性、可维护性和可重用性。 模块化规范产品, ES6 之前的模块化规范有: CommonJS …...
兰亭妙微:专注医疗 UI 设计,点亮数字化医疗新视界
医疗行业界面解决方案以医患使用者为中心,遵循行业使用习惯和表达方式,优化使用流程、设计简洁、人性化的操作界面,采用插画、三维动画、微动效的创作方法,让用户感受到愉悦易用美观的使用体验。蓝蓝设计与知名企业合作项目有&…...
c# 线程 AutoResetEvent 的Set()函数多次调用
本文部分内容摘自ChatGPT 在 C# 中,AutoResetEvent 是一种用于线程同步的机制,它的行为类似于一个信号量,主要用于在多线程环境中发出信号并控制线程的执行。AutoResetEvent 的主要特点是每当调用 Set() 方法时,信号会被设置&…...
汽车行业的MES系统方案(附案例资料合集)
针对汽车行业的MES系统方案,以下是一些关键点和实施案例: 核心功能: 实时监控:MES系统通过传感器和物联网技术实时监控生产线上的每一个环节,确保信息的及时传递。数据分析:系统对收集的数据进行深度分析&a…...
基于监督学习的神经网络控制算法详细介绍和例程
基于监督学习的神经网络控制算法通常用于对已有数据进行训练,以学习输入与输出之间的映射关系。下面我将详细介绍这种算法的原理和流程,并提供一个简单的例程: 算法原理: 输入:给定一组已知的输入信号和对应的输出控制…...
springMVC-请求响应
springmvc——一 站式web框架,核心是处理http请求响应。 前后端分离:需要序列化,服务端把数据序列化成字符串或者流给前端,前端又把json转成对象,前端的叫反序列化。前端把数据序列化转成字符串给服务器,服…...
数据交易和联邦学习的背景下的安全属性
数据交易和联邦学习的背景下的安全属性 在数据交易和联邦学习的背景下,安全属性对于保护数据隐私、确保系统可靠性和维护交易公平性至关重要。以下将分析文章中涉及的安全属性以及分析这些属性的目的。 涉及的安全属性 双向认证:文章虽未明确提及传统意义上的双向认证机制,…...
保姆级教程:用Docker Compose一键部署Dify AI平台(附国内镜像加速与端口冲突解决)
零门槛部署Dify AI开发平台:Docker Compose全流程指南与避坑手册 在AI应用开发领域,快速搭建一个稳定可靠的开发环境往往是项目成功的第一步。Dify作为一款面向开发者的AI应用开发平台,通过可视化编排和低代码方式大大降低了构建基于大语言模…...
AutoRaise:macOS窗口悬停管理的技术实现与配置指南
AutoRaise:macOS窗口悬停管理的技术实现与配置指南 【免费下载链接】AutoRaise AutoRaise (and focus) a window when hovering over it with the mouse 项目地址: https://gitcode.com/gh_mirrors/au/AutoRaise AutoRaise是一款基于Objective-C开发的macOS窗…...
springboot+vue基于web的宠物商城领养网站的设计与实现
目录同行可拿货,招校园代理 ,本人源头供货商功能模块分析技术实现要点特色功能扩展安全与性能项目技术支持源码获取详细视频演示 :文章底部获取博主联系方式!同行可合作同行可拿货,招校园代理 ,本人源头供货商 功能模块分析 用户模块 注册与登录&#…...
深入解析 snprintf 和 vsnprintf:安全格式化字符串的最佳实践
1. 为什么需要安全的字符串格式化 在C语言开发中,字符串格式化是最基础也最容易出问题的操作之一。我见过太多因为格式化字符串不当导致的缓冲区溢出漏洞,轻则程序崩溃,重则成为安全攻击的入口点。传统的sprintf函数就像个不设防的大门&#…...
从零开始:在Unity中完美实现视频播放功能的完整指南(附常见报错解决方案)
从零开始:在Unity中完美实现视频播放功能的完整指南(附常见报错解决方案) 在游戏开发中,视频播放功能的应用场景越来越广泛——从开场动画、过场剧情到UI背景,视频元素能为玩家带来更丰富的视听体验。Unity作为主流的…...
基于宝塔面板与Docker Compose快速部署Dify最新版实战指南
1. 为什么选择宝塔Docker Compose部署Dify? 最近在帮几个创业团队搭建AI开发环境时,发现很多小伙伴都被复杂的部署流程劝退。传统的手动部署方式需要逐个安装Python、Redis、PostgreSQL等依赖,光是版本兼容问题就能折腾大半天。直到上个月我…...
游戏存档终极备份指南:用Ludusavi保护你的游戏进度
游戏存档终极备份指南:用Ludusavi保护你的游戏进度 【免费下载链接】ludusavi Backup tool for PC game saves 项目地址: https://gitcode.com/gh_mirrors/lu/ludusavi 你是否曾因电脑重装、系统崩溃或误操作而丢失珍贵的游戏存档?数百小时的游戏…...
51单片机学习(五)数码管显示
如有大佬发现我文章里的错误,希望多多指出,或者有缺少的也欢迎告诉我,我会尽快补充上去的,感谢各位的支持,要互三的d我哦!一.数码管数码管显示屏和U4 74HC245U574H138译码器一位数码管引脚定义一个数码管由…...
嵌入式AI新篇章:Qwen3-ASR-0.6B在边缘计算设备上的部署与优化
嵌入式AI新篇章:Qwen3-ASR-0.6B在边缘计算设备上的部署与优化 1. 引言:当语音识别遇见边缘计算 想象一下,你对着一个巴掌大的智能音箱说话,它几乎在你话音落下的瞬间就理解了你的意思,并且完全不需要连接云端。或者&…...
RMBG-2.0在YOLOv8项目中的应用:目标检测与背景去除联合处理
RMBG-2.0在YOLOv8项目中的应用:目标检测与背景去除联合处理 1. 为什么需要把目标检测和背景去除连在一起做 你有没有遇到过这样的场景:电商团队要批量处理上千张商品图,先用YOLOv8框出产品位置,再手动抠图换背景,最后…...
