当前位置: 首页 > news >正文

RocketMQ 事务消息

事务消息是 RocketMQ 的高级特性之一 。这篇文章,笔者会从应用场景功能原理实战例子三个模块慢慢为你揭开事务消息的神秘面纱。

1 应用场景

举一个电商场景的例子:用户购物车结算时,系统会创建支付订单。

用户支付成功后支付订单的状态会由未支付修改为支付成功,然后系统给用户增加积分。

通常我们会使用普通消费方案,该方案能够发挥 MQ 的优势:异步解耦 ,  同时架构设计非常简单。

图片

  1. 用户购物车结算时,系统创建支付订单;

  2. 支付成功后,更新订单的状态从未支付修改为支付成功;

  3. 发送一条普通消息到消息队列服务端;

  4. 积分服务消费消息,添加积分记录。

但该方案有个非常直观的缺点:容易出现不一致的现象

  1. 假如先发送消息,后修改订单状态,消息发送成功,订单没有执行成功,需要回滚整个事务(订单数据事务回滚,积分服务消费时,需要先反查事务状态,若事务提交,才能插入积分记录)。

  2. 假如先修改订单状态,后发送消息,订单状态修改成功,但消息发送失败,需要补偿操作才能保持最终一致。

  3. 假如先修改订单,后发送消息,订单状态修改成功,但消息发送超时,此时无法判断需要回滚订单还是提交订单变更。

我们看到,为了完善普通消费方案,业务层还需要做到两点:补偿机制提供事务状态查询接口

要做到这两点,难不难呢?

不难,但是业务层代码会比较混乱,更优的方案还是得从中间件层面解决。

2 功能原理

RocketMQ 事务消息是支持在分布式场景下保障消息生产和本地事务的最终一致性。交互流程如下图所示:

图片

1、生产者将消息发送至 Broker 。

2、Broker 将消息持久化成功之后,向生产者返回 Ack 确认消息已经发送成功,此时消息被标记为"暂不能投递",这种状态下的消息即为半事务消息

3、生产者开始执行本地事务逻辑

4、生产者根据本地事务执行结果向服务端提交二次确认结果( Commit 或是 Rollback ),Broker 收到确认结果后处理逻辑如下:

  • 二次确认结果为 Commit :Broker 将半事务消息标记为可投递,并投递给消费者。

  • 二次确认结果为 Rollback :Broker 将回滚事务,不会将半事务消息投递给消费者。

5、在断网或者是生产者应用重启的特殊情况下,若 Broker 未收到发送者提交的二次确认结果,或 Broker 收到的二次确认结果为 Unknown 未知状态,经过固定时间后,服务端将对消息生产者即生产者集群中任一生产者实例发起消息回查

  1. 生产者收到消息回查后,需要检查对应消息的本地事务执行的最终结果。

  2. 生产者根据检查到的本地事务的最终状态再次提交二次确认,服务端仍按照步骤4对半事务消息进行处理。

笔者认为事务消息的精髓在于:

  1. 本地事务执行成功,消费者才能消费事务消息

  2. 消息回查本身就是补偿机制的实现,事务生产者需提供了事务状态查询接口。

3 实战例子

为了便于大家理解事务消息 ,笔者新建一个工程用于模拟支付订单创建支付成功赠送积分的流程。

首先,我们创建一个真实的订单主题:order-topic 。

图片

然后在数据库中创建三张表 订单表事务日志表积分表

图片

最后我们创建一个 Demo 工程,生产者模块用于创建支付订单、修改支付订单成功,消费者模块用于新增积分记录。

图片

接下来,我们展示事务消息的实现流程。

1、创建支付订单

调用订单生产者服务创建订单接口 ,在 t_order 表中插入一条支付订单记录。

图片

2、调用生产者服务修改订单状态接口

接口的逻辑就是执行事务生产者的 sendMessageInTransaction  方法。

图片

生产者端需要配置事务生产者事务监听器

图片

发送事务消息的方法内部包含三个步骤 :

图片

事务生产者首先发送半事务消息,发送成功后,生产者才开始执行本地事务逻辑

事务监听器实现了两个功能:执行本地事务供 Broker 回查事务状态 。

图片

执行本地事务的逻辑内部就是执行 orderService.updateOrder 方法。

方法执行成功则返回 LocalTransactionState.COMMIT_MESSAGE , 若执行失败则返回 LocalTransactionState.ROLLBACK_MESSAGE 。

图片

需要注意的是: orderService.updateOrder 方法添加了事务注解,并将修改订单状态和插入事务日志表放进一个事务内,避免订单状态和事务日志表的数据不一致。

最后,生产者根据本地事务执行结果向 Broker 提交二次确认结果

Broker 收到生产者确认结果后处理逻辑如下:

  • 二次确认结果为 Commit :Broker 将半事务消息标记为可投递,并投递给消费者。

  • 二次确认结果为 Rollback :Broker 将回滚事务,不会将半事务消息投递给消费者。

3、积分消费者消费消息,添加积分记录

当 Broker 将半事务消息标记为可投递时,积分消费者就可以开始消费主题 order-topic 的消息了。

图片

积分消费者服务,我们定义了消费者组名,以及订阅主题消费监听器

图片

在消费监听器逻辑里,幂等非常重要 。当收到订单信息后,首先判断该订单是否有积分记录,若没有记录,才插入积分记录。

而且我们在创建积分表时,订单编号也是唯一键,数据库中也必然不会存在相同订单的多条积分记录。

4 总结

RocketMQ 事务消息是支持在分布式场景下保障消息生产和本地事务的最终一致性

编写一个实战例子并不复杂,但使用事务消息时需要注意如下三点:

1、事务生产者和消费者共同协作才能保证业务数据的最终一致性;

2、事务生产者需要实现事务监听器,并且保存事务的执行结果(比如事务日志表) ;

3、消费者要保证幂等。消费失败时,通过重试告警+人工介入等手段保证消费结果正确。

相关文章:

RocketMQ 事务消息

事务消息是 RocketMQ 的高级特性之一 。这篇文章,笔者会从应用场景、功能原理、实战例子三个模块慢慢为你揭开事务消息的神秘面纱。 1 应用场景 举一个电商场景的例子:用户购物车结算时,系统会创建支付订单。 用户支付成功后支付订单的状态…...

Windows安装ElasticSearch

安装环境:java环境。新版本需要安装高版本的java,所有本次安装的为 7.x版本的ElasticSearch 。所以要java11 1、安装java11 2、下载 Elasticsearch 安装包 官网地址:(https://www.elastic.co/cn/) 安装包下载地址:https://www…...

【深度学习】SMILEtrack: SiMIlarity LEarning for Multiple Object Tracking,论文

论文:https://arxiv.org/abs/2211.08824 代码:https://github.com/WWangYuHsiang/SMILEtrack 文章目录 AbstractIntroductionRelated WorkTracking-by-DetectionDetection methodData association method Tracking-by-Attention Methodology架构概述外观…...

【Kubernetes】Kubernetes之二进制部署

kubernetes 一、Kubernetes 的安装部署1. 常见的安装部署方式1.1 Minikube1.2 Kubeadm1.3 二进制安装部署2. K8S 部署 二进制与高可用的区别2.1 二进制部署2.2 kubeadm 部署二、Kubernetes 二进制部署过程1. 服务器相关设置以及架构2. 操作系统初始化配置3. 部署 etcd 集群4. 部…...

京东开源的、高效的企业级表格可视化搭建解决方案:DripTable

DripTable 是京东零售推出的一款用于企业级中后台的动态列表解决方案,项目基于 React 和 JSON Schema,旨在通过简单配置快速生成页面动态列表来降低列表开发难度、提高工作效率。 DripTable 目前包含以下子项目:drip-table、drip-table-gene…...

STL C++学习背景

STL C学习背景 背景知识 背景知识 STL前置知识 STL,英文全称 standard template library,中文可译为标准模板库或者泛型库,其包含有大量的模板类和模板函数,是 C 提供的一个基础模板的集合,用于完成诸如输入/输出、数…...

C#踩坑:谨慎在XML数据列上绑定鼠标事件!

按照计划,昨天晚上就完成最后的公式自动计算,程序的流程就算完整了,可以正常运行了,一般情况下,是可以完成的。 10点开始干,窗体上放置一个Treeview,然后针对XML对Treeview进行数据绑定&#xf…...

逻辑代数运算

逻辑代数运算中的三种基本运算 与(AND):只有满足全部条件,才会产生结果 或(OR) :只要满足一个条件,就会产生结果 非(NOT):只要满足条件&#xff…...

win10笔记本显示器根据页面显示亮度自动调节亮度的问题

系统是win10企业版,针对这个问题查了很多种方法,比如: 1、控制面板->硬件和声音->电源选项->点击当前电源计划的更改计划设置->更改高级电源设置->显示->启用自适应亮度 但是我发现我的电源计划只有平衡这一种&#xff0c…...

无人驾驶实战-第一课(自动驾驶概述)

在七月算法上报了《无人驾驶实战》课程,老师讲的真好。好记性不如烂笔头,记录一下学习内容。 课程入口,感兴趣的也可以跟着学一下。 ————————————————————————————————————————— 无人驾驶汽车的定义…...

15.节点操作

15.1 DOM节点 1.DOM节点 DOM树里每一个内容都称之为节点 2.节点类型 ●元素节点 所有的标签 比如body、div html是根节点 ●属性节点 所有的属性 比如href ●文本节点 所有的文本 15.2查找节点 1.父节点查找: parentNode属性 返回最近一级的父节点找不到返回为n…...

C语言自定义类型 — 结构体、位段、枚举、联合

前言 本期主要对通讯录三篇博客文章进行补充 通讯录文章:通讯录系列文章 对结构体进行详细介绍,其次讲解位段、枚举、联合体 文章目录 前言一、结构体1.什么是结构体2.结构声明2.1 声明格式2.2 如何声明(代码演示) 3.特殊声明3.1…...

新手指南:流程图中各种图形的含义及用法解析

我们经常在技术设计、沟通、业务演示等一些领域看到流程图,它也可以称为输入输出图。顾名思义,它是指一种简单的工作流程的具体步骤,比如包括一次会议的流程,以及一次生产制造的顺序和过程等。本文将为大家介绍流程图的含义和具体…...

【知识产权】专利的弊端

接上篇【知识产权】著作权的作用_qilei2010的博客-CSDN博客。 ​ 1 专利的分类 首先,专利分为:发明专利、实用新型专利、外观设计专利。这里要说明的是专利的不同种类在不同的国家都是有不同规定的,并不是所有国家和地区都是分成这三类。 >国家法律法规数据库 >中华…...

用Rust实现23种设计模式之抽象工厂

在 Rust 中,可以使用 trait 和泛型来实现抽象工厂模式。抽象工厂模式是一种创建型设计模式,它提供了一个接口来创建一系列相关或依赖对象的家族,而无需指定具体的类。下面是一个简单的示例,展示了如何使用 Rust 实现抽象工厂模式&…...

31.利用linprog 解决 投资问题(matlab程序)

1.简述 语法:[X,FVAL] linprog(f,a,b,Aeq,Beq,LB,UB,X0); X 为最终解 , FVAL为最终解对应的函数值 *注意:求最大值时,结果FVAL需要取反* f 为决策函数的系数矩阵。 *注意:当所求为最大值…...

整数线性规划求解工具isl使用方法

整数线性规划求解工具 Integer Set Library 代码 参考 isl是一个用于多面体模型调度实现的c/c库。通过isl,我们可以对模型进行自动的调度,循环优化等。 编译 ISL,Integer Set Library 版本0.22.1,http://isl.gforge.inria.fr/ README关于…...

SystemC的调度器

文章目录 前言调度器初始化evaluatewait updatenotify delta notificationtime notification仿真结束 前言 SystemC是基于C的库,主要用来对 IC 进行功能建模和性能建模。有时也被用来当做 RTL (register transfer level) 级的升级版 HLS(High Level synthesis) 直接…...

SpringBoot、SpringCloud 版本查看

1、SpringBoot 官网地址 https://spring.io/projects/spring-boot#learn spring-boot-starter-parent 版本列表可查看: https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-parent 2、SpringCloud 官网地址 https://spring.io/pro…...

AI Chat 设计模式:12. 享元模式

本文是该系列的第十二篇,采用问答式的方式展开,问题由我提出,答案由 Chat AI 作出,灰色背景的文字则主要是我的一些思考和补充。 问题列表 Q.1 给我介绍一下享元模式A.1Q.2 也就是说,其实共享的是对象的内部状态&…...

前端倒计时误差!

提示:记录工作中遇到的需求及解决办法 文章目录 前言一、误差从何而来?二、五大解决方案1. 动态校准法(基础版)2. Web Worker 计时3. 服务器时间同步4. Performance API 高精度计时5. 页面可见性API优化三、生产环境最佳实践四、终极解决方案架构前言 前几天听说公司某个项…...

Cesium1.95中高性能加载1500个点

一、基本方式&#xff1a; 图标使用.png比.svg性能要好 <template><div id"cesiumContainer"></div><div class"toolbar"><button id"resetButton">重新生成点</button><span id"countDisplay&qu…...

UDP(Echoserver)

网络命令 Ping 命令 检测网络是否连通 使用方法: ping -c 次数 网址ping -c 3 www.baidu.comnetstat 命令 netstat 是一个用来查看网络状态的重要工具. 语法&#xff1a;netstat [选项] 功能&#xff1a;查看网络状态 常用选项&#xff1a; n 拒绝显示别名&#…...

RNN避坑指南:从数学推导到LSTM/GRU工业级部署实战流程

本文较长&#xff0c;建议点赞收藏&#xff0c;以免遗失。更多AI大模型应用开发学习视频及资料&#xff0c;尽在聚客AI学院。 本文全面剖析RNN核心原理&#xff0c;深入讲解梯度消失/爆炸问题&#xff0c;并通过LSTM/GRU结构实现解决方案&#xff0c;提供时间序列预测和文本生成…...

初学 pytest 记录

安装 pip install pytest用例可以是函数也可以是类中的方法 def test_func():print()class TestAdd: # def __init__(self): 在 pytest 中不可以使用__init__方法 # self.cc 12345 pytest.mark.api def test_str(self):res add(1, 2)assert res 12def test_int(self):r…...

短视频矩阵系统文案创作功能开发实践,定制化开发

在短视频行业迅猛发展的当下&#xff0c;企业和个人创作者为了扩大影响力、提升传播效果&#xff0c;纷纷采用短视频矩阵运营策略&#xff0c;同时管理多个平台、多个账号的内容发布。然而&#xff0c;频繁的文案创作需求让运营者疲于应对&#xff0c;如何高效产出高质量文案成…...

Netty从入门到进阶(二)

二、Netty入门 1. 概述 1.1 Netty是什么 Netty is an asynchronous event-driven network application framework for rapid development of maintainable high performance protocol servers & clients. Netty是一个异步的、基于事件驱动的网络应用框架&#xff0c;用于…...

Python 高效图像帧提取与视频编码:实战指南

Python 高效图像帧提取与视频编码:实战指南 在音视频处理领域,图像帧提取与视频编码是基础但极具挑战性的任务。Python 结合强大的第三方库(如 OpenCV、FFmpeg、PyAV),可以高效处理视频流,实现快速帧提取、压缩编码等关键功能。本文将深入介绍如何优化这些流程,提高处理…...

消息队列系统设计与实践全解析

文章目录 &#x1f680; 消息队列系统设计与实践全解析&#x1f50d; 一、消息队列选型1.1 业务场景匹配矩阵1.2 吞吐量/延迟/可靠性权衡&#x1f4a1; 权衡决策框架 1.3 运维复杂度评估&#x1f527; 运维成本降低策略 &#x1f3d7;️ 二、典型架构设计2.1 分布式事务最终一致…...

rknn toolkit2搭建和推理

安装Miniconda Miniconda - Anaconda Miniconda 选择一个 新的 版本 &#xff0c;不用和RKNN的python版本保持一致 使用 ./xxx.sh进行安装 下面配置一下载源 # 清华大学源&#xff08;最常用&#xff09; conda config --add channels https://mirrors.tuna.tsinghua.edu.cn…...