当前位置: 首页 > news >正文

RabbitMQ: return机制

1. Return机制

Confirm只能保证消息到达exchange,无法保证消息可以被exchange分发到指定queue。

而且exchange是不能持久化消息的,queue是可以持久化消息。

采用Return机制来监听消息是否从exchange送到了指定的queue中

 2.Java的实现方式

1.导入依赖

        <dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.6.0</version></dependency>

2.生产者的实现方式

 采用Return机制来监听消息是否从exchange送到了指定的queue中

package com.qf.mq2302.hello;import com.qf.mq2302.utils.MQUtils;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ReturnListener;import java.io.IOException;public class SendRetrun {public static final String QUEUE_NAME="hello-queue";public static void main(String[] args) throws Exception {//1.获取连接对象Connection conn = MQUtils.getConnection();//2. 创建一个channel对象,对于MQ的大部分操作,都定义在了channel对象上Channel channel = conn.createChannel();//3.声明了一个队列/*** queue – the name of the queue* durable – true代表创建的队列是持久化的(当mq重启后,该队列依然存在)* exclusive – 该队列是不是排他的 (该对立是否只能由当前创建该队列的连接使用)* autoDelete – 该队列是否可以被mq服务器自动删除* arguments – 队列的其他参数,可以为null*/channel.queueDeclare(QUEUE_NAME, false, false, false, null);//开启 return 机制//编写回调方法channel.addReturnListener(new ReturnListener() {//如果消息没有成功发送到队列,这个方法会被调用@Overridepublic void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("====================ReturnListener==================");System.out.println("replyCode:"+replyCode);System.out.println("replyText:"+replyText);System.out.println("exchange:"+exchange);System.out.println("routingKey:"+routingKey);System.out.println("properties:"+properties);System.out.println("body:"+new String(body,"utf-8"));System.out.println("====================ReturnListener==================");}});String message = "Hello doubleasdasda!";//生产者如何发送消息,使用下面的方法即可/*** exchange – 交换机的名字 ,如果是空串,说明是把消息发给了默认交换机* routingKey – 路由的key,当发送消息给默认交换机时,routingkey代表队列的名字* other properties - 消息的其他属性,可以为null* body – 消息的内容,注意,要是有 字节数组*///注意:如果要使用生产者的return机制,需要在发送消息时,指定mandatory(强制性)为truechannel.basicPublish("", "sadnaas", true,null, message.getBytes());System.out.println(" [x] Sent '" + message + "'");Thread.sleep(1000);//   关闭资源channel.close();conn.close();}
}

这个必须要加上才能让rutern返回机制生效 

 

 3.消费者的实现方式

package com.qf.mq2302.hello;import com.qf.mq2302.utils.MQUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmq.client.Delivery;import java.io.IOException;public class Recv {private  final  static  String QUEUE_NAME="hello-queue";public static void main(String[] args) throws Exception {//1.获取连接对象Connection conn = MQUtils.getConnection();//2. 创建一个channel对象,对于MQ的大部分操作,都定义在了channel对象上Channel channel = conn.createChannel();/*** 第一个参数队列名称* 第二个参数,耐用性* 第三个参数排外性* 第四个参数是否自动删除* 第五个参数,可以定义什么类型的队列*/channel.queueDeclare(QUEUE_NAME, false, false, false, null);//3.该消费者收到消息之后的处理逻辑,写在DeliverCallback对象中DeliverCallback deliverCallback =new DeliverCallback() {@Overridepublic void handle(String consumerTag, Delivery message) throws IOException {System.out.println(consumerTag);//从Delivery对象中可以获取到生产者,发送的消息的字节数组byte[] body = message.getBody();String msg = new String(body, "utf-8");//在这里写消费者的业务逻辑,例如,发送邮件System.out.println(msg);}};//4.让当前消费者开始消费(QUEUE_NAME)队列中的消息/*** queue – the name of the queue* autoAck – true 代表当前消费者是不是自动确认模式。true代表自动确认。* deliverCallback – 当有消息发送给该消费者时,消费者如何处理消息的逻辑* cancelCallback – 当消费者被取消掉时,如果要执行代码,写到这里*/channel.basicConsume(QUEUE_NAME,true,deliverCallback,consumerTag -> {});}}

3.整合springboot实现

1.导入依赖

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>

2.yml配置文件

spring:rabbitmq:host: 8.140.244.227port: 6786username: testpassword: testvirtual-host: /testpublisher-returns: true #开启return机制

3.RabbitMQ配置文件

package com.qf.bootmq2302.config;import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class RabbitConfig {@Beanpublic RabbitTemplate rabbitTemplate(CachingConnectionFactory cachingConnectionFactory){RabbitTemplate rabbitTemplate = new RabbitTemplate();//设置连接工厂对象rabbitTemplate.setConnectionFactory(cachingConnectionFactory);// 开启return机制rabbitTemplate.setMandatory(true);rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {@Overridepublic void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {System.out.println("message:"+new String(message.getBody()));System.out.println("replyCode:"+replyCode);System.out.println("replyText:"+replyText);System.out.println("exchange:"+exchange);System.out.println("routingKey:"+routingKey);}});return rabbitTemplate;}}

4.生产者的controller

    @AutowiredRabbitTemplate rabbitTemplate;@GetMapping("/test1")public String test1(String msg,String routkey){System.out.println(msg);String exchangeName = "";//默认交换机String routingkey = routkey;//队列名字//生产者发送消息rabbitTemplate.convertAndSend(exchangeName,routingkey,msg);return "ok";}

5.消费者写一个队列

   @RabbitListener(queues = "queueA")public void getMsg1(Map<String,Object> data, Channel channel,Message message) throws IOException {System.out.println(data);//手动ack//若开启手动ack,不给手动ack,就按照 prefetch: 1 #等价于basicQos(1)的量,就这么多,不会多给你了,因为你没有确认。确认一条,就给你一条channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);}

6.消费者的配置文件

spring:rabbitmq:host: 8.140.244.227port: 6786username: testpassword: testvirtual-host: /test#手动ACKlistener:simple:acknowledge-mode: manual  # 手动ackprefetch: 1 #等价于basicQos(1)

相关文章:

RabbitMQ: return机制

1. Return机制 Confirm只能保证消息到达exchange&#xff0c;无法保证消息可以被exchange分发到指定queue。 而且exchange是不能持久化消息的&#xff0c;queue是可以持久化消息。 采用Return机制来监听消息是否从exchange送到了指定的queue中 2.Java的实现方式 1.导入依赖 &l…...

记录一些奇怪的报错

错误&#xff1a;AttributeError: module distutils has no attribute version 解决方案&#xff1a; 第一步&#xff1a;pip uninstall setuptools 第二步&#xff1a;conda install setuptools58.0.4 错误&#xff1a;ModuleNotFoundError: No module named _distutils_hac…...

Ubuntu 安装redis数据库,并设置开机自启动

1、下载安装包 wget http://download.redis.io/releases/redis-7.0.9.tar.gz 2、解压 tar -zxvf redis-7.0.9.tar.gz 3、复制到解压缩的包移动到/usr/local/ sudo mv ./redis-7.0.9 /usr/local/ 4、编译 cd /usr/local/redis-7.0.9 sudo make 5、测试: 时间会比较长&#xff0…...

基于开源模型搭建实时人脸识别系统(五):人脸跟踪

继续填坑&#xff0c;之前已经讲了人脸检测&#xff0c;人脸识别实战之基于开源模型搭建实时人脸识别系统&#xff08;二&#xff09;&#xff1a;人脸检测概览与模型选型_开源人脸识别模型_CodingInCV的博客-CSDN博客&#xff0c;人脸检测是定位出画面中人脸的位置&#xff0c…...

VUE | 配置环境变量

本篇目录 1. 创建开发环境配置文件2. 创建正式环境配置文件3. 在代码中访问环境变量4. 加载环境变量 在 Vue 项目中是使用 .env 文件来定义和使用不同的环境变量&#xff0c;这些文件在 Vue 项目根目录下创建。推荐有几种环境就创建几个 .env 文件&#xff0c;下面就开发环境和…...

Dynamic-TP入门初探

背景 在使用线程池的过程中&#xff0c;会出现一些痛点&#xff1a; 代码中创建了一个线程池&#xff0c;但是不知道那几个核心参数设置多少比较合适。凭经验设置参数值&#xff0c;上线后发现需要调整&#xff0c;改代码重新发布服务&#xff0c;非常麻烦。线程池相对开发人…...

Git的基本操作:远程操作

7 Git的远程操作 远程操作主要是指&#xff0c;在不同的仓库之间进行提交和代码更改。是一个明显的对等的分布式系统。其中本地个仓库与远程仓库&#xff0c;不同的远程仓库之间都可以建立这种关系。这种关系之间的操作主要有pull和push。 远程仓库 创建SSH key远程仓库和本…...

【IOC,AOP】spring的基础概念

IOC 控制反转 对象的创建控制权转交给外部实体&#xff0c;就是控制反转。外部实体便是IOC容器。其实就是以前创建java对象都是我们new一下&#xff0c;现在我们可以把这个new交给IOC容器来做&#xff0c;new出来的对象也会交由IOC容器来管理。这个new出来的对象则称为Bean。 …...

安全实战 | 怎么用零信任防范弱密码?

防范弱密码&#xff0c;不仅需要提升安全性&#xff0c;更需要提升用户体验。 比如在登录各类业务系统时&#xff0c;我们希望员工登录不同系统不再频繁切换账号密码&#xff0c;不再需要3-5个月更换一次密码&#xff0c;也不再需要频繁的输入、记录、找回密码。 员工所有的办…...

1-4 AUTOSAR方法论

总目录——AUTOSAR入门详解AUTOSAR入门详解目录汇总&#xff1a;待续中。。。https://xianfan.blog.csdn.net/article/details/132818463 目录 一、前言 二、方法论 三、单个ECU开发流程 一、前言 汽车生产供应链上有以下角色&#xff1a;OEM、TIER1、TIER2&#xff0c;其主…...

MFC C++ 数据结构及相互转化 CString char * char[] byte PCSTR DWORE unsigned

CString&#xff1a; char * char [] BYTE BYTE [] unsigned char DWORD CHAR&#xff1a;单字节字符8bit WCHAR为Unicode字符:typedef unsigned short wchar_t TCHAR : 如果当前编译方式为ANSI(默认)方式&#xff0c;TCHAR等价于CHAR&#xff0c;如果为Unicode方式&#xff0c…...

多版本CUDA安装切换

系统中默认的安装CUDA为12.0&#xff0c;现在需要在个人用户下安装CUDA11.7。 CUDA 下载 CUDA官网下载 安装 Log file not open.Segmentation fault (core dumped)错误 将/tmp/cuda-installer.log删除即可。重新安装&#xff0c;去掉驱动的安装&#xff0c;设置Toolkit的安装…...

sqlserver union和union all 的区别

1.首先在数据库编辑1-40数字&#xff1b; 2.查询Num<30的数据&#xff0c;查询Num>20 and Num<40的数据&#xff0c;使用union all合并&#xff1b; 发现30-20的数字重复了&#xff0c;可见union all 不去重&#xff1b; 3.查询Num<30的数据&#xff0c;查询Num…...

Matlab 如何计算正弦信号的幅值和初始相角

Matlab 如何计算正弦信号的幅值和初始相角 1、概述 如果已知一个正弦信号的幅值&#xff0c;在FFT后频域上该信号谱线的幅值与设置值不同&#xff0c;而是大了许多&#xff1b;如果不知道某一正弦信号的幅値&#xff0c;又如何通FFT后在頻域上求出该正弦信号的幅值呢? 2、…...

华为hcie认证培训报班培训好?还是自学好

华为HCIE认证培训报班培训和自学各有优势。 培训的优势&#xff1a; 系统性学习&#xff1a;培训课程通常会系统地涵盖HCIE认证所需的各个知识点&#xff0c;帮助你建立全面的理论体系。指导与互动&#xff1a;培训中&#xff0c;能够与资深讲师互动&#xff0c;及时解答疑惑…...

ASP.NET+sqlserver通用电子病历管理系统

一、源码描述 这是一款简洁十分美观的ASP.NETsqlserver源码&#xff0c;界面十分美观&#xff0c;功能也比较全面&#xff0c;比较适合 作为毕业设计、课程设计、使用&#xff0c;感兴趣的朋友可以下载看看哦 二、功能介绍 该源码功能十分的全面&#xff0c;具体介绍如下&…...

wireshark通常无法抓取交换机所有端口报文

Wireshark 是一种网络分析工具&#xff0c;它通常在计算机的网络接口上进行数据包捕获和分析。然而&#xff0c;Wireshark 默认情况下无法直接捕获交换机所有端口的报文。 交换机是一种网络设备&#xff0c;它在局域网内转发数据包&#xff0c;根据目的MAC地址将数据包仅发送到…...

猫头虎的技术笔记:Spring Boot启动报错解决方案

&#x1f337;&#x1f341; 博主猫头虎&#xff08;&#x1f405;&#x1f43e;&#xff09;带您 Go to New World✨&#x1f341; &#x1f984; 博客首页——&#x1f405;&#x1f43e;猫头虎的博客&#x1f390; &#x1f433; 《面试题大全专栏》 &#x1f995; 文章图文…...

Istio网关流量转发

摘要 Istio网关转发到后端服务的步骤&#xff0c;可以按照以下详细说明进行操作&#xff1a; 配置Istio Sidecar&#xff1a;确保目标后端服务已经部署并成功运行&#xff0c;并为其启用Istio Sidecar。Istio Sidecar负责在Pod中注入Istio代理&#xff0c;以便实现流量控制和…...

阿里云acp云计算认证考试科目有哪些?

阿里云ACP云计算认证考试科目包括以下内容&#xff1a; 阿里云云计算基础知识&#xff1a;包括云计算的定义、特点、服务模式、部署模式、虚拟化技术等相关知识。阿里云产品&#xff1a;包括阿里云ECS、RDS、SLB、OSS、DNS等核心产品的架构、使用方法、优化技巧等相关知识。云…...

深入浅出Asp.Net Core MVC应用开发系列-AspNetCore中的日志记录

ASP.NET Core 是一个跨平台的开源框架&#xff0c;用于在 Windows、macOS 或 Linux 上生成基于云的新式 Web 应用。 ASP.NET Core 中的日志记录 .NET 通过 ILogger API 支持高性能结构化日志记录&#xff0c;以帮助监视应用程序行为和诊断问题。 可以通过配置不同的记录提供程…...

关于nvm与node.js

1 安装nvm 安装过程中手动修改 nvm的安装路径&#xff0c; 以及修改 通过nvm安装node后正在使用的node的存放目录【这句话可能难以理解&#xff0c;但接着往下看你就了然了】 2 修改nvm中settings.txt文件配置 nvm安装成功后&#xff0c;通常在该文件中会出现以下配置&…...

UE5 学习系列(三)创建和移动物体

这篇博客是该系列的第三篇&#xff0c;是在之前两篇博客的基础上展开&#xff0c;主要介绍如何在操作界面中创建和拖动物体&#xff0c;这篇博客跟随的视频链接如下&#xff1a; B 站视频&#xff1a;s03-创建和移动物体 如果你不打算开之前的博客并且对UE5 比较熟的话按照以…...

React19源码系列之 事件插件系统

事件类别 事件类型 定义 文档 Event Event 接口表示在 EventTarget 上出现的事件。 Event - Web API | MDN UIEvent UIEvent 接口表示简单的用户界面事件。 UIEvent - Web API | MDN KeyboardEvent KeyboardEvent 对象描述了用户与键盘的交互。 KeyboardEvent - Web…...

涂鸦T5AI手搓语音、emoji、otto机器人从入门到实战

“&#x1f916;手搓TuyaAI语音指令 &#x1f60d;秒变表情包大师&#xff0c;让萌系Otto机器人&#x1f525;玩出智能新花样&#xff01;开整&#xff01;” &#x1f916; Otto机器人 → 直接点明主体 手搓TuyaAI语音 → 强调 自主编程/自定义 语音控制&#xff08;TuyaAI…...

智能仓储的未来:自动化、AI与数据分析如何重塑物流中心

当仓库学会“思考”&#xff0c;物流的终极形态正在诞生 想象这样的场景&#xff1a; 凌晨3点&#xff0c;某物流中心灯火通明却空无一人。AGV机器人集群根据实时订单动态规划路径&#xff1b;AI视觉系统在0.1秒内扫描包裹信息&#xff1b;数字孪生平台正模拟次日峰值流量压力…...

用机器学习破解新能源领域的“弃风”难题

音乐发烧友深有体会&#xff0c;玩音乐的本质就是玩电网。火电声音偏暖&#xff0c;水电偏冷&#xff0c;风电偏空旷。至于太阳能发的电&#xff0c;则略显朦胧和单薄。 不知你是否有感觉&#xff0c;近两年家里的音响声音越来越冷&#xff0c;听起来越来越单薄&#xff1f; —…...

【MATLAB代码】基于最大相关熵准则(MCC)的三维鲁棒卡尔曼滤波算法(MCC-KF),附源代码|订阅专栏后可直接查看

文章所述的代码实现了基于最大相关熵准则(MCC)的三维鲁棒卡尔曼滤波算法(MCC-KF),针对传感器观测数据中存在的脉冲型异常噪声问题,通过非线性加权机制提升滤波器的抗干扰能力。代码通过对比传统KF与MCC-KF在含异常值场景下的表现,验证了后者在状态估计鲁棒性方面的显著优…...

代码规范和架构【立芯理论一】(2025.06.08)

1、代码规范的目标 代码简洁精炼、美观&#xff0c;可持续性好高效率高复用&#xff0c;可移植性好高内聚&#xff0c;低耦合没有冗余规范性&#xff0c;代码有规可循&#xff0c;可以看出自己当时的思考过程特殊排版&#xff0c;特殊语法&#xff0c;特殊指令&#xff0c;必须…...

Ubuntu Cursor升级成v1.0

0. 当前版本低 使用当前 Cursor v0.50时 GitHub Copilot Chat 打不开&#xff0c;快捷键也不好用&#xff0c;当看到 Cursor 升级后&#xff0c;还是蛮高兴的 1. 下载 Cursor 下载地址&#xff1a;https://www.cursor.com/cn/downloads 点击下载 Linux (x64) &#xff0c;…...