在 Go 中实现事件溯源:构建高效且可扩展的系统
事件溯源(Event Sourcing)是一种强大的架构模式,它通过记录系统状态的变化(事件)来重建系统的历史状态。这种模式特别适合需要高可扩展性、可追溯性和解耦的系统。在 Go 语言中,事件溯源可以通过一些简单的步骤和工具来实现。本文将详细介绍如何在 Go 中实现事件溯源,包括定义事件和聚合根、事件存储、事件处理以及使用事件总线。此外,我们还会探讨一些最佳实践和实际案例,帮助你更好地理解和应用事件溯源。
1. 事件溯源与 CQRS
事件溯源通常与命令查询责任分离(Command Query Responsibility Segregation,CQRS)模式结合使用。CQRS 是一种设计模式,它将应用程序的读操作和写操作分离,从而提高系统的可扩展性和性能[7]。在 CQRS 中,聚合根(Aggregate Root)是核心实体,它封装了业务逻辑,并通过事件来记录状态变化[7]。
1.1 事件溯源的核心概念
事件溯源的核心是事件(Event),它表示系统中已经发生的一个不可变的事实。事件通常是不可变的,一旦生成就无法修改。事件溯源通过记录这些事件来重建系统的状态[5]。
1.2 CQRS 的核心概念
CQRS 将应用程序分为命令(Command)和查询(Query)两个部分。命令用于修改系统的状态,而查询用于读取系统的状态。这种分离使得系统可以更灵活地扩展[7]。
2. 定义事件和聚合根
2.1 事件
事件是事件溯源的核心,它表示系统中已经发生的一个不可变的事实。事件通常包含以下字段:
- EventID:事件的唯一标识符。
- EventType:事件的类型。
- Data:事件的具体数据,通常以字节流的形式存储。
- Timestamp:事件发生的时间戳。
- AggregateType:聚合根的类型。
- AggregateID:聚合根的唯一标识符。
- Version:事件的版本号。
- Metadata:事件的元数据,用于存储额外信息。
以下是一个简单的事件结构体定义:
type Event struct {EventID stringEventType stringData []byteTimestamp time.TimeAggregateType stringAggregateID stringVersion int64Metadata []byte
}
2.2 聚合根
聚合根是事件溯源中的核心实体,它封装了业务逻辑,并通过事件来记录状态变化。聚合根通常包含以下字段:
- ID:聚合根的唯一标识符。
- Version:聚合根的版本号。
- AppliedEvents:已经应用的事件列表。
- UncommittedEvents:尚未提交的事件列表。
- Type:聚合根的类型。
- when:事件处理函数。
以下是一个聚合根的实现示例:
type AggregateBase struct {ID stringVersion int64AppliedEvents []EventUncommittedEvents []EventType stringwhen func(Event) error
}func (a *AggregateBase) Apply(event Event) error {if event.AggregateID != a.ID {return ErrInvalidAggregateID}if err := a.when(event); err != nil {return err}a.Version++event.Version = a.Versiona.UncommittedEvents = append(a.UncommittedEvents, event)return nil
}
3. 事件存储
事件存储是事件溯源的关键组件,用于持久化和检索事件。可以使用专门的事件存储数据库(如 EventStoreDB),也可以使用通用的数据库(如 PostgreSQL 或 MongoDB)[6]。
3.1 加载聚合根
加载聚合根时,从事件存储中读取所有相关事件,并通过 RaiseEvent
方法重建聚合根的状态:
func (a *AggregateBase) RaiseEvent(event Event) error {if event.AggregateID != a.ID {return ErrInvalidAggregateID}if a.Version >= event.Version {return ErrInvalidEventVersion}if err := a.when(event); err != nil {return err}a.Version = event.Versionreturn nil
}
3.2 事件存储接口
事件存储接口定义了加载和保存聚合根的方法。以下是一个简单的事件存储接口定义:
type AggregateStore interface {Load(ctx context.Context, aggregate Aggregate) errorSave(ctx context.Context, aggregate Aggregate) errorExists(ctx context.Context, streamID string) error
}
3.3 实现事件存储
以下是一个基于 PostgreSQL 的事件存储实现示例:
func (p *pgEventStore) Load(ctx context.Context, aggregate Aggregate) error {span, ctx := opentracing.StartSpanFromContext(ctx, "pgEventStore.Load")defer span.Finish()span.LogFields(log.String("aggregate", aggregate.String()))snapshot, err := p.GetSnapshot(ctx, aggregate.GetID())if err != nil && !errors.Is(err, pgx.ErrNoRows) {return tracing.TraceWithErr(span, err)}if snapshot != nil {if err := serializer.Unmarshal(snapshot.State, aggregate); err != nil {p.log.Errorf("(Load) serializer.Unmarshal err: %v", err)return tracing.TraceWithErr(span, errors.Wrap(err, "json.Unmarshal"))}err := p.loadAggregateEventsByVersion(ctx, aggregate)if err != nil {return err}p.log.Debugf("(Load Aggregate By Version) aggregate: %s", aggregate.String())span.LogFields(log.String("aggregate with events", aggregate.String()))return nil}err = p.loadEvents(ctx, aggregate)if err != nil {return err}p.log.Debugf("(Load Aggregate): aggregate: %s", aggregate.String())span.LogFields(log.String("aggregate with events", aggregate.String()))return nil
}func (p *pgEventStore) Save(ctx context.Context, aggregate Aggregate) (err error) {span, ctx := opentracing.StartSpanFromContext(ctx, "pgEventStore.Save")defer span.Finish()span.LogFields(log.String("aggregate", aggregate.String()))if len(aggregate.GetChanges()) == 0 {p.log.Debug("(Save) aggregate.GetChanges()) == 0")span.LogFields(log.Int("events", len(aggregate.GetChanges())))return nil}tx, err := p.db.Begin(ctx)if err != nil {p.log.Errorf("(Save) db.Begin err: %v", err)return tracing.TraceWithErr(span, errors.Wrap(err, "db.Begin"))}defer func() {if tx != nil {if txErr := tx.Rollback(ctx); txErr != nil && !errors.Is(txErr, pgx.ErrTxClosed) {err = txErrtracing.TraceErr(span, err)return}}}()changes := aggregate.GetChanges()events := make([]Event, 0, len(changes))for i := range changes {event, err := p.serializer.SerializeEvent(aggregate, changes[i])if err != nil {p.log.Errorf("(Save) serializer.SerializeEvent err: %v", err)return tracing.TraceWithErr(span, errors.Wrap(err, "serializer.SerializeEvent"))}events = append(events, event)}if err := p.saveEventsTx(ctx, tx, events); err != nil {return tracing.TraceWithErr(span, errors.Wrap(err, "saveEventsTx"))}if aggregate.GetVersion()%p.cfg.SnapshotFrequency == 0 {aggregate.ToSnapshot()if err := p.saveSnapshotTx(ctx, tx, aggregate); err != nil {return tracing.TraceWithErr(span, errors.Wrap(err, "saveSnapshotTx"))}}if err := p.processEvents(ctx, events); err != nil {return tracing.TraceWithErr(span, errors.Wrap(err, "processEvents"))}p.log.Debugf("(Save Aggregate): aggregate: %s", aggregate.String())span.LogFields(log.String("aggregate with events", aggregate.String()))return tx.Commit(ctx)
}
4. 事件处理
事件处理逻辑可以通过事件处理器来实现。事件处理器监听事件并执行相应的业务逻辑[7]。
4.1 定义事件处理器
以下是一个事件处理器的示例:
type OrderEventHandler struct{}func (h *OrderEventHandler) Handle(event interface{}) error {switch e := event.(type) {case OrderPlacedEvent:// 处理订单已下单的逻辑// 处理其他事件}return nil
}
5. 使用事件溯源库
为了简化事件溯源的实现,可以使用一些现成的事件溯源库。例如,go.cqrs
是一个支持 CQRS 和事件溯源的框架[7]。
5.1
示例:处理命令和事件
type OrderAggregate struct {*cqrs.AggregateBasestatus string
}func (a *OrderAggregate) Handle(command interface{}) error {switch c := command.(type) {case PlaceOrderCommand:a.status = "Placed"a.apply(OrderPlacedEvent{OrderID: c.OrderID}) // 应用事件以反映新状态// 处理其他命令}return nil
}
6. 事件发布和订阅
事件可以通过事件总线发布,并由多个消费者订阅。
6.1 使用事件总线
以下是一个事件总线的示例:
dispatcher := goevents.NewEventDispatcher[*MyEvent]()// 添加订阅者
dispatcher.AddSubscriber(MySubscriber{})// 发布事件
event := NewMyEvent("user.created", "John Doe")
dispatcher.Dispatch(event)
7. 实际案例
7.1 微服务架构中的事件溯源
在微服务架构中,事件溯源可以用于实现服务之间的解耦和通信。以下是一个基于 Go 的微服务架构示例,展示如何使用事件溯源来实现订单处理系统。
7.1.1 订单服务
订单服务负责处理订单相关的业务逻辑,包括下单、支付和发货等操作。
type OrderService struct {eventStore AggregateStoreeventBus EventBus
}func (s *OrderService) PlaceOrder(ctx context.Context, order Order) error {aggregate := NewOrderAggregate(order)err := s.eventStore.Load(ctx, aggregate)if err != nil {return err}err = aggregate.Handle(PlaceOrderCommand{OrderID: order.ID})if err != nil {return err}err = s.eventStore.Save(ctx, aggregate)if err != nil {return err}for _, event := range aggregate.GetChanges() {s.eventBus.Publish(event)}return nil
}
7.1.2 支付服务
支付服务负责处理支付相关的业务逻辑,包括支付成功和支付失败等操作。
type PaymentService struct {eventBus EventBus
}func (s *PaymentService) HandlePayment(ctx context.Context, payment Payment) error {err := s.eventBus.Subscribe(ctx, func(event Event) error {switch e := event.(type) {case OrderPlacedEvent:// 处理订单已下单的逻辑return nil// 处理其他事件}return nil})if err != nil {return err}return nil
}
8. 最佳实践
8.1 事件设计
- 不可变性:事件一旦生成就不可修改。
- 包含足够的信息:事件应该包含足够的信息,以便能够重建系统的状态。
- 版本控制:事件应该包含版本号,以便能够处理并发问题。
8.2 聚合根设计
- 封装业务逻辑:聚合根应该封装业务逻辑,并通过事件来记录状态变化。
- 避免过多的事件:聚合根应该尽量减少事件的数量,以提高性能。
8.3 事件存储设计
- 高性能:事件存储应该支持高性能的读写操作。
- 可扩展性:事件存储应该支持水平扩展,以满足高并发的需求。
8.4 事件总线设计
- 解耦:事件总线应该支持解耦,使得服务之间不需要直接通信。
- 异步处理:事件总线应该支持异步处理,以提高系统的响应速度。
9. 总结
在 Go 中实现事件溯源需要定义事件和聚合根,使用事件存储来持久化事件,并通过事件处理器来处理事件。可以使用现成的事件溯源库(如 go.cqrs
)来简化实现。事件总线可以用于发布和订阅事件,支持异步处理。事件溯源不仅能够提高系统的可扩展性和可维护性,还能为系统提供强大的可追溯性。
希望本文能帮助你更好地理解和实现事件溯源。如果你有任何问题或建议,欢迎在评论区留言。
相关文章:
在 Go 中实现事件溯源:构建高效且可扩展的系统
事件溯源(Event Sourcing)是一种强大的架构模式,它通过记录系统状态的变化(事件)来重建系统的历史状态。这种模式特别适合需要高可扩展性、可追溯性和解耦的系统。在 Go 语言中,事件溯源可以通过一些简单的…...
加解密 | AES加、解密学习
加解密 | AES加、解密学习 你的代码实现了一个简单的AES(高级加密标准)加密和解密的测试程序。以下是对代码的分析和一些改进建议: 代码功能 初始化数据和密钥: 定义了一个16字节的输入数据 input_data。定义了一个16字节的AES…...

【学术投稿-2025年计算机视觉研究进展与应用国际学术会议 (ACVRA 2025)】CSS样式解析:行内、内部与外部样式的区别与优先级分析
简介 2025年计算机视觉研究进展与应用(ACVRA 2025)将于2025年2月28-3月2日在中国广州召开,会议将汇聚世界各地的顶尖学者、研究人员和行业专家,聚焦计算机视觉领域的最新研究动态与应用成就。本次会议将探讨前沿技术,…...
MongoDB 基本操作
一、数据库操作 1. 切换或创建数据库 使用use命令切换到指定数据库,若该数据库不存在,在首次插入数据时会自动创建。 use myDatabase 2. 查看所有数据库 使用show dbs命令查看 MongoDB 实例中的所有数据库。 show dbs 3. 删除当前数据库 使用db.…...
Eclipse JSP/Servlet 深入解析
Eclipse JSP/Servlet 深入解析 引言 随着互联网的快速发展,Java Web开发技术逐渐成为企业级应用开发的主流。在Java Web开发中,JSP(JavaServer Pages)和Servlet是两个核心组件,它们共同构成了Java Web应用程序的基础。本文将深入解析Eclipse平台下的JSP/Servlet技术,帮…...

Hyperledger caliper 性能测试
前言:Hyperledger caliper 的本质是使用node对被测试网络进行压力测试,因此需要nodejs。本次使用 Hyperledger caliper 0.5 对 fabric 1.4.6进行压测 准备条件:nodejs 16 (略 linux下 解压环境变量即可) # 创建工作…...

Record-Mode 备案免关站插件,让 WordPress 备案不影响 SEO 和收录
专为 WordPress 网站设计的实用工具,旨在帮助网站在备案期间无需关闭即可正常收录所有页面的信息,利于SEO。 功能特性 免关站展示:开启插件后,非管理员用户访问网站时,会看到以半透明遮罩层或不透明全屏遮罩样式呈现的…...

【Java 面试 八股文】Redis篇
Redis 1. 什么是缓存穿透?怎么解决?2. 你能介绍一下布隆过滤器吗?3. 什么是缓存击穿?怎么解决?4. 什么是缓存雪崩?怎么解决?5. redis做为缓存,mysql的数据如何与redis进行同步呢&…...

介绍几款免费的显示器辅助工具!
今天为大家介绍几款实用的显示器辅助软件,它们可以帮助你轻松切换显示源调节、显示器亮度,甚至优化显示效果,让你的屏幕使用体验更加便捷和舒适。 Monitor Brightness Adjuster-多屏幕亮度调节工具 如果你需要同时使用多个显示器࿰…...

django配置跨域
1、第一种 from django.views.decorators.csrf import csrf_exemptcsrf_exempt第二种 安装 pip install django-cors-headers在配置文件settings.py进入 INSTALLED_APPS [..."corsheaders", # 添加 ]MIDDLEWARE [corsheaders.middleware.CorsMiddleware, # 添加…...

web前端第三次作业
题目 本期作业 WEB第三次作业 请使用JS实一个网页中登录窗口的显示/隐藏,页面中拖动移动,并且添加了边界判断的网页效 代码图片 效果展示 代码 <!DOCTYPE html> <html lang"zh"> <head> <meta charset"UTF-8&qu…...
【Pandas】pandas Series align
Pandas2.2 Series Computations descriptive stats 方法描述Series.align(other[, join, axis, level, …])用于将两个 Series 对齐,使其具有相同的索引 pandas.Series.align pandas.Series.align() 方法用于将两个 Series 对齐,使其具有相同的索引。…...

DeepSeek-V3网络模型架构图解
DeepSeek-V3网络架构的创新主要在两次,分别是在前馈层的MOE(混合专家模型)和在注意力中的MHA(多头潜在注意力,一种注意力计算规模压缩技术)。 MOE(混合专家模型) 回顾最初的MOE GS…...
Linux系统管理小课堂
1. 文件系统:你的数字房间大扫除 例子1:藏日记本的保险箱 chmod 700 my_diary.txt 👻 解释:把日记文件权限改成「只有主人能读写」,室友偷看时系统会翻白眼:“Permission denied!” 例子2&…...
明远智睿核心板在智能家居与工业网关中的应用实践
**——从硬件支持到场景落地的技术路径** SSD2351 在智能家居与工业物联网领域,设备需具备实时响应、多协议兼容及边缘计算能力。明远智睿新款核心板凭借其硬件特性,可高效支撑以下典型场景: #### **场景一:智能家居中枢网关**…...

Windows 系统 GDAL库 配置到 Qt 上
在地理信息开发中广泛使用的开源库,GDAL(Geospatial Data Abstraction Library))库提供了读取和处理各种地理空间数据格式的能力。 准备阶段 下载 GDAL 库:前往 GDAL 的官方网站(https://www.gisinternals.com/)下载…...
部署onlyoffice后,php版的callback及小魔改(logo和关于)
作为这篇博文的补充CentOS9 安装Docker+Dpanel+onlyoffice(https、更改字体、字号、去除限制)的避坑笔记,现在继续… 本次主要内容有:php中callback的调用、自签证书调用callback遇到SSL certificate problem: unable to get local issuer certificate问题、修改onlyoffic…...

《qt open3d网格拉普拉斯平滑》
qt open3d网格拉普拉斯平滑 效果展示二、流程三、代码效果展示 二、流程 创建动作,链接到槽函数,并把动作放置菜单栏 参照前文 三、代码 1、槽函数实现 void on_actionFilterLaplacian_triggered();void MainWindow::on_actionFil...

【愚公系列】《Python网络爬虫从入门到精通》004-请求模块urllib3
标题详情作者简介愚公搬代码头衔华为云特约编辑,华为云云享专家,华为开发者专家,华为产品云测专家,CSDN博客专家,CSDN商业化专家,阿里云专家博主,阿里云签约作者,腾讯云优秀博主&…...

网络安全技术复习总结
1|0第一章 概论 1.网络安全发展阶段包括四个阶段:通信安全、计算机安全、网络安全、网络空间安全。 2.2017年6月1日,我国第一部全面规范网络空间安全的基础性法律《中华人民共和国网络安全法》正式实施。 3.2021年 6月10日,《中华人民共和…...

测试微信模版消息推送
进入“开发接口管理”--“公众平台测试账号”,无需申请公众账号、可在测试账号中体验并测试微信公众平台所有高级接口。 获取access_token: 自定义模版消息: 关注测试号:扫二维码关注测试号。 发送模版消息: import requests da…...
web vue 项目 Docker化部署
Web 项目 Docker 化部署详细教程 目录 Web 项目 Docker 化部署概述Dockerfile 详解 构建阶段生产阶段 构建和运行 Docker 镜像 1. Web 项目 Docker 化部署概述 Docker 化部署的主要步骤分为以下几个阶段: 构建阶段(Build Stage):…...

Chapter03-Authentication vulnerabilities
文章目录 1. 身份验证简介1.1 What is authentication1.2 difference between authentication and authorization1.3 身份验证机制失效的原因1.4 身份验证机制失效的影响 2. 基于登录功能的漏洞2.1 密码爆破2.2 用户名枚举2.3 有缺陷的暴力破解防护2.3.1 如果用户登录尝试失败次…...

多模态2025:技术路线“神仙打架”,视频生成冲上云霄
文|魏琳华 编|王一粟 一场大会,聚集了中国多模态大模型的“半壁江山”。 智源大会2025为期两天的论坛中,汇集了学界、创业公司和大厂等三方的热门选手,关于多模态的集中讨论达到了前所未有的热度。其中,…...

C++_核心编程_多态案例二-制作饮品
#include <iostream> #include <string> using namespace std;/*制作饮品的大致流程为:煮水 - 冲泡 - 倒入杯中 - 加入辅料 利用多态技术实现本案例,提供抽象制作饮品基类,提供子类制作咖啡和茶叶*//*基类*/ class AbstractDr…...
Oracle查询表空间大小
1 查询数据库中所有的表空间以及表空间所占空间的大小 SELECTtablespace_name,sum( bytes ) / 1024 / 1024 FROMdba_data_files GROUP BYtablespace_name; 2 Oracle查询表空间大小及每个表所占空间的大小 SELECTtablespace_name,file_id,file_name,round( bytes / ( 1024 …...
Leetcode 3577. Count the Number of Computer Unlocking Permutations
Leetcode 3577. Count the Number of Computer Unlocking Permutations 1. 解题思路2. 代码实现 题目链接:3577. Count the Number of Computer Unlocking Permutations 1. 解题思路 这一题其实就是一个脑筋急转弯,要想要能够将所有的电脑解锁&#x…...

STM32F4基本定时器使用和原理详解
STM32F4基本定时器使用和原理详解 前言如何确定定时器挂载在哪条时钟线上配置及使用方法参数配置PrescalerCounter ModeCounter Periodauto-reload preloadTrigger Event Selection 中断配置生成的代码及使用方法初始化代码基本定时器触发DCA或者ADC的代码讲解中断代码定时启动…...
django filter 统计数量 按属性去重
在Django中,如果你想要根据某个属性对查询集进行去重并统计数量,你可以使用values()方法配合annotate()方法来实现。这里有两种常见的方法来完成这个需求: 方法1:使用annotate()和Count 假设你有一个模型Item,并且你想…...

《用户共鸣指数(E)驱动品牌大模型种草:如何抢占大模型搜索结果情感高地》
在注意力分散、内容高度同质化的时代,情感连接已成为品牌破圈的关键通道。我们在服务大量品牌客户的过程中发现,消费者对内容的“有感”程度,正日益成为影响品牌传播效率与转化率的核心变量。在生成式AI驱动的内容生成与推荐环境中࿰…...