当前位置: 首页 > news >正文

使用消息队列怎样防止消息重复?

大家好,我是君哥。

使用消息队列时,我们经常会遇到一个可能对业务产生影响的问题,消息重复。在订单、扣款、对账等对幂等有要求的场景,消息重复的问题必须解决。

那怎样应对重复消息呢?今天来聊一聊这个话题。

1.三个语义

正确使用消息队列,我们会考虑到消息防丢失、防重复,我们介绍 3 个语义:

  • At Least Once:在消息队列中,指消息不丢失,一条消息最少被消费一次,但是可能会有重复消费。

  • Exactly Once:在消息队列中,消息被精准消费一次,不丢失,也不会重复;

  • At Most Once:在消息队列中,消息不会被重复消费,但是可能会有消息丢失

不同的消息场景,需要的语义不同。比如 Exactly Once 最难实现,一般需要引入事务消息。

不同使用场景,对语义的要求也不一样。比如日志收集类的场景,At Most Once 就可以满足,而支付类的场景则要求 Exactly Once。

2.消息重复

什么情况下会导致消息重复呢?

生产者发送消息后,Broker 保存成功,但是没有成功给生产者返回 ACK,生产者以为消息发送失败,重试,再次给 Broker 发送。Broker 保存了重复消息,导致 Consumer 多次消费。

图片

消费者消费消息后,给 Broker 返回 ACK 失败,导致 Broker 没有修改偏移量,同一条消息再次发送给消费者,或者被消费者拉取到。

图片

3.生产者防重

有的消息中间件是支持生产者幂等的。比如 Kafka 从 0.11.0 版本开始引入了幂等 Producer,可以使用下面代码开启幂等 Producer:

Properties props = new Properties();
//省略其他代码
//配置幂等性
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true); 
//创建生产者实例
KafkaProducer<String, String> producer = new KafkaProducer<>(props);

Kafka 实现生产者幂等的原理是在生产者引入了 Producer ID(PID)和 Sequence Number 这两个参数。

  • PID:Producer 拥有的 ID,唯一标识一个 Producer。

  • Sequence Number:自增的数值,唯一标识同一个 Producer 发送到指定分区的消息 ID。

有了这两个参数,Broker 单分区就可以唯一标识一个生产者发送的唯一一条消息<PID,SequenceNumber>。Broker 收到消息时,如果检查到消息的<PID,SequenceNumber>已经存在,就不会再保留这条消息。

但幂等 Producer 只能在单分区下生效,多分区情况下是不生效的。因为多个分区之间并不能相互访问对方的<PID,SequenceNumber>。

图片

4.Broker 防重

Broker 如果可以防重,那对于生产者和消费者来说,节省了大量的工作。下面我们看下 Pulsar 是怎样防重的。

Broker 通过参数 BrokerDeduplicationEnabled 开启防重功能。对于 Producer 发送的重复消息,Broker 返回响应 -1:-1。

Producer 发送消息时,会带一个 sequenceId 字段,Broker 会按照 ProducerName 维度记录当前生产者最大的 sequenceId(highestSequenceId)。Broker 收到消息时,首先会判断消息中的 sequenceId 是否大于自己保存的当前生产者的 highestSequenceId,如果是则保存消息并更新 highestSequenceId,否则丢弃消息,并且给 Producer 返回 -1:-1。

下面是三个极端情况:

  1. Producer 断开连接:这种情况下,跟 Broker 重新建立连接后,本地保存的 sequenceId 还在,只要使用 sequenceId 递增后发送消息即可;

  2. Producer 宕机:Producer 重启后,缓存的 sequenceId 肯定不存在了,这时跟 Broker 重新建立连接后,Broker 会根据 ProducerName 找出 highestSequenceId 发给 Producer,Producer 使用这个 sequenceId 来发送消息;

  3. Producer 和 Broker 都宕机:Broker 重启后,可以从宕机前保存的快照中恢复各 Producer 对应的 highestSequenceId 发送给各 Producer。但这个 highestSequenceId 不一定准确,因为 Broker 宕机瞬间很有可能最新的 sequenceId 没有来得及保存快照。

需要注意的是,跟 Kafka 的幂等 Producer 类似,Pulsar 的 Broker 幂等也只能保证 Topic/Partition 级别。

5.消费者防重

从上面的分析可以看出,靠生产者防重和 Broker 防重,只能在 Topic/Partition 级别生效,这通常并不能满足我们的需求。而为了避免消费者重复消费对业务造成影响,消息防重还是必要的。这就要求我们做最后一道防线,在消费端进行防重或幂等处理。

消费端做防重,就不再考虑消息中间件层面的配置(比如 sequenceId),而是从消息体进行下手。

生产者发送消息时,给消息体赋值一个全局唯一的 ID,消费者处理消息时,根据全局唯一 ID 做防重。

比如消费端的逻辑是保存一条订单消息,那把唯一 ID 保存到数据库并且加一个唯一索引,这样根据唯一索引就可以做消息去重。

不过使用唯一索引也有缺点:

  • 如果使用 MySQL 数据库,不能使用 Change Buffer;

  • 非插入的场景(比如更新库存)不能去重。

对于唯一索引的缺点,我们可以引入 Redis 对唯一 ID 做保存,利用 setNx 判断消息是否已经处理过。如下图:

图片

if (jedis.setnx(ID, "1") == 1) {//处理业务,返回 ACK
}else {//直接返回 返回 ACK
}

6.总结

使用消息队列,在一些场景下是需要防重的。主流消息队列提供了一些防重的能力,但并不是完全可靠的。在对重复消息敏感的场景下,最好是在消费端处理消息时,从业务层面进行消息防重。

相关文章:

使用消息队列怎样防止消息重复?

大家好&#xff0c;我是君哥。 使用消息队列时&#xff0c;我们经常会遇到一个可能对业务产生影响的问题&#xff0c;消息重复。在订单、扣款、对账等对幂等有要求的场景&#xff0c;消息重复的问题必须解决。 那怎样应对重复消息呢&#xff1f;今天来聊一聊这个话题。 1.三…...

MySQL安装多版本与版本切换

起因 今天在将一个项目部署到本地&#xff0c;想着是先找到一个功能差不多的开源项目&#xff0c;再在这基础之上进行改动&#xff0c;找到的这个项目使用的MySQL版本是MySQL5.7&#xff0c;应该是比较古早的项目了&#xff0c;但是我现在装的是8.4版本的&#xff0c;所以涉及…...

Docker02 - 深入理解Docker

深入理解Docker 文章目录 深入理解Docker一&#xff1a;Docker镜像原理1&#xff1a;镜像加载原理1.1&#xff1a;unionFS1.2&#xff1a;加载原理 2&#xff1a;分层理解 二&#xff1a;容器数据卷详解1&#xff1a;什么是容器数据卷2&#xff1a;使用数据卷3&#xff1a;具名…...

检查SSH安全配置-sshd服务端未认证连接最大并发量配置

介绍 MaxStartups参数指到SSH守护进程的未经身份验证的最大并发连接数。 逻辑依据 为防止系统因大量待处理的身份验证连接尝试而出现拒绝服务的情况&#xff0c;请使用 MaxStartups 的速率限制功能来保护 sshd 登录的可用性&#xff0c;并防止守护进程不堪重负。 检查方法 …...

HarmonyOS Design 介绍

HarmonyOS Design 介绍 文章目录 HarmonyOS Design 介绍一、HarmonyOS Design 是什么&#xff1f;1. 设计系统&#xff08;Design System&#xff09;2. UI 框架的支持3. 设计工具和资源4. 开发指南5. 与其他设计系统的对比总结 二、HarmonyOS Design 特点 | 应用场景1. Harmon…...

C++中的多重继承

在 C 中&#xff0c;多重继承是一种允许一个类同时继承多个基类的特性。这意味着派生类可以继承多个基类的属 性和方法。 多重继承增加了语言的灵活性&#xff0c;但同时也引入了额外的复杂性&#xff0c;特别是当多个基类具有相同 的成员时。 基本概念 在多重继承中&#xff…...

Java基础第14天-坦克大战【1】

Java绘图坐标体系 像素 计算机在屏幕上显示的内容都是由屏幕上的每一个像素组成的。如&#xff0c;计算机显示器的分辨率是800x600&#xff0c;表示计算机屏幕上的每一行由800个点组成&#xff0c;共有600行&#xff0c;整个计算机屏幕共有480000个像素。像素是一个密度单位。…...

Java线程池入门04

1. 提交任务的两种方式 executorsubmit 2. executor executor位于Executor接口中 public interface Executor {void executor(Runnable command); }executor提交的是无返回值的任务 下面是一个具体的例子 package LearnThreadPool; import java.util.concurrent.ExecutorSe…...

【论文笔记-ECCV 2024】AnyControl:使用文本到图像生成的多功能控件创建您的艺术作品

AnyControl&#xff1a;使用文本到图像生成的多功能控件创建您的艺术作品 图1 AnyControl的多控制图像合成。该研究的模型支持多个控制信号的自由组合&#xff0c;并生成与每个输入对齐的和谐结果。输入到模型中的输入控制信号以组合图像显示&#xff0c;以实现更好的可视化。 …...

计算机毕业设计 ——jspssm519Springboot 的幼儿园管理系统

作者&#xff1a;程序媛9688 开发技术&#xff1a;SpringBoot、SSM、Vue、MySQL、JSP、ElementUI、Python、小程序等。 &#x1f31f;文末获取源码数据库&#x1f31f; 感兴趣的可以先收藏起来&#xff0c;还有大家在毕设选题&#xff08;免费咨询指导选题&#xff09;&#xf…...

山东大学软件学院人工智能导论实验之知识库推理

目录 实验目的&#xff1a; 实验代码&#xff1a; 实验内容&#xff1a; 实验结果 实验目的&#xff1a; 输入相应的条件&#xff0c;根据知识库推理得出相应的知识。 实验代码&#xff1a; def find_data(input_process_data_list):for epoch, data_process in enumerat…...

【Uniapp-Vue3】点击将内容复制到剪切板

具体使用方法在官网&#xff1a; uni-app官网https://uniapp.dcloud.net.cn/api/system/clipboard.html大致使用方法如下&#xff1a; // value是需要复制的值 function copyValue (value) { uni.setClipboardData({data: value,success: res>{// 复制成功逻辑},fail:err&…...

英伟达 Isaac Sim仿真平台体验【2】

一、产品基础信息 仿真平台&#xff1a;NVIDIA Isaac Sim 4.1.0硬件配置&#xff1a;NVIDIA RTX 4090 2 (24GB显存)核心特性&#xff1a; Omniverse内核的多GPU物理加速原生PyTorch/TensorFlow集成支持基于USD的场景构建体系 二、GPU加速仿真实战 ▶ 多球体跌落测试 操作步…...

低代码与开发框架的一些整合[3]

1.基本说明 审批流程是企业内部运营的运行流程&#xff0c;与业务板块进行关联&#xff0c;在企业数智化过程中启动业务串联的作用&#xff0c;与AI业务模型及业务agent整合后&#xff0c;将大大提升企业的运行效率以及降低运营风险。 近期对开源的近40个携带流程平台的项目进…...

deepseek-r1-centos-本地服务器配置方法

参考&#xff1a; 纯小白 Centos 部署DeepSeek指南_centos部署deepseek-CSDN博客 https://blog.csdn.net/xingxin550/article/details/145574080 手把手教大家如何在Centos7系统中安装Deepseek&#xff0c;一文搞定_centos部署deepseek-CSDN博客 https://blog.csdn.net/soso67…...

C语言实现通讯录项目

一、通讯录功能 实现一个可以存放100个人的信息的通讯录&#xff08;这里采用静态版本&#xff09;&#xff0c;每个人的信息有姓名、性别、年龄、电话、地址等。 通讯录可以执行的操作有添加联系人信息、删除指定联系人、查找指定联系人信息、修改指定联系人信息、显示联系人信…...

Effective Java读书笔记 draft

一、创建和销毁对象 1、静态工厂方法代替构造器 class Person{//构造器public Person(){}//静态工厂方法public static Person getInstance(){return new Person();} } 优势&#xff1a;1、有名字&#xff0c;代码更容易阅读理解&#xff1b;2、不用每次被调用时都创建新对…...

DeepSeek 助力 Vue 开发:打造丝滑的滑块(Slider)

前言&#xff1a;哈喽&#xff0c;大家好&#xff0c;今天给大家分享一篇文章&#xff01;并提供具体代码帮助大家深入理解&#xff0c;彻底掌握&#xff01;创作不易&#xff0c;如果能帮助到大家或者给大家一些灵感和启发&#xff0c;欢迎收藏关注哦 &#x1f495; 目录 Deep…...

wordpress使用CorePress主题设置项总结

宝塔面板设置 软件商店中安装的软件有&#xff1a;&#xff08;宝塔网站加速3.1&#xff09;&#xff08;Nginx 1.18.0&#xff09;&#xff08;MySql 5.6.50&#xff09;&#xff08;PHP-5.6&#xff09;&#xff08;phpMyAdmin 4.4&#xff09;&#xff08;Python项目管理器 …...

逆向pyinstaller打包的exe软件,获取python源码(6)

在ailx10&#xff1a;逆向pyinstaller打包的exe软件&#xff0c;获取python源码(3)中&#xff0c;我们逆向出了主程序&#xff0c;但是对其依赖的其他python文件并没有给出逆向方法&#xff0c;实际上非常简单&#xff0c;在PYZ-00.pyz_extracted 文件夹中&#xff0c;只要逆向…...

大话软工笔记—需求分析概述

需求分析&#xff0c;就是要对需求调研收集到的资料信息逐个地进行拆分、研究&#xff0c;从大量的不确定“需求”中确定出哪些需求最终要转换为确定的“功能需求”。 需求分析的作用非常重要&#xff0c;后续设计的依据主要来自于需求分析的成果&#xff0c;包括: 项目的目的…...

【人工智能】神经网络的优化器optimizer(二):Adagrad自适应学习率优化器

一.自适应梯度算法Adagrad概述 Adagrad&#xff08;Adaptive Gradient Algorithm&#xff09;是一种自适应学习率的优化算法&#xff0c;由Duchi等人在2011年提出。其核心思想是针对不同参数自动调整学习率&#xff0c;适合处理稀疏数据和不同参数梯度差异较大的场景。Adagrad通…...

前端导出带有合并单元格的列表

// 导出async function exportExcel(fileName "共识调整.xlsx") {// 所有数据const exportData await getAllMainData();// 表头内容let fitstTitleList [];const secondTitleList [];allColumns.value.forEach(column > {if (!column.children) {fitstTitleL…...

蓝牙 BLE 扫描面试题大全(2):进阶面试题与实战演练

前文覆盖了 BLE 扫描的基础概念与经典问题蓝牙 BLE 扫描面试题大全(1)&#xff1a;从基础到实战的深度解析-CSDN博客&#xff0c;但实际面试中&#xff0c;企业更关注候选人对复杂场景的应对能力&#xff08;如多设备并发扫描、低功耗与高发现率的平衡&#xff09;和前沿技术的…...

江苏艾立泰跨国资源接力:废料变黄金的绿色供应链革命

在华东塑料包装行业面临限塑令深度调整的背景下&#xff0c;江苏艾立泰以一场跨国资源接力的创新实践&#xff0c;重新定义了绿色供应链的边界。 跨国回收网络&#xff1a;废料变黄金的全球棋局 艾立泰在欧洲、东南亚建立再生塑料回收点&#xff0c;将海外废弃包装箱通过标准…...

苍穹外卖--缓存菜品

1.问题说明 用户端小程序展示的菜品数据都是通过查询数据库获得&#xff0c;如果用户端访问量比较大&#xff0c;数据库访问压力随之增大 2.实现思路 通过Redis来缓存菜品数据&#xff0c;减少数据库查询操作。 缓存逻辑分析&#xff1a; ①每个分类下的菜品保持一份缓存数据…...

有限自动机到正规文法转换器v1.0

1 项目简介 这是一个功能强大的有限自动机&#xff08;Finite Automaton, FA&#xff09;到正规文法&#xff08;Regular Grammar&#xff09;转换器&#xff0c;它配备了一个直观且完整的图形用户界面&#xff0c;使用户能够轻松地进行操作和观察。该程序基于编译原理中的经典…...

渗透实战PortSwigger靶场:lab13存储型DOM XSS详解

进来是需要留言的&#xff0c;先用做简单的 html 标签测试 发现面的</h1>不见了 数据包中找到了一个loadCommentsWithVulnerableEscapeHtml.js 他是把用户输入的<>进行 html 编码&#xff0c;输入的<>当成字符串处理回显到页面中&#xff0c;看来只是把用户输…...

DiscuzX3.5发帖json api

参考文章&#xff1a;PHP实现独立Discuz站外发帖(直连操作数据库)_discuz 发帖api-CSDN博客 简单改造了一下&#xff0c;适配我自己的需求 有一个站点存在多个采集站&#xff0c;我想通过主站拿标题&#xff0c;采集站拿内容 使用到的sql如下 CREATE TABLE pre_forum_post_…...

【Kafka】Kafka从入门到实战:构建高吞吐量分布式消息系统

Kafka从入门到实战:构建高吞吐量分布式消息系统 一、Kafka概述 Apache Kafka是一个分布式流处理平台,最初由LinkedIn开发,后成为Apache顶级项目。它被设计用于高吞吐量、低延迟的消息处理,能够处理来自多个生产者的海量数据,并将这些数据实时传递给消费者。 Kafka核心特…...