当前位置: 首页 > news >正文

六、RocketMQ发送事务消息

事务消息介绍

在一些对数据一致性有强需求的场景,可以用 Apache RocketMQ 事务消息来解决,从而保证上下游数据的一致性。

以电商交易场景为例,用户支付订单这一核心操作的同时会涉及到下游物流发货、积分变更、购物车状态清空等多个子系统的变更。当前业务的处理分支包括:

  • 主分支订单系统状态更新:由未支付变更为支付成功。
  • 物流系统状态新增:新增待发货物流记录,创建订单物流记录。
  • 积分系统状态变更:变更用户积分,更新用户积分表。
  • 购物车系统状态变更:清空购物车,更新用户购物车记录。

当主分支订单系统状态更新失败后,物流、积分、购物车系统都不应该接收到消息

事务消息的发送流程

使用普通消息是做不到的,因为他会直接将消息发送到topic中

而事务消息参考了两阶段提交的原理,

  1. 先把消息发送broker中
  2. 当消息发送成功后,会执行本地事务
  3. 通过本地事务的执行情况,返回一个状态
  4. 状态对应三种情况
    • LocalTransactionState.UNKNOW:需要broker调用发送端的回查机制
    • LocalTransactionState.COMMIT_MESSAGE:broker将消息发送到指定的topic,此时消费端可以接收到消息
    • LocalTransactionState.ROLLBACK_MESSAGE:broker丢弃消息,不发送到指定的topic,消费端接收不到消息

整个事务消息的详细交互流程如下图所示:
在这里插入图片描述

@Test
public void sendTrans() throws MQBrokerException, RemotingException, InterruptedException, MQClientException {// 创建事务消息发送客户端TransactionMQProducer transProducer = new TransactionMQProducer("test-trans-producer");transProducer.setNamesrvAddr(RocketMQConfig.NAME_SERVER_ADDR);// 指定回查事务消息时的线程池ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<>(2000), new ThreadFactory() {@Overridepublic Thread newThread(Runnable r) {Thread thread = new Thread(r);thread.setName("client-transaction-msg-check-thread");return thread;}});transProducer.setExecutorService(executorService);// 设置事务监听器transProducer.setTransactionListener(new TransactionListener() {// 执行本地事务@Overridepublic LocalTransactionState executeLocalTransaction(Message message, Object o) {System.out.println(Thread.currentThread().getName() + ":执行本地事务");// 触发回查机制return LocalTransactionState.UNKNOW;}// 回查本地事务,如果执行本地事务返回UNKNOW状态或者生产者应用退出导致本地事务未提交任何状态@Overridepublic LocalTransactionState checkLocalTransaction(MessageExt messageExt) {System.out.println(Thread.currentThread().getName() + ":触发事务回查");// 提交事务return LocalTransactionState.COMMIT_MESSAGE;}});transProducer.start();Message message = new Message(RocketMQConfig.TEST_TOPIC, "hello world".getBytes());// 发送事务消息SendResult send = transProducer.sendMessageInTransaction(message,null);System.out.println(send.getSendStatus());Thread.sleep(Integer.MAX_VALUE);
}

注:需要注意的是事务消息的生产组名称 ProducerGroupName不能随意设置。事务消息有回查机制,回查时Broker端如果发现原始生产者已经崩溃,则会联系同一生产者组的其他生产者实例回查本地事务执行情况以Commit或Rollback半事务消息。

相关文章:

六、RocketMQ发送事务消息

事务消息介绍 在一些对数据一致性有强需求的场景&#xff0c;可以用 Apache RocketMQ 事务消息来解决&#xff0c;从而保证上下游数据的一致性。 以电商交易场景为例&#xff0c;用户支付订单这一核心操作的同时会涉及到下游物流发货、积分变更、购物车状态清空等多个子系统的…...

Node.js初体验

Node.js简介 node.js的运行环境 1.V8引擎对js代码进行解析与执行 2.内置API&#xff1a;fs、path、http...等&#xff0c;提供了一些能力&#xff0c;能够使得js调用这些API去做一些后端的事情 流程&#xff1a;我们在node.js的运行环境中编写待执行的JavaScript代码&#…...

激活函数理解

激活函数&#xff08;Activation Function&#xff09;是神经网络中的一种数学函数&#xff0c;它的作用是为神经元&#xff08;或人工神经元&#xff09;引入非线性特性&#xff0c;从而使神经网络能够学习和表示更复杂的函数。激活函数通常位于神经元的输出端&#xff0c;接收…...

【docker - 安装】windows 10 专业版 安装docker,以及 WSL kernel version too low 解决方案

一、开启 Hyper-V 二、下载 docker 三、安装 docker 四、问题 Stage 1&#xff1a;打开 powershell&#xff0c;并执行 Stage 2&#xff1a;下载Linux内核更新包&#xff0c;并安装 Stage 3&#xff1a;将 WSL 2 设置为默认版本 Stage 4&#xff1a;安装所选的 Linux 分…...

洛谷P1601

题目见&#xff1a;P1601 AB Problem&#xff08;高精&#xff09; - 洛谷 | 计算机科学教育新生态 (luogu.com.cn) 1. 问题分析 加法计算问题应该和在界面输出“Hello,world!”是一个难度级别&#xff0c;但是问题在于受限于原始数据类型的限制&#xff0c;无法进行大数据的精…...

Elasticsearch:使用 LangChain 对话链和 OpenAI 的聊天机器人

在此笔记本中&#xff0c;我们将构建一个聊天机器人&#xff0c;它可以回答有关自定义数据的问题&#xff0c;例如雇主的政策。 聊天机器人使用 LangChain 的 ConversationalRetrievalChain&#xff0c;具有以下功能&#xff1a; 用自然语言回答问题在 Elasticsearch 中运行混…...

铜死亡+机器学习+WGCNA+分型生信思路

今天给同学们分享一篇单基因泛癌免疫实验生信文章“IGF2BP3 overexpression predicts poor prognosis and correlates with immune infiltration in bladder cancer”&#xff0c;这篇文章于2023年2月3日发表在BMC Cancer期刊上&#xff0c;影响因子为3.8。 膀胱癌是全球最常见…...

GB28181平台简介

产品简介 LiveMedia视频中间件是支持部署到本地服务器或者云服务器的纯软件服务&#xff0c;也提供服务器、GPU一体机全包服务&#xff0c;提供视频设备管理、无插件、跨平台的实时视频、历史回放、语音对讲、设备控制等基础功能&#xff0c;支持视频协议有海康、大华私有协议…...

JVM基础:初识JVM

IDE&#xff1a;IntelliJ IDEA 2022.1.3 x64 操作系统&#xff1a;win10 x64 位 家庭版 文章目录 一、JVM是什么&#xff1f;二、JVM有哪些功能&#xff1f;2.1 解释和运行2.2 内存管理2.3 即时编译 三、有哪些常见的JVM&#xff1f;3.1 常见JVM3.2 Java虚拟机规范3.3 HotSpot的…...

至强服务器BIOS/UEFI驱动开发笔记

至强服务器BIOS/UEFI驱动开发笔记 驱动开发基础Hello UEFI Driver 项目选择项目位置初始化驱动代码文件结构驱动程序入口和基本功能导入AMI工程AMI平台Hello UEFI Driver 编译问题测试结果打印设备列表继续开发`HelloWorldSupported`函数依赖配置使用脚本编译编译测试此DXE驱动…...

Linux:Termius连接本地虚拟机与虚拟机快照

Termius连接本地虚拟机与虚拟机快照 1. Termius连接本地虚拟机2. 虚拟机快照与还原2.1 设置快照以及恢复 附录 1. Termius连接本地虚拟机 ifconfig -a 查看配置 连接成功 2. 虚拟机快照与还原 在学习阶段我们无法避免的可能损坏Linux操作系统。 如果损坏的话&#xff0c;重新…...

高校教务系统登录页面JS分析——四川大学

高校教务系统密码加密逻辑及JS逆向 本文将介绍高校教务系统的密码加密逻辑以及使用JavaScript进行逆向分析的过程。通过本文&#xff0c;你将了解到密码加密的基本概念、常用加密算法以及如何通过逆向分析来破解密码。 本文仅供交流学习&#xff0c;勿用于非法用途。 一、密码加…...

Kafka SASL认证授权(四)认证源码解析

Kafka SASL认证授权(四)认证源码解析。 官网地址:https://kafka.apache.org/ 一、认证流程 在了解kafka网络模型的基础上,了解它的认证流程: ApiVersionsRequest->SaslHandshakeRequest->a series of SASL client and server tokens corresponding to the mechani…...

软件测试学习(一)基础概念、实质、说明书测试、分类、动态黑盒测试

目录 软件测试概念、背景 软件测试员究竟做些什么 大多数软件测试员应该具备的素质 软件测试的实质 完全测试程序是不可能的 测试无法显示潜伏的软件缺陷 并非所有软件缺陷都要修复 软件测试员在产品小组中不受欢迎 术语&#xff1a;精准和准确 产品说明书的测试技术…...

在fastapi中实现异步

在FastAPI应用中使用异步特性可以提高并发性能&#xff0c;但如果您要调用的模型是同步的&#xff0c;可能会导致阻塞。为了实现异步处理&#xff0c;您可以将阻塞的操作委托给线程池或进程池&#xff0c;以便异步执行。 以下是一种基本方法来实现异步处理图片识别任务&#x…...

js数组去重

在JavaScript中&#xff0c;有很多方法可以用来去除数组中的重复项。以下是一些常见的方法&#xff1a; 方法一&#xff1a;使用Set Set是ES6中的新数据类型&#xff0c;它只存储唯一值。因此&#xff0c;我们可以利用这一特性来去重。 let array [1, 2, 3, 2, 1, 4, 3, 5,…...

【前端】根据后端返回的url进行下载并设置文件下载名称

在我们项目当中存储文件是存储到厂商的服务器上的&#xff0c;然后厂商返回一个可以直接下载url地址&#xff0c;但是前端使用这个url下载的时候永远都是保存一个名字&#xff0c;这时候我们就需要设置文件保存的名称&#xff0c; 那么如何实现呢&#xff1f;使用了fet…...

《视觉SLAM十四讲》公式推导(一)

文章目录 CH3 三维空间刚体运动CH3-1 旋转矩阵的推导CH3-2 旋转矩阵是正交矩阵的证明CH3-3 变换矩阵的逆的推导CH3-4 罗德里格斯公式推导 CH3 三维空间刚体运动 CH3-1 旋转矩阵的推导 &#xff08;1&#xff09;二维空间中的旋转矩阵 易得 { x ′ ∣ O P ′ ∣ c o s ( θ …...

简单好用的解压缩软件:keka 中文 for mac

Keka是一款功能全面、易于使用的文件压缩和解压缩软件&#xff0c;为Mac用户提供了便捷的文件管理工具。它支持多种压缩格式&#xff0c;具有快速解压和强大的压缩功能&#xff0c;让您能够轻松地处理各种文件压缩需求。 隐私非常重要 安全共享只需设置密码并创建高度加密的文…...

【UE 插件】UE4 虚幻引擎 插件开发(带源码插件打包、无源码插件打包) 有这一篇文章就够了!!!

目录 0 引言1 快速入门1.1 新建插件的前提1.2 创建插件步骤1.3 打包插件 2 无源代码的插件制作3 插件详细介绍3.1 插件的使用方法3.1 UE 预置插件模版3.1.1 空白3.1.2 纯内容3.1.3 编辑器独立窗口3.1.4 编辑器工具栏按钮3.1.5 编辑器模式3.1.6 第三方库3.1.7 蓝图库 3.2 插件中…...

[特殊字符] 智能合约中的数据是如何在区块链中保持一致的?

&#x1f9e0; 智能合约中的数据是如何在区块链中保持一致的&#xff1f; 为什么所有区块链节点都能得出相同结果&#xff1f;合约调用这么复杂&#xff0c;状态真能保持一致吗&#xff1f;本篇带你从底层视角理解“状态一致性”的真相。 一、智能合约的数据存储在哪里&#xf…...

【网络】每天掌握一个Linux命令 - iftop

在Linux系统中&#xff0c;iftop是网络管理的得力助手&#xff0c;能实时监控网络流量、连接情况等&#xff0c;帮助排查网络异常。接下来从多方面详细介绍它。 目录 【网络】每天掌握一个Linux命令 - iftop工具概述安装方式核心功能基础用法进阶操作实战案例面试题场景生产场景…...

Java 语言特性(面试系列2)

一、SQL 基础 1. 复杂查询 &#xff08;1&#xff09;连接查询&#xff08;JOIN&#xff09; 内连接&#xff08;INNER JOIN&#xff09;&#xff1a;返回两表匹配的记录。 SELECT e.name, d.dept_name FROM employees e INNER JOIN departments d ON e.dept_id d.dept_id; 左…...

阿里云ACP云计算备考笔记 (5)——弹性伸缩

目录 第一章 概述 第二章 弹性伸缩简介 1、弹性伸缩 2、垂直伸缩 3、优势 4、应用场景 ① 无规律的业务量波动 ② 有规律的业务量波动 ③ 无明显业务量波动 ④ 混合型业务 ⑤ 消息通知 ⑥ 生命周期挂钩 ⑦ 自定义方式 ⑧ 滚的升级 5、使用限制 第三章 主要定义 …...

多场景 OkHttpClient 管理器 - Android 网络通信解决方案

下面是一个完整的 Android 实现&#xff0c;展示如何创建和管理多个 OkHttpClient 实例&#xff0c;分别用于长连接、普通 HTTP 请求和文件下载场景。 <?xml version"1.0" encoding"utf-8"?> <LinearLayout xmlns:android"http://schemas…...

Spring AI 入门:Java 开发者的生成式 AI 实践之路

一、Spring AI 简介 在人工智能技术快速迭代的今天&#xff0c;Spring AI 作为 Spring 生态系统的新生力量&#xff0c;正在成为 Java 开发者拥抱生成式 AI 的最佳选择。该框架通过模块化设计实现了与主流 AI 服务&#xff08;如 OpenAI、Anthropic&#xff09;的无缝对接&…...

IT供电系统绝缘监测及故障定位解决方案

随着新能源的快速发展&#xff0c;光伏电站、储能系统及充电设备已广泛应用于现代能源网络。在光伏领域&#xff0c;IT供电系统凭借其持续供电性好、安全性高等优势成为光伏首选&#xff0c;但在长期运行中&#xff0c;例如老化、潮湿、隐裂、机械损伤等问题会影响光伏板绝缘层…...

vue3+vite项目中使用.env文件环境变量方法

vue3vite项目中使用.env文件环境变量方法 .env文件作用命名规则常用的配置项示例使用方法注意事项在vite.config.js文件中读取环境变量方法 .env文件作用 .env 文件用于定义环境变量&#xff0c;这些变量可以在项目中通过 import.meta.env 进行访问。Vite 会自动加载这些环境变…...

OPenCV CUDA模块图像处理-----对图像执行 均值漂移滤波(Mean Shift Filtering)函数meanShiftFiltering()

操作系统&#xff1a;ubuntu22.04 OpenCV版本&#xff1a;OpenCV4.9 IDE:Visual Studio Code 编程语言&#xff1a;C11 算法描述 在 GPU 上对图像执行 均值漂移滤波&#xff08;Mean Shift Filtering&#xff09;&#xff0c;用于图像分割或平滑处理。 该函数将输入图像中的…...

代码随想录刷题day30

1、零钱兑换II 给你一个整数数组 coins 表示不同面额的硬币&#xff0c;另给一个整数 amount 表示总金额。 请你计算并返回可以凑成总金额的硬币组合数。如果任何硬币组合都无法凑出总金额&#xff0c;返回 0 。 假设每一种面额的硬币有无限个。 题目数据保证结果符合 32 位带…...