当前位置: 首页 > news >正文

C# 中使用 MassTransit

在生产环境中使用 MassTransit 时,通常需要进行详细的配置,包括设置连接字符串、配置队列、配置消费者、处理重试和错误队列等。以下是一个完整的示例,展示了如何在 ASP.NET Core 应用程序中配置 MassTransit,包括请求/响应模式和事件模式,并使用依赖注入。

1. 安装必要的 NuGet 包

首先,确保安装了以下 NuGet 包:

  • MassTransit
  • MassTransit.RabbitMQ
  • MassTransit.AspNetCore
  • MassTransit.ExtensionsDependencyInjection
  • Microsoft.Extensions.Hosting

web端服务代码

<Project Sdk="Microsoft.NET.Sdk.Web"><PropertyGroup><TargetFramework>net6.0</TargetFramework><Nullable>enable</Nullable><ImplicitUsings>enable</ImplicitUsings></PropertyGroup><ItemGroup><PackageReference Include="MassTransit.Extensions.DependencyInjection" Version="7.3.1" /><PackageReference Include="MassTransit.Redis" Version="8.3.4" /><PackageReference Include="Swashbuckle.AspNetCore" Version="7.2.0" /><PackageReference Include="MassTransit.RabbitMQ" Version="8.3.4" /></ItemGroup><ItemGroup><ProjectReference Include="..\AppDto\AppDto.csproj" /></ItemGroup></Project>
using Microsoft.OpenApi.Models;
using MassTransit;
using WebApp_MssTransit.Filters;var builder = WebApplication.CreateBuilder(args);// Add services to the container.
builder.Services.AddMvc(opt =>
{opt.Filters.Add<ExceptionFilter>();
});
builder.Services.AddSwaggerGen(c =>
{c.SwaggerDoc("v1", new OpenApiInfo { Title = "My API", Version = "v1" });
});builder.Services.AddMassTransit(x =>
{// 使用内存//x.UsingInMemory();// 使用RabbitMqx.UsingRabbitMq((context, config) =>{config.Host("rabbitmq://localhost:5672", host =>{host.Username("admin");host.Password("admin");});});
});var app = builder.Build();// Configure the HTTP request pipeline.app.UseHttpsRedirection();
app.UseRouting();app.UseSwagger();
app.UseSwaggerUI(c => c.SwaggerEndpoint("v1/swagger.json", "My API V1"));app.UseEndpoints(endpoints =>
{endpoints.MapControllers();
});app.Run();

控制器里面

using AppDto;
using MassTransit;
using Microsoft.AspNetCore.Mvc;
using System.Text.Json;
using System.Text.Json.Serialization;namespace WebApp_MssTransit.Controllers;[ApiController]
[Route("[controller]")]
public class PublishController : ControllerBase
{private readonly ILogger<PublishController> _logger;private readonly IBus _bus;public PublishController(ILogger<PublishController> logger, IBus bus){_logger = logger;_bus = bus;}[HttpPost(template : nameof(PublishAsync))]public async Task PublishAsync(){var msg = new OrderMessage(){Id = Guid.NewGuid(),Name = "Phone",CreationTime = DateTime.Now};await _bus.Publish(msg);_logger.LogInformation($"Publish message :{JsonSerializer.Serialize(msg)}");}[HttpPost(template: nameof(SendAsync))]public async Task SendAsync(){var msg = new OrderMessage(){Id = Guid.NewGuid(),Name = "Phone",CreationTime = DateTime.Now};await _bus.Send(msg);_logger.LogInformation($"Send message :{JsonSerializer.Serialize(msg)}");}[HttpPost(template: nameof(RequestAirAsync))]public async Task<AirResponse> RequestAirAsync(){var airRequest = new AirRequest{CreationTime = DateTime.Now,Identity = DateTime.Now.Ticks};var esp = await _bus.Request<AirRequest, AirResponse>(airRequest);_logger.LogInformation($"{DateTime.Now.ToStringTime()} Request Response :{JsonSerializer.Serialize(esp)}");var espMsg = esp.Message;_logger.LogInformation($"{espMsg.CreationTime.ToStringTime()} Request Response data");return espMsg;}
}

信息公共类型,这里没有定义接口,直接共享AppDto

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;namespace AppDto;public class AirRequest
{public long Identity { get; set; }public DateTime CreationTime { get; set; }
}
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;namespace AppDto;public class AirResponse
{public long Identity { get; set; }public DateTime CreationTime { get; set; }
}
namespace AppDto;public class OrderMessage
{public Guid Id { get; init; }public string Name { get; set; }public DateTime CreationTime { get; set; }
}
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;namespace AppDto;public static class DateTimeExtant
{public static string ToStringTime(this DateTime time){return time.ToString("yyyy-MM-dd HH:mm:ss.fffff");}
}

接收消息处理端

<Project Sdk="Microsoft.NET.Sdk"><PropertyGroup><OutputType>Exe</OutputType><TargetFramework>net6.0</TargetFramework><ImplicitUsings>enable</ImplicitUsings><Nullable>enable</Nullable></PropertyGroup><ItemGroup><PackageReference Include="MassTransit.Extensions.DependencyInjection" Version="7.3.1" /><PackageReference Include="MassTransit.RabbitMQ" Version="8.3.4" /><PackageReference Include="MassTransit.Redis" Version="8.3.4" /><PackageReference Include="Microsoft.Extensions.Hosting" Version="6.0.1" /></ItemGroup><ItemGroup><ProjectReference Include="..\AppDto\AppDto.csproj" /></ItemGroup></Project>

using MassTransit;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using MassTransit.RedisIntegration;
using ConsoleApp_Producer;
using MassTransit.Metadata;
using MassTransit.Util;
using System.Reflection;
using MassTransit.Internals;
using AppDto;
using ConsoleApp_consumer;Console.WriteLine("Hello, World!");var builder = Microsoft.Extensions.Hosting.Host.CreateDefaultBuilder(args);builder.ConfigureServices((hostContext, services) =>{services.AddMassTransit(x =>{// 通过扫描程序集注册消费者//x.AddConsumers(typeof(OrderConsumer).Assembly);// 通过类型单个注册消费者x.AddConsumer<OrderConsumer>();x.AddConsumer<AirHandle>();//x.SetKebabCaseEndpointNameFormatter();// 通过泛型单个注册消费者//x.AddConsumer<OrderEtoConsumer, OrderEtoConsumerDefinition>();// 通过指定命名空间注册消费者// x.AddConsumersFromNamespaceContaining<OrderEtoConsumer>();// 使用内存队列// x.UsingInMemory();x.UsingRabbitMq((context, config) =>{config.Host("rabbitmq://localhost:5672", hostconfig =>{hostconfig.Username("admin");hostconfig.Password("admin");});config.ConfigureEndpoints(context);});});});var host = builder.Build();
host.Run();
using AppDto;
using MassTransit;
using Microsoft.Extensions.Logging;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Text.Json;
using System.Threading.Tasks;namespace ConsoleApp_Producer;public class OrderConsumer : IConsumer<OrderMessage>
{private readonly ILogger<OrderConsumer> _logger;public OrderConsumer(ILogger<OrderConsumer> logger){_logger = logger;}public Task Consume(ConsumeContext<OrderMessage> context){_logger.LogInformation($"{DateTime.Now.ToStringTime()}收到消息:{JsonSerializer.Serialize(context.Message)},\n\t{context.Message.CreationTime.ToStringTime()} 消息时间");return Task.CompletedTask;}
}
using AppDto;
using ConsoleApp_Producer;
using MassTransit;
using Microsoft.Extensions.Logging;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;namespace ConsoleApp_consumer;public class AirHandle : IConsumer<AirRequest>
{private readonly ILogger<AirHandle> _logger;public AirHandle(ILogger<AirHandle> logger){_logger = logger;}public async Task Consume(ConsumeContext<AirRequest> context){_logger.LogInformation($"{DateTime.Now.ToStringTime()}Receive Data :{context.Message.Identity}");_logger.LogInformation($"{context.Message.CreationTime.ToStringTime()} message data");var resp = new AirResponse{Identity = context.Message.Identity,CreationTime = DateTime.Now,};await context.RespondAsync(resp);_logger.LogInformation($"{DateTime.Now.ToStringTime()} Respond  Data {resp}");}
}

7. 运行应用程序

运行应用程序后,你可以通过发送 HTTP 请求来发布消息和发送请求。例如,使用 Postman 或 cURL:

  • 发布消息:

在这里插入图片描述### 8. 处理错误和重试

在生产环境中,处理错误和重试是很重要的。你可以在 MassTransit 配置中启用重试和死信队列:

x.UseInMemoryOutbox();
x.UseMessageRetry(r => r.Interval(3, TimeSpan.FromSeconds(10)));
x.UseDelayedRedelivery(r => r.Intervals(TimeSpan.FromSeconds(10), TimeSpan.FromSeconds(30)));
x.UseFault tolerantExceptionStrategies();

9. 监控和日志

为了监控和调试,可以配置日志记录和监控工具。MassTransit 支持多种日志记录框架,如 Serilog 或 NLog。

10. 安全性和认证

在生产环境中,确保 RabbitMQ 连接是安全的,使用合适的认证和授权机制。可以配置 TLS 和其他安全选项。

通过以上步骤,你可以在生产环境中配置和使用 MassTransit,实现请求/响应和事件处理模式,并利用依赖注入进行服务管理和生命周期管理。

相关文章:

C# 中使用 MassTransit

在生产环境中使用 MassTransit 时&#xff0c;通常需要进行详细的配置&#xff0c;包括设置连接字符串、配置队列、配置消费者、处理重试和错误队列等。以下是一个完整的示例&#xff0c;展示了如何在 ASP.NET Core 应用程序中配置 MassTransit&#xff0c;包括请求/响应模式和…...

网络编程 实现联网 b+Tree

网络编程是客户端和服务器之间通信的基础&#xff0c;也是现代应用开发中不可或缺的技能。在 Unity 中实现网络功能&#xff0c;需要结合计算机网络原理、数据结构与算法&#xff0c;以及网络协议的实际应用。以下是对这一块内容的详细介绍&#xff0c;包括每个涉及到的知识点&…...

zentao ubuntu上安装

#下载ZenTaoPMS-21.2-zbox_amd64.tar.gz&#xff08;https://www.zentao.net/downloads.html&#xff09; https://dl.zentao.net/zentao/21.2/ZenTaoPMS-21.2-zbox_amd64.tar.gzcd /opt tar -zxvf ZenTaoPMS-21.2-zbox_amd64.tar.gz#启动 /opt/zbox/zbox start /opt/zbox/zbox…...

Java 网络原理 ①-IO多路复用 || 自定义协议 || XML || JSON

这里是Themberfue 在学习完简单的网络编程后&#xff0c;我们将更加深入网络的学习——HTTP协议、TCP协议、UDP协议、IP协议........... IO多路复用 ✨在上一节基于 TCP 协议 编写应用层代码时&#xff0c;我们通过一个线程处理连接的申请&#xff0c;随后通过多线程或者线程…...

Bash Shell知识合集

1. chmod命令 创建一个bash shell脚本 hello.sh ~script $ touch hello.sh脚本创建完成后并不能直接执行&#xff0c;我们要用chmod命令授予它可执行的权限&#xff1a; ~script $ chmod 755 hello.sh授权后的脚本可以直接执行&#xff1a; ~script $ ./hello.sh2.指定运行…...

从0入门自主空中机器人-1【课程介绍】

关于本课程&#xff1a; 本次课程是一套面向对自主空中机器人感兴趣的学生、爱好者、相关从业人员的免费课程&#xff0c;包含了从硬件组装、机载电脑环境设置、代码部署、实机实验等全套详细流程&#xff0c;带你从0开始&#xff0c;组装属于自己的自主无人机&#xff0c;并让…...

Doris使用注意点

自己学习过程中整理&#xff0c;非官方 dws等最后用于查询的表可以考虑使用row存储加快查询&#xff0c;即用空间换时间duplicate key的选择要考虑最常查询使用适当使用bloomfilter 加速查询适当使用aggregate 模式降低max&#xff0c;avg&#xff0c;min之类的计算并加快查询…...

Mybatis插件better-mybatis-generator的下载与使用

1.下载 找到设置 插件 搜索better-mybatis-generator 下载并且重启IDEA 2.连接数据库 点击测试连接 连接成功如下图 3.使用插件 选择对应的表 右击选择 注意&#xff1a;mysql8.0驱动一定要勾上mysql_8 其他地方不要动 然后实体类 mapper xml就都生成好了 mapper里有默认增删…...

uniapp小程序实现弹幕不重叠

uniapp小程序实现弹幕不重叠 1、在父组件中引入弹幕组件 <template><!-- 弹幕 --><barrage ref"barrage" class"barrage-content" reloadDanmu"reloadDanmu"></barrage> </template> <script>import barr…...

快速排序学习优化

首先&#xff0c;上图。 ‘’’ cpp int partSort(int *a ,int left,int right) {int keyi left; //做左侧基准while(left<right){while(left<right && a[right]>a[keyi]){right--;}while(left<right && a[left]<a[keyi]){left;}swap(a[left…...

微信流量主挑战:三天25用户!功能未完善?(新纪元4)

&#x1f389;【小程序上线第三天&#xff01;突破25用户大关&#xff01;】&#x1f389; 嘿&#xff0c;大家好&#xff01;今天是我们小程序上线的第三天&#xff0c;我们的用户量已经突破了25个&#xff01;昨天还是16个&#xff0c;今天一觉醒来竟然有25个&#xff01;这涨…...

jetson 无显示器配置WIFI

我使用的 jetpack 版本是 6.1&#xff0c;发现自带 NetworkManager 软件包&#xff0c;此软件包包含一个守护程序、一个命令行界面&#xff08;nmcli&#xff09;和一个基于 curses 的界面&#xff08;nmtui&#xff09;。 可以使用 nmcli 命令配置wifi&#xff0c;nmcli 示例…...

SpringCloudAlibaba实战入门之路由网关Gateway断言(十二)

上一节课中我们初步讲解了网关的基本概念、基本功能,并且带大家实战体验了一下网关的初步效果,这节课我们继续学习关于网关的一些更高级有用功能,比如本篇文章的断言。 一、网关主要组成部分 上图中是核心的流程图,最主要的就是Route、Predicates 和 Filters 作用于特定路…...

【ES6复习笔记】ES6的模块化(18)

模块化的概念 模块化是指将一个复杂的系统分解为多个模块&#xff0c;每个模块完成一个特定的功能&#xff0c;模块之间通过接口进行通信。模块化的目的是提高代码的可读性、可维护性和可重用性。 模块化规范产品&#xff0c; ES6 之前的模块化规范有&#xff1a; CommonJS …...

兰亭妙微:专注医疗 UI 设计,点亮数字化医疗新视界

医疗行业界面解决方案以医患使用者为中心&#xff0c;遵循行业使用习惯和表达方式&#xff0c;优化使用流程、设计简洁、人性化的操作界面&#xff0c;采用插画、三维动画、微动效的创作方法&#xff0c;让用户感受到愉悦易用美观的使用体验。蓝蓝设计与知名企业合作项目有&…...

c# 线程 AutoResetEvent 的Set()函数多次调用

本文部分内容摘自ChatGPT 在 C# 中&#xff0c;AutoResetEvent 是一种用于线程同步的机制&#xff0c;它的行为类似于一个信号量&#xff0c;主要用于在多线程环境中发出信号并控制线程的执行。AutoResetEvent 的主要特点是每当调用 Set() 方法时&#xff0c;信号会被设置&…...

汽车行业的MES系统方案(附案例资料合集)

针对汽车行业的MES系统方案&#xff0c;以下是一些关键点和实施案例&#xff1a; 核心功能&#xff1a; 实时监控&#xff1a;MES系统通过传感器和物联网技术实时监控生产线上的每一个环节&#xff0c;确保信息的及时传递。数据分析&#xff1a;系统对收集的数据进行深度分析&a…...

基于监督学习的神经网络控制算法详细介绍和例程

基于监督学习的神经网络控制算法通常用于对已有数据进行训练&#xff0c;以学习输入与输出之间的映射关系。下面我将详细介绍这种算法的原理和流程&#xff0c;并提供一个简单的例程&#xff1a; 算法原理&#xff1a; 输入&#xff1a;给定一组已知的输入信号和对应的输出控制…...

springMVC-请求响应

springmvc——一 站式web框架&#xff0c;核心是处理http请求响应。 前后端分离&#xff1a;需要序列化&#xff0c;服务端把数据序列化成字符串或者流给前端&#xff0c;前端又把json转成对象&#xff0c;前端的叫反序列化。前端把数据序列化转成字符串给服务器&#xff0c;服…...

数据交易和联邦学习的背景下的安全属性

数据交易和联邦学习的背景下的安全属性 在数据交易和联邦学习的背景下,安全属性对于保护数据隐私、确保系统可靠性和维护交易公平性至关重要。以下将分析文章中涉及的安全属性以及分析这些属性的目的。 涉及的安全属性 双向认证:文章虽未明确提及传统意义上的双向认证机制,…...

Oracle查询表空间大小

1 查询数据库中所有的表空间以及表空间所占空间的大小 SELECTtablespace_name,sum( bytes ) / 1024 / 1024 FROMdba_data_files GROUP BYtablespace_name; 2 Oracle查询表空间大小及每个表所占空间的大小 SELECTtablespace_name,file_id,file_name,round( bytes / ( 1024 …...

【android bluetooth 框架分析 04】【bt-framework 层详解 1】【BluetoothProperties介绍】

1. BluetoothProperties介绍 libsysprop/srcs/android/sysprop/BluetoothProperties.sysprop BluetoothProperties.sysprop 是 Android AOSP 中的一种 系统属性定义文件&#xff08;System Property Definition File&#xff09;&#xff0c;用于声明和管理 Bluetooth 模块相…...

拉力测试cuda pytorch 把 4070显卡拉满

import torch import timedef stress_test_gpu(matrix_size16384, duration300):"""对GPU进行压力测试&#xff0c;通过持续的矩阵乘法来最大化GPU利用率参数:matrix_size: 矩阵维度大小&#xff0c;增大可提高计算复杂度duration: 测试持续时间&#xff08;秒&…...

MySQL用户和授权

开放MySQL白名单 可以通过iptables-save命令确认对应客户端ip是否可以访问MySQL服务&#xff1a; test: # iptables-save | grep 3306 -A mp_srv_whitelist -s 172.16.14.102/32 -p tcp -m tcp --dport 3306 -j ACCEPT -A mp_srv_whitelist -s 172.16.4.16/32 -p tcp -m tcp -…...

优选算法第十二讲:队列 + 宽搜 优先级队列

优选算法第十二讲&#xff1a;队列 宽搜 && 优先级队列 1.N叉树的层序遍历2.二叉树的锯齿型层序遍历3.二叉树最大宽度4.在每个树行中找最大值5.优先级队列 -- 最后一块石头的重量6.数据流中的第K大元素7.前K个高频单词8.数据流的中位数 1.N叉树的层序遍历 2.二叉树的锯…...

Redis的发布订阅模式与专业的 MQ(如 Kafka, RabbitMQ)相比,优缺点是什么?适用于哪些场景?

Redis 的发布订阅&#xff08;Pub/Sub&#xff09;模式与专业的 MQ&#xff08;Message Queue&#xff09;如 Kafka、RabbitMQ 进行比较&#xff0c;核心的权衡点在于&#xff1a;简单与速度 vs. 可靠与功能。 下面我们详细展开对比。 Redis Pub/Sub 的核心特点 它是一个发后…...

CSS设置元素的宽度根据其内容自动调整

width: fit-content 是 CSS 中的一个属性值&#xff0c;用于设置元素的宽度根据其内容自动调整&#xff0c;确保宽度刚好容纳内容而不会超出。 效果对比 默认情况&#xff08;width: auto&#xff09;&#xff1a; 块级元素&#xff08;如 <div>&#xff09;会占满父容器…...

保姆级教程:在无网络无显卡的Windows电脑的vscode本地部署deepseek

文章目录 1 前言2 部署流程2.1 准备工作2.2 Ollama2.2.1 使用有网络的电脑下载Ollama2.2.2 安装Ollama&#xff08;有网络的电脑&#xff09;2.2.3 安装Ollama&#xff08;无网络的电脑&#xff09;2.2.4 安装验证2.2.5 修改大模型安装位置2.2.6 下载Deepseek模型 2.3 将deepse…...

RabbitMQ入门4.1.0版本(基于java、SpringBoot操作)

RabbitMQ 一、RabbitMQ概述 RabbitMQ RabbitMQ最初由LShift和CohesiveFT于2007年开发&#xff0c;后来由Pivotal Software Inc.&#xff08;现为VMware子公司&#xff09;接管。RabbitMQ 是一个开源的消息代理和队列服务器&#xff0c;用 Erlang 语言编写。广泛应用于各种分布…...

安卓基础(Java 和 Gradle 版本)

1. 设置项目的 JDK 版本 方法1&#xff1a;通过 Project Structure File → Project Structure... (或按 CtrlAltShiftS) 左侧选择 SDK Location 在 Gradle Settings 部分&#xff0c;设置 Gradle JDK 方法2&#xff1a;通过 Settings File → Settings... (或 CtrlAltS)…...