Springboot 整合MQ实现延时队列入门
延时队列
- 添加依赖
- 配置文件
- 队列TTL
- 代码架构图
- 交换机、队列、绑定配置文件代码
- 生产者代码
- 消费者代码
- 延时队列优化
- 添加普通队列配置代码
- 生产者发送消息是进行设置消息的ttl
- 通过MQ 插件实现延时队列
- 代码架构图
- 配置交换机
- 生产者代码
- 消费者代码
- 测试发送
添加依赖
<!-- rabbitMQ 集成 spring boot --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.47</version></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></dependency><!--swagger--><dependency><groupId>io.springfox</groupId><artifactId>springfox-swagger2</artifactId><version>2.9.2</version></dependency><dependency><groupId>io.springfox</groupId><artifactId>springfox-swagger-ui</artifactId><version>2.9.2</version></dependency><!--RabbitMQ 测试依赖--><dependency><groupId>org.springframework.amqp</groupId><artifactId>spring-rabbit-test</artifactId><scope>test</scope></dependency>
配置文件
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
队列TTL
代码架构图
创建两个队列 QA 和 QB,两者队列 TTL 分别设置为 10S 和 40S,然后在创建一个交换机 X 和死信交换机 Y,它们的类型都是 direct,创建一个死信队列 QD,它们的绑定关系如下:

交换机、队列、绑定配置文件代码
package com.wlj.rabbitmq.sbmq.confing;import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.util.HashMap;/***@创建人 wlj*@创建时间 2023/8/16*@描述 MQ配置*/
@Configuration
public class TtlQueueConfig {//X交换机public static final String X_EXCHANGE = "X";//QA队列public static final String QUEUE_A = "QA";//QB队列public static final String QUEUE_B = "QB";//Y死信交换机public static final String Y_DEAD_LETTER_EXCHANGE = "Y";//QD死信队列public static final String DEAD_LETTER_QUEUE = "QD";//声明x交换机@Bean("xExchange")public DirectExchange xExchange(){return new DirectExchange(X_EXCHANGE);}//声明y交换机@Bean("yExchange")public DirectExchange yExchange(){return new DirectExchange(Y_DEAD_LETTER_EXCHANGE);}//声明QA队列 ttl为10秒 并绑定对应的死信交换机@Bean("queueA")public Queue queueA(){HashMap<String, Object> args = new HashMap<>();//声明当前队列绑定的死信交换机args.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);//声明当前队列的死信路由 keyargs.put("x-dead-letter-routing-key", "YD");//声明队列的 TTLargs.put("x-message-ttl", 10000);return QueueBuilder.durable(QUEUE_A).withArguments(args).build();}//声明QB队列 ttl为40秒 并绑定对应的死信交换机@Bean("queueB")public Queue queueB(){HashMap<String, Object> args = new HashMap<>();//声明当前队列绑定的死信交换机args.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);//声明当前队列的死信路由 keyargs.put("x-dead-letter-routing-key", "YD");//声明队列的 TTLargs.put("x-message-ttl", 40000);return QueueBuilder.durable(QUEUE_B).withArguments(args).build();}//设置队列QA 绑定交换机X@Beanpublic Binding queueaBindingX(@Qualifier("queueA")Queue queueA,@Qualifier("xExchange")DirectExchange xExchange){return BindingBuilder.bind(queueA).to(xExchange).with("XA");}//设置队列QB 绑定交换机X@Beanpublic Binding queuebBindingX(@Qualifier("queueB")Queue queueB,@Qualifier("xExchange")DirectExchange xExchange){return BindingBuilder.bind(queueB).to(xExchange).with("XB");}//声明死信队列QD@Bean("queueD")public Queue queueD(){return QueueBuilder.durable(DEAD_LETTER_QUEUE).build();}//死信队列和死信交换机绑定@Beanpublic Binding queuedBind(@Qualifier("queueD") Queue queueD,@Qualifier("yExchange") DirectExchange yExchange){return BindingBuilder.bind(queueD).to(yExchange).with("YD");}
}
生产者代码
package com.wlj.rabbitmq.sbmq.confing.controller;import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;import javax.annotation.Resource;
import java.util.Date;/***@创建人 wlj*@创建时间 2023/8/16*@描述 生产者controller*/
@Slf4j
@RestController
@RequestMapping("ttl")
public class MsgController {@ResourceRabbitTemplate rabbitTemplate;@GetMapping("/send/{msg}")public void sendMsg(@PathVariable String msg){log.info("当前时间:{},发送一条信息给两个 TTL 队列:{}", new Date(), msg);rabbitTemplate.convertAndSend("X","XA","发送的消息,延时10秒: "+msg);rabbitTemplate.convertAndSend("X","XB","发送的消息,延时40秒: "+msg);}
}
消费者代码
package com.wlj.rabbitmq.sbmq.confing.dead;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;import java.util.Date;/***@创建人 wlj*@创建时间 2023/8/16*@描述 消费者*/
@Component
@Slf4j
public class DeadLetterQueueConsumer {@RabbitListener(queues = "QD")public void getMsg(Message msg, Channel channel){System.out.println(new String(msg.getBody()));log.info("当前时间:{},收到死信队列信息{}", new Date().toString(), msg);}
}
发送测试
http://localhost:8080/ttl/send/嘻嘻嘻
以上代码声明队列的时候,设置队列的延时时间是10秒和40秒,意味着所有进入队列的消息都是根据队列的延时时间的。这就会有一个问题,如果说业务需要延时20秒、15秒、一分钟、等等等等,难道都需要创建每一种延时队列吗?那岂不是要增加无数个队列才能满足需求。下面就进行优化延时队列
延时队列优化
代码架构图

声明一个普通的队列,只需要在生产消息的时候设置消息的延时时间即可。
添加普通队列配置代码
//声明普通队列QC代码@Bean("queueC")public Queue queueC(){HashMap<String, Object> args = new HashMap<>();//声明当前队列绑定的死信交换机args.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);//声明当前队列的死信路由 keyargs.put("x-dead-letter-routing-key", "YD");return QueueBuilder.durable(QUEUE_C).withArguments(args).build();}//设置QC队列和X交换机绑定@Beanpublic Binding queuecBindX(@Qualifier("queueC")Queue queueC,@Qualifier("xExchange")DirectExchange xExchange){return BindingBuilder.bind(queueC).to(xExchange).with("XC");}
生产者发送消息是进行设置消息的ttl
@GetMapping("/send/{msg}/{ttl}")public void sendMsgTtl(@PathVariable String msg,@PathVariable String ttl){rabbitTemplate.convertAndSend("X","XC",msg,correlationData->{correlationData.getMessageProperties().setExpiration(ttl);return correlationData;});log.info("当前时间:{},发送一条时长{}毫秒 TTL 信息给队列 C:{}", new Date(),ttl, msg);}
发送测试
http://localhost:8080/ttl/send/嘻嘻嘻/20000
http://localhost:8080/ttl/send/哈哈哈/2000

消息可能并不会按时“死亡“,因为 RabbitMQ 只会检查第一个消息是否过期,如果过期则丢到死信队列,
如果第一个消息的延时时长很长,而第二个消息的延时时长很短,第二个消息并不会优先得到执行。
通过MQ 插件实现延时队列
Windows 安装MQ延时插件,请查看
Linux 安装MQ延时插件,请查看
代码架构图
延时队列是交换机进行把控消息的ttl。ttl到期才会发送到对应到队列
在这里新增了一个队列 delayed.queue,一个自定义交换机 delayed.exchange,绑定关系如下

配置交换机
package com.wlj.rabbitmq.sbmq.confing.plugins;import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;import java.util.HashMap;/***@创建人 wlj*@创建时间 2023/8/16*@描述 基于插件实现延时消息发送*/
@Component
public class DelayedQueueConfig {//队列public static final String DELAYED_QUEUE_NAME = "delayed.queue";//交换机public static final String DELAYED_EXCHANGE_NAME = "delayed.exchange";//routingkeypublic static final String DELAYED_ROUTING_KEY = "delayed.routingkey";//声明队列@Beanpublic Queue delayedQueue(){return QueueBuilder.durable(DELAYED_QUEUE_NAME).build();}/*** 声明交换机* 因为交换机的类型没有延时类型 所以使用自定义交换机*/@Beanpublic CustomExchange delayedExchange(){HashMap<String, Object> args = new HashMap<>();//自定义交换机的类型args.put("x-delayed-type", "direct");// 对应参数: 交换机的名称 x-delayed-message说明是延时消息交换机 是否序列化 是否自动删除,参数return new CustomExchange(DELAYED_EXCHANGE_NAME,"x-delayed-message",true,false,args);}//进行绑定@Beanpublic Binding bindingDelayedQueue(@Qualifier("delayedQueue") Queue queue,@Qualifier("delayedExchange") CustomExchangedelayedExchange) {return BindingBuilder.bind(queue).to(delayedExchange).with(DELAYED_ROUTING_KEY).noargs();}
}
生产者代码
public static final String DELAYED_EXCHANGE_NAME = "delayed.exchange";public static final String DELAYED_ROUTING_KEY = "delayed.routingkey";@GetMapping("sendDelayMsg/{message}/{delayTime}")public void sendMsg(@PathVariable String message,@PathVariable Integer delayTime) {rabbitTemplate.convertAndSend(DELAYED_EXCHANGE_NAME, DELAYED_ROUTING_KEY, message,correlationData ->{correlationData.getMessageProperties().setDelay(delayTime);return correlationData;});log.info(" 当 前 时 间 : {}, 发送一条延迟 {} 毫秒的信息给队列 delayed.queue:{}", new Date(),delayTime, message);}
消费者代码
public static final String DELAYED_QUEUE_NAME = "delayed.queue";@RabbitListener(queues = DELAYED_QUEUE_NAME)public void receiveDelayedQueue(Message message){String msg = new String(message.getBody());log.info("当前时间:{},收到延时队列的消息:{}", new Date().toString(), msg);}
测试发送
http://localhost:8080/ttl/sendDelayMsg/qwer/20000
http://localhost:8080/ttl/sendDelayMsg/121212/2000
相关文章:
Springboot 整合MQ实现延时队列入门
延时队列 添加依赖配置文件队列TTL代码架构图交换机、队列、绑定配置文件代码生产者代码消费者代码延时队列优化添加普通队列配置代码生产者发送消息是进行设置消息的ttl 通过MQ 插件实现延时队列代码架构图配置交换机生产者代码消费者代码测试发送 添加依赖 <!-- rabbitMQ …...
前端基础(Vue框架)
前言:前端开发框架——Vue框架学习。 准备工作:添加Vue devtools扩展工具 具体可查看下面的这篇博客 添加vue devtools扩展工具添加后F12不显示Vue图标_MRJJ_9的博客-CSDN博客 Vue官方学习文档 Vue.js - 渐进式 JavaScript 框架 | Vue.js 目录 MV…...
【实用插件】ArcGIS for AutoCAD插件分享下载
ArcGIS包含一系列功能,其中ArcGIS for AutoCAD一个免费的可下载的AutoCAD插件,它可简化将CAD和GIS数据整合在一起的过程提供互操作性。 ArcGIS for AutoCAD互操作性平台将连接AutoCAD和 ArcGIS,以增强使用地理环境设计CAD工程图时的用户体验…...
GaussDB数据库SQL系列-子查询
目录 一、前言 二、GaussDB SQL子查询表达式 1、EXISTS/NOT EXISTS 2、IN/NOT IN 3、ANY/SOME 4、ALL 三、GaussDB SQL子查询实验示例 1、创建实验表 2、EXISTS/NOT EXISTS示例 3、IN/NOT IN 示例 4、ANY/SOME 示例 5、ALL示例 四、注意事项及建议 五、小结 一、…...
Kafka 什么速度那么快
批量发送消息 Kafka 采用了批量发送消息的方式,通过将多条消息按照分区进行分组,然后每次发送一个消息集合,看似很平常的一个手段,其实它大大提升了 Kafka 的吞吐量。 消息压缩 消息压缩的目的是为了进一步减少网络传输带宽。而…...
环形链表笔记(自用)
环形链表 不管怎么样slow最多走半圈了, 快慢指针slow走一步,fast走两步最合适,因为假设fast和slow相差n每一次他们前进,就会相差n-1步,这样他们一定会相遇,如果是环形链表的话。 代码 /*** Definition for…...
js循环中发起请求数据不一致问题
项目场景: 在公司的一个项目中需要使用循环更改查询条件,然后查询子表数据,但是在查询过程中for下面的key变化了之后,查询中的key却并没有变化,导致查询的参数不一致,从未结果数据出错 for(let i 0;i<…...
工作流自动化:提升效率、节约成本的重要工具
在现代社会中,软件和技术的运用使得我们的日常活动变得更加简单和高效。然而,这些技术也有自身的特点和独特之处。尽管我们使用这些工具来简化工作,但有时仍需要一些人工干预,比如手动数据录入。在工作场所中,手动数据…...
仿牛客论坛项目day7|Kafka
一、阻塞队列 创建了一个生产者线程和一个消费者线程。生产者线程向队列中放入元素,消费者线程从队列中取出元素。我们可以看到,当队列为空时,消费者线程会被阻塞,直到生产者线程向队列中放入新的元素。 二、Kafka入门 发布、订阅…...
[SpringCloud] 组件性能优化技巧
Feign 配置优化hystrix配置 优化ribbon 优化Servlet 容器 优化Zuul配置 优化 文章目录 1.Servlet 容器 优化2.Feign 配置优化3.Zuul配置 优化4.hystrix配置 优化5.ribbon 优化 1.Servlet 容器 优化 默认情况下, Spring Boot 使用 Tomcat 来作为内嵌的 Servlet 容器, 可以将 We…...
okhttp下载文件 Java下载文件 javaokhttp下载文件 下载文件 java下载 okhttp下载 okhttp
okhttp下载文件 Java下载文件 javaokhttp下载文件 下载文件 java下载 okhttp下载 okhttp 1、引入Maven1.1、okhttp发起请求官网Demo 2、下载文件3、扩充,读写 txt文件内容3.1读写内容 示例 http客户端 用的是 okhttp,也可以用 UrlConnetcion或者apache …...
Oracle/PL/SQL奇技淫巧之Json转表
在Oracle中,有些时候我们需要在一个json文档中查数据 这个时候我们可以通过JSON_TABLE函数来把 json文档 提取成一张可以执行正常查询操作的表 先看JSON_TABLE函数的基础用法: JSON_TABLE(json_data, $.json_path COLUMNS (column_definitions))其中&a…...
每日一学——网络安全
网络安全设计、原则、审计等知识点的精讲如下: 网络安全设计与原则: 网络安全设计是指在系统或网络的设计过程中考虑到安全性,并采取相应的安全措施来保护系统或网络不受威胁。安全设计原则包括最小权限原则(Least Privilege Prin…...
python中的lstm:介绍和基本使用方法
python中的lstm:介绍和基本使用方法 未使用插件 LSTM(Long Short-Term Memory)是一种循环神经网络(RNN)的变体,专门用于处理序列数据。LSTM 可以记忆序列中的长期依赖关系,这使得它非常适合于各…...
【Flink】Flink窗口触发器
数据进入到窗口的时候,窗口是否触发后续的计算由窗口触发器决定,每种类型的窗口都有对应的窗口触发机制。WindowAssigner 默认的 Trigger通常可解决大多数的情况。我们通常使用方式如下,调用trigger()方法把我们想执行触发器传递进去: SingleOutputStreamOperator<Produ…...
深度云化时代,什么样的云网络才是企业的“心头好”?
科技云报道原创。 近年来企业上云的快速推进,对云网络提出了更多需求。 最初,云网络只是满足互联网业务公网接入。 随着移动互联网的发展,企业对云上网络安全隔离能力和互访能力、企业数据中心与云上网络互联、构建混合云的能力࿰…...
【快应用】快应用广告学习之激励视频广告
【关键词】 快应用、激励视频广告、广告接入 【介绍】 一、关于激励视频广告 定义:用户通过观看完整的视频广告,获得应用内相关的奖励。适用场景:游戏/快游戏的通关、继续机会、道具获取、积分等场景中,阅读、影音等应用的权益体系…...
国产化系统中遇到的视频花屏、卡顿以及延迟问题的记录与总结
目录 1、国产化系统概述 1.1、国产化操作系统与国产化CPU 1.2、国产化服务器操作系统 1.3、当前国产化系统的主流配置 2、视频解码花屏与卡顿问题 2.1、视频解码花屏 2.2、视频解码卡顿 2.3、关于I帧和P帧的说明 3、国产显卡处理速度慢导致图像卡顿问题 3.1、视频延…...
go内存管理机制
golang内存管理基本是参考tcmalloc来进行的。go内存管理本质上是一个内存池,只不过内部做了很多优化:自动伸缩内存池大小,合理切割内存块。 基本概念: Page:页,一块 8 K大小的内存空间。Go向操作系统申请和…...
【Python】Web学习笔记_flask(5)——会话cookie对象
HTTP是无状态协议,一次请求响应结束后,服务器不会留下对方信息,对于大部分web程序来说,是不方便的,所以有了cookie技术,通过在请求和响应保温中添加cookie数据来保存客户端的状态。 html代码: …...
从用户体验出发:手把手教你用uniapp的showLoading/showToast/showModal设计友好交互
从用户体验出发:手把手教你用uniapp的showLoading/showToast/showModal设计友好交互 在移动应用开发中,交互设计的好坏直接影响用户留存率。数据显示,超过60%的用户会因为糟糕的交互体验而卸载应用。作为开发者,我们不仅要关注功能…...
为AI智能体构建自动化RSS信息管道:agent-rss工具详解与实践
1. 项目概述:为AI智能体打造的RSS信息管道 如果你正在构建或使用AI智能体(比如Claude Code、OpenClaw这类工具),并且希望它们能像人类一样,定时、定向地获取互联网上的最新信息,那么你很可能需要一个专门为…...
技术人必备的Chrome插件清单:第7个让调试效率翻倍
对于软件测试从业者而言,浏览器早已不是单纯的信息浏览窗口,而是集接口调试、性能分析、元素定位、辅助功能验证于一体的核心工作站。面对日益复杂的Web应用和紧迫的交付周期,一套精悍的Chrome插件组合往往能带来远超预期的效率回报。本文从测…...
华为会议转任务AI精准识别整理,省事更清晰,轻松搞定工作落地
"找2026华为会议转任务AI的朋友,你要的精准识别整理、落地工作的真实测评来了。不管你是做学术研究要整访谈、转讲座,还是开会长音频要扒任务,我测了大半个月,直接给你掏实底。我接触太多做学术的朋友,都踩过AI转…...
Python开发进阶之路:探索异步编程与高性能应用
在当今快节奏的软件开发环境中,构建高性能、可扩展的应用程序已成为开发者的首要任务。随着互联网应用的普及,用户对响应速度和并发处理能力的要求越来越高。Python,作为一种广泛使用的高级编程语言,凭借其简洁的语法和强大的生态…...
AI编程助手上下文管理工具devcontext:构建项目记忆库提升开发效率
1. 项目概述:当AI助手拥有“记忆”,开发效率的质变如果你和我一样,每天大部分时间都在和代码编辑器、终端以及各种文档打交道,那你一定对这样的场景不陌生:接手一个新项目,光是理解代码库的结构、各个模块的…...
Claude Code 多项目 API 配置管理实践
背景 Claude Code 的项目级配置文件 .claude/settings.json 中包含 API 提供商相关的环境变量。当同时维护多个项目,每个项目使用不同的 API 提供商(Anthropic 直连、OpenRouter 代理、自建转发等)时,每次切换项目都需要手动修改…...
Articuler.Ai 技术深度解析:海量人脉匹配、数字足迹解析与高转化冷触达引擎
摘要Articuler.Ai 是一款面向商业人脉精准匹配与高效触达的 AI 引擎,核心定位为 “商业关系搜索引擎 智能触达工作台”,彻底重构传统关键词搜索失效背景下的 B2B 人脉连接逻辑。本文从9.8 亿级公开档案数据底座、语义匹配引擎架构、Playbook 深度解析技…...
淘宝要接入AI购物助手:以后买东西,可能不是搜索,而是“让AI帮你挑”
最近AI圈有一个很值得关注的新热点。据路透社5月10日报道,阿里巴巴正准备把通义千问Qwen接入淘宝,让用户可以通过和AI聊天的方式浏览、比较和购买商品,而不是像以前那样自己一个个翻商品列表。报道还提到,Qwen应用将接入淘宝和天猫…...
别再只盯着VGA线了!手把手教你用示波器看懂RGBHV时序图(附绿同步电路分析)
数字示波器实战:解码RGBHV信号与绿同步电路设计全指南 在复古游戏机改造、CRT显示器维修或视频转换板设计的场景中,RGBHV信号的理解与测量往往是硬件工程师和电子爱好者面临的第一道技术门槛。不同于现代数字接口的标准化协议,模拟视频信号时…...
