当前位置: 首页 > news >正文

Go微服务: 分布式之发送带有事务消息的示例

分布式之发送带有事务消息

  • 现在做一个RocketMQ的事务消息的 demo

1 )生产者

package mainimport ("context""fmt""time""github.com/apache/rocketmq-client-go/v2""github.com/apache/rocketmq-client-go/v2/primitive""github.com/apache/rocketmq-client-go/v2/producer"
)// 自定义结构体,为了实现 NewTransactionProducer 第一个参数的接口
type MyListener struct{}func (hl MyListener) ExecuteLocalTransaction(*primitive.Message) primitive.LocalTransactionState {return primitive.CommitMessageState
}func (hl MyListener) CheckLocalTransaction(*primitive.MessageExt) primitive.LocalTransactionState {return primitive.CommitMessageState
}func main() {// ------------ 1. 连接RocketMQ ------------------------mqAddr := "127.0.0.1:9876" // 模拟地址// NewTransactionProducer 这个方法第一个参数是一个 Listener// 是一个接口,需要一个接口体去实现它的方法p, err := rocketmq.NewTransactionProducer( // 开启事物消息生产者MyListener{},producer.WithNameServer([]string{mqAddr}),)if err != nil {panic(err) // 生产环境禁用panic}// ------------ 2. 启动RocketMQ ------------------------err = p.Start()if err != nil {panic(err)}// ------------ 3. 发送RocketMQ 消息 ------------------------res, err := p.SendMessageInTransaction(context.Background(),primitive.NewMessage("MyTransactionTopic", []byte("xxxxxxxxxxxyyyyyyyyyyyyyzzzzzzzzzzzz")),)fmt.Println(res.Status)if err != nil {panic(err)}fmt.Printf("发送成功")time.Sleep(time.Second * 3600)err = p.Shutdown()if err != nil {panic(err)}
}
  • 可见, primitive.LocalTransactionState 是返回值
  • 进入这个包中,它有三个状态
    • CommitMessageState 提交状态
    • RollbackMessageState 回滚状态
    • UnknowState 未知状态
  • 在后续,这三个状态都可以再试一试

2 ) 运行后,在UI界面查看消息


2.1 输出信息如下

INFO[0000] change the route for clients                 
INFO[0000] the topic route info changed                  changeTo="{\"OrderTopicConf\":\"\",\"queueDatas\":[{\"brokerName\":\"broker-bbs\",\"readQueueNums\":4,\"writeQueueNums\":4,\"perm\":6,\"topicSynFlag\":0}],\"brokerDatas\":[{\"cluster\":\"DefaultCluster\",\"brokerName\":\"broker-bbs\",\"brokerAddrs\":{\"0\":\"192.168.124.6:10911\"}}]}" changedFrom="<nil>" topic=MyTransactionTopic
0
发送成功

2.2 运行效果

2 )消费者

package mainimport ("context""fmt""time""github.com/apache/rocketmq-client-go/v2""github.com/apache/rocketmq-client-go/v2/consumer""github.com/apache/rocketmq-client-go/v2/primitive"
)func main() {mqAddr := "127.0.0.1:9876"topic := "MyTransactionTopic"groupName := "ddddddd"c, err := rocketmq.NewPushConsumer(consumer.WithGroupName(groupName),consumer.WithNsResolver(primitive.NewPassthroughResolver([]string{mqAddr})),)if err != nil {panic(err)}err = c.Subscribe(topic, consumer.MessageSelector{},func(ctx context.Context, msgList ...*primitive.MessageExt) (consumer.ConsumeResult, error) {for i := range msgList {fmt.Printf("订阅消息,消费%v \n", msgList[i])}return consumer.ConsumeSuccess, nil})if err != nil {fmt.Println("消费消息错误: %v", err.Error())}err = c.Start()if err != nil {fmt.Println("开启消费这错误: %v", err.Error())}time.Sleep(time.Hour)err = c.Shutdown()if err != nil {fmt.Println("shutdown消费者错误: %v", err.Error())}
}
  • 在RocketMQ中,事务消息的处理机制涉及到生产者和消费者两端的协作,但与普通消息消费模式有所区别

  • 事务消息的消费端并不直接参与到事务的两阶段提交过程中,它更像是一个“半事务消息”的确认者

  • 具体流程如下:

    • 生产者发送事务消息:生产者发送一条半事务消息到MQ服务器,并立即返回,此时消息处于“Prepare”状态
    • MQ Server回调生产者确认:MQ服务器会回调生产者提供的事务监听器(在Go示例中是HappyListener),执行本地事务。生产者需在此阶段执行事务操作并决定是提交还是回滚该消息
    • 生产者根据本地事务结果告知MQ Server:生产者根据本地事务执行结果,通过事务状态检查接口告诉MQ服务器是提交还是回滚这条半事务消息。
    • 消息变为可消费状态:MQ服务器根据生产者的决定,将消息标记为Commit或Rollback,Commit后的消息才对普通消费者可见
  • 因此,对于事务消息的消费者来说,其主要职责是消费那些已经被事务提交成功的消息,而不需要直接参与事务的提交或回滚过程

  • 消费者代码看起来与普通消息的消费者相似,但消费的消息实际上是生产者已经提交成功的事务消息

  • 不过,如果您的需求是希望消费者也以某种形式参与到事务的最终确认中,比如基于消息的消费结果来决定是否提交事务,这在RocketMQ的标准事务消息模型中并不直接支持

  • RocketMQ的事务模型主要关注于保证消息生产和本地事务的原子性,消费者更多的是作为事务结果的后续处理者角色

  • 在提供的消费者示例中,尽管它看起来是一个普通的消费者,但实际上它处理的是生产者通过事务消息流程提交后的内容,这符合事务消息的消费逻辑

  • 如果需要在消费端实现更复杂的逻辑来间接响应事务状态,可能需要结合业务系统进行额外的设计,比如通过监听数据库状态变化、消息队列的死信队列特性或其他补偿机制来处理未决事务

延迟性事务消息

  • 您需要在创建消息时指定消息的延迟等级,而不是在生产者配置或消息发送后进行延迟
  • RocketMQ支持多种延迟等级,每种等级对应不同的延迟时间
  • 注意,事务消息和延迟消息的直接组合在RocketMQ中并不是直接支持的特性
  • 因为事务消息的设计主要是围绕两阶段提交模型,确保消息发送与本地事务的一致性
  • 而延迟消息侧重于消息的定时投递
  • 然而,可以通过间接的方式结合这两个特性,即在事务消息的本地事务逻辑中包含对延迟操作的处理
  • 下面的示例尝试模拟一种结合方式,但请注意,这仅是一种逻辑上的结合,实际应用中需要根据具体业务场景仔细设计和测试
  • 方案思路:
    • 生产者:发送一个事务消息到特定的主题(例如DelayedTransactionTopic),该消息体中携带了需要进行延迟处理的信息。
    • 事务监听器:在ExecuteLocalTransaction方法中,不直接执行长时间的延迟逻辑,而是执行快速操作(如记录消息待处理状态或存入DB),然后返回primitive.Prepared状态。
    • 检查事务状态:在CheckLocalTransaction方法中,检查事务状态,如果需要,触发一个异步任务或消息队列中的消息,该消息携带延迟处理逻辑和真正的延迟时间。
    • 延迟处理服务:这个服务从队列中取出消息并根据消息中的指示进行真正的延迟操作,例如通过内部队列或定时任务系统(如分布式定时任务框架)来实现延迟执行
  • 相关生产者伪代码未完全实现,仅供参考
    package mainimport ("context""fmt""time""github.com/apache/rocketmq-client-go/v2""github.com/apache/rocketmq-client-go/v2/primitive""github.com/apache/rocketmq-client-go/v2/producer"
    )type DelayedTxListener struct{}func (d DelayedTxListener) ExecuteLocalTransaction(msg *primitive.Message) primitive.LocalTransactionState {// 假设这里将消息标识为待处理,并记录相关信息到数据库fmt.Println("Preparing transaction, storing message meta...")return primitive.Prepared
    }func (d DelayedTxListener) CheckLocalTransaction(msgExt *primitive.MessageExt) primitive.LocalTransactionState {// 在这里检查消息是否准备好执行延迟操作// 实际操作可能包括从数据库查询该消息的状态// 假设我们已经确定需要进行延迟操作,这里直接模拟提交return primitive.CommitMessageState
    }func main() {mqAddr := "127.0.0.1:9876"p, err := rocketmq.NewTransactionProducer(DelayedTxListener{},producer.WithNameServer([]string{mqAddr}),)if err != nil {panic(err)}err = p.Start()if err != nil {panic(err)}msg := &primitive.Message{Topic: "DelayedTransactionTopic",Body:  []byte("消息内容,可以包含延迟处理的详细信息"),}res, err := p.SendMessageInTransaction(context.Background(), msg)if err != nil {panic(err)}fmt.Printf("发送事务消息状态: %v\n", res.Status)time.Sleep(time.Second * 3600)err = p.Shutdown()if err != nil {panic(err)}
    }
    
  • 注意事项
    • 这个示例主要是概念性的,展示了如何在事务消息的上下文中计划后续的延迟处理步骤,而没有直接实现延迟消息的发送。
    • 实际应用中,您可能需要实现一个额外的后台服务或消息队列来处理这些“计划”好的延迟任务,确保它们能在预定时间得到执行。
    • 事务消息的两阶段提交机制仍然适用,只是延迟操作本身不在RocketMQ的直接事务控制范围内,而是作为一种业务逻辑上的后续处理。
    • 务必根据您的具体业务需求和RocketMQ的版本特性,仔细设计和测试这样的解决方案。

相关文章:

Go微服务: 分布式之发送带有事务消息的示例

分布式之发送带有事务消息 现在做一个RocketMQ的事务消息的 demo 1 &#xff09;生产者 package mainimport ("context""fmt""time""github.com/apache/rocketmq-client-go/v2""github.com/apache/rocketmq-client-go/v2/prim…...

【go】go初始化命令总结

包初始化 test项目目录下执行 go mod init test go mod tidy生成二进制可执行文件 go build -o test .\main.go...

vue音乐播放条

先看效果 再看代码 <template><div class"footer-player z-30 flex items-center p-2"><div v-if"isShow" class"h-12 w-60 overflow-hidden"><div :style"activeStyle" class"open-detail-control-wrap&…...

halcon实现浓淡补正,中间值补正-抽取暗

代码效果 抽取前 中值抽取暗 halcon函数代码 测试图片参数 NoiseCut:16 Gain:1 输入ImagePart NoiseCut Gain *获取直方图 get_domain (ImagePart, Domain) gray_histo_range(Domain,ImagePart,0,255,256, Histo, BinSize) area_center(Domain, NumPixels, Row, Column) …...

太速科技-FMC213V3-基于FMC兼容1.8V IO的Full Camera Link 输入子卡

FMC213V3-基于FMC兼容1.8V IO的Full Camera Link 输入子卡 一、板卡概述 该板卡为了考虑兼容1.8V电平IO&#xff0c;适配Virtex7&#xff0c;Kintex Ultrascale&#xff0c;Virtex ultrasacle FPGA而特制&#xff0c;如果要兼容原来的3.3V 也可以修改硬件参数。板卡支持1路…...

GPU短缺和模型效率的推动

1. 引言 随着全球GPU短缺和云计算成本的不断上升&#xff0c;开发更高效的AI模型成为了当前的焦点。技术如低秩适应&#xff08;LoRA&#xff09;和量化&#xff08;Quantization&#xff09;在优化性能的同时&#xff0c;减少了资源需求。这些技术不仅在当前的AI开发中至关重…...

linux在文件夹中查找文件内容

linux在文件夹中查找文件内容 在Linux中,可以通过以下多个途径,在文件夹中查找文件内容: 1、使用grep命令: grep -r "要查找的内容" /path/to/folder-r参数表示递归地在文件夹及其子文件夹中搜索。/path/to/folder是要搜索的文件夹路径。2、使用ack命令 ack …...

算法:11. 盛最多水的容器

11. 盛最多水的容器 给定一个长度为 n 的整数数组 height 。有 n 条垂线&#xff0c;第 i 条线的两个端点是 (i, 0) 和 (i, height[i]) 。 找出其中的两条线&#xff0c;使得它们与 x 轴共同构成的容器可以容纳最多的水。 返回容器可以储存的最大水量。 说明&#xff1a;你…...

Hazelcast 分布式缓存 在Seatunnel中的使用

1、背景 最近在调研seatunnel的时候&#xff0c;发现新版的seatunnel提供了一个web服务&#xff0c;可以用于图形化的创建数据同步任务&#xff0c;然后管理任务。这里面有个日志模块&#xff0c;可以查看任务的执行状态。其中有个取读数据条数和同步数据条数。很好奇这个数据…...

分数限制下,选好专业还是选好学校?

目录 分数限制下&#xff0c;选好专业还是选好学校&#xff1f; 方向一&#xff1a;专业解析 1. 专业选择的重要性 2. 不同专业的优势与挑战 3. 个人专业选择经验分享 4. 实际场景下的“专业VS学校”选择方案 方向二&#xff1a;名校效应分析 1. 名校声誉与品牌效应 2…...

软件改为开机自启动

1.按键 win R,输入“shell:startup”命令, 然后就可以打开启动目录了&#xff0c;如下&#xff1a; 2.然后&#xff0c;把要开机启动的程序的图标拖进去即可。 参考&#xff1a;开机启动项如何设置...

集群down机的应急和恢复测试(非重做备机)

1. 集群的两台服务器的状态 实例 正常情况主备 ip 端口 node1 主机 192.168.6.6 9088 node2 备机 192.168.6.7 9088 2. 测试的步骤 down掉node1观察node2的状态在node2未自动切换的时候手动将node2调整为单机状态&#xff0c;模拟紧急使用模拟不紧急时&#xff0…...

【数据库系统概论复习】关系数据库与关系代数笔记

文章目录 基本概念数据库基本概念关系数据结构完整性约束 关系代数关系代数练习课堂练习 语法树 基本概念 数据库基本概念 DB 数据库&#xff0c; 为了存用户的各种数据&#xff0c;我们要建很多关系&#xff08;二维表&#xff09;&#xff0c;所以把相关的关系&#xff08;二…...

赛氪网受邀参加上海闵行区翻译协会年会,共探科技翻译创新之路

在科技飞速发展的时代背景下&#xff0c;翻译行业正面临着前所未有的机遇与挑战。作为连接高校、企业与社会的桥梁&#xff0c;赛氪网在推动翻译创新、促进学术交流方面展现出了独特的魅力。2024年6月9日&#xff0c;在华东师范大学外语学院举办的第十三届上海市闵行区翻译协会…...

项目管理进阶之EVM(挣值管理)

前言 项目管理进阶系列&#xff0c;终于有时间更新啦&#xff01;&#xff01;&#xff01;欢迎持续关注哦~ 上一节博主重点讲了一个环&#xff1a;PDCA&#xff0c;无论各行各业&#xff0c;上到航空航天、下到种地种菜&#xff0c;都离不开对质量的监督和改进。这个环既是一…...

PLSQL、Oracle以及客户端远程连接服务器笔记(仅供参考)

1.PLSQL参考链接&#xff1a; 全网最全最细的PLSQL下载、安装、配置、使用指南、问题解答&#xff0c;相关问题已汇总-CSDN博客文章浏览阅读2.9w次&#xff0c;点赞98次&#xff0c;收藏447次。双击之后&#xff0c;这里选择安装目录&#xff0c;你安装目录选的哪里&#xff0…...

Win快速删除node_modules

在Windows系统上删除 node_modules 文件夹通常是一个缓慢且耗时的过程。这主要是由于几个关键因素导致的&#xff1a; 主要原因 文件数量多且嵌套深&#xff1a; node_modules 文件夹通常包含成千上万的子文件夹和文件。由于其结构复杂&#xff0c;文件和文件夹往往嵌套得非常…...

【机器学习】基于顺序到顺序Transformer机器翻译

引言 1.1 序列到序列模型详解 序列到序列(Seq2Seq)模型是深度学习中处理序列数据转换问题的关键架构。在自然语言处理(NLP)任务中&#xff0c;如机器翻译、文本摘要和聊天机器人等&#xff0c;Seq2Seq模型能够高效地将输入序列转换为期望的输出序列。 模型架构&#xff1a; 编…...

TEA 加密的 Java 实现

import java.nio.ByteBuffer; import java.nio.ByteOrder;public class TeaUtils {private static final int DELTA 0x9E3779B9;private static final int ROUND 32;private static final String KEY "password";/*** 加密字符串&#xff0c;使用 TEA 加密算法*/p…...

鸿蒙开发电话服务:【@ohos.telephony.data (蜂窝数据)】

蜂窝数据 说明&#xff1a; 本模块首批接口从API version 7开始支持。后续版本的新增接口&#xff0c;采用上角标单独标记接口的起始版本。 导入模块 import data from ohos.telephony.data;data.getDefaultCellularDataSlotId getDefaultCellularDataSlotId(callback: Async…...

从玩具车到智能体:用STC89C52给小车装上‘眼睛’和‘触角’的传感器融合实战

从玩具车到智能体&#xff1a;STC89C52多传感器融合的决策系统设计 当一辆普通的玩具车被赋予环境感知能力&#xff0c;它便开始了向智能体的进化。在这个项目中&#xff0c;我们使用STC89C52单片机作为"大脑"&#xff0c;通过超声波模块和漫反射光电传感器构建了一…...

【信息科学与工程学】【通信工程】第四十三篇 骨干网方案设计-02跨境网络

一、方案 1.1 整体方案设计概要 设计的云网融合方案,综合考虑其全球互联需求、安全合规性、性能优化及跨国运营挑战: ​1.1.1、需求分析 ​网络互联需求:​​ ​国内互通:​​ 安全、稳定、低延迟连接中国大陆(严格合规要求)。 ​国际互通:​​ 高性能连接美国(东西海…...

如何为Unity游戏添加多语言支持:XUnity.AutoTranslator完整指南

如何为Unity游戏添加多语言支持&#xff1a;XUnity.AutoTranslator完整指南 【免费下载链接】XUnity.AutoTranslator 项目地址: https://gitcode.com/gh_mirrors/xu/XUnity.AutoTranslator 你是否曾经因为语言障碍而无法享受心爱的Unity游戏&#xff1f;是否想要为你的…...

ExDark低光照图像数据集技术架构:构建真实世界低光照计算机视觉解决方案

ExDark低光照图像数据集技术架构&#xff1a;构建真实世界低光照计算机视觉解决方案 【免费下载链接】Exclusively-Dark-Image-Dataset Exclusively Dark (ExDARK) dataset which to the best of our knowledge, is the largest collection of low-light images taken in very …...

SPSS数据合并避坑指南:键变量设置、缺失值处理与常见错误解析

SPSS数据合并实战避坑手册&#xff1a;从原理到解决方案 数据合并是SPSS分析过程中最基础也最容易出错的环节之一。许多用户在按照网络教程操作后&#xff0c;常常发现合并结果与预期不符——变量丢失、数据错乱、大量缺失值涌现。这些问题往往源于对合并原理的理解不足和关键细…...

flux_down 下载工具使用步骤详解(附FluxDown多线程下载与磁力解析教程)

在技术领域&#xff0c;我们常常被那些闪耀的、可见的成果所吸引。今天&#xff0c;这个焦点无疑是大语言模型技术。它们的流畅对话、惊人的创造力&#xff0c;让我们得以一窥未来的轮廓。然而&#xff0c;作为在企业一线构建、部署和维护复杂系统的实践者&#xff0c;我们深知…...

3分钟掌握:网易云音乐无损FLAC批量下载终极指南

3分钟掌握&#xff1a;网易云音乐无损FLAC批量下载终极指南 【免费下载链接】NeteaseCloudMusicFlac 根据网易云音乐的歌单, 下载flac无损音乐到本地.。 项目地址: https://gitcode.com/gh_mirrors/nete/NeteaseCloudMusicFlac 还在为无法保存高品质音乐而烦恼吗&#x…...

如何用猫抓浏览器扩展轻松捕获在线视频资源?一个实用工具的全方位指南

如何用猫抓浏览器扩展轻松捕获在线视频资源&#xff1f;一个实用工具的全方位指南 【免费下载链接】cat-catch 猫抓 浏览器资源嗅探扩展 / cat-catch Browser Resource Sniffing Extension 项目地址: https://gitcode.com/GitHub_Trending/ca/cat-catch 当你在浏览器中观…...

2026十大建议考的经济学专业证书有哪些

2026年十大经济学专业证书推荐经济学专业证书能够提升职业竞争力&#xff0c;尤其在数据分析、金融和经济预测领域。以下是2026年值得考取的十大经济学专业证书&#xff0c;包括CDA数据分析师证书等热门选择。1. CDA数据分析师证书CDA数据分析师证书是数据分析领域的权威认证&a…...

2026年简易操作安装Hermes Agent/OpenClaw Token Plan全流程解析大全

2026年简易操作安装Hermes Agent/OpenClaw Token Plan全流程解析大全。OpenClaw作为阿里云生态下新一代的开源AI自动化代理平台&#xff0c;曾用名Moltbot/Clawdbot&#xff0c;凭借“自然语言交互自动化任务执行大模型智能决策”的核心能力&#xff0c;正在重构个人与企业的工…...