当前位置: 首页 > article >正文

在 Go 中实现事件溯源:构建高效且可扩展的系统

事件溯源(Event Sourcing)是一种强大的架构模式,它通过记录系统状态的变化(事件)来重建系统的历史状态。这种模式特别适合需要高可扩展性、可追溯性和解耦的系统。在 Go 语言中,事件溯源可以通过一些简单的步骤和工具来实现。本文将详细介绍如何在 Go 中实现事件溯源,包括定义事件和聚合根、事件存储、事件处理以及使用事件总线。此外,我们还会探讨一些最佳实践和实际案例,帮助你更好地理解和应用事件溯源。

1. 事件溯源与 CQRS

事件溯源通常与命令查询责任分离(Command Query Responsibility Segregation,CQRS)模式结合使用。CQRS 是一种设计模式,它将应用程序的读操作和写操作分离,从而提高系统的可扩展性和性能[7]。在 CQRS 中,聚合根(Aggregate Root)是核心实体,它封装了业务逻辑,并通过事件来记录状态变化[7]。

1.1 事件溯源的核心概念

事件溯源的核心是事件(Event),它表示系统中已经发生的一个不可变的事实。事件通常是不可变的,一旦生成就无法修改。事件溯源通过记录这些事件来重建系统的状态[5]。

1.2 CQRS 的核心概念

CQRS 将应用程序分为命令(Command)和查询(Query)两个部分。命令用于修改系统的状态,而查询用于读取系统的状态。这种分离使得系统可以更灵活地扩展[7]。

2. 定义事件和聚合根

2.1 事件

事件是事件溯源的核心,它表示系统中已经发生的一个不可变的事实。事件通常包含以下字段:

  • EventID:事件的唯一标识符。
  • EventType:事件的类型。
  • Data:事件的具体数据,通常以字节流的形式存储。
  • Timestamp:事件发生的时间戳。
  • AggregateType:聚合根的类型。
  • AggregateID:聚合根的唯一标识符。
  • Version:事件的版本号。
  • Metadata:事件的元数据,用于存储额外信息。

以下是一个简单的事件结构体定义:

type Event struct {EventID       stringEventType     stringData          []byteTimestamp     time.TimeAggregateType stringAggregateID   stringVersion       int64Metadata      []byte
}

2.2 聚合根

聚合根是事件溯源中的核心实体,它封装了业务逻辑,并通过事件来记录状态变化。聚合根通常包含以下字段:

  • ID:聚合根的唯一标识符。
  • Version:聚合根的版本号。
  • AppliedEvents:已经应用的事件列表。
  • UncommittedEvents:尚未提交的事件列表。
  • Type:聚合根的类型。
  • when:事件处理函数。

以下是一个聚合根的实现示例:

type AggregateBase struct {ID                stringVersion           int64AppliedEvents     []EventUncommittedEvents []EventType              stringwhen              func(Event) error
}func (a *AggregateBase) Apply(event Event) error {if event.AggregateID != a.ID {return ErrInvalidAggregateID}if err := a.when(event); err != nil {return err}a.Version++event.Version = a.Versiona.UncommittedEvents = append(a.UncommittedEvents, event)return nil
}

3. 事件存储

事件存储是事件溯源的关键组件,用于持久化和检索事件。可以使用专门的事件存储数据库(如 EventStoreDB),也可以使用通用的数据库(如 PostgreSQL 或 MongoDB)[6]。

3.1 加载聚合根

加载聚合根时,从事件存储中读取所有相关事件,并通过 RaiseEvent 方法重建聚合根的状态:

func (a *AggregateBase) RaiseEvent(event Event) error {if event.AggregateID != a.ID {return ErrInvalidAggregateID}if a.Version >= event.Version {return ErrInvalidEventVersion}if err := a.when(event); err != nil {return err}a.Version = event.Versionreturn nil
}

3.2 事件存储接口

事件存储接口定义了加载和保存聚合根的方法。以下是一个简单的事件存储接口定义:

type AggregateStore interface {Load(ctx context.Context, aggregate Aggregate) errorSave(ctx context.Context, aggregate Aggregate) errorExists(ctx context.Context, streamID string) error
}

3.3 实现事件存储

以下是一个基于 PostgreSQL 的事件存储实现示例:

func (p *pgEventStore) Load(ctx context.Context, aggregate Aggregate) error {span, ctx := opentracing.StartSpanFromContext(ctx, "pgEventStore.Load")defer span.Finish()span.LogFields(log.String("aggregate", aggregate.String()))snapshot, err := p.GetSnapshot(ctx, aggregate.GetID())if err != nil && !errors.Is(err, pgx.ErrNoRows) {return tracing.TraceWithErr(span, err)}if snapshot != nil {if err := serializer.Unmarshal(snapshot.State, aggregate); err != nil {p.log.Errorf("(Load) serializer.Unmarshal err: %v", err)return tracing.TraceWithErr(span, errors.Wrap(err, "json.Unmarshal"))}err := p.loadAggregateEventsByVersion(ctx, aggregate)if err != nil {return err}p.log.Debugf("(Load Aggregate By Version) aggregate: %s", aggregate.String())span.LogFields(log.String("aggregate with events", aggregate.String()))return nil}err = p.loadEvents(ctx, aggregate)if err != nil {return err}p.log.Debugf("(Load Aggregate): aggregate: %s", aggregate.String())span.LogFields(log.String("aggregate with events", aggregate.String()))return nil
}func (p *pgEventStore) Save(ctx context.Context, aggregate Aggregate) (err error) {span, ctx := opentracing.StartSpanFromContext(ctx, "pgEventStore.Save")defer span.Finish()span.LogFields(log.String("aggregate", aggregate.String()))if len(aggregate.GetChanges()) == 0 {p.log.Debug("(Save) aggregate.GetChanges()) == 0")span.LogFields(log.Int("events", len(aggregate.GetChanges())))return nil}tx, err := p.db.Begin(ctx)if err != nil {p.log.Errorf("(Save) db.Begin err: %v", err)return tracing.TraceWithErr(span, errors.Wrap(err, "db.Begin"))}defer func() {if tx != nil {if txErr := tx.Rollback(ctx); txErr != nil && !errors.Is(txErr, pgx.ErrTxClosed) {err = txErrtracing.TraceErr(span, err)return}}}()changes := aggregate.GetChanges()events := make([]Event, 0, len(changes))for i := range changes {event, err := p.serializer.SerializeEvent(aggregate, changes[i])if err != nil {p.log.Errorf("(Save) serializer.SerializeEvent err: %v", err)return tracing.TraceWithErr(span, errors.Wrap(err, "serializer.SerializeEvent"))}events = append(events, event)}if err := p.saveEventsTx(ctx, tx, events); err != nil {return tracing.TraceWithErr(span, errors.Wrap(err, "saveEventsTx"))}if aggregate.GetVersion()%p.cfg.SnapshotFrequency == 0 {aggregate.ToSnapshot()if err := p.saveSnapshotTx(ctx, tx, aggregate); err != nil {return tracing.TraceWithErr(span, errors.Wrap(err, "saveSnapshotTx"))}}if err := p.processEvents(ctx, events); err != nil {return tracing.TraceWithErr(span, errors.Wrap(err, "processEvents"))}p.log.Debugf("(Save Aggregate): aggregate: %s", aggregate.String())span.LogFields(log.String("aggregate with events", aggregate.String()))return tx.Commit(ctx)
}

4. 事件处理

事件处理逻辑可以通过事件处理器来实现。事件处理器监听事件并执行相应的业务逻辑[7]。

4.1 定义事件处理器

以下是一个事件处理器的示例:

type OrderEventHandler struct{}func (h *OrderEventHandler) Handle(event interface{}) error {switch e := event.(type) {case OrderPlacedEvent:// 处理订单已下单的逻辑// 处理其他事件}return nil
}

5. 使用事件溯源库

为了简化事件溯源的实现,可以使用一些现成的事件溯源库。例如,go.cqrs 是一个支持 CQRS 和事件溯源的框架[7]。

5.1

示例:处理命令和事件

type OrderAggregate struct {*cqrs.AggregateBasestatus string
}func (a *OrderAggregate) Handle(command interface{}) error {switch c := command.(type) {case PlaceOrderCommand:a.status = "Placed"a.apply(OrderPlacedEvent{OrderID: c.OrderID}) // 应用事件以反映新状态// 处理其他命令}return nil
}

6. 事件发布和订阅

事件可以通过事件总线发布,并由多个消费者订阅。

6.1 使用事件总线

以下是一个事件总线的示例:

dispatcher := goevents.NewEventDispatcher[*MyEvent]()// 添加订阅者
dispatcher.AddSubscriber(MySubscriber{})// 发布事件
event := NewMyEvent("user.created", "John Doe")
dispatcher.Dispatch(event)

7. 实际案例

7.1 微服务架构中的事件溯源

在微服务架构中,事件溯源可以用于实现服务之间的解耦和通信。以下是一个基于 Go 的微服务架构示例,展示如何使用事件溯源来实现订单处理系统。

7.1.1 订单服务

订单服务负责处理订单相关的业务逻辑,包括下单、支付和发货等操作。

type OrderService struct {eventStore AggregateStoreeventBus   EventBus
}func (s *OrderService) PlaceOrder(ctx context.Context, order Order) error {aggregate := NewOrderAggregate(order)err := s.eventStore.Load(ctx, aggregate)if err != nil {return err}err = aggregate.Handle(PlaceOrderCommand{OrderID: order.ID})if err != nil {return err}err = s.eventStore.Save(ctx, aggregate)if err != nil {return err}for _, event := range aggregate.GetChanges() {s.eventBus.Publish(event)}return nil
}
7.1.2 支付服务

支付服务负责处理支付相关的业务逻辑,包括支付成功和支付失败等操作。

type PaymentService struct {eventBus EventBus
}func (s *PaymentService) HandlePayment(ctx context.Context, payment Payment) error {err := s.eventBus.Subscribe(ctx, func(event Event) error {switch e := event.(type) {case OrderPlacedEvent:// 处理订单已下单的逻辑return nil// 处理其他事件}return nil})if err != nil {return err}return nil
}

8. 最佳实践

8.1 事件设计

  • 不可变性:事件一旦生成就不可修改。
  • 包含足够的信息:事件应该包含足够的信息,以便能够重建系统的状态。
  • 版本控制:事件应该包含版本号,以便能够处理并发问题。

8.2 聚合根设计

  • 封装业务逻辑:聚合根应该封装业务逻辑,并通过事件来记录状态变化。
  • 避免过多的事件:聚合根应该尽量减少事件的数量,以提高性能。

8.3 事件存储设计

  • 高性能:事件存储应该支持高性能的读写操作。
  • 可扩展性:事件存储应该支持水平扩展,以满足高并发的需求。

8.4 事件总线设计

  • 解耦:事件总线应该支持解耦,使得服务之间不需要直接通信。
  • 异步处理:事件总线应该支持异步处理,以提高系统的响应速度。

9. 总结

在 Go 中实现事件溯源需要定义事件和聚合根,使用事件存储来持久化事件,并通过事件处理器来处理事件。可以使用现成的事件溯源库(如 go.cqrs)来简化实现。事件总线可以用于发布和订阅事件,支持异步处理。事件溯源不仅能够提高系统的可扩展性和可维护性,还能为系统提供强大的可追溯性。

希望本文能帮助你更好地理解和实现事件溯源。如果你有任何问题或建议,欢迎在评论区留言。

相关文章:

在 Go 中实现事件溯源:构建高效且可扩展的系统

事件溯源(Event Sourcing)是一种强大的架构模式,它通过记录系统状态的变化(事件)来重建系统的历史状态。这种模式特别适合需要高可扩展性、可追溯性和解耦的系统。在 Go 语言中,事件溯源可以通过一些简单的…...

七、I2C通信读取LM75B温度

7.1 概述 I2C(Inter-Integrated Circuit)是一种同步、多主从、串行通信协议,由飞利浦公司开发,主要用于短距离通信,尤其在集成电路之间。 7.1.1 主要特点 两线制:仅需SDA(数据线)…...

Python 调用 Azure OpenAI API

在人工智能和机器学习快速发展的今天,Azure OpenAI 服务为开发者提供了强大的工具来集成先进的 AI 能力到他们的应用中。本文将指导您如何使用 Python 调用 Azure OpenAI API,特别是使用 GPT-4 模型进行对话生成。 准备工作 在开始之前,请确保您已经: 拥有一个 Azure 账户…...

Spring Boot 配置JPA数据库主从读写分离失败及解决办法

因为是老项目, Spring Boot 是1.4, 使用 AbstractRoutingDataSource 来做主从切换, 配置切面类在进入事务时切换成主库, 但实际运行起来却失败, 写操作路由到了从库 查了很多文章, 试了很多方法都无效, 包括修改注解 Transactional 的 propagation 属性, 清空主从标记等等 打…...

基于华为云镜像加速器的Docker环境搭建与项目部署指南

基于华为云镜像加速器的Docker环境搭建与项目部署指南 一、安装Docker1.1 更新系统包1.2 安装必要的依赖包1.3 移除原有的Docker仓库配置(如果存在)1.4 添加华为云Docker仓库1.5 安装Docker CE1.6 启动Docker服务1.7 验证Docker是否安装成功1.8 添加华为云镜像加速器地址二、…...

讲解下SpringBoot中MySql和MongoDB的配合使用

在Spring Boot中,MySQL和MongoDB可以配合使用,以充分发挥关系型数据库和非关系型数据库的优势。MySQL适合处理结构化数据,而MongoDB适合处理非结构化或半结构化数据。以下是如何在Spring Boot中同时使用MySQL和MongoDB的详细讲解。 1. 添加依…...

CSS 属性选择器详解与实战示例

CSS 属性选择器是 CSS 中非常强大且灵活的一类选择器,它能够根据 HTML 元素的属性和值来进行精准选中。在实际开发过程中,属性选择器不仅可以提高代码的可维护性,而且能够大大优化页面的样式控制。本文将结合菜鸟教程的示例,从基础…...

2025 游戏试玩打码平台PHP源码

源码介绍 2025 游戏试玩打码平台PHP源码 开发语言:PHP 数据库:MySQL 源码程序采用yii框架phpMysql语言开发 功能完善,无后门 程序功能有: 1.游戏试玩功能 2.广告体验功能 3.打码功能 4.新人任务 5.开启宝箱功能 6.站长联盟功能 7.兑换商城功…...

【Matlab算法】基于人工势场的多机器人协同运动与避障算法研究(附MATLAB完整代码)

📚基于人工势场的多机器人协同运动与避障算法研究 摘要1. 引言2. 方法说明2.1 人工势场模型2.2 运动控制流程3. 核心函数解释3.1 主循环结构3.2 力计算函数4. 实验设计4.1 参数配置4.2 测试场景5. 结果分析5.1 典型运动轨迹5.2 性能指标6. 总结与建议成果总结改进方向附录:完…...

自动化办公|xlwings 数据类型和转换

xlwings 数据类型和转换:Python 与 Excel 的桥梁 在使用 xlwings 进行 Python 和 Excel 数据交互时,理解两者之间的数据类型对应关系至关重要。本篇将详细介绍 Python 数据类型与 Excel 数据类型的对应关系,以及如何进行数据类型转换。 一、…...

北斗导航 | 基于多假设解分离(MHSS)模型的双星故障监测算法(MATLAB代码实现——ARAIM)

===================================================== github:https://github.com/MichaelBeechan CSDN:https://blog.csdn.net/u011344545 ===================================================== 双星故障监测算法 一、多星故障MHSS模型流程1、数据预处理2、构建假设模…...

部署 ollama + deepseek + open-webui 遇到的常见问题与解决建议

前言 前面部署了 ollama deepseek open-webui 这里聊聊部署过程中遇到的一些问题和解决方案。 包含 ollama 容器部署 和 本地部署 中所遇问题和解决方案。 1. ollama proxy 网络代理问题 ollama 容器部署 用不了 http https 的 proxy 代理(配全局都没用&#xf…...

sql难点

一、 假设你有一个查询&#xff0c;需要根据 id 是否为 null 来动态生成 SQL 条件&#xff1a; xml复制 <select id"getResources" resultType"Resource">SELECT * FROM resources<where><if test"id ! null">and id <!…...

oracle表分区--范围分区

文章目录 oracle表分区分区的原因分区的优势oracle表分区的作用oracle表分区类型一、范围分区二、 创建分区表和使用&#xff1a;1、按照数值范围划分2、按照时间范围3、MAXVALUE2. 向现有表添加新的分区3、 分区维护和重新组织&#xff08;合并/删除&#xff09; oracle表分区…...

mysql读写分离与proxysql的结合

上一篇文章介绍了mysql如何设置成主从复制模式&#xff0c;而主从复制的目的&#xff0c;是为了读写分离。 读写分离&#xff0c;拿spring boot项目来说&#xff0c;可以有2种方式&#xff1a; 1&#xff09;设置2个数据源&#xff0c;读和写分开使用 2&#xff09;使用中间件…...

elment-plus的表单的其中一项通过了验证再去走别的函数怎么写,不是全部内容通过验证

<template> <el-form ref"formRef" :model"formData" :rules"formRules"> <el-form-item label"身份证号" prop"idCard"> <el-input v-model"formData.idCard" blur"getDetail()"…...

蓝桥杯试题:归并排序

一、问题描述 在一个神秘的岛屿上&#xff0c;有一支探险队发现了一批宝藏&#xff0c;这批宝藏是以整数数组的形式存在的。每个宝藏上都标有一个数字&#xff0c;代表了其珍贵程度。然而&#xff0c;由于某种神奇的力量&#xff0c;这批宝藏的顺序被打乱了&#xff0c;探险队…...

Untiy3d 铰链、弹簧,特殊的物理关节

&#xff08;一&#xff09;铰链组件 1.创建一个立方体和角色胶囊 2.给角色胶囊挂在控制脚本和刚体 using System.Collections; using System.Collections.Generic; using UnityEngine;public class plyer : MonoBehaviour {// Start is called once before the first execut…...

Visual Studio 进行单元测试【入门】

摘要&#xff1a;在软件开发中&#xff0c;单元测试是一种重要的实践&#xff0c;通过验证代码的正确性&#xff0c;帮助开发者提高代码质量。本文将介绍如何在VisualStudio中进行单元测试&#xff0c;包括创建测试项目、编写测试代码、运行测试以及查看结果。 1. 什么是单元测…...

Leetcode - 周赛435

目录 一、3442. 奇偶频次间的最大差值 I二、3443. K 次修改后的最大曼哈顿距离三、3444. 使数组包含目标值倍数的最少增量四、3445. 奇偶频次间的最大差值 II 一、3442. 奇偶频次间的最大差值 I 题目链接 本题使用数组统计字符串 s s s 中每个字符的出现次数&#xff0c;然后…...

CentOS本机配置为时间源

CentOS本机配置为时间源 安装chrony&#xff0c;默认已安装修改配置文件 /etc/chrony.conf客户端配置 安装chrony&#xff0c;默认已安装 yum -y install chrony修改配置文件 /etc/chrony.conf # cat /etc/chrony.conf | grep -Ev "^$|#" server ceph00 iburst dri…...

算法之 数论

文章目录 质数判断质数3115.质数的最大距离 质数筛选204.计数质数2761.和等于目标值的质数对 2521.数组乘积中的不同质因数数目 质数 质数的定义&#xff1a;除了本身和1&#xff0c;不能被其他小于它的数整除&#xff0c;最小的质数是 2 求解质数的几种方法 法1&#xff0c;根…...

Android车机DIY开发之软件篇(十二) AOSP12下载编译

Android车机DIY开发之软件篇(十二) AOSP12下载编译 sudo apt-get update sudo apt-get install git-core gnupg flex bison gperf build-essential zip curl zlib1g-dev gcc-multilib gmultilib libc6-dev-i386 lib32ncurses5-dev libx11-dev lib32z-dev ccache libgl1-mesa-…...

docker 导出导入

1第一步骤docker save docker save -o database-export-4.1.0.tar database-export-4.1.0.jar:latest 2检查镜像ls -l, 注意&#xff1a;文件可能没有其他文件导出权限&#xff1a;chmod 644 database-export-4.1.0.tar 3在新的服务器导入&#xff1a; docker load -i databa…...

OSPF高级特性(3):安全特效

引言 OSPF的基础我们已经结束学习了&#xff0c;接下来我们继续学习OSPF的高级特性。为了方便大家阅读&#xff0c;我会将高级特性的几篇链接放在末尾&#xff0c;所有链接都是站内的&#xff0c;大家点击即可阅读&#xff1a; OSPF基础&#xff08;1&#xff09;&#xff1a;工…...

基于SSM的农产品供销小程序+LW示例参考

1.项目介绍 系统角色&#xff1a;管理员、农户功能模块&#xff1a;用户管理、农户管理、产品分类管理、农产品管理、咨询管理、订单管理、收藏管理、购物车、充值、下单等技术选型&#xff1a;SSM&#xff0c;Vue&#xff08;后端管理web&#xff09;&#xff0c;uniapp等测试…...

自然语言处理NLP入门 -- 第二节预处理文本数据

在自然语言处理&#xff08;NLP&#xff09;中&#xff0c;数据的质量直接影响模型的表现。文本预处理的目标是清理和标准化文本数据&#xff0c;使其适合机器学习或深度学习模型处理。本章介绍几种常见的文本预处理方法&#xff0c;并通过 Python 代码进行示例。 2.1 文本清理…...

android launcher拖动图标释放错位

由于为了设备流畅把所有动画效果设置为0.5&#xff0c;不设置为0是因为锁屏在开机时会有闪黑屏的现象。在此背景下&#xff0c;测试发现在拖动桌面图标时&#xff0c;在图标动画过程中错位时释放图标&#xff0c;则图标会留在错位的位置&#xff0c;不会自动对齐。 原因就是动…...

小结:OSPF的网络类型,LSA

OSPF&#xff08;Open Shortest Path First&#xff09;是一个基于链路状态的内部网关协议&#xff08;IGP&#xff09;。以下是对OSPF网络类型、LSA类型、序列号与Age作用&#xff0c;以及相关配置指令的详细讲解。 一、OSPF的网络类型 OSPF支持多种网络类型&#xff0c;不同…...

Unity URP的2D光照简介

官网工程&#xff0c;包括2d光照&#xff0c;动画&#xff0c;动效介绍&#xff1a; https://unity.com/cn/blog/games/happy-harvest-demo-latest-2d-techniques https://docs.unity3d.com/6000.0/Documentation/Manual/urp/Lights-2D-intro.html 人物脸部光照细节和脚上的阴影…...