在 Go 中实现事件溯源:构建高效且可扩展的系统
事件溯源(Event Sourcing)是一种强大的架构模式,它通过记录系统状态的变化(事件)来重建系统的历史状态。这种模式特别适合需要高可扩展性、可追溯性和解耦的系统。在 Go 语言中,事件溯源可以通过一些简单的步骤和工具来实现。本文将详细介绍如何在 Go 中实现事件溯源,包括定义事件和聚合根、事件存储、事件处理以及使用事件总线。此外,我们还会探讨一些最佳实践和实际案例,帮助你更好地理解和应用事件溯源。
1. 事件溯源与 CQRS
事件溯源通常与命令查询责任分离(Command Query Responsibility Segregation,CQRS)模式结合使用。CQRS 是一种设计模式,它将应用程序的读操作和写操作分离,从而提高系统的可扩展性和性能[7]。在 CQRS 中,聚合根(Aggregate Root)是核心实体,它封装了业务逻辑,并通过事件来记录状态变化[7]。
1.1 事件溯源的核心概念
事件溯源的核心是事件(Event),它表示系统中已经发生的一个不可变的事实。事件通常是不可变的,一旦生成就无法修改。事件溯源通过记录这些事件来重建系统的状态[5]。
1.2 CQRS 的核心概念
CQRS 将应用程序分为命令(Command)和查询(Query)两个部分。命令用于修改系统的状态,而查询用于读取系统的状态。这种分离使得系统可以更灵活地扩展[7]。
2. 定义事件和聚合根
2.1 事件
事件是事件溯源的核心,它表示系统中已经发生的一个不可变的事实。事件通常包含以下字段:
- EventID:事件的唯一标识符。
- EventType:事件的类型。
- Data:事件的具体数据,通常以字节流的形式存储。
- Timestamp:事件发生的时间戳。
- AggregateType:聚合根的类型。
- AggregateID:聚合根的唯一标识符。
- Version:事件的版本号。
- Metadata:事件的元数据,用于存储额外信息。
以下是一个简单的事件结构体定义:
type Event struct {EventID stringEventType stringData []byteTimestamp time.TimeAggregateType stringAggregateID stringVersion int64Metadata []byte
}
2.2 聚合根
聚合根是事件溯源中的核心实体,它封装了业务逻辑,并通过事件来记录状态变化。聚合根通常包含以下字段:
- ID:聚合根的唯一标识符。
- Version:聚合根的版本号。
- AppliedEvents:已经应用的事件列表。
- UncommittedEvents:尚未提交的事件列表。
- Type:聚合根的类型。
- when:事件处理函数。
以下是一个聚合根的实现示例:
type AggregateBase struct {ID stringVersion int64AppliedEvents []EventUncommittedEvents []EventType stringwhen func(Event) error
}func (a *AggregateBase) Apply(event Event) error {if event.AggregateID != a.ID {return ErrInvalidAggregateID}if err := a.when(event); err != nil {return err}a.Version++event.Version = a.Versiona.UncommittedEvents = append(a.UncommittedEvents, event)return nil
}
3. 事件存储
事件存储是事件溯源的关键组件,用于持久化和检索事件。可以使用专门的事件存储数据库(如 EventStoreDB),也可以使用通用的数据库(如 PostgreSQL 或 MongoDB)[6]。
3.1 加载聚合根
加载聚合根时,从事件存储中读取所有相关事件,并通过 RaiseEvent 方法重建聚合根的状态:
func (a *AggregateBase) RaiseEvent(event Event) error {if event.AggregateID != a.ID {return ErrInvalidAggregateID}if a.Version >= event.Version {return ErrInvalidEventVersion}if err := a.when(event); err != nil {return err}a.Version = event.Versionreturn nil
}
3.2 事件存储接口
事件存储接口定义了加载和保存聚合根的方法。以下是一个简单的事件存储接口定义:
type AggregateStore interface {Load(ctx context.Context, aggregate Aggregate) errorSave(ctx context.Context, aggregate Aggregate) errorExists(ctx context.Context, streamID string) error
}
3.3 实现事件存储
以下是一个基于 PostgreSQL 的事件存储实现示例:
func (p *pgEventStore) Load(ctx context.Context, aggregate Aggregate) error {span, ctx := opentracing.StartSpanFromContext(ctx, "pgEventStore.Load")defer span.Finish()span.LogFields(log.String("aggregate", aggregate.String()))snapshot, err := p.GetSnapshot(ctx, aggregate.GetID())if err != nil && !errors.Is(err, pgx.ErrNoRows) {return tracing.TraceWithErr(span, err)}if snapshot != nil {if err := serializer.Unmarshal(snapshot.State, aggregate); err != nil {p.log.Errorf("(Load) serializer.Unmarshal err: %v", err)return tracing.TraceWithErr(span, errors.Wrap(err, "json.Unmarshal"))}err := p.loadAggregateEventsByVersion(ctx, aggregate)if err != nil {return err}p.log.Debugf("(Load Aggregate By Version) aggregate: %s", aggregate.String())span.LogFields(log.String("aggregate with events", aggregate.String()))return nil}err = p.loadEvents(ctx, aggregate)if err != nil {return err}p.log.Debugf("(Load Aggregate): aggregate: %s", aggregate.String())span.LogFields(log.String("aggregate with events", aggregate.String()))return nil
}func (p *pgEventStore) Save(ctx context.Context, aggregate Aggregate) (err error) {span, ctx := opentracing.StartSpanFromContext(ctx, "pgEventStore.Save")defer span.Finish()span.LogFields(log.String("aggregate", aggregate.String()))if len(aggregate.GetChanges()) == 0 {p.log.Debug("(Save) aggregate.GetChanges()) == 0")span.LogFields(log.Int("events", len(aggregate.GetChanges())))return nil}tx, err := p.db.Begin(ctx)if err != nil {p.log.Errorf("(Save) db.Begin err: %v", err)return tracing.TraceWithErr(span, errors.Wrap(err, "db.Begin"))}defer func() {if tx != nil {if txErr := tx.Rollback(ctx); txErr != nil && !errors.Is(txErr, pgx.ErrTxClosed) {err = txErrtracing.TraceErr(span, err)return}}}()changes := aggregate.GetChanges()events := make([]Event, 0, len(changes))for i := range changes {event, err := p.serializer.SerializeEvent(aggregate, changes[i])if err != nil {p.log.Errorf("(Save) serializer.SerializeEvent err: %v", err)return tracing.TraceWithErr(span, errors.Wrap(err, "serializer.SerializeEvent"))}events = append(events, event)}if err := p.saveEventsTx(ctx, tx, events); err != nil {return tracing.TraceWithErr(span, errors.Wrap(err, "saveEventsTx"))}if aggregate.GetVersion()%p.cfg.SnapshotFrequency == 0 {aggregate.ToSnapshot()if err := p.saveSnapshotTx(ctx, tx, aggregate); err != nil {return tracing.TraceWithErr(span, errors.Wrap(err, "saveSnapshotTx"))}}if err := p.processEvents(ctx, events); err != nil {return tracing.TraceWithErr(span, errors.Wrap(err, "processEvents"))}p.log.Debugf("(Save Aggregate): aggregate: %s", aggregate.String())span.LogFields(log.String("aggregate with events", aggregate.String()))return tx.Commit(ctx)
}
4. 事件处理
事件处理逻辑可以通过事件处理器来实现。事件处理器监听事件并执行相应的业务逻辑[7]。
4.1 定义事件处理器
以下是一个事件处理器的示例:
type OrderEventHandler struct{}func (h *OrderEventHandler) Handle(event interface{}) error {switch e := event.(type) {case OrderPlacedEvent:// 处理订单已下单的逻辑// 处理其他事件}return nil
}
5. 使用事件溯源库
为了简化事件溯源的实现,可以使用一些现成的事件溯源库。例如,go.cqrs 是一个支持 CQRS 和事件溯源的框架[7]。
5.1
示例:处理命令和事件
type OrderAggregate struct {*cqrs.AggregateBasestatus string
}func (a *OrderAggregate) Handle(command interface{}) error {switch c := command.(type) {case PlaceOrderCommand:a.status = "Placed"a.apply(OrderPlacedEvent{OrderID: c.OrderID}) // 应用事件以反映新状态// 处理其他命令}return nil
}
6. 事件发布和订阅
事件可以通过事件总线发布,并由多个消费者订阅。
6.1 使用事件总线
以下是一个事件总线的示例:
dispatcher := goevents.NewEventDispatcher[*MyEvent]()// 添加订阅者
dispatcher.AddSubscriber(MySubscriber{})// 发布事件
event := NewMyEvent("user.created", "John Doe")
dispatcher.Dispatch(event)
7. 实际案例
7.1 微服务架构中的事件溯源
在微服务架构中,事件溯源可以用于实现服务之间的解耦和通信。以下是一个基于 Go 的微服务架构示例,展示如何使用事件溯源来实现订单处理系统。
7.1.1 订单服务
订单服务负责处理订单相关的业务逻辑,包括下单、支付和发货等操作。
type OrderService struct {eventStore AggregateStoreeventBus EventBus
}func (s *OrderService) PlaceOrder(ctx context.Context, order Order) error {aggregate := NewOrderAggregate(order)err := s.eventStore.Load(ctx, aggregate)if err != nil {return err}err = aggregate.Handle(PlaceOrderCommand{OrderID: order.ID})if err != nil {return err}err = s.eventStore.Save(ctx, aggregate)if err != nil {return err}for _, event := range aggregate.GetChanges() {s.eventBus.Publish(event)}return nil
}
7.1.2 支付服务
支付服务负责处理支付相关的业务逻辑,包括支付成功和支付失败等操作。
type PaymentService struct {eventBus EventBus
}func (s *PaymentService) HandlePayment(ctx context.Context, payment Payment) error {err := s.eventBus.Subscribe(ctx, func(event Event) error {switch e := event.(type) {case OrderPlacedEvent:// 处理订单已下单的逻辑return nil// 处理其他事件}return nil})if err != nil {return err}return nil
}
8. 最佳实践
8.1 事件设计
- 不可变性:事件一旦生成就不可修改。
- 包含足够的信息:事件应该包含足够的信息,以便能够重建系统的状态。
- 版本控制:事件应该包含版本号,以便能够处理并发问题。
8.2 聚合根设计
- 封装业务逻辑:聚合根应该封装业务逻辑,并通过事件来记录状态变化。
- 避免过多的事件:聚合根应该尽量减少事件的数量,以提高性能。
8.3 事件存储设计
- 高性能:事件存储应该支持高性能的读写操作。
- 可扩展性:事件存储应该支持水平扩展,以满足高并发的需求。
8.4 事件总线设计
- 解耦:事件总线应该支持解耦,使得服务之间不需要直接通信。
- 异步处理:事件总线应该支持异步处理,以提高系统的响应速度。
9. 总结
在 Go 中实现事件溯源需要定义事件和聚合根,使用事件存储来持久化事件,并通过事件处理器来处理事件。可以使用现成的事件溯源库(如 go.cqrs)来简化实现。事件总线可以用于发布和订阅事件,支持异步处理。事件溯源不仅能够提高系统的可扩展性和可维护性,还能为系统提供强大的可追溯性。
希望本文能帮助你更好地理解和实现事件溯源。如果你有任何问题或建议,欢迎在评论区留言。
相关文章:
在 Go 中实现事件溯源:构建高效且可扩展的系统
事件溯源(Event Sourcing)是一种强大的架构模式,它通过记录系统状态的变化(事件)来重建系统的历史状态。这种模式特别适合需要高可扩展性、可追溯性和解耦的系统。在 Go 语言中,事件溯源可以通过一些简单的…...
七、I2C通信读取LM75B温度
7.1 概述 I2C(Inter-Integrated Circuit)是一种同步、多主从、串行通信协议,由飞利浦公司开发,主要用于短距离通信,尤其在集成电路之间。 7.1.1 主要特点 两线制:仅需SDA(数据线)…...
Python 调用 Azure OpenAI API
在人工智能和机器学习快速发展的今天,Azure OpenAI 服务为开发者提供了强大的工具来集成先进的 AI 能力到他们的应用中。本文将指导您如何使用 Python 调用 Azure OpenAI API,特别是使用 GPT-4 模型进行对话生成。 准备工作 在开始之前,请确保您已经: 拥有一个 Azure 账户…...
Spring Boot 配置JPA数据库主从读写分离失败及解决办法
因为是老项目, Spring Boot 是1.4, 使用 AbstractRoutingDataSource 来做主从切换, 配置切面类在进入事务时切换成主库, 但实际运行起来却失败, 写操作路由到了从库 查了很多文章, 试了很多方法都无效, 包括修改注解 Transactional 的 propagation 属性, 清空主从标记等等 打…...
基于华为云镜像加速器的Docker环境搭建与项目部署指南
基于华为云镜像加速器的Docker环境搭建与项目部署指南 一、安装Docker1.1 更新系统包1.2 安装必要的依赖包1.3 移除原有的Docker仓库配置(如果存在)1.4 添加华为云Docker仓库1.5 安装Docker CE1.6 启动Docker服务1.7 验证Docker是否安装成功1.8 添加华为云镜像加速器地址二、…...
讲解下SpringBoot中MySql和MongoDB的配合使用
在Spring Boot中,MySQL和MongoDB可以配合使用,以充分发挥关系型数据库和非关系型数据库的优势。MySQL适合处理结构化数据,而MongoDB适合处理非结构化或半结构化数据。以下是如何在Spring Boot中同时使用MySQL和MongoDB的详细讲解。 1. 添加依…...
CSS 属性选择器详解与实战示例
CSS 属性选择器是 CSS 中非常强大且灵活的一类选择器,它能够根据 HTML 元素的属性和值来进行精准选中。在实际开发过程中,属性选择器不仅可以提高代码的可维护性,而且能够大大优化页面的样式控制。本文将结合菜鸟教程的示例,从基础…...
2025 游戏试玩打码平台PHP源码
源码介绍 2025 游戏试玩打码平台PHP源码 开发语言:PHP 数据库:MySQL 源码程序采用yii框架phpMysql语言开发 功能完善,无后门 程序功能有: 1.游戏试玩功能 2.广告体验功能 3.打码功能 4.新人任务 5.开启宝箱功能 6.站长联盟功能 7.兑换商城功…...
【Matlab算法】基于人工势场的多机器人协同运动与避障算法研究(附MATLAB完整代码)
📚基于人工势场的多机器人协同运动与避障算法研究 摘要1. 引言2. 方法说明2.1 人工势场模型2.2 运动控制流程3. 核心函数解释3.1 主循环结构3.2 力计算函数4. 实验设计4.1 参数配置4.2 测试场景5. 结果分析5.1 典型运动轨迹5.2 性能指标6. 总结与建议成果总结改进方向附录:完…...
自动化办公|xlwings 数据类型和转换
xlwings 数据类型和转换:Python 与 Excel 的桥梁 在使用 xlwings 进行 Python 和 Excel 数据交互时,理解两者之间的数据类型对应关系至关重要。本篇将详细介绍 Python 数据类型与 Excel 数据类型的对应关系,以及如何进行数据类型转换。 一、…...
北斗导航 | 基于多假设解分离(MHSS)模型的双星故障监测算法(MATLAB代码实现——ARAIM)
===================================================== github:https://github.com/MichaelBeechan CSDN:https://blog.csdn.net/u011344545 ===================================================== 双星故障监测算法 一、多星故障MHSS模型流程1、数据预处理2、构建假设模…...
部署 ollama + deepseek + open-webui 遇到的常见问题与解决建议
前言 前面部署了 ollama deepseek open-webui 这里聊聊部署过程中遇到的一些问题和解决方案。 包含 ollama 容器部署 和 本地部署 中所遇问题和解决方案。 1. ollama proxy 网络代理问题 ollama 容器部署 用不了 http https 的 proxy 代理(配全局都没用…...
sql难点
一、 假设你有一个查询,需要根据 id 是否为 null 来动态生成 SQL 条件: xml复制 <select id"getResources" resultType"Resource">SELECT * FROM resources<where><if test"id ! null">and id <!…...
oracle表分区--范围分区
文章目录 oracle表分区分区的原因分区的优势oracle表分区的作用oracle表分区类型一、范围分区二、 创建分区表和使用:1、按照数值范围划分2、按照时间范围3、MAXVALUE2. 向现有表添加新的分区3、 分区维护和重新组织(合并/删除) oracle表分区…...
mysql读写分离与proxysql的结合
上一篇文章介绍了mysql如何设置成主从复制模式,而主从复制的目的,是为了读写分离。 读写分离,拿spring boot项目来说,可以有2种方式: 1)设置2个数据源,读和写分开使用 2)使用中间件…...
elment-plus的表单的其中一项通过了验证再去走别的函数怎么写,不是全部内容通过验证
<template> <el-form ref"formRef" :model"formData" :rules"formRules"> <el-form-item label"身份证号" prop"idCard"> <el-input v-model"formData.idCard" blur"getDetail()"…...
蓝桥杯试题:归并排序
一、问题描述 在一个神秘的岛屿上,有一支探险队发现了一批宝藏,这批宝藏是以整数数组的形式存在的。每个宝藏上都标有一个数字,代表了其珍贵程度。然而,由于某种神奇的力量,这批宝藏的顺序被打乱了,探险队…...
Untiy3d 铰链、弹簧,特殊的物理关节
(一)铰链组件 1.创建一个立方体和角色胶囊 2.给角色胶囊挂在控制脚本和刚体 using System.Collections; using System.Collections.Generic; using UnityEngine;public class plyer : MonoBehaviour {// Start is called once before the first execut…...
Visual Studio 进行单元测试【入门】
摘要:在软件开发中,单元测试是一种重要的实践,通过验证代码的正确性,帮助开发者提高代码质量。本文将介绍如何在VisualStudio中进行单元测试,包括创建测试项目、编写测试代码、运行测试以及查看结果。 1. 什么是单元测…...
Leetcode - 周赛435
目录 一、3442. 奇偶频次间的最大差值 I二、3443. K 次修改后的最大曼哈顿距离三、3444. 使数组包含目标值倍数的最少增量四、3445. 奇偶频次间的最大差值 II 一、3442. 奇偶频次间的最大差值 I 题目链接 本题使用数组统计字符串 s s s 中每个字符的出现次数,然后…...
CentOS本机配置为时间源
CentOS本机配置为时间源 安装chrony,默认已安装修改配置文件 /etc/chrony.conf客户端配置 安装chrony,默认已安装 yum -y install chrony修改配置文件 /etc/chrony.conf # cat /etc/chrony.conf | grep -Ev "^$|#" server ceph00 iburst dri…...
算法之 数论
文章目录 质数判断质数3115.质数的最大距离 质数筛选204.计数质数2761.和等于目标值的质数对 2521.数组乘积中的不同质因数数目 质数 质数的定义:除了本身和1,不能被其他小于它的数整除,最小的质数是 2 求解质数的几种方法 法1,根…...
Android车机DIY开发之软件篇(十二) AOSP12下载编译
Android车机DIY开发之软件篇(十二) AOSP12下载编译 sudo apt-get update sudo apt-get install git-core gnupg flex bison gperf build-essential zip curl zlib1g-dev gcc-multilib gmultilib libc6-dev-i386 lib32ncurses5-dev libx11-dev lib32z-dev ccache libgl1-mesa-…...
docker 导出导入
1第一步骤docker save docker save -o database-export-4.1.0.tar database-export-4.1.0.jar:latest 2检查镜像ls -l, 注意:文件可能没有其他文件导出权限:chmod 644 database-export-4.1.0.tar 3在新的服务器导入: docker load -i databa…...
OSPF高级特性(3):安全特效
引言 OSPF的基础我们已经结束学习了,接下来我们继续学习OSPF的高级特性。为了方便大家阅读,我会将高级特性的几篇链接放在末尾,所有链接都是站内的,大家点击即可阅读: OSPF基础(1):工…...
基于SSM的农产品供销小程序+LW示例参考
1.项目介绍 系统角色:管理员、农户功能模块:用户管理、农户管理、产品分类管理、农产品管理、咨询管理、订单管理、收藏管理、购物车、充值、下单等技术选型:SSM,Vue(后端管理web),uniapp等测试…...
自然语言处理NLP入门 -- 第二节预处理文本数据
在自然语言处理(NLP)中,数据的质量直接影响模型的表现。文本预处理的目标是清理和标准化文本数据,使其适合机器学习或深度学习模型处理。本章介绍几种常见的文本预处理方法,并通过 Python 代码进行示例。 2.1 文本清理…...
android launcher拖动图标释放错位
由于为了设备流畅把所有动画效果设置为0.5,不设置为0是因为锁屏在开机时会有闪黑屏的现象。在此背景下,测试发现在拖动桌面图标时,在图标动画过程中错位时释放图标,则图标会留在错位的位置,不会自动对齐。 原因就是动…...
小结:OSPF的网络类型,LSA
OSPF(Open Shortest Path First)是一个基于链路状态的内部网关协议(IGP)。以下是对OSPF网络类型、LSA类型、序列号与Age作用,以及相关配置指令的详细讲解。 一、OSPF的网络类型 OSPF支持多种网络类型,不同…...
Unity URP的2D光照简介
官网工程,包括2d光照,动画,动效介绍: https://unity.com/cn/blog/games/happy-harvest-demo-latest-2d-techniques https://docs.unity3d.com/6000.0/Documentation/Manual/urp/Lights-2D-intro.html 人物脸部光照细节和脚上的阴影…...
