Kafka保证消息幂等以及解决方案
1、幂等的基本概念
幂等简单点讲,就是用户对于同一操作发起的一次请求或者多次请求的结果是一致的,不会产生任何副作用。幂等分很多种,比如接口的幂等、消息的幂等,它是分布式系统设计时必须要考虑的一个方面。
查询操作(天然幂等)
查询一次和查询多次,在数据不变的情况下,查询结果是一样的。查询是天然的幂等操作删除操作 (天然幂等) 删除操作也是幂等的,删除一次和删除多次都是把数据删除(注意可能返回结果不一样,删除的数据不存在返回 0,删除的数据多条,返回结果多个)。
删除操作 (天然幂等)
删除操作也是幂等的,删除一次和删除多次都是把数据删除(注意可能返回结果不一样,删除的数据不存在, 返回 0,删除的数据多条,返回结果多个).
新增操作
新增操作,这种情况下多次请求,可能会产生重复数据;
修改操作
修改操作,如果只是单纯的更新数据,比如: update account set money=100 where id=1,是没有问题的,如果还有计算,比如: update account set money=money+100 where id=l,这种情况下多次请求,可能会导致数据错误。
总结:当出现消费者对某条消息重复消费的情况时,重复消费的结果与消费一次的结果是相同的,并且多次消费并未对业务系统产生任何负面影响,那么这个消费者的处理过程就是幂等的。例如,在支付场景下,消费者消费扣款消息,对一笔订单执行扣款操作,扣款金额为 100 元。如果因网络不稳定等原因导致扣款消息重复投递,消费者重复消费了该扣款消息,但最终的业务结果是只扣款一次扣费 100 元,且用户的扣款记录中对应的订单只有一条扣款流水,不会多次扣除费用。那么这次扣款操作是符合要求的,整个消费过程实现了消费幂等。
2、产生消息重复的原因
在可联网应用中,尤其在网络不稳定的情况下,消息队列的消息有可能会出现重复,如果消息重复消费会影响您的业务处理,请对消息做幂等处理。消息重复的可能原因如下:
发送时消息重复 当一条消息已被成功发送到服务端并完成持久化,此时出现了网络闪断或者生产者宕机,导致服务端对生产者应答失败。 如果此时生产者 Producer 意识到消息发送失败并尝试再次发送消息,消费者 Consumer 后续会收到两条内容相同的消息。
投递时消息重复 消息消费的场景下,消息已投递到消费者 Consumer 并完成业务处理,当消费者给服务端反馈应答的时候网络闪断,为了保证消息至少被消费一次,消息队列的服务端将在网络恢复后再次尝试投递之前已被处理过的消息,消费者 Consumer 后续会收到两条内容相同的消息;
负载均衡时消息重复 当消息队列的服务端或消费者重启、扩容或缩容时,都有可能会触发 rebalance,此时消费者 Consumer 可能会收到重复消息。
3、解决方案及案例分析
既然消息可能会产生重复,那如何解决消息幂等的问题呢?我们需要从生产者、中间件、消费者这几个不同层面,来保证消息的幂等,[消息的幂等业界有很多种方案,我这里列出常见的几种方案供大家参考
3.1 设置业务唯一 key 方案 (应用最广泛)
业务唯一key 可以是单个字段或者组合字段,这个方案是怎么实现的呢?
生产者消息休构造业务唯一 key,消息端针对这个 key 加分布式锁;
在消费端,创建一个消息防重表,利用插入记录唯一健约束控制, 但是这会与业务有一定的耦合,另外高并发下频繁对消息防重表进行操作,性能比较低,不太建议使用,我们通常是在消费端加一个redis分布式锁,防止短期内消息的重复投递;
数据库业务表加唯一索引 (数据库)
以用户在积分商城下单为例,具体业务流程如下:
1、客户发起支付流程;
2、生产者生产消息构造一个订单号作为消息体幂等的唯一 key;
3、发送消息给 broker,broker 持久化消息到磁盘;
4、消费者开始消费消息,在消费逻辑中加一个分布式锁,key为订单号,防止短时间内消息重复投递;
5、当加锁成功后,执行核心业务逻辑,然后释放分布式锁,当加锁失败,直接结束;
6、最后,为了防止后续生产者重复推送相同唯一key 的消息我们需要在数据库的业务表中给这个订单号加一个唯一索引,通过唯一健约束来保证数据库表不会出现两条相同的记录,从而实现消息幂等
3.2 设置业务唯一id方案
这个其实跟上一个方案类似,只是唯一id是需要我们通过 分布式id服务 生成,其他的处理方法跟上一个方案一样。
3.3 基于业务的状态机方案
在设计单据相关的业务,或者是任务相关的业务,肯定会涉及到状态机(状态变更图),我们以业务单据为例:在业务单据上面会有个状态,状态在不同的情况下会发生变更,一般情况下存在有限状态机,当消费业务消息的时候,如果状态机已经处于下一个状态,这时候来了一个上一个状态的消息,直接丢弃消息不处理,保证了有限状态机的幂等。
3.4 基于version版本号的乐观锁方案
此方案一般是适用于更新业务的场景,更新表的时候通过版本号对比来保证消息的幂等
具体业务流程
1、客户购买商品,完成后准备发送一条扣减账户200 的消息;
2、生产端开始生产消息,构造消息体{ id=1,money=200,version=1 };
3、发送消息给 broker,broker 持久化这条消息后,返回确认消息给生产者,此时时出现了网络闪断或者生产者宕机,导致 broker 对生产者响应失败, 如果此时生产者意识到消息发送失败并尝试再次发送消息,消费者后续会收到两条内容相同的消息;
4、消费者收到相同的消息,开始消费第一条消息{id=1,money=200,version=1},根据 version=1 更新记录更新成功;
5、接着开始消费第二条消息{id=1,money=200,version=l },根据 version=1更新记录,但是此时 version 已经被更新为 2,条件不满足更新失败;
6、消费者通过基于version的乐观锁保证了消息幂等。
3.5 insert ... on duplicate key update 方案
此方案一般适合一些统计更新类的业务或者定时同步第三方平台数据到自己数据库的场景,例如: 定时同步企业微信的成员数据到自已企微库的成员表,就可以采用这种方案实现。
on dupdate key update 语句基本功能是:当表中没有原来记录时,就插入,有的话就更新。
1. on duplicate key update 语句根据主键id或唯一键来判断当前插入是否已存在。
2. 记录已存在时,只会更新on duplicate key update之后指定的字段。
3. 如果同时传递了主键和唯一键,以主键为判断存在依据,唯一键字段内容可以被修改。
具体业务流程:
1、张三关注某 A 公司企业微信,加人深圳区企微群;
2、管理员在深圳区企微群投放一个活动,张三第一次点击这个活动,这个时候活动模块发送一条 kafka 消息;
3、在数据库创建一张活动效果统计表,act_code 和 usr_id 两个字段作为联合唯一索引;
4、数据处理模块消费这条消息,通过 insert on duplicate key update 插人一条记录;
5、过了几小时,张三第二次点击这个活动,这个时候活动模块再发送一条 kafka 消息,数据处理模块再次消费这条消息,通过 insert...on duplicate key update 更新对应唯一索引的这条记录的更新时间字段;
6、通过 insert...on duplicate key update 命令,可以实现数据库表不会出现重复的记录,还能实现业务的更新逻辑。
补充:
消息消费失败的时候,可以做好监控报警,以便进行人工干预;
消费消息的方法,确保在同一个事务,以便消费失败的时候,可以回滚;
相关文章:

Kafka保证消息幂等以及解决方案
1、幂等的基本概念 幂等简单点讲,就是用户对于同一操作发起的一次请求或者多次请求的结果是一致的,不会产生任何副作用。幂等分很多种,比如接口的幂等、消息的幂等,它是分布式系统设计时必须要考虑的一个方面。 查询操作(天然幂等…...

接口测试用例设计
接口测试...

wireshark抓rtp包,提取出H265裸流数
调试rtsp收发流时,经常会需要抓包以确认是网络问题还是程序问题还是其它问题。通过tcpdump或者wireshark抓到的包通常是rtp流,保存为.pcap格式文件后中,可通过wireshark进行解析,得出h264裸流,并保存为文件。 1.wires…...

Excel往Word复制表格时删除空格
1.背景 在编写文档,经常需要从Excel往Word里复制表格 但是复制过去的表格前面会出现空格(缩进) 再WPS中试了很多方法,终于摆脱了挨个删除的困扰 2. WPS排版中删除 选择表格菜单栏-选在【开始】-【排版】选择【更多段落处理】-【段…...

客户机操作系统已禁用 CPU。请关闭或重置虚拟机(解决)
解决: 关闭虚拟机进入设置点击处理器给虚拟化引擎两个勾上确认后重新即可...

UnityShaderLab —— 简单的流光shader
原理: 就是在原先的模型表面叠加一层可以流动的图片, 算法代码: float2 tex; tex float2(i.uv.x - _Time.x * _Speed,i.uv.y); fixed4 col0 tex2D(_Tex, tex)* _Strenth; fixed4 col1 tex2D(_MainTex, i.uv); return col0 col1; 这里…...

代理IP在保护跨境商家网络安全中的重要作用
在当前全球化的背景下,跨境电商成为一种重要的商业模式,越来越多的商家涌入国际市场,商家们通过互联网平台将商品远销国外,但网络安全风险随之而来。跨境商家因为需要处理大量的在线交易和产品数据,如果未能对这些敏感…...

2核4G服务器支持多少用户同时在线访问?卡不卡?
腾讯云轻量2核4G5M带宽服务器支持多少人在线访问?5M带宽下载速度峰值可达640KB/秒,阿腾云以搭建网站为例,假设优化后平均大小为60KB,则5M带宽可支撑10个用户同时在1秒内打开网站,从CPU内存的角度,网站程序效…...

[Error]在Swift项目Build Settings的Preprocessor Macros中定义的宏无效的问题
问题 如图,在Build Settings -> Preprocessor Macros中添加了ISADEMO1。但在代码中判断无效,还是会输出“isn’t ADemo” #if ISADEMOprint("is ADemo") #elseprint("isnt ADemo") #endif解决 如图,要让Preproces…...

网格管理安全巡检系统—助企业全面安全检查
通过应用安全巡检管理系统,企业能更好地管理控制安全风险,保障员工生命安全和财产安全,避免出现各种危险隐患,帮助企业快速提高生产发展实力。 一、凡尔码搭建安全巡检系统的功能 1.巡检计划:帮助用户制定巡检计划,包括…...
【Java】replace替换方法
String 替换方法 replace() 方法用于将目标字符串中的指定字符(串)替换成新的字符(串)replaceFirst() 方法用于将目标字符串中匹配某正则表达式的第一个子字符串替换成新的字符串replaceAll() 方法用于将目标字符串中匹配某正则表…...
CentOS yum update
详情内容 CentOS yum update升级命令详解,包括yum check-update,yum update,yum install等升级安装命令详细使用方法。 1.列出所有可更新的软件清单 命令: yum check-update 2.安装所有更新软件 命令: yum updat…...
/etc/profile与~/.bash_profile的区别
/etc/profile和~/.bash_profile都是用于存储用户的配置文件的,但它们的作用范围和加载顺序有所不同。 /etc/profile是系统级的配置文件,它应用于所有用户。当用户登录时,系统会首先加载/etc/profile。这个文件存储了系统范围的环境变量、系统…...

vue+element实现电商商城礼品代发网,商品、订单管理
一、项目效果图 1.首页 2.登录 版本2: 3.注册 4.找回密码 5.立即下单 6.商品详情 7.个人中心-工作台 8.个人中心-订单列表 9.订单中心-包裹列表 10.个人中心-工单管理 11.我的钱包 12.实名认证 13.升级vip 14.个人中心-推广赚钱 二、关键源码 1.路由配置 impor…...

Python接口自动化-requests模块之post请求
一、源码解析 def post(url, dataNone, jsonNone, **kwargs):r"""Sends a POST request.:param url: URL for the new :class:Request object.:param data: (optional) Dictionary, list of tuples, bytes, or file-likeobject to send in the body of the :cl…...
DDoS检测防御实现方案
流量采集模式 通过分光器将流量直接镜像到攻击检测器,收包采用DPDK库。 当前整机流量、源IP信息、连接数 、连接内容(五元组等)的信息汇聚 当发生告警时采样原始数据包, 采用固定采样算法 基于检测对象的TCP syn ack psh ack established的个数、流量…...

ArcGIS: 第二届全国大学生GIS技能大赛(广西师范学院)详解-下午题
目录 01 题目 02 思路和实操 2.1 流域提取-思路 2.2 流域提取-实操 2.2.1 获取DEM 编辑 2.2.2 水文分析-提取流域基于单出水口 2.3 河网分级-思路 2.4 河网分级-实操 2.4.1 提取河道网络 2.4.2 河网分级 编辑 2.5 子流域提取和处理-思路 2.6 子流域提取和处理-实…...

vue七牛云视频直传
完成后样式: 下面的代码是我自己项目里面用到的,一些判断看自己情况去掉,用的是element-ui组件 安装 uuid 库。你可以使用 npm 或 yarn 来完成安装。在终端中执行以下命令: npm install uuidhtml部分 <el-upload class&quo…...

云原生Kubernetes:K8S集群版本升级(v1.20.15 - v1.22.14)
目录 一、理论 1.K8S集群升级 2.集群概况 3.升级集群(v1.21.14) 4.验证集群(v1.21.14) 5.升级集群(v1.22.14) 6.验证集群 (v1.22.14) 二、实验 1.升级集群(v1.21.14) 2.验…...

VUE树结构实现
实现效果: 数据库表结构如下: 要求:需要有parentId,id。parentId就是父记录的id 表数据要求:一定不要让一条记录的parentid和id相同 前端代码: 注意:el-table标签里面需要加上属性,才可以有下拉箭头的样式 <el-table v-loading="listLoading" :data...
[2025CVPR]DeepVideo-R1:基于难度感知回归GRPO的视频强化微调框架详解
突破视频大语言模型推理瓶颈,在多个视频基准上实现SOTA性能 一、核心问题与创新亮点 1.1 GRPO在视频任务中的两大挑战 安全措施依赖问题 GRPO使用min和clip函数限制策略更新幅度,导致: 梯度抑制:当新旧策略差异过大时梯度消失收敛困难:策略无法充分优化# 传统GRPO的梯…...

解决Ubuntu22.04 VMware失败的问题 ubuntu入门之二十八
现象1 打开VMware失败 Ubuntu升级之后打开VMware上报需要安装vmmon和vmnet,点击确认后如下提示 最终上报fail 解决方法 内核升级导致,需要在新内核下重新下载编译安装 查看版本 $ vmware -v VMware Workstation 17.5.1 build-23298084$ lsb_release…...

html css js网页制作成品——HTML+CSS榴莲商城网页设计(4页)附源码
目录 一、👨🎓网站题目 二、✍️网站描述 三、📚网站介绍 四、🌐网站效果 五、🪓 代码实现 🧱HTML 六、🥇 如何让学习不再盲目 七、🎁更多干货 一、👨…...
React---day11
14.4 react-redux第三方库 提供connect、thunk之类的函数 以获取一个banner数据为例子 store: 我们在使用异步的时候理应是要使用中间件的,但是configureStore 已经自动集成了 redux-thunk,注意action里面要返回函数 import { configureS…...

Linux部署私有文件管理系统MinIO
最近需要用到一个文件管理服务,但是又不想花钱,所以就想着自己搭建一个,刚好我们用的一个开源框架已经集成了MinIO,所以就选了这个 我这边对文件服务性能要求不是太高,单机版就可以 安装非常简单,几个命令就…...

6.9-QT模拟计算器
源码: 头文件: widget.h #ifndef WIDGET_H #define WIDGET_H#include <QWidget> #include <QMouseEvent>QT_BEGIN_NAMESPACE namespace Ui { class Widget; } QT_END_NAMESPACEclass Widget : public QWidget {Q_OBJECTpublic:Widget(QWidget *parent nullptr);…...
深入解析 ReentrantLock:原理、公平锁与非公平锁的较量
ReentrantLock 是 Java 中 java.util.concurrent.locks 包下的一个重要类,用于实现线程同步,支持可重入性,并且可以选择公平锁或非公平锁的实现方式。下面将详细介绍 ReentrantLock 的实现原理以及公平锁和非公平锁的区别。 ReentrantLock 实现原理 基本架构 ReentrantLo…...

高保真组件库:开关
一:制作关状态 拖入一个矩形作为关闭的底色:44 x 22,填充灰色CCCCCC,圆角23,边框宽度0,文本为”关“,右对齐,边距2,2,6,2,文本颜色白色FFFFFF。 拖拽一个椭圆,尺寸18 x 18,边框为0。3. 全选转为动态面板状态1命名为”关“。 二:制作开状态 复制关状态并命名为”开…...

云原生时代的系统设计:架构转型的战略支点
📝个人主页🌹:一ge科研小菜鸡-CSDN博客 🌹🌹期待您的关注 🌹🌹 一、云原生的崛起:技术趋势与现实需求的交汇 随着企业业务的互联网化、全球化、智能化持续加深,传统的 I…...

【汇编逆向系列】六、函数调用包含多个参数之多个整型-参数压栈顺序,rcx,rdx,r8,r9寄存器
从本章节开始,进入到函数有多个参数的情况,前面几个章节中介绍了整型和浮点型使用了不同的寄存器在进行函数传参,ECX是整型的第一个参数的寄存器,那么多个参数的情况下函数如何传参,下面展开介绍参数为整型时候的几种情…...