当前位置: 首页 > news >正文

在 Go 中实现事件溯源:构建高效且可扩展的系统

事件溯源(Event Sourcing)是一种强大的架构模式,它通过记录系统状态的变化(事件)来重建系统的历史状态。这种模式特别适合需要高可扩展性、可追溯性和解耦的系统。在 Go 语言中,事件溯源可以通过一些简单的步骤和工具来实现。本文将详细介绍如何在 Go 中实现事件溯源,包括定义事件和聚合根、事件存储、事件处理以及使用事件总线。此外,我们还会探讨一些最佳实践和实际案例,帮助你更好地理解和应用事件溯源。

1. 事件溯源与 CQRS

事件溯源通常与命令查询责任分离(Command Query Responsibility Segregation,CQRS)模式结合使用。CQRS 是一种设计模式,它将应用程序的读操作和写操作分离,从而提高系统的可扩展性和性能[7]。在 CQRS 中,聚合根(Aggregate Root)是核心实体,它封装了业务逻辑,并通过事件来记录状态变化[7]。

1.1 事件溯源的核心概念

事件溯源的核心是事件(Event),它表示系统中已经发生的一个不可变的事实。事件通常是不可变的,一旦生成就无法修改。事件溯源通过记录这些事件来重建系统的状态[5]。

1.2 CQRS 的核心概念

CQRS 将应用程序分为命令(Command)和查询(Query)两个部分。命令用于修改系统的状态,而查询用于读取系统的状态。这种分离使得系统可以更灵活地扩展[7]。

2. 定义事件和聚合根

2.1 事件

事件是事件溯源的核心,它表示系统中已经发生的一个不可变的事实。事件通常包含以下字段:

  • EventID:事件的唯一标识符。
  • EventType:事件的类型。
  • Data:事件的具体数据,通常以字节流的形式存储。
  • Timestamp:事件发生的时间戳。
  • AggregateType:聚合根的类型。
  • AggregateID:聚合根的唯一标识符。
  • Version:事件的版本号。
  • Metadata:事件的元数据,用于存储额外信息。

以下是一个简单的事件结构体定义:

type Event struct {EventID       stringEventType     stringData          []byteTimestamp     time.TimeAggregateType stringAggregateID   stringVersion       int64Metadata      []byte
}

2.2 聚合根

聚合根是事件溯源中的核心实体,它封装了业务逻辑,并通过事件来记录状态变化。聚合根通常包含以下字段:

  • ID:聚合根的唯一标识符。
  • Version:聚合根的版本号。
  • AppliedEvents:已经应用的事件列表。
  • UncommittedEvents:尚未提交的事件列表。
  • Type:聚合根的类型。
  • when:事件处理函数。

以下是一个聚合根的实现示例:

type AggregateBase struct {ID                stringVersion           int64AppliedEvents     []EventUncommittedEvents []EventType              stringwhen              func(Event) error
}func (a *AggregateBase) Apply(event Event) error {if event.AggregateID != a.ID {return ErrInvalidAggregateID}if err := a.when(event); err != nil {return err}a.Version++event.Version = a.Versiona.UncommittedEvents = append(a.UncommittedEvents, event)return nil
}

3. 事件存储

事件存储是事件溯源的关键组件,用于持久化和检索事件。可以使用专门的事件存储数据库(如 EventStoreDB),也可以使用通用的数据库(如 PostgreSQL 或 MongoDB)[6]。

3.1 加载聚合根

加载聚合根时,从事件存储中读取所有相关事件,并通过 RaiseEvent 方法重建聚合根的状态:

func (a *AggregateBase) RaiseEvent(event Event) error {if event.AggregateID != a.ID {return ErrInvalidAggregateID}if a.Version >= event.Version {return ErrInvalidEventVersion}if err := a.when(event); err != nil {return err}a.Version = event.Versionreturn nil
}

3.2 事件存储接口

事件存储接口定义了加载和保存聚合根的方法。以下是一个简单的事件存储接口定义:

type AggregateStore interface {Load(ctx context.Context, aggregate Aggregate) errorSave(ctx context.Context, aggregate Aggregate) errorExists(ctx context.Context, streamID string) error
}

3.3 实现事件存储

以下是一个基于 PostgreSQL 的事件存储实现示例:

func (p *pgEventStore) Load(ctx context.Context, aggregate Aggregate) error {span, ctx := opentracing.StartSpanFromContext(ctx, "pgEventStore.Load")defer span.Finish()span.LogFields(log.String("aggregate", aggregate.String()))snapshot, err := p.GetSnapshot(ctx, aggregate.GetID())if err != nil && !errors.Is(err, pgx.ErrNoRows) {return tracing.TraceWithErr(span, err)}if snapshot != nil {if err := serializer.Unmarshal(snapshot.State, aggregate); err != nil {p.log.Errorf("(Load) serializer.Unmarshal err: %v", err)return tracing.TraceWithErr(span, errors.Wrap(err, "json.Unmarshal"))}err := p.loadAggregateEventsByVersion(ctx, aggregate)if err != nil {return err}p.log.Debugf("(Load Aggregate By Version) aggregate: %s", aggregate.String())span.LogFields(log.String("aggregate with events", aggregate.String()))return nil}err = p.loadEvents(ctx, aggregate)if err != nil {return err}p.log.Debugf("(Load Aggregate): aggregate: %s", aggregate.String())span.LogFields(log.String("aggregate with events", aggregate.String()))return nil
}func (p *pgEventStore) Save(ctx context.Context, aggregate Aggregate) (err error) {span, ctx := opentracing.StartSpanFromContext(ctx, "pgEventStore.Save")defer span.Finish()span.LogFields(log.String("aggregate", aggregate.String()))if len(aggregate.GetChanges()) == 0 {p.log.Debug("(Save) aggregate.GetChanges()) == 0")span.LogFields(log.Int("events", len(aggregate.GetChanges())))return nil}tx, err := p.db.Begin(ctx)if err != nil {p.log.Errorf("(Save) db.Begin err: %v", err)return tracing.TraceWithErr(span, errors.Wrap(err, "db.Begin"))}defer func() {if tx != nil {if txErr := tx.Rollback(ctx); txErr != nil && !errors.Is(txErr, pgx.ErrTxClosed) {err = txErrtracing.TraceErr(span, err)return}}}()changes := aggregate.GetChanges()events := make([]Event, 0, len(changes))for i := range changes {event, err := p.serializer.SerializeEvent(aggregate, changes[i])if err != nil {p.log.Errorf("(Save) serializer.SerializeEvent err: %v", err)return tracing.TraceWithErr(span, errors.Wrap(err, "serializer.SerializeEvent"))}events = append(events, event)}if err := p.saveEventsTx(ctx, tx, events); err != nil {return tracing.TraceWithErr(span, errors.Wrap(err, "saveEventsTx"))}if aggregate.GetVersion()%p.cfg.SnapshotFrequency == 0 {aggregate.ToSnapshot()if err := p.saveSnapshotTx(ctx, tx, aggregate); err != nil {return tracing.TraceWithErr(span, errors.Wrap(err, "saveSnapshotTx"))}}if err := p.processEvents(ctx, events); err != nil {return tracing.TraceWithErr(span, errors.Wrap(err, "processEvents"))}p.log.Debugf("(Save Aggregate): aggregate: %s", aggregate.String())span.LogFields(log.String("aggregate with events", aggregate.String()))return tx.Commit(ctx)
}

4. 事件处理

事件处理逻辑可以通过事件处理器来实现。事件处理器监听事件并执行相应的业务逻辑[7]。

4.1 定义事件处理器

以下是一个事件处理器的示例:

type OrderEventHandler struct{}func (h *OrderEventHandler) Handle(event interface{}) error {switch e := event.(type) {case OrderPlacedEvent:// 处理订单已下单的逻辑// 处理其他事件}return nil
}

5. 使用事件溯源库

为了简化事件溯源的实现,可以使用一些现成的事件溯源库。例如,go.cqrs 是一个支持 CQRS 和事件溯源的框架[7]。

5.1

示例:处理命令和事件

type OrderAggregate struct {*cqrs.AggregateBasestatus string
}func (a *OrderAggregate) Handle(command interface{}) error {switch c := command.(type) {case PlaceOrderCommand:a.status = "Placed"a.apply(OrderPlacedEvent{OrderID: c.OrderID}) // 应用事件以反映新状态// 处理其他命令}return nil
}

6. 事件发布和订阅

事件可以通过事件总线发布,并由多个消费者订阅。

6.1 使用事件总线

以下是一个事件总线的示例:

dispatcher := goevents.NewEventDispatcher[*MyEvent]()// 添加订阅者
dispatcher.AddSubscriber(MySubscriber{})// 发布事件
event := NewMyEvent("user.created", "John Doe")
dispatcher.Dispatch(event)

7. 实际案例

7.1 微服务架构中的事件溯源

在微服务架构中,事件溯源可以用于实现服务之间的解耦和通信。以下是一个基于 Go 的微服务架构示例,展示如何使用事件溯源来实现订单处理系统。

7.1.1 订单服务

订单服务负责处理订单相关的业务逻辑,包括下单、支付和发货等操作。

type OrderService struct {eventStore AggregateStoreeventBus   EventBus
}func (s *OrderService) PlaceOrder(ctx context.Context, order Order) error {aggregate := NewOrderAggregate(order)err := s.eventStore.Load(ctx, aggregate)if err != nil {return err}err = aggregate.Handle(PlaceOrderCommand{OrderID: order.ID})if err != nil {return err}err = s.eventStore.Save(ctx, aggregate)if err != nil {return err}for _, event := range aggregate.GetChanges() {s.eventBus.Publish(event)}return nil
}
7.1.2 支付服务

支付服务负责处理支付相关的业务逻辑,包括支付成功和支付失败等操作。

type PaymentService struct {eventBus EventBus
}func (s *PaymentService) HandlePayment(ctx context.Context, payment Payment) error {err := s.eventBus.Subscribe(ctx, func(event Event) error {switch e := event.(type) {case OrderPlacedEvent:// 处理订单已下单的逻辑return nil// 处理其他事件}return nil})if err != nil {return err}return nil
}

8. 最佳实践

8.1 事件设计

  • 不可变性:事件一旦生成就不可修改。
  • 包含足够的信息:事件应该包含足够的信息,以便能够重建系统的状态。
  • 版本控制:事件应该包含版本号,以便能够处理并发问题。

8.2 聚合根设计

  • 封装业务逻辑:聚合根应该封装业务逻辑,并通过事件来记录状态变化。
  • 避免过多的事件:聚合根应该尽量减少事件的数量,以提高性能。

8.3 事件存储设计

  • 高性能:事件存储应该支持高性能的读写操作。
  • 可扩展性:事件存储应该支持水平扩展,以满足高并发的需求。

8.4 事件总线设计

  • 解耦:事件总线应该支持解耦,使得服务之间不需要直接通信。
  • 异步处理:事件总线应该支持异步处理,以提高系统的响应速度。

9. 总结

在 Go 中实现事件溯源需要定义事件和聚合根,使用事件存储来持久化事件,并通过事件处理器来处理事件。可以使用现成的事件溯源库(如 go.cqrs)来简化实现。事件总线可以用于发布和订阅事件,支持异步处理。事件溯源不仅能够提高系统的可扩展性和可维护性,还能为系统提供强大的可追溯性。

希望本文能帮助你更好地理解和实现事件溯源。如果你有任何问题或建议,欢迎在评论区留言。

相关文章:

在 Go 中实现事件溯源:构建高效且可扩展的系统

事件溯源(Event Sourcing)是一种强大的架构模式,它通过记录系统状态的变化(事件)来重建系统的历史状态。这种模式特别适合需要高可扩展性、可追溯性和解耦的系统。在 Go 语言中,事件溯源可以通过一些简单的…...

加解密 | AES加、解密学习

加解密 | AES加、解密学习 你的代码实现了一个简单的AES(高级加密标准)加密和解密的测试程序。以下是对代码的分析和一些改进建议: 代码功能 初始化数据和密钥: 定义了一个16字节的输入数据 input_data。定义了一个16字节的AES…...

【学术投稿-2025年计算机视觉研究进展与应用国际学术会议 (ACVRA 2025)】CSS样式解析:行内、内部与外部样式的区别与优先级分析

简介 2025年计算机视觉研究进展与应用(ACVRA 2025)将于2025年2月28-3月2日在中国广州召开,会议将汇聚世界各地的顶尖学者、研究人员和行业专家,聚焦计算机视觉领域的最新研究动态与应用成就。本次会议将探讨前沿技术,…...

MongoDB 基本操作

一、数据库操作 1. 切换或创建数据库 使用use命令切换到指定数据库,若该数据库不存在,在首次插入数据时会自动创建。 use myDatabase 2. 查看所有数据库 使用show dbs命令查看 MongoDB 实例中的所有数据库。 show dbs 3. 删除当前数据库 使用db.…...

Eclipse JSP/Servlet 深入解析

Eclipse JSP/Servlet 深入解析 引言 随着互联网的快速发展,Java Web开发技术逐渐成为企业级应用开发的主流。在Java Web开发中,JSP(JavaServer Pages)和Servlet是两个核心组件,它们共同构成了Java Web应用程序的基础。本文将深入解析Eclipse平台下的JSP/Servlet技术,帮…...

Hyperledger caliper 性能测试

前言:Hyperledger caliper 的本质是使用node对被测试网络进行压力测试,因此需要nodejs。本次使用 Hyperledger caliper 0.5 对 fabric 1.4.6进行压测 准备条件:nodejs 16 (略 linux下 解压环境变量即可) # 创建工作…...

Record-Mode 备案免关站插件,让 WordPress 备案不影响 SEO 和收录

专为 WordPress 网站设计的实用工具,旨在帮助网站在备案期间无需关闭即可正常收录所有页面的信息,利于SEO。 功能特性 免关站展示:开启插件后,非管理员用户访问网站时,会看到以半透明遮罩层或不透明全屏遮罩样式呈现的…...

【Java 面试 八股文】Redis篇

Redis 1. 什么是缓存穿透?怎么解决?2. 你能介绍一下布隆过滤器吗?3. 什么是缓存击穿?怎么解决?4. 什么是缓存雪崩?怎么解决?5. redis做为缓存,mysql的数据如何与redis进行同步呢&…...

介绍几款免费的显示器辅助工具!

今天为大家介绍几款实用的显示器辅助软件,它们可以帮助你轻松切换显示源调节、显示器亮度,甚至优化显示效果,让你的屏幕使用体验更加便捷和舒适。 Monitor Brightness Adjuster-多屏幕亮度调节工具 如果你需要同时使用多个显示器&#xff0…...

django配置跨域

1、第一种 from django.views.decorators.csrf import csrf_exemptcsrf_exempt第二种 安装 pip install django-cors-headers在配置文件settings.py进入 INSTALLED_APPS [..."corsheaders", # 添加 ]MIDDLEWARE [corsheaders.middleware.CorsMiddleware, # 添加…...

web前端第三次作业

题目 本期作业 WEB第三次作业 请使用JS实一个网页中登录窗口的显示/隐藏&#xff0c;页面中拖动移动&#xff0c;并且添加了边界判断的网页效 代码图片 效果展示 代码 <!DOCTYPE html> <html lang"zh"> <head> <meta charset"UTF-8&qu…...

【Pandas】pandas Series align

Pandas2.2 Series Computations descriptive stats 方法描述Series.align(other[, join, axis, level, …])用于将两个 Series 对齐&#xff0c;使其具有相同的索引 pandas.Series.align pandas.Series.align() 方法用于将两个 Series 对齐&#xff0c;使其具有相同的索引。…...

DeepSeek-V3网络模型架构图解

DeepSeek-V3网络架构的创新主要在两次&#xff0c;分别是在前馈层的MOE&#xff08;混合专家模型&#xff09;和在注意力中的MHA&#xff08;多头潜在注意力&#xff0c;一种注意力计算规模压缩技术&#xff09;。 MOE&#xff08;混合专家模型&#xff09; 回顾最初的MOE GS…...

Linux系统管理小课堂

1. 文件系统&#xff1a;你的数字房间大扫除 例子1&#xff1a;藏日记本的保险箱 chmod 700 my_diary.txt &#x1f47b; 解释&#xff1a;把日记文件权限改成「只有主人能读写」&#xff0c;室友偷看时系统会翻白眼&#xff1a;“Permission denied&#xff01;” 例子2&…...

明远智睿核心板在智能家居与工业网关中的应用实践

**——从硬件支持到场景落地的技术路径** SSD2351 在智能家居与工业物联网领域&#xff0c;设备需具备实时响应、多协议兼容及边缘计算能力。明远智睿新款核心板凭借其硬件特性&#xff0c;可高效支撑以下典型场景&#xff1a; #### **场景一&#xff1a;智能家居中枢网关**…...

Windows 系统 GDAL库 配置到 Qt 上

在地理信息开发中广泛使用的开源库&#xff0c;GDAL(Geospatial Data Abstraction Library&#xff09;)库提供了读取和处理各种地理空间数据格式的能力。 准备阶段 下载 GDAL 库&#xff1a;前往 GDAL 的官方网站&#xff08;https://www.gisinternals.com/&#xff09;下载…...

部署onlyoffice后,php版的callback及小魔改(logo和关于)

作为这篇博文的补充CentOS9 安装Docker+Dpanel+onlyoffice(https、更改字体、字号、去除限制)的避坑笔记,现在继续… 本次主要内容有:php中callback的调用、自签证书调用callback遇到SSL certificate problem: unable to get local issuer certificate问题、修改onlyoffic…...

《qt open3d网格拉普拉斯平滑》

qt open3d网格拉普拉斯平滑 效果展示二、流程三、代码效果展示 二、流程 创建动作,链接到槽函数,并把动作放置菜单栏 参照前文 三、代码 1、槽函数实现 void on_actionFilterLaplacian_triggered();void MainWindow::on_actionFil...

【愚公系列】《Python网络爬虫从入门到精通》004-请求模块urllib3

标题详情作者简介愚公搬代码头衔华为云特约编辑&#xff0c;华为云云享专家&#xff0c;华为开发者专家&#xff0c;华为产品云测专家&#xff0c;CSDN博客专家&#xff0c;CSDN商业化专家&#xff0c;阿里云专家博主&#xff0c;阿里云签约作者&#xff0c;腾讯云优秀博主&…...

网络安全技术复习总结

1|0第一章 概论 1.网络安全发展阶段包括四个阶段&#xff1a;通信安全、计算机安全、网络安全、网络空间安全。 2.2017年6月1日&#xff0c;我国第一部全面规范网络空间安全的基础性法律《中华人民共和国网络安全法》正式实施。 3.2021年 6月10日&#xff0c;《中华人民共和…...

应用分享 | 精准生成和时序控制!AWG在确定性三量子比特纠缠光子源中的应用

在量子技术飞速发展的今天&#xff0c;实现高效稳定的量子态操控是推动量子计算、量子通信等领域迈向实用化的关键。任意波形发生器&#xff08;AWG&#xff09;作为精准信号控制的核心设备&#xff0c;在量子实验中发挥着不可或缺的作用。丹麦哥本哈根大学的研究团队基于单个量…...

关于物联网的基础知识(一)

成长路上不孤单&#x1f60a;&#x1f60a;&#x1f60a;&#x1f60a;&#x1f60a;&#x1f60a; 【14后&#x1f60a;///计算机爱好者&#x1f60a;///持续分享所学&#x1f60a;///如有需要欢迎收藏转发///&#x1f60a;】 今日分享关于物联网的基础知识&#xff08;一&a…...

【C++快读快写】

算法竞赛中用于解决卡常问题 int rd(){int k 0;char c getchar();while(!isdigit(c)){c getchar();}while(isdigit(c)){k (k << 1) (k << 3) (c^0), c getchar();}return k; }void wr(int x) {if (x > 9)wr(x / 10);putchar((x % 10) ^ 0); }用法&#x…...

数学复习笔记 27

前言 太难受了。因为一些事情。和朋友倾诉了一下&#xff0c;也没啥用&#xff0c;几年之后不知道自己再想到的时候&#xff0c;会怎么考虑呢。另外&#xff0c;笔记还是有框架一点比较好&#xff0c;这样比较有逻辑感受。不然太乱了。这篇笔记是关于线代第五章&#xff0c;特…...

灰狼优化算法MATLAB实现,包含种群初始化和29种基准函数测试

灰狼优化算法&#xff08;Grey Wolf Optimizer, GWO&#xff09;MATLAB实现&#xff0c;包含种群初始化和29种基准函数测试。代码包含详细注释和可视化模块&#xff1a; %% 灰狼优化算法主程序 (GWO.m) function GWO()clear; clc; close all;% 参数设置SearchAgents_no 30; …...

关于单片机的基础知识(一)

成长路上不孤单&#x1f60a;&#x1f60a;&#x1f60a;&#x1f60a;&#x1f60a;&#x1f60a; 【14后&#x1f60a;///计算机爱好者&#x1f60a;///持续分享所学&#x1f60a;///如有需要欢迎收藏转发///&#x1f60a;】 今日分享关于单片机基础知识的相关内容&#xf…...

微软推出SQL Server 2025技术预览版,深化人工智能应用集成

在Build 2025 大会上&#xff0c;微软向开发者社区开放了SQL Server 2025的测试版本。该版本的技术改进主要涵盖人工智能功能集成、系统性能优化与开发工具链升级三个维度&#xff0c;展示了数据库管理系统在智能化演进方向上的重要进展。 智能数据处理功能更新 新版本的技术亮…...

如何从浏览器中导出网站证书

以导出 GitHub 证书为例&#xff0c;点击 小锁 点击 导出 注意&#xff1a;这里需要根据你想要证书格式手动加上后缀名&#xff0c;我的是加 .crt 双击文件打开...

grafana-mcp-analyzer:基于 MCP 的轻量 AI 分析监控图表的运维神器!

还在深夜盯着 Grafana 图表手动排查问题&#xff1f;今天推荐一个让 AI 能“读图说话”的开源神器 —— grafana-mcp-analyzer。 想象一下这样的场景&#xff1a; 凌晨3点&#xff0c;服务器告警响起。。。你睁着惺忪的眼睛盯着复杂的监控图表 &#x1f635;‍&#x1f4ab;花…...

算法练习-回溯

今天开始新的章节&#xff0c;关于算法中回溯法的练习&#xff0c;这部分题目的难度还是比较大的&#xff0c;但是十分锻炼人的思维与思考能力。 处理这类题目首先要注意几个基本点&#xff1a; 1.关于递归出口的设置&#xff0c;这是十分关键的&#xff0c;要避免死循环的产…...