rabbitMq举例
新来个技术总监,把 RabbitMQ 讲的那叫一个透彻,佩服!
生产者
代码举例
public String sendMsg(final String exchangeName,final String routingKey,final String msg) {}
/*** 发送消息* @param exchangeName exchangeName* @param routingKey routingKey* @param msg mq message* @return msgId*/public String sendMsg(final String exchangeName,final String routingKey,final String msg) {final CorrelationData correlationDataInfo = new CorrelationData();final String msgId = String.join(":", "yl:dps", exchangeName, routingKey,String.valueOf(IdUtil.getSnowflake(1, 3).nextId()));correlationDataInfo.setId(msgId);//send over callback lograbbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {if(correlationData!=null) {final String id = correlationData.getId();if (ack) {log.info("消息投递成功,消息Id[{}]【{}】", id, JSON.toJSONString(correlationData));} else {log.error("消息投递失败,消息Id[{}] [{}]", id, cause);}}else {log.error("消息投递失败,correlationData为null!");}});rabbitTemplate.convertAndSend(exchangeName, routingKey, msg, correlationDataInfo);
分析代码
这段代码的功能是向 RabbitMQ 发送消息,并且在发送消息后通过回调函数来确认消息的投递状态。下面是对代码的详细分析:
1. 方法签名
java
public String sendMsg(final String exchangeName, final String routingKey, final String msg)
- 该方法接收三个参数:
exchangeName:消息发送的交换机名称。routingKey:消息的路由键,用来确定消息将被路由到哪个队列。msg:待发送的消息内容。
2. CorrelationData 创建与消息 ID 设置
java
final CorrelationData correlationDataInfo = new CorrelationData(); final String msgId = String.join(":", "yl:dps", exchangeName, routingKey, String.valueOf(IdUtil.getSnowflake(1, 3).nextId())); correlationDataInfo.setId(msgId);
CorrelationData:RabbitMQ 提供的一个类,用于设置和传递消息的相关信息,主要用来在消息确认回调中标识消息。- 生成
msgId:msgId是通过拼接一个前缀"yl:dps"、交换机名exchangeName、路由键routingKey,以及通过IdUtil.getSnowflake(1, 3).nextId()生成的雪花算法(Snowflake)ID来创建的唯一标识符。这个 ID 用来唯一标识这条消息。 setId(msgId):将生成的msgId设置到correlationDataInfo对象中。
3. 设置回调函数
java
rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> { if(correlationData != null) { final String id = correlationData.getId(); if (ack) { log.info("消息投递成功,消息Id[{}]【{}】", id, JSON.toJSONString(correlationData)); } else { log.error("消息投递失败,消息Id[{}] [{}]", id, cause); } } else { log.error("消息投递失败,correlationData为null!"); } });
setConfirmCallback:通过rabbitTemplate.setConfirmCallback()方法设置消息确认回调函数。在消息发送之后,RabbitMQ 会调用该回调函数来通知消息是否成功投递到交换机。- 参数说明:
correlationData:携带消息相关信息的对象(这里是correlationDataInfo),包含了消息的 ID 等。ack:布尔值,表示消息是否成功投递(true表示成功,false表示失败)。cause:如果ack为false,cause会包含失败的原因。
- 回调内容:
- 如果
correlationData不为空:- 如果
ack为true,则表示消息投递成功,记录日志。 - 如果
ack为false,则表示消息投递失败,记录失败日志,并包含失败原因。
- 如果
- 如果
correlationData为null,记录错误日志。
- 如果
4. 发送消息
java
rabbitTemplate.convertAndSend(exchangeName, routingKey, msg, correlationDataInfo);
convertAndSend:该方法用于将消息发送到指定的交换机(exchangeName)、路由键(routingKey)和消息内容(msg)。同时,将correlationDataInfo(包含msgId)附带发送,以便在回调中进行确认和记录。rabbitTemplate是RabbitTemplate对象,用来发送和接收消息。它是 Spring AMQP 的核心组件,封装了与 RabbitMQ 的交互。
5. 返回消息 ID
return msgId;
- 返回生成的消息 ID。这个 ID 可以在外部被用于追踪消息投递的状态。
总结:
这段代码的主要作用是:
- 生成一个唯一的消息 ID,包含了交换机名、路由键和通过 Snowflake 算法生成的 ID。
- 设置消息确认回调,当消息发送后,RabbitMQ 会通过回调通知消息是否成功投递。
- 调用
rabbitTemplate.convertAndSend()方法向指定的交换机发送消息,同时附带CorrelationData(包含消息 ID)。 - 返回生成的消息 ID,以便后续追踪或处理。
关键点:
- 消息确认机制:使用
ConfirmCallback确保消息是否成功投递,并且通过日志记录投递状态。 - 消息 ID:通过 Snowflake 算法生成一个唯一的消息 ID,可以用于追踪和日志记录。
- 回调机制:确保生产者能在消息投递过程中获得详细的确认信息。
消费者
代码举例
@RabbitHandler@RabbitListener(bindings = @QueueBinding(value = @Queue(value = "${rabbitmq.yl.x.queue}", durable = "true"),exchange = @Exchange(value = "${rabbitmq.yl.x.exchange}"),key = "${rabbitmq.yl.x.routing.key}"))public void stateChange(@Payload String payload, @Headers Map<String, Object> headers, Channel channel) throws IOException {log.info("MQ-收到【x】状态变更消息:{} ", payload);final String objDeliveryTag = String.valueOf(headers.get(AmqpHeaders.DELIVERY_TAG));if (NumberUtil.isNumber(objDeliveryTag)) {final long deliveryTag= Long.parseLong(objDeliveryTag);log.info("MQ-收到【x】MQ消息进行消费【执行ack】[{}]", deliveryTag);channel.basicAck(deliveryTag,true);}if(CharSequenceUtil.isNotBlank(payload)) {}}
在这个 Java 方法 leaseStateChange 中,通常是在使用 Spring AMQP 框架与 RabbitMQ 集成时出现的消费消息的处理方法。下面逐个解析这个方法中的各个部分:
方法签名解析
public void leaseStateChange(@Payload String payload, @Headers Map<String, Object> headers, Channel channel) throws IOException
1. public void leaseStateChange
public: 表明这个方法是公开的,可以被外部调用。void: 表明这个方法没有返回值。leaseStateChange: 这是方法的名称。可以推测这个方法是用来处理与“租赁状态变化”相关的消息的。
2. @Payload String payload
-
@Payload: 这是 Spring AMQP 框架中的一个注解,用于将消息体的内容绑定到方法参数中。在这个例子中,payload是一个String类型的参数,代表从 RabbitMQ 队列中接收到的消息内容。通常,这个消息体是通过 JSON 或其他格式的字符串传递的。-
注解的作用:
@Payload使得 Spring 能够自动将消息的主体部分注入到方法参数payload中。比如,如果消息体是一个 JSON 格式的字符串,Spring 会将其直接赋值给payload参数。 -
示例: 假设接收到的消息体是
"{"state": "active", "leaseId": "12345"}",payload将会是该字符串。
-
3. @Headers Map<String, Object> headers
-
@Headers: 这是另一个 Spring AMQP 注解,用来将消息的头部信息注入到方法参数中。RabbitMQ 消息不仅有消息体(payload),还可能包含一些头信息(比如消息的发送时间、路由信息等)。-
注解的作用:
@Headers会将消息头部的内容绑定到headers参数,这个参数是一个Map<String, Object>类型,其中键是头部的名称,值是相应的值。头部信息常常用于传递一些附加信息(例如消息的优先级、发送者标识等)。 -
示例: 如果消息头包含如下信息:
{"correlationId": "abc123", "messageType": "leaseUpdate"}那么
headers将会是一个Map,其内容是:{"correlationId": "abc123", "messageType": "leaseUpdate"}
-
4. Channel channel
-
Channel: 这是 RabbitMQ 的核心概念之一。Channel代表一个与 RabbitMQ 服务的连接通道,允许你在该通道上进行消息的消费、确认等操作。-
作用: 在 Spring AMQP 中,
Channel通常用来进行消息的确认(acknowledge)操作,或者处理消息处理失败时的重新排队等任务。你可以使用Channel来手动确认消息,或者控制消息是否成功消费。 -
示例: 如果在消息处理过程中出现异常,消费者可能需要通过
channel.basicNack()方法来拒绝该消息并可能重新入队。
-
5. throws IOException
throws IOException: 表明这个方法可能会抛出IOException异常。RabbitMQ 的消息操作可能会遇到 I/O 错误,因此需要在方法签名中声明可能抛出此异常。通常,这类异常会发生在与 RabbitMQ 的连接中断、消息传输过程失败时等。
Spring AMQP 消费者代码示例
假设这是一个处理来自某个队列的消息的方法,下面是该方法的使用场景和完整代码示例:
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.listener.MessageListener;
import org.springframework.stereotype.Component;
import com.rabbitmq.client.Channel;import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.messaging.handler.annotation.Payload;import java.io.IOException;
import java.util.Map;@Component
public class LeaseStateChangeListener {// 监听指定队列的消息@RabbitListener(queues = "leaseStateQueue")public void leaseStateChange(@Payload String payload, @Headers Map<String, Object> headers, Channel channel) throws IOException {try {// 处理消息体System.out.println("Received message: " + payload);// 获取消息头部信息String correlationId = (String) headers.get("correlationId");String messageType = (String) headers.get("messageType");System.out.println("CorrelationId: " + correlationId + ", MessageType: " + messageType);// 模拟处理业务逻辑processLeaseStateChange(payload);// 确认消息已成功消费channel.basicAck(headers.hashCode(), false); // 手动确认消息} catch (Exception e) {// 异常处理,拒绝消息并重新入队System.err.println("Error processing message: " + e.getMessage());channel.basicNack(headers.hashCode(), false, true); // 拒绝并重新入队}}private void processLeaseStateChange(String payload) {// 假设这里是处理租赁状态更新的业务逻辑// 比如将消息解析为对象,进行租赁状态更新System.out.println("Processing lease state change for payload: " + payload);}
}
解析
-
@RabbitListener: 注解的作用是声明这个方法是一个 RabbitMQ 消息的消费者,并且该方法监听
leaseStateQueue队列。当有消息到达这个队列时,这个方法会被调用。 -
消息体 (
payload): 这个方法会接收到一个消息体,@Payload注解将该消息的内容(通常是 JSON 格式的字符串)自动绑定到方法参数payload上。 -
消息头 (
headers): 使用@Headers注解将消息的头部信息绑定到headers参数上,Map<String, Object>类型。你可以从中获取如correlationId和messageType等附加信息。 -
Channel: 这个参数用于消息的确认、拒绝等操作。在成功处理完消息后,调用
channel.basicAck()来确认消息,表示该消息已经被成功消费。如果处理失败,调用channel.basicNack()拒绝该消息,并可以选择是否重新入队。
总结
- 该方法是一个 RabbitMQ 消费者,用于从指定的队列中消费消息。
- 通过
@Payload获取消息体内容,使用@Headers获取消息头信息。 - 使用
Channel来确认消息的处理状态。 - 使用
@RabbitListener注解自动监听队列,并处理消费的消息。
这种方式非常适合处理队列中的业务逻辑,并能够灵活处理消息的确认、拒绝等操作。
消息怎么知道发给哪一个队列
先看队列与交换机怎么绑定的
先创建队列,然后绑定到交换机
RabbitMQ系列-6.如何通过控制台创建交换机、队列、死信队列、延迟队列 - 简书

相关文章:
rabbitMq举例
新来个技术总监,把 RabbitMQ 讲的那叫一个透彻,佩服! 生产者 代码举例 public String sendMsg(final String exchangeName,final String routingKey,final String msg) {} /*** 发送消息* param exchangeName exchangeName* param routin…...
奇怪的知识又增加了:ESP32下的Lisp编程=>ULisp--Lisp for microcontrollers
ESP32下有MicroPython,那么我就在想,有Lisp语言支持吗?答案是果然有!有ULisp,专门为MCU设计的Lisp! 网址:uLisp - Lisp for microcontrollers 介绍:用于微控制器的 Lisp 适用于 Ar…...
渗透测试之信息收集
免责声明:使用本教程或工具,用户必须遵守所有适用的法律和法规,并且用户应自行承担所有风险和责任。 文章目录 1. 基础信息收集2. 网络资产发现3. 网站和应用信息4. 技术栈识别5. 安全漏洞和配置6. 移动应用分析7.Google语法常见Google使用场…...
基本分页存储管理
一、实验目的 目的:熟悉并掌握基本分页存储管理的思想及其实现方法,熟悉并掌握基本分页存储管理的分配和回收方式。 任务:模拟实现基本分页存储管理方式下内存空间的分配和回收。 二、实验内容 1、实验内容 内存空间的初始化——可以由用户输…...
SQLServer到MySQL的数据高效迁移方案分享
SQL Server数据集成到MySQL的技术案例分享 在企业级数据管理中,跨平台的数据集成是一个常见且关键的任务。本次我们将探讨如何通过轻易云数据集成平台,将巨益OMS系统中的退款单明细表从SQL Server高效、安全地迁移到MySQL数据库中。具体方案名称为“7--…...
软考:工作后再考的性价比分析
引言 在当今的就业市场中,软考(软件设计师、系统分析师等资格考试)是否值得在校学生花费时间和精力去准备?本文将从多个角度深入分析软考在不同阶段的性价比,帮助大家做出明智的选择。 一、软考的价值与局限性 1.1 …...
shell编程(完结)
shell编程(完结) 声明! 学习视频来自B站up主 泷羽sec 有兴趣的师傅可以关注一下,如涉及侵权马上删除文章 笔记只是方便各位师傅的学习和探讨,文章所提到的网站以及内容,只做学习交流,其…...
UNIX数据恢复—UNIX系统常见故障问题和数据恢复方案
UNIX系统常见故障表现: 1、存储结构出错; 2、数据删除; 3、文件系统格式化; 4、其他原因数据丢失。 UNIX系统常见故障解决方案: 1、检测UNIX系统故障涉及的设备是否存在硬件故障,如果存在硬件故障…...
adb连接逍遥安卓模拟器失败的问题解决方案
1、逍遥安卓模拟器进入系统应用,设置-关于平板电脑-版本号,连续点击3次以上,直到提示进入开发者模式,返回设置界面,进入【开发者选项】-【USB调试】开启,之后重启模拟器再次adb尝试连接。 2、android stud…...
【昇腾】NPU ID:物理ID、逻辑ID、芯片映射关系
起因: https://www.hiascend.com/document/detail/zh/Atlas%20200I%20A2/23.0.0/re/npu/npusmi_013.html npu-smi info -l查询所有NPU设备: [naienotebook-npu-bd130045-55bbffd786-lr6t8 DCNN]$ npu-smi info -lTotal Count : 1NPU…...
Three.js曲线篇 8.管道漫游
目录 创建样条曲线 创建管道 透视相机漫游 完整代码 大家不要被这个“管道漫游”这几个字所蒙骗了,学完后大家就知道这个知识点有多脏了。我也是误入歧途,好奇了一下“管道漫游”。好了,现在就给大家展示一下为啥这个只是点脏了。 我也废话…...
scala基础_数据类型概览
Scala 数据类型 下表列出了 Scala 支持的数据类型: 类型类别数据类型描述Scala标准库中的实际类基本类型Byte8位有符号整数,数值范围为 -128 到 127scala.Byte基本类型Short16位有符号整数,数值范围为 -32768 到 32767scala.Short基本类型I…...
【LeetCode刷题之路】622.设计循环队列
LeetCode刷题记录 🌐 我的博客主页:iiiiiankor🎯 如果你觉得我的内容对你有帮助,不妨点个赞👍、留个评论✍,或者收藏⭐,让我们一起进步!📝 专栏系列:LeetCode…...
暂停一下,给Next.js项目配置一下ESLint(Next+tailwind项目)
前提 之前开自己的GitHub项目,想着不是团队项目,偷懒没有配置eslint,后面发现还是不行。eslint的存在可以帮助我们规范代码格式,同时 ctrl s保存立即调整代码格式是真的很爽。 除此之外,团队使用eslint也是好处颇多…...
Windows系统磁盘与分区之详解(Detailed Explanation of Windows System Disks and Partitions)
Windows系统磁盘与分区知识详解 在日常使用Windows操作系统的过程中,我们常常会接触到磁盘管理,磁盘分区等操作.然而,许多人可能并不完全理解磁盘和分区的运作原理以及如何高效管理它们. 本篇文章将探讨Windows系统中关于磁盘和分区的各种知识,帮助大家更好地理解磁盘以及分区…...
顺序表的使用,对数据的增删改查
主函数: 3.c #include "3.h"//头文件调用 SqlListptr sql_cerate()//创建顺序表函数 {SqlListptr ptr(SqlListptr)malloc(sizeof(SqlList));//在堆区申请连续的空间if(NULLptr){printf("创建失败\n");return NULL;//如果没有申请成功ÿ…...
XDMA与FPGA:高效数据传输的艺术
XDMA与FPGA:高效数据传输的艺术 引言 在现代计算系统中,数据传输的效率直接影响系统的整体性能。特别是在涉及到高速数据处理的领域,如高性能计算(HPC)、实时视频处理和大数据分析等,如何高效地在主机与F…...
#思科模拟器通过服务配置保障无线网络安全Radius
演示拓扑图: 搭建拓扑时要注意: 只能连接它的Ethernet接口,不然会不通 MAC地址绑定 要求 :通过配置MAC地址过滤禁止非内部员工连接WiFi 打开无线路由器GUI界面,点开下图页面,配置路由器无线网络MAC地址过…...
浅谈Python库之pillow
一、pillow的介绍 Pillow是Python Imaging Library (PIL) 的一个分支,它是一个强大的图像处理库,用于打开、操作和保存许多不同图像文件格式。Pillow提供了广泛的文件格式支持、强大的图像处理能力和广泛的文件格式兼容性。它是PIL的一个友好的分支&…...
Android通过okhttp下载文件(本文案例 下载mp4到本地,并更新到相册)
使用步骤分为两步 第一步导入 okhttp3 依赖 第二步调用本文提供的 utils 第一步这里不做说明了,直接提供第二步复制即用 DownloadUtil 中 download 为下载文件 参数说明 这里主要看你把 destFileName 下载文件名称定义为什么后缀,比如我定义为 .mp4 下…...
XCTF-web-easyupload
试了试php,php7,pht,phtml等,都没有用 尝试.user.ini 抓包修改将.user.ini修改为jpg图片 在上传一个123.jpg 用蚁剑连接,得到flag...
边缘计算医疗风险自查APP开发方案
核心目标:在便携设备(智能手表/家用检测仪)部署轻量化疾病预测模型,实现低延迟、隐私安全的实时健康风险评估。 一、技术架构设计 #mermaid-svg-iuNaeeLK2YoFKfao {font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;fill:#333;}#mermaid-svg…...
条件运算符
C中的三目运算符(也称条件运算符,英文:ternary operator)是一种简洁的条件选择语句,语法如下: 条件表达式 ? 表达式1 : 表达式2• 如果“条件表达式”为true,则整个表达式的结果为“表达式1”…...
cf2117E
原题链接:https://codeforces.com/contest/2117/problem/E 题目背景: 给定两个数组a,b,可以执行多次以下操作:选择 i (1 < i < n - 1),并设置 或,也可以在执行上述操作前执行一次删除任意 和 。求…...
第25节 Node.js 断言测试
Node.js的assert模块主要用于编写程序的单元测试时使用,通过断言可以提早发现和排查出错误。 稳定性: 5 - 锁定 这个模块可用于应用的单元测试,通过 require(assert) 可以使用这个模块。 assert.fail(actual, expected, message, operator) 使用参数…...
第一篇:Agent2Agent (A2A) 协议——协作式人工智能的黎明
AI 领域的快速发展正在催生一个新时代,智能代理(agents)不再是孤立的个体,而是能够像一个数字团队一样协作。然而,当前 AI 生态系统的碎片化阻碍了这一愿景的实现,导致了“AI 巴别塔问题”——不同代理之间…...
大语言模型(LLM)中的KV缓存压缩与动态稀疏注意力机制设计
随着大语言模型(LLM)参数规模的增长,推理阶段的内存占用和计算复杂度成为核心挑战。传统注意力机制的计算复杂度随序列长度呈二次方增长,而KV缓存的内存消耗可能高达数十GB(例如Llama2-7B处理100K token时需50GB内存&a…...
return this;返回的是谁
一个审批系统的示例来演示责任链模式的实现。假设公司需要处理不同金额的采购申请,不同级别的经理有不同的审批权限: // 抽象处理者:审批者 abstract class Approver {protected Approver successor; // 下一个处理者// 设置下一个处理者pub…...
MFC 抛体运动模拟:常见问题解决与界面美化
在 MFC 中开发抛体运动模拟程序时,我们常遇到 轨迹残留、无效刷新、视觉单调、物理逻辑瑕疵 等问题。本文将针对这些痛点,详细解析原因并提供解决方案,同时兼顾界面美化,让模拟效果更专业、更高效。 问题一:历史轨迹与小球残影残留 现象 小球运动后,历史位置的 “残影”…...
【C++特殊工具与技术】优化内存分配(一):C++中的内存分配
目录 一、C 内存的基本概念 1.1 内存的物理与逻辑结构 1.2 C 程序的内存区域划分 二、栈内存分配 2.1 栈内存的特点 2.2 栈内存分配示例 三、堆内存分配 3.1 new和delete操作符 4.2 内存泄漏与悬空指针问题 4.3 new和delete的重载 四、智能指针…...
