当前位置: 首页 > news >正文

SpringCloud系列教程:微服务的未来(二十五)-基于注解的声明队列交换机、消息转换器、业务改造

前言

在现代分布式系统中,消息队列是实现服务解耦和异步处理的关键组件。Spring框架提供了强大的支持,使得与消息队列(如RabbitMQ、Kafka等)的集成变得更加便捷和灵活。本文将深入探讨如何利用Spring的注解驱动方式来配置和管理队列、交换机、消息转换器等组件,从而实现一个高效且可扩展的消息处理架构。

在本博客中,我们将重点介绍:

如何使用Spring的注解方式配置RabbitMQ的队列和交换机。
如何配置消息转换器(如Jackson2JsonMessageConverter)来处理不同格式的消息。
如何根据业务需求对现有代码进行改造,将消息队列引入到系统中,从而实现消息的异步处理与解耦。
通过这篇文章,您将了解如何使用Spring框架的注解配置简化消息队列的管理,同时提升系统的可扩展性和维护性。


基于注解的声明队列交换机

利用SpringAMQP声明DirectExchange并与队列绑定
需求如下:

  1. 在consumer服务中,声明队列direct.queue1和direct.queue2
  2. 在consumer服务中,声明交换机hmall.direct,将两个队列与其绑定
  3. 在consumer服务中,编写两个消费者方法,分别监听direct.queue1和direct.queue2

在这里插入图片描述
基于Bean声明队列和交换机代码如下:

package com.itheima.consumer.config;import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class DirectConfiguration {@Beanpublic DirectExchange directExchange(){return new DirectExchange("hmall.direct")}@Beanpublic Queue directQueue1(){return new Queue("direct.queuue1");}@Beanpublic Binding directQueue1bindingRed( Queue directQueue1, DirectExchange directExchange ){return BindingBuilder.bind(directQueue1).to(directExchange).with("red");}@Beanpublic Binding directQueue1bindingBlue( Queue directQueue1, DirectExchange directExchange ){return BindingBuilder.bind(directQueue1).to(directExchange).with("blue");}@Beanpublic Queue directQueue2(){return new Queue("direct.queuue2");}@Beanpublic Binding directQueue2bindingRed( Queue directQueue2, DirectExchange directExchange){return BindingBuilder.bind(directQueue2).to(directExchange).with("red");}@Beanpublic Binding directQueue2bindingYellow( Queue directQueue2, DirectExchange directExchange){return BindingBuilder.bind(directQueue2).to(directExchange).with("yellow");}
}

SpringAMOP还提供了基于@RabbitListener注解来声明队列和交换机的方式

@RabbitListener(bindings =@QueueBinding(value = @Queue(name =direct.queue1),exchange = @Exchange(name = "itcast.direct",type = ExchangeTypes.DIRECT),key = {"red","blue"}
))
public void listenDirectQueuel(string msg){System.out.println("消费者1接收到Direct消息:【+msg+"】");
}

接收者代码如下:

	@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "direct.queue1",durable = "true"),exchange = @Exchange(name = "hmall.direct",type = ExchangeTypes.DIRECT),key = {"red","blue"}))public void listenDirectQueue1(String message)throws Exception {log.info("消费者1监听到direct.queue2的消息,["+message+"]");}@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "direct.queue2",durable = "true"),exchange = @Exchange(name = "hmall.direct",type = ExchangeTypes.DIRECT),key = {"red","yellow"}))

消息转换器

消息转换器
需求:测试利用SpringAMQP发送对象类型的消息

  • 声明一个队列,名为object.queue
  • 编写单元测试,向队列中直接发送一条消息,消息类型为Map
  • 在控制台查看消息,总结你能发现的问题
// 准备消息
Map<String,0bject>msg = new HashMap<>();
msg.put("name","Jack");
msg.put("age"21);

创建队列object.queue
在这里插入图片描述
测试代码如下:

	@Testpublic void TestSendObject(){Map<String, Object> msg = new HashMap<>();msg.put("name", "Jack");msg.put("age", 18);//3.发送消息 参数分别是:交换机名称、RoutingKey(暂时为空)、消息rabbitTemplate.convertAndSend("object.queue",msg);}

在控制台上找到object.queue中得到消息
在这里插入图片描述
Spring的对消息对象的处理是由org.springframework.amgp.support.converter.MessageConverter来处理的。而默认实现是SimpleMessageConverter,基于IDK的ObjectOutputStream完成序列化。存在下列问题:

  • JDK的序列化有安全风险
  • JDK序列化的消息太大
  • JDK序列化的消息可读性差

建议采用JSON序列化代替默认的JDK序列化,要做两件事情:
在publisher和consumer中都要引入jackson依赖:

<dependency><groupId>com.fasterxml.jackson.core</groupId><artifactId>jackson-databind</artifactId>
</dependency>

在publisher和consumer中都要配置Messageconverter:

@Bean
public MessageConverter messageConverter(){return new Jackson2JsonMessageConverter();
}

在这里插入图片描述
在这里插入图片描述
消费者代码:

	@RabbitListener(queues = "object.queue")public void listenObjectQueue(Map<String,Object> msg)throws Exception {log.info("消费者监听到pbject.queue的消息,["+msg+"]");}

在这里插入图片描述
运行结果如下:
在这里插入图片描述

业务改造

需求:改造余额支付功能,不再同步调用交易服务的0penFeign接口,而是采用异步MO通知交易服务更新订单状态。
在这里插入图片描述
在trade-service微服务消费者配置和pay-service微服务发送者都配置MQ依赖

	<!--消息发送--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>

在trade-service微服务和pay-service微服务添加上RabbitMQ配置信息

spring:rabbitmq:host: 192.168.244.136port: 5672virtual-host: /hmallusername: hmallpassword: 1234

因为消费者和发送者都需要消息转换器,故直接将代码写到hm-common服务中,在config包中创建MqConfig类

package com.hmall.common.config;import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class MqConfig {@Beanpublic MessageConverter messageConverter() {return new Jackson2JsonMessageConverter();}
}

同时trade-service微服务和pay-service微服务是无法自动扫描到该类,采用SpringBoot自动装配的原理,在resource文件夹下的META-INF文件夹下的spring.factories文件中添加类路径:
在这里插入图片描述

在接收者trade-service微服务中创建PayStatusListener

package com.hmall.trade.listener;import com.hmall.trade.service.IOrderService;
import lombok.RequiredArgsConstructor;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Component
@RequiredArgsConstructor
public class PayStatusListener {private final IOrderService orderService;@RabbitListener(bindings = @QueueBinding(value = @Queue("trade.pay.success.queue"),exchange = @Exchange(value = "pay.direct"),key = "pay.success"))public void ListenPaySuccess(Long orderId) {orderService.markOrderPaySuccess(orderId);}}

修改pay-service服务下的com.hmall.pay.service.impl.PayOrderServiceImpl类中的tryPayOrderByBalance方法:

@Service
@RequiredArgsConstructor
@Slf4j
public class PayOrderServiceImpl extends ServiceImpl<PayOrderMapper, PayOrder> implements IPayOrderService {private final RabbitTemplate rabbitTemplate;...@Override@Transactionalpublic void tryPayOrderByBalance(PayOrderDTO payOrderDTO) {// 1.查询支付单PayOrder po = getById(payOrderDTO.getId());// 2.判断状态if(!PayStatus.WAIT_BUYER_PAY.equalsValue(po.getStatus())){// 订单不是未支付,状态异常throw new BizIllegalException("交易已支付或关闭!");}// 3.尝试扣减余额userClient.deductMoney(payOrderDTO.getPw(), po.getAmount());// 4.修改支付单状态boolean success = markPayOrderSuccess(payOrderDTO.getId(), LocalDateTime.now());if (!success) {throw new BizIllegalException("交易已支付或关闭!");}// 5.修改订单状态// tradeClient.markOrderPaySuccess(po.getBizOrderNo());try {rabbitTemplate.convertAndSend("pay.direct", "pay.success", po.getBizOrderNo());} catch (Exception e) {log.error("支付成功的消息发送失败,支付单id:{}, 交易单id:{}", po.getId(), po.getBizOrderNo(), e);}}
}

在这里插入图片描述
在这里插入图片描述


总结

本文介绍了基于Spring框架的注解方式来配置消息队列、交换机以及消息转换器的实现方法。通过注解配置,开发者可以更轻松地创建和管理RabbitMQ等消息队列的组件,而无需过多的 XML 配置或繁琐的手动配置。具体来说,我们探讨了如何:

使用 @RabbitListener 和 @EnableRabbit 注解配置消息监听器和消息队列。
配置消息转换器,特别是如何通过 Jackson2JsonMessageConverter 将消息转换为JSON格式,从而实现数据的序列化与反序列化。
结合业务需求,讲解如何对现有系统进行改造,集成消息队列,实现异步处理和服务解耦。
通过这些配置和改造,系统的消息处理能力得到了增强,性能和可扩展性也得到了显著提升。消息队列的使用不仅能够减少服务之间的紧耦合,还能够通过异步方式提高系统的响应速度和吞吐量。

希望本博客能够帮助您理解Spring在消息队列方面的强大功能,并为您的业务应用提供参考。随着系统复杂度的增加,合理的使用消息队列将成为构建高可用、高性能系统的关键之一。

相关文章:

SpringCloud系列教程:微服务的未来(二十五)-基于注解的声明队列交换机、消息转换器、业务改造

前言 在现代分布式系统中&#xff0c;消息队列是实现服务解耦和异步处理的关键组件。Spring框架提供了强大的支持&#xff0c;使得与消息队列&#xff08;如RabbitMQ、Kafka等&#xff09;的集成变得更加便捷和灵活。本文将深入探讨如何利用Spring的注解驱动方式来配置和管理队…...

Opengl常用缓冲对象功能介绍及使用示例(C++实现)

本文整理了常用的opengl缓冲区对象并安排了使用示例 名称英文全称作用简述顶点数组对象Vertex Array Object (VAO)管理 VBO 和 EBO 的配置&#xff0c;存储顶点属性设置&#xff0c;简化渲染流程&#xff0c;避免重复设置状态顶点缓冲区对象Vertex Buffer Object (VBO)存储顶点…...

docker独立部署milvus向量数据库

milvus镜像&#xff1a;国外封锁&#xff0c;国内源也不好用。基本上所有源都不能用 首先想到阿里云服务&#xff0c;但是阿里云国外服务器便宜的300~400呢。 基于成本考虑终于装上心心念念的milvus(*^▽^*) 安装 Milvus 安装 Milvus 独立版 wget https://raw.githubuserco…...

【JT/T 808协议】808 协议开发笔记 ② ( 终端注册 | 终端注册应答 | 字符编码转换网站 )

文章目录 一、消息头 数据1、消息头拼接2、消息 ID 字段3、消息体属性 字段4、终端手机号 字段5、终端流水号 字段 二、消息体 数据三、校验码计算四、最终计算结果五、终端注册应答1、分解终端应答数据2、终端应答 消息体 数据 六、字符编码转换网站 一、消息头 数据 1、消息头…...

github配置sshkey

使用命令生成sshkey ssh-keygen -t rsa -b 4096 -C "your_emailexample.com" 依此会要求输入以下信息&#xff0c;可以使用默认值 设置保存密钥的路径 设置SSH密钥密码&#xff08;备注&#xff1a;空内容表示不设置SSH密钥密码&#xff09; 再次确认SSH密钥密…...

Java数据结构第十二期:走进二叉树的奇妙世界(一)

专栏&#xff1a;数据结构(Java版) 个人主页&#xff1a;手握风云 目录 一、树型结构 1.1. 树的定义 1.2. 树的基本概念 1.3. 树的表示形式 二、二叉树 2.1. 概念 2.2. 两种特殊的二叉树 2.3. 二叉树的性质 2.4. 二叉树的存储 三、二叉树的基本操作 一、树型结构 1.…...

Web的增删改查

准备环境 1. 添加web 点击项目右键——>选择**添加框架**选择**web应用程序** 2.创建lib目录 在web应用程序的**WEB-INF目录下**创建lib目录添加jar包(5个)解压&#xff1a;右键——>选择**添加库** 3.创建Dao层 在src目录下创建包com.zmq在该包下创建dao层添加工具…...

Java 前后端时间格式转换

在 Web 开发里&#xff0c;时间格式处理既常见又关键。由于前端和后端对时间的表示、处理方式存在差异&#xff0c;熟练掌握时间格式的转换方法就显得尤为重要。这篇文章会深入探讨 Java 前后端时间格式转换的相关知识&#xff0c;特别是 Java 时间转换的多种方式&#xff0c;其…...

【用deepseek和chatgpt做算法竞赛】——还得DeepSeek来 -Minimum Cost Trees_5

往期 【用deepseek和chatgpt做算法竞赛】——华为算法精英实战营第十九期-Minimum Cost Trees_0&#xff1a;介绍了题目和背景【用deepseek和chatgpt做算法竞赛】——华为算法精英实战营第十九期-Minimum Cost Trees_1&#xff1a;题目输入的格式说明&#xff0c;选择了邻接表…...

C++ 互斥锁的使用

mutex std::mutex 是C标准库中用于线程同步的互斥锁机制&#xff0c;主要用于保护共享资源&#xff0c;避免多个线程同时访问导致的竞态条件。 它提供了以下功能&#xff1a; 加锁&#xff08;lock&#xff09;&#xff1a;阻塞当前线程&#xff0c;直到获取锁。 解锁&#…...

【Elasticsearch】Retrieve inner hits获取嵌套查询的具体的嵌套文档来源,以及父子文档的来源

Retrieve inner hits 是 Elasticsearch 中的一个功能&#xff0c;用于在嵌套查询或父子查询中&#xff0c;返回导致主文档匹配的具体嵌套对象或子/父文档的详细信息&#xff0c;帮助用户更直观地理解查询结果的来源。 在 Elasticsearch 中&#xff0c;Retrieve inner hits是一…...

C语言中的typedef关键字详解

C语言中的typedef关键字详解 在C语言编程中&#xff0c;typedef 关键字是一个非常实用的特性&#xff0c;它可以帮助我们创建新的类型名&#xff0c;从而简化代码&#xff0c;提高可读性。本文将详细解析typedef的使用方法、场景以及注意事项。 1. typedef简介 typedef 是Ty…...

怎麼利用靜態ISP住宅代理在指紋流覽器中管理社媒帳號?

靜態ISP住宅代理是一種基於真實住宅IP的代理服務。這類代理IP通常由互聯網服務提供商&#xff08;ISP&#xff09;分配&#xff0c;具有非常高的真實性&#xff0c;與普通數據中心代理相比&#xff0c;更不容易被平臺檢測到為“虛假IP”或“代理IP”&#xff0c;靜態ISP住宅代理…...

【多语言生态篇一】【DeepSeek×Java:Spring Boot微服务集成全栈指南 】

(手把手带你从零实现AI能力调用,万字长文预警,建议收藏实操) 一、环境准备:别输在起跑线上 1.1 硬件软件全家桶 JDK版本:必须 ≥17(Spring Boot 3.2+强制要求,低版本直接报错)IDE推荐:IntelliJ IDEA终极版(社区版缺Spring AI插件支持)构建工具:Maven 3.9+ / Grad…...

IOS UITextField 无法隐藏键盘问题

设置UITextField 键盘按钮返回键为“完成”&#xff0c;即return key 设置done .m代码设置代理 //设置代理协议 UITextFieldDelegate&#xff0c; self.mobileTextField.delegate self; ///点击完成键隐藏键盘 - (BOOL)textFieldShouldReturn:(UITextField *)textField{//取…...

einops测试

文章目录 1. einops2. code3. pytorch 1. einops einops 主要是通过爱因斯坦标记法来处理张量矩阵的库&#xff0c;让矩阵处理上非常简单。 conda : conda install conda-forge::einopspython: 2. code import torch import torch.nn as nn import torch.nn.functional as…...

25轻化工程研究生复试面试问题汇总 轻化工程专业知识问题很全! 轻化工程复试全流程攻略 轻化工程考研复试真题汇总

轻化工程复试心里没谱&#xff1f;学姐带你玩转面试准备&#xff01; 是不是总觉得老师会问些刁钻问题&#xff1f;别焦虑&#xff01;其实轻化工程复试套路就那些&#xff0c;看完这篇攻略直接掌握复试通关密码&#xff01;文中有重点面试题可直接背~ 目录 一、这些行为赶紧避…...

小米路由器 AX3000T 降级后无法正常使用,解决办法

问题描述 买了个 AX3000T 路由器&#xff0c;想安装 OpenWRT 或者 安装 Clash 使用&#xff0c;看教程说是需要降级到 v1.0.47 版本。 结果刷机之后路由器无法打开了&#xff0c;一直黄灯亮&#xff0c;中间灭一下&#xff0c;又是黄灯长亮&#xff0c;没有 WIFI 没有连接。以…...

qt5实现表盘的旋转效果,通过提升QLabel类

因为工作需要&#xff0c;需要实现温度的表盘展示效果 实现思路&#xff1a; 通过提示声QLabel控价类&#xff0c;实现报盘的旋转和展示效果 1. 编写一个QLabel的类MyQLabel,实现两个方法 1. void paintEvent(QPaintEvent *event); //重绘函数 2. void valueChanged(int va…...

【HeadFirst系列之HeadFirst设计模式】第7天之命令模式:封装请求,轻松实现解耦!

命令模式&#xff1a;封装请求&#xff0c;轻松实现解耦&#xff01; 大家好&#xff01;今天我们来聊聊设计模式中的命令模式&#xff08;Command Pattern&#xff09;。如果你曾经需要将请求封装成对象&#xff0c;或者希望实现请求的撤销、重做等功能&#xff0c;那么命令模…...

Speechless:3分钟学会微博永久备份,告别内容丢失焦虑

Speechless&#xff1a;3分钟学会微博永久备份&#xff0c;告别内容丢失焦虑 【免费下载链接】Speechless 把新浪微博的内容&#xff0c;导出成 PDF 文件进行备份的 Chrome Extension。 项目地址: https://gitcode.com/gh_mirrors/sp/Speechless 想象一下这样的场景&…...

3种常见问题与解决方案:Vue3-Marquee如何为你的项目打造流畅滚动效果

3种常见问题与解决方案&#xff1a;Vue3-Marquee如何为你的项目打造流畅滚动效果 【免费下载链接】vue3-marquee A simple marquee component with ZERO dependencies for Vue 3. 项目地址: https://gitcode.com/gh_mirrors/vu/vue3-marquee 你是否在为Vue 3项目寻找一个…...

【RAGFlow】如何通过API查询知识库内容

import requests import jsondata \{"dataset_ids": ["617892ce3d2111f1835f373a6cab5d12"],"question": "快乐8游戏中&#xff0c;总共有多少个号码&#xff1f;","top_k": 3}# 发送http请求 header {"Content-Type…...

信息学奥赛刷题笔记:我是如何用BFS‘通关’3D地牢迷宫题的

信息学奥赛刷题笔记&#xff1a;我是如何用BFS‘通关’3D地牢迷宫题的 第一次看到"Dungeon Master"这道三维迷宫题时&#xff0c;我的大脑瞬间宕机——二维迷宫还没玩明白&#xff0c;现在居然要处理z轴&#xff1f;但正是这种挑战让我兴奋。作为NOI备考生&#xff0…...

Real-Anime-Z部署案例:Z-Image底座+LoRA融合全流程详解(含safetensors加载)

Real-Anime-Z部署案例&#xff1a;Z-Image底座LoRA融合全流程详解&#xff08;含safetensors加载&#xff09; 1. 项目概述 Real-Anime-Z是一款基于Stable Diffusion技术的写实向动漫风格大模型&#xff0c;采用独特的2.5D风格设计&#xff0c;在保留真实质感的同时强化动漫美…...

从零构建大模型:大模型微调与对齐-SFT/RLHF 技术详解

前言大语言模型从通用预训练走向可用、好用的核心环节&#xff0c;是微调与对齐。预训练阶段让模型掌握语言规律与海量知识&#xff0c;但输出往往无序、不可控、不遵循指令&#xff1b;而以监督指令微调&#xff08;SFT&#xff09; 为起点、以人类反馈强化学习&#xff08;RL…...

从‘调参噩梦’到‘一键收敛’:全局快速Terminal滑模控制参数整定心得分享

从‘调参噩梦’到‘一键收敛’&#xff1a;全局快速Terminal滑模控制参数整定实战指南 滑模控制工程师的日常&#xff0c;往往始于理论推导的兴奋&#xff0c;终于参数调试的崩溃。当你在Simulink里反复拖动α、β、p、q的滑块&#xff0c;看着仿真曲线在发散与抖振之间反复横跳…...

1.6T 光模块的能效革命

合作核心与产品规格合作双方&#xff1a;光子技术提供商 Sivers Semiconductors 工程制造服务商 Jabil。核心产品&#xff1a;1.6T 线性接收光收发模块。关键技术&#xff1a;集成 Sivers 的高性能分布式反馈激光器。目标应用&#xff1a;下一代超大规模 AI 数据中心的光互连。…...

Phi-3.5-Mini-Instruct惊艳效果展示:7GB显存下媲美Qwen2.5的逻辑与代码能力

Phi-3.5-Mini-Instruct惊艳效果展示&#xff1a;7GB显存下媲美Qwen2.5的逻辑与代码能力 1. 开篇亮点 Phi-3.5-Mini-Instruct作为微软最新推出的轻量级大模型&#xff0c;在仅需7GB显存的条件下&#xff0c;展现出令人惊叹的逻辑推理和代码生成能力。这款专为本地运行优化的模…...

SQL如何实现按自定义排序进行分组汇总_ORDERBY与聚合函数

GROUP BY 结果顺序未定义&#xff0c;ORDER BY 仅排序最终结果&#xff1b;需用 CASE WHEN 或 FIELD() 构造有序分组键&#xff0c;再 GROUP BY 该键与原始字段&#xff0c;最后 ORDER BY 控制输出。ORDER BY 不能直接用在 GROUP BY 后做自定义排序分组汇总SQL 标准里&#xff…...