如何避免消息的重复消费问题?(消息消费时的幂等性)
如何避免消息的重复消费问题
- 1、 消息的幂等性
- 1.1、概念
- 1.2、产生业务场景
- 2、全局唯一ID+Redis解决消息幂等性问题
- 2.1、application.yml配置文件
- 2.2、生产者发送消息
- 2.3、消费者接收消息
- 2.4、pom.xml引入依赖
- 2.5、RabbitConfig配置类
- 2.6、启动类
- 2.7、订单对象
- 2.8、测试
1、 消息的幂等性
https://blog.csdn.net/weixin_63267801/article/details/134211065
1.1、概念
消息的幂等性:就是即使多次收到了消息,也不会重复消费。
对于一个资源,不管你请求一次还是请求多次,对该资源本身造成的影响应该是相同的,不能因为重复的请求而对该资源重复造成影响。
1.2、产生业务场景
同一个消息,第一次接收,正常处理业务,如果该消息第二次再接收,那就不能再处理业务,否则就处理重复了。
2、全局唯一ID+Redis解决消息幂等性问题
2.1、application.yml配置文件
配置rabbitmq和redis

server:port: 8080
spring:application:name: rabbit_13_idempotent01_redisrabbitmq:host: 你的rabbitmq服务器IPport: 5672username: 你的rabbitmq服务管理员账号password: 你的rabbitmq服务管理员密码virtual-host: powerpublisher-confirm-type: correlated #开启交换机的确认模式publisher-returns: truelistener:simple:acknowledge-mode: manual #开启消费者的手动确认模式redis:host: 你的redis服务器IPport: 你的redis服务端口password: 你的redis服务密码database: 1 #1号数据库my:exchangeName: exchange.idempotent.01queueName: queue.idempotent.01
2.2、生产者发送消息
生产者模拟发送两笔相同的订单:
使用com.fasterxml.jackson.databind.ObjectMapper对象进行数据序列化与反序列化:



package com.power.service;import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.power.vo.Orders;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Service;import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.math.BigDecimal;
import java.util.Date;@Service
@Slf4j
public class SendMessage {@Resourceprivate RabbitTemplate rabbitTemplate;//这个对象可以进行序列化和反序列化(json格式)@Resourceprivate ObjectMapper objectMapper;//构造方法执行后执行@PostConstructpublic void init(){//开启生产者的确认模式rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {//如果交换机接收消息成功,ack返回trueif(!ack){log.error("消息没有到达交换机,原因是:{}",cause);//TODO 重发消息或者记录错误日志}});}@Beanpublic void sendMsg() throws JsonProcessingException {{//发送第一笔订单消息Orders orders1 = Orders.builder().orderId("order_100").orderName("手机").money(new BigDecimal("2345")).orderTime(new Date()).build();//将对象转换成jsonlog.info("orders1:::::" + orders1.toString());String strOrders1 = objectMapper.writeValueAsString(orders1);log.info("strOrders1:::::" + strOrders1);MessageProperties messageProperties = new MessageProperties();//设置单条消息持久化,默认技术持久化的messageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);Message message = MessageBuilder.withBody(strOrders1.getBytes()).andProperties(messageProperties).build();rabbitTemplate.convertAndSend("exchange.idempotent.01", "info", message);}{//发送第二笔订单消息Orders orders2 = Orders.builder().orderId("order_100").orderName("手机").money(new BigDecimal("2345")).orderTime(new Date()).build();String strOrders2 = objectMapper.writeValueAsString(orders2);MessageProperties messageProperties = new MessageProperties();//设置单条消息持久化,默认技术持久化的messageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);Message message = MessageBuilder.withBody(strOrders2.getBytes()).andProperties(messageProperties).build();rabbitTemplate.convertAndSend("exchange.idempotent.01", "info", message);}log.info("消息发送完毕,发送时间是:"+new Date());}
}
2.3、消费者接收消息

package com.power.message;import com.fasterxml.jackson.databind.ObjectMapper;
import com.power.vo.Orders;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;import javax.annotation.Resource;
import java.io.IOException;@Component
@Slf4j
public class ReceiveMessage {@Resourceprivate ObjectMapper objectMapper;@Resourceprivate StringRedisTemplate stringRedisTemplate;@RabbitListener(queues = {"queue.idempotent.01"})public void receiveMsg(Message message, Channel channel) throws IOException {//获取消息唯一标识long deliveryTag = message.getMessageProperties().getDeliveryTag();//使用objectMapper把字节数组反序列化成对象Orders orders = objectMapper.readValue(message.getBody(), Orders.class);try{log.info("接收到的消息为:{}",orders.toString());//如果不存在就在redis中存储Boolean setResult = stringRedisTemplate.opsForValue().setIfAbsent("idempotent:" + orders.getOrderId(), orders.getOrderId());if(setResult){//TODO 向数据插入订单数据log.info("向数据库插入订单");}//手动确认接收消息成功channel.basicAck(deliveryTag,false);}catch (Exception e){log.error("消息处理出现问题");try {channel.basicNack(deliveryTag,false,true);} catch (IOException ex) {ex.printStackTrace();}throw new RuntimeException(e);}}
}
2.4、pom.xml引入依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.power</groupId><artifactId>rabbit_13_idempotent01_redis</artifactId><version>1.0-SNAPSHOT</version><name>rabbit_13_idempotent01_redis</name><properties><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding></properties><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.6.13</version><relativePath/></parent><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.18.24</version></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId></dependency></dependencies><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId></plugin></plugins></build></project>
2.5、RabbitConfig配置类
package com.power.config;import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class RabbitConfig {@Value("${my.exchangeName}")private String exchangeName;@Value("${my.queueName}")private String queueName;//创建交换机@Beanpublic DirectExchange directExchange(){return ExchangeBuilder.directExchange(exchangeName).build();}//创建队列@Beanpublic Queue queue(){return QueueBuilder.durable(queueName).build();}@Beanpublic Binding binding(DirectExchange directExchange,Queue queue){return BindingBuilder.bind(queue).to(directExchange).with("info");}
}
2.6、启动类
package com.power;import com.power.service.SendMessage;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;import javax.annotation.Resource;@SpringBootApplication
public class Application implements ApplicationRunner {@Resourceprivate SendMessage messageService;public static void main(String[] args) {SpringApplication.run(Application.class);}@Overridepublic void run(ApplicationArguments args) throws Exception {messageService.sendMsg();}
}
2.7、订单对象
package com.power.vo;import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;import java.io.Serializable;
import java.math.BigDecimal;
import java.util.Date;@Data
@AllArgsConstructor
@NoArgsConstructor
@Builder
public class Orders implements Serializable {private String orderId;private String orderName;private BigDecimal money;private Date orderTime;
}
2.8、测试
启动服务后,生产者发送消息,消费者接收消息,
两笔消息订单ID相同,但是消费者只把接收到的一条消息插入了数据库,实现了消息幂等性。

相关文章:
如何避免消息的重复消费问题?(消息消费时的幂等性)
如何避免消息的重复消费问题 1、 消息的幂等性1.1、概念1.2、产生业务场景 2、全局唯一IDRedis解决消息幂等性问题2.1、application.yml配置文件2.2、生产者发送消息2.3、消费者接收消息2.4、pom.xml引入依赖2.5、RabbitConfig配置类2.6、启动类2.7、订单对象2.8、测试 1、 消息…...
【Java SE】类与对象
现实世界中,随处可见的一个事物实体就是对象,而类就是同一类事物(或对象)的统称,由一个类构造对象的过程称为创建这个类的一个实例(instance),即: 类(class&…...
基于springboot的公益服务平台的设计与实现
文章目录 项目介绍主要功能截图:部分代码展示设计总结项目获取方式🍅 作者主页:超级无敌暴龙战士塔塔开 🍅 简介:Java领域优质创作者🏆、 简历模板、学习资料、面试题库【关注我,都给你】 🍅文末获取源码联系🍅 项目介绍 基于springboot的公益服务平台的设计与实…...
Tomcat(6) 什么是Servlet容器?
Servlet容器是Java EE技术中的一个关键组件,它负责管理和执行Servlet。Servlet容器提供了运行时环境,使得Servlet能够接收和响应来自客户端的HTTP请求。以下是Servlet容器的详细解释,以及一些相关的代码示例。 Servlet容器的主要功能 加载和…...
用js去除变量里的html标签
要用 JavaScript 去除字符串中的 HTML 标签,你可以使用正则表达式。以下是一个简单的示例代码: function removeHTMLTags(str) {return str.replace(/<[^>]*>/g, ); }// 示例 var str <p>This is <b>bold</b> text with <…...
Vue3+element-plus摘要
1.如果自己电脑vue版本是vue2版本,下面将详细介绍如何在vue2版本基础上继续安装 vue3版本且不会影响vue2版本的使用 1-1 在c盘或者别的盘建一个文件夹vue3 1-2 在这个文件夹里使用WINR 打开终端 输入命令 npm install vue/cli 安装完即可 1-3 然后进入此文件夹中的n…...
Android Studio 将项目打包成apk文件
第一步:选择Build -> Generate Signed APK 会出现: 我们选择 Create new… 然后选择你要存放密钥的地方 点击ok之后,则选择好了文件,并生成了jks文件了。 点击ok之后, 会出现: 选择release…...
贪心算法day2(最长递增子序列)
目录 1.最长递增子序列 方法一:动态规划 方法二:贪心二分查找 1.最长递增子序列 链接:. - 力扣(LeetCode) 方法一:动态规划 思路:我们定义dp[i]为最长递增子序列,那么dp[j]就是…...
arcgis pro 学习笔记
二维三维集合在一起,与arcgis不同 一、首次使用,几个基本设置 1.选项——常规里面设置自动保存时间 2.新建工程文件,会自动加载地图,可以在选项里面设置为无,以提高启动效率。 3.设置缓存位置,可勾选每次…...
OpenGL 进阶系列06 - OpenGL变换反馈(TransformFeedback)
一:概述 变换反馈(Transform Feedback)是 OpenGL 中的一项技术,允许你将顶点着色器的输出(例如变换后的顶点数据)直接传输到缓冲区,而不是将结果渲染到屏幕上。它在图形计算中非常有用,尤其在粒子系统、模拟、几何处理等场景中,可以用来获取顶点处理的中间结果,并将其…...
elementUI 点击弹出时间 date-picker
elementUI的日期组件,有完整的UI样式及弹窗,但是我的页面不要它的UI样式,点击的时候却要弹出类似的日期选择器,那怎么办呢? 以下是elementUI自带的UI风格,一定要一个输入框来触发。 这是我的项目中要用到的…...
【浙江大学大模型系列】启真医疗大模型(国内大模型)
启真大模型是一款专注于医疗领域的AI大模型,它坚持“数据知识双轮驱动”的技术路线,通过大模型技术和医学知识库的紧密结合,致力于推动大模型技术在医疗行业的落地和应用实践。 启真大模型的特点在于其强大的数据整合能力和医学知识库的支持。…...
NestJS 项目中如何使用 class-validator 进行数据验证
前言 在现代Web开发中,数据验证是必不可少的一环,它不仅能够确保数据的准确性,还能提高系统的安全性。在使用NestJS框架进行项目开发时,class-validator与class-transformer这两个库为我们提供了方便的数据验证解决方案。 本文将…...
【AI抠图整合包及教程】Meta SAM2:引领图像和视频分割技术的新纪元
在人工智能的浪潮中,Meta公司再次以Segment Anything Model 2(SAM 2)引领了图像和视频分割技术的新纪元。SAM 2的发布不仅为计算机视觉领域的研究和发展注入了新的活力,还预示着这一技术将在多个行业中找到广泛的应用场景。这一创…...
小菜家教平台(三):基于SpringBoot+Vue打造一站式学习管理系统
目录 前言 今日进度 详细过程 相关知识点 前言 昨天重构了数据库并实现了登录功能,今天继续进行开发,创作不易,请多多支持~ 今日进度 添加过滤器、实现登出功能、实现用户授权功能校验 详细过程 一、添加过滤器 自定义过滤器作用&…...
ArcGIS/QGIS按掩膜提取或栅格裁剪后栅格数据的值为什么变了?
问题描述: 现有一栅格数据,使用ArcGIS或者QGIS按照矢量边界进行按掩膜提取或者栅格裁剪以后,其值的范围发生了变化,如下: 可以看到,不论是按掩膜提取还是进行栅格裁剪后,其值的范围均与原来栅…...
Linux的基本指令(一)
1.ls指令 功能:对于目录,该命令列出该目录下的所有子目录与文件。对于文件,将列出文件名以及信息。 常用选项: -a列出目录下的所有文件,包括以 . 开头的隐含文件。 -l列出文件的详细信息 举例: rooti…...
python导入包失败 in <module> import pandas as pd
如果安装不成功就更新一下pip python.exe -m pip install --upgrade pip 再删掉原来的pandas pip uninstall pandas 再安装一次 pip install pandas...
不惧风雨,硬核防护!雷孜LaCie小金刚三防移动硬盘颠覆认知
不惧风雨,硬核防护!雷孜LaCie小金刚三防移动硬盘颠覆认知 哈喽小伙伴们好,我是Stark-C~ 说到移动硬盘大家潜意识的认为是一件很娇贵的数码产品,很怕湿,摔不得。所以我们在使用传统移动硬盘的时候不能摔,远…...
Yocto 项目下通过网络更新内核、设备树及模块
Yocto 项目下通过网络更新内核、设备树及模块 前言 在 Yocto 项目的开发过程中,特别是在进行 BSP(Board Support Package)开发时,经常需要调整特定软件包的版本,修改内核、设备树以及内核模块。然而,每次…...
synchronized 学习
学习源: https://www.bilibili.com/video/BV1aJ411V763?spm_id_from333.788.videopod.episodes&vd_source32e1c41a9370911ab06d12fbc36c4ebc 1.应用场景 不超卖,也要考虑性能问题(场景) 2.常见面试问题: sync出…...
day52 ResNet18 CBAM
在深度学习的旅程中,我们不断探索如何提升模型的性能。今天,我将分享我在 ResNet18 模型中插入 CBAM(Convolutional Block Attention Module)模块,并采用分阶段微调策略的实践过程。通过这个过程,我不仅提升…...
江苏艾立泰跨国资源接力:废料变黄金的绿色供应链革命
在华东塑料包装行业面临限塑令深度调整的背景下,江苏艾立泰以一场跨国资源接力的创新实践,重新定义了绿色供应链的边界。 跨国回收网络:废料变黄金的全球棋局 艾立泰在欧洲、东南亚建立再生塑料回收点,将海外废弃包装箱通过标准…...
srs linux
下载编译运行 git clone https:///ossrs/srs.git ./configure --h265on make 编译完成后即可启动SRS # 启动 ./objs/srs -c conf/srs.conf # 查看日志 tail -n 30 -f ./objs/srs.log 开放端口 默认RTMP接收推流端口是1935,SRS管理页面端口是8080,可…...
相机从app启动流程
一、流程框架图 二、具体流程分析 1、得到cameralist和对应的静态信息 目录如下: 重点代码分析: 启动相机前,先要通过getCameraIdList获取camera的个数以及id,然后可以通过getCameraCharacteristics获取对应id camera的capabilities(静态信息)进行一些openCamera前的…...
AI编程--插件对比分析:CodeRider、GitHub Copilot及其他
AI编程插件对比分析:CodeRider、GitHub Copilot及其他 随着人工智能技术的快速发展,AI编程插件已成为提升开发者生产力的重要工具。CodeRider和GitHub Copilot作为市场上的领先者,分别以其独特的特性和生态系统吸引了大量开发者。本文将从功…...
分布式增量爬虫实现方案
之前我们在讨论的是分布式爬虫如何实现增量爬取。增量爬虫的目标是只爬取新产生或发生变化的页面,避免重复抓取,以节省资源和时间。 在分布式环境下,增量爬虫的实现需要考虑多个爬虫节点之间的协调和去重。 另一种思路:将增量判…...
Netty从入门到进阶(二)
二、Netty入门 1. 概述 1.1 Netty是什么 Netty is an asynchronous event-driven network application framework for rapid development of maintainable high performance protocol servers & clients. Netty是一个异步的、基于事件驱动的网络应用框架,用于…...
C#中的CLR属性、依赖属性与附加属性
CLR属性的主要特征 封装性: 隐藏字段的实现细节 提供对字段的受控访问 访问控制: 可单独设置get/set访问器的可见性 可创建只读或只写属性 计算属性: 可以在getter中执行计算逻辑 不需要直接对应一个字段 验证逻辑: 可以…...
基于Java+VUE+MariaDB实现(Web)仿小米商城
仿小米商城 环境安装 nodejs maven JDK11 运行 mvn clean install -DskipTestscd adminmvn spring-boot:runcd ../webmvn spring-boot:runcd ../xiaomi-store-admin-vuenpm installnpm run servecd ../xiaomi-store-vuenpm installnpm run serve 注意:运行前…...
