当前位置: 首页 > news >正文

Go微服务: 分布式之发送带有事务消息的示例

分布式之发送带有事务消息

  • 现在做一个RocketMQ的事务消息的 demo

1 )生产者

package mainimport ("context""fmt""time""github.com/apache/rocketmq-client-go/v2""github.com/apache/rocketmq-client-go/v2/primitive""github.com/apache/rocketmq-client-go/v2/producer"
)// 自定义结构体,为了实现 NewTransactionProducer 第一个参数的接口
type MyListener struct{}func (hl MyListener) ExecuteLocalTransaction(*primitive.Message) primitive.LocalTransactionState {return primitive.CommitMessageState
}func (hl MyListener) CheckLocalTransaction(*primitive.MessageExt) primitive.LocalTransactionState {return primitive.CommitMessageState
}func main() {// ------------ 1. 连接RocketMQ ------------------------mqAddr := "127.0.0.1:9876" // 模拟地址// NewTransactionProducer 这个方法第一个参数是一个 Listener// 是一个接口,需要一个接口体去实现它的方法p, err := rocketmq.NewTransactionProducer( // 开启事物消息生产者MyListener{},producer.WithNameServer([]string{mqAddr}),)if err != nil {panic(err) // 生产环境禁用panic}// ------------ 2. 启动RocketMQ ------------------------err = p.Start()if err != nil {panic(err)}// ------------ 3. 发送RocketMQ 消息 ------------------------res, err := p.SendMessageInTransaction(context.Background(),primitive.NewMessage("MyTransactionTopic", []byte("xxxxxxxxxxxyyyyyyyyyyyyyzzzzzzzzzzzz")),)fmt.Println(res.Status)if err != nil {panic(err)}fmt.Printf("发送成功")time.Sleep(time.Second * 3600)err = p.Shutdown()if err != nil {panic(err)}
}
  • 可见, primitive.LocalTransactionState 是返回值
  • 进入这个包中,它有三个状态
    • CommitMessageState 提交状态
    • RollbackMessageState 回滚状态
    • UnknowState 未知状态
  • 在后续,这三个状态都可以再试一试

2 ) 运行后,在UI界面查看消息


2.1 输出信息如下

INFO[0000] change the route for clients                 
INFO[0000] the topic route info changed                  changeTo="{\"OrderTopicConf\":\"\",\"queueDatas\":[{\"brokerName\":\"broker-bbs\",\"readQueueNums\":4,\"writeQueueNums\":4,\"perm\":6,\"topicSynFlag\":0}],\"brokerDatas\":[{\"cluster\":\"DefaultCluster\",\"brokerName\":\"broker-bbs\",\"brokerAddrs\":{\"0\":\"192.168.124.6:10911\"}}]}" changedFrom="<nil>" topic=MyTransactionTopic
0
发送成功

2.2 运行效果

2 )消费者

package mainimport ("context""fmt""time""github.com/apache/rocketmq-client-go/v2""github.com/apache/rocketmq-client-go/v2/consumer""github.com/apache/rocketmq-client-go/v2/primitive"
)func main() {mqAddr := "127.0.0.1:9876"topic := "MyTransactionTopic"groupName := "ddddddd"c, err := rocketmq.NewPushConsumer(consumer.WithGroupName(groupName),consumer.WithNsResolver(primitive.NewPassthroughResolver([]string{mqAddr})),)if err != nil {panic(err)}err = c.Subscribe(topic, consumer.MessageSelector{},func(ctx context.Context, msgList ...*primitive.MessageExt) (consumer.ConsumeResult, error) {for i := range msgList {fmt.Printf("订阅消息,消费%v \n", msgList[i])}return consumer.ConsumeSuccess, nil})if err != nil {fmt.Println("消费消息错误: %v", err.Error())}err = c.Start()if err != nil {fmt.Println("开启消费这错误: %v", err.Error())}time.Sleep(time.Hour)err = c.Shutdown()if err != nil {fmt.Println("shutdown消费者错误: %v", err.Error())}
}
  • 在RocketMQ中,事务消息的处理机制涉及到生产者和消费者两端的协作,但与普通消息消费模式有所区别

  • 事务消息的消费端并不直接参与到事务的两阶段提交过程中,它更像是一个“半事务消息”的确认者

  • 具体流程如下:

    • 生产者发送事务消息:生产者发送一条半事务消息到MQ服务器,并立即返回,此时消息处于“Prepare”状态
    • MQ Server回调生产者确认:MQ服务器会回调生产者提供的事务监听器(在Go示例中是HappyListener),执行本地事务。生产者需在此阶段执行事务操作并决定是提交还是回滚该消息
    • 生产者根据本地事务结果告知MQ Server:生产者根据本地事务执行结果,通过事务状态检查接口告诉MQ服务器是提交还是回滚这条半事务消息。
    • 消息变为可消费状态:MQ服务器根据生产者的决定,将消息标记为Commit或Rollback,Commit后的消息才对普通消费者可见
  • 因此,对于事务消息的消费者来说,其主要职责是消费那些已经被事务提交成功的消息,而不需要直接参与事务的提交或回滚过程

  • 消费者代码看起来与普通消息的消费者相似,但消费的消息实际上是生产者已经提交成功的事务消息

  • 不过,如果您的需求是希望消费者也以某种形式参与到事务的最终确认中,比如基于消息的消费结果来决定是否提交事务,这在RocketMQ的标准事务消息模型中并不直接支持

  • RocketMQ的事务模型主要关注于保证消息生产和本地事务的原子性,消费者更多的是作为事务结果的后续处理者角色

  • 在提供的消费者示例中,尽管它看起来是一个普通的消费者,但实际上它处理的是生产者通过事务消息流程提交后的内容,这符合事务消息的消费逻辑

  • 如果需要在消费端实现更复杂的逻辑来间接响应事务状态,可能需要结合业务系统进行额外的设计,比如通过监听数据库状态变化、消息队列的死信队列特性或其他补偿机制来处理未决事务

延迟性事务消息

  • 您需要在创建消息时指定消息的延迟等级,而不是在生产者配置或消息发送后进行延迟
  • RocketMQ支持多种延迟等级,每种等级对应不同的延迟时间
  • 注意,事务消息和延迟消息的直接组合在RocketMQ中并不是直接支持的特性
  • 因为事务消息的设计主要是围绕两阶段提交模型,确保消息发送与本地事务的一致性
  • 而延迟消息侧重于消息的定时投递
  • 然而,可以通过间接的方式结合这两个特性,即在事务消息的本地事务逻辑中包含对延迟操作的处理
  • 下面的示例尝试模拟一种结合方式,但请注意,这仅是一种逻辑上的结合,实际应用中需要根据具体业务场景仔细设计和测试
  • 方案思路:
    • 生产者:发送一个事务消息到特定的主题(例如DelayedTransactionTopic),该消息体中携带了需要进行延迟处理的信息。
    • 事务监听器:在ExecuteLocalTransaction方法中,不直接执行长时间的延迟逻辑,而是执行快速操作(如记录消息待处理状态或存入DB),然后返回primitive.Prepared状态。
    • 检查事务状态:在CheckLocalTransaction方法中,检查事务状态,如果需要,触发一个异步任务或消息队列中的消息,该消息携带延迟处理逻辑和真正的延迟时间。
    • 延迟处理服务:这个服务从队列中取出消息并根据消息中的指示进行真正的延迟操作,例如通过内部队列或定时任务系统(如分布式定时任务框架)来实现延迟执行
  • 相关生产者伪代码未完全实现,仅供参考
    package mainimport ("context""fmt""time""github.com/apache/rocketmq-client-go/v2""github.com/apache/rocketmq-client-go/v2/primitive""github.com/apache/rocketmq-client-go/v2/producer"
    )type DelayedTxListener struct{}func (d DelayedTxListener) ExecuteLocalTransaction(msg *primitive.Message) primitive.LocalTransactionState {// 假设这里将消息标识为待处理,并记录相关信息到数据库fmt.Println("Preparing transaction, storing message meta...")return primitive.Prepared
    }func (d DelayedTxListener) CheckLocalTransaction(msgExt *primitive.MessageExt) primitive.LocalTransactionState {// 在这里检查消息是否准备好执行延迟操作// 实际操作可能包括从数据库查询该消息的状态// 假设我们已经确定需要进行延迟操作,这里直接模拟提交return primitive.CommitMessageState
    }func main() {mqAddr := "127.0.0.1:9876"p, err := rocketmq.NewTransactionProducer(DelayedTxListener{},producer.WithNameServer([]string{mqAddr}),)if err != nil {panic(err)}err = p.Start()if err != nil {panic(err)}msg := &primitive.Message{Topic: "DelayedTransactionTopic",Body:  []byte("消息内容,可以包含延迟处理的详细信息"),}res, err := p.SendMessageInTransaction(context.Background(), msg)if err != nil {panic(err)}fmt.Printf("发送事务消息状态: %v\n", res.Status)time.Sleep(time.Second * 3600)err = p.Shutdown()if err != nil {panic(err)}
    }
    
  • 注意事项
    • 这个示例主要是概念性的,展示了如何在事务消息的上下文中计划后续的延迟处理步骤,而没有直接实现延迟消息的发送。
    • 实际应用中,您可能需要实现一个额外的后台服务或消息队列来处理这些“计划”好的延迟任务,确保它们能在预定时间得到执行。
    • 事务消息的两阶段提交机制仍然适用,只是延迟操作本身不在RocketMQ的直接事务控制范围内,而是作为一种业务逻辑上的后续处理。
    • 务必根据您的具体业务需求和RocketMQ的版本特性,仔细设计和测试这样的解决方案。

相关文章:

Go微服务: 分布式之发送带有事务消息的示例

分布式之发送带有事务消息 现在做一个RocketMQ的事务消息的 demo 1 &#xff09;生产者 package mainimport ("context""fmt""time""github.com/apache/rocketmq-client-go/v2""github.com/apache/rocketmq-client-go/v2/prim…...

【go】go初始化命令总结

包初始化 test项目目录下执行 go mod init test go mod tidy生成二进制可执行文件 go build -o test .\main.go...

vue音乐播放条

先看效果 再看代码 <template><div class"footer-player z-30 flex items-center p-2"><div v-if"isShow" class"h-12 w-60 overflow-hidden"><div :style"activeStyle" class"open-detail-control-wrap&…...

halcon实现浓淡补正,中间值补正-抽取暗

代码效果 抽取前 中值抽取暗 halcon函数代码 测试图片参数 NoiseCut:16 Gain:1 输入ImagePart NoiseCut Gain *获取直方图 get_domain (ImagePart, Domain) gray_histo_range(Domain,ImagePart,0,255,256, Histo, BinSize) area_center(Domain, NumPixels, Row, Column) …...

太速科技-FMC213V3-基于FMC兼容1.8V IO的Full Camera Link 输入子卡

FMC213V3-基于FMC兼容1.8V IO的Full Camera Link 输入子卡 一、板卡概述 该板卡为了考虑兼容1.8V电平IO&#xff0c;适配Virtex7&#xff0c;Kintex Ultrascale&#xff0c;Virtex ultrasacle FPGA而特制&#xff0c;如果要兼容原来的3.3V 也可以修改硬件参数。板卡支持1路…...

GPU短缺和模型效率的推动

1. 引言 随着全球GPU短缺和云计算成本的不断上升&#xff0c;开发更高效的AI模型成为了当前的焦点。技术如低秩适应&#xff08;LoRA&#xff09;和量化&#xff08;Quantization&#xff09;在优化性能的同时&#xff0c;减少了资源需求。这些技术不仅在当前的AI开发中至关重…...

linux在文件夹中查找文件内容

linux在文件夹中查找文件内容 在Linux中,可以通过以下多个途径,在文件夹中查找文件内容: 1、使用grep命令: grep -r "要查找的内容" /path/to/folder-r参数表示递归地在文件夹及其子文件夹中搜索。/path/to/folder是要搜索的文件夹路径。2、使用ack命令 ack …...

算法:11. 盛最多水的容器

11. 盛最多水的容器 给定一个长度为 n 的整数数组 height 。有 n 条垂线&#xff0c;第 i 条线的两个端点是 (i, 0) 和 (i, height[i]) 。 找出其中的两条线&#xff0c;使得它们与 x 轴共同构成的容器可以容纳最多的水。 返回容器可以储存的最大水量。 说明&#xff1a;你…...

Hazelcast 分布式缓存 在Seatunnel中的使用

1、背景 最近在调研seatunnel的时候&#xff0c;发现新版的seatunnel提供了一个web服务&#xff0c;可以用于图形化的创建数据同步任务&#xff0c;然后管理任务。这里面有个日志模块&#xff0c;可以查看任务的执行状态。其中有个取读数据条数和同步数据条数。很好奇这个数据…...

分数限制下,选好专业还是选好学校?

目录 分数限制下&#xff0c;选好专业还是选好学校&#xff1f; 方向一&#xff1a;专业解析 1. 专业选择的重要性 2. 不同专业的优势与挑战 3. 个人专业选择经验分享 4. 实际场景下的“专业VS学校”选择方案 方向二&#xff1a;名校效应分析 1. 名校声誉与品牌效应 2…...

软件改为开机自启动

1.按键 win R,输入“shell:startup”命令, 然后就可以打开启动目录了&#xff0c;如下&#xff1a; 2.然后&#xff0c;把要开机启动的程序的图标拖进去即可。 参考&#xff1a;开机启动项如何设置...

集群down机的应急和恢复测试(非重做备机)

1. 集群的两台服务器的状态 实例 正常情况主备 ip 端口 node1 主机 192.168.6.6 9088 node2 备机 192.168.6.7 9088 2. 测试的步骤 down掉node1观察node2的状态在node2未自动切换的时候手动将node2调整为单机状态&#xff0c;模拟紧急使用模拟不紧急时&#xff0…...

【数据库系统概论复习】关系数据库与关系代数笔记

文章目录 基本概念数据库基本概念关系数据结构完整性约束 关系代数关系代数练习课堂练习 语法树 基本概念 数据库基本概念 DB 数据库&#xff0c; 为了存用户的各种数据&#xff0c;我们要建很多关系&#xff08;二维表&#xff09;&#xff0c;所以把相关的关系&#xff08;二…...

赛氪网受邀参加上海闵行区翻译协会年会,共探科技翻译创新之路

在科技飞速发展的时代背景下&#xff0c;翻译行业正面临着前所未有的机遇与挑战。作为连接高校、企业与社会的桥梁&#xff0c;赛氪网在推动翻译创新、促进学术交流方面展现出了独特的魅力。2024年6月9日&#xff0c;在华东师范大学外语学院举办的第十三届上海市闵行区翻译协会…...

项目管理进阶之EVM(挣值管理)

前言 项目管理进阶系列&#xff0c;终于有时间更新啦&#xff01;&#xff01;&#xff01;欢迎持续关注哦~ 上一节博主重点讲了一个环&#xff1a;PDCA&#xff0c;无论各行各业&#xff0c;上到航空航天、下到种地种菜&#xff0c;都离不开对质量的监督和改进。这个环既是一…...

PLSQL、Oracle以及客户端远程连接服务器笔记(仅供参考)

1.PLSQL参考链接&#xff1a; 全网最全最细的PLSQL下载、安装、配置、使用指南、问题解答&#xff0c;相关问题已汇总-CSDN博客文章浏览阅读2.9w次&#xff0c;点赞98次&#xff0c;收藏447次。双击之后&#xff0c;这里选择安装目录&#xff0c;你安装目录选的哪里&#xff0…...

Win快速删除node_modules

在Windows系统上删除 node_modules 文件夹通常是一个缓慢且耗时的过程。这主要是由于几个关键因素导致的&#xff1a; 主要原因 文件数量多且嵌套深&#xff1a; node_modules 文件夹通常包含成千上万的子文件夹和文件。由于其结构复杂&#xff0c;文件和文件夹往往嵌套得非常…...

【机器学习】基于顺序到顺序Transformer机器翻译

引言 1.1 序列到序列模型详解 序列到序列(Seq2Seq)模型是深度学习中处理序列数据转换问题的关键架构。在自然语言处理(NLP)任务中&#xff0c;如机器翻译、文本摘要和聊天机器人等&#xff0c;Seq2Seq模型能够高效地将输入序列转换为期望的输出序列。 模型架构&#xff1a; 编…...

TEA 加密的 Java 实现

import java.nio.ByteBuffer; import java.nio.ByteOrder;public class TeaUtils {private static final int DELTA 0x9E3779B9;private static final int ROUND 32;private static final String KEY "password";/*** 加密字符串&#xff0c;使用 TEA 加密算法*/p…...

鸿蒙开发电话服务:【@ohos.telephony.data (蜂窝数据)】

蜂窝数据 说明&#xff1a; 本模块首批接口从API version 7开始支持。后续版本的新增接口&#xff0c;采用上角标单独标记接口的起始版本。 导入模块 import data from ohos.telephony.data;data.getDefaultCellularDataSlotId getDefaultCellularDataSlotId(callback: Async…...

基于PSO算法的海陆空多栖无人机路径规划探索

PSO算法&#xff0c;空中机器人路径规划&#xff0c;无人机路径规划 海陆空多栖环境路径规划&#xff0c;考虑海洋和大气中的能源消耗不同&#xff0c;还原环境特性&#xff0c;粒子群PSO算法在如今科技飞速发展的时代&#xff0c;无人机的应用场景越发广泛&#xff0c;从简单的…...

《B4410 [GESP202509 一级] 金字塔》

题目背景 对应的选择、判断题&#xff1a;https://ti.luogu.com.cn/problemset/1189 题目描述 金字塔由 n 层石块垒成。从塔底向上&#xff0c;每层依次需要 nn,(n−1)(n−1),⋯,22,11 块石块。请问搭建金字塔总共需要多少块石块&#xff1f; 输入格式 一行&#xff0c;一…...

OpenClaw自动化测试:Qwen3-32B批量执行LeetCode题目

OpenClaw自动化测试&#xff1a;Qwen3-32B批量执行LeetCode题目 1. 为什么需要自动化编程能力测试 作为一名长期关注AI编程辅助工具的技术博主&#xff0c;我一直在寻找能够客观评估大模型编程能力的方法。传统的单次对话测试往往带有偶然性&#xff0c;无法系统性地反映模型…...

接近开关和光电开关接头:A编码M12一体式防水连接器规格解析

在工业自动化现场&#xff0c;接近开关与光电开关是应用最广泛的传感器。其标准接口——A编码M12一体式防水连接器&#xff08;预铸线缆型&#xff09;&#xff0c;通过统一的机械尺寸与电气定义&#xff0c;实现了传感器的即插即用与高可靠连接。一、规格标准与接口定义A编码M…...

5个必知技巧:让你的PT下载效率提升300%的浏览器插件指南

5个必知技巧&#xff1a;让你的PT下载效率提升300%的浏览器插件指南 【免费下载链接】PT-Plugin-Plus PT 助手 Plus&#xff0c;为 Microsoft Edge、Google Chrome、Firefox 浏览器插件&#xff08;Web Extensions&#xff09;&#xff0c;主要用于辅助下载 PT 站的种子。 项…...

新手入门:用快马平台生成第一个labelimg式图像标注demo

今天想和大家分享一个特别适合计算机视觉新手的小项目——用InsCode(快马)平台快速搭建一个简易版的图像标注工具。这个工具类似labelimg的核心功能&#xff0c;但更轻量级&#xff0c;能帮助理解数据标注的基本流程。 项目背景理解 图像标注是计算机视觉的基础环节&#xff0c…...

NaViL-9B效果实测:支持中英文混排表格图像的行列结构识别与内容提取

NaViL-9B效果实测&#xff1a;支持中英文混排表格图像的行列结构识别与内容提取 1. 模型介绍 NaViL-9B是新一代原生多模态大语言模型&#xff0c;专为处理复杂视觉-语言任务设计。与常规视觉模型不同&#xff0c;它不仅能够理解图片内容&#xff0c;还能精准解析表格、文档等…...

Ubuntu 24.04 环境实战:ROS 2 Kilted 实现 SLAM 建图与 Nav2 导航

一、构建地图 1、安装依赖 安装 slam_toolbox 算法库&#xff1a; sudo apt install ros-kilted-slam-toolbox安装 TurtleBot3 全套支持包&#xff1a; sudo apt install ros-kilted-turtlebot3*2、使用清华源 如果apt安装很慢&#xff0c;请先配置清华源&#xff1a; sud…...

[技术突破]M9A:构建《重返未来:1999》智能自动化解决方案

[技术突破]M9A&#xff1a;构建《重返未来&#xff1a;1999》智能自动化解决方案 【免费下载链接】M9A 1999 小助手 项目地址: https://gitcode.com/gh_mirrors/m9/M9A 实现游戏体验革新的技术价值 M9A作为专为《重返未来&#xff1a;1999》设计的智能自动化工具&#…...

后端架构师转型AI智能体架构师:3个月实战路径,收藏这份落地指南

如果你本身就是后端/全栈/架构师出身&#xff0c;这意味着你已经有了一套非常扎实的“确定性系统”的构建能力——分布式、高并发、数据库事务、系统稳定性&#xff0c;这些都是你的底牌。 而AI智能体恰恰是“不确定性系统”&#xff08;大模型&#xff09;与“确定性系统”&am…...