使用消息队列、rocketMq实现通信
1背景
springboot框架,
2需求
后端需要调用一个类似于api这种作用的小工具,获得工具的返回值,后端再根据客户端的返回值进行更新数据操作
3讨论
1工具开发者使用的是python,将工具封装起来,暴露成web接口供后端调用
2方式一能满足需求了,但是考虑到后续这样的工具还有很多,方便工具的管理和集成,考虑使用消息队列的方式进行通信-》使用rocketmq
3既然使用rocketmq,那就要考虑同步还是异步处理消息
比较一下 RocketMQ 中的同步发送和异步发送
同步发送(Synchronous Send):
使用 rocketMQTemplate.syncSend
异步发送(Asynchronous Send):
使用 rocketMQTemplate.asyncSend
主要区别:
(1)执行流程:
同步:调用线程会阻塞,直到收到服务器的响应或超时。
异步:调用后立即返回,不等待服务器响应。消息发送结果通过回调函数通知。
(2)响应时间:
同步:总体响应时间较长,包含网络传输和服务器处理时间。
异步:立即返回,响应时间很短。
(3)可靠性保证:
同步:可以立即知道消息是否发送成功。
异步:无法立即知道发送结果,需要在回调中处理。
(4)异常处理:
同步:可以直接在调用代码中进行异常处理。
异步:异常需要在回调函数中处理。
(5)资源利用:
同步:可能造成线程阻塞,影响系统吞吐量。
异步:更有效地利用系统资源,特别是在高并发场景下。
(6)使用场景:
同步:适用于对可靠性要求高,且可以接受一定延迟的场景。
异步:适用于高吞吐量、对延迟敏感的场景。
如果你需要立即知道消息是否成功发送,并且可以接受一些延迟,使用同步发送。
如果你需要高吞吐量,并且可以在回调中处理发送结果,使用异步发送。
4理解
public class MessageService {@Autowiredprivate RocketMQTemplate rocketMQTemplate;public void sendAsync(String topic, String message) {System.out.println("1. 开始异步发送");rocketMQTemplate.asyncSend(topic, message, new SendCallback() {@Overridepublic void onSuccess(SendResult sendResult) {System.out.println("4. 异步发送成功回调: " + sendResult.getMsgId());}@Overridepublic void onException(Throwable throwable) {System.out.println("4. 异步发送异常回调: " + throwable.getMessage());}});System.out.println("2. 异步发送方法调用完成");}public void demonstrateSendMethods() {String topic = "TestTopic";String message = "Hello, RocketMQ!";System.out.println("开始演示异步发送");sendAsync(topic, message);System.out.println("3. 主方法继续执行");// 为了等待异步操作完成,这里添加一个简单的延迟try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}}
}
输出为:
开始演示异步发送
1. 开始异步发送
2. 异步发送方法调用完成在这里插入代码片
3. 主方法继续执行
4. 异步发送成功回调: [消息ID]
方法调用立即返回(步骤 1 和 2)
主线程继续执行(步骤 3)
回调方法(onSuccess 或 onException)在稍后异步执行(步骤 4)
需要注意的是,步骤 4 的执行时间是不确定的。它可能在步骤 3 之前或之后发生,取决于网络条件和服务器响应速度。在实际应用中,你可能需要使用更复杂的机制(如 CompletableFuture 或消息队列)来处理异步结果。
于是引入消息队列
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;public class AsyncMessageHandler {private final RocketMQTemplate rocketMQTemplate;private final BlockingQueue<AsyncResult> resultQueue;private final Thread processThread;private volatile boolean running = true;public AsyncMessageHandler(RocketMQTemplate rocketMQTemplate) {this.rocketMQTemplate = rocketMQTemplate;this.resultQueue = new LinkedBlockingQueue<>();this.processThread = new Thread(this::processResults);this.processThread.start();}public void sendAsyncMessage(String topic, String message) {rocketMQTemplate.asyncSend(topic, message, new SendCallback() {@Overridepublic void onSuccess(SendResult sendResult) {resultQueue.offer(new AsyncResult(true, sendResult, null));}@Overridepublic void onException(Throwable throwable) {resultQueue.offer(new AsyncResult(false, null, throwable));}});}private void processResults() {while (running) {try {AsyncResult result = resultQueue.poll(1, TimeUnit.SECONDS);if (result != null) {if (result.success) {System.out.println("Message sent successfully: " + result.sendResult.getMsgId());// 处理成功发送的消息} else {System.out.println("Message send failed: " + result.throwable.getMessage());// 处理发送失败的消息,可能进行重试或记录日志}}} catch (InterruptedException e) {Thread.currentThread().interrupt();break;}}}public void shutdown() {running = false;processThread.interrupt();}private static class AsyncResult {final boolean success;final SendResult sendResult;final Throwable throwable;AsyncResult(boolean success, SendResult sendResult, Throwable throwable) {this.success = success;this.sendResult = sendResult;this.throwable = throwable;}}
}
解析一下:
processResults 方法的执行机制:
1线程启动时机:
processResults 方法在一个单独的线程中运行。这个线程在 AsyncMessageHandler 构造函数中被创建和启动:
javaCopythis.processThread = new Thread(this::processResults);
this.processThread.start();
2持续执行:
processResults 方法包含一个 while 循环,只要 running 变量为 true,它就会持续执行。
执行条件:
在每次循环中,它尝试从队列中获取一个 AsyncResult 对象:
AsyncResult result = resultQueue.poll(1, TimeUnit.SECONDS);
3具体执行情况:
a. 当队列中有结果时:
如果队列中有 AsyncResult 对象,poll 方法会立即返回这个对象。
然后 processResults 会处理这个结果(打印消息ID或错误信息)。
b. 当队列为空时:
poll 方法会等待最多1秒钟。
如果1秒内没有新的结果加入队列,poll 返回 null。
循环继续,再次尝试获取结果。
c. 当线程被中断时:
如果在等待过程中线程被中断(比如调用 shutdown 方法时),会抛出 InterruptedException。
捕获这个异常后,线程会退出循环并结束。
4执行频率:
如果队列中持续有数据,processResults 会不断执行,处理每个结果。
如果队列经常为空,processResults 大部分时间会在等待新的结果。
5停止执行:
当调用 shutdown 方法时:
javaCopypublic void shutdown() {
running = false;
processThread.interrupt();
}
running 被设置为 false,使得 while 循环条件不再满足。
线程被中断,确保即使在 poll 等待中也能及时退出。
总结:
processResults 方法在一个独立的线程中持续运行。
它会不断尝试从队列中获取并处理结果。
当有结果时立即处理,没有结果时等待。
这个方法会一直执行,直到 AsyncMessageHandler 被关闭。
这种设计确保了:
异步结果可以被及时处理,不会阻塞消息发送。
系统资源得到有效利用,因为在没有结果时线程会等待而不是持续空转。
结果处理的顺序性得到了保证。
结合应用场景:
后端发送异步消息到rocketmq,客户端作为消费者消费消息,并将结果放到一个消息队列中,后端会循环执行获取队列中的消息,根据从队列中获取到的消息进行更新数据库。
// pom.xml 依赖
<dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>2.2.2</version></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional></dependency>
</dependencies>// application.properties
spring.application.name=rocketmq-demo
rocketmq.name-server=localhost:9876
rocketmq.producer.group=my-producer-group// Message.java
@Data
@AllArgsConstructor
@NoArgsConstructor
public class Message {private String id;private String content;
}// ResultMessage.java
@Data
@AllArgsConstructor
@NoArgsConstructor
public class ResultMessage {private String id;private String result;
}// AsyncMessageHandler.java
@Component
@Slf4j
public class AsyncMessageHandler {private final BlockingQueue<ResultMessage> resultQueue = new LinkedBlockingQueue<>();private final Thread processThread;private volatile boolean running = true;@Autowiredprivate BackendService backendService;public AsyncMessageHandler() {this.processThread = new Thread(this::processResults);this.processThread.start();}public void handleResult(ResultMessage resultMessage) {resultQueue.offer(resultMessage);}private void processResults() {while (running) {try {ResultMessage result = resultQueue.poll(1, TimeUnit.SECONDS);if (result != null) {backendService.updateData(result);}} catch (InterruptedException e) {Thread.currentThread().interrupt();break;}}}@PreDestroypublic void shutdown() {running = false;processThread.interrupt();}
}// BackendService.java
@Service
@Slf4j
public class BackendService {@Autowiredprivate RocketMQTemplate rocketMQTemplate;@Autowiredprivate AsyncMessageHandler asyncMessageHandler;public void sendAsyncMessage(Message message) {rocketMQTemplate.asyncSend("TestTopic", message, new SendCallback() {@Overridepublic void onSuccess(SendResult sendResult) {log.info("Message sent successfully, msgId: {}", sendResult.getMsgId());}@Overridepublic void onException(Throwable throwable) {log.error("Failed to send message", throwable);}});}public void updateData(ResultMessage resultMessage) {// 这里实现更新数据的逻辑log.info("Updating data with result: {}", resultMessage);// 例如:更新数据库}
}// ClientService.java
@Service
@Slf4j
public class ClientService {@Autowiredprivate AsyncMessageHandler asyncMessageHandler;@RocketMQMessageListener(topic = "TestTopic",consumerGroup = "my-consumer-group")@Componentpublic class MessageConsumer implements RocketMQListener<Message> {@Overridepublic void onMessage(Message message) {log.info("Received message: {}", message);// 处理消息的逻辑String result = processMessage(message);ResultMessage resultMessage = new ResultMessage(message.getId(), result);asyncMessageHandler.handleResult(resultMessage);}}private String processMessage(Message message) {// 这里实现处理消息的逻辑return "Processed: " + message.getContent();}
}// BackendController.java
@RestController
@RequestMapping("/api")
public class BackendController {@Autowiredprivate BackendService backendService;@PostMapping("/send")public ResponseEntity<String> sendMessage(@RequestBody Message message) {backendService.sendAsyncMessage(message);return ResponseEntity.ok("Message sent asynchronously");}
}// Application.java
@SpringBootApplication
public class Application {public static void main(String[] args) {SpringApplication.run(Application.class, args);}
}
这个代码的实现是基于后端与消费者都是用java实现的
进一步处理
被调用的工具不是用的java,而是使用python,就需要使用HTTP接口来实现客户端处理结果的回调。上述代码就需要进行修改,就相当于上述的 AsyncMessageHandler.java和ClientService.java需要在python实现,即客户端消费生产者的消息扔到消息队列中并主动调用后端根据消息更新数据库的操作,那么java这边就需要新增一个接口供python那边调用
java
// pom.xml 依赖 (保持不变)// application.properties
spring.application.name=rocketmq-demo
rocketmq.name-server=localhost:9876
rocketmq.producer.group=my-producer-group// Message.java (保持不变)// ResultMessage.java (保持不变)// BackendService.java
@Service
@Slf4j
public class BackendService {@Autowiredprivate RocketMQTemplate rocketMQTemplate;public void sendAsyncMessage(Message message) {rocketMQTemplate.asyncSend("TestTopic", message, new SendCallback() {@Overridepublic void onSuccess(SendResult sendResult) {log.info("Message sent successfully, msgId: {}", sendResult.getMsgId());}@Overridepublic void onException(Throwable throwable) {log.error("Failed to send message", throwable);}});}public void updateData(ResultMessage resultMessage) {// 这里实现更新数据的逻辑log.info("Updating data with result: {}", resultMessage);// 例如:更新数据库}
}// BackendController.java
@RestController
@RequestMapping("/api")
public class BackendController {@Autowiredprivate BackendService backendService;@PostMapping("/send")public ResponseEntity<String> sendMessage(@RequestBody Message message) {backendService.sendAsyncMessage(message);return ResponseEntity.ok("Message sent asynchronously");}@PostMapping("/result")public ResponseEntity<String> receiveResult(@RequestBody ResultMessage resultMessage) {backendService.updateData(resultMessage);return ResponseEntity.ok("Result received and processed");}
}// Application.java (保持不变)
python
import json
from rocketmq.client import PushConsumer
import requestsdef callback(msg):print(f"Received message: {msg.body}")# 解析消息message = json.loads(msg.body)# 处理消息result = process_message(message)# 创建结果消息result_message = {"id": message["id"],"result": result}# 发送结果给Java后端try:response = requests.post("http://localhost:8080/api/result", json=result_message)if response.status_code == 200:print("Result sent successfully")else:print(f"Failed to send result. Status code: {response.status_code}")except requests.RequestException as e:print(f"Error sending result: {e}")def process_message(message):# 这里实现处理消息的逻辑return f"Processed: {message['content']}"if __name__ == "__main__":consumer = PushConsumer("my-consumer-group")consumer.set_name_server_address("localhost:9876")consumer.subscribe("TestTopic", callback)print("Consumer started. Press Ctrl+C to exit.")consumer.start()try:consumer.join()except KeyboardInterrupt:consumer.shutdown()
相关文章:
使用消息队列、rocketMq实现通信
1背景 springboot框架, 2需求 后端需要调用一个类似于api这种作用的小工具,获得工具的返回值,后端再根据客户端的返回值进行更新数据操作 3讨论 1工具开发者使用的是python,将工具封装起来,暴露成web接口供后端调用 2方式一能…...

通过LLM大模型将「白雪公主的故事」转为图数据存储
💡 本次将使用LLM大模型将「白雪公主的故事」转为图数据存储于neo4j数据库中,并展示图数据部分的效果 故事内容 很久很久以前,在一个遥远的王国里,有一位美丽的**王后**生下了一个皮肤像雪一样白皙、嘴唇像血一样鲜红的**女儿**…...

MyBatisPlus 第一天
数据库创建表 CREATE DATABASE mybatis_plus /*!40100 DEFAULT CHARACTER SET utf8mb4 */; use mybatis_plus; CREATE TABLE user ( id bigint(20) NOT NULL COMMENT 主键ID, name varchar(30) DEFAULT NULL COMMENT 姓名 , age int(11) DEFAULT NULL COMMENT 年龄 , email va…...

线程与多线程(二)
线程与多线程(二) 一、线程互斥1、相关概念 二、互斥锁1、介绍2、使用场景3、初始化(1)函数(2)概念 4、销毁(1)函数(2)概念 5、加锁(1)…...

算法板子:欧拉函数——求一个数的欧拉函数、线性时间内求1~n所有数的欧拉函数
目录 1. 欧拉函数 (1)概念 (2)性质 (3)计算公式 2. 求一个数的欧拉函数 (1)模拟过程 (2)代码 3. 线性时间内求1~n所有数的欧拉函数——筛法求欧拉函…...
2024牛客暑期多校训练营8
文章目录 A. Haitang and GameE.Haitang and MathJ. Haitang and TriangleK. Haitang and Ava A. Haitang and Game 通过审题可以知道,最后的胜者和若干次操作后最多能增加的数的奇偶有关。 由于 a i a_i ai 较小,所以我们枚举每一个没出现过的 x …...
git的一些操作指令
一、git 提交规范 commit message subject : 空格 message 主体 feat: 新功能(feature)用于提交新功能。fix: 修复 bug用于提交 bug 修复。docs: 文档变更用于提交仅文档相关的修改。style: 代码风格变动(不影响代码逻辑&…...

【IT行业研究报告】Internet Technology
一、引言 随着信息技术的飞速发展,IT行业已成为全球经济的重要驱动力。从云计算、大数据、人工智能到物联网,IT技术正深刻改变着各行各业的生产方式、商业模式和人们的生活方式。本报告旨在深入分析IT行业的现状、发展趋势和挑战,探讨其在各…...

GLM大模型的机器翻译能力测试
背景介绍 最近想对GLM-4今年发布的几个大模型 glm-4-0520,glm-4-air以及glm-4-flash简单评测一下它们的机器翻译能力,由于这几个大模型的容量和训练数据都有区别,所以它们的翻译能力也是不同的。我们这里就分别选择一些有趣的,有…...
【硬件产品经理】汽车A样设计
目录 简介 制造方式 作者简介 简介 一般被称作原型样件(Prototype)。 主要是根据系统需求设计,实现基本功能和关键尺寸,用于基本功能的验证,用于初期产品软件调试和Hil台架测试(Hardware in Loop,硬件在环)的样机阶段。 也就说在设计初期,A样的主要目的可以划分…...
Ubuntu22.04系统中安装机器人操作系统ROS
在Ubuntu 22.04上安装ROS(Robot Operating System)的过程可以分为几个主要步骤。请注意,ROS有不同的版本(如ROS 1的Melodic、Noetic等,以及ROS 2的Foxy、Humble等),这些版本对Ubuntu的支持程度可…...

LeetCode54题:螺旋矩阵(原创)
【题目描述】 给你一个 m 行 n 列的矩阵 matrix ,请按照 顺时针螺旋顺序 ,返回矩阵中的所有元素。 示例 1: 输入:matrix [[1,2,3],[4,5,6],[7,8,9]] 输出:[1,2,3,6,9,8,7,4,5]示例 2: 输入:mat…...

FPGA常见型号
FPGA(现场可编程门阵列)开发板种类繁多,涵盖了从入门级教育用途到高性能工业应用的广泛领域。以下是一些常见的 FPGA 开发板型号及其特点: 1. Xilinx(赛灵思)系列 Xilinx 是 FPGA 领域的领导者之一&#…...

【多模态大模型】FlashAttention in NeurIPS 2022
一、引言 论文: FlashAttention: Fast and Memory-Efficient Exact Attention with IO-Awareness 作者: Stanford University 代码: FlashAttention 特点: 该方法提出将Q、K、V拆分为若干小块,使执行注意力时不需要频…...
过滤器doFilter 方法
在Java EE中,过滤器的放行是指在过滤器的 doFilter 方法中调用 FilterChain 对象的 doFilter 方法,将请求传递给下一个过滤器或目标 servlet 进行处理。这个过程可以理解为过滤器的责任链传递。 过滤器的 doFilter 方法 在过滤器中,实现 Fil…...

WPF篇(9)-CheckBox复选框+RadioButton单选框+RepeatButton重复按钮
CheckBox复选框 CheckBox继承于ToggleButton,而ToggleButton继承于ButtonBase基类。 案例 前端代码 <StackPanel Orientation"Horizontal" HorizontalAlignment"Center" VerticalAlignment"Center"><TextBlock Text"…...

【机器学习基础】线性回归
【作者主页】Francek Chen 【专栏介绍】 ⌈ ⌈ ⌈Python机器学习 ⌋ ⌋ ⌋ 机器学习是一门人工智能的分支学科,通过算法和模型让计算机从数据中学习,进行模型训练和优化,做出预测、分类和决策支持。Python成为机器学习的首选语言,…...

java基础概念12-二维数组
一、二维数组的定义 二维数组可以被视为数组的数组,即每个元素都是一个数组。 二维数组的应用场景: 当我们需要把数据分组管理的时候,就需要用到二维数组。 二、二维数组的初始化 2-1、静态初始化 阿里巴巴规范手册: // 静态初始…...

56 锐键交换机开局
锐键交换机开局 一 锐键视图切换 1 Ruijie> 用户视图 2 Ruijie# 特权模式 3 Ruijie(config)# 全局配置模式 4 Ruijie(config-if-GigabitEthernet 1/1/1)# 接口配置模式 5 Ruijie(config)#show vlan 6 exit (退出) 7 enable(进入)...
VR虚拟展厅与传统实体展厅相比,有哪些优势?
视创云展虚拟展厅相比传统的实体展厅具有多方面的优势,主要体现在以下几个方面: 1、降低成本: 虚拟展厅无需租赁或建设物理空间,减少了场地、装修和维护等方面的开支。同时,参观者和参展商无需现场参观或布展&#x…...

(LeetCode 每日一题) 3442. 奇偶频次间的最大差值 I (哈希、字符串)
题目:3442. 奇偶频次间的最大差值 I 思路 :哈希,时间复杂度0(n)。 用哈希表来记录每个字符串中字符的分布情况,哈希表这里用数组即可实现。 C版本: class Solution { public:int maxDifference(string s) {int a[26]…...

【力扣数据库知识手册笔记】索引
索引 索引的优缺点 优点1. 通过创建唯一性索引,可以保证数据库表中每一行数据的唯一性。2. 可以加快数据的检索速度(创建索引的主要原因)。3. 可以加速表和表之间的连接,实现数据的参考完整性。4. 可以在查询过程中,…...
C++课设:简易日历程序(支持传统节假日 + 二十四节气 + 个人纪念日管理)
名人说:路漫漫其修远兮,吾将上下而求索。—— 屈原《离骚》 创作者:Code_流苏(CSDN)(一个喜欢古诗词和编程的Coder😊) 专栏介绍:《编程项目实战》 目录 一、为什么要开发一个日历程序?1. 深入理解时间算法2. 练习面向对象设计3. 学习数据结构应用二、核心算法深度解析…...

android RelativeLayout布局
<?xml version"1.0" encoding"utf-8"?> <RelativeLayout xmlns:android"http://schemas.android.com/apk/res/android"android:layout_width"match_parent"android:layout_height"match_parent"android:gravity&…...

如何应对敏捷转型中的团队阻力
应对敏捷转型中的团队阻力需要明确沟通敏捷转型目的、提升团队参与感、提供充分的培训与支持、逐步推进敏捷实践、建立清晰的奖励和反馈机制。其中,明确沟通敏捷转型目的尤为关键,团队成员只有清晰理解转型背后的原因和利益,才能降低对变化的…...

热门Chrome扩展程序存在明文传输风险,用户隐私安全受威胁
赛门铁克威胁猎手团队最新报告披露,数款拥有数百万活跃用户的Chrome扩展程序正在通过未加密的HTTP连接静默泄露用户敏感数据,严重威胁用户隐私安全。 知名扩展程序存在明文传输风险 尽管宣称提供安全浏览、数据分析或便捷界面等功能,但SEMR…...
【HarmonyOS 5】鸿蒙中Stage模型与FA模型详解
一、前言 在HarmonyOS 5的应用开发模型中,featureAbility是旧版FA模型(Feature Ability)的用法,Stage模型已采用全新的应用架构,推荐使用组件化的上下文获取方式,而非依赖featureAbility。 FA大概是API7之…...
flow_controllers
关键点: 流控制器类型: 同步(Sync):发布操作会阻塞,直到数据被确认发送。异步(Async):发布操作非阻塞,数据发送由后台线程处理。纯同步(PureSync…...
2025年低延迟业务DDoS防护全攻略:高可用架构与实战方案
一、延迟敏感行业面临的DDoS攻击新挑战 2025年,金融交易、实时竞技游戏、工业物联网等低延迟业务成为DDoS攻击的首要目标。攻击呈现三大特征: AI驱动的自适应攻击:攻击流量模拟真实用户行为,差异率低至0.5%,传统规则引…...

大模型——基于Docker+DeepSeek+Dify :搭建企业级本地私有化知识库超详细教程
基于Docker+DeepSeek+Dify :搭建企业级本地私有化知识库超详细教程 下载安装Docker Docker官网:https://www.docker.com/ 自定义Docker安装路径 Docker默认安装在C盘,大小大概2.9G,做这行最忌讳的就是安装软件全装C盘,所以我调整了下安装路径。 新建安装目录:E:\MyS…...