当前位置: 首页 > article >正文

【RocketMQ 生产者和消费者】- 事务消息的使用

本文章基于 RocketMQ 4.9.31. 前言【RocketMQ】- 源码系列目录【RocketMQ 生产者消费者】- 同步、异步、单向发送消费消息【RocketMQ 生产者和消费者】- 消费者启动源码【RocketMQ 生产者和消费者】- 消费者重平衡1【RocketMQ 生产者和消费者】- 消费者重平衡2- 分配策略【RocketMQ 生产者和消费者】- 消费者重平衡3- 消费者 ID 对负载均衡的影响【RocketMQ 生产者和消费者】- 消费者的订阅关系一致性【RocketMQ 生产者和消费者】- 消费者发起消息拉取请求 PullMessageService【RocketMQ 生产者和消费者】- broker 是如何处理消费者消息拉取的 Netty 请求的【RocketMQ 生产者和消费者】- broker 处理消息拉取请求【RocketMQ 生产者和消费者】- 消费者处理消息拉取结果【RocketMQ 生产者和消费者】- 消费者处理消息拉取结果【RocketMQ 生产者和消费者】- ConsumeMessageConcurrentlyService 并发消费消息【RocketMQ 生产者和消费者】- ConsumeMessageOrderlyService 顺序消费消息【RocketMQ 生产者和消费者】- sendMessageBack 发送重试消息【RocketMQ 生产者和消费者】- 延时消息的使用【RocketMQ 生产者和消费者】- 延时消息原理解析-ScheduleMessageService上两篇文章我们分析了延时消息的基本使用和原理这篇文章就来看下事务消息的基本使用。2. 事务消息首先介绍下什么是事务消息也可以看官网的介绍事务消息发送。事务消息可以用在消息队列中确保消息传递和业务操作原子性、一致性尤其适用于分布式系统环境特别是消息上下游服务相互依赖的场景。比如下面的两个场景订单支付比如上游支付订单之后扣减本地库存更新订单状态接着通知下游服务比如物流系统做后续的处理这种情况下可以先发送一条事务消息然后执行本地事务逻辑扣减库存更新订单状态做完之后根据本地事务消息的结果决定这条消息能不能被下游的服务看到如果本地事务执行失败那么下游也不需要继续处理。金融系统中的转账业务转账同时扣减转出账户余额和增加转入账户余额。用事务消息在发送转账成功消息之前执行本地账户余额更新事务确保消息发送与账户余额变更操作的一致性。上面说的本地事务可以是操作数据库、操作缓存等等反正就是一个方法根据业务来处理下面就是大致的流程这个图也是直接用官方的图了。上面涉及到几个名词下面解释下半事务消息半事务消息是指暂不能投递的消息当生产者发送半事务消息到 Broker后Broker 收到消息会将其先存储起来但不会马上将该消息投递给消费者此时消费者对这条消息不可见。只有生产者对该消息进行二次确认即提交或回滚操作后Broker 才会根据确认结果进行相应处理。本地事务本地事务就是本地的逻辑当生产者执行完本地事务之后根据执行结果返回对应的状态码broker 会根据对应结果来处理这条半事务消息。本地事务有三种状态UNKNOW表示中间状态代表需要通过事务回查来确定最终的执行结果COMMIT_MESSAGE表示本地事务执行成功此消息可以被消费者消费ROLLBACK_MESSAGE表示事务回滚这条消息会被删掉不会被消费者消费。事务回查如果由于网络闪断、生产者应用重启等原因导致某条事务消息的二次确认丢失Broker 端会通过扫描发现某条消息长期处于“半事务消息”时需要主动向消息生产者询问该消息的最终状态Commit 或是 Rollback如果业务阻塞也可以通过这种方式给一个兜底的结果在 broker.conf 可以配置transactionCheckInterval回查的时间间隔根据自己实际业务决定。下面把整个流程串起来生产者将半事务消息发送至 broker。broker 将这条消息持久化这条消息属于半事务消息还不可被消费者消费。生产者通过executeLocalTransaction开始执行本地事务逻辑。本地事务执行完成后生产者向 broker 提交二次确认结果Commit 或是 Rollback服务端走不同的处理逻辑二次确认结果为COMMIT_MESSAGE服务端将半事务消息标记为可投递并投递给消费者。二次确认结果为ROLLBACK_MESSAGE服务端将回滚事务不会将半事务消息投递给消费者。二次确认结果为UNKNOW服务端将回滚事务不会将半事务消息投递给消费者。在断网或者是生产者应用重启的特殊情况下broker 迟迟收不到事务的二次提交结果又或者如果事务提交结果给的是UNKNOW那么服务端会对生产者发起事务回查。为了避免单个消息被检查太多次而导致半队列消息累积默认将单个消息的检查次数限制为 15 次但是用户可以通过 Broker 配置文件的transactionCheckMax参数来修改检查次数。如果回查次数到达上限了那么 broker 就丢掉这条消息然后在默认情况下同时打印错误日志。不过我们可以通过重写AbstractTransactionalMessageCheckListener、 类来修改回查上限之后要做什么。生产者收到消息回查后通过checkLocalTransaction检查对应消息的本地事务执行的最终结果。根据检查出来的结果进行二次提交这时候又回到了 4 的逻辑。3. 示例下面看下事务消息的使用首先在 broker 配置下消息回查的时间。transactionCheckInterval20000然后设置下生产者生产者一共发送 10 条消息下标从 0 开始计算然后每一条消息发送的 tag 不一样。publicclassTransactionProducer{publicstaticvoidmain(String[]args)throwsMQClientException,InterruptedException{TransactionListenertransactionListenernewTransactionListenerImpl();// 事务消息生产者TransactionMQProducerproducernewTransactionMQProducer(transactionMQProducer);// 执行本地回查的线程池ExecutorServiceexecutorServicenewThreadPoolExecutor(2,5,100,TimeUnit.SECONDS,newArrayBlockingQueueRunnable(2000),newThreadFactory(){OverridepublicThreadnewThread(Runnabler){ThreadthreadnewThread(r);thread.setName(client-transaction-msg-check-thread);returnthread;}});producer.setNamesrvAddr(localhost:9876);producer.setExecutorService(executorService);// 设置事务监听器producer.setTransactionListener(transactionListener);// 设置本地事务回查时间间隔producer.start();String[]tagsnewString[]{TagA,TagB,TagC,TagD,TagE};for(inti0;i10;i){try{// 消息发送MessagemsgnewMessage(TopicTest1234,tags[i%tags.length],KEYi,(Hello RocketMQ i).getBytes(RemotingHelper.DEFAULT_CHARSET));SendResultsendResultproducer.sendMessageInTransaction(msg,null);System.out.printf(now() %s%n,sendResult);Thread.sleep(10);}catch(MQClientException|UnsupportedEncodingExceptione){e.printStackTrace();}}for(inti0;i100000;i){Thread.sleep(1000);}producer.shutdown();}}接下来初始化事务监听器TransactionListenerImpl实现 TransactionListener 接口在里面定义executeLocalTransaction本地事务执行的逻辑以及checkLocalTransaction本地事务回查的逻辑。publicclassTransactionListenerImplimplementsTransactionListener{privateAtomicIntegertransactionIndexnewAtomicInteger(0);privateConcurrentHashMapString,IntegerlocalTransnewConcurrentHashMap();/** * 执行本地事务 * param msg Half(prepare) message * param arg Custom business parameter * return */OverridepublicLocalTransactionStateexecuteLocalTransaction(Messagemsg,Objectarg){// 执行本地事务, 根据不同下标在下面的 checkLocalTransaction 方法走不同的逻辑intvaluetransactionIndex.getAndIncrement();intstatusvalue%3;localTrans.put(msg.getTransactionId(),status);// 全部返回 UNKNOW, 意味者要通过事务回查 checkLocalTransaction 方法去检查本地事务的提交结果returnLocalTransactionState.UNKNOW;}/** * 本地事务回查 * param msg Check message * return */OverridepublicLocalTransactionStatecheckLocalTransaction(MessageExtmsg){System.out.println(now() checkLocalTransaction 执行本地事务回查, 当前消息事务 ID: msg.getTransactionId());IntegerstatuslocalTrans.get(msg.getTransactionId());if(null!status){switch(status){case0:returnLocalTransactionState.UNKNOW;case1:returnLocalTransactionState.COMMIT_MESSAGE;case2:returnLocalTransactionState.ROLLBACK_MESSAGE;default:returnLocalTransactionState.COMMIT_MESSAGE;}}returnLocalTransactionState.COMMIT_MESSAGE;}}executeLocalTransaction执行本地事务的时候会先通过 transactionIndex 1 算出这条消息是哪个下标然后在事务回查checkLocalTransaction的时候将所有消息分为三类来处理。0LocalTransactionState.UNKNOW1LocalTransactionState.COMMIT_MESSAGE2LocalTransactionState.ROLLBACK_MESSAGE大致流程就是执行本地事务的时候LocalTransactionState.UNKNOWbroker 会对每一条消息都进行回查然后在回查里面会根据不同的下标来返回不同的结果结合 executeLocalTransaction 方法可以大概看出消息0、3、6、9会不断回查1、4、7回查之后发现是COMMIT_MESSAGE消息提交消费者可以消费到这几条消息2、5、8回查之后是ROLLBACK_MESSAGE这几条消息会被丢掉后续回查也不会对这几条消息发起回查了。然后是消费者消费者比较简单就是订阅TopicTest1234下面的所有 tags 进行消费。publicclassTransactionConsumer{publicstaticvoidmain(String[]args)throwsInterruptedException,MQClientException{DefaultMQPushConsumerconsumernewDefaultMQPushConsumer(testGroupConsumer);consumer.setNamesrvAddr(localhost:9876);consumer.setMessageModel(MessageModel.CLUSTERING);consumer.subscribe(TopicTest1234,*);consumer.registerMessageListener(newMessageListenerConcurrently(){OverridepublicConsumeConcurrentlyStatusconsumeMessage(ListMessageExtmsgs,ConsumeConcurrentlyContextcontext){for(inti0;imsgs.size();i){MessageExtmsgmsgs.get(i);System.out.printf(%s, %s Receive New Messages: %s %n,now(),Thread.currentThread().getName(),newString(msg.getBody(),StandardCharsets.UTF_8));context.setAckIndex(i);}returnConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();System.out.printf(Consumer Started.%n);}publicstaticStringnow(){returnnewSimpleDateFormat(yyyy-MM-dd HH:mm:ss:SSS).format(newDate());}}输出结果如下首先是消费者可以看到确实只消费了 1、4、7 三条消息。然后是生产者生产者发送消息之后由于本地事务执行都是返回的 LocalTransactionState.UNKNOW所以经过一定时间后会消息回查这里为什么不是 20s后面源码会分析。经过事务回查之后发现2、5、8返回了ROLLBACK_MESSAGE于是后续不会再回查这几条消息而0、3、6、9由于还是返回UNKNOW于是就一直不断回查但是回查次数有上限所以也不会一直回查不过时间太长了就不贴出所有输出。4. 使用限制这里的限制也是代码给的文档里面标注出来的下面贴出来事务消息不支持延时消息和批量消息。提交给用户的目标主题消息可能会失败它的高可用性通过 RocketMQ 本身的高可用性机制来保证如果希望确保事务消息不丢失、并且事务完整性得到保证建议使用同步的双重写入机制。事务消息的生产者 ID 不能与其他类型消息的生产者 ID 共享。与其他类型的消息不同事务消息允许反向查询、MQ 服务器能通过它们的生产者 ID 查询到消费者。5. 小结好了到这里事务消息的使用和大致流程已经介绍完下一篇文章就要进入源码分析环节。如有错误欢迎指出

相关文章:

【RocketMQ 生产者和消费者】- 事务消息的使用

本文章基于 RocketMQ 4.9.3 1. 前言 【RocketMQ】- 源码系列目录【RocketMQ 生产者消费者】- 同步、异步、单向发送消费消息【RocketMQ 生产者和消费者】- 消费者启动源码【RocketMQ 生产者和消费者】- 消费者重平衡(1)【RocketMQ 生产者和消费者】- 消…...

CM311-1a机顶盒system分区只读?3种方法教你强制卸载并删除预装应用

CM311-1a机顶盒system分区只读?3种方法教你强制卸载并删除预装应用 手头这台CM311-1a机顶盒,开机后满屏的运营商应用和广告推送,用起来实在不够清爽。相信不少折腾过这类盒子的朋友都动过删除预装软件的念头,但当你兴致勃勃地连接…...

Linux运维进阶指南:从RHCSA到RHCA,如何规划你的红帽认证之路?

Linux运维进阶指南:从RHCSA到RHCA,如何规划你的红帽认证之路? 很多朋友在掌握了Linux基础操作后,会感到一丝迷茫。日常的服务器维护、脚本编写似乎已经得心应手,但职业的天花板也隐约可见。下一步该往哪里走&#xff…...

Deepin系统远程办公实战:用xrdp实现Windows无缝连接(含密码安全配置技巧)

Deepin系统远程办公实战:用xrdp实现Windows无缝连接(含密码安全配置技巧) 在混合办公与分布式团队日益普及的今天,拥有一套稳定、安全且高效的远程桌面解决方案,已成为许多职场人士和技术爱好者的刚需。如果你恰好是De…...

五、BGP路由优化与实战配置指南

1. 为什么你的BGP网络总是不稳?从理解路由优化开始 搞网络的朋友,尤其是负责中大型数据中心或者跨地域骨干网的,估计没少被BGP折腾过。我见过太多这样的场景:网络平时看着好好的,流量一上来就抖,或者某个链…...

MacOS高效配置FFmpeg与FFprobe的完整指南

1. 为什么你的FFmpeg安装总是失败?先避开这些坑 如果你在Mac上折腾过FFmpeg,大概率经历过这样的场景:跟着网上某个教程,一通操作猛如虎,最后在终端里输入 ffmpeg -version,结果给你来一句“command not fou…...

Superset动态参数图表开发手册:手把手教你处理多值IN查询和日期断层问题

Superset动态参数图表开发手册:手把手教你处理多值IN查询和日期断层问题 你是否曾为在Superset中实现一个看似简单的动态筛选图表而焦头烂额?当业务方提出“我们需要一个能同时筛选多个部门、并且日期轴要连续不间断的报表”时,你信心满满地打…...

利用 Cloudflare CDN 代理,打通 IPv4 访问 IPv6 服务的网络鸿沟

1. 从一次真实的访问困境说起 前几天,我的一位朋友,一位资深开发者,在群里发了个哭笑不得的表情。他在自己家里,用一台旧电脑折腾了个私人网盘(NAS),还搭了个博客,图的就是个自由和…...

浏览器提示“代理服务器可能有问题”?三步排查法帮你快速解决

1. 问题初现:当浏览器突然“罢工” 相信不少朋友都遇到过这种情况:正想打开浏览器查个资料、看个视频,结果页面没刷出来,反而弹出一个让人心头一紧的提示——“代理服务器可能有问题”。那一瞬间,感觉就像开车时突然亮…...

基于龙芯2K0300久久派的OpenCV交叉编译实战:从虚拟机Ubuntu环境搭建到嵌入式视觉应用部署

1. 环境准备:虚拟机与Ubuntu的“新家”搭建 如果你正准备为龙芯2K0300久久派折腾OpenCV,那第一步绝对不是急着敲命令。我见过太多新手朋友,一上来就照着教程安装工具链,结果卡在奇奇怪怪的环境问题上,白白浪费一两天时…...

解锁Minio原生分片上传:从源码解析到实战封装

1. 为什么你需要Minio的原生分片上传? 如果你正在处理大文件上传,比如用户上传的视频、设计稿源文件,或者系统间的数据备份包,那你肯定遇到过这些问题:上传到一半网络断了,得全部重来;或者一个几…...

用VirtualBox快速搭建麒麟信安3.3-6C测试环境:附网络隔离方案与权限管理技巧

用VirtualBox快速搭建麒麟信安3.3-6C测试环境:附网络隔离方案与权限管理技巧 最近在折腾几个安全相关的测试项目,需要一个既能模拟内网环境、又能方便访问外部资源进行软件包更新的沙箱。物理机来回折腾太麻烦,云主机又不够“隔离”&#xff…...

主流人群计数数据集深度解析:从ShanghaiTech到JHU_CROWD++

1. 人群计数数据集:为什么选对数据集,你的模型就成功了一半? 刚入行人脸检测或者人群计数的时候,我踩过最大的一个坑,就是没把数据集研究明白。当时拿到一个开源模型,兴冲冲地用自己的几张图跑了一下&#…...

Mac用户福音:无需Root实现Android屏幕共享与远程控制的完整指南(附常见问题解决)

Mac用户福音:无需Root实现Android屏幕共享与远程控制的完整指南(附常见问题解决) 作为一名长期在Mac生态下工作的开发者或效率追求者,你是否曾为无法在Mac电脑上流畅地查看和控制Android手机屏幕而烦恼?无论是为了演示…...

ReDoc 实战:打造企业级 API 文档的进阶技巧与最佳实践

1. 为什么企业级项目需要 ReDoc?不止是“好看”那么简单 很多朋友第一次接触 ReDoc,可能和我当初一样,觉得它就是个“美化版”的 Swagger UI。确实,它三栏式的布局、清晰的排版,一眼看上去就比 Swagger UI 专业不少。但…...

open3d 结合VSCode与SSH实现远程服务器3D可视化界面本地渲染

1. 为什么我们需要远程3D可视化? 搞3D点云、三维重建或者计算机视觉的朋友,肯定都遇到过这个场景:代码和模型都跑在实验室或者公司的远程服务器上,那机器性能强劲,GPU给力,但就是没有显示器。你想看一眼自己…...

你的服务还在用HTTP轮询?一文搞懂Kafka——从零到百万级吞吐的C++实战

一、你的轮询,正在杀死你的服务器 想象一个场景:你写了一个C++后端服务,前端每隔500毫秒发一次HTTP请求来问"有没有新消息?“。大部分时候服务端回答"没有”,偶尔回一条。系统跑了半年没出过问题。 然后用户量翻了10倍。 你开始发现CPU占用莫名其妙地飙到70%…...

从传统到深度学习:图像分割算法的演进与应用场景解析

1. 图像分割:从“看”到“理解”的关键一步 想象一下,你给电脑看一张照片,它不仅能认出照片里有一只猫,还能精确地告诉你猫的轮廓在哪里,猫的眼睛、鼻子、耳朵分别属于图像的哪些像素。这个过程,就是图像分…...

全方位抓包实战指南:从浏览器到小程序的完整解决方案

1. 为什么你需要掌握全平台抓包? 作为一名和网络请求打了十几年交道的“老司机”,我见过太多开发者朋友在调试问题时,面对浏览器、手机APP、微信小程序或者一个独立的PC桌面应用,不知道如何下手去查看它们背后到底在和服务器“聊”…...

PyBullet实战:从零开始构建你的第一个机器人仿真环境

1. 环境准备:安装与初识PyBullet 想玩机器人仿真,但又觉得那些软件门槛太高?别担心,PyBullet就是为你准备的。我第一次接触它的时候,感觉就像发现了一个宝藏。它本质上是一个Python模块,把强大的Bullet物理…...

ASPP模块的深度解析:从多尺度感知到语义分割的实践应用

1. 为什么你的语义分割模型总“看不清”?聊聊多尺度感知的痛点 做语义分割的朋友,估计都遇到过这样的尴尬:模型对远处的小车识别得挺好,但画面里那棵近在眼前的大树,却死活分不清是树还是电线杆;又或者&…...

如何快速检测和修复BSPHP未授权访问漏洞?安全工程师的实用指南

从实战出发:BSPHP未授权访问漏洞的深度检测与根治方案 最近在帮一家电商平台做安全审计时,他们的技术负责人一脸愁容地找到我,说内部监控发现有几个奇怪的IP在频繁访问管理后台的日志接口,但查了登录记录却没有任何异常。我们花了…...

【SMB协议】Win10访问Linux共享文件夹:从“不安全的来宾登录”到用户映射的实战排障

1. 从“能ping通”到“打不开”:一个混合办公环境的真实困境 最近在帮一个朋友的公司搭建内部文件共享系统,他们有几台Windows 10的办公电脑,需要稳定地访问一台运行Ubuntu的服务器上的共享文件夹。听起来是个很常规的需求对吧?我…...

从MicroPython到C/C++:树莓派Pico双语言开发实战对比

从MicroPython到C/C:树莓派Pico双语言开发实战对比 如果你手头有一块树莓派Pico,面对MicroPython和C/C两种开发方式,是不是有点选择困难?我刚开始接触Pico的时候也纠结过,毕竟两种语言各有各的吸引力。MicroPython上手…...

为什么你的 SQL 测试快生产卡?金仓连接条件下推来解答

你是否遇到过这样的场景:一个看似复杂的SQL,在测试环境运行飞快,一到生产环境就“卡死”,一查执行计划,发现子查询生成了一个巨大的中间结果集,导致后续操作全部陷入性能泥潭? 如果你正被此类场…...

sd工具终极发展蓝图:从简单替换到智能编辑的完整进化指南

sd工具终极发展蓝图:从简单替换到智能编辑的完整进化指南 【免费下载链接】sd Intuitive find & replace CLI (sed alternative) 项目地址: https://gitcode.com/gh_mirrors/sd/sd 在现代开发工作流中,高效的文本处理工具是提升 productivity…...

终极指南:7个最适合用sd处理的真实案例解析

终极指南:7个最适合用sd处理的真实案例解析 【免费下载链接】sd Intuitive find & replace CLI (sed alternative) 项目地址: https://gitcode.com/gh_mirrors/sd/sd sd是一款直观的查找替换命令行工具,专为简化文本处理任务而设计。它采用Ja…...

AppManager Root功能终极指南:解锁Android系统的全部潜力

AppManager Root功能终极指南:解锁Android系统的全部潜力 【免费下载链接】AppManager A full-featured package manager and viewer for Android 项目地址: https://gitcode.com/gh_mirrors/ap/AppManager AppManager是一款功能全面的Android软件包管理器和…...

sd安装终极指南:5种快速安装方法让你告别sed复杂语法

sd安装终极指南:5种快速安装方法让你告别sed复杂语法 【免费下载链接】sd Intuitive find & replace CLI (sed alternative) 项目地址: https://gitcode.com/gh_mirrors/sd/sd sd是一款直观的命令行查找替换工具,作为sed的替代品,…...

Agones性能优化终极指南:10个技巧提升游戏服务器响应速度和吞吐量

Agones性能优化终极指南:10个技巧提升游戏服务器响应速度和吞吐量 【免费下载链接】agones Dedicated Game Server Hosting and Scaling for Multiplayer Games on Kubernetes 项目地址: https://gitcode.com/gh_mirrors/ag/agones Agones是专为Kubernetes设…...