当前位置: 首页 > news >正文

Kafka保证消息幂等以及解决方案

1、幂等的基本概念
幂等简单点讲,就是用户对于同一操作发起的一次请求或者多次请求的结果是一致的,不会产生任何副作用。幂等分很多种,比如接口的幂等、消息的幂等,它是分布式系统设计时必须要考虑的一个方面。

查询操作(天然幂等)
查询一次和查询多次,在数据不变的情况下,查询结果是一样的。查询是天然的幂等操作删除操作 (天然幂等) 删除操作也是幂等的,删除一次和删除多次都是把数据删除(注意可能返回结果不一样,删除的数据不存在返回 0,删除的数据多条,返回结果多个)。

删除操作 (天然幂等)
删除操作也是幂等的,删除一次和删除多次都是把数据删除(注意可能返回结果不一样,删除的数据不存在, 返回 0,删除的数据多条,返回结果多个).

新增操作
新增操作,这种情况下多次请求,可能会产生重复数据;

修改操作
修改操作,如果只是单纯的更新数据,比如: update account set money=100 where id=1,是没有问题的,如果还有计算,比如: update account set money=money+100 where id=l,这种情况下多次请求,可能会导致数据错误。

总结:当出现消费者对某条消息重复消费的情况时,重复消费的结果与消费一次的结果是相同的,并且多次消费并未对业务系统产生任何负面影响,那么这个消费者的处理过程就是幂等的。例如,在支付场景下,消费者消费扣款消息,对一笔订单执行扣款操作,扣款金额为 100 元。如果因网络不稳定等原因导致扣款消息重复投递,消费者重复消费了该扣款消息,但最终的业务结果是只扣款一次扣费 100 元,且用户的扣款记录中对应的订单只有一条扣款流水,不会多次扣除费用。那么这次扣款操作是符合要求的,整个消费过程实现了消费幂等。

2、产生消息重复的原因
在可联网应用中,尤其在网络不稳定的情况下,消息队列的消息有可能会出现重复,如果消息重复消费会影响您的业务处理,请对消息做幂等处理。消息重复的可能原因如下:

发送时消息重复 当一条消息已被成功发送到服务端并完成持久化,此时出现了网络闪断或者生产者宕机,导致服务端对生产者应答失败。 如果此时生产者 Producer 意识到消息发送失败并尝试再次发送消息,消费者 Consumer 后续会收到两条内容相同的消息。

投递时消息重复 消息消费的场景下,消息已投递到消费者 Consumer 并完成业务处理,当消费者给服务端反馈应答的时候网络闪断,为了保证消息至少被消费一次,消息队列的服务端将在网络恢复后再次尝试投递之前已被处理过的消息,消费者 Consumer 后续会收到两条内容相同的消息;

负载均衡时消息重复 当消息队列的服务端或消费者重启、扩容或缩容时,都有可能会触发 rebalance,此时消费者 Consumer 可能会收到重复消息。

3、解决方案及案例分析
既然消息可能会产生重复,那如何解决消息幂等的问题呢?我们需要从生产者、中间件、消费者这几个不同层面,来保证消息的幂等,[消息的幂等业界有很多种方案,我这里列出常见的几种方案供大家参考

3.1 设置业务唯一 key 方案 (应用最广泛)
业务唯一key 可以是单个字段或者组合字段,这个方案是怎么实现的呢?

生产者消息休构造业务唯一 key,消息端针对这个 key 加分布式锁;

在消费端,创建一个消息防重表,利用插入记录唯一健约束控制, 但是这会与业务有一定的耦合,另外高并发下频繁对消息防重表进行操作,性能比较低,不太建议使用,我们通常是在消费端加一个redis分布式锁,防止短期内消息的重复投递;

数据库业务表加唯一索引 (数据库)

以用户在积分商城下单为例,具体业务流程如下:

1、客户发起支付流程;

2、生产者生产消息构造一个订单号作为消息体幂等的唯一 key;

3、发送消息给 broker,broker 持久化消息到磁盘;

4、消费者开始消费消息,在消费逻辑中加一个分布式锁,key为订单号,防止短时间内消息重复投递;

5、当加锁成功后,执行核心业务逻辑,然后释放分布式锁,当加锁失败,直接结束;

6、最后,为了防止后续生产者重复推送相同唯一key 的消息我们需要在数据库的业务表中给这个订单号加一个唯一索引,通过唯一健约束来保证数据库表不会出现两条相同的记录,从而实现消息幂等

3.2 设置业务唯一id方案
这个其实跟上一个方案类似,只是唯一id是需要我们通过 分布式id服务 生成,其他的处理方法跟上一个方案一样。

3.3 基于业务的状态机方案
在设计单据相关的业务,或者是任务相关的业务,肯定会涉及到状态机(状态变更图),我们以业务单据为例:在业务单据上面会有个状态,状态在不同的情况下会发生变更,一般情况下存在有限状态机,当消费业务消息的时候,如果状态机已经处于下一个状态,这时候来了一个上一个状态的消息,直接丢弃消息不处理,保证了有限状态机的幂等。

3.4 基于version版本号的乐观锁方案
此方案一般是适用于更新业务的场景,更新表的时候通过版本号对比来保证消息的幂等

具体业务流程
1、客户购买商品,完成后准备发送一条扣减账户200 的消息;

2、生产端开始生产消息,构造消息体{ id=1,money=200,version=1 };

3、发送消息给 broker,broker 持久化这条消息后,返回确认消息给生产者,此时时出现了网络闪断或者生产者宕机,导致 broker 对生产者响应失败, 如果此时生产者意识到消息发送失败并尝试再次发送消息,消费者后续会收到两条内容相同的消息;

4、消费者收到相同的消息,开始消费第一条消息{id=1,money=200,version=1},根据 version=1 更新记录更新成功;

5、接着开始消费第二条消息{id=1,money=200,version=l },根据 version=1更新记录,但是此时 version 已经被更新为 2,条件不满足更新失败;

6、消费者通过基于version的乐观锁保证了消息幂等。

3.5 insert ... on duplicate key update 方案
此方案一般适合一些统计更新类的业务或者定时同步第三方平台数据到自己数据库的场景,例如: 定时同步企业微信的成员数据到自已企微库的成员表,就可以采用这种方案实现。

on dupdate key update 语句基本功能是:当表中没有原来记录时,就插入,有的话就更新。

1. on duplicate key update 语句根据主键id或唯一键来判断当前插入是否已存在。
2. 记录已存在时,只会更新on duplicate key update之后指定的字段。
3. 如果同时传递了主键和唯一键,以主键为判断存在依据,唯一键字段内容可以被修改。

具体业务流程:
1、张三关注某 A 公司企业微信,加人深圳区企微群;
2、管理员在深圳区企微群投放一个活动,张三第一次点击这个活动,这个时候活动模块发送一条 kafka 消息;
3、在数据库创建一张活动效果统计表,act_code 和 usr_id 两个字段作为联合唯一索引;
4、数据处理模块消费这条消息,通过 insert on duplicate key update 插人一条记录;
5、过了几小时,张三第二次点击这个活动,这个时候活动模块再发送一条 kafka 消息,数据处理模块再次消费这条消息,通过 insert...on duplicate key update 更新对应唯一索引的这条记录的更新时间字段;
6、通过 insert...on duplicate key update 命令,可以实现数据库表不会出现重复的记录,还能实现业务的更新逻辑。

补充:

消息消费失败的时候,可以做好监控报警,以便进行人工干预;
消费消息的方法,确保在同一个事务,以便消费失败的时候,可以回滚;
 

相关文章:

Kafka保证消息幂等以及解决方案

1、幂等的基本概念 幂等简单点讲,就是用户对于同一操作发起的一次请求或者多次请求的结果是一致的,不会产生任何副作用。幂等分很多种,比如接口的幂等、消息的幂等,它是分布式系统设计时必须要考虑的一个方面。 查询操作(天然幂等…...

接口测试用例设计

接口测试...

wireshark抓rtp包,提取出H265裸流数

调试rtsp收发流时,经常会需要抓包以确认是网络问题还是程序问题还是其它问题。通过tcpdump或者wireshark抓到的包通常是rtp流,保存为.pcap格式文件后中,可通过wireshark进行解析,得出h264裸流,并保存为文件。 1.wires…...

Excel往Word复制表格时删除空格

1.背景 在编写文档,经常需要从Excel往Word里复制表格 但是复制过去的表格前面会出现空格(缩进) 再WPS中试了很多方法,终于摆脱了挨个删除的困扰 2. WPS排版中删除 选择表格菜单栏-选在【开始】-【排版】选择【更多段落处理】-【段…...

客户机操作系统已禁用 CPU。请关闭或重置虚拟机(解决)

解决: 关闭虚拟机进入设置点击处理器给虚拟化引擎两个勾上确认后重新即可...

UnityShaderLab —— 简单的流光shader

原理: 就是在原先的模型表面叠加一层可以流动的图片, 算法代码: float2 tex; tex float2(i.uv.x - _Time.x * _Speed,i.uv.y); fixed4 col0 tex2D(_Tex, tex)* _Strenth; fixed4 col1 tex2D(_MainTex, i.uv); return col0 col1; 这里…...

代理IP在保护跨境商家网络安全中的重要作用

在当前全球化的背景下,跨境电商成为一种重要的商业模式,越来越多的商家涌入国际市场,商家们通过互联网平台将商品远销国外,但网络安全风险随之而来。跨境商家因为需要处理大量的在线交易和产品数据,如果未能对这些敏感…...

2核4G服务器支持多少用户同时在线访问?卡不卡?

腾讯云轻量2核4G5M带宽服务器支持多少人在线访问?5M带宽下载速度峰值可达640KB/秒,阿腾云以搭建网站为例,假设优化后平均大小为60KB,则5M带宽可支撑10个用户同时在1秒内打开网站,从CPU内存的角度,网站程序效…...

[Error]在Swift项目Build Settings的Preprocessor Macros中定义的宏无效的问题

问题 如图,在Build Settings -> Preprocessor Macros中添加了ISADEMO1。但在代码中判断无效,还是会输出“isn’t ADemo” #if ISADEMOprint("is ADemo") #elseprint("isnt ADemo") #endif解决 如图,要让Preproces…...

网格管理安全巡检系统—助企业全面安全检查

通过应用安全巡检管理系统,企业能更好地管理控制安全风险,保障员工生命安全和财产安全,避免出现各种危险隐患,帮助企业快速提高生产发展实力。 一、凡尔码搭建安全巡检系统的功能 1.巡检计划:帮助用户制定巡检计划,包括…...

【Java】replace替换方法

String 替换方法 replace() 方法用于将目标字符串中的指定字符(串)替换成新的字符(串)replaceFirst() 方法用于将目标字符串中匹配某正则表达式的第一个子字符串替换成新的字符串replaceAll() 方法用于将目标字符串中匹配某正则表…...

CentOS yum update

详情内容 CentOS yum update升级命令详解,包括yum check-update,yum update,yum install等升级安装命令详细使用方法。 1.列出所有可更新的软件清单 命令: yum check-update 2.安装所有更新软件 命令: yum updat…...

/etc/profile与~/.bash_profile的区别

/etc/profile和~/.bash_profile都是用于存储用户的配置文件的,但它们的作用范围和加载顺序有所不同。 /etc/profile是系统级的配置文件,它应用于所有用户。当用户登录时,系统会首先加载/etc/profile。这个文件存储了系统范围的环境变量、系统…...

vue+element实现电商商城礼品代发网,商品、订单管理

一、项目效果图 1.首页 2.登录 版本2: 3.注册 4.找回密码 5.立即下单 6.商品详情 7.个人中心-工作台 8.个人中心-订单列表 9.订单中心-包裹列表 10.个人中心-工单管理 11.我的钱包 12.实名认证 13.升级vip 14.个人中心-推广赚钱 二、关键源码 1.路由配置 impor…...

Python接口自动化-requests模块之post请求

一、源码解析 def post(url, dataNone, jsonNone, **kwargs):r"""Sends a POST request.:param url: URL for the new :class:Request object.:param data: (optional) Dictionary, list of tuples, bytes, or file-likeobject to send in the body of the :cl…...

DDoS检测防御实现方案

流量采集模式 通过分光器将流量直接镜像到攻击检测器,收包采用DPDK库。 当前整机流量、源IP信息、连接数 、连接内容(五元组等)的信息汇聚 当发生告警时采样原始数据包, 采用固定采样算法 基于检测对象的TCP syn ack psh ack established的个数、流量…...

ArcGIS: 第二届全国大学生GIS技能大赛(广西师范学院)详解-下午题

目录 01 题目 02 思路和实操 2.1 流域提取-思路 2.2 流域提取-实操 2.2.1 获取DEM ​编辑 2.2.2 水文分析-提取流域基于单出水口 2.3 河网分级-思路 2.4 河网分级-实操 2.4.1 提取河道网络 2.4.2 河网分级 ​编辑 2.5 子流域提取和处理-思路 2.6 子流域提取和处理-实…...

vue七牛云视频直传

完成后样式&#xff1a; 下面的代码是我自己项目里面用到的&#xff0c;一些判断看自己情况去掉&#xff0c;用的是element-ui组件 安装 uuid 库。你可以使用 npm 或 yarn 来完成安装。在终端中执行以下命令&#xff1a; npm install uuidhtml部分 <el-upload class&quo…...

云原生Kubernetes:K8S集群版本升级(v1.20.15 - v1.22.14)

目录 一、理论 1.K8S集群升级 2.集群概况 3.升级集群&#xff08;v1.21.14&#xff09; 4.验证集群&#xff08;v1.21.14&#xff09; 5.升级集群&#xff08;v1.22.14&#xff09; 6.验证集群 (v1.22.14) 二、实验 1.升级集群&#xff08;v1.21.14&#xff09; 2.验…...

VUE树结构实现

实现效果: 数据库表结构如下: 要求:需要有parentId,id。parentId就是父记录的id 表数据要求:一定不要让一条记录的parentid和id相同 前端代码: 注意:el-table标签里面需要加上属性,才可以有下拉箭头的样式 <el-table v-loading="listLoading" :data...

Node.js 正在逐渐被淘汰!Bun 1.0 正在改变 JavaScript 的游戏规则

在深入讨论之前&#xff0c;我们需要解释什么是 JavaScript 运行时以及为什么我们应该关心其速度。 想象一下&#xff0c;你用 JavaScript 写了一个故事&#xff0c;需要有人大声读出来。JavaScript 运行时就像是那个友好的叙述者&#xff0c;为你的故事赋予生命&#xff01;它…...

[Machine Learning][Part 5]监督学习——逻辑回归

之前文章中提到监督学习的应用可分为两类&#xff1a;线性回归和逻辑回归。和线性回归不同&#xff0c;逻辑回归输出只有0和1。对于一个逻辑回归任务&#xff0c;可以先使用线性回归来预测y。然而我们希望逻辑回归预测模型输出的是0和1&#xff0c;为了达到这个目的&#xff0c…...

whistle安卓手机抓包(图文详解)

1、安装node https://nodejs.org &#xff08;官网下载对应的node,一般推荐长期稳定版本 LTS&#xff09; 需要node的版本是大于 v0.10.0 查看自己本地node 版本号 node -v2、安装whistle npm i -g whistle3、开启whistle 补充说明&#xff1a; ● w2 stop&#xff1a;关闭…...

【经典排序算法 time: 2023-10-12】冒泡排序(层层优化改进)

原理 每次循环找出一个最大的元素&#xff08;动态演示&#xff09;第一版冒泡 public class Maopao1 {public static void main(String[] args) {long start System.currentTimeMillis();int[] arr2 {11, 23, 69, 99, 1, 3, 45, 67, 5, 234, 678, 999, 7, 123};int[] result…...

【图像去噪的扩散滤波】图像线性扩散滤波、边缘增强线性和非线性各向异性滤波(Matlab代码实现)

&#x1f4a5;&#x1f4a5;&#x1f49e;&#x1f49e;欢迎来到本博客❤️❤️&#x1f4a5;&#x1f4a5; &#x1f3c6;博主优势&#xff1a;&#x1f31e;&#x1f31e;&#x1f31e;博客内容尽量做到思维缜密&#xff0c;逻辑清晰&#xff0c;为了方便读者。 ⛳️座右铭&a…...

4、在docker容器内的tomcat 中发布项目

1、查看本地是否有tomcat镜像&#xff0c;如果不存在则去下载 docker images 2、查看本地是否有tomcat容器&#xff0c;如存在跳过第3步 docker ps 3、创建tomcat容器 此容器用于复制tomcat的配置文件&#xff0c;配置文件复制后需删除此容器&#xff0c;如果已经存在跳过此步…...

数学建模——人工神经网络模型

一、人工神经网络简介 1、神经网络起源与应用 1943年心理学家McCulloch和数学家Pitts提出神经元生物数学模型&#xff08;M-P模型&#xff09;&#xff0c;后来人工神经网络(Artifical Neural Network,ANN)是在生物神经网络(Biological Neural Network,BNN)基础上发展起来的&a…...

java合成多个pdf为一个pdf

pom文件 <dependency><groupId>com.lowagie</groupId><artifactId>itext</artifactId><version>2.1.7</version></dependency>主文件 import com.lowagie.text.Document; import com.lowagie.text.pdf.PdfCopy; import com.lo…...

“高级Vue状态管理 - Vuex的魅力与应用“

目录 引言1. Vuex的简介1.1 什么是Vuex&#xff1f;1.2 Vuex的核心概念 2. Vuex的值获取与改变(综合案例)3. Vuex的异步请求总结 引言 在现代Web开发中&#xff0c;前端应用变得越来越复杂。随着应用规模的扩大和数据流的复杂性增加&#xff0c;有效地管理应用的状态成为了一项…...

Vue整合

基础配置&#xff1a; 1.创建&#xff1a;cmd 中 输入 create vue vue_name 启动命令&#xff1a;npm run serve 2.当node_modules(依赖)丢失时通过 npm install 下载 【根据&#xff1a;package-lock.json下载】 3.下载路由 npm i vue-router3.5.2 -S main.js导入 // np…...