当前位置: 首页 > article >正文

Go 分布式事务实战:本地消息表、事务消息、SAGA、TCC 四大方案深度解析与选型指南

Go 分布式事务实战:本地消息表、事务消息、SAGA、TCC 四大方案深度解析与选型指南摘要:在微服务架构中,分布式事务是无法回避的核心难题。本文深入剖析本地消息表、事务消息、SAGA、TCC 四种主流方案的实现原理,提供完整的 Go 语言代码示例,并结合电商、支付等真实场景给出选型建议。一、为什么分布式事务这么难?1.1 从单体到微服务的演变在单体应用中,事务管理相对简单:// 单体应用:本地事务即可保证一致性 func Transfer(ctx context.Context, from, to string, amount decimal.Decimal) error { tx, err := db.BeginTx(ctx, nil) if err != nil { return err } defer tx.Rollback() // 扣减账户 A _, err = tx.ExecContext(ctx, "UPDATE accounts SET balance = balance - ? WHERE user_id = ?", amount, from) if err != nil { return err } // 增加账户 B _, err = tx.ExecContext(ctx, "UPDATE accounts SET balance = balance + ? WHERE user_id = ?", amount, to) if err != nil { return err } return tx.Commit() }但在微服务架构下,问题变得复杂:┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │ 订单服务 │ │ 库存服务 │ │ 支付服务 │ │ (MySQL) │─────▶│ (MySQL) │─────▶│ (MySQL) │ └─────────────┘ └─────────────┘ └─────────────┘ │ │ │ ▼ ▼ ▼ ┌─────────────────────────────────────────────────────────┐ │ 如何保证数据一致性? │ └─────────────────────────────────────────────────────────┘1.2 CAP 定理的权衡理论含义分布式事务中的体现Consistency一致性所有节点同一时刻数据一致Availability可用性每个请求都能得到响应Partition分区容错性网络分区时系统仍能运行结论:分布式系统中 P 是必须的,因此只能在 C 和 A 之间权衡。1.3 一致性模型强一致性 (Strong) ──────▶ 弱一致性 (Weak) │ │ ▼ ▼ ┌─────────────┐ ┌─────────────┐ │ 线性一致性 │ │ 最终一致性 │ │ 2PC/3PC │ │ 消息队列 │ │ 性能差 │ │ 性能好 │ └─────────────┘ └─────────────┘二、方案一:本地消息表(Local Message Table)2.1 核心原理本地消息表是最经典的最终一致性方案,核心思想是:将分布式事务拆分为本地事务 + 异步消息,通过本地事务保证消息的可靠投递┌──────────────────────────────────────────────────────────┐ │ 业务流程 │ ├──────────────────────────────────────────────────────────┤ │ 1. 业务数据 + 消息 写入同一本地事务 │ │ 2. 后台任务轮询消息表,发送到 MQ │ │ 3. 消费者处理消息,实现最终一致性 │ └──────────────────────────────────────────────────────────┘2.2 Go 语言完整实现2.2.1 消息表设计CREATE TABLE local_messages ( id BIGINT PRIMARY KEY AUTO_INCREMENT, message_id VARCHAR(64) NOT NULL UNIQUE, business_type VARCHAR(32) NOT NULL, -- 业务类型 business_data JSON NOT NULL, -- 业务数据 message_data JSON NOT NULL, -- 消息内容 status TINYINT NOT NULL DEFAULT 0, -- 0:待发送 1:已发送 2:已完成 3:失败 retry_count INT DEFAULT 0, max_retry INT DEFAULT 3, next_retry_time DATETIME, created_at DATETIME DEFAULT CURRENT_TIMESTAMP, updated_at DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, INDEX idx_status_retry (status, next_retry_time) );2.2.2 消息表模型与存储层// model/message.go package model import ( "encoding/json" "time" ) type MessageStatus int const ( MessageStatusPending MessageStatus = iota // 待发送 MessageStatusSent // 已发送 MessageStatusCompleted // 已完成 MessageStatusFailed // 失败 ) type LocalMessage struct { ID int64 `db:"id"` MessageID string `db:"message_id"` BusinessType string `db:"business_type"` BusinessData json.RawMessage `db:"business_data"` MessageData json.RawMessage `db:"message_data"` Status MessageStatus `db:"status"` RetryCount int `db:"retry_count"` MaxRetry int `db:"max_retry"` NextRetryTime time.Time `db:"next_retry_time"` CreatedAt time.Time `db:"created_at"` UpdatedAt time.Time `db:"updated_at"` }// repository/message_repo.go package repository import ( "context" "database/sql" "time" "github.com/google/uuid" "your-project/model" ) type MessageRepo struct { db *sql.DB } func NewMessageRepo(db *sql.DB) *MessageRepo { return MessageRepo{db: db} } // CreateInTransaction 在事务中创建消息记录 func (r *MessageRepo) CreateInTransaction( ctx context.Context, tx *sql.Tx, businessType string, businessData, messageData interface{}, ) (string, error) { messageID := uuid.New().String() businessJSON, err := json.Marshal(businessData) if err != nil { return "", err } messageJSON, err := json.Marshal(messageData) if err != nil { return "", err } query := ` INSERT INTO local_messages (message_id, business_type, business_data, message_data, status, next_retry_time) VALUES (?, ?, ?, ?, ?, ?) ` _, err = tx.ExecContext(ctx, query, messageID, businessType, businessJSON, messageJSON, model.MessageStatusPending, time.Now(), // 立即可重试 ) return messageID, err } // FetchPendingMessages 获取待发送的消息 func (r *MessageRepo) FetchPendingMessages(ctx context.Context, limit int) ([]model.LocalMessage, error) { query := ` SELECT id, message_id, business_type, business_data, message_data, status, retry_count, max_retry, next_retry_time, created_at FROM local_messages WHERE status = ? AND next_retry_time = ? ORDER BY created_at ASC LIMIT ? ` rows, err := r.db.QueryContext(ctx, query, model.MessageStatusPending, time.Now(), limit, ) if err != nil { return nil, err } defer rows.Close() var messages []model.LocalMessage for rows.Next() { var msg model.LocalMessage err := rows.Scan( msg.ID, msg.MessageID, msg.BusinessType, msg.BusinessData, msg.MessageData, msg.Status, msg.RetryCount, msg.MaxRetry, msg.NextRetryTime, msg.CreatedAt, ) if err != nil { return nil, err } messages = append(messages, msg) } return messages, rows.Err() } // UpdateStatus 更新消息状态 func (r *MessageRepo) UpdateStatus(ctx context.Context, messageID string, status model.MessageStatus) error { query := ` UPDATE local_messages SET status = ?, updated_at = ?, next_retry_time = ? WHERE message_id = ? ` var nextRetry time.Time if status == model.MessageStatusPending { // 指数退避:1min, 2min, 4min, 8min... nextRetry = time.Now().Add(5 * time.Minute) } _, err := r.db.ExecContext(ctx, query, status, time.Now(), nextRetry, messageID) return err } // MarkCompleted 标记消息处理完成 func (r *MessageRepo) MarkCompleted(ctx context.Context, messageID string) error { query := `UPDATE local_messages SET status = ?, updated_at = ? WHERE message_id = ?` _, err := r.db.ExecContext(ctx, query, model.MessageStatusCompleted, time.Now(), messageID) return err }2.2.3 消息发送器// messenger/messenger.go package messenger import ( "context" "encoding/json" "fmt" "log" "time" "github.com/segmentio/kafka-go" "your-project/model" "your-project/repository" ) type Messenger struct { repo *repository.MessageRepo kafkaWriter *kafka.Writer running chan struct{} } func NewMessenger(repo *repository.MessageRepo, brokers []string) *Messenger { return Messenger{ repo: repo, kafkaWriter: kafka.Writer{ Addr: kafka.TCP(brokers...), Topic: "distributed-tx-events", Balancer: kafka.LeastBytes{}, }, running: make(chan struct{}), } } // Start 启动消息轮询发送 func (m *Messenger) Start(ctx context.Context) { go func() { ticker := time.NewTicker(1 * time.Second) defer ticker.Stop() for { select { case -ctx.Done(): return case -ticker.C: m.sendPendingMessages(ctx) } } }() } func (m *Messenger) sendPendingMessages(ctx context.Context) { messages, err := m.repo.FetchPendingMessages(ctx, 100) if err != nil { log.Printf("fetch pending messages error: %v", err) return } for _, msg := range messages { if err := m.sendMessage(ctx, msg); err != nil { log.Printf("send message %s error: %v", msg.MessageID, err) // 更新重试次数 m.updateRetryStatus(ctx, msg) } else { // 发送成功,标记为已发送 _ = m.repo.UpdateStatus(ctx, msg.MessageID, model.MessageStatusSent) } } } func (m *Messenger) sendMessage(ctx context.Context, msg model.LocalMessage) error { var messageData map[string]interface{} if err := json.Unmarshal(msg.MessageData, messageData); err != nil { return err } // 添加消息 ID 到 header,用于幂等性校验 kafkaMsg := kafka.Message{

相关文章:

Go 分布式事务实战:本地消息表、事务消息、SAGA、TCC 四大方案深度解析与选型指南

Go 分布式事务实战:本地消息表、事务消息、SAGA、TCC 四大方案深度解析与选型指南 摘要:在微服务架构中,分布式事务是无法回避的核心难题。本文深入剖析本地消息表、事务消息、SAGA、TCC 四种主流方案的实现原理,提供完整的 Go 语言代码示例,并结合电商、支付等真实场景给…...

基于单片机的瓦斯监测系统设计

收藏关注不迷路!! 🌟文末获取源码数据库🌟 感兴趣的可以先收藏起来,还有大家在毕设选题(免费咨询指导选题),项目以及论文编写等相关问题都可以给我留言咨询,希望帮助更多…...

求大佬帮忙解决问题

电脑突然不知道啥原因,安装程序的时候只能选择桌面目录进行安装,但是可以手动输入盘符安装到其他目录...

BNU-25硕信息学奥赛day2

2026.3.13学习笔记 提示:写完文章后,目录可以自动生成,如何生成可参考右边的帮助文档 文章目录2026.3.13学习笔记前言一、变量名和常量1.变量名案例2.规则3.常量名命名规则4.内置数据类型二、赋值与变量自增1.赋值语句2.自增2.1 自增/自减运算…...

DOM Element

DOM Element 引言 在Web开发中,DOM(Document Object Model)是描述HTML和XML文档的标准模型。DOM Element是DOM模型的核心组成部分,它代表了HTML或XML文档中的元素。本文将深入探讨DOM Element的概念、属性、方法以及在实际开发中的应用。 什么是DOM Element? DOM Elem…...

Julia 交互式命令详解

Julia 交互式命令详解 引言 Julia 是一种高性能的编程语言,广泛应用于科学计算、数据分析、机器学习等领域。在 Julia 中,交互式命令行(REPL)是一种非常方便的工具,它允许开发者直接在终端中运行代码并立即看到结果。本文将详细介绍 Julia 的交互式命令,帮助您更好地利…...

ionic 浮动框:实现与优化指南

ionic 浮动框:实现与优化指南 引言 在移动应用开发中,用户界面的设计对于提升用户体验至关重要。Ionic框架,作为一款流行的HTML5移动应用开发框架,提供了丰富的组件和功能,帮助开发者构建高性能、跨平台的应用。其中,浮动框(Floating Action Button,简称FAB)作为一种…...

金舟软件AI对话工具-20260313提问VLAN技术

嗯……用户突然问起了VLAN的定义,看来是想了解网络分段的基础知识。不过从之前的对话来看,ta似乎更关注的是Linux系统运维(CentOS 7命令),刚才又跳到AI年龄判断,跨度有点大啊。用户可能是刚开始学习网络技术…...

Java + OSHI 实战:从零搭建企业级电脑硬件信息检测

作为一名信息处理员,想必都遇到过这类 “重复劳动” 的场景: 经理:“小白,把公司所有电脑的硬件信息统计一下,做个固定资产盘点表。” 如果手动去每台电脑上看「设备管理器」「系统信息」,不仅要反复操作相同步骤,还容易漏记、错记,几百台设备得耗上几天,效率低到离谱…...

Oracle VM VirtualBox 虚拟机安装增强功能及共享粘贴板

在虚拟机设置中设置了剪切板及文件拖拽双向共享但是不起作用, 排查了下原因并记录以作备忘,具体步骤如下:首先,虚拟机关机状态设置共享:选择对应的虚拟机(关机状态)选择“设置” - "常规" - “高…...

科技中介如何优化技术转移服务流程?

观点作者:科易网-AI技术转移与科技创新数智化服务平台一、现状概述:成效与短板 在数智化转型浪潮下,科技成果转化正经历深刻变革。传统科技中介服务模式以线下对接、人工匹配为主,存在信息不对称、响应滞后、转化效率低等问题。尽…...

财务如何使用应收账款管理工具,避免一套数据录入两遍

一、应收账款管理工具的核心价值1. 应收账款管理的定义与重要性应收账款管理是指企业对因销售商品或提供服务而产生的未收回款项进行系统化追踪、记录和催收的过程。它是企业财务管理的重要组成部分,直接影响现金流健康和资金周转效率。数据显示,应收账款…...

工业物联网网关能够应用在哪些场景,发挥什么功能

数字化转型不是某一个行业、某一类设备的专属,而是全行业、全场景的共同趋势。对此,物通博联(WideIOT)推出工业物联网网关,具备多场景适配、多行业验证、全方案支撑等优势,广泛应用于智能制造、智慧能源、环…...

多模型聚合的AI图像生成工作台——椒图AI深度评测与实战:低成本实现无痕改字与8K高清放大

在当前的AIGC浪潮中,图像生成模型层出不穷。对于开发者、UI设计师以及泛内容创作者而言,单一模型往往难以满足全场景的需求。要么长于写实但短于文字生成,要么支持高清放大但推理效率极低。搭建本地环境不仅硬件成本高昂,且不同模…...

汽车制动噪声测试系统

一、BNA 系统概述车辆制动噪声测试(BNA)系统是汉航(北京)科技有限公司基于汉航NTS.LAB平台研发的综合性测试设备,专门应用于车辆道路试验,核心目标是实现对车辆制动噪声的全方位监测、精准分析与数据记录。…...

GB/T 40613-2021 虚拟电厂技术规范深度解读

1. 标准概述1.1 标准基本信息标准号GB/T 40613-2021标准名称虚拟电厂技术规范英文名称Technical specification for virtual power plant发布机构国家市场监督管理总局、国家标准化管理委员会发布日期2021-10-11实施日期2022-05-01标准状态现行有效归口单位全国电力需求侧管理标…...

标题:别卷了,GEO 这玩意儿到底是啥?给大伙儿盘盘道

嘿,各位老铁,今儿咱们不聊那些虚头巴脑的“颠覆行业”,也不整什么“三天速成”的营销套路。作为一个在搜索推荐和AI优化这个圈子里摸爬滚打快二十年的“老油条”,看着现在的年轻人天天把GEO(Generative Engine Optimiz…...

跟江协学32之GPIO介绍

GPIO简介这部分了解一下即可,GPIO是基本,后续会经常使用GPIO基本结构在STM32中,所有的GPIO的都是挂载在APB2总线上,每个GPIO都有16个引脚,编号0~15。内核通过APB2总线对寄存器进行读写,输出寄存器写1&#…...

能碳管理系统组成与原理解析:揭开绿色发展背后的 “神秘面纱”?

全面解读能碳管理系统:从原理到价值的深度剖析从 “感知” 到 “认知”:系统如何捕获能源与碳的踪迹要理解能碳管理系统,先得从它最基础的感知能力入手。这个系统可不是凭空运作的,它首先要解决一个根本问题:怎样精准、…...

从零搭建个人独立博客:Hexo + GitHub Pages 极速建站与踩坑实录

引言作为一名爱折腾的开发者,刚解决完一个极其棘手的 WebGL 3D 网页滚动陷阱 Bug,最爽的事情莫过于把这份血汗经验写成文章分享出来!这篇文章将为你带来一份实战教程,完整记录我是如何使用 Hexo 配合 GitHub Pages 建站&#xff0…...

好用的玉柴柴油发电机组哪个服务好

扬州量子电力设备有限公司:为玉柴发电机组提供专业的技术服务与方案解析玉柴柴油发电机组在长期高负荷运行下的功率稳定性与燃油经济性平衡,是当前行业普遍面临的技术挑战。这不仅关系到设备的使用寿命,更直接影响运营成本与供电可靠性。针对…...

平行链协议深度拆解 | 一个区块如何穿越六道关卡获得最终确认

原文作者:PaperMoon 团队一个平行链区块要想获得 Polkadot 网络的最终安全背书,需要经历候选、附议、可背书、已背书、待可用、已包含六个状态——任何一步失败都会被丢弃。这套机制的名字听起来很学术,但它解决的问题极其现实:几…...

全文 - Quantum error correction below the surface code threshold

低于表面码阈值的量子纠错 谷歌量子人工智能团队及合作者(2024 年 8 月 24 日) 摘要 量子纠错 [1,2,3,4] 通过将多个物理量子比特整合为一个逻辑量子比特,为实现实用化量子计算提供了路径:随着量子比特数量的增加,逻…...

aspnet_counters.dll文件彻底修复方法 附免费的下载解决办法

在使用电脑系统时经常会出现丢失找不到某些文件的情况,由于很多常用软件都是采用 Microsoft Visual Studio 编写的,所以这类软件的运行需要依赖微软Visual C运行库,比如像 QQ、迅雷、Adobe 软件等等,如果没有安装VC运行库或者安装…...

Burp Suite Professional 2026.3 for Windows x64 - 领先的 Web 渗透测试软件

Burp Suite Professional 2026.3 for Windows x64 - 领先的 Web 渗透测试软件 世界排名第一的 Web 渗透测试工具包 请访问原文链接:https://sysin.org/blog/burp-suite-pro-win/ 查看最新版。原创作品,转载请保留出处。 作者主页:sysin.or…...

IsaacSim 安装与使用记录(8)

IsaacSim 安装与使用记录(8) 基于ROS2 Python自定义OmniGraph Node 打开Isaac Sim VS Code Edition(VS Code extension) 配置生成的OmniGraph Node 编辑extension.toml 编辑OmniGraph定义文件 CategoryDefinition.json 编辑OmniGraph Python源码 自定义控制器 使用自定义的…...

Ubuntu18.04 for Xilinx19.2 环境安装

Ubuntu18.04 for Xilinx19.2 ✉️ 安装目标: Ubuntu 18.04 虚拟机Vivado 19.2MATLAB 2018bSynopsys (VCS-MX Verdi)VCS Test Code 材料准备: Ubuntu 18.04 镜像Vivado 19.2 安装包MATLAB 2018b 安装包Synopsys 2018.09 安装包VMware16 Pro物理机一台 …...

MIT突破:多智能体系统破解PFAS替代材料发现难题

这项由麻省理工学院土木环境工程系、机械工程系以及Schwarzman计算学院联合开展的研究,发表于2026年《计算机科学与人工智能》领域的arXiv预印本平台(论文编号:arXiv:2602.07491v1),有兴趣深入了解的读者可以通过该编号…...

重庆团建企业选哪家

在当前的商业环境中,团队建设活动已经成为提升企业凝聚力和员工士气的重要手段。对于重庆的企业来说,选择一家合适的团建服务提供商至关重要。本文将对重庆的几家知名团建企业进行分析,并重点推荐重庆领军者文化传播有限公司。引言随着市场竞…...

大模型落地实战:技术选型到部署全解析

大模型落地实践指南:技术路径与关键挑战 企业级大模型应用需从技术选型开始。主流技术路径包括基于开源框架(如LLaMA、Falcon)的微调、使用API服务(如GPT-4、Claude)的快速接入,以及混合部署模式。技术选型…...