RocketMQ(3)之事务消息
一、发送事务消息案例
事务消息共有三种状态,提交状态、回滚状态、中间状态:
- TransactionStatus.CommitTransaction: 提交事务,它允许消费者消费此消息。
- TransactionStatus.RollbackTransaction: 回滚事务,它代表该消息将被删除,不允许被消费。
- TransactionStatus.Unknown: 中间状态,它代表需要检查消息队列来确定状态。
1.1创建事务性生产者
使用 TransactionMQProducer类创建生产者,并指定唯一的 ProducerGroup,就可以设置自定义线程池来处理这些检查请求。执行本地事务后、需要根据执行结果对消息队列进行回复。回传的事务状态在上面说的。
/*** 发送事务消息* @throws Exception*/@Testpublic void testTransactionProduce() throws Exception {TransactionListener transactionListener = new TransactionListenerImpl();TransactionMQProducer producer = new TransactionMQProducer("please_rename_unique_group_name");ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {@Overridepublic Thread newThread(Runnable r) {Thread thread = new Thread(r);thread.setName("client-transaction-msg-check-thread");return thread;}});producer.setExecutorService(executorService);producer.setTransactionListener(transactionListener);producer.start();String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};for (int i = 0; i < 10; i++) {try {Message msg =new Message("TopicTest1234", tags[i % tags.length], "KEY" + i,("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));SendResult sendResult = producer.sendMessageInTransaction(msg, null);System.out.printf("%s%n", sendResult);Thread.sleep(10);} catch (MQClientException | UnsupportedEncodingException e) {e.printStackTrace();}}for (int i = 0; i < 100000; i++) {Thread.sleep(1000);}producer.shutdown();}
2.事务监听接口
当发送半消息成功时,我们使用 executeLocalTransaction 方法来执行本地事务。它返回前一节中提到的三个事务状态之一。checkLocalTransaction 方法用于检查本地事务状态,并回应消息队列的检查请求。它也是返回前一节中提到的三个事务状态之一。
class TransactionListenerImpl implements TransactionListener {private AtomicInteger transactionIndex = new AtomicInteger(0);private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>();/*** 本地事务*/@Overridepublic LocalTransactionState executeLocalTransaction(Message msg, Object arg) {int value = transactionIndex.getAndIncrement();int status = value % 3;localTrans.put(msg.getTransactionId(), status);return LocalTransactionState.UNKNOW;}/*** 状态回查*/@Overridepublic LocalTransactionState checkLocalTransaction(MessageExt msg) {Integer status = localTrans.get(msg.getTransactionId());if (null != status) {switch (status) {case 0:return LocalTransactionState.UNKNOW;case 1:return LocalTransactionState.COMMIT_MESSAGE;case 2:return LocalTransactionState.ROLLBACK_MESSAGE;}}return LocalTransactionState.COMMIT_MESSAGE;}
}
1.3事务消息使用上的限制
- 事务消息不支持延时消息和批量消息。
- 为了避免单个消息被检查太多次而导致半队列消息累积,我们默认将单个消息的检查次数限制为 15 次,但是用户可以通过 Broker 配置文件的
transactionCheckMax参数来修改此限制。如果已经检查某条消息超过 N 次的话( N =transactionCheckMax) 则 Broker 将丢弃此消息,并在默认情况下同时打印错误日志。用户可以通过重写AbstractTransactionalMessageCheckListener类来修改这个行为。 - 事务消息将在 Broker 配置文件中的参数 transactionTimeout 这样的特定时间长度之后被检查。当发送事务消息时,用户还可以通过设置用户属性 CHECK_IMMUNITY_TIME_IN_SECONDS 来改变这个限制,该参数优先于
transactionTimeout参数。 - 事务性消息可能不止一次被检查或消费
- 提交给用户的目标主题消息可能会失败,目前这依日志的记录而定。它的高可用性通过 RocketMQ 本身的高可用性机制来保证,如果希望确保事务消息不丢失、并且事务完整性得到保证,建议使用同步的双重写入机制。
- 事务消息的生产者 ID 不能与其他类型消息的生产者 ID 共享。与其他类型的消息不同,事务消息允许反向查询、MQ服务器能通过它们的生产者 ID 查询到消费者。
相关文章:
RocketMQ(3)之事务消息
一、发送事务消息案例 事务消息共有三种状态,提交状态、回滚状态、中间状态: TransactionStatus.CommitTransaction: 提交事务,它允许消费者消费此消息。TransactionStatus.RollbackTransaction: 回滚事务,它代表该消息将被删除…...
基于多设计模式下的同步异步日志系统
基于多设计模式下的同步&异步日志系统 代码链接:https://github.com/Janonez/Log_System 1. 项目介绍 本项目主要实现一个日志系统, 其主要支持以下功能: 支持多级别日志消息支持同步日志和异步日志支持可靠写入日志到标准输出、文件…...
API接口与电商平台之间的联系,采集京东平台数据按关键字搜索商品接口示例
关键字搜索商品的重要性: 1.引入精准流量 关键词第一个也是最重要的作用就是为我们宝贝引进精准的流量,这一作用无论是在自然搜索中还是直通车中都是一样的。 第一步关乎的是我们宝贝的展现,而第二步用户是否会点进我们的宝贝,…...
代码随想录day41|343. 整数拆分96. 不同的二叉搜索树
343. 整数拆分 class Solution:def integerBreak(self, n: int) -> int:dp [0] *(n1)dp[2]1if n <3:return dp[n]for i in range(3,n1):for j in range(1,n):dp[i]max(j*(i-j),j*dp[i-j],dp[i])return dp[n] 96. 不同的二叉搜索树 class Solution:def numTrees(self, …...
Less常用内置函数
1,类型函数 isnumber(value) - 判断是否为数字isstring(value) - 判断是否为字符串isurl(value) - 判断是否为urliscolor(value) - 判断是否为颜色isunit(value, unit) - 判断value值是否为指定单位 示例: isnumber(12); // true isnumber(#333); // f…...
pdf转换成图片转换器在线怎么转?pdf转换成图片具体方法介绍
很多用户们都是比较喜欢使用pdf文档的,由于这种文件格式的便携性非常高,所以广泛的应用于工作和学习领域,再加上pdf文档可以随意转换成为其他的文件格式,更是让pdf文档受到了更多用户们的欢迎,那么pdf转换成图片转换器…...
JavaScript动态设置浏览器可视区域元素的文字颜色、监听滚动条、querySelectorAll、getBoundingClientRect
文章目录 前言htmlJavaScriptquerySelectorAllgetBoundingClientRect 前言 当元素出现在浏览器可视区域时给元素设置颜色等其他操作,比如当元素进入浏览器可视区域时,设置元素进入动画。 html <div id"idBox" class"box"><…...
意向客户的信息获取到底是怎样的,快来get一下
客户信息获取技术真的可以为企业提供精准客源吗?这个渠道到底安不安全,技术到底成不成熟?效果到底如何?下面简单的和大家分析一下。 客户信息获取技术是怎样的 手机采集引流方面,上量不精准,精准不上量的说…...
自动化测试常用脚本语言有哪些?
在自动化测试中,常用的脚本语言包括: 1. Python:Python是一个简洁、易读且功能强大的脚本语言,广泛应用于自动化测试领域。它具有丰富的测试框架和库,可以用于Web、移动应用和API等各种类型的测试。 2. Java࿱…...
mapreduce 的工作原理以及 hdfs 上传文件的流程
推荐两篇博文 mapreduce 的工作原理: 图文详解 MapReduce 工作流程_mapreduce工作流程_Shockang的博客-CSDN博客 hdfs 上传文件的流程 HDFS原理 - 知乎...
Ubuntu22.04安装ROS2
Ubuntu22.04安装ROS2 Excerpt ROS2官方文档 ROS2清华镜像站sudo apt update sudo apt upgrade locale # check for UTF-8 sudo apt update && sudo apt install locales sudo locale-gen en_US en_US.UTF-8 sudo update-locale LC_ALLe… ROS2官方文档 ROS2清华镜像站…...
uniapp - 倒计时组件-优化循环时间倒计时
使用定时器的规避方法 为了避免定时器误差导致倒计时计算错误,可以采用一些规避方法,比如将倒计时被中断时的剩余时间记录下来,重新开启定时器时再将这个剩余时间加到新的计算中。同时,为了避免定时器延迟,可以在每次执…...
java 实现访问者模式
访问者模式是一种行为设计模式,它允许您在不修改对象结构的情况下,向对象结构中的元素添加新的操作。这通常用于解决对象结构中元素类型多变,但操作类型相对稳定的问题。在访问者模式中,我们有一个访问者接口和多个具体的元素类&a…...
JDK源码剖析之PriorityQueue优先级队列
写在前面 版本信息: JDK1.8 PriorityQueue介绍 在数据结构中,队列分为FIFO、LIFO 两种模型,分别为先进先出,后进后出、先进后出,后进先出(栈) 而一切数据结构都是基于数组或者是链表实现。 在…...
TSINGSEE青犀AI视频分析/边缘计算/AI算法·人脸识别功能——多场景高效运用
旭帆科技AI智能分析网关可提供海量算法供应,涵盖目标监测、分析、抓拍、动作分析、AI识别等,可应用于各行各业的视觉场景中。同时针对小众化场景可快速定制AI算法,主动适配大厂近百款芯片,打通云/边/端灵活部署,算法一…...
力扣(LeetCode)算法_C++——最大连续 1 的个数 III
给定一个二进制数组 nums 和一个整数 k,如果可以翻转最多 k 个 0 ,则返回 数组中连续 1 的最大个数 。 示例 1: 输入:nums [1,1,1,0,0,0,1,1,1,1,0], K 2 输出:6 解释:[1,1,1,0,0,1,1,1,1,1,1] 粗体数字…...
23062C++QT day2
封装一个结构体,结构体中包含一个私有数组,用来存放学生的成绩,包含一个私有变量,用来记录学生个数, 提供一个公有成员函数,void setNum(int num)用于设置学生个数 提供一个公有成员函数:void…...
React三属性之:props
作用 将父组件的参数传递给子组件 父组件 import ./App.css; import React from react; import PropsTest from ./pages/propsTest class App extends React.Component{render(){return(<div><h2>App组件</h2><PropsTest obj{{name:王惊涛,age:27}}>…...
大数据安全 | (一)介绍
目录 📚大数据安全 🐇大数据安全内涵 🐇大数据安全威胁 🐇保障大数据安全 ⭐️采集环节安全技术 ⭐️存储环节安全技术 ⭐️挖掘环节安全技术 ⭐️发布环节安全技术 🐇大数据用于安全 📚隐私及其…...
软件工程的概念及其重要性
软件工程是指将工程原理和方法应用于软件开发过程的学科,涉及软件的设计、开发、测试、维护和管理等各个阶段。它旨在提高软件开发的效率和质量,并确保软件满足用户的需求和预期。 软件工程的重要性体现在以下几个方面: 提高开发效率&#x…...
从WWDC看苹果产品发展的规律
WWDC 是苹果公司一年一度面向全球开发者的盛会,其主题演讲展现了苹果在产品设计、技术路线、用户体验和生态系统构建上的核心理念与演进脉络。我们借助 ChatGPT Deep Research 工具,对过去十年 WWDC 主题演讲内容进行了系统化分析,形成了这份…...
vscode(仍待补充)
写于2025 6.9 主包将加入vscode这个更权威的圈子 vscode的基本使用 侧边栏 vscode还能连接ssh? debug时使用的launch文件 1.task.json {"tasks": [{"type": "cppbuild","label": "C/C: gcc.exe 生成活动文件"…...
2025盘古石杯决赛【手机取证】
前言 第三届盘古石杯国际电子数据取证大赛决赛 最后一题没有解出来,实在找不到,希望有大佬教一下我。 还有就会议时间,我感觉不是图片时间,因为在电脑看到是其他时间用老会议系统开的会。 手机取证 1、分析鸿蒙手机检材&#x…...
自然语言处理——循环神经网络
自然语言处理——循环神经网络 循环神经网络应用到基于机器学习的自然语言处理任务序列到类别同步的序列到序列模式异步的序列到序列模式 参数学习和长程依赖问题基于门控的循环神经网络门控循环单元(GRU)长短期记忆神经网络(LSTM)…...
优选算法第十二讲:队列 + 宽搜 优先级队列
优选算法第十二讲:队列 宽搜 && 优先级队列 1.N叉树的层序遍历2.二叉树的锯齿型层序遍历3.二叉树最大宽度4.在每个树行中找最大值5.优先级队列 -- 最后一块石头的重量6.数据流中的第K大元素7.前K个高频单词8.数据流的中位数 1.N叉树的层序遍历 2.二叉树的锯…...
华硕a豆14 Air香氛版,美学与科技的馨香融合
在快节奏的现代生活中,我们渴望一个能激发创想、愉悦感官的工作与生活伙伴,它不仅是冰冷的科技工具,更能触动我们内心深处的细腻情感。正是在这样的期许下,华硕a豆14 Air香氛版翩然而至,它以一种前所未有的方式&#x…...
Linux 中如何提取压缩文件 ?
Linux 是一种流行的开源操作系统,它提供了许多工具来管理、压缩和解压缩文件。压缩文件有助于节省存储空间,使数据传输更快。本指南将向您展示如何在 Linux 中提取不同类型的压缩文件。 1. Unpacking ZIP Files ZIP 文件是非常常见的,要在 …...
淘宝扭蛋机小程序系统开发:打造互动性强的购物平台
淘宝扭蛋机小程序系统的开发,旨在打造一个互动性强的购物平台,让用户在购物的同时,能够享受到更多的乐趣和惊喜。 淘宝扭蛋机小程序系统拥有丰富的互动功能。用户可以通过虚拟摇杆操作扭蛋机,实现旋转、抽拉等动作,增…...
Leetcode33( 搜索旋转排序数组)
题目表述 整数数组 nums 按升序排列,数组中的值 互不相同 。 在传递给函数之前,nums 在预先未知的某个下标 k(0 < k < nums.length)上进行了 旋转,使数组变为 [nums[k], nums[k1], …, nums[n-1], nums[0], nu…...
c# 局部函数 定义、功能与示例
C# 局部函数:定义、功能与示例 1. 定义与功能 局部函数(Local Function)是嵌套在另一个方法内部的私有方法,仅在包含它的方法内可见。 • 作用:封装仅用于当前方法的逻辑,避免污染类作用域,提升…...
