当前位置: 首页 > news >正文

C# 如何实现一个事件总线

EventBus(事件总线)是一种用于在应用程序内部或跨应用程序组件之间进行事件通信的机制。

它允许不同的组件通过发布和订阅事件来进行解耦和通信。在给定的代码片段中,我们可以看到一个使用C#实现的Event Bus。它定义了一些接口和类来实现事件的发布和订阅。

首先,我们有两个基本的约束接口:IEventIAsyncEventHandler<TEvent>

IEvent是一个空接口,用于约束事件的类型。IAsyncEventHandler<TEvent>是一个泛型接口,用于约束事件处理程序的类型。它定义了处理事件的异步方法HandleAsync和处理异常的方法HandleException。接下来,我们有一个IEventBus接口,它定义了一些操作方法用于发布和订阅事件。

其中,Publish<TEvent>PublishAsync<TEvent>方法用于发布事件,而OnSubscribe<TEvent>方法用于订阅事件。然后,我们看到一个实现了本地事件总线的类LocalEventBusManager<TEvent>。它实现了ILocalEventBusManager<TEvent>接口,用于在单一管道内处理本地事件。它使用了一个Channel<TEvent>来存储事件,并提供了发布事件的方法PublishPublishAsync。此外,它还提供了一个自动处理事件的方法AutoHandle

总的来说Event Bus提供了一种方便的方式来实现组件之间的松耦合通信。

通过发布和订阅事件,组件可以独立地进行操作,而不需要直接依赖于彼此的实现细节。

这种机制可以提高代码的可维护性和可扩展性。

Github仓库地址:https://github.com/DonPangPang/soda-event-bus

实现一些基本约束

先实现一些约束,实现IEvent约束事件,实现IAsyncEvnetHandler<TEvent> where TEvent:IEvent来约束事件的处理程序。

public interface IEvent
{}public interface IAsyncEventHandler<in TEvent> where TEvent : IEvent
{Task HandleAsync(IEvent @event);void HandleException(IEvent @event, Exception ex);
}

接下来规定一下咱们的IEventBus,会有哪些操作方法。基本就是发布和订阅。

public interface IEventBus
{void Publish<TEvent>(TEvent @event) where TEvent : IEvent;Task PublishAsync<TEvent>(TEvent @event) where TEvent : IEvent;void OnSubscribe<TEvent>() where TEvent : IEvent;
}

实现一个本地事件总线

本地事件处理

本地事件的处理我打算采用两种方式实现,一种是LocalEventBusManager即本地事件管理,第二种是LocalEventBusPool池化本地事件。

LocalEvnetBusManager

LocalEventBusManager主要在单一管道内进行处理,集中进行消费。

public interface ILocalEventBusManager<in TEvent>where TEvent : IEvent
{void Publish(TEvent @event);Task PublishAsync(TEvent @event) ;void AutoHandle();
}public class LocalEventBusManager<TEvent>(IServiceProvider serviceProvider):ILocalEventBusManager<TEvent>where TEvent: IEvent
{readonly IServiceProvider _servicesProvider = serviceProvider;private readonly Channel<TEvent> _eventChannel = Channel.CreateUnbounded<TEvent>();public void Publish(TEvent @event){Debug.Assert(_eventChannel != null, nameof(_eventChannel) + " != null");_eventChannel.Writer.WriteAsync(@event);}private CancellationTokenSource Cts { get; } = new();public void Cancel(){Cts.Cancel();}public async Task PublishAsync(TEvent @event){await _eventChannel.Writer.WriteAsync(@event);}public void AutoHandle(){// 确保只启动一次if (!Cts.IsCancellationRequested) return;Task.Run(async () =>{while (!Cts.IsCancellationRequested){var reader = await _eventChannel.Reader.ReadAsync();await HandleAsync(reader);}}, Cts.Token);}async Task HandleAsync(TEvent @event){var handler = _servicesProvider.GetService<IAsyncEventHandler<TEvent>>();if (handler is null){throw new NullReferenceException($"No handler for event {@event.GetType().Name}");}try{await handler.HandleAsync(@event);}catch (Exception ex){handler.HandleException( @event, ex);}}
}
LocalEventBusPool

LocalEventBusPool即所有的Event都会有一个单独的管道处理,单独消费处理,并行能力更好一些。

public sealed class LocalEventBusPool(IServiceProvider serviceProvider)
{private readonly IServiceProvider _serviceProvider = serviceProvider;private class ChannelKey{public required string Key { get; init; }public int Subscribers { get; set; }public override bool Equals(object? obj){if (obj is ChannelKey key){return string.Equals(key.Key, Key, StringComparison.OrdinalIgnoreCase);}return false;}public override int GetHashCode(){return 0;}}private Channel<IEvent> Rent(string channel){_channels.TryGetValue(new ChannelKey() { Key = channel }, out var value);if (value != null) return value;value = Channel.CreateUnbounded<IEvent>();_channels.TryAdd(new ChannelKey() { Key = channel }, value);return value;}private Channel<IEvent> Rent(ChannelKey channelKey){_channels.TryGetValue(channelKey, out var value);if (value != null) return value;value = Channel.CreateUnbounded<IEvent>();_channels.TryAdd(channelKey, value);return value;}private readonly ConcurrentDictionary<ChannelKey, Channel<IEvent>> _channels = new();private CancellationTokenSource Cts { get; } = new();public void Cancel(){Cts.Cancel();_channels.Clear();Cts.TryReset();}public async Task PublishAsync<TEvent>(TEvent @event) where TEvent : IEvent{await Rent(typeof(TEvent).Name).Writer.WriteAsync(@event);}public void Publish<TEvent>(TEvent @event) where TEvent : IEvent{Rent(typeof(TEvent).Name).Writer.TryWrite(@event);}public void OnSubscribe<TEvent>() where TEvent : IEvent{var channelKey = _channels.FirstOrDefault(x => x.Key.Key == typeof(TEvent).Name).Key ??new ChannelKey() { Key = typeof(TEvent).Name };channelKey.Subscribers++;Task.Run(async () =>{try{while (!Cts.IsCancellationRequested){var @event = await ReadAsync(channelKey);var handler = _serviceProvider.GetService<IAsyncEventHandler<TEvent>>();if (handler == null) throw new NullReferenceException($"No handler for Event {typeof(TEvent).Name}");try{await handler.HandleAsync((TEvent)@event);}catch (Exception ex){handler.HandleException((TEvent)@event, ex);}}}catch (Exception e){throw new InvalidOperationException("Error on onSubscribe handler", e);}}, Cts.Token);}private async Task<IEvent> ReadAsync(string channel){return await Rent(channel).Reader.ReadAsync(Cts.Token);}private async Task<IEvent> ReadAsync(ChannelKey channel){return await Rent(channel).Reader.ReadAsync(Cts.Token);}
}
LocalEventBus

实现LocalEventBus继承自IEventBus即可,如果有需要扩展的方法自行添加,池化和管理器的情况单独处理。

public interface ILocalEventBus: IEventBus
{}
public class LocalEventBus(IServiceProvider serviceProvider, LocalEventBusOptions options) : ILocalEventBus
{private  LocalEventBusPool? EventBusPool => serviceProvider.GetService<LocalEventBusPool>();public void Publish<TEvent>(TEvent @event) where TEvent : IEvent{if (options.Pool){Debug.Assert(EventBusPool != null, nameof(EventBusPool) + " != null");EventBusPool.Publish(@event);}else{var manager = serviceProvider.GetService<LocalEventBusManager<TEvent>>();if (manager is null) throw new NullReferenceException($"No manager for event {typeof(TEvent).Name}, please add singleton service it.");manager.Publish(@event);}}public async Task PublishAsync<TEvent>(TEvent @event) where TEvent : IEvent{if (options.Pool){Debug.Assert(EventBusPool != null, nameof(EventBusPool) + " != null");await EventBusPool.PublishAsync(@event);}else{var manager = serviceProvider.GetService<LocalEventBusManager<TEvent>>();if (manager is null) throw new NullReferenceException($"No manager for event {typeof(TEvent).Name}, please add singleton service it.");await manager.PublishAsync(@event);}}public void OnSubscribe<TEvent>() where TEvent : IEvent{if (options.Pool){Debug.Assert(EventBusPool != null, nameof(EventBusPool) + " != null");EventBusPool.OnSubscribe<TEvent>();}else{var manager = serviceProvider.GetService<LocalEventBusManager<TEvent>>();if (manager is null) throw new NullReferenceException($"No manager for event {typeof(TEvent).Name}, please add singleton service it.");manager.AutoHandle();}}
}

分布式事件总线

根据需要扩展即可,基本逻辑相同,但可能需要增加确认机制等。

相关文章:

C# 如何实现一个事件总线

EventBus&#xff08;事件总线&#xff09;是一种用于在应用程序内部或跨应用程序组件之间进行事件通信的机制。 它允许不同的组件通过发布和订阅事件来进行解耦和通信。在给定的代码片段中&#xff0c;我们可以看到一个使用C#实现的Event Bus。它定义了一些接口和类来实现事件…...

Python学习路线图

防止忘记&#xff0c;温故知新 进阶路线...

作业2.14

chgrp: 只能修改文件的所属组 chgrp 新的组 文件名 要求&#xff1a;修改的目标组已经存在 chown: chown 新的用户名 文件名 sudo chown root &#xff1a;1 将文件1的所属组用户和所属组用户都改为root sudo chown root&#xff1a;ubuntu 1 将文件1的所属用户…...

基于python+django+mysql的小区物业管理系统

该系统是基于pythondjango开发的小区物业管理系统。适用场景&#xff1a;大学生、课程作业、毕业设计。学习过程中&#xff0c;如遇问题可以在github给作者留言。主要功能有&#xff1a;业主管理、报修管理、停车管理、资产管理、小区管理、用户管理、日志管理、系统信息。 演示…...

控制与状态机算法

控制与状态机算法是计算机科学、电子工程和自动化领域中常用的一种设计工具,它用来描述一个系统的行为,该系统在不同时间点可以处于不同的状态,并且其行为取决于当前状态以及输入的信号或事件。状态机算法的核心概念包括: 状态(State):系统的任何可能配置。每个状态代表…...

sql常用语句小结

创建表&#xff1a; create table 表名&#xff08; 字段1 字段类型 【约束】【comment 字段1注释】&#xff0c; //【】里面的东西可以不用加上去 字段2 字段类型 【约束】【comment 字段2注释】 &#xff09;【comment 表注释】 约束&#xff1a;作用于表中字段上的规则…...

云计算基础-虚拟机迁移原理

什么是虚拟机迁移 虚拟机迁移是指将正在运行的虚拟机实例从一个物理服务器&#xff08;或主机&#xff09;迁移到另一个物理服务器&#xff08;或主机&#xff09;的过程&#xff0c;而不会中断虚拟机的运行。 虚拟机拟机迁移分类虚 热迁移&#xff1a;开机状态下迁移 冷迁…...

云计算基础-云计算概念

云计算定义 云计算是一种基于互联网的计算方式&#xff0c;通过这种计算方式&#xff0c;共享的软硬件资源和信息可以按需提供给计算机和其他设备。云计算依赖资源共享以达成规模经济&#xff0c;类似基础设置(如电力网)。 云计算最基本的概念就是云加端&#xff0c;我们有一个…...

如何将阿里云服务器迁移

&#x1f4d1;前言 本文主要是如何将阿里云服务器迁移实现数据转移的文章&#xff0c;如果有什么需要改进的地方还请大佬指出⛺️** &#x1f3ac;作者简介&#xff1a;大家好&#xff0c;我是青衿&#x1f947; ☁️博客首页&#xff1a;CSDN主页放风讲故事 &#x1f304;每日…...

如何将本地的python项目部署到linux服务器中

大家好&#xff0c;我是雄雄&#xff0c;欢迎关注微信公众号&#xff1a;雄雄的小课堂 。 前言 本地写好的python项目&#xff0c;如何部署在服务器上运行呢&#xff1f;今天&#xff0c;我们就来抽一点点时间来看看。&#xff08;网上找的资料&#xff0c;大部分都囫囵吞枣的…...

每日五道java面试题之java基础篇(五)

目录&#xff1a; 第一题. final、finally、finalize 的区别&#xff1f;第二题. 和 equals 的区别&#xff1f;第三题.hashCode 与 equals?第四题. Java 是值传递&#xff0c;还是引⽤传递&#xff1f;第五题 深拷贝和浅拷贝&#xff1f; 第一题. final、finally、finalize 的…...

HiveSQL——用户行为路径分析

注&#xff1a;参考文档&#xff1a; SQL之用户行为路径分析--HQL面试题46【拼多多面试题】_路径分析 sql-CSDN博客文章浏览阅读2k次&#xff0c;点赞6次&#xff0c;收藏19次。目录0 问题描述1 数据分析2 小结0 问题描述已知用户行为表 tracking_log&#xff0c; 大概字段有&…...

专利的申请

申请发明或者实用新型专利的&#xff0c;应当提交请求书、说明书及其摘要和权利要求书等文件。 请求书应当写明发明或者实用新型的名称&#xff0c;发明人或者设计人的姓名&#xff0c;申请人姓名或者名称、地址&#xff0c;以及其他事项。 说明书应当对发明或者实用新型作出清…...

嵌入式学习 C++ Day5、6

嵌入式学习 C Day5、6 一、思维导图 二、作业 1.以下是一个简单的比喻&#xff0c;将多态概念与生活中的实际情况相联系&#xff1a; 比喻&#xff1a;动物园的讲解员和动物表演 想象一下你去了一家动物园&#xff0c;看到了许多不同种类的动物&#xff0c;如狮子、大象、猴…...

阿里云香港服务器cn2速度测试和租用价格表

阿里云香港服务器中国香港数据中心网络线路类型BGP多线精品&#xff0c;中国电信CN2高速网络高质量、大规格BGP带宽&#xff0c;运营商精品公网直连中国内地&#xff0c;时延更低&#xff0c;优化海外回中国内地流量的公网线路&#xff0c;可以提高国际业务访问质量。阿里云服务…...

《学成在线》微服务实战项目实操笔记系列(P92~P120)【下】

史上最详细《学成在线》项目实操笔记系列【下】&#xff0c;跟视频的每一P对应&#xff0c;全系列18万字&#xff0c;涵盖详细步骤与问题的解决方案。如果你操作到某一步卡壳&#xff0c;参考这篇&#xff0c;相信会带给你极大启发。 四、课程发布模块 4.1 (课程发布)模块需求…...

php数据类型以及运算符、判断条件

php数据类型以及运算符 1. php数据类型2. 使用举例3. 运算符4. 判断条件if else elseif 1. php数据类型 包括 String(字符串)、Integer(整型)、Float(浮点型)、Boolean(布尔型)、Array(数组)、Object(对象)、NULL(空值) 2. 使用举例 1.字符串 2.整型 3.浮点型 4.布尔型 5.数组…...

大数据01-导论

零、文章目录 大数据01-导论 1、数据与数据分析 **数据&#xff1a;是事实或观察的结果&#xff0c;是对客观事物的逻辑归纳&#xff0c;是用于表示客观事物的未经加工的原始素材。**数据可以是连续的值&#xff0c;比如声音、图像&#xff0c;称为模拟数据&#xff1b;也可…...

智能网卡(SmartNIC):增强网络性能

在当今的数字时代&#xff0c;网络性能和数据安全是各行各业面临的关键挑战。智能网卡是一项颠覆性的技术创新&#xff0c;对增强网络性能和加强数据安全性具有关键推动作用。本文旨在探讨智能网卡的工作原理及其在不同应用场景中的重要作用。 什么是智能网卡&#xff1f; 智…...

算法刷题day14

目录 引言一、平均二、三国游戏三、松散子序列 引言 今天做了三道新题&#xff0c;类型是贪心、枚举、DP&#xff0c;不是特别难&#xff0c;但是努力一下刚好能够够得上&#xff0c;还是不错的&#xff0c;只要能够一直坚持下去&#xff0c;不断刷题不断总结&#xff0c;就是…...

零门槛NAS搭建:WinNAS如何让普通电脑秒变私有云?

一、核心优势&#xff1a;专为Windows用户设计的极简NAS WinNAS由深圳耘想存储科技开发&#xff0c;是一款收费低廉但功能全面的Windows NAS工具&#xff0c;主打“无学习成本部署” 。与其他NAS软件相比&#xff0c;其优势在于&#xff1a; 无需硬件改造&#xff1a;将任意W…...

rknn优化教程(二)

文章目录 1. 前述2. 三方库的封装2.1 xrepo中的库2.2 xrepo之外的库2.2.1 opencv2.2.2 rknnrt2.2.3 spdlog 3. rknn_engine库 1. 前述 OK&#xff0c;开始写第二篇的内容了。这篇博客主要能写一下&#xff1a; 如何给一些三方库按照xmake方式进行封装&#xff0c;供调用如何按…...

Xshell远程连接Kali(默认 | 私钥)Note版

前言:xshell远程连接&#xff0c;私钥连接和常规默认连接 任务一 开启ssh服务 service ssh status //查看ssh服务状态 service ssh start //开启ssh服务 update-rc.d ssh enable //开启自启动ssh服务 任务二 修改配置文件 vi /etc/ssh/ssh_config //第一…...

Day131 | 灵神 | 回溯算法 | 子集型 子集

Day131 | 灵神 | 回溯算法 | 子集型 子集 78.子集 78. 子集 - 力扣&#xff08;LeetCode&#xff09; 思路&#xff1a; 笔者写过很多次这道题了&#xff0c;不想写题解了&#xff0c;大家看灵神讲解吧 回溯算法套路①子集型回溯【基础算法精讲 14】_哔哩哔哩_bilibili 完…...

dedecms 织梦自定义表单留言增加ajax验证码功能

增加ajax功能模块&#xff0c;用户不点击提交按钮&#xff0c;只要输入框失去焦点&#xff0c;就会提前提示验证码是否正确。 一&#xff0c;模板上增加验证码 <input name"vdcode"id"vdcode" placeholder"请输入验证码" type"text&quo…...

《基于Apache Flink的流处理》笔记

思维导图 1-3 章 4-7章 8-11 章 参考资料 源码&#xff1a; https://github.com/streaming-with-flink 博客 https://flink.apache.org/bloghttps://www.ververica.com/blog 聚会及会议 https://flink-forward.orghttps://www.meetup.com/topics/apache-flink https://n…...

第 86 场周赛:矩阵中的幻方、钥匙和房间、将数组拆分成斐波那契序列、猜猜这个单词

Q1、[中等] 矩阵中的幻方 1、题目描述 3 x 3 的幻方是一个填充有 从 1 到 9 的不同数字的 3 x 3 矩阵&#xff0c;其中每行&#xff0c;每列以及两条对角线上的各数之和都相等。 给定一个由整数组成的row x col 的 grid&#xff0c;其中有多少个 3 3 的 “幻方” 子矩阵&am…...

Java求职者面试指南:Spring、Spring Boot、MyBatis框架与计算机基础问题解析

Java求职者面试指南&#xff1a;Spring、Spring Boot、MyBatis框架与计算机基础问题解析 一、第一轮提问&#xff08;基础概念问题&#xff09; 1. 请解释Spring框架的核心容器是什么&#xff1f;它在Spring中起到什么作用&#xff1f; Spring框架的核心容器是IoC容器&#…...

第八部分:阶段项目 6:构建 React 前端应用

现在&#xff0c;是时候将你学到的 React 基础知识付诸实践&#xff0c;构建一个简单的前端应用来模拟与后端 API 的交互了。在这个阶段&#xff0c;你可以先使用模拟数据&#xff0c;或者如果你的后端 API&#xff08;阶段项目 5&#xff09;已经搭建好&#xff0c;可以直接连…...

向量几何的二元性:叉乘模长与内积投影的深层联系

在数学与物理的空间世界中&#xff0c;向量运算构成了理解几何结构的基石。叉乘&#xff08;外积&#xff09;与点积&#xff08;内积&#xff09;作为向量代数的两大支柱&#xff0c;表面上呈现出截然不同的几何意义与代数形式&#xff0c;却在深层次上揭示了向量间相互作用的…...