C# 如何实现一个事件总线
EventBus(事件总线)是一种用于在应用程序内部或跨应用程序组件之间进行事件通信的机制。
它允许不同的组件通过发布和订阅事件来进行解耦和通信。在给定的代码片段中,我们可以看到一个使用C#实现的Event Bus。它定义了一些接口和类来实现事件的发布和订阅。
首先,我们有两个基本的约束接口:IEvent和IAsyncEventHandler<TEvent>。
IEvent是一个空接口,用于约束事件的类型。IAsyncEventHandler<TEvent>是一个泛型接口,用于约束事件处理程序的类型。它定义了处理事件的异步方法HandleAsync和处理异常的方法HandleException。接下来,我们有一个IEventBus接口,它定义了一些操作方法用于发布和订阅事件。
其中,Publish<TEvent>和PublishAsync<TEvent>方法用于发布事件,而OnSubscribe<TEvent>方法用于订阅事件。然后,我们看到一个实现了本地事件总线的类LocalEventBusManager<TEvent>。它实现了ILocalEventBusManager<TEvent>接口,用于在单一管道内处理本地事件。它使用了一个Channel<TEvent>来存储事件,并提供了发布事件的方法Publish和PublishAsync。此外,它还提供了一个自动处理事件的方法AutoHandle。
总的来说Event Bus提供了一种方便的方式来实现组件之间的松耦合通信。
通过发布和订阅事件,组件可以独立地进行操作,而不需要直接依赖于彼此的实现细节。
这种机制可以提高代码的可维护性和可扩展性。
Github仓库地址:https://github.com/DonPangPang/soda-event-bus
实现一些基本约束
先实现一些约束,实现IEvent约束事件,实现IAsyncEvnetHandler<TEvent> where TEvent:IEvent来约束事件的处理程序。
public interface IEvent
{}public interface IAsyncEventHandler<in TEvent> where TEvent : IEvent
{Task HandleAsync(IEvent @event);void HandleException(IEvent @event, Exception ex);
}
接下来规定一下咱们的IEventBus,会有哪些操作方法。基本就是发布和订阅。
public interface IEventBus
{void Publish<TEvent>(TEvent @event) where TEvent : IEvent;Task PublishAsync<TEvent>(TEvent @event) where TEvent : IEvent;void OnSubscribe<TEvent>() where TEvent : IEvent;
}
实现一个本地事件总线
本地事件处理
本地事件的处理我打算采用两种方式实现,一种是LocalEventBusManager即本地事件管理,第二种是LocalEventBusPool池化本地事件。
LocalEvnetBusManager
LocalEventBusManager主要在单一管道内进行处理,集中进行消费。
public interface ILocalEventBusManager<in TEvent>where TEvent : IEvent
{void Publish(TEvent @event);Task PublishAsync(TEvent @event) ;void AutoHandle();
}public class LocalEventBusManager<TEvent>(IServiceProvider serviceProvider):ILocalEventBusManager<TEvent>where TEvent: IEvent
{readonly IServiceProvider _servicesProvider = serviceProvider;private readonly Channel<TEvent> _eventChannel = Channel.CreateUnbounded<TEvent>();public void Publish(TEvent @event){Debug.Assert(_eventChannel != null, nameof(_eventChannel) + " != null");_eventChannel.Writer.WriteAsync(@event);}private CancellationTokenSource Cts { get; } = new();public void Cancel(){Cts.Cancel();}public async Task PublishAsync(TEvent @event){await _eventChannel.Writer.WriteAsync(@event);}public void AutoHandle(){// 确保只启动一次if (!Cts.IsCancellationRequested) return;Task.Run(async () =>{while (!Cts.IsCancellationRequested){var reader = await _eventChannel.Reader.ReadAsync();await HandleAsync(reader);}}, Cts.Token);}async Task HandleAsync(TEvent @event){var handler = _servicesProvider.GetService<IAsyncEventHandler<TEvent>>();if (handler is null){throw new NullReferenceException($"No handler for event {@event.GetType().Name}");}try{await handler.HandleAsync(@event);}catch (Exception ex){handler.HandleException( @event, ex);}}
}
LocalEventBusPool
LocalEventBusPool即所有的Event都会有一个单独的管道处理,单独消费处理,并行能力更好一些。
public sealed class LocalEventBusPool(IServiceProvider serviceProvider)
{private readonly IServiceProvider _serviceProvider = serviceProvider;private class ChannelKey{public required string Key { get; init; }public int Subscribers { get; set; }public override bool Equals(object? obj){if (obj is ChannelKey key){return string.Equals(key.Key, Key, StringComparison.OrdinalIgnoreCase);}return false;}public override int GetHashCode(){return 0;}}private Channel<IEvent> Rent(string channel){_channels.TryGetValue(new ChannelKey() { Key = channel }, out var value);if (value != null) return value;value = Channel.CreateUnbounded<IEvent>();_channels.TryAdd(new ChannelKey() { Key = channel }, value);return value;}private Channel<IEvent> Rent(ChannelKey channelKey){_channels.TryGetValue(channelKey, out var value);if (value != null) return value;value = Channel.CreateUnbounded<IEvent>();_channels.TryAdd(channelKey, value);return value;}private readonly ConcurrentDictionary<ChannelKey, Channel<IEvent>> _channels = new();private CancellationTokenSource Cts { get; } = new();public void Cancel(){Cts.Cancel();_channels.Clear();Cts.TryReset();}public async Task PublishAsync<TEvent>(TEvent @event) where TEvent : IEvent{await Rent(typeof(TEvent).Name).Writer.WriteAsync(@event);}public void Publish<TEvent>(TEvent @event) where TEvent : IEvent{Rent(typeof(TEvent).Name).Writer.TryWrite(@event);}public void OnSubscribe<TEvent>() where TEvent : IEvent{var channelKey = _channels.FirstOrDefault(x => x.Key.Key == typeof(TEvent).Name).Key ??new ChannelKey() { Key = typeof(TEvent).Name };channelKey.Subscribers++;Task.Run(async () =>{try{while (!Cts.IsCancellationRequested){var @event = await ReadAsync(channelKey);var handler = _serviceProvider.GetService<IAsyncEventHandler<TEvent>>();if (handler == null) throw new NullReferenceException($"No handler for Event {typeof(TEvent).Name}");try{await handler.HandleAsync((TEvent)@event);}catch (Exception ex){handler.HandleException((TEvent)@event, ex);}}}catch (Exception e){throw new InvalidOperationException("Error on onSubscribe handler", e);}}, Cts.Token);}private async Task<IEvent> ReadAsync(string channel){return await Rent(channel).Reader.ReadAsync(Cts.Token);}private async Task<IEvent> ReadAsync(ChannelKey channel){return await Rent(channel).Reader.ReadAsync(Cts.Token);}
}
LocalEventBus
实现LocalEventBus继承自IEventBus即可,如果有需要扩展的方法自行添加,池化和管理器的情况单独处理。
public interface ILocalEventBus: IEventBus
{}
public class LocalEventBus(IServiceProvider serviceProvider, LocalEventBusOptions options) : ILocalEventBus
{private LocalEventBusPool? EventBusPool => serviceProvider.GetService<LocalEventBusPool>();public void Publish<TEvent>(TEvent @event) where TEvent : IEvent{if (options.Pool){Debug.Assert(EventBusPool != null, nameof(EventBusPool) + " != null");EventBusPool.Publish(@event);}else{var manager = serviceProvider.GetService<LocalEventBusManager<TEvent>>();if (manager is null) throw new NullReferenceException($"No manager for event {typeof(TEvent).Name}, please add singleton service it.");manager.Publish(@event);}}public async Task PublishAsync<TEvent>(TEvent @event) where TEvent : IEvent{if (options.Pool){Debug.Assert(EventBusPool != null, nameof(EventBusPool) + " != null");await EventBusPool.PublishAsync(@event);}else{var manager = serviceProvider.GetService<LocalEventBusManager<TEvent>>();if (manager is null) throw new NullReferenceException($"No manager for event {typeof(TEvent).Name}, please add singleton service it.");await manager.PublishAsync(@event);}}public void OnSubscribe<TEvent>() where TEvent : IEvent{if (options.Pool){Debug.Assert(EventBusPool != null, nameof(EventBusPool) + " != null");EventBusPool.OnSubscribe<TEvent>();}else{var manager = serviceProvider.GetService<LocalEventBusManager<TEvent>>();if (manager is null) throw new NullReferenceException($"No manager for event {typeof(TEvent).Name}, please add singleton service it.");manager.AutoHandle();}}
}
分布式事件总线
根据需要扩展即可,基本逻辑相同,但可能需要增加确认机制等。
相关文章:
C# 如何实现一个事件总线
EventBus(事件总线)是一种用于在应用程序内部或跨应用程序组件之间进行事件通信的机制。 它允许不同的组件通过发布和订阅事件来进行解耦和通信。在给定的代码片段中,我们可以看到一个使用C#实现的Event Bus。它定义了一些接口和类来实现事件…...
Python学习路线图
防止忘记,温故知新 进阶路线...
作业2.14
chgrp: 只能修改文件的所属组 chgrp 新的组 文件名 要求:修改的目标组已经存在 chown: chown 新的用户名 文件名 sudo chown root :1 将文件1的所属组用户和所属组用户都改为root sudo chown root:ubuntu 1 将文件1的所属用户…...
基于python+django+mysql的小区物业管理系统
该系统是基于pythondjango开发的小区物业管理系统。适用场景:大学生、课程作业、毕业设计。学习过程中,如遇问题可以在github给作者留言。主要功能有:业主管理、报修管理、停车管理、资产管理、小区管理、用户管理、日志管理、系统信息。 演示…...
控制与状态机算法
控制与状态机算法是计算机科学、电子工程和自动化领域中常用的一种设计工具,它用来描述一个系统的行为,该系统在不同时间点可以处于不同的状态,并且其行为取决于当前状态以及输入的信号或事件。状态机算法的核心概念包括: 状态(State):系统的任何可能配置。每个状态代表…...
sql常用语句小结
创建表: create table 表名( 字段1 字段类型 【约束】【comment 字段1注释】, //【】里面的东西可以不用加上去 字段2 字段类型 【约束】【comment 字段2注释】 )【comment 表注释】 约束:作用于表中字段上的规则…...
云计算基础-虚拟机迁移原理
什么是虚拟机迁移 虚拟机迁移是指将正在运行的虚拟机实例从一个物理服务器(或主机)迁移到另一个物理服务器(或主机)的过程,而不会中断虚拟机的运行。 虚拟机拟机迁移分类虚 热迁移:开机状态下迁移 冷迁…...
云计算基础-云计算概念
云计算定义 云计算是一种基于互联网的计算方式,通过这种计算方式,共享的软硬件资源和信息可以按需提供给计算机和其他设备。云计算依赖资源共享以达成规模经济,类似基础设置(如电力网)。 云计算最基本的概念就是云加端,我们有一个…...
如何将阿里云服务器迁移
📑前言 本文主要是如何将阿里云服务器迁移实现数据转移的文章,如果有什么需要改进的地方还请大佬指出⛺️** 🎬作者简介:大家好,我是青衿🥇 ☁️博客首页:CSDN主页放风讲故事 🌄每日…...
如何将本地的python项目部署到linux服务器中
大家好,我是雄雄,欢迎关注微信公众号:雄雄的小课堂 。 前言 本地写好的python项目,如何部署在服务器上运行呢?今天,我们就来抽一点点时间来看看。(网上找的资料,大部分都囫囵吞枣的…...
每日五道java面试题之java基础篇(五)
目录: 第一题. final、finally、finalize 的区别?第二题. 和 equals 的区别?第三题.hashCode 与 equals?第四题. Java 是值传递,还是引⽤传递?第五题 深拷贝和浅拷贝? 第一题. final、finally、finalize 的…...
HiveSQL——用户行为路径分析
注:参考文档: SQL之用户行为路径分析--HQL面试题46【拼多多面试题】_路径分析 sql-CSDN博客文章浏览阅读2k次,点赞6次,收藏19次。目录0 问题描述1 数据分析2 小结0 问题描述已知用户行为表 tracking_log, 大概字段有&…...
专利的申请
申请发明或者实用新型专利的,应当提交请求书、说明书及其摘要和权利要求书等文件。 请求书应当写明发明或者实用新型的名称,发明人或者设计人的姓名,申请人姓名或者名称、地址,以及其他事项。 说明书应当对发明或者实用新型作出清…...
嵌入式学习 C++ Day5、6
嵌入式学习 C Day5、6 一、思维导图 二、作业 1.以下是一个简单的比喻,将多态概念与生活中的实际情况相联系: 比喻:动物园的讲解员和动物表演 想象一下你去了一家动物园,看到了许多不同种类的动物,如狮子、大象、猴…...
阿里云香港服务器cn2速度测试和租用价格表
阿里云香港服务器中国香港数据中心网络线路类型BGP多线精品,中国电信CN2高速网络高质量、大规格BGP带宽,运营商精品公网直连中国内地,时延更低,优化海外回中国内地流量的公网线路,可以提高国际业务访问质量。阿里云服务…...
《学成在线》微服务实战项目实操笔记系列(P92~P120)【下】
史上最详细《学成在线》项目实操笔记系列【下】,跟视频的每一P对应,全系列18万字,涵盖详细步骤与问题的解决方案。如果你操作到某一步卡壳,参考这篇,相信会带给你极大启发。 四、课程发布模块 4.1 (课程发布)模块需求…...
php数据类型以及运算符、判断条件
php数据类型以及运算符 1. php数据类型2. 使用举例3. 运算符4. 判断条件if else elseif 1. php数据类型 包括 String(字符串)、Integer(整型)、Float(浮点型)、Boolean(布尔型)、Array(数组)、Object(对象)、NULL(空值) 2. 使用举例 1.字符串 2.整型 3.浮点型 4.布尔型 5.数组…...
大数据01-导论
零、文章目录 大数据01-导论 1、数据与数据分析 **数据:是事实或观察的结果,是对客观事物的逻辑归纳,是用于表示客观事物的未经加工的原始素材。**数据可以是连续的值,比如声音、图像,称为模拟数据;也可…...
智能网卡(SmartNIC):增强网络性能
在当今的数字时代,网络性能和数据安全是各行各业面临的关键挑战。智能网卡是一项颠覆性的技术创新,对增强网络性能和加强数据安全性具有关键推动作用。本文旨在探讨智能网卡的工作原理及其在不同应用场景中的重要作用。 什么是智能网卡? 智…...
算法刷题day14
目录 引言一、平均二、三国游戏三、松散子序列 引言 今天做了三道新题,类型是贪心、枚举、DP,不是特别难,但是努力一下刚好能够够得上,还是不错的,只要能够一直坚持下去,不断刷题不断总结,就是…...
开源项目 Git 贡献全流程拆解:从入门到精通
好的,这是一篇关于开源项目 Git 贡献全流程拆解的技术文章大纲:开源项目 Git 贡献全流程拆解:从入门到精通引言开源精神与协作的重要性。Git 作为分布式版本控制系统在开源世界的核心地位。明确目标:清晰、完整地拆解向开源项目贡…...
用TurtleBot3实测:Navigation2局部代价地图的滚动窗口为何必须用odom坐标系?
TurtleBot3实测:为什么Navigation2局部代价地图必须绑定odom坐标系? 当你在Gazebo中第一次看到TurtleBot3的导航表现时,可能会对局部代价地图(Local Costmap)的坐标系选择产生疑问。为什么这个实时更新的避障地图要绑定…...
如何用kill-doc解决30+文档平台下载难题:免费高效的文档获取方案
如何用kill-doc解决30文档平台下载难题:免费高效的文档获取方案 【免费下载链接】kill-doc 看到经常有小伙伴们需要下载一些免费文档,但是相关网站浏览体验不好各种广告,各种登录验证,需要很多步骤才能下载文档,该脚本…...
从“变速齿轮”到“创新引擎”:解码阿里“大中台、小前台”战略的演进与实战
1. 中台战略的起源与本质 第一次听说"大中台、小前台"这个概念时,我正坐在杭州一家咖啡馆里和几位阿里P8的技术专家聊天。他们用了一个特别形象的比喻:"现在的互联网公司就像一辆老式自行车,前台是拼命蹬车的双腿,…...
如何用PPI网络community分析发现潜在药物靶点?微生信可视化保姆教程
从PPI网络到药物靶点:基于Community分析的生物标记物发现全流程 在生物医学研究的浩瀚海洋中,蛋白质-蛋白质相互作用(PPI)网络犹如一张精密的城市交通图,而community分析则帮助我们识别出其中的"功能街区"。想象一下,当…...
驯服失控菜单:让右键操作提速60%的实战指南
驯服失控菜单:让右键操作提速60%的实战指南 【免费下载链接】ContextMenuManager 🖱️ 纯粹的Windows右键菜单管理程序 项目地址: https://gitcode.com/gh_mirrors/co/ContextMenuManager 当你在Windows系统中右键点击文件时,是否曾面…...
深入解析PLL锁相环在FPGA时钟管理中的核心应用
1. 从闹钟到芯片:PLL如何成为FPGA的"时间管家" 想象一下你早上起床的场景:手机闹钟准时响起,咖啡机开始自动煮咖啡,窗帘缓缓拉开让阳光照进来。这些设备之所以能完美同步,全靠它们内部精确的时钟信号。而在…...
力扣高频经典双题解:接雨水 + 无重复最长子串(思路 + 满分代码)
接雨水、无重复字符最长子串是面试高频、算法入门必刷的经典题,一道考动态规划预处理,一道考滑动窗口,都是数组 / 字符串题型里的核心套路。本篇把两道题的思路讲透、代码写清,新手也能一遍看懂,刷题效率直接拉满&…...
相机潜能解锁:从限制突破到专业创作
相机潜能解锁:从限制突破到专业创作 【免费下载链接】OpenMemories-Tweak Unlock your Sony cameras settings 项目地址: https://gitcode.com/gh_mirrors/op/OpenMemories-Tweak OpenMemories-Tweak作为一款专为索尼相机设计的系统级解锁工具,通…...
ESP32嵌入式系统设计与实现指南
1. 项目概述1.1 系统架构本项目基于ESP32主控芯片设计,采用模块化架构实现多功能嵌入式系统。系统包含以下核心模块:主控单元:ESP32-WROOM-32D模组电源管理:TPS63020升降压转换器传感器接口:I2C/SPI多协议兼容设计人机…...
