当前位置: 首页 > news >正文

如何避免消息的重复消费问题?(消息消费时的幂等性)

如何避免消息的重复消费问题

  • 1、 消息的幂等性
    • 1.1、概念
    • 1.2、产生业务场景
  • 2、全局唯一ID+Redis解决消息幂等性问题
    • 2.1、application.yml配置文件
    • 2.2、生产者发送消息
    • 2.3、消费者接收消息
    • 2.4、pom.xml引入依赖
    • 2.5、RabbitConfig配置类
    • 2.6、启动类
    • 2.7、订单对象
    • 2.8、测试

1、 消息的幂等性

https://blog.csdn.net/weixin_63267801/article/details/134211065

1.1、概念

消息的幂等性:就是即使多次收到了消息,也不会重复消费。
对于一个资源,不管你请求一次还是请求多次,对该资源本身造成的影响应该是相同的,不能因为重复的请求而对该资源重复造成影响。

1.2、产生业务场景

同一个消息,第一次接收,正常处理业务,如果该消息第二次再接收,那就不能再处理业务,否则就处理重复了。

2、全局唯一ID+Redis解决消息幂等性问题

2.1、application.yml配置文件

配置rabbitmq和redis

在这里插入图片描述

server:port: 8080
spring:application:name: rabbit_13_idempotent01_redisrabbitmq:host: 你的rabbitmq服务器IPport: 5672username: 你的rabbitmq服务管理员账号password: 你的rabbitmq服务管理员密码virtual-host: powerpublisher-confirm-type: correlated #开启交换机的确认模式publisher-returns: truelistener:simple:acknowledge-mode: manual #开启消费者的手动确认模式redis:host: 你的redis服务器IPport: 你的redis服务端口password: 你的redis服务密码database: 1  #1号数据库my:exchangeName: exchange.idempotent.01queueName: queue.idempotent.01

2.2、生产者发送消息

生产者模拟发送两笔相同的订单:
在这里插入图片描述使用com.fasterxml.jackson.databind.ObjectMapper对象进行数据序列化与反序列化:
在这里插入图片描述
在这里插入图片描述

在这里插入图片描述

package com.power.service;import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.power.vo.Orders;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Service;import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.math.BigDecimal;
import java.util.Date;@Service
@Slf4j
public class SendMessage {@Resourceprivate RabbitTemplate rabbitTemplate;//这个对象可以进行序列化和反序列化(json格式)@Resourceprivate ObjectMapper objectMapper;//构造方法执行后执行@PostConstructpublic void init(){//开启生产者的确认模式rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {//如果交换机接收消息成功,ack返回trueif(!ack){log.error("消息没有到达交换机,原因是:{}",cause);//TODO 重发消息或者记录错误日志}});}@Beanpublic void sendMsg() throws JsonProcessingException {{//发送第一笔订单消息Orders orders1 = Orders.builder().orderId("order_100").orderName("手机").money(new BigDecimal("2345")).orderTime(new Date()).build();//将对象转换成jsonlog.info("orders1:::::" + orders1.toString());String strOrders1 = objectMapper.writeValueAsString(orders1);log.info("strOrders1:::::" + strOrders1);MessageProperties messageProperties = new MessageProperties();//设置单条消息持久化,默认技术持久化的messageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);Message message = MessageBuilder.withBody(strOrders1.getBytes()).andProperties(messageProperties).build();rabbitTemplate.convertAndSend("exchange.idempotent.01", "info", message);}{//发送第二笔订单消息Orders orders2 = Orders.builder().orderId("order_100").orderName("手机").money(new BigDecimal("2345")).orderTime(new Date()).build();String strOrders2 = objectMapper.writeValueAsString(orders2);MessageProperties messageProperties = new MessageProperties();//设置单条消息持久化,默认技术持久化的messageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);Message message = MessageBuilder.withBody(strOrders2.getBytes()).andProperties(messageProperties).build();rabbitTemplate.convertAndSend("exchange.idempotent.01", "info", message);}log.info("消息发送完毕,发送时间是:"+new Date());}
}

2.3、消费者接收消息

在这里插入图片描述

package com.power.message;import com.fasterxml.jackson.databind.ObjectMapper;
import com.power.vo.Orders;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;import javax.annotation.Resource;
import java.io.IOException;@Component
@Slf4j
public class ReceiveMessage {@Resourceprivate ObjectMapper objectMapper;@Resourceprivate StringRedisTemplate stringRedisTemplate;@RabbitListener(queues = {"queue.idempotent.01"})public void receiveMsg(Message message, Channel channel) throws IOException {//获取消息唯一标识long deliveryTag = message.getMessageProperties().getDeliveryTag();//使用objectMapper把字节数组反序列化成对象Orders orders = objectMapper.readValue(message.getBody(), Orders.class);try{log.info("接收到的消息为:{}",orders.toString());//如果不存在就在redis中存储Boolean setResult = stringRedisTemplate.opsForValue().setIfAbsent("idempotent:" + orders.getOrderId(), orders.getOrderId());if(setResult){//TODO 向数据插入订单数据log.info("向数据库插入订单");}//手动确认接收消息成功channel.basicAck(deliveryTag,false);}catch (Exception e){log.error("消息处理出现问题");try {channel.basicNack(deliveryTag,false,true);} catch (IOException ex) {ex.printStackTrace();}throw new RuntimeException(e);}}
}

2.4、pom.xml引入依赖

在这里插入图片描述

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.power</groupId><artifactId>rabbit_13_idempotent01_redis</artifactId><version>1.0-SNAPSHOT</version><name>rabbit_13_idempotent01_redis</name><properties><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding></properties><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.6.13</version><relativePath/></parent><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.18.24</version></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId></dependency></dependencies><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId></plugin></plugins></build></project>

2.5、RabbitConfig配置类

package com.power.config;import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class RabbitConfig {@Value("${my.exchangeName}")private String exchangeName;@Value("${my.queueName}")private String queueName;//创建交换机@Beanpublic DirectExchange directExchange(){return ExchangeBuilder.directExchange(exchangeName).build();}//创建队列@Beanpublic Queue queue(){return QueueBuilder.durable(queueName).build();}@Beanpublic Binding binding(DirectExchange directExchange,Queue queue){return BindingBuilder.bind(queue).to(directExchange).with("info");}
}

2.6、启动类

package com.power;import com.power.service.SendMessage;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;import javax.annotation.Resource;@SpringBootApplication
public class Application implements ApplicationRunner {@Resourceprivate SendMessage messageService;public static void main(String[] args) {SpringApplication.run(Application.class);}@Overridepublic void run(ApplicationArguments args) throws Exception {messageService.sendMsg();}
}

2.7、订单对象

package com.power.vo;import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;import java.io.Serializable;
import java.math.BigDecimal;
import java.util.Date;@Data
@AllArgsConstructor
@NoArgsConstructor
@Builder
public class Orders implements Serializable {private String orderId;private String orderName;private BigDecimal money;private Date orderTime;
}

2.8、测试

启动服务后,生产者发送消息,消费者接收消息,
两笔消息订单ID相同,但是消费者只把接收到的一条消息插入了数据库,实现了消息幂等性。
在这里插入图片描述

相关文章:

如何避免消息的重复消费问题?(消息消费时的幂等性)

如何避免消息的重复消费问题 1、 消息的幂等性1.1、概念1.2、产生业务场景 2、全局唯一IDRedis解决消息幂等性问题2.1、application.yml配置文件2.2、生产者发送消息2.3、消费者接收消息2.4、pom.xml引入依赖2.5、RabbitConfig配置类2.6、启动类2.7、订单对象2.8、测试 1、 消息…...

【Java SE】类与对象

现实世界中&#xff0c;随处可见的一个事物实体就是对象&#xff0c;而类就是同一类事物&#xff08;或对象&#xff09;的统称&#xff0c;由一个类构造对象的过程称为创建这个类的一个实例&#xff08;instance&#xff09;&#xff0c;即&#xff1a; 类&#xff08;class&…...

基于springboot的公益服务平台的设计与实现

文章目录 项目介绍主要功能截图:部分代码展示设计总结项目获取方式🍅 作者主页:超级无敌暴龙战士塔塔开 🍅 简介:Java领域优质创作者🏆、 简历模板、学习资料、面试题库【关注我,都给你】 🍅文末获取源码联系🍅 项目介绍 基于springboot的公益服务平台的设计与实…...

Tomcat(6) 什么是Servlet容器?

Servlet容器是Java EE技术中的一个关键组件&#xff0c;它负责管理和执行Servlet。Servlet容器提供了运行时环境&#xff0c;使得Servlet能够接收和响应来自客户端的HTTP请求。以下是Servlet容器的详细解释&#xff0c;以及一些相关的代码示例。 Servlet容器的主要功能 加载和…...

用js去除变量里的html标签

要用 JavaScript 去除字符串中的 HTML 标签&#xff0c;你可以使用正则表达式。以下是一个简单的示例代码&#xff1a; function removeHTMLTags(str) {return str.replace(/<[^>]*>/g, ); }// 示例 var str <p>This is <b>bold</b> text with <…...

Vue3+element-plus摘要

1.如果自己电脑vue版本是vue2版本&#xff0c;下面将详细介绍如何在vue2版本基础上继续安装 vue3版本且不会影响vue2版本的使用 1-1 在c盘或者别的盘建一个文件夹vue3 1-2 在这个文件夹里使用WINR 打开终端 输入命令 npm install vue/cli 安装完即可 1-3 然后进入此文件夹中的n…...

Android Studio 将项目打包成apk文件

第一步&#xff1a;选择Build -> Generate Signed APK 会出现&#xff1a; 我们选择 Create new… 然后选择你要存放密钥的地方 点击ok之后&#xff0c;则选择好了文件&#xff0c;并生成了jks文件了。 点击ok之后&#xff0c; 会出现&#xff1a; 选择release&#xf…...

贪心算法day2(最长递增子序列)

目录 1.最长递增子序列 方法一&#xff1a;动态规划 方法二&#xff1a;贪心二分查找 1.最长递增子序列 链接&#xff1a;. - 力扣&#xff08;LeetCode&#xff09; 方法一&#xff1a;动态规划 思路&#xff1a;我们定义dp[i]为最长递增子序列&#xff0c;那么dp[j]就是…...

arcgis pro 学习笔记

二维三维集合在一起&#xff0c;与arcgis不同 一、首次使用&#xff0c;几个基本设置 1.选项——常规里面设置自动保存时间 2.新建工程文件&#xff0c;会自动加载地图&#xff0c;可以在选项里面设置为无&#xff0c;以提高启动效率。 3.设置缓存位置&#xff0c;可勾选每次…...

OpenGL 进阶系列06 - OpenGL变换反馈(TransformFeedback)

一:概述 变换反馈(Transform Feedback)是 OpenGL 中的一项技术,允许你将顶点着色器的输出(例如变换后的顶点数据)直接传输到缓冲区,而不是将结果渲染到屏幕上。它在图形计算中非常有用,尤其在粒子系统、模拟、几何处理等场景中,可以用来获取顶点处理的中间结果,并将其…...

elementUI 点击弹出时间 date-picker

elementUI的日期组件&#xff0c;有完整的UI样式及弹窗&#xff0c;但是我的页面不要它的UI样式&#xff0c;点击的时候却要弹出类似的日期选择器&#xff0c;那怎么办呢&#xff1f; 以下是elementUI自带的UI风格&#xff0c;一定要一个输入框来触发。 这是我的项目中要用到的…...

【浙江大学大模型系列】启真医疗大模型(国内大模型)

启真大模型是一款专注于医疗领域的AI大模型&#xff0c;它坚持“数据知识双轮驱动”的技术路线&#xff0c;通过大模型技术和医学知识库的紧密结合&#xff0c;致力于推动大模型技术在医疗行业的落地和应用实践。 启真大模型的特点在于其强大的数据整合能力和医学知识库的支持。…...

NestJS 项目中如何使用 class-validator 进行数据验证

前言 在现代Web开发中&#xff0c;数据验证是必不可少的一环&#xff0c;它不仅能够确保数据的准确性&#xff0c;还能提高系统的安全性。在使用NestJS框架进行项目开发时&#xff0c;class-validator与class-transformer这两个库为我们提供了方便的数据验证解决方案。 本文将…...

【AI抠图整合包及教程】Meta SAM2:引领图像和视频分割技术的新纪元

在人工智能的浪潮中&#xff0c;Meta公司再次以Segment Anything Model 2&#xff08;SAM 2&#xff09;引领了图像和视频分割技术的新纪元。SAM 2的发布不仅为计算机视觉领域的研究和发展注入了新的活力&#xff0c;还预示着这一技术将在多个行业中找到广泛的应用场景。这一创…...

小菜家教平台(三):基于SpringBoot+Vue打造一站式学习管理系统

目录 前言 今日进度 详细过程 相关知识点 前言 昨天重构了数据库并实现了登录功能&#xff0c;今天继续进行开发&#xff0c;创作不易&#xff0c;请多多支持~ 今日进度 添加过滤器、实现登出功能、实现用户授权功能校验 详细过程 一、添加过滤器 自定义过滤器作用&…...

ArcGIS/QGIS按掩膜提取或栅格裁剪后栅格数据的值为什么变了?

问题描述&#xff1a; 现有一栅格数据&#xff0c;使用ArcGIS或者QGIS按照矢量边界进行按掩膜提取或者栅格裁剪以后&#xff0c;其值的范围发生了变化&#xff0c;如下&#xff1a; 可以看到&#xff0c;不论是按掩膜提取还是进行栅格裁剪后&#xff0c;其值的范围均与原来栅…...

Linux的基本指令(一)

1.ls指令 功能&#xff1a;对于目录&#xff0c;该命令列出该目录下的所有子目录与文件。对于文件&#xff0c;将列出文件名以及信息。 常用选项&#xff1a; -a列出目录下的所有文件&#xff0c;包括以 . 开头的隐含文件。 -l列出文件的详细信息 举例&#xff1a; rooti…...

python导入包失败 in <module> import pandas as pd

如果安装不成功就更新一下pip python.exe -m pip install --upgrade pip 再删掉原来的pandas pip uninstall pandas 再安装一次 pip install pandas...

不惧风雨,硬核防护!雷孜LaCie小金刚三防移动硬盘颠覆认知

不惧风雨&#xff0c;硬核防护&#xff01;雷孜LaCie小金刚三防移动硬盘颠覆认知 哈喽小伙伴们好&#xff0c;我是Stark-C~ 说到移动硬盘大家潜意识的认为是一件很娇贵的数码产品&#xff0c;很怕湿&#xff0c;摔不得。所以我们在使用传统移动硬盘的时候不能摔&#xff0c;远…...

Yocto 项目下通过网络更新内核、设备树及模块

Yocto 项目下通过网络更新内核、设备树及模块 前言 在 Yocto 项目的开发过程中&#xff0c;特别是在进行 BSP&#xff08;Board Support Package&#xff09;开发时&#xff0c;经常需要调整特定软件包的版本&#xff0c;修改内核、设备树以及内核模块。然而&#xff0c;每次…...

RevokeMsgPatcher 2.1:实用高效的微信QQ防撤回完整解决方案

RevokeMsgPatcher 2.1&#xff1a;实用高效的微信QQ防撤回完整解决方案 【免费下载链接】RevokeMsgPatcher :trollface: A hex editor for WeChat/QQ/TIM - PC版微信/QQ/TIM防撤回补丁&#xff08;我已经看到了&#xff0c;撤回也没用了&#xff09; 项目地址: https://gitco…...

学习---3

有序数组的排序&#xff1a;一、暴力解法&#xff1a;思路&#xff1a;遍历数组&#xff0c;对每个数组元素进行平方&#xff0c;再用sort排序。时间复杂度&#xff1a;O(nlog n)二、双指针解法&#xff1a;思路&#xff1a;如果有序数组中有负数&#xff0c;那么这个负数平方之…...

无需配置环境!MinerU镜像一键部署,即刻体验智能文档解析

无需配置环境&#xff01;MinerU镜像一键部署&#xff0c;即刻体验智能文档解析 1. 为什么选择智能文档解析&#xff1f; 在日常办公和学习中&#xff0c;我们经常需要处理各种文档资料&#xff1a;PDF报告、扫描合同、学术论文、财务报表等。传统方式要么需要手动输入&#…...

3大实战场景解析:如何用FakeLocation实现Android应用级GPS伪装

3大实战场景解析&#xff1a;如何用FakeLocation实现Android应用级GPS伪装 【免费下载链接】FakeLocation Xposed module to mock locations per app. 项目地址: https://gitcode.com/gh_mirrors/fak/FakeLocation FakeLocation是一款基于Xposed框架的Android位置模拟工…...

Maxwell Fields Calculator双模式切换指南:堆栈与代数表达式输入实战解析

Maxwell Fields Calculator双模式切换指南&#xff1a;堆栈与代数表达式输入实战解析 在电磁仿真领域&#xff0c;Maxwell Fields Calculator一直是工程师进行后处理分析的利器。随着2025 R1版本的推出&#xff0c;一项革命性的功能——双模式表达式输入&#xff0c;彻底改变了…...

Phi-4-mini-reasoning惊艳效果:自动识别题目所属数学分支并推荐解法策略

Phi-4-mini-reasoning惊艳效果&#xff1a;自动识别题目所属数学分支并推荐解法策略 1. 模型介绍 Phi-4-mini-reasoning是微软推出的3.8B参数轻量级开源模型&#xff0c;专为数学推理、逻辑推导和多步解题等强逻辑任务设计。这个模型主打"小参数、强推理、长上下文、低延…...

手把手教你用Qt6和Arduino Uno打造实时数据监控面板(附串口数据粘包处理源码)

基于Qt6与Arduino Uno的工业级数据可视化系统开发实战 在工业物联网和智能硬件开发领域&#xff0c;实时数据监控是核心需求之一。想象一下这样的场景&#xff1a;车间里的温度传感器阵列通过Arduino采集数据&#xff0c;工程师在办公室的PC端就能实时查看温度曲线波动&#x…...

Navicat数据库自动备份实战:如何设置定时任务避免数据丢失

Navicat数据库自动备份实战&#xff1a;如何设置定时任务避免数据丢失 数据是现代企业的核心资产&#xff0c;一次意外的数据丢失可能造成难以估量的损失。作为数据库管理工具中的佼佼者&#xff0c;Navicat提供了强大的自动备份功能&#xff0c;能够帮助中小企业和个人开发者建…...

如何一站式解决漫画格式转换难题:CBconvert完整指南

如何一站式解决漫画格式转换难题&#xff1a;CBconvert完整指南 【免费下载链接】cbconvert CBconvert is a Comic Book converter 项目地址: https://gitcode.com/gh_mirrors/cb/cbconvert 还在为不同设备上的漫画格式兼容性问题而烦恼吗&#xff1f;CBconvert作为一款…...

突破网盘限制的高效工具:解锁全速下载与无缝分享的实战指南

突破网盘限制的高效工具&#xff1a;解锁全速下载与无缝分享的实战指南 【免费下载链接】Online-disk-direct-link-download-assistant 一个基于 JavaScript 的网盘文件下载地址获取工具。基于【网盘直链下载助手】修改 &#xff0c;支持 百度网盘 / 阿里云盘 / 中国移动云盘 /…...