如何避免重复消费消息
博主介绍:✌全网粉丝3W+,全栈开发工程师,从事多年软件开发,在大厂呆过。持有软件中级、六级等证书。可提供微服务项目搭建与毕业项目实战,博主也曾写过优秀论文,查重率极低,在这方面有丰富的经验✌
博主作品:《Java项目案例》主要基于SpringBoot+MyBatis/MyBatis-plus+MySQL+Vue等前后端分离项目,可以在左边的分类专栏找到更多项目。《Uniapp项目案例》有几个有uniapp教程,企业实战开发。《微服务实战》专栏是本人的实战经验总结,《Spring家族及微服务系列》专注Spring、SpringMVC、SpringBoot、SpringCloud系列、Nacos等源码解读、热门面试题、架构设计等。除此之外还有不少文章等你来细细品味,更多惊喜等着你哦
🍅开源项目免费哦(有vue2与vue3版本):点击这里克隆或者下载 🍅
🍅文末获取联系🍅精彩专栏推荐订阅👇🏻👇🏻 不然下次找不到哟
Java项目案例《100套》
https://blog.csdn.net/qq_57756904/category_12173599.html
uniapp小程序《100套》
https://blog.csdn.net/qq_57756904/category_12199600.html
目录
常见方法
RocketMQ
RabbitMQ
Kafka
常见方法
消息中间件通常采用一些策略来避免消息的重复消费。这在分布式系统中非常重要,以确保消息被处理一次且仅一次,避免产生错误或重复的结果。以下是一些常见的方法:
消息确认机制:消费者在处理完一条消息后,向消息中间件发送确认消息。如果消息中间件收到确认,就会将该消息标记为已消费,如果没有收到确认,就会将消息重新发送给其他消费者。这确保了消息只有在确认后才会被标记为已处理。
消息去重:消息中间件可以在存储消息之前对消息内容进行去重操作,以确保相同内容的消息只被投递一次。
消费者端去重:消费者可以在自己的业务逻辑中实现去重操作。比如,记录已处理的消息 ID 或唯一标识符,以避免处理相同的消息。
幂等性处理:消费者可以设计其处理逻辑,使得多次处理相同的消息不会产生不一致的结果。这需要确保相同的操作可以多次执行,而不会引起问题。例如,数据库插入操作可以使用主键冲突处理,确保不会重复插入相同记录。
消息超时机制:如果消息在一定时间内没有得到确认,消息中间件可以将其重新发送给其他消费者,以确保消息不会永远挂起在未确认状态。
分布式事务:在一些情况下,消息消费可能需要和其他操作一起构成一个分布式事务。消息中间件可以与其他数据存储或操作协同工作,以保证消息和其他操作的一致性。
消息顺序保证:有些消息中间件支持保证消息按照特定的顺序传递给消费者,这有助于避免由于消息乱序而导致的重复消费问题。
不同的消息中间件提供不同的机制来处理消息的重复消费问题,开发者在选择和使用消息中间件时需要考虑这些因素,并根据实际需求来实现避免重复消费的策略。
RocketMQ
Apache RocketMQ 是一个开源的分布式消息中间件,它提供了一些机制来避免重复消费消息。以下是 RocketMQ 如何避免重复消费消息的一些方法:
消息消费状态追踪:RocketMQ 提供了消费者的消息消费状态追踪功能。消费者可以定期向消息中间件发送消费进度信息,包括已成功消费的消息的偏移量。当消费者重启或者发生故障时,RocketMQ 可以根据消费者提交的消费进度信息,将尚未消费的消息重新传递给消费者。
消费者组:RocketMQ 允许多个消费者以相同的消费者组名字订阅同一个主题。这些消费者会形成一个消费者组,消息会被分发给组内的每个消费者。当组内某个消费者成功消费了一条消息后,消息将被标记为已消费,其他组内的消费者将不会再收到该消息。这样可以确保在同一个消费者组内不会出现重复消费。
消费者幂等性设计:开发者可以设计消费者的业务逻辑,使得即使接收到相同的消息多次,也不会产生重复的影响。这需要在业务逻辑中考虑幂等性,确保多次处理相同消息不会产生错误或重复的结果。
消费者端去重:类似于其他消息中间件,RocketMQ 的消费者也可以在消费者端实现去重操作,比如记录已处理的消息 ID 或唯一标识符,以避免处理相同的消息。
消息的唯一标识符:为每条消息生成一个唯一的标识符,并在消费者端使用这个标识符来判断是否重复消费。这要求生产者在发送消息时,附加一个唯一标识符。
需要注意的是,尽管 RocketMQ 提供了这些机制来避免重复消费,但开发者在设计和实现消费者时,仍然需要注意保证幂等性和正确处理可能的重复消息情况。
RabbitMQ
RabbitMQ 是另一个常用的开源消息中间件,它也提供了一些方法来避免重复消费消息。以下是 RabbitMQ 如何处理避免重复消费消息的一些方式:
消息确认机制:RabbitMQ 支持消息确认机制,消费者在处理完一条消息后,向 RabbitMQ 发送确认消息。如果消息处理成功,RabbitMQ 将会将该消息标记为已消费,如果没有收到确认,RabbitMQ 可能会将消息重新发送给其他消费者。
消息去重:在消息的发布者端,可以设置消息的唯一标识符,并在消费者端维护已处理的消息标识符。这样消费者在处理消息前,先检查该消息的标识符是否已经处理过,避免重复消费。
消费者幂等性设计:类似于其他消息中间件,RabbitMQ 的消费者也可以设计业务逻辑,使得多次处理相同的消息不会引起错误或重复的结果。
消息的唯一标识符:为每条消息生成一个唯一的标识符,消费者在处理消息时,可以使用这个标识符来判断是否已经处理过该消息。
消费者端的定时确认:消费者可以在处理完消息后,通过一段时间内定时确认的方式,来确保消息已经被正确处理。如果在确认之前消费者发生了故障,消息会被重新发送给其他消费者。
消息过期机制:RabbitMQ 支持设置消息的过期时间,如果一条消息在一定时间内没有被消费者处理,就会被标记为过期,不会再被发送给消费者。
无论选择哪种方法,都需要开发者在设计消费者时考虑到可能的重复消息问题,并实现相应的逻辑来确保消息被处理一次且仅一次。
Kafka
Apache Kafka 是另一种流行的分布式消息中间件,它也提供了一些方法来避免重复消费消息。以下是 Kafka 如何处理避免重复消费消息的一些策略:
消费者偏移量(Consumer Offset)管理:Kafka 使用偏移量来标识每个消费者所消费的消息位置。消费者可以将已处理的消息的偏移量保存在外部存储中(如数据库或文件),以便在重启后能够从正确的位置开始消费。这确保了消费者能够继续从上次处理的位置继续消费消息,避免了重复消费。
消费者组管理:Kafka 允许多个消费者以相同的消费者组名字订阅同一个主题。同一个消费者组内的消费者共同消费消息,并且每条消息只会被组内的一个消费者处理。这样可以避免同一消息被多次消费。
幂等性处理:消费者可以设计其处理逻辑,使得多次处理相同的消息不会产生不一致的结果。这需要确保相同的操作可以多次执行,而不会引起问题。例如,数据库插入操作可以使用主键冲突处理,确保不会重复插入相同记录。
消息超时机制:Kafka 提供了消息超时的机制,如果一个消费者在一定时间内没有确认消费消息,Kafka 将会将该消息重新发送给其他消费者。
消费者心跳和会话超时:消费者定期发送心跳给 Kafka 服务器,以表明自己还在活动状态。如果消费者崩溃或无法发送心跳,Kafka 服务器会认为该消费者不再活动,并将其所持有的分区重新分配给其他消费者。这有助于避免消费者长时间无响应而导致重复消费。
幂等生产者:在消息的生产端,可以使用幂等生产者来确保消息不会重复发送。Kafka 的幂等生产者会在发送消息时为消息分配一个唯一的序列号,并在发送失败后自动重试,确保消息只会被发送一次。
使用这些方法,开发者可以在 Kafka 中实现避免重复消费消息的策略,确保消息被处理一次且仅一次。
相关文章:

如何避免重复消费消息
博主介绍:✌全网粉丝3W,全栈开发工程师,从事多年软件开发,在大厂呆过。持有软件中级、六级等证书。可提供微服务项目搭建与毕业项目实战,博主也曾写过优秀论文,查重率极低,在这方面有丰富的经验…...

【若依框架RuoYi-Vue-Plus 图片回显不显示问题,OSS文件上传或者本地上传】
一、问题 1.设计表 product(商品表) 有 id (id) name(商品名)icon(图标) 2.使用若依代码生成功能,导入product表,代码生成。 3.将生成的代码导入到项目中得到…...

docker搭建rocketmq环境
准备局域网 nameserver和broker在同一网段才能够互相访问,我们先创建一个局域网。 创建rocketmq-network,让nameserver、broker在同一个网段: docker network create --driverbridge \ --subnet192.168.2.10/24 rocketmq-network安装names…...

uwsgi部署多进程django apscheduler与问题排查
💖 作者简介:大家好,我是Zeeland,开源建设者与全栈领域优质创作者。📝 CSDN主页:Zeeland🔥📣 我的博客:Zeeland📚 Github主页: Undertone0809 (Zeeland)&…...

git difftool对比差异,避免推送不相关内容
问题 在利用git进行版本管理的时候,经常会由于对其他不相关的代码,做了一些小改动,例如删除了一个空行,多了一个缩进等。 为避免将这些不相关的改动也提交到远程,对PR造成不必要的影响,可以利用git diff命…...

Java设计模式:一、六大设计原则-05:接口隔离原则
文章目录 一、定义:接口隔离原则二、模拟场景:接口隔离原则三、违背方案:接口隔离原则3.1 工程结构3.2 英雄技能调用3.2.1 英雄技能接口3.2.2 英雄:后裔3.2.3 英雄:廉颇 3.3 单元测试 四、改善代码:接口隔离…...

第63步 深度学习图像识别:多分类建模误判病例分析(Tensorflow)
基于WIN10的64位系统演示 一、写在前面 上两期我们基于TensorFlow和Pytorch环境做了图像识别的多分类任务建模。这一期我们做误判病例分析,分两节介绍,分别基于TensorFlow和Pytorch环境的建模和分析。 本期以健康组、肺结核组、COVID-19组、细菌性&am…...

OpenCv读/写视频色差 方案
OpenCv read / write video color differenceOpenCv读/写视频色差 感谢博主: OpenCv读/写视频色差答案 - 爱码网 有没有办法让 OpenCV 使用正确的转换?? 是的,使用 GStreamer 后端而不是 FFmpeg 后端,颜色看起来很完…...

【传输层】网络基础 -- UDP协议 | TCP协议
再谈端口号端口号范围划分netstatpidof UDPUDP的特点面向数据报UDP的缓冲区 基于UDP的应用层协议 TCP认识TCP协议的报头理解封装解包理解可靠性TCP工作模式16位窗口大小6位标志位URGACKPSHRSTSYNFIN 再谈端口号 端口号(Port)标识了一个主机上进行通信的不同的应用程序 在TCP/I…...

Android开发之性能测试工具Profiler
前言 性能优化问题,在我们开发时都会遇到,但是在小厂和对自己要求不严格的情况下,我都很少去做性能优化; 在性能优化上,基本大家都是通过自己的开发经验和性能分析工具来发现问题,今天给大家分享一下小编最…...

SpringBoot初级开发--多环境配置的集成(9)
在Springboot的开发中,我们经常要切换各种各样的环境配置,比如现在是开发环境,然后又切换到生产环境,这个时候用多环境配置就是一个明智的选择。接下来我们沿用上一章的工程来配置多环境配置工程。 1.准备多环境配置文件 这里我…...

(数学) 剑指 Offer 39. 数组中出现次数超过一半的数字 ——【Leetcode每日一题】
❓ 剑指 Offer 39. 数组中出现次数超过一半的数字 难度:简单 数组中有一个数字出现的次数超过数组长度的一半,请找出这个数字。 你可以假设数组是非空的,并且给定的数组总是存在多数元素。 示例 1: 输入: [1, 2, 3, 2, 2, 2, 5, 4, 2] 输…...

如何用PS把roughness贴图转换成Smoothness,并放入Metallic贴图的a通道。
1:用PS打开Roughness贴图 2:选择反相,装换成Smoothness贴图 3:新建一个大小相等的psd文件,或者打开Metallic贴图 4:如果没有金属度贴图,就把新建的图画成纯黑色 5:选择图层蒙版->…...

了解XSS攻击与CSRF攻击
什么是XSS攻击 XSS(Cross-Site Scripting,跨站脚本攻击)是一种常见的网络安全漏洞,它允许攻击者在受害者的浏览器上执行恶意脚本。这种攻击通常发生在 web 应用程序中,攻击者通过注入恶意脚本来利用用户对网站的信任&…...

安全测试-django防御安全策略
django安全性 django针对安全方面有一些处理,学习如何进行处理设置,也有利于学习安全测试知识。 CSRF 跨站点请求伪造(Cross-Site Request Forgery,CSRF)是一种网络攻击方式,攻击者欺骗用户在自己访问的网…...

7.react useReducer使用与常见问题
useReducer函数 1. useState的替代方案.接收一个(state, action)>newState的reducer, 并返回当前的state以及与其配套的dispatch方法2. 在某些场景下,useReducer会比useState更加适用,例如state逻辑较为复杂, 且**包含多个子值**,或者下一个state依赖于之前的state等清楚us…...

c#泛型(generic)
概述: C#中的泛型(Generics)是一种允许在编写类、方法和委托时使用参数化类型的机制。泛型允许我们编写更通用、可重用的代码,可以避免类型转换和重复编写类似的代码。 泛型的基本语法如下所示: class ClassName<…...

【力扣每日一题】2023.8.30 到家的最少跳跃次数
目录 题目: 示例: 分析: 代码: 题目: 示例: 分析: 题目给我们一只跳蚤,我们可以操控它前跳 a 格或是后跳 b 格,不能跳到小于0的位置,有一些被禁止的点不…...

精读《算法题 - 地下城游戏》
今天我们看一道 leetcode hard 难度题目:地下城游戏。 恶魔们抓住了公主并将她关在了地下城 dungeon 的 右下角 。地下城是由 m x n 个房间组成的二维网格。我们英勇的骑士最初被安置在 左上角 的房间里,他必须穿过地下城并通过对抗恶魔来拯救公主。 骑士…...

随记-Kibana Dev Tools,ES 增删改查 索引,Document
索引 创建索引 创建索引 PUT index_test创建索引 并 修改分片信息 # 创建索引 并 修改分片信息 PUT index_test2 { # 必须换行, PUT XXX 必须独占一行,类似的 其他请求也需要独占一行 "settings": {"number_of_shards": 1, # 主分片"…...

什么是架构,架构的本质是什么
不论是开发人员还是架构师,我们都一直在跟软件系统打交道,架构是在工作中出现最频繁的术语之一。那么,到底什么是架构?你可能有自己的答案,也有可能没有答案。对“架构”的理解需要我们不断在实践中思考、归纳、演绎&a…...

Python爬虫(十七)_糗事百科案例
糗事百科实例 爬取糗事百科段子,假设页面的URL是: http://www.qiushibaike.com/8hr/page/1 要求: 使用requests获取页面信息,用XPath/re做数据提取获取每个帖子里的用户头像连接、用户姓名、段子内容、点赞次数和评论次数保存到json文件内…...

Ae 效果:CC Threads
生成/CC Threads Generate/CC Threads CC Threads(CC 编织条)效果基于当前图层像素生成编织条图案和纹理。可以用在各种设计中,如背景设计、图形设计、文字设计等。 ◆ ◆ ◆ 效果属性说明 Width 宽度 设置编织的宽度。 默认值为 50。值越大…...

Kotlin 协程 - 多路复用 select()
一、概念 又叫选择表达式,是一个挂起函数,可以同时等待多个挂起结果,只取用最快恢复的那个值(即多种方式获取数据,哪个更快返回结果就用哪个)。 同时到达 select() 会优先选择先写子表达式,想随…...

学习笔记-ThreadLocal
ThreadLocal 什么是ThreadLocal? ThreadLocal 是线程本地变量类,在多线程并行执行过程中,将变量存储在ThreadLocal中,每个线程中都有独立的变量,因此不会出现线程安全问题。 应用举例 解决线程安全问题:例…...

python利用pandas统计分析—groupby()函数的使用
文章目录 一、groupby使用场景二、groupby基本原理三、groupby分组运算基础聚合操作:只能选择一种聚合操作agg 聚合操作:可以针对同列选择不同聚合方法transformapply 四、groupby分组后去重统计nunique()五、groupby分组后重命名列名rename()直接重新命…...

OPENCV实现ORB特征检测
# -*- coding:utf-8 -*- """ 作者:794919561 日期:2023/8/31 """ import cv2 import numpy as np# 读图像 img = cv2.imread(F:\\learnOpenCV\\openCVLearning\\pictures\\chess.jpg)...

W5100S-EVB-PICO主动PING主机IP检测连通性(十)
前言 上一章节我们用我们开发板在UDP组播模式下进行数据回环测试,本章我们用开发板去主动ping主机IP地址来检测与该主机之间网络的连通性。 什么是PING? PING是一种命令, 是用来探测主机到主机之间是否可通信,如果不能ping到某台…...

使用 Nginx 搭建文件下载服务器
文章目录 一、基础环境二、适用场景三、方法和步骤四、其他说明 版权声明:本文为CSDN博主「杨群」的原创文章,遵循 CC 4.0 BY-SA版权协议,于2023年8月27日首发于CSDN,转载请附上原文出处链接及本声明。 原文链接:http…...

链式栈StackT
C关键词:内部类/模板类/头插 C自学精简教程 目录(必读) C数据结构与算法实现(目录) 栈的内存结构 空栈: 有一个元素的栈: 多个元素的栈: 成员函数说明 0 clear 清空栈 clear 函数负责将栈的对内存释放…...