C# 如何实现一个事件总线
EventBus(事件总线)是一种用于在应用程序内部或跨应用程序组件之间进行事件通信的机制。
它允许不同的组件通过发布和订阅事件来进行解耦和通信。在给定的代码片段中,我们可以看到一个使用C#实现的Event Bus。它定义了一些接口和类来实现事件的发布和订阅。
首先,我们有两个基本的约束接口:IEvent
和IAsyncEventHandler<TEvent>
。
IEvent是一个空接口,用于约束事件的类型。IAsyncEventHandler<TEvent>
是一个泛型接口,用于约束事件处理程序的类型。它定义了处理事件的异步方法HandleAsync和处理异常的方法HandleException。接下来,我们有一个IEventBus接口,它定义了一些操作方法用于发布和订阅事件。
其中,Publish<TEvent>
和PublishAsync<TEvent>
方法用于发布事件,而OnSubscribe<TEvent>
方法用于订阅事件。然后,我们看到一个实现了本地事件总线的类LocalEventBusManager<TEvent>
。它实现了ILocalEventBusManager<TEvent>
接口,用于在单一管道内处理本地事件。它使用了一个Channel<TEvent>
来存储事件,并提供了发布事件的方法Publish
和PublishAsync
。此外,它还提供了一个自动处理事件的方法AutoHandle
。
总的来说Event Bus
提供了一种方便的方式来实现组件之间的松耦合通信。
通过发布和订阅事件,组件可以独立地进行操作,而不需要直接依赖于彼此的实现细节。
这种机制可以提高代码的可维护性和可扩展性。
Github仓库地址:https://github.com/DonPangPang/soda-event-bus
实现一些基本约束
先实现一些约束,实现IEvent
约束事件,实现IAsyncEvnetHandler<TEvent> where TEvent:IEvent
来约束事件的处理程序。
public interface IEvent
{}public interface IAsyncEventHandler<in TEvent> where TEvent : IEvent
{Task HandleAsync(IEvent @event);void HandleException(IEvent @event, Exception ex);
}
接下来规定一下咱们的IEventBus
,会有哪些操作方法。基本就是发布和订阅。
public interface IEventBus
{void Publish<TEvent>(TEvent @event) where TEvent : IEvent;Task PublishAsync<TEvent>(TEvent @event) where TEvent : IEvent;void OnSubscribe<TEvent>() where TEvent : IEvent;
}
实现一个本地事件总线
本地事件处理
本地事件的处理我打算采用两种方式实现,一种是LocalEventBusManager
即本地事件管理,第二种是LocalEventBusPool
池化本地事件。
LocalEvnetBusManager
LocalEventBusManager
主要在单一管道内进行处理,集中进行消费。
public interface ILocalEventBusManager<in TEvent>where TEvent : IEvent
{void Publish(TEvent @event);Task PublishAsync(TEvent @event) ;void AutoHandle();
}public class LocalEventBusManager<TEvent>(IServiceProvider serviceProvider):ILocalEventBusManager<TEvent>where TEvent: IEvent
{readonly IServiceProvider _servicesProvider = serviceProvider;private readonly Channel<TEvent> _eventChannel = Channel.CreateUnbounded<TEvent>();public void Publish(TEvent @event){Debug.Assert(_eventChannel != null, nameof(_eventChannel) + " != null");_eventChannel.Writer.WriteAsync(@event);}private CancellationTokenSource Cts { get; } = new();public void Cancel(){Cts.Cancel();}public async Task PublishAsync(TEvent @event){await _eventChannel.Writer.WriteAsync(@event);}public void AutoHandle(){// 确保只启动一次if (!Cts.IsCancellationRequested) return;Task.Run(async () =>{while (!Cts.IsCancellationRequested){var reader = await _eventChannel.Reader.ReadAsync();await HandleAsync(reader);}}, Cts.Token);}async Task HandleAsync(TEvent @event){var handler = _servicesProvider.GetService<IAsyncEventHandler<TEvent>>();if (handler is null){throw new NullReferenceException($"No handler for event {@event.GetType().Name}");}try{await handler.HandleAsync(@event);}catch (Exception ex){handler.HandleException( @event, ex);}}
}
LocalEventBusPool
LocalEventBusPool
即所有的Event都会有一个单独的管道处理,单独消费处理,并行能力更好一些。
public sealed class LocalEventBusPool(IServiceProvider serviceProvider)
{private readonly IServiceProvider _serviceProvider = serviceProvider;private class ChannelKey{public required string Key { get; init; }public int Subscribers { get; set; }public override bool Equals(object? obj){if (obj is ChannelKey key){return string.Equals(key.Key, Key, StringComparison.OrdinalIgnoreCase);}return false;}public override int GetHashCode(){return 0;}}private Channel<IEvent> Rent(string channel){_channels.TryGetValue(new ChannelKey() { Key = channel }, out var value);if (value != null) return value;value = Channel.CreateUnbounded<IEvent>();_channels.TryAdd(new ChannelKey() { Key = channel }, value);return value;}private Channel<IEvent> Rent(ChannelKey channelKey){_channels.TryGetValue(channelKey, out var value);if (value != null) return value;value = Channel.CreateUnbounded<IEvent>();_channels.TryAdd(channelKey, value);return value;}private readonly ConcurrentDictionary<ChannelKey, Channel<IEvent>> _channels = new();private CancellationTokenSource Cts { get; } = new();public void Cancel(){Cts.Cancel();_channels.Clear();Cts.TryReset();}public async Task PublishAsync<TEvent>(TEvent @event) where TEvent : IEvent{await Rent(typeof(TEvent).Name).Writer.WriteAsync(@event);}public void Publish<TEvent>(TEvent @event) where TEvent : IEvent{Rent(typeof(TEvent).Name).Writer.TryWrite(@event);}public void OnSubscribe<TEvent>() where TEvent : IEvent{var channelKey = _channels.FirstOrDefault(x => x.Key.Key == typeof(TEvent).Name).Key ??new ChannelKey() { Key = typeof(TEvent).Name };channelKey.Subscribers++;Task.Run(async () =>{try{while (!Cts.IsCancellationRequested){var @event = await ReadAsync(channelKey);var handler = _serviceProvider.GetService<IAsyncEventHandler<TEvent>>();if (handler == null) throw new NullReferenceException($"No handler for Event {typeof(TEvent).Name}");try{await handler.HandleAsync((TEvent)@event);}catch (Exception ex){handler.HandleException((TEvent)@event, ex);}}}catch (Exception e){throw new InvalidOperationException("Error on onSubscribe handler", e);}}, Cts.Token);}private async Task<IEvent> ReadAsync(string channel){return await Rent(channel).Reader.ReadAsync(Cts.Token);}private async Task<IEvent> ReadAsync(ChannelKey channel){return await Rent(channel).Reader.ReadAsync(Cts.Token);}
}
LocalEventBus
实现LocalEventBus
继承自IEventBus
即可,如果有需要扩展的方法自行添加,池化和管理器的情况单独处理。
public interface ILocalEventBus: IEventBus
{}
public class LocalEventBus(IServiceProvider serviceProvider, LocalEventBusOptions options) : ILocalEventBus
{private LocalEventBusPool? EventBusPool => serviceProvider.GetService<LocalEventBusPool>();public void Publish<TEvent>(TEvent @event) where TEvent : IEvent{if (options.Pool){Debug.Assert(EventBusPool != null, nameof(EventBusPool) + " != null");EventBusPool.Publish(@event);}else{var manager = serviceProvider.GetService<LocalEventBusManager<TEvent>>();if (manager is null) throw new NullReferenceException($"No manager for event {typeof(TEvent).Name}, please add singleton service it.");manager.Publish(@event);}}public async Task PublishAsync<TEvent>(TEvent @event) where TEvent : IEvent{if (options.Pool){Debug.Assert(EventBusPool != null, nameof(EventBusPool) + " != null");await EventBusPool.PublishAsync(@event);}else{var manager = serviceProvider.GetService<LocalEventBusManager<TEvent>>();if (manager is null) throw new NullReferenceException($"No manager for event {typeof(TEvent).Name}, please add singleton service it.");await manager.PublishAsync(@event);}}public void OnSubscribe<TEvent>() where TEvent : IEvent{if (options.Pool){Debug.Assert(EventBusPool != null, nameof(EventBusPool) + " != null");EventBusPool.OnSubscribe<TEvent>();}else{var manager = serviceProvider.GetService<LocalEventBusManager<TEvent>>();if (manager is null) throw new NullReferenceException($"No manager for event {typeof(TEvent).Name}, please add singleton service it.");manager.AutoHandle();}}
}
分布式事件总线
根据需要扩展即可,基本逻辑相同,但可能需要增加确认机制等。
相关文章:

C# 如何实现一个事件总线
EventBus(事件总线)是一种用于在应用程序内部或跨应用程序组件之间进行事件通信的机制。 它允许不同的组件通过发布和订阅事件来进行解耦和通信。在给定的代码片段中,我们可以看到一个使用C#实现的Event Bus。它定义了一些接口和类来实现事件…...

Python学习路线图
防止忘记,温故知新 进阶路线...

作业2.14
chgrp: 只能修改文件的所属组 chgrp 新的组 文件名 要求:修改的目标组已经存在 chown: chown 新的用户名 文件名 sudo chown root :1 将文件1的所属组用户和所属组用户都改为root sudo chown root:ubuntu 1 将文件1的所属用户…...

基于python+django+mysql的小区物业管理系统
该系统是基于pythondjango开发的小区物业管理系统。适用场景:大学生、课程作业、毕业设计。学习过程中,如遇问题可以在github给作者留言。主要功能有:业主管理、报修管理、停车管理、资产管理、小区管理、用户管理、日志管理、系统信息。 演示…...

控制与状态机算法
控制与状态机算法是计算机科学、电子工程和自动化领域中常用的一种设计工具,它用来描述一个系统的行为,该系统在不同时间点可以处于不同的状态,并且其行为取决于当前状态以及输入的信号或事件。状态机算法的核心概念包括: 状态(State):系统的任何可能配置。每个状态代表…...

sql常用语句小结
创建表: create table 表名( 字段1 字段类型 【约束】【comment 字段1注释】, //【】里面的东西可以不用加上去 字段2 字段类型 【约束】【comment 字段2注释】 )【comment 表注释】 约束:作用于表中字段上的规则…...

云计算基础-虚拟机迁移原理
什么是虚拟机迁移 虚拟机迁移是指将正在运行的虚拟机实例从一个物理服务器(或主机)迁移到另一个物理服务器(或主机)的过程,而不会中断虚拟机的运行。 虚拟机拟机迁移分类虚 热迁移:开机状态下迁移 冷迁…...

云计算基础-云计算概念
云计算定义 云计算是一种基于互联网的计算方式,通过这种计算方式,共享的软硬件资源和信息可以按需提供给计算机和其他设备。云计算依赖资源共享以达成规模经济,类似基础设置(如电力网)。 云计算最基本的概念就是云加端,我们有一个…...

如何将阿里云服务器迁移
📑前言 本文主要是如何将阿里云服务器迁移实现数据转移的文章,如果有什么需要改进的地方还请大佬指出⛺️** 🎬作者简介:大家好,我是青衿🥇 ☁️博客首页:CSDN主页放风讲故事 🌄每日…...

如何将本地的python项目部署到linux服务器中
大家好,我是雄雄,欢迎关注微信公众号:雄雄的小课堂 。 前言 本地写好的python项目,如何部署在服务器上运行呢?今天,我们就来抽一点点时间来看看。(网上找的资料,大部分都囫囵吞枣的…...

每日五道java面试题之java基础篇(五)
目录: 第一题. final、finally、finalize 的区别?第二题. 和 equals 的区别?第三题.hashCode 与 equals?第四题. Java 是值传递,还是引⽤传递?第五题 深拷贝和浅拷贝? 第一题. final、finally、finalize 的…...
HiveSQL——用户行为路径分析
注:参考文档: SQL之用户行为路径分析--HQL面试题46【拼多多面试题】_路径分析 sql-CSDN博客文章浏览阅读2k次,点赞6次,收藏19次。目录0 问题描述1 数据分析2 小结0 问题描述已知用户行为表 tracking_log, 大概字段有&…...

专利的申请
申请发明或者实用新型专利的,应当提交请求书、说明书及其摘要和权利要求书等文件。 请求书应当写明发明或者实用新型的名称,发明人或者设计人的姓名,申请人姓名或者名称、地址,以及其他事项。 说明书应当对发明或者实用新型作出清…...

嵌入式学习 C++ Day5、6
嵌入式学习 C Day5、6 一、思维导图 二、作业 1.以下是一个简单的比喻,将多态概念与生活中的实际情况相联系: 比喻:动物园的讲解员和动物表演 想象一下你去了一家动物园,看到了许多不同种类的动物,如狮子、大象、猴…...

阿里云香港服务器cn2速度测试和租用价格表
阿里云香港服务器中国香港数据中心网络线路类型BGP多线精品,中国电信CN2高速网络高质量、大规格BGP带宽,运营商精品公网直连中国内地,时延更低,优化海外回中国内地流量的公网线路,可以提高国际业务访问质量。阿里云服务…...

《学成在线》微服务实战项目实操笔记系列(P92~P120)【下】
史上最详细《学成在线》项目实操笔记系列【下】,跟视频的每一P对应,全系列18万字,涵盖详细步骤与问题的解决方案。如果你操作到某一步卡壳,参考这篇,相信会带给你极大启发。 四、课程发布模块 4.1 (课程发布)模块需求…...

php数据类型以及运算符、判断条件
php数据类型以及运算符 1. php数据类型2. 使用举例3. 运算符4. 判断条件if else elseif 1. php数据类型 包括 String(字符串)、Integer(整型)、Float(浮点型)、Boolean(布尔型)、Array(数组)、Object(对象)、NULL(空值) 2. 使用举例 1.字符串 2.整型 3.浮点型 4.布尔型 5.数组…...

大数据01-导论
零、文章目录 大数据01-导论 1、数据与数据分析 **数据:是事实或观察的结果,是对客观事物的逻辑归纳,是用于表示客观事物的未经加工的原始素材。**数据可以是连续的值,比如声音、图像,称为模拟数据;也可…...

智能网卡(SmartNIC):增强网络性能
在当今的数字时代,网络性能和数据安全是各行各业面临的关键挑战。智能网卡是一项颠覆性的技术创新,对增强网络性能和加强数据安全性具有关键推动作用。本文旨在探讨智能网卡的工作原理及其在不同应用场景中的重要作用。 什么是智能网卡? 智…...

算法刷题day14
目录 引言一、平均二、三国游戏三、松散子序列 引言 今天做了三道新题,类型是贪心、枚举、DP,不是特别难,但是努力一下刚好能够够得上,还是不错的,只要能够一直坚持下去,不断刷题不断总结,就是…...

个性签名大全
只许一生浮世清欢愿我以孤独作为铠甲,自此不再受伤愿我是阳光,明媚而不忧伤我不敢太勇敢太执着太骄傲,我怕失去开始你是我的天使,最后你是我的唯一姐的霸气,无人能比,哥的傲气,无人能朋唯有万事…...

前端常用代码整理(不断更新中)— js,jquery篇(2)
目录 1.随机生成字符串 2.删除数组中重复元素 3.RGB到十六进制转换机制 4.打乱一个数组,重新组合 5.获取两个日期的时间间隔 (天数) 6.获取当天属于今年的第几天 7.截取字符串长度,超过部分显示为 ... 8.判断数组是否为空 9.英文句子首…...

普中51单片机学习(六)
点亮第一个LED LED相关知识 LED,即发光二极管,是一种半导体固体发光器件。工作原理为:LED的工作是有方向性的,只有当正级接到LED阳极,负极接到LED的阴极的时候才能工作,如果反接LED是不能正常工作的。其原理图如下 …...

visual studio注册码
最近在研究c/c 安装visual studio 需要注册 技术博客http://idea.coderyj.com/ 注册码 Visual Studio 2022(VS2022)激活码: Pro(专业版): TD244-P4NB7-YQ6XK-Y8MMM-YWV2J Enterprise(企业版): VHF9H-NXBBB-638P6-6JHC…...

Studio One 6.5下载安装激活图文教程
Studio One 6.5是由PreSonus公司打造一款功能强大的数字音乐创作软件,不仅为用户们提供了制作、混合、掌握和执行所有操作,还提供了简洁直观的主界面,因此使用起来也是十分的简单,就算是初学者也可以快速的上手使用起来࿰…...

Kubernetes(K8S)集群部署实战
目录 一、准备工作1.1、创建3台虚拟机1.1.1、下载虚拟机管理工具1.1.2、安装虚拟机管理工具1.1.3、下载虚Centos镜像1.1.4、创建台个虚拟机1.1.5、设置虚拟机网络环境 1.2、虚拟机基础配置(3台虚拟机进行相同处理)1.2.1、配置host1.2.2、关闭防火墙1.2.3…...

流畅的Python(十)-序列的修改、散列和切片
一、核心要义 以第九章定义的二维向量为基础,定义表示多为向量的Vector类。该类将支持如下功能: 1. 基本的序列协议 2. 适当的切片支持,且返回的是新Vector实例 3.综合各个元素的值计算散列值 4.格式化展示 二、代码示例 1、前情提要 …...

TCP/IP五层各层协议详解
TCP/IP协议栈是网络通信的基础,它由五层协议组成,分别是物理层、数据链路层、网络层、传输层和应用层。以下是对各层协议的详细解释: 1. 物理层(Physical Layer):该层负责传输比特流,主要定义传…...

MySQL 基础知识(九)之视图
目录 1 视图的介绍 2 视图算法 3 创建视图 4 查看视图结构 5 修改视图 6 删除视图 7 参考文档 1 视图的介绍 视图是一张并不存储数据的虚拟表,其本质是根据 SQL 语句动态查询数据库中的数据。数据库中只存放了视图的定义,通过 SQL 语句使用视图时…...

算法之力扣数青蛙
题目连接 文章目录 题目解析算法原理第一步第二步第三步第三步第四步指向o 代码讲解代码实现 题目解析 先给大家来讲解一下这个题目的意思吧,这个题目是说呢给你一个蛙叫的字符串让你去设计一个算法求出发出这种蛙叫最少需要几只青蛙。比如说第一个样例发出这种叫声…...