RabbitMq 使用说明
1. 声明交换机和队列,以及交换机和队列绑定
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.util.HashMap;
import java.util.Map;@Slf4j
@Configuration
public class CeshiQueue {//延迟交换机fanoutpublic static final String DELAY_EXCHANGE_NAME = "delay_exchange_name";//延时队列Apublic static final String DELAY_QUEUE_A_NAME = "delay_queue_a_name";//延时队列Bpublic static final String DELAY_QUEUE_B_NAME = "delay_queue_b_name";//延时队列A路由Keypublic static final String DELAY_ROUTING_KEY_A_NAME = "delay_routing_key_a_name";//延时队列B路由Keypublic static final String DELAY_ROUTING_KEY_B_NAME = "delay_routing_key_b_name";//死信交换机public static final String DEAD_LETTER_EXCHANGE_NAME = "dead_letter_exchange_name";//死信队列Apublic static final String DEAD_LETTER_QUEUE_A_NAME = "dead_letter_queue_a_name";//死信队列Bpublic static final String DEAD_LETTER_QUEUE_B_NAME = "dead_letter_queue_b_name";//死信队列A路由Keypublic static final String DEAD_LETTER_ROUTING_KEY_A_NAME = "dead_letter_routing_key_a_name";//死信队列B路由Keypublic static final String DEAD_LETTER_ROUTING_KEY_B_NAME = "dead_letter_routing_key_b_name";/*** 声明延时交换机 fanout** @return*/@Bean(DELAY_EXCHANGE_NAME)public Exchange DELAY_EXCHANGE_NAME() {// return ExchangeBuilder.fanoutExchange(DELAY_EXCHANGE_NAME).durable(true).build();return ExchangeBuilder.directExchange(DELAY_EXCHANGE_NAME).durable(true).build();}/*** 声明死信交换机 direct** @return*/@Bean(DEAD_LETTER_EXCHANGE_NAME)public Exchange DEAD_LETTER_EXCHANGE_NAME() {return ExchangeBuilder.directExchange(DEAD_LETTER_EXCHANGE_NAME).durable(true).build();}/*** 声明延时队列A* 并绑定死信交换机 和 死信队列A 根据路由key** @return*/@Bean(DELAY_QUEUE_A_NAME)public Queue DELAY_QUEUEA_NAME() {Map map = new HashMap();map.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE_NAME);map.put("x-dead-letter-routing-key", DEAD_LETTER_QUEUE_A_NAME);//设置此队列延时时间 6秒map.put("x-message-ttl", 6000);return QueueBuilder.durable(DELAY_QUEUE_A_NAME).withArguments(map).build();}/*** 声明延时队列B* 并绑定死信交换机 和 死信队列B 根据路由key** @return*/@Bean(DELAY_QUEUE_B_NAME)public Queue DELAY_QUEUEB_NAME() {Map map = new HashMap();map.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE_NAME);map.put("x-dead-letter-routing-key", DEAD_LETTER_QUEUE_B_NAME);//设置此队列延时时间 12秒map.put("x-message-ttl", 12000);return QueueBuilder.durable(DELAY_QUEUE_B_NAME).withArguments(map).build();}/*** 声明死信队列A** @return*/@Bean(DEAD_LETTER_QUEUE_A_NAME)public Queue DEAD_LETTER_QUEUEA_NAME() {return new Queue(DEAD_LETTER_QUEUE_A_NAME);}/*** 声明死信队列B** @return*/@Bean(DEAD_LETTER_QUEUE_B_NAME)public Queue DEAD_LETTER_QUEUEB_NAME() {return new Queue(DEAD_LETTER_QUEUE_B_NAME);}/*** 延时队列A绑定交换机** @param queue* @param exchange* @return*/@Beanpublic Binding delayQueueABinding(@Qualifier(DELAY_QUEUE_A_NAME) Queue queue, @Qualifier(DELAY_EXCHANGE_NAME) Exchange exchange) {
// return BindingBuilder.bind(queue).to(exchange).with("").noargs();return BindingBuilder.bind(queue).to(exchange).with(DELAY_ROUTING_KEY_A_NAME).noargs();}/*** 延时队列B绑定交换机** @param queue* @param exchange* @return*/@Beanpublic Binding delayQueueBBinding(@Qualifier(DELAY_QUEUE_B_NAME) Queue queue, @Qualifier(DELAY_EXCHANGE_NAME) Exchange exchange) {// return BindingBuilder.bind(queue).to(exchange).with("").noargs();return BindingBuilder.bind(queue).to(exchange).with(DELAY_ROUTING_KEY_B_NAME).noargs();}/*** 死信队列A绑定交换机* @param queue* @param exchange* @return*/@Beanpublic Binding deadLetterQueueABinding(@Qualifier(DEAD_LETTER_QUEUE_A_NAME)Queue queue, @Qualifier(DEAD_LETTER_EXCHANGE_NAME) Exchange exchange){return BindingBuilder.bind(queue).to(exchange).with(DEAD_LETTER_QUEUE_A_NAME).noargs();}/*** 死信队列B绑定交换机* @param queue* @param exchange* @return*/@Beanpublic Binding deadLetterQueueBBinding(@Qualifier(DEAD_LETTER_QUEUE_B_NAME)Queue queue, @Qualifier(DEAD_LETTER_EXCHANGE_NAME) Exchange exchange){return BindingBuilder.bind(queue).to(exchange).with(DEAD_LETTER_QUEUE_B_NAME).noargs();}}
2. 生产者发送消息
import com.core.rabbitmq.queue.CeshiQueue;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;import java.util.Date;
import java.util.HashMap;
import java.util.Map;@Slf4j
@RestController
@RequestMapping("/provider")
public class ProviderController {@AutowiredRabbitTemplate rabbitTemplate; //使用RabbitTemplate,这提供了接收/发送等等方法@GetMapping("/ceshiDelay1")public void ceshi() {System.out.println("provider ================" + new Date());//发送消息Map map = new HashMap();map.put("name", "老六");map.put("age", "18");//第一个参数发送给哪个交换机 第二个路由key 我们延时交换机是fanout所以路由key为空 第三个发送对象
// rabbitTemplate.convertAndSend(CeshiQueue.DELAY_EXCHANGE_NAME, "", map);rabbitTemplate.convertAndSend(CeshiQueue.DELAY_EXCHANGE_NAME, CeshiQueue.DELAY_ROUTING_KEY_A_NAME, map
// ,message -> {
// message.getMessageProperties().setExpiration(1000 * 6 + "");
// message.getMessageProperties().setDelay(6 * 1000);
// return message;
// }););System.out.println("订单提交成功,请及时付款");}
3. 消费者消费消息
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.stereotype.Component;import java.io.IOException;
import java.util.Date;
import java.util.Map;/*** 小程序订单超时未支付关闭订单** @date 2023/03/03*/
@Component
@Slf4j
//@RequiredArgsConstructor
public class ConsumerListener {@RabbitListener(queues = CeshiQueue.DEAD_LETTER_QUEUE_A_NAME)public void process2(Map order, Message message, @Headers Map<String, Object> headers, Channel channel) {log.info("消費者 订单号消息", order);long deliveryTag = message.getMessageProperties().getDeliveryTag();log.info("订单 延时队列,消费者 订单号消息【{}】", order);log.info("订单 延时队列,消费者 订单号消息【{}】", headers);log.info("订单 延时队列,消费者 订单号消息【{}】", message);log.info("订单 延时队列,消费者 订单号消息【{}】", channel);log.info("订单 延时队列,消费者 订单号消息【{}】", deliveryTag);System.out.println(new Date() + "消费者 执行结束...." + message);try { // 如果没有错误,则执行此步,表示成功签收; 手动签收,第一个参数表示消息的deliveryTag,第二个参数表示是否允许多条消息同时被签收channel.basicAck(deliveryTag, false);} catch (Exception e) {// requeue为true则重新入队列,否则丢弃或者进入死信队列 /*** @param deliveryTag 消息序号* @param multiple 是否批量处理(true:批量拒绝所有小于deliveryTag的消息;false:只处理当前消息)* @param requeue 拒绝是否重新入队列 (true:消息重新入队;false:禁止消息重新入队)*/channel.basicReject(deliveryTag, false);log.error("订单 超时支付异常,系统重新关闭订单【{}】", deliveryTag);}}
}
注意:生产者发送延迟队列里,用延迟路由key,想要得到延迟效果,需要用死信队列来监听,如果用延迟队列监听就得不到延迟效果,延迟队列就会立即受到消息(初学者会遇到的坑)。
4. BasicReject
拒收,是接收端在收到消息的时候响应给RabbitMQ服务的一种命令,告诉服务器不应该由我处理,或者拒绝处理,扔掉。 接收端在发送reject命令的时候可以选择是否要重新放回queue中。如果没有其他接收者监控这个queue的话,要注意一直无限循环发送的危险。
BasicDeliverEventArgs ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue();
channel.BasicReject(ea.DeliveryTag, false);
BasicReject方法第一个参数是消息的DeliveryTag,对于每个Channel来说,每个消息都会有一个DeliveryTag,一般用接收消息的顺序来表示:1,2,3,4 等等。第二个参数是是否放回queue中,requeue。
BasicReject一次只能拒绝接收一个消息,而BasicNack方法可以支持一次0个或多个消息的拒收,并且也可以设置是否requeue。
channel.BasicNack(3, true, false);
在第一个参数DeliveryTag中如果输入3,则消息DeliveryTag小于等于3的,这个Channel的,都会被拒收。
5. RabbitMQ 消息确认机制ACK
ack 机制保证的是broker和消费者之间的可靠性
ack 表示的是消费端收到消息后的确认方式,有三种确认方式
自动确认:acknowledge="none"(默认)
手动确认:acknowledge="manual"
根据异常情况确认:acknowledge="auto"(这种方式使用麻烦,不作讲解)
自动确认的解释:
当消息一旦被 Consumer 接收到,则自动确认收到,并将相应消息从 RabbitMQ 的消息缓存中移除
手动确认的解释:
在实际业务处理中,很可能消费端收到消息后业务处理出现异常,那么该消息就会丢失;
如果设置了手动确认方式,则需要在业务处理成功后,调用 channel.basicAck() 手动签收;如果出现异常,则调用 channel.basicNack()方法,让 broker 自动重新发送消息给消费端。
rabbitMQ消息中间件的延时队列以及死信队列的使用和应用场景_出一个场景问mq的用法(延迟、死信)
rabbitmq-BasicReject
RabbitMQ ACK消息确认机制 快速入门
相关文章:
RabbitMq 使用说明
1. 声明交换机和队列,以及交换机和队列绑定 import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.*; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.spr…...

Vue(10-20)
1Vue赋值方式 Object.defineProperty <!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8"><meta http-equiv"X-UA-Compatible" content"IEedge"><meta name"viewport" conten…...
C++-对四个智能指针:shared_ptr,unique_ptr,weak_ptr,auto_ptr的理解
回答如下: C的智能指针是一种特殊类型的“指针”,其主要目的是自动跟踪内存分配和释放,以避免程序中出现内存泄露或空悬指针等问题,主要采用的技术是:借助于类的生命周期,当超出了类的作用域时,…...

uni-app中使用vue3语法详解
全局创建 app.use(createPina()).mount 全局方法 通过app.config.globalProperties.xxx可以创建 这里我们写了一个字符串翻转的全局方法 main.js里面添加一个全局方法 不要忘了加$ 否则会报错 // #ifdef VUE3 //导入创建app import { createSSRApp } from vue //导入创建ap…...
三十四、MongoDB PHP
PHP 语言可是使用 mongo.so ( Windows 下是 mongo.dll ) 扩展访问 MongoDB 数据库 MongoDB PHP 在各平台上的安装及驱动包下载请查看: PHP 安装 MongoDB 扩展驱动 如果你使用的是 PHP7,请移步: PHP7 MongoDB 安装与使用 PHP 连接 MongoDB 和 选择一个…...
浅拷贝和深拷贝的区别
浅拷贝和深拷贝 总结:浅拷贝对象数据共享,深拷贝是一个完全独立的对象,因此对象数据不共享。 浅拷贝(Shallow Copy) 浅拷贝是指创建一个新的对象,但是该新对象只是原始对象的一个副本。具体而言…...

6个常用Pycharm插件推荐,老手100%都用过
人生苦短 我用python 有些插件是下载后需要重启Pycharm才生效的 免费领源码、安装包:扣扣qun 903971231 PyCharm 本身已经足够优秀, 就算不使用插件, 也可以吊打市面上 90%的 Python 编辑器。 如果硬要我推荐几款实用的话, 那么…...

TCP的11种状态
CLOSED状态:初始状态,表示TCP连接是“关闭的”或者“未打开的”LISTEN状态:表示服务端的某个端口正处于监听状态,正在等待客户端连接的到来SYN_SENT状态:当客户端发送SYN请求建立连接之后,客户端处于SYN_SE…...
new 指令简单过程 / 类加载简单过程初始化
例子:Person p new Person(“张三”,”23”); 因为new用到person.class,所以先找到person.class文件,并且加载到内存中(如果有父类先加载父类)执行static块以及static变量的初始化(如果有父类先初始化父类࿰…...
Asan基本原理及试用
概述 Asan是Google专门为C/C开发的内存错误探测工具,其具有如下功能 使用已释放内存(野指针)√堆内存越界(读写)√栈内存越界(读写)√全局变量越界(读写)函数返回局部变…...

深度学习应用技巧4-模型融合:投票法、加权平均法、集成模型法
大家好,我是微学AI,今天给大家介绍一下,深度学习中的模型融合。它是将多个深度学习模型或其预测结果结合起来,以提高模型整体性能的一种技术。 深度学习中的模型融合技术,也叫做集成学习,是指同时使用多个…...

【并发编程】深入理解Java内存模型及相关面试题
文章目录优秀引用1、引入2、概述3、JMM内存模型的实现3.1、简介3.2、原子性3.3、可见性3.4、有序性4、相关面试题4.1、你知道什么是Java内存模型JMM吗?4.2、JMM和volatile他们两个之间的关系是什么?4.3、JMM有哪些特性/能说说JMM的三大特性吗?…...
C++编程语言STL之queue介绍
本文主要介绍C编程语言的STL(Standard Template Library)中queue(队列)的相关知识,同时通过示例代码介绍queue的常见用法。1 概述适配器(adaptor)是STL中的一个通用概念。容器、迭代器和函数都有…...
ACO优化蚁群算法
%% 蚁群算法(ant colony optimization,ACO) %清空变量 clear close all clc [ graph ] createGraph(); figure subplot(1,3,1) drawGraph( graph); %% 初始化参数 maxIter 100; antNo 50; tau0 10 * 1 / ( graph.n * mean( graph.edges(:) …...

SwiftUI 常用组件和属性(SwiftUI初学笔记)
本文为初学SwiftUI笔记。记录SwiftUI常用的组件和属性。 组件 共有属性(View的属性) Image("toRight").resizable().background(.red) // 背景色.shadow(color: .black, radius: 2, x: 9, y: 15) //阴影.frame(width: 30, height: 30) // 宽高 可以只设置宽或者高.…...
Centos 中设置代理的两种方法
Centos 中设置代理的两种方法 在使用局域网时,有时在局域网内只有一台电脑可以进行上网,其他电脑只能通过配置代理的方式来上网,在Windows系统中设置代理上网相对简单,如果只需上网的话,只需在浏览器中找到网络连接&am…...
高速PCB设计指南系列(一)
第一篇 PCB布线 在PCB设计中,布线是完成产品设计的重要步骤,可以说前面的准备工作都是为它而做的, 在整个PCB中,以布线的设计过程限定最高,技巧最细、工作量最大。PCB布线有单面布线、 双面布线及多层布线。布线的方…...

云端IDE:TitanIDE v2.6.0 正式发布
原文作者:行云创新技术总监 邓冰寒 引言 云原生集成开发环境 TitanIDE v2.6.0 正式发布了,一起来看看都有那些全新的体验吧! TitanIDE 是一款云IDE, 也称 CloudIDE,作为数字化时代研发体系不可或缺的一环,和企业建设…...

【Python】tqdm 模块
import mathfrom tqdm import tqdm, trange# 计算阶乘 results_1 []for i in range(6666):results_1.append(math.factorial(i))这是一个循环计算阶乘的程序,我们不知道程序运行的具体情况,如果能加上一个程序运行过程的进度条,那可就太有趣…...

论文阅读:Adversarial Cross-Modal Retrieval对抗式跨模式检索
Adversarial Cross-Modal Retrieval 对抗式跨模式检索 跨模态检索研究的核心是学习一个共同的子空间,不同模态的数据可以直接相互比较。本文提出了一种新的对抗性跨模态检索(ACMR)方法,它在对抗性学习的基础上寻求有效的共同子空间…...

Linux 文件类型,目录与路径,文件与目录管理
文件类型 后面的字符表示文件类型标志 普通文件:-(纯文本文件,二进制文件,数据格式文件) 如文本文件、图片、程序文件等。 目录文件:d(directory) 用来存放其他文件或子目录。 设备…...

大话软工笔记—需求分析概述
需求分析,就是要对需求调研收集到的资料信息逐个地进行拆分、研究,从大量的不确定“需求”中确定出哪些需求最终要转换为确定的“功能需求”。 需求分析的作用非常重要,后续设计的依据主要来自于需求分析的成果,包括: 项目的目的…...
日语学习-日语知识点小记-构建基础-JLPT-N4阶段(33):にする
日语学习-日语知识点小记-构建基础-JLPT-N4阶段(33):にする 1、前言(1)情况说明(2)工程师的信仰2、知识点(1) にする1,接续:名词+にする2,接续:疑问词+にする3,(A)は(B)にする。(2)復習:(1)复习句子(2)ために & ように(3)そう(4)にする3、…...

基于SpringBoot在线拍卖系统的设计和实现
摘 要 随着社会的发展,社会的各行各业都在利用信息化时代的优势。计算机的优势和普及使得各种信息系统的开发成为必需。 在线拍卖系统,主要的模块包括管理员;首页、个人中心、用户管理、商品类型管理、拍卖商品管理、历史竞拍管理、竞拍订单…...
为什么要创建 Vue 实例
核心原因:Vue 需要一个「控制中心」来驱动整个应用 你可以把 Vue 实例想象成你应用的**「大脑」或「引擎」。它负责协调模板、数据、逻辑和行为,将它们变成一个活的、可交互的应用**。没有这个实例,你的代码只是一堆静态的 HTML、JavaScript 变量和函数,无法「活」起来。 …...
WebRTC从入门到实践 - 零基础教程
WebRTC从入门到实践 - 零基础教程 目录 WebRTC简介 基础概念 工作原理 开发环境搭建 基础实践 三个实战案例 常见问题解答 1. WebRTC简介 1.1 什么是WebRTC? WebRTC(Web Real-Time Communication)是一个支持网页浏览器进行实时语音…...

mac:大模型系列测试
0 MAC 前几天经过学生优惠以及国补17K入手了mac studio,然后这两天亲自测试其模型行运用能力如何,是否支持微调、推理速度等能力。下面进入正文。 1 mac 与 unsloth 按照下面的进行安装以及测试,是可以跑通文章里面的代码。训练速度也是很快的。 注意…...

《信号与系统》第 6 章 信号与系统的时域和频域特性
目录 6.0 引言 6.1 傅里叶变换的模和相位表示 6.2 线性时不变系统频率响应的模和相位表示 6.2.1 线性与非线性相位 6.2.2 群时延 6.2.3 对数模和相位图 6.3 理想频率选择性滤波器的时域特性 6.4 非理想滤波器的时域和频域特性讨论 6.5 一阶与二阶连续时间系统 6.5.1 …...
ThreadLocal 源码
ThreadLocal 源码 此类提供线程局部变量。这些变量不同于它们的普通对应物,因为每个访问一个线程局部变量的线程(通过其 get 或 set 方法)都有自己独立初始化的变量副本。ThreadLocal 实例通常是类中的私有静态字段,这些类希望将…...
2025.6.9总结(利与弊)
凡事都有两面性。在大厂上班也不例外。今天找开发定位问题,从一个接口人不断溯源到另一个 接口人。有时候,不知道是谁的责任填。将工作内容分的很细,每个人负责其中的一小块。我清楚的意识到,自己就是个可以随时替换的螺丝钉&…...