当前位置: 首页 > news >正文

Java的RocketMQ使用

在 Spring Boot 中,RocketMQ 和 Kafka 都是常用的消息中间件,它们的使用方法有一些相似之处,也有各自的特点。

一、RocketMQ 在 Spring Boot 中的使用

  1. 引入依赖

    • 在项目的pom.xml文件中添加 RocketMQ 的依赖。
    <dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>2.2.3</version>
    </dependency>
    
  2. 配置 RocketMQ

    • application.propertiesapplication.yml文件中配置 RocketMQ 的相关参数,如 namesrvAddr(NameServer 地址)等。
    rocketmq.name-server=127.0.0.1:9876
    
  3. 生产者

    • 创建一个生产者类,使用@Resource注入RocketMQTemplate
    import org.apache.rocketmq.spring.core.RocketMQTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;@Component
    public class RocketMQProducer {@Autowiredprivate RocketMQTemplate rocketMQTemplate;public void sendMessage(String topic, String message) {rocketMQTemplate.convertAndSend(topic, message);}
    }
    
  4. 消费者

    • 创建一个消费者类,使用@RocketMQMessageListener注解指定监听的主题和消费组。
    import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
    import org.apache.rocketmq.spring.core.RocketMQListener;
    import org.springframework.stereotype.Component;@Component
    @RocketMQMessageListener(topic = "your_topic", consumerGroup = "your_consumer_group")
    public class RocketMQConsumer implements RocketMQListener<String> {@Overridepublic void onMessage(String message) {// 处理接收到的消息System.out.println("Received message: " + message);}
    }
    

二、Kafka 在 Spring Boot 中的使用

  1. 引入依赖

    • pom.xml文件中添加 Kafka 的依赖。
    <dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId><version>2.8.12</version>
    </dependency>
    
  2. 配置 Kafka

    • application.propertiesapplication.yml文件中配置 Kafka 的相关参数,如 bootstrapServers(Kafka 服务器地址)等。
    spring.kafka.bootstrap-servers=127.0.0.1:9092
    
  3. 生产者

    • 创建一个生产者类,使用@Resource注入KafkaTemplate
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.kafka.core.KafkaTemplate;
    import org.springframework.stereotype.Component;@Component
    public class KafkaProducer {@Autowiredprivate KafkaTemplate<String, String> kafkaTemplate;public void sendMessage(String topic, String message) {kafkaTemplate.send(topic, message);}
    }
    
  4. 消费者

    • 创建一个消费者类,使用@KafkaListener注解指定监听的主题和消费组。
    import org.springframework.kafka.annotation.KafkaListener;
    import org.springframework.stereotype.Component;@Component
    public class KafkaConsumer {@KafkaListener(topics = "your_topic", groupId = "your_consumer_group")public void onMessage(String message) {// 处理接收到的消息System.out.println("Received message: " + message);}
    }
    

总的来说,RocketMQ 和 Kafka 在 Spring Boot 中的使用都比较方便,具体选择哪种消息中间件可以根据项目的实际需求来决定。RocketMQ 在一些场景下可能具有高吞吐量、低延迟等优势,而 Kafka 则在大规模分布式系统中被广泛应用,具有高可靠性和可扩展性。

二、如何保证消息队列顺序性

1、发送端保证顺序性

  1. 合理设计业务

    • 确保具有顺序性要求的消息被发送到同一个主题(Topic)的同一个队列(Queue)中。比如,将同一类业务的消息按照特定规则进行分类,使得它们都进入相同的队列。
    • 一个业务场景的消息尽量由一个发送端来发送消息,避免多个发送端发送可能导致的乱序。
  2. 使用同步发送

    • 在发送消息时,使用同步发送方式send(Message msg, long timeout),确保消息成功发送后再进行下一个消息的发送。这样可以避免异步发送可能导致的消息乱序情况。

2、消费端保证顺序性

  1. 单线程消费

    • 消费者在消费消息时,采用单线程的方式进行消费。这样可以确保同一队列中的消息按照发送的顺序被依次处理。
    @Component
    @RocketMQMessageListener(topic = "your_topic", consumerGroup = "your_consumer_group")
    public class RocketMQConsumer implements RocketMQListener<String> {@Overridepublic void onMessage(String message) {// 处理接收到的消息System.out.println("Received message: " + message);}
    }
    

    在实际应用中,可以将消费逻辑放在一个单独的方法中,然后在这个方法中进行顺序处理,确保消息的顺序性。

  2. 避免并发处理

    • 确保在消费消息的过程中,不会出现并发处理的情况。比如,不要在消费消息的同时启动其他异步任务或者多线程处理,以免破坏消息的顺序性。

3、设置队列数量

  1. 控制队列数量
    • 如果业务对消息顺序性要求非常严格,可以考虑减少主题下的队列数量。通常情况下,一个主题可以包含多个队列,消息会被随机分发到不同的队列中。如果队列数量较少,那么消息更有可能被发送到同一个队列中,从而更容易保证顺序性。

通过以上方法,可以在一定程度上保证 RocketMQ 消息的顺序性。但需要注意的是,保证消息顺序性可能会牺牲一定的性能和吞吐量,因此需要根据实际业务需求进行权衡和选择。

一、如何确保消息队列的可靠性

1、发送端

  1. 同步发送与确认

    • 使用同步发送方式send(Message msg, long timeout),该方法会等待消息发送成功的确认,确保消息被正确地发送到 Broker。如果发送失败或超时,可以进行重试或其他错误处理操作。
    try {SendResult sendResult = rocketMQTemplate.syncSend(topic, message);System.out.println("Message sent successfully: " + sendResult);
    } catch (Exception e) {System.out.println("Failed to send message: " + e.getMessage());// 进行重试或其他错误处理
    }
    
  2. 事务消息

    • 对于一些需要保证事务一致性的场景,可以使用 RocketMQ 的事务消息机制。发送事务消息分为两个阶段,首先发送半事务消息,然后执行本地事务,根据本地事务的结果决定提交或回滚事务消息。
    @Service
    public class TransactionProducer {@Autowiredprivate RocketMQTemplate rocketMQTemplate;public void sendTransactionMessage() {TransactionSendResult result = rocketMQTemplate.sendMessageInTransaction("transactionTopic", new Message<>("transactionMessage"), null);System.out.println("Transaction message sent: " + result);}
    }
    

2、Broker 端

  1. 持久化存储

    • RocketMQ 支持消息的持久化存储,可以将消息存储在磁盘上,以防止消息丢失。通过配置broker.conf文件中的flushDiskType参数,可以选择同步刷盘或异步刷盘方式。同步刷盘可以保证消息在写入磁盘后才返回成功响应,但会影响性能;异步刷盘可以提高性能,但在系统故障时可能会丢失部分未刷盘的消息。
  2. 高可用部署

    • 部署多主多从的 RocketMQ 集群,当主节点出现故障时,从节点可以自动切换为主节点,保证消息服务的可用性。同时,可以配置主从同步方式,确保消息在主从节点之间的可靠同步。

3、消费端

  1. 消费确认

    • 消费者在成功处理消息后,需要向 Broker 发送消费确认。可以通过设置consumeModeCONSUME_PASSIVELY(被动消费模式),并在处理完消息后手动调用acknowledge()方法进行确认。如果消费失败,可以选择重试或者将消息发送到死信队列进行后续处理。
    @Component
    @RocketMQMessageListener(topic = "your_topic", consumerGroup = "your_consumer_group")
    public class RocketMQConsumer implements RocketMQListener<String> {@Overridepublic void onMessage(String message) {try {// 处理消息System.out.println("Received message: " + message);// 确认消费成功getRocketMQListenerContainer().acknowledge();} catch (Exception e) {System.out.println("Failed to process message: " + e.getMessage());// 可以选择重试或者发送到死信队列}}
    }
    
  2. 重试机制

    • 配置消费者的重试次数和重试时间间隔,当消费失败时,RocketMQ 会自动进行重试。可以在application.propertiesapplication.yml中配置rocketmq.retry.timesrocketmq.retry.interval参数来控制重试策略。

通过以上措施,可以在不同阶段保证 RocketMQ 消息的可靠性,确保消息在生产、存储和消费过程中不会丢失或出现错误。

三、保证消息处理的幂等性
在 RocketMQ 中,可以通过以下几种方式来保证消息处理的幂等性:

1、业务层面设计

  1. 使用唯一标识

    • 在业务中为每条消息生成一个唯一的标识,比如使用业务流水号、订单号等作为消息的唯一标识。在消费消息时,先根据这个唯一标识判断该消息是否已经被处理过。如果已经处理过,则直接忽略该消息。
    • 例如在电商系统中,订单创建的消息可以使用订单号作为唯一标识。消费者在处理消息时,先查询数据库中是否存在该订单号对应的处理记录,如果存在则说明该消息已经被处理过,不再重复处理。
    @Service
    public class OrderProcessingService {@Autowiredprivate JdbcTemplate jdbcTemplate;public void processOrderMessage(String orderId) {boolean isProcessed = isOrderProcessed(orderId);if (isProcessed) {return;}// 处理订单逻辑System.out.println("Processing order: " + orderId);markOrderAsProcessed(orderId);}private boolean isOrderProcessed(String orderId) {int count = jdbcTemplate.queryForObject("SELECT COUNT(*) FROM processed_orders WHERE order_id =?",Integer.class, orderId);return count > 0;}private void markOrderAsProcessed(String orderId) {jdbcTemplate.update("INSERT INTO processed_orders (order_id) VALUES (?)",orderId);}
    }
    
  2. 利用数据库约束

    • 可以在数据库中使用唯一索引、主键约束等方式来保证业务数据的唯一性。在处理消息时,如果违反了这些约束,则说明该消息已经被处理过,不再重复处理。
    • 比如在用户注册的场景中,可以在数据库的用户表中使用用户名或邮箱作为唯一索引。当消费用户注册的消息时,尝试插入用户数据,如果插入失败(因为违反唯一索引约束),则说明该用户已经注册过,不再重复处理。
    @Service
    public class UserRegistrationService {@Autowiredprivate JdbcTemplate jdbcTemplate;public void registerUser(String username, String password) {try {jdbcTemplate.update("INSERT INTO users (username, password) VALUES (?,?)",username, password);} catch (DataIntegrityViolationException e) {// 处理插入失败的情况,可能是用户已存在System.out.println("User already exists: " + username);}}
    }
    

2、技术层面实现

  1. 分布式锁
    • 可以使用分布式锁来保证同一时间只有一个消费者实例在处理特定的消息。在处理消息之前,先获取分布式锁,如果获取成功则处理消息,处理完成后释放锁。如果获取锁失败,则说明该消息正在被其他实例处理,当前实例可以选择等待或者直接忽略该消息。
    • 可以使用 Redis 或 Zookeeper 等实现分布式锁。以 Redis 为例,可以使用 SETNX 命令来实现分布式锁。
    @Service
    public class MessageProcessingService {@Autowiredprivate StringRedisTemplate redisTemplate;public void processMessage(String messageId) {String lockKey = "message_lock_" + messageId;boolean locked = tryLock(lockKey);if (!locked) {return;}try {boolean isProcessed = isMessageProcessed(messageId);if (isProcessed) {return;}// 处理消息逻辑System.out.println("Processing message: " + messageId);markMessageAsProcessed(messageId);} finally {releaseLock(lockKey);}}private boolean tryLock(String key) {return redisTemplate.opsForValue().setIfAbsent(key, "locked", Duration.ofSeconds(30));}private void releaseLock(String key) {redisTemplate.delete(key);}private boolean isMessageProcessed(String messageId) {// 判断消息是否已处理的逻辑return false;}private void markMessageAsProcessed(String messageId) {// 标记消息已处理的逻辑}
    }
    

通过以上方法,可以有效地保证 RocketMQ 消息处理的幂等性,避免因重复消费消息而导致的业务数据不一致问题。

相关文章:

Java的RocketMQ使用

在 Spring Boot 中&#xff0c;RocketMQ 和 Kafka 都是常用的消息中间件&#xff0c;它们的使用方法有一些相似之处&#xff0c;也有各自的特点。 一、RocketMQ 在 Spring Boot 中的使用 引入依赖 在项目的pom.xml文件中添加 RocketMQ 的依赖。 <dependency><groupId…...

中间件之MQ-Kafka

一、引言 Apache Kafka是一个分布式消息队列系统&#xff0c;最初由LinkedIn开发&#xff0c;并于2011年开源。Kafka以其高吞吐量、低延迟和容错能力而著名&#xff0c;广泛应用于日志收集、实时流处理、事件驱动架构等领域。本文将详细介绍Kafka的基本概念、特点、应用场景以…...

[DB] NSM

Database Workloads&#xff08;数据库工作负载&#xff09; 数据库工作负载指的是数据库在执行不同类型任务时所需的资源和计算方式&#xff0c;主要包括以下几种类型&#xff1a; 1. On-Line Transaction Processing (OLTP) 中文&#xff1a;联机事务处理解释&#xff1a;…...

Redis 高可用:从主从到集群的全面解析

目录 一、主从复制 (基础)1. 同步复制a. 全量数据同步b. 增量数据同步c. 可能带来的数据不一致 2. 环形缓冲区a. 动态调整槽位 3. runid4. 主从复制解决单点故障a. 单点故障b. 可用性问题 5. 注意事项a. Replica 主动向 Master 建立连接b. Replica 主动向 Master 拉取数据 二、…...

全能型选手视频播放器VLC 3.0.21 for Windows 64 bits支持Windows、Mac OS等供大家学习参考

全能型选手视频播放器&#xff0c;支持Windows、Mac OS、Linux、Android、iOS等系统&#xff0c;也支持播放几乎所有主流视频格式。 推荐指数&#xff1a; ★★★★★ 优点&#xff1a; ◆、界面干净简洁&#xff0c;播放流畅 ◆、支持打开绝大多数的文件格式&#xff0c;包…...

解决在Vue3中使用monaco-editor创建多个实例的导致页面卡死的问题

最近在项目中使用到了monaco-editor来实现相关的业务功能&#xff0c;按照官方使用方法进行了相关操作&#xff0c;但是在使用的时候&#xff0c;总是会导致创建多个编辑器实例&#xff0c;导致页面卡死的情况&#xff0c;下面来看看怎么处理这种情况吧&#xff0c;先说一下我使…...

【某农业大学计算机网络实验报告】实验二 交换机的自学习算法

实验目的&#xff1a; &#xff08;1&#xff09;理解交换机通过逆向自学习算法建立地址转发表的过程。 &#xff08;2&#xff09;理解交换机转发数据帧的规则。 &#xff08;3&#xff09;理解交换机的工作原理。 实验器材&#xff1a; 一台Windows操作系统的PC机。 实…...

燕山大学23级经济管理学院 10.18 C语言作业

燕山大学23级经济管理学院 10.18 C语言作业 文章目录 燕山大学23级经济管理学院 10.18 C语言作业1C语言的基本数据类型主要包括以下几种&#xff1a;为什么设计数据类型&#xff1f;数据类型与知识体系的对应使用数据类型时需要考虑的因素 21. 逻辑运算符2. 真值表3. 硬件实现4…...

【880线代】线性代数一刷错题整理

第一章 行列式 2024.8.20日 1. 2. 3. 第二章 矩阵 2024.8.23日 1. 2024.8.26日 1. 2. 3. 4. 5. 2024.8.28日 1. 2. 3. 4. 第四章 线性方程组 2024.9.13日 1. 2. 3. 4. 5. 2024.9.14日 1. 第五章 相似矩阵 2024.9.14日 1. 2024.9.15日 1. 2. 3. 4. 5. 6. 7. 2024.9.…...

【C++语言】精妙的哈希算法:原理、实现与优化

解锁Python编程的无限可能:《奇妙的Python》带你漫游代码世界 哈希算法是计算机科学中的一项基本技术,广泛应用于数据检索、加密、缓存等领域。本文将深入探讨C++中的哈希算法,详细讲解其原理、实现、优化以及在不同应用场景中的使用。通过丰富的代码示例和数学推导,本文旨…...

基于STM32的手势电视机遥控器设计

引言 本项目设计了一个基于STM32的手势电视机遥控器&#xff0c;利用红外线传输和加速度传感器&#xff08;或陀螺仪&#xff09;检测用户的手势动作&#xff0c;用于控制电视的音量、频道切换等操作。通过对手势的实时检测和分类&#xff0c;系统能够识别左右、上下、旋转等手…...

2、图像的特征

一、角点检测-Harris 1、cv2.cornerHarris角点检测函数 在 cv2.cornerHarris 函数中&#xff0c;Sobel 算子用于计算图像的梯度&#xff0c;这是 Harris 角点检测的第一步。 cv2.cornerHarris(src, blockSize, ksize, k, dstNone, borderTypeNone)下面是各个参数的详细解释&…...

URL、URN和URI的区别

目录 一&#xff1a;URI二&#xff1a;URN三&#xff1a;URL1. URL格式 一&#xff1a;URI URI 是&#xff08;Uniform Resource Identifier&#xff09;统一资源标识符的缩写。用于唯一标识互联网上的资源。URI包含了URN和URL 二&#xff1a;URN URN是&#xff08;Uniform …...

深入理解Spring框架几个重要扩展接口

本文介绍Spring框架的几个日常开发重要扩展接口&#xff0c;方便日常项目中按需扩展使用。 一、Processor 系列接口 用途&#xff1a; Processor 系列接口包括 BeanPostProcessor 和 BeanFactoryPostProcessor&#xff0c;它们的设计目的是在 Spring 容器启动过程中对 Bean 和…...

使用dotnet-counters和dotnet-dump 分析.NET Core 项目内存占用问题

在.NET Core 项目部署后&#xff0c;我们往往会遇到内存占用越来越高的问题&#xff0c;但是由于项目部署在Linux上&#xff0c;因此无法使用VS的远程调试工具来排查内存占用问题。那么这篇文章我们大家一起来学习一下如何排查内存占用问题。 首先&#xff0c;我们来看一下应用…...

1282:最大子矩阵

题目&#xff1a; 已知矩阵的大小定义为矩阵中所有元素的和。给定一个矩阵&#xff0c;你的任务是找到最大的非空(大小至少是1 1)子矩阵。 比如&#xff0c;如下4 4的矩阵 0 -2 -7 0 9 2 -6 2 -4 1 -4 1 -1 8 0 -2 的最大子矩阵是 9 2 -4 1 -1 8 这个子矩阵的大小是15。 …...

C++编程语言:抽象机制:特殊运算符(Bjarne Stroustrup)

第19章 特殊运算符(Special Operators) 目录 19.1 引言 19.2 特殊运算符(Special Operators) 19.2.1 下标运算符(Subscripting) 19.2.2 函数调用运算符(Function Call) 19.2.3 解引用(Dereferencing) 19.2.4 递增和递减(Increment and Decrement) 19…...

图片无损放大工具Topaz Gigapixel AI v7.4.4 绿色版

Topaz A.I. Gigapixel是这款功能齐全的图象无损变大运用&#xff0c;应用可将智能机拍摄的图象也可以有着专业相机的高质量大尺寸作用。你可以完美地放大你的小照片并大规模打印&#xff0c;它根本不会粘贴。它具有清晰的效果和完美的品质。 借助AIGigapixel&#xff0c;您可以…...

Vue中计算属性computed—(详解计算属性vs方法Methods,包括案例+代码)

文章目录 计算属性computed3.1 概述3.2 使用3.3 计算属性vs方法Methods3.4 计算属性的完整写法 计算属性computed 3.1 概述 基于现有的数据&#xff0c;计算出来的新属性。 依赖的数据变化&#xff0c;自动重新计算 语法&#xff1a; 声明在 computed 配置项中&#xff0c;…...

Python程序设计 内置函数 日志模块

logging(日志) 日志记录是程序员工具箱中非常有用的工具。它可以帮助您更好地理解程序的流程&#xff0c;并发现您在开发过程中可能没有想到的场景。 日志为开发人员提供了额外的一组眼睛&#xff0c;这些眼睛不断关注应用程序正在经历的流程。它们可以存储信息&#xff0c;例…...

热量表(热能表)完整指南:原理、公式推导、STM32 嵌入式软件全实现

目录 一、热量表工作原理 1. 核心物理原理 2. 系统组成 3. 工作流程 二、热量计算公式&#xff08;国标 / 欧标 EN1434&#xff09;完整推导 1. 基础定义 2. 最终标准热量公式&#xff08;工业直接用&#xff09; 瞬时热量&#xff1a; 累积热量&#xff1a; 3. 公式…...

Ruby OpenAI用户行为分析:AI交互模式深度研究

Ruby OpenAI用户行为分析&#xff1a;AI交互模式深度研究 【免费下载链接】ruby-openai OpenAI API Ruby! &#x1f916;&#x1fa75; Now with Assistants, Threads, Messages, Runs and Text to Speech &#x1f37e; 项目地址: https://gitcode.com/gh_mirrors/ru/ruby-…...

GitLab vs Gitea 深度解析:如何选择适合你的代码托管方案?

1. 核心定位与适用场景对比 第一次接触代码托管平台时&#xff0c;我和很多开发者一样在GitLab和Gitea之间纠结。经过三年在不同规模团队的实际使用&#xff0c;我发现这两个工具就像瑞士军刀和美工刀的关系——没有绝对的好坏&#xff0c;关键看你要切什么。 GitLab更像是个&q…...

2026年程序员必看:AI Agent全面爆发,国产算力突围,这波技术红利别错过

&#x1f525;个人主页&#xff1a;北极的代码&#xff08;欢迎来访&#xff09; &#x1f3ac;作者简介&#xff1a;java后端学习者 ❄️个人专栏&#xff1a;苍穹外卖日记&#xff0c;SSM框架深入&#xff0c;JavaWeb ✨命运的结局尽可永在&#xff0c;不屈的挑战却不可须臾或…...

蓝牙UUID:从标准服务到自定义通信的密钥

1. 蓝牙UUID&#xff1a;智能设备的身份证 想象一下你走进一个满是蓝牙设备的房间——智能手环在测量心率&#xff0c;温湿度计在报告数据&#xff0c;智能灯泡等待你的指令。这些设备如何知道该响应哪个请求&#xff1f;答案就藏在那个128位的UUID&#xff08;通用唯一识别码…...

OpenVoice语音合成技术全解析:从痛点突破到多场景落地实践

OpenVoice语音合成技术全解析&#xff1a;从痛点突破到多场景落地实践 【免费下载链接】OpenVoice 项目是MyShell AI开源的即时语音克隆技术OpenVoice&#xff0c;旨在提供一种能够快速从少量语音样本中准确复制人类声音特征&#xff0c;并实现多种语言及语音风格转换的解决方案…...

原神抽卡数据分析工具:智能解析与可视化全攻略

原神抽卡数据分析工具&#xff1a;智能解析与可视化全攻略 【免费下载链接】genshin-wish-export biuuu/genshin-wish-export - 一个使用Electron制作的原神祈愿记录导出工具&#xff0c;它可以通过读取游戏日志或代理模式获取访问游戏祈愿记录API所需的authKey。 项目地址: …...

RAG实战解析:如何通过检索增强生成提升知识密集型NLP任务性能

1. RAG技术为什么能改变知识密集型NLP任务格局 第一次听说RAG&#xff08;Retrieval-Augmented Generation&#xff09;这个概念时&#xff0c;我正被一个开放域问答项目折磨得焦头烂额。当时我们用纯BART模型生成的答案总是出现事实性错误&#xff0c;比如把"特斯拉创始人…...

GJK碰撞检测算法:从原理到实战的5个核心技巧

GJK碰撞检测算法&#xff1a;从原理到实战的5个核心技巧 【免费下载链接】gjk.c Gilbert-Johnson-Keerthi (GJK) collision detection algorithm in 200 lines of clean plain C 项目地址: https://gitcode.com/gh_mirrors/gj/gjk.c GJK碰撞检测算法是游戏开发和物理引擎…...

什么是 Harness Engineering?把 Prompt、Workflow、Eval 串成系统的那层骨架

点击上方 前端Q&#xff0c;关注公众号回复加群&#xff0c;加入前端Q技术交流群上一篇我们先把问题抛出来了&#xff1a; 为什么现在大家都在聊 Agent、Workflow、AI Coding&#xff0c;可真正决定系统上限的&#xff0c;往往不是模型本身&#xff0c;而是模型外那层工程骨架。…...