当前位置: 首页 > news >正文

如何确保消息只被消费一次:Java实现详解

引言

在分布式系统中,消息传递是系统组件间通信的重要方式,而确保消息在传递过程中只被消费一次是一个关键问题。如果一个消息被多次消费,可能会导致业务逻辑重复执行,进而产生数据不一致、错误操作等问题。特别是在金融、电商等敏感领域,消息重复消费带来的影响可能是灾难性的。

为了确保消息只被消费一次,消息队列(如 Kafka、RabbitMQ、RocketMQ)提供了多种机制和设计模式,但不同场景下的实现方式有所不同。本文将深入分析如何确保消息在分布式系统中只被消费一次,结合 Java 代码实例,探讨常见的设计模式和解决方案,包括消息幂等性、分布式事务、消息签名、数据库和消息队列的一致性等方面的实现。


第一部分:消息消费的挑战

在分布式系统中,确保消息只被消费一次面临多重挑战,尤其是在网络故障、消息传递延迟或消费者宕机等情况下。以下是一些常见的场景和问题:

1.1 消息丢失

消息丢失是消息传递中的一种常见问题,特别是在消息发送或接收过程中出现网络故障时。要确保消息不丢失,通常需要消息队列提供“至少一次”的投递保障,即使消息可能被重复投递。

1.2 消息重复消费

消息重复消费是指同一条消息被多个消费者重复消费的问题。这通常是由于消费者确认机制或网络问题引起的。为了避免消息重复消费,我们需要确保“最多一次”或“精确一次”的消息投递语义。

1.3 消息幂等性问题

即使确保了消息只被投递一次,消费者处理消息的幂等性也是关键问题。如果消费者在处理消息时没有幂等性保障,则重复的消息消费可能导致错误的业务逻辑执行。


第二部分:消息队列中的消费语义

不同的消息队列系统提供了不同的消费语义,了解这些语义是确保消息只被消费一次的基础。常见的消费语义包括:

2.1 最多一次(At Most Once)

“最多一次”意味着消息可能会丢失,但绝不会被重复消费。这种语义保证消息至多被处理一次,但可能存在消息丢失的风险。在金融、电商等对数据一致性要求较高的场景下,这种语义通常不适用。

2.2 至少一次(At Least Once)

“至少一次”意味着消息一定会被消费,但可能会被消费多次。消息重复消费的问题需要由消费者自行解决,通常通过幂等性或去重机制来保障。

2.3 精确一次(Exactly Once)

“精确一次”是最理想的消息投递语义,意味着消息既不会丢失也不会重复消费。实现“精确一次”的消息传递需要更多的系统资源和复杂的设计,通常通过事务和幂等机制来实现。


第三部分:常见的解决方案

在确保消息只被消费一次时,常见的解决方案包括幂等性处理、分布式事务、消息签名和消息投递确认等机制。

3.1 消息幂等性

幂等性是指同一操作无论执行多少次,结果都相同。在消息消费的场景中,如果我们能够确保每一条消息的处理结果是幂等的,那么即使消息被重复消费,也不会产生错误的结果。

幂等性实现的几种方式:

  1. 唯一ID去重:每条消息携带一个全局唯一的ID,消费者在处理消息时,先检查该ID是否已经处理过。如果已处理过,则忽略该消息。
  2. 状态标记:将每次操作的状态持久化到数据库中,消息处理之前检查状态是否已完成,避免重复处理。
3.2 分布式事务

分布式事务通过两阶段提交、补偿事务等方式来保证多个系统之间的数据一致性。在消息系统中,分布式事务可以确保消息的发送和消费是原子操作,即消息被消费后,其对应的业务操作也被执行且只有一次。

3.3 消息签名

消息签名是一种防止消息被篡改和重复消费的方式。每条消息在发送时通过签名算法生成一个唯一的签名,消费者在处理消息时,验证签名是否正确。如果签名验证失败,消息将被拒绝处理。

3.4 消息确认机制

许多消息队列系统(如 RabbitMQ、Kafka)支持消息确认机制。消费者在成功处理消息后,向消息队列发送确认信息,消息队列才会将消息标记为已消费。如果消费者处理失败,消息可以被重新投递。


第四部分:基于 Kafka 的消息消费实现

Kafka 是一种常用的分布式消息队列系统,提供了“至少一次”的投递语义。为了确保消息只被消费一次,我们可以结合幂等性、消息ID去重和数据库事务来实现。

4.1 生产者配置幂等性

在 Kafka 中,我们可以通过配置生产者的幂等性来确保消息不会重复发送。

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "all");
props.put("retries", 1);
// 开启幂等性
props.put("enable.idempotence", true);
KafkaProducer<String, String> producer = new KafkaProducer<>(props);

enable.idempotence 设置为 true 时,Kafka 会确保消息的生产是幂等的,即每条消息只会被成功写入一次。

4.2 消费者去重机制

消费者在消费消息时,我们可以通过消息ID去重来保证同一条消息不会被重复处理。

实现步骤:

  1. 每条消息携带一个唯一的消息ID。
  2. 消费者在处理消息时,首先检查该消息ID是否已经处理过。
  3. 如果已处理,则忽略该消息;如果未处理,则记录该消息ID并处理消息。

Java 实现示例:

@Service
public class MessageConsumerService {private Set<String> processedMessageIds = new HashSet<>();@Autowiredprivate MessageRepository messageRepository;public void consumeMessage(String messageId, String messageContent) {// 检查消息ID是否已处理if (processedMessageIds.contains(messageId)) {System.out.println("消息已经处理过,忽略: " + messageId);return;}// 处理消息逻辑processMessage(messageContent);// 将消息ID记录为已处理processedMessageIds.add(messageId);// 将消息处理状态持久化messageRepository.saveProcessedMessageId(messageId);}private void processMessage(String messageContent) {// 消息处理逻辑System.out.println("处理消息: " + messageContent);}
}

在上面的代码中,processedMessageIds 是一个内存中的集合,用于记录已处理的消息ID。实际生产中,可以将消息ID存储到数据库或 Redis 中,确保即使系统重启,已处理的消息也不会重复处理。

4.3 Kafka 事务保证

为了确保消息消费和业务操作的原子性,Kafka 提供了事务支持。通过开启 Kafka 事务,我们可以确保消息的消费与业务处理是一致的。

生产者事务设置:

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "all");
props.put("transactional.id", "my-transactional-id");
KafkaProducer<String, String> producer = new KafkaProducer<>(props);// 初始化事务
producer.initTransactions();// 开启事务
producer.beginTransaction();try {// 发送消息producer.send(new ProducerRecord<>("my-topic", "key", "value"));// 提交事务producer.commitTransaction();
} catch (Exception e) {// 事务回滚producer.abortTransaction();
}

消费者事务保证:

消费者在消费消息时,可以通过数据库事务保证业务逻辑的执行与消息消费的一致性。

@Transactional
public void consumeMessage(String messageId, String messageContent) {// 检查消息ID是否已处理if (messageRepository.isProcessed(messageId)) {return;}// 处理业务逻辑processMessage(messageContent);// 将消息ID记录为已处理messageRepository.saveProcessedMessageId(messageId);
}

通过数据库事务和 Kafka 事务的结合,我们可以确保每条消息只被消费一次且业务操作只执行一次。


第五部分:基于 RabbitMQ 的消息消费实现

RabbitMQ 是另一个常用的消息队列系统,它提供了多种确认机制来确保消息

不会丢失或被重复消费。

5.1 手动确认机制

在 RabbitMQ 中,默认情况下,消息在消费者处理完后会自动确认。如果要确保消息只被消费一次,我们可以启用手动确认机制,确保消费者在成功处理消息后才确认消息。

消费者手动确认实现:

@Component
public class RabbitMqConsumer {@Autowiredprivate MessageRepository messageRepository;@RabbitListener(queues = "myQueue")public void consumeMessage(Message message, Channel channel) throws IOException {String messageId = message.getMessageProperties().getMessageId();try {// 检查消息是否已处理if (!messageRepository.isProcessed(messageId)) {// 处理消息逻辑processMessage(new String(message.getBody()));// 记录消息为已处理messageRepository.saveProcessedMessageId(messageId);}// 手动确认消息channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);} catch (Exception e) {// 发生异常,拒绝处理channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);}}private void processMessage(String messageContent) {// 消息处理逻辑System.out.println("处理消息: " + messageContent);}
}

在上面的代码中,我们通过 channel.basicAck() 手动确认消息,只有在消息成功处理后才进行确认。如果处理失败,则通过 channel.basicNack() 拒绝消息处理,RabbitMQ 会重新投递消息。

5.2 消息唯一ID去重

与 Kafka 一样,RabbitMQ 消息也可以通过唯一ID进行去重处理,确保同一条消息不会被重复消费。

去重实现:

public class MessageRepository {private Set<String> processedMessageIds = new HashSet<>();public boolean isProcessed(String messageId) {return processedMessageIds.contains(messageId);}public void saveProcessedMessageId(String messageId) {processedMessageIds.add(messageId);}
}

通过将消息ID持久化,消费者可以在每次处理消息前检查该消息是否已被处理,避免重复消费。


第六部分:基于 RocketMQ 的消息消费实现

RocketMQ 是一款高性能、低延迟的分布式消息队列系统,它也支持幂等消费和事务消息,帮助开发者实现精确一次的消息消费。

6.1 消息幂等处理

与 Kafka 和 RabbitMQ 一样,RocketMQ 也可以通过消息ID去重和幂等操作来确保消息不会被重复消费。

RocketMQ 消费者实现:

public class RocketMqConsumer {@Autowiredprivate MessageRepository messageRepository;@RocketMQMessageListener(topic = "myTopic", consumerGroup = "myGroup")public void consumeMessage(MessageExt message) {String messageId = message.getMsgId();if (!messageRepository.isProcessed(messageId)) {// 处理消息逻辑processMessage(new String(message.getBody()));// 记录消息为已处理messageRepository.saveProcessedMessageId(messageId);}}private void processMessage(String messageContent) {// 消息处理逻辑System.out.println("处理消息: " + messageContent);}
}
6.2 事务消息

RocketMQ 支持事务消息,开发者可以通过事务消息确保消息的发送和消费过程具有一致性。


第七部分:总结

在分布式系统中,确保消息只被消费一次是一个复杂且重要的问题。本文从幂等性处理、分布式事务、消息确认机制等多个角度分析了如何解决这一问题,并结合 Kafka、RabbitMQ、RocketMQ 的实际使用场景,给出了 Java 代码实例。

要实现“精确一次”的消息投递语义,通常需要结合消息队列的机制和业务系统的设计,例如:

  • 利用消息ID去重实现幂等消费;
  • 使用数据库事务确保消息消费与业务处理的一致性;
  • 通过消息队列提供的事务或确认机制,确保消息不会被丢失或重复处理。

最终的方案应该根据具体的业务场景和系统需求进行权衡和选择,确保消息传递的可靠性和数据的一致性。

相关文章:

如何确保消息只被消费一次:Java实现详解

引言 在分布式系统中&#xff0c;消息传递是系统组件间通信的重要方式&#xff0c;而确保消息在传递过程中只被消费一次是一个关键问题。如果一个消息被多次消费&#xff0c;可能会导致业务逻辑重复执行&#xff0c;进而产生数据不一致、错误操作等问题。特别是在金融、电商等…...

Web3技术在元宇宙中的应用:从区块链到智能合约

随着元宇宙的兴起&#xff0c;Web3技术正逐渐成为其基础&#xff0c;推动着数字空间的重塑。元宇宙不仅是一个虚拟世界&#xff0c;它还代表着一个由去中心化技术驱动的新生态系统。在这个系统中&#xff0c;区块链和智能合约发挥着至关重要的作用&#xff0c;为用户提供安全、…...

关于QSizeGrip在ui界面存在布局的情况下的不显示问题

直接重写resizeEvent你会发现&#xff1a;grip并没有显示 void XXXXX::resizeEvent(QResizeEvent *event) {QWidget::resizeEvent(event);this->m_sizeGrip->move(this->width() - this->m_sizeGrip->width() - 3,this->height() - this->m_sizeGrip->…...

开始场景的制作+气泡特效的添加

3D场景或2D场景的切换 1.新建项目时选择3D项目或2D项目 2.如下图操作&#xff1a; 开始前的固有流程 按照如下步骤进行操作&#xff0c;于步骤3中更改Company Name等属性&#xff1a; 本案例分辨率可以如下设置&#xff0c;有能力者可根据需要自行调整&#xff1a; 场景制作…...

位运算--(二进制中1的个数)

位运算是计算机科学中一种高效的操作方式&#xff0c;常用于处理二进制数据。在Java中&#xff0c;位运算通常通过位移操作符和位与操作符实现。 当然位运算还有一些其他的奇淫巧计&#xff0c;今天介绍两个常用的位运算方法&#xff1a;返回整数x的二进制第k位的值和返回x的最…...

使用Docker和Macvlan驱动程序模拟跨主机跨网段通信

以下是使用Docker和Macvlan驱动程序模拟跨主机跨网段通信的架构图&#xff1a; #mermaid-svg-b7wuGoTr6eQYSNHJ {font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;fill:#333;}#mermaid-svg-b7wuGoTr6eQYSNHJ .error-icon{fill:#552222;}#mermai…...

RestCloud webservice 流程设计

RestCloud webservice 流程设计 操作步骤 离线数据集成&#xff08;首页&#xff09; → \rightarrow → 示例应用数据集成流程&#xff08;边栏&#xff09; → \rightarrow → 所有数据流程 → \rightarrow → webservice节点获取城市列表 → \rightarrow → 流程设计 …...

从入门到精通:QT 100个关键技术关键词

Qt基础概念 Qt Framework - 一个跨平台的C图形用户界面应用程序开发框架。它不仅提供了丰富的GUI组件&#xff0c;还包括网络、数据库访问、多媒体支持等功能。 Qt Creator - Qt官方提供的集成开发环境&#xff08;IDE&#xff09;&#xff0c;集成了代码编辑器、项目管理工具、…...

2024年双十一值得入手的好物有哪些?五大性价比拉满闭眼入好物盘点

随着2024年双十一购物狂欢节的临近&#xff0c;消费者们纷纷开始关注各类好物&#xff0c;期待在这一天能够以最优惠的价格入手心仪的商品&#xff0c;在这个特殊的时刻&#xff0c;我们为大家盘点了五大性价比拉满的闭眼入好物&#xff0c;这些产品不仅品质卓越&#xff0c;而…...

Hbase日常运维

1 Hbase日常运维 1.1 监控Hbase运行状况 1.1.1 操作系统 1.1.1.1 IO 群集网络IO&#xff0c;磁盘IO&#xff0c;HDFS IO IO越大说明文件读写操作越多。当IO突然增加时&#xff0c;有可能&#xff1a;1.compact队列较大&#xff0c;集群正在进行大量压缩操作。 2.正在执行…...

鸿蒙开发的基本技术栈及学习路线

随着智能终端设备的不断普及与技术的进步&#xff0c;华为推出的鸿蒙操作系统&#xff08;HarmonyOS&#xff09;迅速引起了全球的关注。作为一个面向多种设备的分布式操作系统&#xff0c;鸿蒙不仅支持手机、平板、智能穿戴设备等&#xff0c;还支持IoT&#xff08;物联网&…...

【算法】反向传播算法

David Rumelhart 是人工智能领域的先驱之一&#xff0c;他与 James McClelland 等人在1986年通过其著作《Parallel Distributed Processing: Explorations in the Microstructure of Cognition》详细介绍了反向传播算法&#xff08;Backpropagation&#xff09;&#xff0c;这一…...

外贸非洲市场要如何开发

刚不久前中非合作峰会论坛之后&#xff0c;取消了非洲33国的进口关税&#xff0c;中非贸易一直以来都还不错&#xff0c;这次应该会更上一个台阶。今天就来给大家分享一下&#xff0c;关于非洲市场的一些分析和开发方法。 一、非洲市场情况 非洲是一个广阔的大陆&#xff0c;由…...

python去除空格join()

sinput().split() print( .join(s)) input().split()的作用&#xff1a; split()是字符串对象的方法。当对一个字符串调用split()方法时&#xff0c;它会根据指定的分隔符将字符串分割成多个子字符串&#xff0c;并将这些子字符串以列表的形式返回。如果不指定分隔符&#xf…...

git push错误:Out of memory, malloc failed (tried toallocate 947912704 bytes)

目录 一、错误截图 二、解决办法 一、错误截图 因项目文件过大&#xff0c;http.postBuffer设置的内存不够&#xff0c;所以报错。 二、解决办法 打开cmd窗口&#xff0c;执行如下命令即可 git config --global http.postBuffer 1024000000 如图所示 执行完成以后&#…...

web平台搭建-LAMP(CentOS-7)

一. 准备工作 环境要求&#xff1a; 操作系统&#xff1a;CentOS 7.X 64位 网络配置&#xff1a;nmtui字符终端图形管理工具或者直接编辑配置文件 关闭SELinux和firewalld防火墙 防火墙&#xff1a; 临时关闭&#xff1a;systemctl stop firewalld 永久关闭&#xff1a;systemc…...

2024.9.21 Python与C++的面试八股文整理,类与对象,内存规划,默认函数,虚函数,封装继承多态

1.什么是类&#xff0c;什么是面向对象 &#xff08;1&#xff09;类是一种蓝图或者模板&#xff0c;用于定义对象的属性和行为&#xff0c;类通常包括&#xff1a;属性&#xff0c;也就是静态特征&#xff0c;方法&#xff0c;也就是动态特征。属性描述对象的特征&#xff0c…...

2024 vue3入门教程:02 我的第一个vue页面

1.打开src下的App.vue&#xff0c;删除所有的默认代码 2.更换为自己写的代码&#xff0c; 变量msg&#xff1a;可以自定义为其他&#xff08;建议不要使用vue的关键字&#xff09; 我的的第一个vue&#xff1a;可以更换为其他自定义文字 3.运行命令两步走 下载依赖 cnpm i…...

[go] 状态模式

状态模式 允许对象在内部状态改变时改变它的行为&#xff0c;对象看起来好像修改了它的类。 模型说明 上下文 &#xff08;Context&#xff09; 保存了对于一个具体状态对象的引用&#xff0c; 并会将所有与该状态相关的工作委派给它。 上下文通过状态接口与状态对象交互&…...

uniapp沉浸式导航栏+自定义导航栏组件

在 UniApp 中实现沉浸式导航栏并结合自定义导航栏组件 一、沉浸式导航栏设置 在pages.json中配置页面样式 在需要设置沉浸式导航栏的页面的style选项中进行如下配置&#xff1a; {"pages": [{"path": "pages/pageName/pageName","style&qu…...

React第五十七节 Router中RouterProvider使用详解及注意事项

前言 在 React Router v6.4 中&#xff0c;RouterProvider 是一个核心组件&#xff0c;用于提供基于数据路由&#xff08;data routers&#xff09;的新型路由方案。 它替代了传统的 <BrowserRouter>&#xff0c;支持更强大的数据加载和操作功能&#xff08;如 loader 和…...

2025年能源电力系统与流体力学国际会议 (EPSFD 2025)

2025年能源电力系统与流体力学国际会议&#xff08;EPSFD 2025&#xff09;将于本年度在美丽的杭州盛大召开。作为全球能源、电力系统以及流体力学领域的顶级盛会&#xff0c;EPSFD 2025旨在为来自世界各地的科学家、工程师和研究人员提供一个展示最新研究成果、分享实践经验及…...

【ROS】Nav2源码之nav2_behavior_tree-行为树节点列表

1、行为树节点分类 在 Nav2(Navigation2)的行为树框架中,行为树节点插件按照功能分为 Action(动作节点)、Condition(条件节点)、Control(控制节点) 和 Decorator(装饰节点) 四类。 1.1 动作节点 Action 执行具体的机器人操作或任务,直接与硬件、传感器或外部系统…...

cf2117E

原题链接&#xff1a;https://codeforces.com/contest/2117/problem/E 题目背景&#xff1a; 给定两个数组a,b&#xff0c;可以执行多次以下操作&#xff1a;选择 i (1 < i < n - 1)&#xff0c;并设置 或&#xff0c;也可以在执行上述操作前执行一次删除任意 和 。求…...

基于IDIG-GAN的小样本电机轴承故障诊断

目录 🔍 核心问题 一、IDIG-GAN模型原理 1. 整体架构 2. 核心创新点 (1) ​梯度归一化(Gradient Normalization)​​ (2) ​判别器梯度间隙正则化(Discriminator Gradient Gap Regularization)​​ (3) ​自注意力机制(Self-Attention)​​ 3. 完整损失函数 二…...

虚拟电厂发展三大趋势:市场化、技术主导、车网互联

市场化&#xff1a;从政策驱动到多元盈利 政策全面赋能 2025年4月&#xff0c;国家发改委、能源局发布《关于加快推进虚拟电厂发展的指导意见》&#xff0c;首次明确虚拟电厂为“独立市场主体”&#xff0c;提出硬性目标&#xff1a;2027年全国调节能力≥2000万千瓦&#xff0…...

(一)单例模式

一、前言 单例模式属于六大创建型模式,即在软件设计过程中,主要关注创建对象的结果,并不关心创建对象的过程及细节。创建型设计模式将类对象的实例化过程进行抽象化接口设计,从而隐藏了类对象的实例是如何被创建的,封装了软件系统使用的具体对象类型。 六大创建型模式包括…...

学习一下用鸿蒙​​DevEco Studio HarmonyOS5实现百度地图

在鸿蒙&#xff08;HarmonyOS5&#xff09;中集成百度地图&#xff0c;可以通过以下步骤和技术方案实现。结合鸿蒙的分布式能力和百度地图的API&#xff0c;可以构建跨设备的定位、导航和地图展示功能。 ​​1. 鸿蒙环境准备​​ ​​开发工具​​&#xff1a;下载安装 ​​De…...

uniapp 实现腾讯云IM群文件上传下载功能

UniApp 集成腾讯云IM实现群文件上传下载功能全攻略 一、功能背景与技术选型 在团队协作场景中&#xff0c;群文件共享是核心需求之一。本文将介绍如何基于腾讯云IMCOS&#xff0c;在uniapp中实现&#xff1a; 群内文件上传/下载文件元数据管理下载进度追踪跨平台文件预览 二…...

保姆级【快数学会Android端“动画“】+ 实现补间动画和逐帧动画!!!

目录 补间动画 1.创建资源文件夹 2.设置文件夹类型 3.创建.xml文件 4.样式设计 5.动画设置 6.动画的实现 内容拓展 7.在原基础上继续添加.xml文件 8.xml代码编写 (1)rotate_anim (2)scale_anim (3)translate_anim 9.MainActivity.java代码汇总 10.效果展示 逐帧…...