RocketMQ(3)之事务消息
一、发送事务消息案例
事务消息共有三种状态,提交状态、回滚状态、中间状态:
- TransactionStatus.CommitTransaction: 提交事务,它允许消费者消费此消息。
- TransactionStatus.RollbackTransaction: 回滚事务,它代表该消息将被删除,不允许被消费。
- TransactionStatus.Unknown: 中间状态,它代表需要检查消息队列来确定状态。
1.1创建事务性生产者
使用 TransactionMQProducer类创建生产者,并指定唯一的 ProducerGroup,就可以设置自定义线程池来处理这些检查请求。执行本地事务后、需要根据执行结果对消息队列进行回复。回传的事务状态在上面说的。
/*** 发送事务消息* @throws Exception*/@Testpublic void testTransactionProduce() throws Exception {TransactionListener transactionListener = new TransactionListenerImpl();TransactionMQProducer producer = new TransactionMQProducer("please_rename_unique_group_name");ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {@Overridepublic Thread newThread(Runnable r) {Thread thread = new Thread(r);thread.setName("client-transaction-msg-check-thread");return thread;}});producer.setExecutorService(executorService);producer.setTransactionListener(transactionListener);producer.start();String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};for (int i = 0; i < 10; i++) {try {Message msg =new Message("TopicTest1234", tags[i % tags.length], "KEY" + i,("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));SendResult sendResult = producer.sendMessageInTransaction(msg, null);System.out.printf("%s%n", sendResult);Thread.sleep(10);} catch (MQClientException | UnsupportedEncodingException e) {e.printStackTrace();}}for (int i = 0; i < 100000; i++) {Thread.sleep(1000);}producer.shutdown();}
2.事务监听接口
当发送半消息成功时,我们使用 executeLocalTransaction 方法来执行本地事务。它返回前一节中提到的三个事务状态之一。checkLocalTransaction 方法用于检查本地事务状态,并回应消息队列的检查请求。它也是返回前一节中提到的三个事务状态之一。
class TransactionListenerImpl implements TransactionListener {private AtomicInteger transactionIndex = new AtomicInteger(0);private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>();/*** 本地事务*/@Overridepublic LocalTransactionState executeLocalTransaction(Message msg, Object arg) {int value = transactionIndex.getAndIncrement();int status = value % 3;localTrans.put(msg.getTransactionId(), status);return LocalTransactionState.UNKNOW;}/*** 状态回查*/@Overridepublic LocalTransactionState checkLocalTransaction(MessageExt msg) {Integer status = localTrans.get(msg.getTransactionId());if (null != status) {switch (status) {case 0:return LocalTransactionState.UNKNOW;case 1:return LocalTransactionState.COMMIT_MESSAGE;case 2:return LocalTransactionState.ROLLBACK_MESSAGE;}}return LocalTransactionState.COMMIT_MESSAGE;}
}
1.3事务消息使用上的限制
- 事务消息不支持延时消息和批量消息。
- 为了避免单个消息被检查太多次而导致半队列消息累积,我们默认将单个消息的检查次数限制为 15 次,但是用户可以通过 Broker 配置文件的
transactionCheckMax参数来修改此限制。如果已经检查某条消息超过 N 次的话( N =transactionCheckMax) 则 Broker 将丢弃此消息,并在默认情况下同时打印错误日志。用户可以通过重写AbstractTransactionalMessageCheckListener类来修改这个行为。 - 事务消息将在 Broker 配置文件中的参数 transactionTimeout 这样的特定时间长度之后被检查。当发送事务消息时,用户还可以通过设置用户属性 CHECK_IMMUNITY_TIME_IN_SECONDS 来改变这个限制,该参数优先于
transactionTimeout参数。 - 事务性消息可能不止一次被检查或消费
- 提交给用户的目标主题消息可能会失败,目前这依日志的记录而定。它的高可用性通过 RocketMQ 本身的高可用性机制来保证,如果希望确保事务消息不丢失、并且事务完整性得到保证,建议使用同步的双重写入机制。
- 事务消息的生产者 ID 不能与其他类型消息的生产者 ID 共享。与其他类型的消息不同,事务消息允许反向查询、MQ服务器能通过它们的生产者 ID 查询到消费者。
相关文章:
RocketMQ(3)之事务消息
一、发送事务消息案例 事务消息共有三种状态,提交状态、回滚状态、中间状态: TransactionStatus.CommitTransaction: 提交事务,它允许消费者消费此消息。TransactionStatus.RollbackTransaction: 回滚事务,它代表该消息将被删除…...
基于多设计模式下的同步异步日志系统
基于多设计模式下的同步&异步日志系统 代码链接:https://github.com/Janonez/Log_System 1. 项目介绍 本项目主要实现一个日志系统, 其主要支持以下功能: 支持多级别日志消息支持同步日志和异步日志支持可靠写入日志到标准输出、文件…...
API接口与电商平台之间的联系,采集京东平台数据按关键字搜索商品接口示例
关键字搜索商品的重要性: 1.引入精准流量 关键词第一个也是最重要的作用就是为我们宝贝引进精准的流量,这一作用无论是在自然搜索中还是直通车中都是一样的。 第一步关乎的是我们宝贝的展现,而第二步用户是否会点进我们的宝贝,…...
代码随想录day41|343. 整数拆分96. 不同的二叉搜索树
343. 整数拆分 class Solution:def integerBreak(self, n: int) -> int:dp [0] *(n1)dp[2]1if n <3:return dp[n]for i in range(3,n1):for j in range(1,n):dp[i]max(j*(i-j),j*dp[i-j],dp[i])return dp[n] 96. 不同的二叉搜索树 class Solution:def numTrees(self, …...
Less常用内置函数
1,类型函数 isnumber(value) - 判断是否为数字isstring(value) - 判断是否为字符串isurl(value) - 判断是否为urliscolor(value) - 判断是否为颜色isunit(value, unit) - 判断value值是否为指定单位 示例: isnumber(12); // true isnumber(#333); // f…...
pdf转换成图片转换器在线怎么转?pdf转换成图片具体方法介绍
很多用户们都是比较喜欢使用pdf文档的,由于这种文件格式的便携性非常高,所以广泛的应用于工作和学习领域,再加上pdf文档可以随意转换成为其他的文件格式,更是让pdf文档受到了更多用户们的欢迎,那么pdf转换成图片转换器…...
JavaScript动态设置浏览器可视区域元素的文字颜色、监听滚动条、querySelectorAll、getBoundingClientRect
文章目录 前言htmlJavaScriptquerySelectorAllgetBoundingClientRect 前言 当元素出现在浏览器可视区域时给元素设置颜色等其他操作,比如当元素进入浏览器可视区域时,设置元素进入动画。 html <div id"idBox" class"box"><…...
意向客户的信息获取到底是怎样的,快来get一下
客户信息获取技术真的可以为企业提供精准客源吗?这个渠道到底安不安全,技术到底成不成熟?效果到底如何?下面简单的和大家分析一下。 客户信息获取技术是怎样的 手机采集引流方面,上量不精准,精准不上量的说…...
自动化测试常用脚本语言有哪些?
在自动化测试中,常用的脚本语言包括: 1. Python:Python是一个简洁、易读且功能强大的脚本语言,广泛应用于自动化测试领域。它具有丰富的测试框架和库,可以用于Web、移动应用和API等各种类型的测试。 2. Java࿱…...
mapreduce 的工作原理以及 hdfs 上传文件的流程
推荐两篇博文 mapreduce 的工作原理: 图文详解 MapReduce 工作流程_mapreduce工作流程_Shockang的博客-CSDN博客 hdfs 上传文件的流程 HDFS原理 - 知乎...
Ubuntu22.04安装ROS2
Ubuntu22.04安装ROS2 Excerpt ROS2官方文档 ROS2清华镜像站sudo apt update sudo apt upgrade locale # check for UTF-8 sudo apt update && sudo apt install locales sudo locale-gen en_US en_US.UTF-8 sudo update-locale LC_ALLe… ROS2官方文档 ROS2清华镜像站…...
uniapp - 倒计时组件-优化循环时间倒计时
使用定时器的规避方法 为了避免定时器误差导致倒计时计算错误,可以采用一些规避方法,比如将倒计时被中断时的剩余时间记录下来,重新开启定时器时再将这个剩余时间加到新的计算中。同时,为了避免定时器延迟,可以在每次执…...
java 实现访问者模式
访问者模式是一种行为设计模式,它允许您在不修改对象结构的情况下,向对象结构中的元素添加新的操作。这通常用于解决对象结构中元素类型多变,但操作类型相对稳定的问题。在访问者模式中,我们有一个访问者接口和多个具体的元素类&a…...
JDK源码剖析之PriorityQueue优先级队列
写在前面 版本信息: JDK1.8 PriorityQueue介绍 在数据结构中,队列分为FIFO、LIFO 两种模型,分别为先进先出,后进后出、先进后出,后进先出(栈) 而一切数据结构都是基于数组或者是链表实现。 在…...
TSINGSEE青犀AI视频分析/边缘计算/AI算法·人脸识别功能——多场景高效运用
旭帆科技AI智能分析网关可提供海量算法供应,涵盖目标监测、分析、抓拍、动作分析、AI识别等,可应用于各行各业的视觉场景中。同时针对小众化场景可快速定制AI算法,主动适配大厂近百款芯片,打通云/边/端灵活部署,算法一…...
力扣(LeetCode)算法_C++——最大连续 1 的个数 III
给定一个二进制数组 nums 和一个整数 k,如果可以翻转最多 k 个 0 ,则返回 数组中连续 1 的最大个数 。 示例 1: 输入:nums [1,1,1,0,0,0,1,1,1,1,0], K 2 输出:6 解释:[1,1,1,0,0,1,1,1,1,1,1] 粗体数字…...
23062C++QT day2
封装一个结构体,结构体中包含一个私有数组,用来存放学生的成绩,包含一个私有变量,用来记录学生个数, 提供一个公有成员函数,void setNum(int num)用于设置学生个数 提供一个公有成员函数:void…...
React三属性之:props
作用 将父组件的参数传递给子组件 父组件 import ./App.css; import React from react; import PropsTest from ./pages/propsTest class App extends React.Component{render(){return(<div><h2>App组件</h2><PropsTest obj{{name:王惊涛,age:27}}>…...
大数据安全 | (一)介绍
目录 📚大数据安全 🐇大数据安全内涵 🐇大数据安全威胁 🐇保障大数据安全 ⭐️采集环节安全技术 ⭐️存储环节安全技术 ⭐️挖掘环节安全技术 ⭐️发布环节安全技术 🐇大数据用于安全 📚隐私及其…...
软件工程的概念及其重要性
软件工程是指将工程原理和方法应用于软件开发过程的学科,涉及软件的设计、开发、测试、维护和管理等各个阶段。它旨在提高软件开发的效率和质量,并确保软件满足用户的需求和预期。 软件工程的重要性体现在以下几个方面: 提高开发效率&#x…...
shell脚本--常见案例
1、自动备份文件或目录 2、批量重命名文件 3、查找并删除指定名称的文件: 4、批量删除文件 5、查找并替换文件内容 6、批量创建文件 7、创建文件夹并移动文件 8、在文件夹中查找文件...
MFC内存泄露
1、泄露代码示例 void X::SetApplicationBtn() {CMFCRibbonApplicationButton* pBtn GetApplicationButton();// 获取 Ribbon Bar 指针// 创建自定义按钮CCustomRibbonAppButton* pCustomButton new CCustomRibbonAppButton();pCustomButton->SetImage(IDB_BITMAP_Jdp26)…...
Java如何权衡是使用无序的数组还是有序的数组
在 Java 中,选择有序数组还是无序数组取决于具体场景的性能需求与操作特点。以下是关键权衡因素及决策指南: ⚖️ 核心权衡维度 维度有序数组无序数组查询性能二分查找 O(log n) ✅线性扫描 O(n) ❌插入/删除需移位维护顺序 O(n) ❌直接操作尾部 O(1) ✅内存开销与无序数组相…...
关于iview组件中使用 table , 绑定序号分页后序号从1开始的解决方案
问题描述:iview使用table 中type: "index",分页之后 ,索引还是从1开始,试过绑定后台返回数据的id, 这种方法可行,就是后台返回数据的每个页面id都不完全是按照从1开始的升序,因此百度了下,找到了…...
MySQL JOIN 表过多的优化思路
当 MySQL 查询涉及大量表 JOIN 时,性能会显著下降。以下是优化思路和简易实现方法: 一、核心优化思路 减少 JOIN 数量 数据冗余:添加必要的冗余字段(如订单表直接存储用户名)合并表:将频繁关联的小表合并成…...
CSS | transition 和 transform的用处和区别
省流总结: transform用于变换/变形,transition是动画控制器 transform 用来对元素进行变形,常见的操作如下,它是立即生效的样式变形属性。 旋转 rotate(角度deg)、平移 translateX(像素px)、缩放 scale(倍数)、倾斜 skewX(角度…...
【前端异常】JavaScript错误处理:分析 Uncaught (in promise) error
在前端开发中,JavaScript 异常是不可避免的。随着现代前端应用越来越多地使用异步操作(如 Promise、async/await 等),开发者常常会遇到 Uncaught (in promise) error 错误。这个错误是由于未正确处理 Promise 的拒绝(r…...
uniapp 集成腾讯云 IM 富媒体消息(地理位置/文件)
UniApp 集成腾讯云 IM 富媒体消息全攻略(地理位置/文件) 一、功能实现原理 腾讯云 IM 通过 消息扩展机制 支持富媒体类型,核心实现方式: 标准消息类型:直接使用 SDK 内置类型(文件、图片等)自…...
__VUE_PROD_HYDRATION_MISMATCH_DETAILS__ is not explicitly defined.
这个警告表明您在使用Vue的esm-bundler构建版本时,未明确定义编译时特性标志。以下是详细解释和解决方案: 问题原因: 该标志是Vue 3.4引入的编译时特性标志,用于控制生产环境下SSR水合不匹配错误的详细报告1使用esm-bundler…...
【电路笔记】-变压器电压调节
变压器电压调节 文章目录 变压器电压调节1、概述2、变压器电压调节3、变压器电压调节示例14、变压器电压调节示例25、变压器电压调节示例36、总结变压器电压调节是变压器输出端电压因连接负载电流的变化而从其空载值向上或向下变化的比率或百分比值。 1、概述 电压调节是衡量变…...
