当前位置: 首页 > news >正文

C# 中使用 MassTransit

在生产环境中使用 MassTransit 时,通常需要进行详细的配置,包括设置连接字符串、配置队列、配置消费者、处理重试和错误队列等。以下是一个完整的示例,展示了如何在 ASP.NET Core 应用程序中配置 MassTransit,包括请求/响应模式和事件模式,并使用依赖注入。

1. 安装必要的 NuGet 包

首先,确保安装了以下 NuGet 包:

  • MassTransit
  • MassTransit.RabbitMQ
  • MassTransit.AspNetCore
  • MassTransit.ExtensionsDependencyInjection
  • Microsoft.Extensions.Hosting

web端服务代码

<Project Sdk="Microsoft.NET.Sdk.Web"><PropertyGroup><TargetFramework>net6.0</TargetFramework><Nullable>enable</Nullable><ImplicitUsings>enable</ImplicitUsings></PropertyGroup><ItemGroup><PackageReference Include="MassTransit.Extensions.DependencyInjection" Version="7.3.1" /><PackageReference Include="MassTransit.Redis" Version="8.3.4" /><PackageReference Include="Swashbuckle.AspNetCore" Version="7.2.0" /><PackageReference Include="MassTransit.RabbitMQ" Version="8.3.4" /></ItemGroup><ItemGroup><ProjectReference Include="..\AppDto\AppDto.csproj" /></ItemGroup></Project>
using Microsoft.OpenApi.Models;
using MassTransit;
using WebApp_MssTransit.Filters;var builder = WebApplication.CreateBuilder(args);// Add services to the container.
builder.Services.AddMvc(opt =>
{opt.Filters.Add<ExceptionFilter>();
});
builder.Services.AddSwaggerGen(c =>
{c.SwaggerDoc("v1", new OpenApiInfo { Title = "My API", Version = "v1" });
});builder.Services.AddMassTransit(x =>
{// 使用内存//x.UsingInMemory();// 使用RabbitMqx.UsingRabbitMq((context, config) =>{config.Host("rabbitmq://localhost:5672", host =>{host.Username("admin");host.Password("admin");});});
});var app = builder.Build();// Configure the HTTP request pipeline.app.UseHttpsRedirection();
app.UseRouting();app.UseSwagger();
app.UseSwaggerUI(c => c.SwaggerEndpoint("v1/swagger.json", "My API V1"));app.UseEndpoints(endpoints =>
{endpoints.MapControllers();
});app.Run();

控制器里面

using AppDto;
using MassTransit;
using Microsoft.AspNetCore.Mvc;
using System.Text.Json;
using System.Text.Json.Serialization;namespace WebApp_MssTransit.Controllers;[ApiController]
[Route("[controller]")]
public class PublishController : ControllerBase
{private readonly ILogger<PublishController> _logger;private readonly IBus _bus;public PublishController(ILogger<PublishController> logger, IBus bus){_logger = logger;_bus = bus;}[HttpPost(template : nameof(PublishAsync))]public async Task PublishAsync(){var msg = new OrderMessage(){Id = Guid.NewGuid(),Name = "Phone",CreationTime = DateTime.Now};await _bus.Publish(msg);_logger.LogInformation($"Publish message :{JsonSerializer.Serialize(msg)}");}[HttpPost(template: nameof(SendAsync))]public async Task SendAsync(){var msg = new OrderMessage(){Id = Guid.NewGuid(),Name = "Phone",CreationTime = DateTime.Now};await _bus.Send(msg);_logger.LogInformation($"Send message :{JsonSerializer.Serialize(msg)}");}[HttpPost(template: nameof(RequestAirAsync))]public async Task<AirResponse> RequestAirAsync(){var airRequest = new AirRequest{CreationTime = DateTime.Now,Identity = DateTime.Now.Ticks};var esp = await _bus.Request<AirRequest, AirResponse>(airRequest);_logger.LogInformation($"{DateTime.Now.ToStringTime()} Request Response :{JsonSerializer.Serialize(esp)}");var espMsg = esp.Message;_logger.LogInformation($"{espMsg.CreationTime.ToStringTime()} Request Response data");return espMsg;}
}

信息公共类型,这里没有定义接口,直接共享AppDto

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;namespace AppDto;public class AirRequest
{public long Identity { get; set; }public DateTime CreationTime { get; set; }
}
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;namespace AppDto;public class AirResponse
{public long Identity { get; set; }public DateTime CreationTime { get; set; }
}
namespace AppDto;public class OrderMessage
{public Guid Id { get; init; }public string Name { get; set; }public DateTime CreationTime { get; set; }
}
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;namespace AppDto;public static class DateTimeExtant
{public static string ToStringTime(this DateTime time){return time.ToString("yyyy-MM-dd HH:mm:ss.fffff");}
}

接收消息处理端

<Project Sdk="Microsoft.NET.Sdk"><PropertyGroup><OutputType>Exe</OutputType><TargetFramework>net6.0</TargetFramework><ImplicitUsings>enable</ImplicitUsings><Nullable>enable</Nullable></PropertyGroup><ItemGroup><PackageReference Include="MassTransit.Extensions.DependencyInjection" Version="7.3.1" /><PackageReference Include="MassTransit.RabbitMQ" Version="8.3.4" /><PackageReference Include="MassTransit.Redis" Version="8.3.4" /><PackageReference Include="Microsoft.Extensions.Hosting" Version="6.0.1" /></ItemGroup><ItemGroup><ProjectReference Include="..\AppDto\AppDto.csproj" /></ItemGroup></Project>

using MassTransit;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using MassTransit.RedisIntegration;
using ConsoleApp_Producer;
using MassTransit.Metadata;
using MassTransit.Util;
using System.Reflection;
using MassTransit.Internals;
using AppDto;
using ConsoleApp_consumer;Console.WriteLine("Hello, World!");var builder = Microsoft.Extensions.Hosting.Host.CreateDefaultBuilder(args);builder.ConfigureServices((hostContext, services) =>{services.AddMassTransit(x =>{// 通过扫描程序集注册消费者//x.AddConsumers(typeof(OrderConsumer).Assembly);// 通过类型单个注册消费者x.AddConsumer<OrderConsumer>();x.AddConsumer<AirHandle>();//x.SetKebabCaseEndpointNameFormatter();// 通过泛型单个注册消费者//x.AddConsumer<OrderEtoConsumer, OrderEtoConsumerDefinition>();// 通过指定命名空间注册消费者// x.AddConsumersFromNamespaceContaining<OrderEtoConsumer>();// 使用内存队列// x.UsingInMemory();x.UsingRabbitMq((context, config) =>{config.Host("rabbitmq://localhost:5672", hostconfig =>{hostconfig.Username("admin");hostconfig.Password("admin");});config.ConfigureEndpoints(context);});});});var host = builder.Build();
host.Run();
using AppDto;
using MassTransit;
using Microsoft.Extensions.Logging;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Text.Json;
using System.Threading.Tasks;namespace ConsoleApp_Producer;public class OrderConsumer : IConsumer<OrderMessage>
{private readonly ILogger<OrderConsumer> _logger;public OrderConsumer(ILogger<OrderConsumer> logger){_logger = logger;}public Task Consume(ConsumeContext<OrderMessage> context){_logger.LogInformation($"{DateTime.Now.ToStringTime()}收到消息:{JsonSerializer.Serialize(context.Message)},\n\t{context.Message.CreationTime.ToStringTime()} 消息时间");return Task.CompletedTask;}
}
using AppDto;
using ConsoleApp_Producer;
using MassTransit;
using Microsoft.Extensions.Logging;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;namespace ConsoleApp_consumer;public class AirHandle : IConsumer<AirRequest>
{private readonly ILogger<AirHandle> _logger;public AirHandle(ILogger<AirHandle> logger){_logger = logger;}public async Task Consume(ConsumeContext<AirRequest> context){_logger.LogInformation($"{DateTime.Now.ToStringTime()}Receive Data :{context.Message.Identity}");_logger.LogInformation($"{context.Message.CreationTime.ToStringTime()} message data");var resp = new AirResponse{Identity = context.Message.Identity,CreationTime = DateTime.Now,};await context.RespondAsync(resp);_logger.LogInformation($"{DateTime.Now.ToStringTime()} Respond  Data {resp}");}
}

7. 运行应用程序

运行应用程序后,你可以通过发送 HTTP 请求来发布消息和发送请求。例如,使用 Postman 或 cURL:

  • 发布消息:

在这里插入图片描述### 8. 处理错误和重试

在生产环境中,处理错误和重试是很重要的。你可以在 MassTransit 配置中启用重试和死信队列:

x.UseInMemoryOutbox();
x.UseMessageRetry(r => r.Interval(3, TimeSpan.FromSeconds(10)));
x.UseDelayedRedelivery(r => r.Intervals(TimeSpan.FromSeconds(10), TimeSpan.FromSeconds(30)));
x.UseFault tolerantExceptionStrategies();

9. 监控和日志

为了监控和调试,可以配置日志记录和监控工具。MassTransit 支持多种日志记录框架,如 Serilog 或 NLog。

10. 安全性和认证

在生产环境中,确保 RabbitMQ 连接是安全的,使用合适的认证和授权机制。可以配置 TLS 和其他安全选项。

通过以上步骤,你可以在生产环境中配置和使用 MassTransit,实现请求/响应和事件处理模式,并利用依赖注入进行服务管理和生命周期管理。

相关文章:

C# 中使用 MassTransit

在生产环境中使用 MassTransit 时&#xff0c;通常需要进行详细的配置&#xff0c;包括设置连接字符串、配置队列、配置消费者、处理重试和错误队列等。以下是一个完整的示例&#xff0c;展示了如何在 ASP.NET Core 应用程序中配置 MassTransit&#xff0c;包括请求/响应模式和…...

网络编程 实现联网 b+Tree

网络编程是客户端和服务器之间通信的基础&#xff0c;也是现代应用开发中不可或缺的技能。在 Unity 中实现网络功能&#xff0c;需要结合计算机网络原理、数据结构与算法&#xff0c;以及网络协议的实际应用。以下是对这一块内容的详细介绍&#xff0c;包括每个涉及到的知识点&…...

zentao ubuntu上安装

#下载ZenTaoPMS-21.2-zbox_amd64.tar.gz&#xff08;https://www.zentao.net/downloads.html&#xff09; https://dl.zentao.net/zentao/21.2/ZenTaoPMS-21.2-zbox_amd64.tar.gzcd /opt tar -zxvf ZenTaoPMS-21.2-zbox_amd64.tar.gz#启动 /opt/zbox/zbox start /opt/zbox/zbox…...

Java 网络原理 ①-IO多路复用 || 自定义协议 || XML || JSON

这里是Themberfue 在学习完简单的网络编程后&#xff0c;我们将更加深入网络的学习——HTTP协议、TCP协议、UDP协议、IP协议........... IO多路复用 ✨在上一节基于 TCP 协议 编写应用层代码时&#xff0c;我们通过一个线程处理连接的申请&#xff0c;随后通过多线程或者线程…...

Bash Shell知识合集

1. chmod命令 创建一个bash shell脚本 hello.sh ~script $ touch hello.sh脚本创建完成后并不能直接执行&#xff0c;我们要用chmod命令授予它可执行的权限&#xff1a; ~script $ chmod 755 hello.sh授权后的脚本可以直接执行&#xff1a; ~script $ ./hello.sh2.指定运行…...

从0入门自主空中机器人-1【课程介绍】

关于本课程&#xff1a; 本次课程是一套面向对自主空中机器人感兴趣的学生、爱好者、相关从业人员的免费课程&#xff0c;包含了从硬件组装、机载电脑环境设置、代码部署、实机实验等全套详细流程&#xff0c;带你从0开始&#xff0c;组装属于自己的自主无人机&#xff0c;并让…...

Doris使用注意点

自己学习过程中整理&#xff0c;非官方 dws等最后用于查询的表可以考虑使用row存储加快查询&#xff0c;即用空间换时间duplicate key的选择要考虑最常查询使用适当使用bloomfilter 加速查询适当使用aggregate 模式降低max&#xff0c;avg&#xff0c;min之类的计算并加快查询…...

Mybatis插件better-mybatis-generator的下载与使用

1.下载 找到设置 插件 搜索better-mybatis-generator 下载并且重启IDEA 2.连接数据库 点击测试连接 连接成功如下图 3.使用插件 选择对应的表 右击选择 注意&#xff1a;mysql8.0驱动一定要勾上mysql_8 其他地方不要动 然后实体类 mapper xml就都生成好了 mapper里有默认增删…...

uniapp小程序实现弹幕不重叠

uniapp小程序实现弹幕不重叠 1、在父组件中引入弹幕组件 <template><!-- 弹幕 --><barrage ref"barrage" class"barrage-content" reloadDanmu"reloadDanmu"></barrage> </template> <script>import barr…...

快速排序学习优化

首先&#xff0c;上图。 ‘’’ cpp int partSort(int *a ,int left,int right) {int keyi left; //做左侧基准while(left<right){while(left<right && a[right]>a[keyi]){right--;}while(left<right && a[left]<a[keyi]){left;}swap(a[left…...

微信流量主挑战:三天25用户!功能未完善?(新纪元4)

&#x1f389;【小程序上线第三天&#xff01;突破25用户大关&#xff01;】&#x1f389; 嘿&#xff0c;大家好&#xff01;今天是我们小程序上线的第三天&#xff0c;我们的用户量已经突破了25个&#xff01;昨天还是16个&#xff0c;今天一觉醒来竟然有25个&#xff01;这涨…...

jetson 无显示器配置WIFI

我使用的 jetpack 版本是 6.1&#xff0c;发现自带 NetworkManager 软件包&#xff0c;此软件包包含一个守护程序、一个命令行界面&#xff08;nmcli&#xff09;和一个基于 curses 的界面&#xff08;nmtui&#xff09;。 可以使用 nmcli 命令配置wifi&#xff0c;nmcli 示例…...

SpringCloudAlibaba实战入门之路由网关Gateway断言(十二)

上一节课中我们初步讲解了网关的基本概念、基本功能,并且带大家实战体验了一下网关的初步效果,这节课我们继续学习关于网关的一些更高级有用功能,比如本篇文章的断言。 一、网关主要组成部分 上图中是核心的流程图,最主要的就是Route、Predicates 和 Filters 作用于特定路…...

【ES6复习笔记】ES6的模块化(18)

模块化的概念 模块化是指将一个复杂的系统分解为多个模块&#xff0c;每个模块完成一个特定的功能&#xff0c;模块之间通过接口进行通信。模块化的目的是提高代码的可读性、可维护性和可重用性。 模块化规范产品&#xff0c; ES6 之前的模块化规范有&#xff1a; CommonJS …...

兰亭妙微:专注医疗 UI 设计,点亮数字化医疗新视界

医疗行业界面解决方案以医患使用者为中心&#xff0c;遵循行业使用习惯和表达方式&#xff0c;优化使用流程、设计简洁、人性化的操作界面&#xff0c;采用插画、三维动画、微动效的创作方法&#xff0c;让用户感受到愉悦易用美观的使用体验。蓝蓝设计与知名企业合作项目有&…...

c# 线程 AutoResetEvent 的Set()函数多次调用

本文部分内容摘自ChatGPT 在 C# 中&#xff0c;AutoResetEvent 是一种用于线程同步的机制&#xff0c;它的行为类似于一个信号量&#xff0c;主要用于在多线程环境中发出信号并控制线程的执行。AutoResetEvent 的主要特点是每当调用 Set() 方法时&#xff0c;信号会被设置&…...

汽车行业的MES系统方案(附案例资料合集)

针对汽车行业的MES系统方案&#xff0c;以下是一些关键点和实施案例&#xff1a; 核心功能&#xff1a; 实时监控&#xff1a;MES系统通过传感器和物联网技术实时监控生产线上的每一个环节&#xff0c;确保信息的及时传递。数据分析&#xff1a;系统对收集的数据进行深度分析&a…...

基于监督学习的神经网络控制算法详细介绍和例程

基于监督学习的神经网络控制算法通常用于对已有数据进行训练&#xff0c;以学习输入与输出之间的映射关系。下面我将详细介绍这种算法的原理和流程&#xff0c;并提供一个简单的例程&#xff1a; 算法原理&#xff1a; 输入&#xff1a;给定一组已知的输入信号和对应的输出控制…...

springMVC-请求响应

springmvc——一 站式web框架&#xff0c;核心是处理http请求响应。 前后端分离&#xff1a;需要序列化&#xff0c;服务端把数据序列化成字符串或者流给前端&#xff0c;前端又把json转成对象&#xff0c;前端的叫反序列化。前端把数据序列化转成字符串给服务器&#xff0c;服…...

数据交易和联邦学习的背景下的安全属性

数据交易和联邦学习的背景下的安全属性 在数据交易和联邦学习的背景下,安全属性对于保护数据隐私、确保系统可靠性和维护交易公平性至关重要。以下将分析文章中涉及的安全属性以及分析这些属性的目的。 涉及的安全属性 双向认证:文章虽未明确提及传统意义上的双向认证机制,…...

基于服务器使用 apt 安装、配置 Nginx

&#x1f9fe; 一、查看可安装的 Nginx 版本 首先&#xff0c;你可以运行以下命令查看可用版本&#xff1a; apt-cache madison nginx-core输出示例&#xff1a; nginx-core | 1.18.0-6ubuntu14.6 | http://archive.ubuntu.com/ubuntu focal-updates/main amd64 Packages ng…...

spring:实例工厂方法获取bean

spring处理使用静态工厂方法获取bean实例&#xff0c;也可以通过实例工厂方法获取bean实例。 实例工厂方法步骤如下&#xff1a; 定义实例工厂类&#xff08;Java代码&#xff09;&#xff0c;定义实例工厂&#xff08;xml&#xff09;&#xff0c;定义调用实例工厂&#xff…...

uniapp微信小程序视频实时流+pc端预览方案

方案类型技术实现是否免费优点缺点适用场景延迟范围开发复杂度​WebSocket图片帧​定时拍照Base64传输✅ 完全免费无需服务器 纯前端实现高延迟高流量 帧率极低个人demo测试 超低频监控500ms-2s⭐⭐​RTMP推流​TRTC/即构SDK推流❌ 付费方案 &#xff08;部分有免费额度&#x…...

(转)什么是DockerCompose?它有什么作用?

一、什么是DockerCompose? DockerCompose可以基于Compose文件帮我们快速的部署分布式应用&#xff0c;而无需手动一个个创建和运行容器。 Compose文件是一个文本文件&#xff0c;通过指令定义集群中的每个容器如何运行。 DockerCompose就是把DockerFile转换成指令去运行。 …...

Leetcode33( 搜索旋转排序数组)

题目表述 整数数组 nums 按升序排列&#xff0c;数组中的值 互不相同 。 在传递给函数之前&#xff0c;nums 在预先未知的某个下标 k&#xff08;0 < k < nums.length&#xff09;上进行了 旋转&#xff0c;使数组变为 [nums[k], nums[k1], …, nums[n-1], nums[0], nu…...

SpringAI实战:ChatModel智能对话全解

一、引言&#xff1a;Spring AI 与 Chat Model 的核心价值 &#x1f680; 在 Java 生态中集成大模型能力&#xff0c;Spring AI 提供了高效的解决方案 &#x1f916;。其中 Chat Model 作为核心交互组件&#xff0c;通过标准化接口简化了与大语言模型&#xff08;LLM&#xff0…...

实战设计模式之模板方法模式

概述 模板方法模式定义了一个操作中的算法骨架&#xff0c;并将某些步骤延迟到子类中实现。模板方法使得子类可以在不改变算法结构的前提下&#xff0c;重新定义算法中的某些步骤。简单来说&#xff0c;就是在一个方法中定义了要执行的步骤顺序或算法框架&#xff0c;但允许子类…...

Java多线程实现之Runnable接口深度解析

Java多线程实现之Runnable接口深度解析 一、Runnable接口概述1.1 接口定义1.2 与Thread类的关系1.3 使用Runnable接口的优势 二、Runnable接口的基本实现方式2.1 传统方式实现Runnable接口2.2 使用匿名内部类实现Runnable接口2.3 使用Lambda表达式实现Runnable接口 三、Runnabl…...

嵌入式面试常问问题

以下内容面向嵌入式/系统方向的初学者与面试备考者,全面梳理了以下几大板块,并在每个板块末尾列出常见的面试问答思路,帮助你既能夯实基础,又能应对面试挑战。 一、TCP/IP 协议 1.1 TCP/IP 五层模型概述 链路层(Link Layer) 包括网卡驱动、以太网、Wi‑Fi、PPP 等。负责…...

CMS内容管理系统的设计与实现:多站点模式的实现

在一套内容管理系统中&#xff0c;其实有很多站点&#xff0c;比如企业门户网站&#xff0c;产品手册&#xff0c;知识帮助手册等&#xff0c;因此会需要多个站点&#xff0c;甚至PC、mobile、ipad各有一个站点。 每个站点关联的有站点所在目录及所属的域名。 一、站点表设计…...