当前位置: 首页 > news >正文

将Abp默认事件总线改造为分布式事件总线

文章目录

  • 原理
    • 创建分布式事件总线
    • 实现自动订阅和事件转发
  • 使用
    • 启动Redis服务
    • 配置
    • 传递Abp默认事件
    • 传递自定义事件
  • 项目地址

原理

本地事件总线是通过Ioc容器来实现的。

IEventBus接口定义了事件总线的基本功能,如注册事件、取消注册事件、触发事件等。

Abp.Events.Bus.EventBus是本地事件总线的实现类,其中私有成员ConcurrentDictionary<Type, List<IEventHandlerFactory>> _handlerFactories是事件订阅表。通过维护事件订阅表来实现事件处理器的注册和取消注册。当对应类型的事件触发时,通过订阅表查找所有事件处理器,通过Ioc容器来获取处理器实例,然后通过反射来调用事件处理器的"HandleEvent"方法。

创建分布式事件总线

首先,我们需要一个分布式事件总线中间件,用来将事件从本地事件总线转发到分布式事件总线。常用的中间件有RabbitMQ、Kafka、Redis等。

开源社区已经有实现好的库,本项目参考了 wuyi6216/Abp.RemoteEventBus

这里已经定义好了一个分布式事件总线接口


public interface IDistributedEventBus : IDisposable
{void MessageHandle(string topic, string message);void Publish(IDistributedEventData eventData);void Subscribe(string topic);void Unsubscribe(string topic);void UnsubscribeAll();
}

为了兼容本地事件总线,我们需要定义一个分布式事件总线接口,继承自IEventBus接口。


public interface IMultipleEventBus : IDistributedEventBus, IEventBus
{}

实现自动订阅和事件转发

当注册本地事件时,将订阅分布式事件,事件Topic为类型的字符串表现形式

public IDisposable Register(Type eventType, IEventHandlerFactory factory)
{GetOrCreateHandlerFactories(eventType);List<IEventHandlerFactory> currentLists;if (_handlerFactories.TryGetValue(eventType, out currentLists)){lock (currentLists){if (currentLists.Count == 0){//Register to distributed eventthis.Subscribe(eventType.ToString());}currentLists.Add(factory);}}return new FactoryUnregistrar(this, eventType, factory);
}

创建TriggerRemote,此方法用于将本地事件参数打包成为分布式事件消息payload,并发布该消息

public void TriggerRemote(Type eventType, object eventSource, IEventData eventData)
{var exceptions = new List<Exception>();eventData.EventSource = eventSource;try{var payloadDictionary = new Dictionary<string, object>{{ PayloadKey, eventData }};var distributedeventData = new DistributedEventData(eventType.ToString(), payloadDictionary);Publish(distributedeventData);}catch (Exception ex){exceptions.Add(ex);}if (exceptions.Any()){if (exceptions.Count == 1){exceptions[0].ReThrow();}throw new AggregateException("More than one error has occurred while triggering the event: " + eventType, exceptions);}
}

当触发本地事件时,将消息转发至分布式事件总线。
在Trigger方法中调用TriggerRemote,事件状态回调和事件异常回调将不会被转发。

if (!(typeof(DistributedEventBusEvent) == eventType|| typeof(DistributedEventBusEvent).IsAssignableFrom(eventType)|| typeof(DistributedEventMessageHandleExceptionData) == eventType|| typeof(DistributedEventHandleExceptionData) == eventType))
{if (typeof(DistributedEventArgs) != eventType){TriggerRemote(eventType, eventSource, eventData);}
}

在消费端接收到分布式事件消息时,从Topic中解析类型,转发给本地事件。若此类型在本地事件注册过,则将消息反序列化为本地事件参数,然后触发本地事件。
本地事件处理器将触发最终的处理方法。


public virtual void MessageHandle(string topic, string message)
{Logger.Debug($"Receive message on topic {topic}");try{var eventData = _remoteEventSerializer.Deserialize<DistributedEventData>(message);var eventArgs = new DistributedEventArgs(eventData, topic, message);Trigger(this, new DistributedEventBusHandlingEvent(eventArgs));if (!string.IsNullOrEmpty(eventData.Type)){string pattern = @"(.*?)\[(.*?)\]";Match match = Regex.Match(eventData.Type, pattern);if (match.Success){var type = match.Groups[1].Value;var type2 = match.Groups[2].Value;var localTriggerType = typeFinder.Find(c => c.FullName == type).FirstOrDefault();var genericType = typeFinder.Find(c => c.FullName == type2).FirstOrDefault();if (localTriggerType != null && genericType != null){if (localTriggerType.GetTypeInfo().IsGenericType&& localTriggerType.GetGenericArguments().Length == 1&& !genericType.IsAbstract && !genericType.IsInterface){var localTriggerGenericType = localTriggerType.GetGenericTypeDefinition().MakeGenericType(genericType);if (eventData.Data.TryGetValue(PayloadKey, out var payload)){var payloadObject = (payload as JObject).ToObject(localTriggerGenericType);Trigger(localTriggerGenericType, this, (IEventData)payloadObject);}}}}else{var localTriggerType = typeFinder.Find(c => c.FullName == eventData.Type).FirstOrDefault();if (localTriggerType != null && !localTriggerType.IsAbstract && !localTriggerType.IsInterface){if (eventData.Data.TryGetValue(PayloadKey, out var payload)){var payloadObject = (payload as JObject).ToObject(localTriggerType);Trigger(localTriggerType, this, (IEventData)payloadObject);}}}Trigger(this, new DistributedEventBusHandledEvent(eventArgs));}}catch (Exception ex){Logger.Error("Consume remote message exception", ex);Trigger(this, new DistributedEventMessageHandleExceptionData(ex, topic, topic));}
}

使用

DistributedEventBus有不同的实现方式,这里以Redis为例

启动Redis服务

下载Redis并启动服务,使用默认端口6379

配置

生产者和消费者端都需要配置分布式事件总线

首先引用Abp.DistributedEventBus.Redis,并配置Abp模块依赖

[DependsOn(typeof(AbpDistributedEventBusRedisModule))]

在PreInitialize方法中配置Redis连接信息

 Configuration.Modules.DistributedEventBus().UseRedis().Configure(setting =>{setting.Server = "127.0.0.1:6379";});

用MultipleEventBus替换Abp默认事件总线

 //todo: 事件总线Configuration.ReplaceService(typeof(IEventBus),() => IocManager.IocContainer.Register(Component.For<IEventBus>().ImplementedBy<MultipleEventBus>()));

传递Abp默认事件

我们知道在使用仓储时,Abp会自动触发一些事件,如创建、更新、删除等。我们来测试这些事件是否能通过分布式事件总线来传递。

定义一个实体类,用于传递实体的增删改事件。


public class Person : FullAuditedEntity<int>
{public string Name { get; set; }public int Age { get; set; }public string PhoneNumber { get; set; }}

在消费者端,定义一个事件处理器,用于处理实体的增删改事件。


public class RemoteEntityChangedEventHandler :IEventHandler<EntityUpdatedEventData<Person>>,IEventHandler<EntityCreatedEventData<Person>>,IEventHandler<EntityDeletedEventData<Person>>,ITransientDependency
{void IEventHandler<EntityUpdatedEventData<Person>>.HandleEvent(EntityUpdatedEventData<Person> eventData){var person = eventData.Entity;Console.WriteLine($"Remote Entity Updated - Name:{person.Name}, Age:{person.Age}, PhoneNumber:{person.PhoneNumber}");}void IEventHandler<EntityCreatedEventData<Person>>.HandleEvent(EntityCreatedEventData<Person> eventData){var person = eventData.Entity;Console.WriteLine($"Remote Entity Created - Name:{person.Name}, Age:{person.Age}, PhoneNumber:{person.PhoneNumber}");}void IEventHandler<EntityDeletedEventData<Person>>.HandleEvent(EntityDeletedEventData<Person> eventData){var person = eventData.Entity;Console.WriteLine($"Remote Entity Deleted - Name:{person.Name}, Age:{person.Age}, PhoneNumber:{person.PhoneNumber}");}
}

在生产者端,用IRepository对实体进行增删改操作。


var person = new Person()
{Name = "John",Age = 36,PhoneNumber = "18588888888"};personRepository.Insert(person);var person2 = new Person()
{Name = "John2",Age = 36,PhoneNumber = "18588888889"};
personRepository.Insert(person2);var persons = personRepository.GetAllList();
foreach (var p in persons)
{p.Age += 1;personRepository.Update(p);Console.WriteLine($"Entity Updated - Name:{p.Name}, Age:{p.Age}, PhoneNumber:{p.PhoneNumber}");}
foreach (var p in persons)
{personRepository.Delete(p);Console.WriteLine($"Entity Deleted - Name:{p.Name}, Age:{p.Age}, PhoneNumber:{p.PhoneNumber}");}

运行程序(同时运行消费者端和生产者端),可以看到消费者端打印出了实体的增删改事件。

在这里插入图片描述

注意:

分布式事件总线在两个独立系统间传递事件,所以需要定义一个共同的类型对象,用于事件参数的传递。
因此消费者端需要引用生产者端的模块,以便获取共同的类型对象。

public override Assembly[] GetAdditionalAssemblies()
{var clientModuleAssembly = typeof(Person).GetAssembly();return [clientModuleAssembly];
}

传递自定义事件

定义NotificationEventData,用于传递自定义事件。


public class NotificationEventData : EventData
{public int Id { get; set; }public string Title { get; set; }public string Message { get; set; }public bool IsRead { get; set; }
}

在消费者端,定义一个事件处理器,用于处理自定义事件。

public class NotificationEventHandler :IEventHandler<NotificationEventData>,      ITransientDependency
{void IEventHandler<NotificationEventData>.HandleEvent(NotificationEventData eventData){Console.WriteLine($"Id: {eventData.Id}");Console.WriteLine($"Title: {eventData.Title}");Console.WriteLine($"Message: {eventData.Message}");Console.WriteLine($"IsRead: {eventData.IsRead}");}
}

在生产者端,触发自定义事件。

var eventBus = IocManager.Instance.Resolve<IEventBus>();eventBus.Trigger<NotificationEventData>(new NotificationEventData()
{Title = "Hi",Message = "Customized definition event test!",Id = 100,IsRead = true,
});

运行程序(同时运行消费者端和生产者端),可以看到消费者端打印出了自定义事件。

在这里插入图片描述

项目地址

Github:DistributedEventBus

相关文章:

将Abp默认事件总线改造为分布式事件总线

文章目录 原理创建分布式事件总线实现自动订阅和事件转发 使用启动Redis服务配置传递Abp默认事件传递自定义事件 项目地址 原理 本地事件总线是通过Ioc容器来实现的。 IEventBus接口定义了事件总线的基本功能&#xff0c;如注册事件、取消注册事件、触发事件等。 Abp.Events…...

Jupyter Notebook修改默认工作目录

1、参考修改Jupyter Notebook的默认工作目录_jupyter文件路径-CSDN博客修改配置文件 2.在上述博客内容的基础上&#xff0c;这里不是删除【%USERPROFILE%】而是把这个地方替换为所要设置的工作目录路径&#xff0c; 3.【起始位置】也可以更改为所要设置的工作目录路径&#x…...

高校/企业如何去做数据挖掘呢?

随着近年来人工智能及大数据、云计算进入爆发时期&#xff0c;依托三者进行的数据分析、数据挖掘服务已逐渐成为各行业进行产业升级的载体&#xff0c;缓慢渗透进我们的工作和生活&#xff0c;成为新时代升级版的智能“大案牍术”。 那么对于多数企业来说&#xff0c;如何做数据…...

数据仓库-数据治理小厂实践

一、简介 数据治理贯穿数仓中数据的整个生命周期&#xff0c;从数据的产生、加载、清洗、计算&#xff0c;再到数据展示、应用&#xff0c;每个阶段都需要对数据进行治理&#xff0c;像有些比较大的企业都是有自己的数据治理平台或者会开发一些便捷的平台&#xff0c;对于没有平…...

【C++多线程编程】(五)之 线程生命周期管理join() 与 detach()

在C中&#xff0c;std::thread 类用于创建和管理线程。std::thread 提供了两种主要的方法来控制线程的生命周期&#xff1a;join 和 detach。 detach方式&#xff0c;启动的线程自主在后台运行&#xff0c;当前的代码继续往下执行&#xff0c;不等待新线程结束。join方式&…...

金融信贷场景的风险“要素”与主要“风险点”

目录 要素一:贷款对象 风险点1:为不具备主体资格或主体资格有瑕疵的借款人发放贷款 风险表现: 防控措施: 风险点2:向国家限控行业发放贷款 风险表现: 防控措施: 风险点3:受理不符合准入条件的客户申请 风险表现: 防控措施: 要素二:金额 风险点4:过渡授…...

ubuntu下docker安装,配置python运行环境

参考自: 1.最详细ubuntu安装docker教程 2.使用docker搭建python环境 首先假设已经安装了docker&#xff0c;卸载原来的docker 在命令行中运行&#xff1a; sudo apt-get updatesudo apt-get remove docker docker-engine docker.io containerd runc 安装docker依赖 apt-get…...

在Docker中安装kafka遇到问题记录

命令含义解答&#xff1a; 在docker安装kafka的时候&#xff0c;启动kafka的时候会执行下面语句&#xff1a; docker run -d --log-driver json-file --log-opt max-size100m --log-opt max-file2 --name kafka -p 9092:9092 -e KAFKA_BROKER_ID0 -e KAFKA_ZOOKEEPER_CONNEC…...

aws-waf-cdn 基于规则组的永黑解决方案

1. 新建waf 规则组 2. 为规则组添加规则 根据需求创建不同的规则 3. waf中附加规则组 &#xff08;此时规则组所有规则都会附加到waf中&#xff0c;但是不会永黑&#xff09; 此刻&#xff0c;可以选择测试下规则是否生效&#xff0c;测试前确认保护资源绑定无误 4. 创建堆…...

如何实现免费无限流量云同步笔记软件Obsidian?

目录 前言 如何实现免费无限流量云同步笔记软件Obsidian&#xff1f; 一、简介 软件特色演示&#xff1a; 二、使用免费群晖虚拟机搭建群晖Synology Drive服务&#xff0c;实现局域网同步 1 安装并设置Synology Drive套件 2 局域网内同步文件测试 三、内网穿透群晖Synol…...

GPTs | Actions应用案例

上篇文章说道&#xff0c;如何使用创建的GPTs通过API接口去获取外部的一些信息&#xff0c;然后把获取的外部信息返回给ChatGPT让它加工出来&#xff0c;回答你的问题&#xff0c;今天我们就来做一个通俗易懂的小案例&#xff0c;让大家来初步了解一下它的使用法&#xff01; …...

Python Opencv实践 - 手势音量控制

本文基于前面的手部跟踪功能做一个手势音量控制功能&#xff0c;代码用到了前面手部跟踪封装的HandDetector.这篇文章在这里&#xff1a; Python Opencv实践 - 手部跟踪-CSDN博客文章浏览阅读626次&#xff0c;点赞11次&#xff0c;收藏7次。使用mediapipe库做手部的实时跟踪&…...

关于Selenium的网页对象单元测试的设计模式

写在前面&#xff1a;经过了实践总结一下经验&#xff0c;心得进行一个分享。 首先driver是可以单独抽出来的&#xff0c;变成一个driver函数放在driver.py。 from selenium import webdriver from selenium.webdriver.chrome.service import Service from selenium.webdriver…...

基于多反应堆的高并发服务器【C/C++/Reactor】(上)

&#xff08;一&#xff09;初始化服务器端用于监听的套接字 Server.h #pragma once // 初始化监听的套接字 int initListenFd(unsigned short port); Server.c int initListenFd(unsigned short port) {// 1.创建监听的fdint lfd socket(AF_INET, SOCK_STREAM, 0);if(lf…...

腾讯云debian服务器的连接与初始化

目录 1. 远程连接2. 软件下载3. 设置开机自启动 1. 远程连接 腾讯云给的服务器在安装好系统之后&#xff0c;只需要在防火墙里面添加一个白名单&#xff08;ip 或者域名&#xff09;就能访问了。 浏览器打开https://www.ipip.net/&#xff0c;在左下角找到自己所用的WIFI的公…...

医保购药小程序:智能合约引领医疗数字革新

在医疗领域&#xff0c;医保购药小程序通过引入智能合约技术&#xff0c;为用户提供更为高效、安全的购药体验。本文将通过简单的智能合约代码示例&#xff0c;深入探讨医保购药小程序如何利用区块链技术中的智能合约&#xff0c;实现医保结算、购药监控等功能&#xff0c;为医…...

神经网络:深度学习优化方法

1.有哪些方法能提升CNN模型的泛化能力 采集更多数据&#xff1a;数据决定算法的上限。 优化数据分布&#xff1a;数据类别均衡。 选用合适的目标函数。 设计合适的网络结构。 数据增强。 权值正则化。 使用合适的优化器等。 2.BN层面试高频问题大汇总 BN层解决了什么问…...

Unity中Shader旋转矩阵(二维旋转矩阵)

文章目录 前言一、旋转矩阵的原理1、我们以原点为中心&#xff0c;旋转坐标轴θ度2、求 P~2x~&#xff1a;3、求P~2y~:4、最后得到 P~2~点 的点阵5、该点阵可以拆分为以下两个矩阵相乘的结果 二、在Shader中&#xff0c;使用该旋转矩阵实现围绕 z 轴旋转1、在属性面板定义 floa…...

前端面试题(计算机网络):options请求方法及使用场景

OPTIONS请求方法及使用场景 回答思路&#xff1a;什么是options请求-->options请求方法-->options使用场景什么是options请求&#xff1f;&#xff08;浅入&#xff09;扩展&#xff1a;常见的HTTP请求有什么&#xff1f;扩展&#xff1a;常见的HTTP请求的作用&#xff1…...

使用docker-compose管理docker服务

使用docker-compose管理docker服务 1&#xff0c;创建docker-compose.yml version: 3 services:javaapp:build: context: ./javaappdockerfile: Dockerfileports:- "9202:9202"- "19202:19202"goapp:build: context: ./goappdockerfile: Dockerfileports…...

mongodb源码分析session执行handleRequest命令find过程

mongo/transport/service_state_machine.cpp已经分析startSession创建ASIOSession过程&#xff0c;并且验证connection是否超过限制ASIOSession和connection是循环接受客户端命令&#xff0c;把数据流转换成Message&#xff0c;状态转变流程是&#xff1a;State::Created 》 St…...

生成 Git SSH 证书

&#x1f511; 1. ​​生成 SSH 密钥对​​ 在终端&#xff08;Windows 使用 Git Bash&#xff0c;Mac/Linux 使用 Terminal&#xff09;执行命令&#xff1a; ssh-keygen -t rsa -b 4096 -C "your_emailexample.com" ​​参数说明​​&#xff1a; -t rsa&#x…...

Ascend NPU上适配Step-Audio模型

1 概述 1.1 简述 Step-Audio 是业界首个集语音理解与生成控制一体化的产品级开源实时语音对话系统&#xff0c;支持多语言对话&#xff08;如 中文&#xff0c;英文&#xff0c;日语&#xff09;&#xff0c;语音情感&#xff08;如 开心&#xff0c;悲伤&#xff09;&#x…...

HDFS分布式存储 zookeeper

hadoop介绍 狭义上hadoop是指apache的一款开源软件 用java语言实现开源框架&#xff0c;允许使用简单的变成模型跨计算机对大型集群进行分布式处理&#xff08;1.海量的数据存储 2.海量数据的计算&#xff09;Hadoop核心组件 hdfs&#xff08;分布式文件存储系统&#xff09;&a…...

20个超级好用的 CSS 动画库

分享 20 个最佳 CSS 动画库。 它们中的大多数将生成纯 CSS 代码&#xff0c;而不需要任何外部库。 1.Animate.css 一个开箱即用型的跨浏览器动画库&#xff0c;可供你在项目中使用。 2.Magic Animations CSS3 一组简单的动画&#xff0c;可以包含在你的网页或应用项目中。 3.An…...

4. TypeScript 类型推断与类型组合

一、类型推断 (一) 什么是类型推断 TypeScript 的类型推断会根据变量、函数返回值、对象和数组的赋值和使用方式&#xff0c;自动确定它们的类型。 这一特性减少了显式类型注解的需要&#xff0c;在保持类型安全的同时简化了代码。通过分析上下文和初始值&#xff0c;TypeSc…...

五子棋测试用例

一.项目背景 1.1 项目简介 传统棋类文化的推广 五子棋是一种古老的棋类游戏&#xff0c;有着深厚的文化底蕴。通过将五子棋制作成网页游戏&#xff0c;可以让更多的人了解和接触到这一传统棋类文化。无论是国内还是国外的玩家&#xff0c;都可以通过网页五子棋感受到东方棋类…...

shell脚本质数判断

shell脚本质数判断 shell输入一个正整数,判断是否为质数(素数&#xff09;shell求1-100内的质数shell求给定数组输出其中的质数 shell输入一个正整数,判断是否为质数(素数&#xff09; 思路&#xff1a; 1:1 2:1 2 3:1 2 3 4:1 2 3 4 5:1 2 3 4 5-------> 3:2 4:2 3 5:2 3…...

CSS 工具对比:UnoCSS vs Tailwind CSS,谁是你的菜?

在现代前端开发中&#xff0c;Utility-First (功能优先) CSS 框架已经成为主流。其中&#xff0c;Tailwind CSS 无疑是市场的领导者和标杆。然而&#xff0c;一个名为 UnoCSS 的新星正以其惊人的性能和极致的灵活性迅速崛起。 这篇文章将深入探讨这两款工具的核心理念、技术差…...

轻量安全的密码管理工具Vaultwarden

一、Vaultwarden概述 Vaultwarden主要作用是提供一个自托管的密码管理器服务。它是Bitwarden密码管理器的第三方轻量版&#xff0c;由国外开发者在Bitwarden的基础上&#xff0c;采用Rust语言重写而成。 &#xff08;一&#xff09;Vaultwarden镜像的作用及特点 轻量级与高性…...