当前位置: 首页 > news >正文

Springboot 整合MQ实现延时队列入门

延时队列

  • 添加依赖
  • 配置文件
  • 队列TTL
    • 代码架构图
    • 交换机、队列、绑定配置文件代码
    • 生产者代码
    • 消费者代码
    • 延时队列优化
    • 添加普通队列配置代码
    • 生产者发送消息是进行设置消息的ttl
  • 通过MQ 插件实现延时队列
    • 代码架构图
    • 配置交换机
    • 生产者代码
    • 消费者代码
    • 测试发送

添加依赖

 <!-- rabbitMQ 集成 spring boot --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.47</version></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></dependency><!--swagger--><dependency><groupId>io.springfox</groupId><artifactId>springfox-swagger2</artifactId><version>2.9.2</version></dependency><dependency><groupId>io.springfox</groupId><artifactId>springfox-swagger-ui</artifactId><version>2.9.2</version></dependency><!--RabbitMQ 测试依赖--><dependency><groupId>org.springframework.amqp</groupId><artifactId>spring-rabbit-test</artifactId><scope>test</scope></dependency>

配置文件

spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest

队列TTL

代码架构图

创建两个队列 QA 和 QB,两者队列 TTL 分别设置为 10S 和 40S,然后在创建一个交换机 X 和死信交换机 Y,它们的类型都是 direct,创建一个死信队列 QD,它们的绑定关系如下:
在这里插入图片描述

交换机、队列、绑定配置文件代码

package com.wlj.rabbitmq.sbmq.confing;import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.util.HashMap;/***@创建人 wlj*@创建时间 2023/8/16*@描述 MQ配置*/
@Configuration
public class TtlQueueConfig {//X交换机public static final String X_EXCHANGE = "X";//QA队列public static final String QUEUE_A = "QA";//QB队列public static final String QUEUE_B = "QB";//Y死信交换机public static final String Y_DEAD_LETTER_EXCHANGE = "Y";//QD死信队列public static final String DEAD_LETTER_QUEUE = "QD";//声明x交换机@Bean("xExchange")public DirectExchange xExchange(){return new DirectExchange(X_EXCHANGE);}//声明y交换机@Bean("yExchange")public DirectExchange yExchange(){return new DirectExchange(Y_DEAD_LETTER_EXCHANGE);}//声明QA队列 ttl为10秒 并绑定对应的死信交换机@Bean("queueA")public Queue queueA(){HashMap<String, Object> args = new HashMap<>();//声明当前队列绑定的死信交换机args.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);//声明当前队列的死信路由 keyargs.put("x-dead-letter-routing-key", "YD");//声明队列的 TTLargs.put("x-message-ttl", 10000);return QueueBuilder.durable(QUEUE_A).withArguments(args).build();}//声明QB队列 ttl为40秒 并绑定对应的死信交换机@Bean("queueB")public Queue queueB(){HashMap<String, Object> args = new HashMap<>();//声明当前队列绑定的死信交换机args.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);//声明当前队列的死信路由 keyargs.put("x-dead-letter-routing-key", "YD");//声明队列的 TTLargs.put("x-message-ttl", 40000);return QueueBuilder.durable(QUEUE_B).withArguments(args).build();}//设置队列QA 绑定交换机X@Beanpublic Binding  queueaBindingX(@Qualifier("queueA")Queue queueA,@Qualifier("xExchange")DirectExchange xExchange){return BindingBuilder.bind(queueA).to(xExchange).with("XA");}//设置队列QB 绑定交换机X@Beanpublic Binding  queuebBindingX(@Qualifier("queueB")Queue queueB,@Qualifier("xExchange")DirectExchange xExchange){return BindingBuilder.bind(queueB).to(xExchange).with("XB");}//声明死信队列QD@Bean("queueD")public  Queue queueD(){return QueueBuilder.durable(DEAD_LETTER_QUEUE).build();}//死信队列和死信交换机绑定@Beanpublic Binding queuedBind(@Qualifier("queueD") Queue queueD,@Qualifier("yExchange") DirectExchange yExchange){return  BindingBuilder.bind(queueD).to(yExchange).with("YD");}
}

生产者代码

package com.wlj.rabbitmq.sbmq.confing.controller;import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;import javax.annotation.Resource;
import java.util.Date;/***@创建人 wlj*@创建时间 2023/8/16*@描述 生产者controller*/
@Slf4j
@RestController
@RequestMapping("ttl")
public class MsgController {@ResourceRabbitTemplate rabbitTemplate;@GetMapping("/send/{msg}")public void sendMsg(@PathVariable String msg){log.info("当前时间:{},发送一条信息给两个 TTL 队列:{}", new Date(), msg);rabbitTemplate.convertAndSend("X","XA","发送的消息,延时10秒: "+msg);rabbitTemplate.convertAndSend("X","XB","发送的消息,延时40秒: "+msg);}
}

消费者代码

package com.wlj.rabbitmq.sbmq.confing.dead;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;import java.util.Date;/***@创建人 wlj*@创建时间 2023/8/16*@描述 消费者*/
@Component
@Slf4j
public class DeadLetterQueueConsumer {@RabbitListener(queues = "QD")public void getMsg(Message msg, Channel channel){System.out.println(new String(msg.getBody()));log.info("当前时间:{},收到死信队列信息{}", new Date().toString(), msg);}
}

发送测试
http://localhost:8080/ttl/send/嘻嘻嘻
以上代码声明队列的时候,设置队列的延时时间是10秒和40秒,意味着所有进入队列的消息都是根据队列的延时时间的。这就会有一个问题,如果说业务需要延时20秒、15秒、一分钟、等等等等,难道都需要创建每一种延时队列吗?那岂不是要增加无数个队列才能满足需求。下面就进行优化延时队列

延时队列优化

代码架构图
在这里插入图片描述
声明一个普通的队列,只需要在生产消息的时候设置消息的延时时间即可。

添加普通队列配置代码

  //声明普通队列QC代码@Bean("queueC")public  Queue queueC(){HashMap<String, Object> args = new HashMap<>();//声明当前队列绑定的死信交换机args.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);//声明当前队列的死信路由 keyargs.put("x-dead-letter-routing-key", "YD");return  QueueBuilder.durable(QUEUE_C).withArguments(args).build();}//设置QC队列和X交换机绑定@Beanpublic  Binding queuecBindX(@Qualifier("queueC")Queue queueC,@Qualifier("xExchange")DirectExchange xExchange){return BindingBuilder.bind(queueC).to(xExchange).with("XC");}

生产者发送消息是进行设置消息的ttl

@GetMapping("/send/{msg}/{ttl}")public void sendMsgTtl(@PathVariable String msg,@PathVariable String ttl){rabbitTemplate.convertAndSend("X","XC",msg,correlationData->{correlationData.getMessageProperties().setExpiration(ttl);return correlationData;});log.info("当前时间:{},发送一条时长{}毫秒 TTL 信息给队列 C:{}", new Date(),ttl, msg);}

发送测试
http://localhost:8080/ttl/send/嘻嘻嘻/20000
http://localhost:8080/ttl/send/哈哈哈/2000

在这里插入图片描述
消息可能并不会按时“死亡“,因为 RabbitMQ 只会检查第一个消息是否过期,如果过期则丢到死信队列,
如果第一个消息的延时时长很长,而第二个消息的延时时长很短,第二个消息并不会优先得到执行

通过MQ 插件实现延时队列

Windows 安装MQ延时插件,请查看
Linux 安装MQ延时插件,请查看

代码架构图

延时队列是交换机进行把控消息的ttl。ttl到期才会发送到对应到队列
在这里新增了一个队列 delayed.queue,一个自定义交换机 delayed.exchange,绑定关系如下
在这里插入图片描述

配置交换机

package com.wlj.rabbitmq.sbmq.confing.plugins;import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;import java.util.HashMap;/***@创建人 wlj*@创建时间 2023/8/16*@描述 基于插件实现延时消息发送*/
@Component
public class DelayedQueueConfig {//队列public static final String DELAYED_QUEUE_NAME = "delayed.queue";//交换机public static final String DELAYED_EXCHANGE_NAME = "delayed.exchange";//routingkeypublic static final String DELAYED_ROUTING_KEY = "delayed.routingkey";//声明队列@Beanpublic Queue delayedQueue(){return QueueBuilder.durable(DELAYED_QUEUE_NAME).build();}/***   声明交换机*   因为交换机的类型没有延时类型 所以使用自定义交换机*/@Beanpublic CustomExchange delayedExchange(){HashMap<String, Object> args = new HashMap<>();//自定义交换机的类型args.put("x-delayed-type", "direct");// 对应参数: 交换机的名称 x-delayed-message说明是延时消息交换机 是否序列化 是否自动删除,参数return  new CustomExchange(DELAYED_EXCHANGE_NAME,"x-delayed-message",true,false,args);}//进行绑定@Beanpublic Binding bindingDelayedQueue(@Qualifier("delayedQueue") Queue queue,@Qualifier("delayedExchange") CustomExchangedelayedExchange) {return BindingBuilder.bind(queue).to(delayedExchange).with(DELAYED_ROUTING_KEY).noargs();}
}

生产者代码

 public static final String DELAYED_EXCHANGE_NAME = "delayed.exchange";public static final String DELAYED_ROUTING_KEY = "delayed.routingkey";@GetMapping("sendDelayMsg/{message}/{delayTime}")public void sendMsg(@PathVariable String message,@PathVariable Integer delayTime) {rabbitTemplate.convertAndSend(DELAYED_EXCHANGE_NAME, DELAYED_ROUTING_KEY, message,correlationData ->{correlationData.getMessageProperties().setDelay(delayTime);return correlationData;});log.info(" 当 前 时 间 : {}, 发送一条延迟 {} 毫秒的信息给队列 delayed.queue:{}", new Date(),delayTime, message);}

消费者代码

  public static final String DELAYED_QUEUE_NAME = "delayed.queue";@RabbitListener(queues = DELAYED_QUEUE_NAME)public void receiveDelayedQueue(Message message){String msg = new String(message.getBody());log.info("当前时间:{},收到延时队列的消息:{}", new Date().toString(), msg);}

测试发送

http://localhost:8080/ttl/sendDelayMsg/qwer/20000
http://localhost:8080/ttl/sendDelayMsg/121212/2000

相关文章:

Springboot 整合MQ实现延时队列入门

延时队列 添加依赖配置文件队列TTL代码架构图交换机、队列、绑定配置文件代码生产者代码消费者代码延时队列优化添加普通队列配置代码生产者发送消息是进行设置消息的ttl 通过MQ 插件实现延时队列代码架构图配置交换机生产者代码消费者代码测试发送 添加依赖 <!-- rabbitMQ …...

前端基础(Vue框架)

前言&#xff1a;前端开发框架——Vue框架学习。 准备工作&#xff1a;添加Vue devtools扩展工具 具体可查看下面的这篇博客 添加vue devtools扩展工具添加后F12不显示Vue图标_MRJJ_9的博客-CSDN博客 Vue官方学习文档 Vue.js - 渐进式 JavaScript 框架 | Vue.js 目录 MV…...

【实用插件】ArcGIS for AutoCAD插件分享下载

ArcGIS包含一系列功能&#xff0c;其中ArcGIS for AutoCAD一个免费的可下载的AutoCAD插件&#xff0c;它可简化将CAD和GIS数据整合在一起的过程提供互操作性。 ArcGIS for AutoCAD互操作性平台将连接AutoCAD和 ArcGIS&#xff0c;以增强使用地理环境设计CAD工程图时的用户体验…...

GaussDB数据库SQL系列-子查询

目录 一、前言 二、GaussDB SQL子查询表达式 1、EXISTS/NOT EXISTS 2、IN/NOT IN 3、ANY/SOME 4、ALL 三、GaussDB SQL子查询实验示例 1、创建实验表 2、EXISTS/NOT EXISTS示例 3、IN/NOT IN 示例 4、ANY/SOME 示例 5、ALL示例 四、注意事项及建议 五、小结 一、…...

Kafka 什么速度那么快

批量发送消息 Kafka 采用了批量发送消息的方式&#xff0c;通过将多条消息按照分区进行分组&#xff0c;然后每次发送一个消息集合&#xff0c;看似很平常的一个手段&#xff0c;其实它大大提升了 Kafka 的吞吐量。 消息压缩 消息压缩的目的是为了进一步减少网络传输带宽。而…...

环形链表笔记(自用)

环形链表 不管怎么样slow最多走半圈了&#xff0c; 快慢指针slow走一步&#xff0c;fast走两步最合适&#xff0c;因为假设fast和slow相差n每一次他们前进&#xff0c;就会相差n-1步&#xff0c;这样他们一定会相遇&#xff0c;如果是环形链表的话。 代码 /*** Definition for…...

js循环中发起请求数据不一致问题

项目场景&#xff1a; 在公司的一个项目中需要使用循环更改查询条件&#xff0c;然后查询子表数据&#xff0c;但是在查询过程中for下面的key变化了之后&#xff0c;查询中的key却并没有变化&#xff0c;导致查询的参数不一致&#xff0c;从未结果数据出错 for(let i 0;i<…...

工作流自动化:提升效率、节约成本的重要工具

在现代社会中&#xff0c;软件和技术的运用使得我们的日常活动变得更加简单和高效。然而&#xff0c;这些技术也有自身的特点和独特之处。尽管我们使用这些工具来简化工作&#xff0c;但有时仍需要一些人工干预&#xff0c;比如手动数据录入。在工作场所中&#xff0c;手动数据…...

仿牛客论坛项目day7|Kafka

一、阻塞队列 创建了一个生产者线程和一个消费者线程。生产者线程向队列中放入元素&#xff0c;消费者线程从队列中取出元素。我们可以看到&#xff0c;当队列为空时&#xff0c;消费者线程会被阻塞&#xff0c;直到生产者线程向队列中放入新的元素。 二、Kafka入门 发布、订阅…...

[SpringCloud] 组件性能优化技巧

Feign 配置优化hystrix配置 优化ribbon 优化Servlet 容器 优化Zuul配置 优化 文章目录 1.Servlet 容器 优化2.Feign 配置优化3.Zuul配置 优化4.hystrix配置 优化5.ribbon 优化 1.Servlet 容器 优化 默认情况下, Spring Boot 使用 Tomcat 来作为内嵌的 Servlet 容器, 可以将 We…...

okhttp下载文件 Java下载文件 javaokhttp下载文件 下载文件 java下载 okhttp下载 okhttp

okhttp下载文件 Java下载文件 javaokhttp下载文件 下载文件 java下载 okhttp下载 okhttp 1、引入Maven1.1、okhttp发起请求官网Demo 2、下载文件3、扩充&#xff0c;读写 txt文件内容3.1读写内容 示例 http客户端 用的是 okhttp&#xff0c;也可以用 UrlConnetcion或者apache …...

Oracle/PL/SQL奇技淫巧之Json转表

在Oracle中&#xff0c;有些时候我们需要在一个json文档中查数据 这个时候我们可以通过JSON_TABLE函数来把 json文档 提取成一张可以执行正常查询操作的表 先看JSON_TABLE函数的基础用法&#xff1a; JSON_TABLE(json_data, $.json_path COLUMNS (column_definitions))其中&a…...

每日一学——网络安全

网络安全设计、原则、审计等知识点的精讲如下&#xff1a; 网络安全设计与原则&#xff1a; 网络安全设计是指在系统或网络的设计过程中考虑到安全性&#xff0c;并采取相应的安全措施来保护系统或网络不受威胁。安全设计原则包括最小权限原则&#xff08;Least Privilege Prin…...

python中的lstm:介绍和基本使用方法

python中的lstm&#xff1a;介绍和基本使用方法 未使用插件 LSTM&#xff08;Long Short-Term Memory&#xff09;是一种循环神经网络&#xff08;RNN&#xff09;的变体&#xff0c;专门用于处理序列数据。LSTM 可以记忆序列中的长期依赖关系&#xff0c;这使得它非常适合于各…...

【Flink】Flink窗口触发器

数据进入到窗口的时候,窗口是否触发后续的计算由窗口触发器决定,每种类型的窗口都有对应的窗口触发机制。WindowAssigner 默认的 Trigger通常可解决大多数的情况。我们通常使用方式如下,调用trigger()方法把我们想执行触发器传递进去: SingleOutputStreamOperator<Produ…...

深度云化时代,什么样的云网络才是企业的“心头好”?

科技云报道原创。 近年来企业上云的快速推进&#xff0c;对云网络提出了更多需求。 最初&#xff0c;云网络只是满足互联网业务公网接入。 随着移动互联网的发展&#xff0c;企业对云上网络安全隔离能力和互访能力、企业数据中心与云上网络互联、构建混合云的能力&#xff0…...

【快应用】快应用广告学习之激励视频广告

【关键词】 快应用、激励视频广告、广告接入 【介绍】 一、关于激励视频广告 定义&#xff1a;用户通过观看完整的视频广告&#xff0c;获得应用内相关的奖励。适用场景&#xff1a;游戏/快游戏的通关、继续机会、道具获取、积分等场景中&#xff0c;阅读、影音等应用的权益体系…...

国产化系统中遇到的视频花屏、卡顿以及延迟问题的记录与总结

目录 1、国产化系统概述 1.1、国产化操作系统与国产化CPU 1.2、国产化服务器操作系统 1.3、当前国产化系统的主流配置 2、视频解码花屏与卡顿问题 2.1、视频解码花屏 2.2、视频解码卡顿 2.3、关于I帧和P帧的说明 3、国产显卡处理速度慢导致图像卡顿问题 3.1、视频延…...

go内存管理机制

golang内存管理基本是参考tcmalloc来进行的。go内存管理本质上是一个内存池&#xff0c;只不过内部做了很多优化&#xff1a;自动伸缩内存池大小&#xff0c;合理切割内存块。 基本概念&#xff1a; Page&#xff1a;页&#xff0c;一块 8 K大小的内存空间。Go向操作系统申请和…...

【Python】Web学习笔记_flask(5)——会话cookie对象

HTTP是无状态协议&#xff0c;一次请求响应结束后&#xff0c;服务器不会留下对方信息&#xff0c;对于大部分web程序来说&#xff0c;是不方便的&#xff0c;所以有了cookie技术&#xff0c;通过在请求和响应保温中添加cookie数据来保存客户端的状态。 html代码&#xff1a; …...

vscode(仍待补充)

写于2025 6.9 主包将加入vscode这个更权威的圈子 vscode的基本使用 侧边栏 vscode还能连接ssh&#xff1f; debug时使用的launch文件 1.task.json {"tasks": [{"type": "cppbuild","label": "C/C: gcc.exe 生成活动文件"…...

零基础设计模式——行为型模式 - 责任链模式

第四部分&#xff1a;行为型模式 - 责任链模式 (Chain of Responsibility Pattern) 欢迎来到行为型模式的学习&#xff01;行为型模式关注对象之间的职责分配、算法封装和对象间的交互。我们将学习的第一个行为型模式是责任链模式。 核心思想&#xff1a;使多个对象都有机会处…...

【论文阅读28】-CNN-BiLSTM-Attention-(2024)

本文把滑坡位移序列拆开、筛优质因子&#xff0c;再用 CNN-BiLSTM-Attention 来动态预测每个子序列&#xff0c;最后重构出总位移&#xff0c;预测效果超越传统模型。 文章目录 1 引言2 方法2.1 位移时间序列加性模型2.2 变分模态分解 (VMD) 具体步骤2.3.1 样本熵&#xff08;S…...

OPENCV形态学基础之二腐蚀

一.腐蚀的原理 (图1) 数学表达式&#xff1a;dst(x,y) erode(src(x,y)) min(x,y)src(xx,yy) 腐蚀也是图像形态学的基本功能之一&#xff0c;腐蚀跟膨胀属于反向操作&#xff0c;膨胀是把图像图像变大&#xff0c;而腐蚀就是把图像变小。腐蚀后的图像变小变暗淡。 腐蚀…...

排序算法总结(C++)

目录 一、稳定性二、排序算法选择、冒泡、插入排序归并排序随机快速排序堆排序基数排序计数排序 三、总结 一、稳定性 排序算法的稳定性是指&#xff1a;同样大小的样本 **&#xff08;同样大小的数据&#xff09;**在排序之后不会改变原始的相对次序。 稳定性对基础类型对象…...

免费PDF转图片工具

免费PDF转图片工具 一款简单易用的PDF转图片工具&#xff0c;可以将PDF文件快速转换为高质量PNG图片。无需安装复杂的软件&#xff0c;也不需要在线上传文件&#xff0c;保护您的隐私。 工具截图 主要特点 &#x1f680; 快速转换&#xff1a;本地转换&#xff0c;无需等待上…...

LLMs 系列实操科普(1)

写在前面&#xff1a; 本期内容我们继续 Andrej Karpathy 的《How I use LLMs》讲座内容&#xff0c;原视频时长 ~130 分钟&#xff0c;以实操演示主流的一些 LLMs 的使用&#xff0c;由于涉及到实操&#xff0c;实际上并不适合以文字整理&#xff0c;但还是决定尽量整理一份笔…...

(一)单例模式

一、前言 单例模式属于六大创建型模式,即在软件设计过程中,主要关注创建对象的结果,并不关心创建对象的过程及细节。创建型设计模式将类对象的实例化过程进行抽象化接口设计,从而隐藏了类对象的实例是如何被创建的,封装了软件系统使用的具体对象类型。 六大创建型模式包括…...

uniapp 小程序 学习(一)

利用Hbuilder 创建项目 运行到内置浏览器看效果 下载微信小程序 安装到Hbuilder 下载地址 &#xff1a;开发者工具默认安装 设置服务端口号 在Hbuilder中设置微信小程序 配置 找到运行设置&#xff0c;将微信开发者工具放入到Hbuilder中&#xff0c; 打开后出现 如下 bug 解…...

十九、【用户管理与权限 - 篇一】后端基础:用户列表与角色模型的初步构建

【用户管理与权限 - 篇一】后端基础:用户列表与角色模型的初步构建 前言准备工作第一部分:回顾 Django 内置的 `User` 模型第二部分:设计并创建 `Role` 和 `UserProfile` 模型第三部分:创建 Serializers第四部分:创建 ViewSets第五部分:注册 API 路由第六部分:后端初步测…...