.Net Core微服务入门全纪录(六)——EventBus-事件总线
系列文章目录
1、.Net Core微服务入门系列(一)——项目搭建
2、.Net Core微服务入门全纪录(二)——Consul-服务注册与发现(上)
3、.Net Core微服务入门全纪录(三)——Consul-服务注册与发现(下)
4、.Net Core微服务入门全纪录(四)——Ocelot-API网关(上)
5、.Net Core微服务入门全纪录(五)——Ocelot-API网关(下)
6、.Net Core微服务入门全纪录(六)——EventBus-事件总线
7、.Net Core微服务入门全纪录(七)——IdentityServer4-授权认证
8、.Net Core微服务入门全纪录(八)——Docker Compose与容器网络

文章目录
- 系列文章目录
- 前言📃
- 一、EventBus-事件总线
- 1.1 什么是事件总线?
- 1.2 为什么要用EventBus
- 二、CAP使用
- 2.1 环境准备
- 2.2 代码修改
- 三、运行测试
- 四、总结
前言📃
关于 微服务 的概念解释网上有很多, 个人理解微服务是一种系统架构模式,它和语言无关,和框架无关,和工具无关,和服务器环境无关。
微服务思想 是将传统的单体系统按照业务拆分成多个职责单一、且可独立运行的接口服务。至于服务如何拆分,没有明确的定义。几乎任何后端语言都能做微服务开发。微服务也并不是完美无缺的,微服务架构会带来更多的问题,增加系统的复杂度,引入更多的技术栈。
上一篇【.Net Core微服务入门全纪录(五)——Ocelot-API网关(下)】中已经完成了 Ocelot + Consul 的搭建,这一篇简单说一下 EventBus。
一、EventBus-事件总线
1.1 什么是事件总线?
🌈事件总线 是对观察者(发布-订阅)模式的一种实现。它是一种集中式事件处理机制,允许不同的组件之间进行彼此通信而又不需要相互依赖,达到一种 解耦 的目的。
如果没有接触过 EventBus ,可能不太好理解。其实 EventBus 在客户端开发中应用非常广泛android,ios,web 前端等,用于多个组件(或者界面)之间的相互通信。
1.2 为什么要用EventBus
就拿当前的项目举例,我们有一个订单服务,一个产品服务。客户端有一个下单功能,当用户下单时,调用订单服务的下单接口,那么下单接口需要调用产品服务的减库存接口,这涉及到服务与服务之间的调用。那么服务之间又怎么调用呢?直接 RESTAPI?或者效率更高的gRPC?可能这两者各有各的使用场景,但是他们都存在一个服务之间的耦合问题,或者难以做到异步调用。
试想一下:假设我们下单时调用订单服务,订单服务需要调用产品服务,产品服务又要调用物流服务,物流服务再去调用xx服务 等等。。。如果每个服务处理时间需要2s,不使用异步的话,那这种体验可想而知。
如果使用 EventBus 的话,那么订单服务只需要向 EventBus 发一个“下单事件”就可以了。产品服务会订阅“下单事件”,当产品服务收到下单事件时,自己去减库存就好了。这样就避免了两个服务之间直接调用的耦合性,并且真正做到了异步调用。
既然涉及到多个服务之间的异步调用,那么就不得不提分布式事务。分布式事务并不是微服务独有的问题,而是所有的分布式系统都会存在的问题。
关于分布式事务,可以查一下 “CAP原则” 和 “BASE理论” 了解更多。当今的分布式系统更多的会追求事务的最终一致性。
下面使用国人开发的优秀项目 “CAP”,来演示一下 EventBus 的基本使用。之所以使用 “CAP”是因为它既能解决分布式系统的最终一致性,同时又是一个 EventBus,它具备 EventBus 的所有功能!
作者介绍:https://www.cnblogs.com/savorboard/p/cap.html
二、CAP使用
2.1 环境准备
在 Docker 中准备一下需要的环境,首先是数据库,数据库我使用 PostgreSQL,用别的也行。CAP 支持:SqlServer,MySql,PostgreSql,MongoDB。
然后是MQ,这里我使用 RabbitMQ ,Kafka 也可以。
Docker运行RabbitMQ:
docker pull rabbitmq:management
docker run -d -p 15672:15672 -p 5672:5672 --name rabbitmq rabbitmq:management
🔑默认用户:guest,密码:guest
环境准备就完成了,Docker 就是这么方便。
2.2 代码修改
为了模拟以上业务,需要修改大量代码,下面代码如有遗漏的直接去github找。
NuGet安装:
Microsoft.EntityFrameworkCore
Microsoft.EntityFrameworkCore.Tools
Npgsql.EntityFrameworkCore.PostgreSQL

CAP相关:
DotNetCore.CAP
DotNetCore.CAP.RabbitMQ
DotNetCore.CAP.PostgreSql

Order.API/Controllers/OrdersController.cs 增加下单接口:
[Route("[controller]")]
[ApiController]
public class OrdersController : ControllerBase
{private readonly ILogger<OrdersController> _logger;private readonly IConfiguration _configuration;private readonly ICapPublisher _capBus;private readonly OrderContext _context;public OrdersController(ILogger<OrdersController> logger, IConfiguration configuration, ICapPublisher capPublisher, OrderContext context){_logger = logger;_configuration = configuration;_capBus = capPublisher;_context = context;}[HttpGet]public IActionResult Get(){string result = $"【订单服务】{DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss")}——" +$"{Request.HttpContext.Connection.LocalIpAddress}:{_configuration["ConsulSetting:ServicePort"]}";return Ok(result);}/// <summary>/// 下单 发布下单事件/// </summary>/// <param name="order"></param>/// <returns></returns>[Route("Create")][HttpPost]public async Task<IActionResult> CreateOrder(Models.Order order){using (var trans = _context.Database.BeginTransaction(_capBus, autoCommit: true)){//业务代码order.CreateTime = DateTime.Now;_context.Orders.Add(order);var r = await _context.SaveChangesAsync() > 0;if (r){//发布下单事件await _capBus.PublishAsync("order.services.createorder", new CreateOrderMessageDto() { Count = order.Count, ProductID = order.ProductID });return Ok();}return BadRequest();}}}
Order.API/MessageDto/CreateOrderMessageDto.cs:
/// <summary>
/// 下单事件消息
/// </summary>
public class CreateOrderMessageDto
{/// <summary>/// 产品ID/// </summary>public int ProductID { get; set; }/// <summary>/// 购买数量/// </summary>public int Count { get; set; }
}
Order.API/Models/Order.cs订单实体类:
public class Order
{[Key][DatabaseGenerated(DatabaseGeneratedOption.Identity)]public int ID { get; set; }/// <summary>/// 下单时间/// </summary>[Required]public DateTime CreateTime { get; set; }/// <summary>/// 产品ID/// </summary>[Required]public int ProductID { get; set; }/// <summary>/// 购买数量/// </summary>[Required]public int Count { get; set; }
}
Order.API/Models/OrderContext.cs数据库Context:
public class OrderContext : DbContext
{public OrderContext(DbContextOptions<OrderContext> options): base(options){}public DbSet<Order> Orders { get; set; }protected override void OnModelCreating(ModelBuilder modelBuilder){}
}
Order.API/appsettings.json增加数据库连接字符串:
"ConnectionStrings": {"OrderContext": "User ID=postgres;Password=pg123456;Host=host.docker.internal;Port=5432;Database=Order;Pooling=true;"
}
Order.API/Startup.cs修改ConfigureServices方法,添加Cap配置:
public void ConfigureServices(IServiceCollection services)
{services.AddControllers();services.AddDbContext<OrderContext>(opt => opt.UseNpgsql(Configuration.GetConnectionString("OrderContext")));//CAPservices.AddCap(x =>{x.UseEntityFramework<OrderContext>();x.UseRabbitMQ("host.docker.internal");});
}

以上是订单服务的修改。
Product.API/Controllers/ProductsController.cs增加减库存接口:
[Route("[controller]")]
[ApiController]
public class ProductsController : ControllerBase
{private readonly ILogger<ProductsController> _logger;private readonly IConfiguration _configuration;private readonly ICapPublisher _capBus;private readonly ProductContext _context;public ProductsController(ILogger<ProductsController> logger, IConfiguration configuration, ICapPublisher capPublisher, ProductContext context){_logger = logger;_configuration = configuration;_capBus = capPublisher;_context = context;}[HttpGet]public IActionResult Get(){string result = $"【产品服务】{DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss")}——" +$"{Request.HttpContext.Connection.LocalIpAddress}:{_configuration["ConsulSetting:ServicePort"]}";return Ok(result);}/// <summary>/// 减库存 订阅下单事件/// </summary>/// <param name="message"></param>/// <returns></returns>[NonAction][CapSubscribe("order.services.createorder")]public async Task ReduceStock(CreateOrderMessageDto message){//业务代码var product = await _context.Products.FirstOrDefaultAsync(p => p.ID == message.ProductID);product.Stock -= message.Count;await _context.SaveChangesAsync();}}
Product.API/MessageDto/CreateOrderMessageDto.cs:
/// <summary>
/// 下单事件消息
/// </summary>
public class CreateOrderMessageDto
{/// <summary>/// 产品ID/// </summary>public int ProductID { get; set; }/// <summary>/// 购买数量/// </summary>public int Count { get; set; }
}
Product.API/Models/Product.cs产品实体类:
public class Product
{[Key][DatabaseGenerated(DatabaseGeneratedOption.Identity)]public int ID { get; set; }/// <summary>/// 产品名称/// </summary>[Required][Column(TypeName = "VARCHAR(16)")]public string Name { get; set; }/// <summary>/// 库存/// </summary>[Required]public int Stock { get; set; }
}
Product.API/Models/ProductContext.cs数据库Context:
public class ProductContext : DbContext
{public ProductContext(DbContextOptions<ProductContext> options): base(options){}public DbSet<Product> Products { get; set; }protected override void OnModelCreating(ModelBuilder modelBuilder){base.OnModelCreating(modelBuilder);//初始化种子数据modelBuilder.Entity<Product>().HasData(new Product{ID = 1,Name = "产品1",Stock = 100},new Product{ID = 2,Name = "产品2",Stock = 100});}
}
Product.API/appsettings.json增加数据库连接字符串:
"ConnectionStrings": {"ProductContext": "User ID=postgres;Password=pg123456;Host=host.docker.internal;Port=5432;Database=Product;Pooling=true;"
}
Product.API/Startup.cs修改ConfigureServices方法,添加Cap配置:\
public void ConfigureServices(IServiceCollection services)
{services.AddControllers();services.AddDbContext<ProductContext>(opt => opt.UseNpgsql(Configuration.GetConnectionString("ProductContext")));//CAPservices.AddCap(x =>{x.UseEntityFramework<ProductContext>();x.UseRabbitMQ("host.docker.internal");});
}

以上是产品服务的修改。
订单服务和产品服务的修改到此就完成了,看着修改很多,其实功能很简单。就是各自增加了自己的数据库表,然后订单服务增加了下单接口,下单接口会发出 “下单事件”。产品服务增加了减库存接口,减库存接口会订阅 “下单事件”。然后客户端调用下单接口下单时,产品服务会减去相应的库存,功能就这么简单。
关于 EF数据库迁移 之类的基本使用就不介绍了。使用 Docker 重新构建镜像,运行订单服务,产品服务:
docker build -t orderapi:1.1 -f ./Order.API/Dockerfile .
docker run -d -p 9060:80 --name orderservice orderapi:1.1 --ConsulSetting:ServicePort="9060"
docker run -d -p 9061:80 --name orderservice1 orderapi:1.1 --ConsulSetting:ServicePort="9061"
docker run -d -p 9062:80 --name orderservice2 orderapi:1.1 --ConsulSetting:ServicePort="9062"docker build -t productapi:1.1 -f ./Product.API/Dockerfile .
docker run -d -p 9050:80 --name productservice productapi:1.1 --ConsulSetting:ServicePort="9050"
docker run -d -p 9051:80 --name productservice1 productapi:1.1 --ConsulSetting:ServicePort="9051"
docker run -d -p 9052:80 --name productservice2 productapi:1.1 --ConsulSetting:ServicePort="9052"
最后 Ocelot.APIGateway/ocelot.json 增加一条路由配置:

好了,进行到这里,整个环境就有点复杂了。确保我们的PostgreSQL,RabbitMQ,Consul,Gateway,服务实例都正常运行。
服务实例运行成功后,数据库应该是这样的:




📃产品表种子数据:

cap.published 表和 cap.received 表是由 CAP自动生成的,它内部是使用本地消息表+MQ来实现异步确保。
三、运行测试
这次使用 Postman 作为客户端调用下单接口( 9070 是之前的 Ocelot 网关端口):

订单库 published 表:

订单库 order 表:

产品库 received 表:

产品库 product 表:

再试一下:


OK,完成。虽然功能很简单,但是我们实现了服务的解耦,异步调用,和最终一致性。
四、总结
注意,上面的例子纯粹是为了说明 EventBus 的使用,实际中的下单流程绝对不会这么做的!希望大家不要较真。
可能有人会说如果下单成功,但是库存不足导致减库存失败了怎么办,是不是要回滚订单表的数据?如果产生这种想法,说明还没有真正理解最终一致性的思想。
首先下单前肯定会检查一下库存数量,既然允许下单那么必然是库存充足的。这里的事务是指:订单保存到数据库,和下单事件保存到 cap.published 表(保存到 cap.published 表理论上就能够发送到MQ)这两件事情,要么一同成功,要么一同失败。如果这个事务成功,那么就可以认为这个业务流程是成功的,至于产品服务的减库存是否成功那就是产品服务的事情了(理论上也应该是成功的,因为消息已经确保发到了MQ,产品服务必然会收到消息),CAP也提供了失败重试,和失败回调机制。
如果非要数据回滚也是能实现的,CAP 的 ICapPublisher.Publish 方法提供一个 callbackName参数,当减库存时,可以触发这个回调。其本质也是通过发布订阅完成,这是不推荐的做法,就不详细说了,有兴趣自己研究一下。
另外,CAP 无法保证消息不重复,实际使用中需要自己考虑一下消息的重复过滤和幂等性。

相关文章:
.Net Core微服务入门全纪录(六)——EventBus-事件总线
系列文章目录 1、.Net Core微服务入门系列(一)——项目搭建 2、.Net Core微服务入门全纪录(二)——Consul-服务注册与发现(上) 3、.Net Core微服务入门全纪录(三)——Consul-服务注…...
1/20赛后总结
1/20赛后总结 T1『讨论区管理员』的旅行 - BBC编程训练营 算法:IDA* 分数:0 damn it! Ac_code走丢了~~(主要是没有写出来)~~ T2华强买瓜 - BBC编程训练营 算法:双向DFS或者DFS剪枝 分数:0 Ac_code…...
PVE 虚拟机安装 Debian 无图形化界面服务器
Debian 安装 Debian 镜像下载 找一个Debian镜像服务器,根据需要的版本和自己硬件选择。 iso-cd/:较小,仅包含安装所需的基础组件,可能需要网络访问来完成安装。有镜像 debian-12.9.0-amd64-netinst.isoiso-dvd/:较…...
第17篇:python进阶:详解数据分析与处理
第17篇:数据分析与处理 内容简介 本篇文章将深入探讨数据分析与处理在Python中的应用。您将学习如何使用pandas库进行数据清洗与分析,掌握matplotlib和seaborn库进行数据可视化,以及处理大型数据集的技巧。通过丰富的代码示例和实战案例&am…...
三天急速通关Java基础知识:Day1 基本语法
三天急速通关JAVA基础知识:Day1 基本语法 0 文章说明1 关键字 Keywords2 注释 Comments2.1 单行注释2.2 多行注释2.3 文档注释 3 数据类型 Data Types3.1 基本数据类型3.2 引用数据类型 4 变量与常量 Variables and Constant5 运算符 Operators6 字符串 String7 输入…...
Python的进程和线程
ref 接受几个设定: 进程是一家almost密不透风的公司,缅甸KK园区 线程里面工作的…人 进程**[园区]**内公共资源对于进程来说,可以共享. 别的园区[进程],一般不能和自己的园区共享人员资源,除非… 好的,现在再接受设定: 单个CPU在任一时刻只能执行单个线程,只有…...
【Mysql】记录锁、间隙锁和临键锁的区别
InnoDB通过MVCCNext-Key Locks,解决了可重复读的事务隔离级别出现的幻读问题。 记录锁 记录锁就是为某行数据进行加锁,它封锁该行的索引记录 SELECT * FROM table WHERE id 1 FOR UPDATE id为1的记录行会被锁住。需要注意的的:id列必须为…...
神经网络|(二)sigmoid神经元函数
【1】引言 在前序学习进程中,我们已经了解了基本的二元分类器和神经元的构成,文章学习链接为: 神经网络|(一)加权平均法,感知机和神经元-CSDN博客 在此基础上,我们认识到神经元本身在做二元分类,是一种非…...
w-form-select.vue(自定义下拉框组件)(与后端字段直接相关性)
文章目录 1、w-form-select.vue 组件中每个属性的含义2、实例3、源代码 1、w-form-select.vue 组件中每个属性的含义 好的,我们来详细解释 w-form-select.vue 组件中每个属性的含义,并用表格列出它们是否与后端字段直接相关: 属性解释表格&…...
【JVM】垃圾收集器详解
你将学到 1. Serial 收集器 2. ParNew 收集器 3. Parallel Scavenge 收集器 4. Serial Old 收集器 5. Parallel Old 收集器 6. CMS 收集器 7. G1 收集器 在 Java 中,垃圾回收(GC)是自动管理内存的一个重要机制。HotSpot JVM 提供了多种…...
python创建一个httpServer网页上传文件到httpServer
一、代码 1.server.py import os from http.server import SimpleHTTPRequestHandler, HTTPServer import cgi # 自定义请求处理类 class MyRequestHandler(SimpleHTTPRequestHandler):# 处理GET请求def do_GET(self):if self.path /:# 响应200状态码self.send_response(2…...
【Maui】提示消息的扩展
文章目录 前言一、问题描述二、解决方案三、软件开发(源码)3.1 消息扩展库3.2 消息提示框使用3.3 错误消息提示使用3.4 问题选择框使用 四、项目展示 前言 .NET 多平台应用 UI (.NET MAUI) 是一个跨平台框架,用于使用 C# 和 XAML 创建本机移…...
租车骑绿岛
租车骑绿岛 真题目录: 点击去查看 E 卷 100分题型 题目描述 部门组织绿岛骑行团建活动。租用公共双人自行车,每辆自行车最多坐两人,最大载重M。给出部门每个人的体重,请问最多需要租用多少双人自行车。 输入描述 第一行两个数字m、n&…...
Pytorch - YOLOv11自定义资料训练
►前言 本篇将讲解目前最新推出的YOLOv11搭配Roboflow进行自定义资料标注训练流程,透过Colab上进行实作说明,使大家能够容易的了解YOLOv11的使用。 ►YOLO框架下载与导入 ►Roboflow的资料收集与标注 进行自定义资料集建置与上传 透过Roboflow工具进行…...
微服务与docker
准备工作 在课前资料中给大家提供了黑马商城项目的资料,我们需要先导入这个单体项目。不过需要注意的是,本篇及后续的微服务学习都是基于Centos7系统下的Docker部署,因此你必须做好一些准备: Centos7的环境及一个好用的SSH客户端装好Docker会使用Docker如果是学习过上面Doc…...
1.23 消息队列
使用消息队列,实现两个终端相互聊天 程序代码: w1.c #include <stdio.h> #include <string.h> #include <unistd.h> #include <stdlib.h> #include <sys/types.h> #include <sys/stat.h> #include <fcntl.h&g…...
【华为路由的arp配置】
华为路由的arp配置 ARP:IP地址与MAC地址的映射。 R1: g0/0/0:10.1.1.254/24 g0/0/1:10.1.2.254/24 PC1: 10.1.1.1/16 PC2: 10.1.1.2/16 PC3: 10.1.2.3/16 动态ARP 查看PC1的arp表,可以看到,列表为空。 查看R1的arp表 在PC3上ping命令测…...
绘制决策树的尝试1
代码复制 import pydotplus 复制 - 这一行代码用于导入pydotplus模块,这是一个用来在Python中创建图形的工具。2. python from IPython.display import Image 这一行代码用于从IPython显示模块中导入Image类,它允许我们在Jupyter笔记本中显示图像。…...
概率论里的特征函数,如何用卷积定理去理解
概率论里的特征函数,如何用卷积定理去理解_哔哩哔哩_bilibili...
Spring 是如何解决循环依赖问题
Spring 框架通过 三级缓存 机制来解决循环依赖问题。循环依赖是指两个或多个 Bean 相互依赖,形成一个闭环,例如 Bean A 依赖 Bean B,而 Bean B 又依赖 Bean A。Spring 通过提前暴露未完全初始化的 Bean 来解决这个问题。 以下是 Spring 解决…...
VB.net复制Ntag213卡写入UID
本示例使用的发卡器:https://item.taobao.com/item.htm?ftt&id615391857885 一、读取旧Ntag卡的UID和数据 Private Sub Button15_Click(sender As Object, e As EventArgs) Handles Button15.Click轻松读卡技术支持:网站:Dim i, j As IntegerDim cardidhex, …...
鸿蒙中用HarmonyOS SDK应用服务 HarmonyOS5开发一个医院挂号小程序
一、开发准备 环境搭建: 安装DevEco Studio 3.0或更高版本配置HarmonyOS SDK申请开发者账号 项目创建: File > New > Create Project > Application (选择"Empty Ability") 二、核心功能实现 1. 医院科室展示 /…...
dedecms 织梦自定义表单留言增加ajax验证码功能
增加ajax功能模块,用户不点击提交按钮,只要输入框失去焦点,就会提前提示验证码是否正确。 一,模板上增加验证码 <input name"vdcode"id"vdcode" placeholder"请输入验证码" type"text&quo…...
Vue2 第一节_Vue2上手_插值表达式{{}}_访问数据和修改数据_Vue开发者工具
文章目录 1.Vue2上手-如何创建一个Vue实例,进行初始化渲染2. 插值表达式{{}}3. 访问数据和修改数据4. vue响应式5. Vue开发者工具--方便调试 1.Vue2上手-如何创建一个Vue实例,进行初始化渲染 准备容器引包创建Vue实例 new Vue()指定配置项 ->渲染数据 准备一个容器,例如: …...
质量体系的重要
质量体系是为确保产品、服务或过程质量满足规定要求,由相互关联的要素构成的有机整体。其核心内容可归纳为以下五个方面: 🏛️ 一、组织架构与职责 质量体系明确组织内各部门、岗位的职责与权限,形成层级清晰的管理网络…...
跨链模式:多链互操作架构与性能扩展方案
跨链模式:多链互操作架构与性能扩展方案 ——构建下一代区块链互联网的技术基石 一、跨链架构的核心范式演进 1. 分层协议栈:模块化解耦设计 现代跨链系统采用分层协议栈实现灵活扩展(H2Cross架构): 适配层…...
C++ 基础特性深度解析
目录 引言 一、命名空间(namespace) C 中的命名空间 与 C 语言的对比 二、缺省参数 C 中的缺省参数 与 C 语言的对比 三、引用(reference) C 中的引用 与 C 语言的对比 四、inline(内联函数…...
工业自动化时代的精准装配革新:迁移科技3D视觉系统如何重塑机器人定位装配
AI3D视觉的工业赋能者 迁移科技成立于2017年,作为行业领先的3D工业相机及视觉系统供应商,累计完成数亿元融资。其核心技术覆盖硬件设计、算法优化及软件集成,通过稳定、易用、高回报的AI3D视觉系统,为汽车、新能源、金属制造等行…...
成都鼎讯硬核科技!雷达目标与干扰模拟器,以卓越性能制胜电磁频谱战
在现代战争中,电磁频谱已成为继陆、海、空、天之后的 “第五维战场”,雷达作为电磁频谱领域的关键装备,其干扰与抗干扰能力的较量,直接影响着战争的胜负走向。由成都鼎讯科技匠心打造的雷达目标与干扰模拟器,凭借数字射…...
dify打造数据可视化图表
一、概述 在日常工作和学习中,我们经常需要和数据打交道。无论是分析报告、项目展示,还是简单的数据洞察,一个清晰直观的图表,往往能胜过千言万语。 一款能让数据可视化变得超级简单的 MCP Server,由蚂蚁集团 AntV 团队…...
