当前位置: 首页 > news >正文

RabbitMq 使用说明

1. 声明交换机和队列,以及交换机和队列绑定

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.util.HashMap;
import java.util.Map;@Slf4j
@Configuration
public class CeshiQueue {//延迟交换机fanoutpublic static final String DELAY_EXCHANGE_NAME = "delay_exchange_name";//延时队列Apublic static final String DELAY_QUEUE_A_NAME = "delay_queue_a_name";//延时队列Bpublic static final String DELAY_QUEUE_B_NAME = "delay_queue_b_name";//延时队列A路由Keypublic static final String DELAY_ROUTING_KEY_A_NAME = "delay_routing_key_a_name";//延时队列B路由Keypublic static final String DELAY_ROUTING_KEY_B_NAME = "delay_routing_key_b_name";//死信交换机public static final String DEAD_LETTER_EXCHANGE_NAME = "dead_letter_exchange_name";//死信队列Apublic static final String DEAD_LETTER_QUEUE_A_NAME = "dead_letter_queue_a_name";//死信队列Bpublic static final String DEAD_LETTER_QUEUE_B_NAME = "dead_letter_queue_b_name";//死信队列A路由Keypublic static final String DEAD_LETTER_ROUTING_KEY_A_NAME = "dead_letter_routing_key_a_name";//死信队列B路由Keypublic static final String DEAD_LETTER_ROUTING_KEY_B_NAME = "dead_letter_routing_key_b_name";/*** 声明延时交换机 fanout** @return*/@Bean(DELAY_EXCHANGE_NAME)public Exchange DELAY_EXCHANGE_NAME() {// return ExchangeBuilder.fanoutExchange(DELAY_EXCHANGE_NAME).durable(true).build();return ExchangeBuilder.directExchange(DELAY_EXCHANGE_NAME).durable(true).build();}/*** 声明死信交换机 direct** @return*/@Bean(DEAD_LETTER_EXCHANGE_NAME)public Exchange DEAD_LETTER_EXCHANGE_NAME() {return ExchangeBuilder.directExchange(DEAD_LETTER_EXCHANGE_NAME).durable(true).build();}/*** 声明延时队列A* 并绑定死信交换机 和 死信队列A  根据路由key** @return*/@Bean(DELAY_QUEUE_A_NAME)public Queue DELAY_QUEUEA_NAME() {Map map = new HashMap();map.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE_NAME);map.put("x-dead-letter-routing-key", DEAD_LETTER_QUEUE_A_NAME);//设置此队列延时时间 6秒map.put("x-message-ttl", 6000);return QueueBuilder.durable(DELAY_QUEUE_A_NAME).withArguments(map).build();}/*** 声明延时队列B* 并绑定死信交换机 和 死信队列B  根据路由key** @return*/@Bean(DELAY_QUEUE_B_NAME)public Queue DELAY_QUEUEB_NAME() {Map map = new HashMap();map.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE_NAME);map.put("x-dead-letter-routing-key", DEAD_LETTER_QUEUE_B_NAME);//设置此队列延时时间 12秒map.put("x-message-ttl", 12000);return QueueBuilder.durable(DELAY_QUEUE_B_NAME).withArguments(map).build();}/*** 声明死信队列A** @return*/@Bean(DEAD_LETTER_QUEUE_A_NAME)public Queue DEAD_LETTER_QUEUEA_NAME() {return new Queue(DEAD_LETTER_QUEUE_A_NAME);}/*** 声明死信队列B** @return*/@Bean(DEAD_LETTER_QUEUE_B_NAME)public Queue DEAD_LETTER_QUEUEB_NAME() {return new Queue(DEAD_LETTER_QUEUE_B_NAME);}/*** 延时队列A绑定交换机** @param queue* @param exchange* @return*/@Beanpublic Binding delayQueueABinding(@Qualifier(DELAY_QUEUE_A_NAME) Queue queue, @Qualifier(DELAY_EXCHANGE_NAME) Exchange exchange) {
//        return BindingBuilder.bind(queue).to(exchange).with("").noargs();return BindingBuilder.bind(queue).to(exchange).with(DELAY_ROUTING_KEY_A_NAME).noargs();}/*** 延时队列B绑定交换机** @param queue* @param exchange* @return*/@Beanpublic Binding delayQueueBBinding(@Qualifier(DELAY_QUEUE_B_NAME) Queue queue, @Qualifier(DELAY_EXCHANGE_NAME) Exchange exchange) {// return BindingBuilder.bind(queue).to(exchange).with("").noargs();return BindingBuilder.bind(queue).to(exchange).with(DELAY_ROUTING_KEY_B_NAME).noargs();}/*** 死信队列A绑定交换机* @param queue* @param exchange* @return*/@Beanpublic Binding deadLetterQueueABinding(@Qualifier(DEAD_LETTER_QUEUE_A_NAME)Queue queue, @Qualifier(DEAD_LETTER_EXCHANGE_NAME) Exchange exchange){return BindingBuilder.bind(queue).to(exchange).with(DEAD_LETTER_QUEUE_A_NAME).noargs();}/*** 死信队列B绑定交换机* @param queue* @param exchange* @return*/@Beanpublic Binding deadLetterQueueBBinding(@Qualifier(DEAD_LETTER_QUEUE_B_NAME)Queue queue, @Qualifier(DEAD_LETTER_EXCHANGE_NAME) Exchange exchange){return BindingBuilder.bind(queue).to(exchange).with(DEAD_LETTER_QUEUE_B_NAME).noargs();}}

2. 生产者发送消息

import com.core.rabbitmq.queue.CeshiQueue;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;import java.util.Date;
import java.util.HashMap;
import java.util.Map;@Slf4j
@RestController
@RequestMapping("/provider")
public class ProviderController {@AutowiredRabbitTemplate rabbitTemplate;  //使用RabbitTemplate,这提供了接收/发送等等方法@GetMapping("/ceshiDelay1")public void ceshi() {System.out.println("provider ================" + new Date());//发送消息Map map = new HashMap();map.put("name", "老六");map.put("age", "18");//第一个参数发送给哪个交换机   第二个路由key  我们延时交换机是fanout所以路由key为空   第三个发送对象
//        rabbitTemplate.convertAndSend(CeshiQueue.DELAY_EXCHANGE_NAME, "", map);rabbitTemplate.convertAndSend(CeshiQueue.DELAY_EXCHANGE_NAME, CeshiQueue.DELAY_ROUTING_KEY_A_NAME, map
//                ,message -> {
//                    message.getMessageProperties().setExpiration(1000 * 6 + "");
//                    message.getMessageProperties().setDelay(6 * 1000);
//                    return message;
//                }););System.out.println("订单提交成功,请及时付款");}

3. 消费者消费消息

import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.stereotype.Component;import java.io.IOException;
import java.util.Date;
import java.util.Map;/*** 小程序订单超时未支付关闭订单** @date 2023/03/03*/
@Component
@Slf4j
//@RequiredArgsConstructor
public class ConsumerListener {@RabbitListener(queues = CeshiQueue.DEAD_LETTER_QUEUE_A_NAME)public void process2(Map order, Message message, @Headers Map<String, Object> headers, Channel channel) {log.info("消費者 订单号消息", order);long deliveryTag = message.getMessageProperties().getDeliveryTag();log.info("订单 延时队列,消费者 订单号消息【{}】", order);log.info("订单 延时队列,消费者 订单号消息【{}】", headers);log.info("订单 延时队列,消费者 订单号消息【{}】", message);log.info("订单 延时队列,消费者 订单号消息【{}】", channel);log.info("订单 延时队列,消费者 订单号消息【{}】", deliveryTag);System.out.println(new Date() + "消费者 执行结束...." + message);try {      // 如果没有错误,则执行此步,表示成功签收; 手动签收,第一个参数表示消息的deliveryTag,第二个参数表示是否允许多条消息同时被签收channel.basicAck(deliveryTag, false);} catch (Exception e) {// requeue为true则重新入队列,否则丢弃或者进入死信队列  /*** @param deliveryTag 消息序号* @param multiple 是否批量处理(true:批量拒绝所有小于deliveryTag的消息;false:只处理当前消息)* @param requeue 拒绝是否重新入队列 (true:消息重新入队;false:禁止消息重新入队)*/channel.basicReject(deliveryTag, false);log.error("订单  超时支付异常,系统重新关闭订单【{}】", deliveryTag);}}
}

        注意:生产者发送延迟队列里,用延迟路由key,想要得到延迟效果,需要用死信队列来监听,如果用延迟队列监听就得不到延迟效果,延迟队列就会立即受到消息(初学者会遇到的坑)。

4. BasicReject 

        拒收,是接收端在收到消息的时候响应给RabbitMQ服务的一种命令,告诉服务器不应该由我处理,或者拒绝处理,扔掉。 接收端在发送reject命令的时候可以选择是否要重新放回queue中。如果没有其他接收者监控这个queue的话,要注意一直无限循环发送的危险。

BasicDeliverEventArgs ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue();
channel.BasicReject(ea.DeliveryTag, false);

        BasicReject方法第一个参数是消息的DeliveryTag,对于每个Channel来说,每个消息都会有一个DeliveryTag,一般用接收消息的顺序来表示:1,2,3,4 等等。第二个参数是是否放回queue中,requeue。

        BasicReject一次只能拒绝接收一个消息,而BasicNack方法可以支持一次0个或多个消息的拒收,并且也可以设置是否requeue。

channel.BasicNack(3, true, false);

        在第一个参数DeliveryTag中如果输入3,则消息DeliveryTag小于等于3的,这个Channel的,都会被拒收。

5. RabbitMQ 消息确认机制ACK

ack 机制保证的是broker和消费者之间的可靠性

ack 表示的是消费端收到消息后的确认方式,有三种确认方式

        自动确认:acknowledge="none"(默认)
        手动确认:acknowledge="manual"
        根据异常情况确认:acknowledge="auto"(这种方式使用麻烦,不作讲解)
自动确认的解释:

        当消息一旦被 Consumer 接收到,则自动确认收到,并将相应消息从 RabbitMQ 的消息缓存中移除
手动确认的解释:

        在实际业务处理中,很可能消费端收到消息后业务处理出现异常,那么该消息就会丢失;
        如果设置了手动确认方式,则需要在业务处理成功后,调用 channel.basicAck() 手动签收;如果出现异常,则调用 channel.basicNack()方法,让 broker 自动重新发送消息给消费端。

rabbitMQ消息中间件的延时队列以及死信队列的使用和应用场景_出一个场景问mq的用法(延迟、死信)

rabbitmq-BasicReject

RabbitMQ ACK消息确认机制 快速入门

相关文章:

RabbitMq 使用说明

1. 声明交换机和队列&#xff0c;以及交换机和队列绑定 import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.*; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.spr…...

Vue(10-20)

1Vue赋值方式 Object.defineProperty <!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8"><meta http-equiv"X-UA-Compatible" content"IEedge"><meta name"viewport" conten…...

C++-对四个智能指针:shared_ptr,unique_ptr,weak_ptr,auto_ptr的理解

回答如下&#xff1a; C的智能指针是一种特殊类型的“指针”&#xff0c;其主要目的是自动跟踪内存分配和释放&#xff0c;以避免程序中出现内存泄露或空悬指针等问题&#xff0c;主要采用的技术是&#xff1a;借助于类的生命周期&#xff0c;当超出了类的作用域时&#xff0c…...

uni-app中使用vue3语法详解

全局创建 app.use(createPina()).mount 全局方法 通过app.config.globalProperties.xxx可以创建 这里我们写了一个字符串翻转的全局方法 main.js里面添加一个全局方法 不要忘了加$ 否则会报错 // #ifdef VUE3 //导入创建app import { createSSRApp } from vue //导入创建ap…...

三十四、MongoDB PHP

PHP 语言可是使用 mongo.so ( Windows 下是 mongo.dll ) 扩展访问 MongoDB 数据库 MongoDB PHP 在各平台上的安装及驱动包下载请查看: PHP 安装 MongoDB 扩展驱动 如果你使用的是 PHP7&#xff0c;请移步&#xff1a; PHP7 MongoDB 安装与使用 PHP 连接 MongoDB 和 选择一个…...

浅拷贝和深拷贝的区别

浅拷贝和深拷贝 总结&#xff1a;浅拷贝对象数据共享&#xff0c;深拷贝是一个完全独立的对象&#xff0c;因此对象数据不共享。 浅拷贝&#xff08;Shallow Copy&#xff09; 浅拷贝是指创建一个新的对象&#xff0c;但是该新对象只是原始对象的一个副本。具体而言&#xf…...

6个常用Pycharm插件推荐,老手100%都用过

人生苦短 我用python 有些插件是下载后需要重启Pycharm才生效的 免费领源码、安装包&#xff1a;扣扣qun 903971231 PyCharm 本身已经足够优秀&#xff0c; 就算不使用插件&#xff0c; 也可以吊打市面上 90%的 Python 编辑器。 如果硬要我推荐几款实用的话&#xff0c; 那么…...

TCP的11种状态

CLOSED状态&#xff1a;初始状态&#xff0c;表示TCP连接是“关闭的”或者“未打开的”LISTEN状态&#xff1a;表示服务端的某个端口正处于监听状态&#xff0c;正在等待客户端连接的到来SYN_SENT状态&#xff1a;当客户端发送SYN请求建立连接之后&#xff0c;客户端处于SYN_SE…...

new 指令简单过程 / 类加载简单过程初始化

例子&#xff1a;Person p new Person(“张三”,”23”); 因为new用到person.class,所以先找到person.class文件&#xff0c;并且加载到内存中&#xff08;如果有父类先加载父类&#xff09;执行static块以及static变量的初始化&#xff08;如果有父类先初始化父类&#xff0…...

Asan基本原理及试用

概述 Asan是Google专门为C/C开发的内存错误探测工具&#xff0c;其具有如下功能 使用已释放内存&#xff08;野指针&#xff09;√堆内存越界&#xff08;读写&#xff09;√栈内存越界&#xff08;读写&#xff09;√全局变量越界&#xff08;读写&#xff09;函数返回局部变…...

深度学习应用技巧4-模型融合:投票法、加权平均法、集成模型法

大家好&#xff0c;我是微学AI&#xff0c;今天给大家介绍一下&#xff0c;深度学习中的模型融合。它是将多个深度学习模型或其预测结果结合起来&#xff0c;以提高模型整体性能的一种技术。 深度学习中的模型融合技术&#xff0c;也叫做集成学习&#xff0c;是指同时使用多个…...

【并发编程】深入理解Java内存模型及相关面试题

文章目录优秀引用1、引入2、概述3、JMM内存模型的实现3.1、简介3.2、原子性3.3、可见性3.4、有序性4、相关面试题4.1、你知道什么是Java内存模型JMM吗&#xff1f;4.2、JMM和volatile他们两个之间的关系是什么&#xff1f;4.3、JMM有哪些特性/能说说JMM的三大特性吗&#xff1f…...

C++编程语言STL之queue介绍

本文主要介绍C编程语言的STL&#xff08;Standard Template Library&#xff09;中queue&#xff08;队列&#xff09;的相关知识&#xff0c;同时通过示例代码介绍queue的常见用法。1 概述适配器&#xff08;adaptor&#xff09;是STL中的一个通用概念。容器、迭代器和函数都有…...

ACO优化蚁群算法

%% 蚁群算法&#xff08;ant colony optimization,ACO&#xff09; %清空变量 clear close all clc [ graph ] createGraph(); figure subplot(1,3,1) drawGraph( graph); %% 初始化参数 maxIter 100; antNo 50; tau0 10 * 1 / ( graph.n * mean( graph.edges(:) …...

SwiftUI 常用组件和属性(SwiftUI初学笔记)

本文为初学SwiftUI笔记。记录SwiftUI常用的组件和属性。 组件 共有属性(View的属性) Image("toRight").resizable().background(.red) // 背景色.shadow(color: .black, radius: 2, x: 9, y: 15) //阴影.frame(width: 30, height: 30) // 宽高 可以只设置宽或者高.…...

Centos 中设置代理的两种方法

Centos 中设置代理的两种方法 在使用局域网时&#xff0c;有时在局域网内只有一台电脑可以进行上网&#xff0c;其他电脑只能通过配置代理的方式来上网&#xff0c;在Windows系统中设置代理上网相对简单&#xff0c;如果只需上网的话&#xff0c;只需在浏览器中找到网络连接&am…...

高速PCB设计指南系列(一)

第一篇 PCB布线 在PCB设计中&#xff0c;布线是完成产品设计的重要步骤&#xff0c;可以说前面的准备工作都是为它而做的&#xff0c; 在整个PCB中&#xff0c;以布线的设计过程限定最高&#xff0c;技巧最细、工作量最大。PCB布线有单面布线、 双面布线及多层布线。布线的方…...

云端IDE:TitanIDE v2.6.0 正式发布

原文作者&#xff1a;行云创新技术总监 邓冰寒 引言 云原生集成开发环境 TitanIDE v2.6.0 正式发布了&#xff0c;一起来看看都有那些全新的体验吧&#xff01; TitanIDE 是一款云IDE, 也称 CloudIDE&#xff0c;作为数字化时代研发体系不可或缺的一环&#xff0c;和企业建设…...

【Python】tqdm 模块

import mathfrom tqdm import tqdm, trange# 计算阶乘 results_1 []for i in range(6666):results_1.append(math.factorial(i))这是一个循环计算阶乘的程序&#xff0c;我们不知道程序运行的具体情况&#xff0c;如果能加上一个程序运行过程的进度条&#xff0c;那可就太有趣…...

论文阅读:Adversarial Cross-Modal Retrieval对抗式跨模式检索

Adversarial Cross-Modal Retrieval 对抗式跨模式检索 跨模态检索研究的核心是学习一个共同的子空间&#xff0c;不同模态的数据可以直接相互比较。本文提出了一种新的对抗性跨模态检索&#xff08;ACMR&#xff09;方法&#xff0c;它在对抗性学习的基础上寻求有效的共同子空间…...

Linux 文件类型,目录与路径,文件与目录管理

文件类型 后面的字符表示文件类型标志 普通文件&#xff1a;-&#xff08;纯文本文件&#xff0c;二进制文件&#xff0c;数据格式文件&#xff09; 如文本文件、图片、程序文件等。 目录文件&#xff1a;d&#xff08;directory&#xff09; 用来存放其他文件或子目录。 设备…...

大话软工笔记—需求分析概述

需求分析&#xff0c;就是要对需求调研收集到的资料信息逐个地进行拆分、研究&#xff0c;从大量的不确定“需求”中确定出哪些需求最终要转换为确定的“功能需求”。 需求分析的作用非常重要&#xff0c;后续设计的依据主要来自于需求分析的成果&#xff0c;包括: 项目的目的…...

日语学习-日语知识点小记-构建基础-JLPT-N4阶段(33):にする

日语学习-日语知识点小记-构建基础-JLPT-N4阶段(33):にする 1、前言(1)情况说明(2)工程师的信仰2、知识点(1) にする1,接续:名词+にする2,接续:疑问词+にする3,(A)は(B)にする。(2)復習:(1)复习句子(2)ために & ように(3)そう(4)にする3、…...

基于SpringBoot在线拍卖系统的设计和实现

摘 要 随着社会的发展&#xff0c;社会的各行各业都在利用信息化时代的优势。计算机的优势和普及使得各种信息系统的开发成为必需。 在线拍卖系统&#xff0c;主要的模块包括管理员&#xff1b;首页、个人中心、用户管理、商品类型管理、拍卖商品管理、历史竞拍管理、竞拍订单…...

为什么要创建 Vue 实例

核心原因:Vue 需要一个「控制中心」来驱动整个应用 你可以把 Vue 实例想象成你应用的**「大脑」或「引擎」。它负责协调模板、数据、逻辑和行为,将它们变成一个活的、可交互的应用**。没有这个实例,你的代码只是一堆静态的 HTML、JavaScript 变量和函数,无法「活」起来。 …...

WebRTC从入门到实践 - 零基础教程

WebRTC从入门到实践 - 零基础教程 目录 WebRTC简介 基础概念 工作原理 开发环境搭建 基础实践 三个实战案例 常见问题解答 1. WebRTC简介 1.1 什么是WebRTC&#xff1f; WebRTC&#xff08;Web Real-Time Communication&#xff09;是一个支持网页浏览器进行实时语音…...

mac:大模型系列测试

0 MAC 前几天经过学生优惠以及国补17K入手了mac studio,然后这两天亲自测试其模型行运用能力如何&#xff0c;是否支持微调、推理速度等能力。下面进入正文。 1 mac 与 unsloth 按照下面的进行安装以及测试&#xff0c;是可以跑通文章里面的代码。训练速度也是很快的。 注意…...

《信号与系统》第 6 章 信号与系统的时域和频域特性

目录 6.0 引言 6.1 傅里叶变换的模和相位表示 6.2 线性时不变系统频率响应的模和相位表示 6.2.1 线性与非线性相位 6.2.2 群时延 6.2.3 对数模和相位图 6.3 理想频率选择性滤波器的时域特性 6.4 非理想滤波器的时域和频域特性讨论 6.5 一阶与二阶连续时间系统 6.5.1 …...

ThreadLocal 源码

ThreadLocal 源码 此类提供线程局部变量。这些变量不同于它们的普通对应物&#xff0c;因为每个访问一个线程局部变量的线程&#xff08;通过其 get 或 set 方法&#xff09;都有自己独立初始化的变量副本。ThreadLocal 实例通常是类中的私有静态字段&#xff0c;这些类希望将…...

2025.6.9总结(利与弊)

凡事都有两面性。在大厂上班也不例外。今天找开发定位问题&#xff0c;从一个接口人不断溯源到另一个 接口人。有时候&#xff0c;不知道是谁的责任填。将工作内容分的很细&#xff0c;每个人负责其中的一小块。我清楚的意识到&#xff0c;自己就是个可以随时替换的螺丝钉&…...