C# 中使用 MassTransit
在生产环境中使用 MassTransit 时,通常需要进行详细的配置,包括设置连接字符串、配置队列、配置消费者、处理重试和错误队列等。以下是一个完整的示例,展示了如何在 ASP.NET Core 应用程序中配置 MassTransit,包括请求/响应模式和事件模式,并使用依赖注入。
1. 安装必要的 NuGet 包
首先,确保安装了以下 NuGet 包:
MassTransit
MassTransit.RabbitMQ
MassTransit.AspNetCore
MassTransit.ExtensionsDependencyInjection
Microsoft.Extensions.Hosting
web端服务代码
<Project Sdk="Microsoft.NET.Sdk.Web"><PropertyGroup><TargetFramework>net6.0</TargetFramework><Nullable>enable</Nullable><ImplicitUsings>enable</ImplicitUsings></PropertyGroup><ItemGroup><PackageReference Include="MassTransit.Extensions.DependencyInjection" Version="7.3.1" /><PackageReference Include="MassTransit.Redis" Version="8.3.4" /><PackageReference Include="Swashbuckle.AspNetCore" Version="7.2.0" /><PackageReference Include="MassTransit.RabbitMQ" Version="8.3.4" /></ItemGroup><ItemGroup><ProjectReference Include="..\AppDto\AppDto.csproj" /></ItemGroup></Project>
using Microsoft.OpenApi.Models;
using MassTransit;
using WebApp_MssTransit.Filters;var builder = WebApplication.CreateBuilder(args);// Add services to the container.
builder.Services.AddMvc(opt =>
{opt.Filters.Add<ExceptionFilter>();
});
builder.Services.AddSwaggerGen(c =>
{c.SwaggerDoc("v1", new OpenApiInfo { Title = "My API", Version = "v1" });
});builder.Services.AddMassTransit(x =>
{// 使用内存//x.UsingInMemory();// 使用RabbitMqx.UsingRabbitMq((context, config) =>{config.Host("rabbitmq://localhost:5672", host =>{host.Username("admin");host.Password("admin");});});
});var app = builder.Build();// Configure the HTTP request pipeline.app.UseHttpsRedirection();
app.UseRouting();app.UseSwagger();
app.UseSwaggerUI(c => c.SwaggerEndpoint("v1/swagger.json", "My API V1"));app.UseEndpoints(endpoints =>
{endpoints.MapControllers();
});app.Run();
控制器里面
using AppDto;
using MassTransit;
using Microsoft.AspNetCore.Mvc;
using System.Text.Json;
using System.Text.Json.Serialization;namespace WebApp_MssTransit.Controllers;[ApiController]
[Route("[controller]")]
public class PublishController : ControllerBase
{private readonly ILogger<PublishController> _logger;private readonly IBus _bus;public PublishController(ILogger<PublishController> logger, IBus bus){_logger = logger;_bus = bus;}[HttpPost(template : nameof(PublishAsync))]public async Task PublishAsync(){var msg = new OrderMessage(){Id = Guid.NewGuid(),Name = "Phone",CreationTime = DateTime.Now};await _bus.Publish(msg);_logger.LogInformation($"Publish message :{JsonSerializer.Serialize(msg)}");}[HttpPost(template: nameof(SendAsync))]public async Task SendAsync(){var msg = new OrderMessage(){Id = Guid.NewGuid(),Name = "Phone",CreationTime = DateTime.Now};await _bus.Send(msg);_logger.LogInformation($"Send message :{JsonSerializer.Serialize(msg)}");}[HttpPost(template: nameof(RequestAirAsync))]public async Task<AirResponse> RequestAirAsync(){var airRequest = new AirRequest{CreationTime = DateTime.Now,Identity = DateTime.Now.Ticks};var esp = await _bus.Request<AirRequest, AirResponse>(airRequest);_logger.LogInformation($"{DateTime.Now.ToStringTime()} Request Response :{JsonSerializer.Serialize(esp)}");var espMsg = esp.Message;_logger.LogInformation($"{espMsg.CreationTime.ToStringTime()} Request Response data");return espMsg;}
}
信息公共类型,这里没有定义接口,直接共享AppDto
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;namespace AppDto;public class AirRequest
{public long Identity { get; set; }public DateTime CreationTime { get; set; }
}
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;namespace AppDto;public class AirResponse
{public long Identity { get; set; }public DateTime CreationTime { get; set; }
}
namespace AppDto;public class OrderMessage
{public Guid Id { get; init; }public string Name { get; set; }public DateTime CreationTime { get; set; }
}
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;namespace AppDto;public static class DateTimeExtant
{public static string ToStringTime(this DateTime time){return time.ToString("yyyy-MM-dd HH:mm:ss.fffff");}
}
接收消息处理端
<Project Sdk="Microsoft.NET.Sdk"><PropertyGroup><OutputType>Exe</OutputType><TargetFramework>net6.0</TargetFramework><ImplicitUsings>enable</ImplicitUsings><Nullable>enable</Nullable></PropertyGroup><ItemGroup><PackageReference Include="MassTransit.Extensions.DependencyInjection" Version="7.3.1" /><PackageReference Include="MassTransit.RabbitMQ" Version="8.3.4" /><PackageReference Include="MassTransit.Redis" Version="8.3.4" /><PackageReference Include="Microsoft.Extensions.Hosting" Version="6.0.1" /></ItemGroup><ItemGroup><ProjectReference Include="..\AppDto\AppDto.csproj" /></ItemGroup></Project>
using MassTransit;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using MassTransit.RedisIntegration;
using ConsoleApp_Producer;
using MassTransit.Metadata;
using MassTransit.Util;
using System.Reflection;
using MassTransit.Internals;
using AppDto;
using ConsoleApp_consumer;Console.WriteLine("Hello, World!");var builder = Microsoft.Extensions.Hosting.Host.CreateDefaultBuilder(args);builder.ConfigureServices((hostContext, services) =>{services.AddMassTransit(x =>{// 通过扫描程序集注册消费者//x.AddConsumers(typeof(OrderConsumer).Assembly);// 通过类型单个注册消费者x.AddConsumer<OrderConsumer>();x.AddConsumer<AirHandle>();//x.SetKebabCaseEndpointNameFormatter();// 通过泛型单个注册消费者//x.AddConsumer<OrderEtoConsumer, OrderEtoConsumerDefinition>();// 通过指定命名空间注册消费者// x.AddConsumersFromNamespaceContaining<OrderEtoConsumer>();// 使用内存队列// x.UsingInMemory();x.UsingRabbitMq((context, config) =>{config.Host("rabbitmq://localhost:5672", hostconfig =>{hostconfig.Username("admin");hostconfig.Password("admin");});config.ConfigureEndpoints(context);});});});var host = builder.Build();
host.Run();
using AppDto;
using MassTransit;
using Microsoft.Extensions.Logging;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Text.Json;
using System.Threading.Tasks;namespace ConsoleApp_Producer;public class OrderConsumer : IConsumer<OrderMessage>
{private readonly ILogger<OrderConsumer> _logger;public OrderConsumer(ILogger<OrderConsumer> logger){_logger = logger;}public Task Consume(ConsumeContext<OrderMessage> context){_logger.LogInformation($"{DateTime.Now.ToStringTime()}收到消息:{JsonSerializer.Serialize(context.Message)},\n\t{context.Message.CreationTime.ToStringTime()} 消息时间");return Task.CompletedTask;}
}
using AppDto;
using ConsoleApp_Producer;
using MassTransit;
using Microsoft.Extensions.Logging;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;namespace ConsoleApp_consumer;public class AirHandle : IConsumer<AirRequest>
{private readonly ILogger<AirHandle> _logger;public AirHandle(ILogger<AirHandle> logger){_logger = logger;}public async Task Consume(ConsumeContext<AirRequest> context){_logger.LogInformation($"{DateTime.Now.ToStringTime()}Receive Data :{context.Message.Identity}");_logger.LogInformation($"{context.Message.CreationTime.ToStringTime()} message data");var resp = new AirResponse{Identity = context.Message.Identity,CreationTime = DateTime.Now,};await context.RespondAsync(resp);_logger.LogInformation($"{DateTime.Now.ToStringTime()} Respond Data {resp}");}
}
7. 运行应用程序
运行应用程序后,你可以通过发送 HTTP 请求来发布消息和发送请求。例如,使用 Postman 或 cURL:
- 发布消息:
### 8. 处理错误和重试
在生产环境中,处理错误和重试是很重要的。你可以在 MassTransit 配置中启用重试和死信队列:
x.UseInMemoryOutbox();
x.UseMessageRetry(r => r.Interval(3, TimeSpan.FromSeconds(10)));
x.UseDelayedRedelivery(r => r.Intervals(TimeSpan.FromSeconds(10), TimeSpan.FromSeconds(30)));
x.UseFault tolerantExceptionStrategies();
9. 监控和日志
为了监控和调试,可以配置日志记录和监控工具。MassTransit 支持多种日志记录框架,如 Serilog 或 NLog。
10. 安全性和认证
在生产环境中,确保 RabbitMQ 连接是安全的,使用合适的认证和授权机制。可以配置 TLS 和其他安全选项。
通过以上步骤,你可以在生产环境中配置和使用 MassTransit,实现请求/响应和事件处理模式,并利用依赖注入进行服务管理和生命周期管理。
相关文章:

C# 中使用 MassTransit
在生产环境中使用 MassTransit 时,通常需要进行详细的配置,包括设置连接字符串、配置队列、配置消费者、处理重试和错误队列等。以下是一个完整的示例,展示了如何在 ASP.NET Core 应用程序中配置 MassTransit,包括请求/响应模式和…...
网络编程 实现联网 b+Tree
网络编程是客户端和服务器之间通信的基础,也是现代应用开发中不可或缺的技能。在 Unity 中实现网络功能,需要结合计算机网络原理、数据结构与算法,以及网络协议的实际应用。以下是对这一块内容的详细介绍,包括每个涉及到的知识点&…...

zentao ubuntu上安装
#下载ZenTaoPMS-21.2-zbox_amd64.tar.gz(https://www.zentao.net/downloads.html) https://dl.zentao.net/zentao/21.2/ZenTaoPMS-21.2-zbox_amd64.tar.gzcd /opt tar -zxvf ZenTaoPMS-21.2-zbox_amd64.tar.gz#启动 /opt/zbox/zbox start /opt/zbox/zbox…...

Java 网络原理 ①-IO多路复用 || 自定义协议 || XML || JSON
这里是Themberfue 在学习完简单的网络编程后,我们将更加深入网络的学习——HTTP协议、TCP协议、UDP协议、IP协议........... IO多路复用 ✨在上一节基于 TCP 协议 编写应用层代码时,我们通过一个线程处理连接的申请,随后通过多线程或者线程…...
Bash Shell知识合集
1. chmod命令 创建一个bash shell脚本 hello.sh ~script $ touch hello.sh脚本创建完成后并不能直接执行,我们要用chmod命令授予它可执行的权限: ~script $ chmod 755 hello.sh授权后的脚本可以直接执行: ~script $ ./hello.sh2.指定运行…...
从0入门自主空中机器人-1【课程介绍】
关于本课程: 本次课程是一套面向对自主空中机器人感兴趣的学生、爱好者、相关从业人员的免费课程,包含了从硬件组装、机载电脑环境设置、代码部署、实机实验等全套详细流程,带你从0开始,组装属于自己的自主无人机,并让…...
Doris使用注意点
自己学习过程中整理,非官方 dws等最后用于查询的表可以考虑使用row存储加快查询,即用空间换时间duplicate key的选择要考虑最常查询使用适当使用bloomfilter 加速查询适当使用aggregate 模式降低max,avg,min之类的计算并加快查询…...

Mybatis插件better-mybatis-generator的下载与使用
1.下载 找到设置 插件 搜索better-mybatis-generator 下载并且重启IDEA 2.连接数据库 点击测试连接 连接成功如下图 3.使用插件 选择对应的表 右击选择 注意:mysql8.0驱动一定要勾上mysql_8 其他地方不要动 然后实体类 mapper xml就都生成好了 mapper里有默认增删…...
uniapp小程序实现弹幕不重叠
uniapp小程序实现弹幕不重叠 1、在父组件中引入弹幕组件 <template><!-- 弹幕 --><barrage ref"barrage" class"barrage-content" reloadDanmu"reloadDanmu"></barrage> </template> <script>import barr…...

快速排序学习优化
首先,上图。 ‘’’ cpp int partSort(int *a ,int left,int right) {int keyi left; //做左侧基准while(left<right){while(left<right && a[right]>a[keyi]){right--;}while(left<right && a[left]<a[keyi]){left;}swap(a[left…...

微信流量主挑战:三天25用户!功能未完善?(新纪元4)
🎉【小程序上线第三天!突破25用户大关!】🎉 嘿,大家好!今天是我们小程序上线的第三天,我们的用户量已经突破了25个!昨天还是16个,今天一觉醒来竟然有25个!这涨…...
jetson 无显示器配置WIFI
我使用的 jetpack 版本是 6.1,发现自带 NetworkManager 软件包,此软件包包含一个守护程序、一个命令行界面(nmcli)和一个基于 curses 的界面(nmtui)。 可以使用 nmcli 命令配置wifi,nmcli 示例…...

SpringCloudAlibaba实战入门之路由网关Gateway断言(十二)
上一节课中我们初步讲解了网关的基本概念、基本功能,并且带大家实战体验了一下网关的初步效果,这节课我们继续学习关于网关的一些更高级有用功能,比如本篇文章的断言。 一、网关主要组成部分 上图中是核心的流程图,最主要的就是Route、Predicates 和 Filters 作用于特定路…...
【ES6复习笔记】ES6的模块化(18)
模块化的概念 模块化是指将一个复杂的系统分解为多个模块,每个模块完成一个特定的功能,模块之间通过接口进行通信。模块化的目的是提高代码的可读性、可维护性和可重用性。 模块化规范产品, ES6 之前的模块化规范有: CommonJS …...

兰亭妙微:专注医疗 UI 设计,点亮数字化医疗新视界
医疗行业界面解决方案以医患使用者为中心,遵循行业使用习惯和表达方式,优化使用流程、设计简洁、人性化的操作界面,采用插画、三维动画、微动效的创作方法,让用户感受到愉悦易用美观的使用体验。蓝蓝设计与知名企业合作项目有&…...
c# 线程 AutoResetEvent 的Set()函数多次调用
本文部分内容摘自ChatGPT 在 C# 中,AutoResetEvent 是一种用于线程同步的机制,它的行为类似于一个信号量,主要用于在多线程环境中发出信号并控制线程的执行。AutoResetEvent 的主要特点是每当调用 Set() 方法时,信号会被设置&…...
汽车行业的MES系统方案(附案例资料合集)
针对汽车行业的MES系统方案,以下是一些关键点和实施案例: 核心功能: 实时监控:MES系统通过传感器和物联网技术实时监控生产线上的每一个环节,确保信息的及时传递。数据分析:系统对收集的数据进行深度分析&a…...
基于监督学习的神经网络控制算法详细介绍和例程
基于监督学习的神经网络控制算法通常用于对已有数据进行训练,以学习输入与输出之间的映射关系。下面我将详细介绍这种算法的原理和流程,并提供一个简单的例程: 算法原理: 输入:给定一组已知的输入信号和对应的输出控制…...
springMVC-请求响应
springmvc——一 站式web框架,核心是处理http请求响应。 前后端分离:需要序列化,服务端把数据序列化成字符串或者流给前端,前端又把json转成对象,前端的叫反序列化。前端把数据序列化转成字符串给服务器,服…...
数据交易和联邦学习的背景下的安全属性
数据交易和联邦学习的背景下的安全属性 在数据交易和联邦学习的背景下,安全属性对于保护数据隐私、确保系统可靠性和维护交易公平性至关重要。以下将分析文章中涉及的安全属性以及分析这些属性的目的。 涉及的安全属性 双向认证:文章虽未明确提及传统意义上的双向认证机制,…...
RestClient
什么是RestClient RestClient 是 Elasticsearch 官方提供的 Java 低级 REST 客户端,它允许HTTP与Elasticsearch 集群通信,而无需处理 JSON 序列化/反序列化等底层细节。它是 Elasticsearch Java API 客户端的基础。 RestClient 主要特点 轻量级ÿ…...

IDEA运行Tomcat出现乱码问题解决汇总
最近正值期末周,有很多同学在写期末Java web作业时,运行tomcat出现乱码问题,经过多次解决与研究,我做了如下整理: 原因: IDEA本身编码与tomcat的编码与Windows编码不同导致,Windows 系统控制台…...

深入剖析AI大模型:大模型时代的 Prompt 工程全解析
今天聊的内容,我认为是AI开发里面非常重要的内容。它在AI开发里无处不在,当你对 AI 助手说 "用李白的风格写一首关于人工智能的诗",或者让翻译模型 "将这段合同翻译成商务日语" 时,输入的这句话就是 Prompt。…...
Auto-Coder使用GPT-4o完成:在用TabPFN这个模型构建一个预测未来3天涨跌的分类任务
通过akshare库,获取股票数据,并生成TabPFN这个模型 可以识别、处理的格式,写一个完整的预处理示例,并构建一个预测未来 3 天股价涨跌的分类任务 用TabPFN这个模型构建一个预测未来 3 天股价涨跌的分类任务,进行预测并输…...
精益数据分析(97/126):邮件营销与用户参与度的关键指标优化指南
精益数据分析(97/126):邮件营销与用户参与度的关键指标优化指南 在数字化营销时代,邮件列表效度、用户参与度和网站性能等指标往往决定着创业公司的增长成败。今天,我们将深入解析邮件打开率、网站可用性、页面参与时…...
AspectJ 在 Android 中的完整使用指南
一、环境配置(Gradle 7.0 适配) 1. 项目级 build.gradle // 注意:沪江插件已停更,推荐官方兼容方案 buildscript {dependencies {classpath org.aspectj:aspectjtools:1.9.9.1 // AspectJ 工具} } 2. 模块级 build.gradle plu…...

Windows安装Miniconda
一、下载 https://www.anaconda.com/download/success 二、安装 三、配置镜像源 Anaconda/Miniconda pip 配置清华镜像源_anaconda配置清华源-CSDN博客 四、常用操作命令 Anaconda/Miniconda 基本操作命令_miniconda创建环境命令-CSDN博客...
uniapp 字符包含的相关方法
在uniapp中,如果你想检查一个字符串是否包含另一个子字符串,你可以使用JavaScript中的includes()方法或者indexOf()方法。这两种方法都可以达到目的,但它们在处理方式和返回值上有所不同。 使用includes()方法 includes()方法用于判断一个字…...
用鸿蒙HarmonyOS5实现中国象棋小游戏的过程
下面是一个基于鸿蒙OS (HarmonyOS) 的中国象棋小游戏的实现代码。这个实现使用Java语言和鸿蒙的Ability框架。 1. 项目结构 /src/main/java/com/example/chinesechess/├── MainAbilitySlice.java // 主界面逻辑├── ChessView.java // 游戏视图和逻辑├──…...

【堆垛策略】设计方法
堆垛策略的设计是积木堆叠系统的核心,直接影响堆叠的稳定性、效率和容错能力。以下是分层次的堆垛策略设计方法,涵盖基础规则、优化算法和容错机制: 1. 基础堆垛规则 (1) 物理稳定性优先 重心原则: 大尺寸/重量积木在下…...