当前位置: 首页 > news >正文

RocketMQ(3)之事务消息

一、发送事务消息案例

        事务消息共有三种状态,提交状态、回滚状态、中间状态: 

  • TransactionStatus.CommitTransaction: 提交事务,它允许消费者消费此消息。
  • TransactionStatus.RollbackTransaction: 回滚事务,它代表该消息将被删除,不允许被消费。
  • TransactionStatus.Unknown: 中间状态,它代表需要检查消息队列来确定状态。

        1.1创建事务性生产者

        使用 TransactionMQProducer类创建生产者,并指定唯一的 ProducerGroup,就可以设置自定义线程池来处理这些检查请求。执行本地事务后、需要根据执行结果对消息队列进行回复。回传的事务状态在上面说的。

    /*** 发送事务消息* @throws Exception*/@Testpublic void testTransactionProduce() throws Exception {TransactionListener transactionListener = new TransactionListenerImpl();TransactionMQProducer producer = new TransactionMQProducer("please_rename_unique_group_name");ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {@Overridepublic Thread newThread(Runnable r) {Thread thread = new Thread(r);thread.setName("client-transaction-msg-check-thread");return thread;}});producer.setExecutorService(executorService);producer.setTransactionListener(transactionListener);producer.start();String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};for (int i = 0; i < 10; i++) {try {Message msg =new Message("TopicTest1234", tags[i % tags.length], "KEY" + i,("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));SendResult sendResult = producer.sendMessageInTransaction(msg, null);System.out.printf("%s%n", sendResult);Thread.sleep(10);} catch (MQClientException | UnsupportedEncodingException e) {e.printStackTrace();}}for (int i = 0; i < 100000; i++) {Thread.sleep(1000);}producer.shutdown();}

2.事务监听接口

        当发送半消息成功时,我们使用 executeLocalTransaction 方法来执行本地事务。它返回前一节中提到的三个事务状态之一。checkLocalTransaction 方法用于检查本地事务状态,并回应消息队列的检查请求。它也是返回前一节中提到的三个事务状态之一。

 class TransactionListenerImpl implements TransactionListener {private AtomicInteger transactionIndex = new AtomicInteger(0);private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>();/*** 本地事务*/@Overridepublic LocalTransactionState executeLocalTransaction(Message msg, Object arg) {int value = transactionIndex.getAndIncrement();int status = value % 3;localTrans.put(msg.getTransactionId(), status);return LocalTransactionState.UNKNOW;}/*** 状态回查*/@Overridepublic LocalTransactionState checkLocalTransaction(MessageExt msg) {Integer status = localTrans.get(msg.getTransactionId());if (null != status) {switch (status) {case 0:return LocalTransactionState.UNKNOW;case 1:return LocalTransactionState.COMMIT_MESSAGE;case 2:return LocalTransactionState.ROLLBACK_MESSAGE;}}return LocalTransactionState.COMMIT_MESSAGE;}
}

1.3事务消息使用上的限制

  1. 事务消息不支持延时消息和批量消息。
  2. 为了避免单个消息被检查太多次而导致半队列消息累积,我们默认将单个消息的检查次数限制为 15 次,但是用户可以通过 Broker 配置文件的 transactionCheckMax参数来修改此限制。如果已经检查某条消息超过 N 次的话( N = transactionCheckMax ) 则 Broker 将丢弃此消息,并在默认情况下同时打印错误日志。用户可以通过重写 AbstractTransactionalMessageCheckListener 类来修改这个行为。
  3. 事务消息将在 Broker 配置文件中的参数 transactionTimeout 这样的特定时间长度之后被检查。当发送事务消息时,用户还可以通过设置用户属性 CHECK_IMMUNITY_TIME_IN_SECONDS 来改变这个限制,该参数优先于 transactionTimeout 参数。
  4. 事务性消息可能不止一次被检查或消费
  5. 提交给用户的目标主题消息可能会失败,目前这依日志的记录而定。它的高可用性通过 RocketMQ 本身的高可用性机制来保证,如果希望确保事务消息不丢失、并且事务完整性得到保证,建议使用同步的双重写入机制。
  6. 事务消息的生产者 ID 不能与其他类型消息的生产者 ID 共享。与其他类型的消息不同,事务消息允许反向查询、MQ服务器能通过它们的生产者 ID 查询到消费者。

相关文章:

RocketMQ(3)之事务消息

一、发送事务消息案例 事务消息共有三种状态&#xff0c;提交状态、回滚状态、中间状态&#xff1a; TransactionStatus.CommitTransaction: 提交事务&#xff0c;它允许消费者消费此消息。TransactionStatus.RollbackTransaction: 回滚事务&#xff0c;它代表该消息将被删除…...

基于多设计模式下的同步异步日志系统

基于多设计模式下的同步&异步日志系统 代码链接&#xff1a;https://github.com/Janonez/Log_System 1. 项目介绍 本项目主要实现一个日志系统&#xff0c; 其主要支持以下功能&#xff1a; 支持多级别日志消息支持同步日志和异步日志支持可靠写入日志到标准输出、文件…...

API接口与电商平台之间的联系,采集京东平台数据按关键字搜索商品接口示例

关键字搜索商品的重要性&#xff1a; 1.引入精准流量 关键词第一个也是最重要的作用就是为我们宝贝引进精准的流量&#xff0c;这一作用无论是在自然搜索中还是直通车中都是一样的。 第一步关乎的是我们宝贝的展现&#xff0c;而第二步用户是否会点进我们的宝贝&#xff0c;…...

代码随想录day41|343. 整数拆分96. 不同的二叉搜索树

343. 整数拆分 class Solution:def integerBreak(self, n: int) -> int:dp [0] *(n1)dp[2]1if n <3:return dp[n]for i in range(3,n1):for j in range(1,n):dp[i]max(j*(i-j),j*dp[i-j],dp[i])return dp[n] 96. 不同的二叉搜索树 class Solution:def numTrees(self, …...

Less常用内置函数

1&#xff0c;类型函数 isnumber(value) - 判断是否为数字isstring(value) - 判断是否为字符串isurl(value) - 判断是否为urliscolor(value) - 判断是否为颜色isunit(value, unit) - 判断value值是否为指定单位 示例&#xff1a; isnumber(12); // true isnumber(#333); // f…...

pdf转换成图片转换器在线怎么转?pdf转换成图片具体方法介绍

很多用户们都是比较喜欢使用pdf文档的&#xff0c;由于这种文件格式的便携性非常高&#xff0c;所以广泛的应用于工作和学习领域&#xff0c;再加上pdf文档可以随意转换成为其他的文件格式&#xff0c;更是让pdf文档受到了更多用户们的欢迎&#xff0c;那么pdf转换成图片转换器…...

JavaScript动态设置浏览器可视区域元素的文字颜色、监听滚动条、querySelectorAll、getBoundingClientRect

文章目录 前言htmlJavaScriptquerySelectorAllgetBoundingClientRect 前言 当元素出现在浏览器可视区域时给元素设置颜色等其他操作&#xff0c;比如当元素进入浏览器可视区域时&#xff0c;设置元素进入动画。 html <div id"idBox" class"box"><…...

意向客户的信息获取到底是怎样的,快来get一下

客户信息获取技术真的可以为企业提供精准客源吗&#xff1f;这个渠道到底安不安全&#xff0c;技术到底成不成熟&#xff1f;效果到底如何&#xff1f;下面简单的和大家分析一下。 客户信息获取技术是怎样的 手机采集引流方面&#xff0c;上量不精准&#xff0c;精准不上量的说…...

自动化测试常用脚本语言有哪些?

在自动化测试中&#xff0c;常用的脚本语言包括&#xff1a; 1. Python&#xff1a;Python是一个简洁、易读且功能强大的脚本语言&#xff0c;广泛应用于自动化测试领域。它具有丰富的测试框架和库&#xff0c;可以用于Web、移动应用和API等各种类型的测试。 2. Java&#xff1…...

mapreduce 的工作原理以及 hdfs 上传文件的流程

推荐两篇博文 mapreduce 的工作原理&#xff1a; 图文详解 MapReduce 工作流程_mapreduce工作流程_Shockang的博客-CSDN博客 hdfs 上传文件的流程 HDFS原理 - 知乎...

Ubuntu22.04安装ROS2

Ubuntu22.04安装ROS2 Excerpt ROS2官方文档 ROS2清华镜像站sudo apt update sudo apt upgrade locale # check for UTF-8 sudo apt update && sudo apt install locales sudo locale-gen en_US en_US.UTF-8 sudo update-locale LC_ALLe… ROS2官方文档 ROS2清华镜像站…...

uniapp - 倒计时组件-优化循环时间倒计时

使用定时器的规避方法 为了避免定时器误差导致倒计时计算错误&#xff0c;可以采用一些规避方法&#xff0c;比如将倒计时被中断时的剩余时间记录下来&#xff0c;重新开启定时器时再将这个剩余时间加到新的计算中。同时&#xff0c;为了避免定时器延迟&#xff0c;可以在每次执…...

java 实现访问者模式

访问者模式是一种行为设计模式&#xff0c;它允许您在不修改对象结构的情况下&#xff0c;向对象结构中的元素添加新的操作。这通常用于解决对象结构中元素类型多变&#xff0c;但操作类型相对稳定的问题。在访问者模式中&#xff0c;我们有一个访问者接口和多个具体的元素类&a…...

JDK源码剖析之PriorityQueue优先级队列

写在前面 版本信息&#xff1a; JDK1.8 PriorityQueue介绍 在数据结构中&#xff0c;队列分为FIFO、LIFO 两种模型&#xff0c;分别为先进先出&#xff0c;后进后出、先进后出&#xff0c;后进先出&#xff08;栈&#xff09; 而一切数据结构都是基于数组或者是链表实现。 在…...

TSINGSEE青犀AI视频分析/边缘计算/AI算法·人脸识别功能——多场景高效运用

旭帆科技AI智能分析网关可提供海量算法供应&#xff0c;涵盖目标监测、分析、抓拍、动作分析、AI识别等&#xff0c;可应用于各行各业的视觉场景中。同时针对小众化场景可快速定制AI算法&#xff0c;主动适配大厂近百款芯片&#xff0c;打通云/边/端灵活部署&#xff0c;算法一…...

力扣(LeetCode)算法_C++——最大连续 1 的个数 III

给定一个二进制数组 nums 和一个整数 k&#xff0c;如果可以翻转最多 k 个 0 &#xff0c;则返回 数组中连续 1 的最大个数 。 示例 1&#xff1a; 输入&#xff1a;nums [1,1,1,0,0,0,1,1,1,1,0], K 2 输出&#xff1a;6 解释&#xff1a;[1,1,1,0,0,1,1,1,1,1,1] 粗体数字…...

23062C++QT day2

封装一个结构体&#xff0c;结构体中包含一个私有数组&#xff0c;用来存放学生的成绩&#xff0c;包含一个私有变量&#xff0c;用来记录学生个数&#xff0c; 提供一个公有成员函数&#xff0c;void setNum(int num)用于设置学生个数 提供一个公有成员函数&#xff1a;void…...

React三属性之:props

作用 将父组件的参数传递给子组件 父组件 import ./App.css; import React from react; import PropsTest from ./pages/propsTest class App extends React.Component{render(){return(<div><h2>App组件</h2><PropsTest obj{{name:王惊涛,age:27}}>…...

大数据安全 | (一)介绍

目录 &#x1f4da;大数据安全 &#x1f407;大数据安全内涵 &#x1f407;大数据安全威胁 &#x1f407;保障大数据安全 ⭐️采集环节安全技术 ⭐️存储环节安全技术 ⭐️挖掘环节安全技术 ⭐️发布环节安全技术 &#x1f407;大数据用于安全 &#x1f4da;隐私及其…...

软件工程的概念及其重要性

软件工程是指将工程原理和方法应用于软件开发过程的学科&#xff0c;涉及软件的设计、开发、测试、维护和管理等各个阶段。它旨在提高软件开发的效率和质量&#xff0c;并确保软件满足用户的需求和预期。 软件工程的重要性体现在以下几个方面&#xff1a; 提高开发效率&#x…...

Wand-Enhancer:免费解锁WeMod Pro功能的完整解决方案

Wand-Enhancer&#xff1a;免费解锁WeMod Pro功能的完整解决方案 【免费下载链接】Wand-Enhancer Advanced UX and interoperability extension for Wand (WeMod) app 项目地址: https://gitcode.com/gh_mirrors/we/Wand-Enhancer 还在为WeMod专业版的订阅费用而犹豫吗&…...

题解:AcWing 271 杨老师的照相排列

本文分享的必刷题目是从蓝桥云课、洛谷、AcWing等知名刷题平台精心挑选而来&#xff0c;并结合各平台提供的算法标签和难度等级进行了系统分类。题目涵盖了从基础到进阶的多种算法和数据结构&#xff0c;旨在为不同阶段的编程学习者提供一条清晰、平稳的学习提升路径。 欢迎大…...

C#中实现值相等(Value Equality)的详细步骤

一、为什么“值相等”是一个需要认真对待的问题在 C# 中&#xff0c;相等并不是一个简单的问题。 很多开发者认为重写 Equals 就够了&#xff0c;但在真实系统中&#xff0c;错误或不完整的相等实现会导致&#xff1a;Dictionary / HashSet 行为异常对象“看起来相等”&#xf…...

石墨烯六边形Hubbard模型的量子模拟研究

1. 石墨烯六边形Hubbard模型的量子模拟背景在凝聚态物理研究中&#xff0c;理解强关联电子系统的行为一直是核心挑战。这类系统展现出超导、量子自旋液体等丰富物理现象&#xff0c;而Hubbard模型作为描述电子在晶格中相互作用的最简模型&#xff0c;已成为理论研究的重要工具。…...

Z变换与数字滤波器设计:从零极点分析到Python实战

1. 从理论到代码&#xff1a;Z变换如何成为数字信号处理的“瑞士军刀”如果你刚开始接触数字信号处理&#xff0c;可能会觉得Z变换是个有点抽象的数学工具。但在我十多年的音频算法和通信系统开发经历里&#xff0c;Z变换远不止是教科书上的公式——它是我们设计、分析和调试数…...

随机森林回归与PISO算法融合:实现CFD在线模型修正与状态估计

1. 项目概述&#xff1a;当随机森林“遇见”PISO算法在计算流体动力学&#xff08;CFD&#xff09;的日常工作中&#xff0c;我们常常面临一个核心矛盾&#xff1a;物理模型的普适性与特定场景的精确性难以兼得。传统的湍流模型&#xff0c;无论是雷诺平均纳维-斯托克斯&#x…...

变分量子编译:用乘积态训练实现高效量子动力学模拟

1. 项目概述与核心价值量子动力学模拟&#xff0c;简单来说&#xff0c;就是用量子计算机来“播放”一个量子系统随时间变化的“电影”。这听起来像是量子计算机的“本职工作”&#xff0c;毕竟费曼在四十多年前就提出了这个构想。然而&#xff0c;把理论构想变成在真实、不完美…...

大数据供应链预测模型监控:KS检验与Bhattacharyya系数的工程实践

1. 项目概述在供应链预测这类高价值、高风险的机器学习应用里&#xff0c;最让人提心吊胆的时刻&#xff0c;往往不是模型训练&#xff0c;而是它上线之后。我们精心调校的模型&#xff0c;就像一个被派往复杂前线的侦察兵&#xff0c;训练时用的是一套“地图”&#xff08;历史…...

C#巧用Spire.XLS for .NET隐藏或显示Excel网格线

在日常的数据处理和报表生成中&#xff0c;Excel是我们不可或缺的工具。然而&#xff0c;你是否曾遇到这样的场景&#xff1a;辛苦制作的报表&#xff0c;因为默认显示的网格线而显得不够专业&#xff0c;或是某些数据可视化图表&#xff0c;网格线反而成了干扰&#xff1f;手动…...

C#根据时间加密和防止反编译的两种方案

时间加密 用当前时间做密钥 / 校验&#xff0c;防反编译 混淆 加壳&#xff0c;配套用&#xff09;一、C# 时间加密 2 种核心实现&#xff08;直接用&#xff09;都是可直接运行的完整代码&#xff0c;适合做注册验证、临时授权方案 1&#xff1a;时间戳 AES 加密&#xff…...