【测试开发】Mq消息重复如何测试?
本篇文章主要讲述重复消费的原因,以及如何去测试这个场景,最后也会告诉大家,目前互联网项目关于如何避免重复消费的解决方案。
Mq为什么会有重复消费的问题?
Mq 常见的缺点之一就是消息重复消费问题,产生这种问题的原因是什么呢?有以下几点:
工作流程
1、producer 生成数据,发送到broker集群,当遇到网络抖动超时,可能会重复发送。
为了保证数据的可靠性一般都会配置重试机制如下:
rocketmq:producer:group: sanyouProducer#发送消息超过5秒未接收到broker返回的成功消息send-message-timeout: 5000#重试最大次数retry-times-when-send-failed: 2max-message-size: 4194304name-server: 172.30.34.10:9876;172.30.35.37:9876;172.30.35.30:9876#发送消息超时时长,意思是超过5秒钟未收到broker返回的发送成功的消息,#producer会重复发送,但并不是一直发送,会根据retry-times-when-send-failed次数,#最多重试多少次
极端情况下,网络出现抖动,生产者超过设置的时间未收到broker返回的成功消息,会重新发送消息。
2、消费者宕机,未提交offset给broker
由上图可知,broker接收到producer 发送的消息后,会把消息发送给消费者,一般情况下,消费者消费完一条数据,会提交一个offset给到broker,告诉它,这条消息我消费了,但是,极端情况下,消费者消费一条消息成功,提交offset之前,宕机了或者网络抖动超时了,broker未收到offset,就认为这条消息没人消费,当消费者重启服务器或网络恢复,那么broker还会发送这条消息给消费者重新消费。
3、业务上的bug,可能会导致重复消费。
生产者producer的上游系统,突然出现了bug,导致重复调用生产者所在服务的接口,生产者收到请求后,继续发送消息给broker。
当然了,重复消费的原因有很多,以上只是常见的几种原因,那怎么去测试呢?
怎么测试重复消费场景?
假如有这么一个场景,采购员在采购系统的前端页面进行采购单下单操作,下单成功后,采购系统这边会保留一份采购单数据,然后发送一条mq给到wms 仓库系统,那么生产者就是采购系统,消费者就是wms仓库系统,wms消费到采购单的消息,落入数据库wms_purchase表中,为了简化,我只设计了三个字段。
建表ddl:
CREATE TABLE `wms_purchase` (`id` bigint(11) NOT NULL AUTO_INCREMENT COMMENT '仓库采购单id',`purchase_id` bigint(20) NOT NULL COMMENT '采购单id',`purchase_name` varchar(255) DEFAULT NULL,PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=237 DEFAULT CHARSET=utf8;
怎么测试呢?很简单,我们只要编写生产者工具,在工具里加个循环,尽量循环多次,如下:
@RestController
@RequestMapping("/mq")
public class ProducerController {// 自动注入 RocketMQTemplate模板类,用于生产消息@Autowiredprivate RocketMQTemplate mqTemplate;// 模拟生产者重复消费问题,前提是数据库没有唯一索引,并且项目未做幂等性校验@RequestMapping("/send")public String testSend(@RequestBody WmsPurchaseDto params) {try {for (int i = 0; i <100 ; i++) {mqTemplate.convertAndSend("fourbrothertopic", params);}return "success";} catch (Exception e) {e.printStackTrace();return "fail";}}
解读:
requestmapping对外暴露一个web接口,地址是localhost:8080/demo/mq/send,
post请求,参数是json格式,类似
{
"purchaseId": "256465",
"purchaseName": "测试"
}
这种形式,然后起个for循环,循环调用convertAndSend方法,发送同样的消息,最终结果如下图:
这里模拟producer重复发送的场景,前提是数据库没有对采购单id做唯一索引,并且项目未做幂等性校验。数据库里出现很多采购单id一样的数据,业务上这是不允许的。
假如说,项目出现了这么一种bug,开发那边是怎么修复的呢?
Mq如何保证幂等性?
分享几种解决方案的具体代码demo:
1、数据库unique key(表里不允许重复列出现)来保证幂等性。
很简单,我们只要在wms_purchase里,对purchaseId添加唯一索引即可,提示:在添加唯一索引之前,需清理完表里的数据。
也可以使用ddl语句:
ALTER TABLE `wms_purchase` ADD UNIQUE ( `purchaseId` )
代码不变,调用以下接口:
localhost:8080/demo/mq/send post请求
{"purchaseId": "256465","purchaseName": "测试"
}
得到以下结果:
上图中,循环生产同一条采购单数据,但是右边表中只出现了一条采购单id是256465的数据,说明添加唯一索引确实保证了幂等性,但是代码里却出现大量类似Duplicate entry '256465' for key 'uniqe_key_purchaseid' 日志,是因为触发了数据设置的唯一索引,
由于触发了唯一索引,导致消费者未提交offset给broker,那么broker会认为这条消息未被消费,后续会持续不断地推送消息给消费者,也就意味着会持续不断地报错。
另外这种持续无效的请求数据库会占用数据库的连接资源,在高并发的场景下,会严重拖垮系统响应效率。
虽然保证了幂等性,但是日志里总是报错,太不讲究、也不雅观,那怎么解决呢?
2、数据库unique key+redis 来保证幂等性。
如截图:
通俗的理解就是,消费者在进行数据库落库操作之前,会判断redis是有这条采购单数据,如果有就直接放过这条消息不做处理,没有这条数据,那就进行落库操作,但在落库之前还要进一步判断数据库是否有这条采购单数据,没有那就进行落库,落库成功,再把采购单的id当做key,采购单数据当做value set 进redis缓存里,设置一定的过期时间。
redis基于内存,操作数据特别快,在进行落库之前查询redis,可以避免很多无效的请求数据库,但是为啥要设置过期时间?因为redis的内存资源有限,并且很宝贵,所以我们希望设置的数据能在一段时间内定期失效,即使失效,也没关系,还有数据库的唯一索引兜底。
这样就很好的保证了幂等性,也避免了大量的日志报错。伪代码如下:
@Component
//mq的监听器,指定topic是TopicTest,消费者组consumerGroupTest
@RocketMQMessageListener(topic = "fourbrothertopic", consumerGroup = "consumerGroupTest")
@Slf4j
public class ConsumeController implements RocketMQListener {@Autowiredprivate WmsPurchaseMapper wmsPurchaseMapper;@Autowiredprivate RedisTemplate redisTemplate;@Overridepublic void onMessage(String message) {log.info("------- Consumer: {}", message);//将message消息映射成WmsPurchase实体WmsPurchase wmsPurchase = JSONObject.parseObject(message, WmsPurchase.class);//首先判断redis里面是否有这条采购单数据,通过PurchaseId查询,有数据,则直接放过不做处理if (redisTemplate.opsForValue().get(wmsPurchase.getPurchaseId().toString())==null){//然后再使用PurchaseId查询数据库,有数据,则直接放过不做处理if (null == wmsPurchaseMapper.selectByPurchaseId(wmsPurchase.getPurchaseId())){//数据库没有数据,就进行插入操作,if (wmsPurchaseMapper.insert(wmsPurchase)>0){//插入成功就把purchaseid塞进redis里,过期时间是72小时redisTemplate.opsForValue().set(wmsPurchase.getPurchaseId(),wmsPurchase.toString(),72, TimeUnit.HOURS);}}else {//能走到这个判断分支,说明缓存里的采购单数据已经失效,如果还有消息重复消费//那就再放入缓存一次,72h过期redisTemplate.opsForValue().set(wmsPurchase.getPurchaseId(),wmsPurchase.toString(),72, TimeUnit.HOURS);log.info("数据库已保留该数据");// 触发重复消费告警机制}}else {log.info("缓存已保留该数据");// 触发重复消费告警机制}}
}
思路很简单,如代码中注释。当然这种方法也有缺点,就是过于依赖redis,有些系统没有使用redis组件,那么还得维护一套redis组件,并且还得保证redis集群高可用。那项目只有mysql,能不能依靠数据库去维护保证幂等性呢?当然可以!
3、还有一种方法叫去重表+唯一索引,顾名思义就是另外维护一张表,记录已经消费的采购单数据,其实和上述方法差不多,上述方法查询缓存,取重表查询数据库取重表。
伪代码 如下:
@Component
//mq的监听器,指定topic是TopicTest,消费者组consumerGroupTest
@RocketMQMessageListener(topic = "fourbrothertopic", consumerGroup = "consumerGroupTest")
@Slf4j
public class ConsumeController implements RocketMQListener {@Autowiredprivate WmsPurchaseMapper wmsPurchaseMapper;@Autowiredprivate UniquePurchaseMapper uniquePurchaseMapper;@Autowiredprivate RedisTemplate redisTemplate;@SneakyThrows@Overridepublic void onMessage(String message) {log.info("------- Consumer: {}", message);//将message消息映射成WmsPurchase实体WmsPurchase wmsPurchase = JSONObject.parseObject(message, WmsPurchase.class);log.info("映射后实体消息"+ JSON.toJSONString(wmsPurchase));if (uniquePurchaseMapper.selectByPurchaseId(wmsPurchase.getPurchaseId().intValue()) == null){if (null == wmsPurchaseMapper.selectByPurchaseId(wmsPurchase.getPurchaseId())){//数据库没有数据,就进行插入操作,if (wmsPurchaseMapper.insert(wmsPurchase)>0){//插入成功就把purchaseid塞进unique_purchaseUniquePurchase uniquePurchase = new UniquePurchase();uniquePurchase.setPurchaseId(wmsPurchase.getPurchaseId().intValue());log.info("插入取重表消息:"+ JSON.toJSONString(uniquePurchase));uniquePurchaseMapper.insert(uniquePurchase);}}else {log.info("数据库已保留该数据");//自动触发告警机制}}else {log.info("取重表已有这条采购单数据");}}
代码已上传至gitee,感兴趣可以自行阅读。
上述方式在查询取重表时,并发不安全,极端情况下还是会触发唯一索引错误,比如说,消费者要消费大量消息(线程),执行上述代码,A线程执行完23行,挂起了,cpu把执行权给了B线程,B执行到25行并插入成功,那么这时A线程被唤起,也执行到了23行,结果触发了唯一索引错误。那怎么避免呢?
我们可以让所有线程别并发执行,串行执行,那就用到redis的分布式锁技术。
4、分布式锁+uniquekey
伪代码如下
@Component
//mq的监听器,指定topic是TopicTest,消费者组consumerGroupTest
@RocketMQMessageListener(topic = "fourbrothertopic", consumerGroup = "consumerGroupTest")
@Slf4j
public class ConsumeController implements RocketMQListener {@Autowiredprivate WmsPurchaseMapper wmsPurchaseMapper;@Autowiredprivate RedissonClient redisson;@Autowiredprivate UniquePurchaseMapper uniquePurchaseMapper;@Autowiredprivate RedisTemplate redisTemplate;@SneakyThrows@Overridepublic void onMessage(String message) {log.info("------- Consumer: {}", message);//将message消息映射成WmsPurchase实体WmsPurchase wmsPurchase = JSONObject.parseObject(message, WmsPurchase.class);
// 注入redisson
// 获取锁对象RLock lock = redisson.getLock("lockName");try {// 1. 最常见的使用方法//lock.lock();// 2. 支持过期解锁功能,10秒钟以后自动解锁, 无需调用unlock方法手动解锁//lock.lock(10, TimeUnit.SECONDS);// 3. 尝试加锁,最多等待2秒,上锁以后8秒自动解锁boolean res = lock.tryLock();if (res) { //成功//然后再使用PurchaseId查询数据库,有数据,则直接放过不做处理if (null == wmsPurchaseMapper.selectByPurchaseId(wmsPurchase.getPurchaseId())){//数据库没有数据,就进行插入操作,if (wmsPurchaseMapper.insert(wmsPurchase)>0){//插入成功就把purchaseid塞进redis里,过期时间是72小时redisTemplate.opsForValue().set(wmsPurchase.getPurchaseId().toString(),wmsPurchase.toString(),1, TimeUnit.HOURS);}}else {redisTemplate.opsForValue().set(wmsPurchase.getPurchaseId().toString(),wmsPurchase.toString(),1, TimeUnit.HOURS);log.info("数据库已保留该数据");//自动触发告警机制}}} catch (Exception e) {e.printStackTrace();} finally {//释放锁RLock lockName = redisson.getLock("lockName");if (lockName.isLocked()) {if (lockName.isHeldByCurrentThread()) {lockName.unlock();}}}
}
这种也是比较常见的一种,缺点也很明显,在高并发,大请求量的场景下,所有线程串行执行,处理效率势必会降低。当然了,技术没有好坏,只有合不合适。如果你的项目并发量一般,可以尝试使用上述方法。
具体代码demo已上传至gitee平台,地址如下:
https://gitee.com/lv1792017548/rocketmq-demo.git
总结
本文主要分享了如何测试mq消息队列重复性消费,以及避免重复消费常见的解决方案。
最后感谢每一个认真阅读我文章的人,礼尚往来总是要有的,虽然不是什么很值钱的东西,如果你用得到的话可以直接拿走:
这些资料,对于【软件测试】的朋友来说应该是最全面最完整的备战仓库,这个仓库也陪伴上万个测试工程师们走过最艰难的路程,希望也能帮助到你!
相关文章:

【测试开发】Mq消息重复如何测试?
本篇文章主要讲述重复消费的原因,以及如何去测试这个场景,最后也会告诉大家,目前互联网项目关于如何避免重复消费的解决方案。 Mq为什么会有重复消费的问题? Mq 常见的缺点之一就是消息重复消费问题,产生这种问题的原因是什么呢…...

C++和C#程序语言的区别
一直学习C++和C#,两者之间的区别总结一下 目录 一、两种语言概述 C++语言 C#语言 二、两种语言对比 2.1运行依赖...

CentOS配置Java环境报错-bash: /usr/local/jdk1.8.0_381/bin/java: 无法执行二进制文件
CentOS配置Java环境后执行java -version时报错: -bash: /usr/local/jdk1.8.0_381/bin/java: 无法执行二进制文件原因是所使用的jdk的版本和Linux内核架构匹配不上 使用以下命令查看Linux架构: [rootlocalhost ~]# cat /proc/version Linux version 3.1…...

MySQL进阶 —— 超详细操作演示!!!(上)
MySQL进阶 —— 超详细操作演示!!!(上) 一、存储引擎1.1 MySQL 体系结构1.2 存储引擎介绍1.3 存储引擎特点1.4 存储引擎选择 二、索引2.1 索引概述2.2 索引结构2.3 索引分类2.4 索引语法2.5 SQL 性能分析2.6 索引使用2…...

一条爬虫抓取一个小网站所有数据
一条爬虫抓取一个小网站所有数据 今天闲来无事,写一个爬虫来玩玩。在网上冲浪的时候发现了一个搞笑的段子网,发现里面的内容还是比较有意思的,于是心血来潮,就想着能不能写一个Python程序,抓取几条数据下来看看&am…...

八大排序——快速排序
Hello,大家好,今天分享的八大排序里的快速排序,所谓快速排序是一个叫霍尔的人发明,有很多人可能会觉得为什么不叫霍尔排序,其中原因就是因为它快,快速则体现了它的特点,今天我们就来讲一下快速排…...

【ES】笔记-Class类剖析
Class Class介绍与初体验ES5 通过构造函数实例化对象ES6 通过Class中的constructor实列化对象 Class 静态成员实例对象与函数对象的属性不相通实例对象与函数对象原型上的属性是相通的Class中对于static 标注的对象和方法不属于实列对象,属于类。 ES5构造函数继承Cl…...

数学建模--Seaborn库绘图基础的Python实现
目录 1.绘图数据导入 2. sns.scatterplot绘制散点图 3.sns.barplot绘制条形图 4.sns.lineplot绘制线性图 5.sns.heatmap绘制热力图 6.sns.distplot绘制直方图 7.sns.pairplot绘制散图 8.sns.catplot绘制直方图 9.sns.countplot绘制直方图 10.sns.lmplot绘回归图 1.绘图数…...

lv3 嵌入式开发-2 linux软件包管理
目录 1 软件包管理 1.1流行的软件包管理机制 1.2软件包的类型 1.3软件包的命名 2 在线软件包管理 2.1APT工作原理 2.2更新软件源 2.3APT相关命令 3 离线软件包管理 1 软件包管理 1.1流行的软件包管理机制 Debian Linux首先提出“软件包”的管理机制---Deb软件包 …...

智能小区与无线网络技术
1.1 智能小区 智能小区指的是具有小区智能化系统的小区。所谓小区智能化系统,指的是在 现代计算机网络和通信技术的基础上,将传统的土木建筑技术与计算机技术、自动 控制技术、通信与信息处理技术、多媒体技术等先进技术相结合的自动化和综…...

如何传输文件流给前端
通过链接下载图片,直接http请求然后将文件流返回 注:music.ly是一个下载tiktok视频的免费接口 https://api19-core-c-useast1a.musical.ly/aweme/v1/feed/?aweme_idxxx func (m *FileBiz) DownloadFileV2(ctx *ctrl.Context, fileLink, fileName strin…...

Spring Security OAuth2 远程命令执行漏洞
文章目录 一、搭建环境二、漏洞验证三、准备payload四、执行payload五、变形payload 一、搭建环境 cd vulhub/spring/CVE-2016-4977/ docker-compose up -d 二、漏洞验证 访问 http://192.168.10.171:8080/oauth/authorize?response_type${233*233}&client_idacme&s…...

Python之并发编程介绍
一、并发编程介绍 1.1、串行、并行与并发的区别 串行(serial):一个CPU上,按顺序完成多个任务并行(parallelism):指的是任务数小于等于cpu核数,即任务真的是一起执行的并发(concurrency):一个CPU采用时间片管理方式&am…...

GO语言网络编程(并发编程)并发介绍,Goroutine
GO语言网络编程(并发编程)并发介绍,Goroutine 1、并发介绍 进程和线程 A. 进程是程序在操作系统中的一次执行过程,系统进行资源分配和调度的一个独立单位。 B. 线程是进程的一个执行实体,是CPU调度和分派的基本单位,它是比进程更…...

英语连词总结
前言 总结一些常用的英语连词,以下用法只是我希望我自己这么用。分类我可能分的不好,慢慢积累,慢慢改进。 1)表递进: firstly、secondly、thirdly、finally、af first、at the beginning、in the end、to begin with࿰…...

LeetCode 92. Reverse Linked List II【链表,头插法】中等
本文属于「征服LeetCode」系列文章之一,这一系列正式开始于2021/08/12。由于LeetCode上部分题目有锁,本系列将至少持续到刷完所有无锁题之日为止;由于LeetCode还在不断地创建新题,本系列的终止日期可能是永远。在这一系列刷题文章…...

【图论】Floyd
算法提高课笔记) 文章目录 例题牛的旅行题意思路代码 排序题意思路代码 观光之旅题意思路代码 例题 牛的旅行 原题链接 农民John的农场里有很多牧区,有的路径连接一些特定的牧区。 一片所有连通的牧区称为一个牧场。 但是就目前而言,你…...

SpringCloudAlibaba Gateway(三)-整合Sentinel功能路由维度、API维度进行流控
Gateway整合Sentinel 前面使用过Sentinel组件对服务提供者、服务消费者进行流控、限流等操作。除此之外,Sentinel还支持对Gateway、Zuul等主流网关进行限流。 自sentinel1.6.0版开始,Sentinel提供了Gateway的适配模块,能针对路由(rou…...

【笔试强训选择题】Day38.习题(错题)解析
作者简介:大家好,我是未央; 博客首页:未央.303 系列专栏:笔试强训选择题 每日一句:人的一生,可以有所作为的时机只有一次,那就是现在!! 文章目录 前言一、Day…...

DAY08_MyBatisPlus——入门案例标准数据层开发CRUD-Lombok-分页功能DQL编程控制DML编程控制乐观锁快速开发-代码生成器
目录 一 MyBatisPlus简介1. 入门案例问题导入1.1 SpringBoot整合MyBatisPlus入门程序①:创建新模块,选择Spring初始化,并配置模块相关基础信息②:选择当前模块需要使用的技术集(仅保留JDBC)③:手…...

分光棱镜BS、PB、NPBS的区别
BS(分光棱镜):对入射偏振敏感,线偏振角度会影响分光比。若入射的是自然光或圆偏振光,则按50:50分光。分束的时候只管分能量,理想器件下出射的两路光偏振态还是原来的样子,实际工艺缺…...

人工智能论文通用创新点(一)——ACMIX 卷积与注意力融合、GCnet(全局特征融合)、Coordinate_attention、SPD(可替换下采样)
1.ACMIX 卷积与注意力融合 论文地址:https://arxiv.org/pdf/2111.14556.pdf 为了实现卷积与注意力的融合,我们让特征图经过两个路径,一个路径经过卷积,另外一个路径经过Transformer,但是,现在有一个问题,卷积路径比较快,Transformer比较慢。因此,我们让Q,K,V通过1*1的…...

您的计算机已被[new_day@torguard.tg].faust 勒索病毒感染?恢复您的数据的方法在这里!
导言: 随着科技的迅速发展,网络空间也变得越来越危险,而勒索病毒则是网络威胁中的一个严重问题。 [ new_daytorguard.tg ].faust 勒索病毒是最新的威胁之一,采用高度复杂的加密技术,将受害者的数据文件锁定,…...

18--Elasticsearch
一 Elasticsearch介绍 1 全文检索 Elasticsearch是一个全文检索服务器 全文检索是一种非结构化数据的搜索方式 结构化数据:指具有固定格式固定长度的数据,如数据库中的字段。 非结构化数据:指格式和长度不固定的数据,如电商网站…...

代码随想录算法训练营 day59|503.下一个更大元素II、42. 接雨水
一、503.下一个更大元素II 力扣题目链接 可以不扩充nums,在遍历的过程中模拟走两边nums class Solution { public:vector<int> nextGreaterElements(vector<int>& nums) {vector<int> result(nums.size(), -1);if (nums.size() 0) return…...

MyBatis数据库操作
文章目录 前言一、MyBatis的各种查询功能1.查询一个实体类对象2.查询一个List集合3.查询单个数据4.查询一条数据为map集合5.查询多条数据为map集合方法一方法二 6.测试类 二、特殊SQL的执行1.模糊查询2.批量删除3.动态设置表名5.添加功能获取自增的主键6.测试类 三、自定义映射…...

python flask框架 debug功能
从今天开始,准备整理一些基础知识,分享给需要的人吧 先整理个flask的debug功能,首先列举一下debug加与不加的区别,然后再上代码和图看看差异 区别: (1)加了debug后,修改js…...

《深入浅出OCR》第六章:OCR数据集与评价指标
一、OCR技术流程 在介绍OCR数据集开始,我将带领大家和回顾下OCR技术流程,典型的OCR技术pipline如下图所示,其中,文本检测和识别是OCR技术的两个重要核心技术。 1.1 图像预处理: 图像预处理是OCR流程的第一步…...

15. 线性代数 - 克拉默法则
文章目录 克拉默法则矩阵运算Hi,大家好。我是茶桁。 上节课我们在最后提到了一个概念「克拉默法则」,本节课,我们就来看看到底什么是克拉默法则。 克拉默法则 之前的课程我们一直在强调,矩阵是线性方程组抽象的来的。那么既然我们抽象出来了,有没有一种比较好的办法高效…...

【LeetCode】剑指 Offer <二刷>(6)
目录 题目:剑指 Offer 12. 矩阵中的路径 - 力扣(LeetCode) 题目的接口: 解题思路: 代码: 过啦!!! 题目:剑指 Offer 13. 机器人的运动范围 - 力扣&#…...