当前位置: 首页 > news >正文

Springboot 整合MQ实现延时队列入门

延时队列

  • 添加依赖
  • 配置文件
  • 队列TTL
    • 代码架构图
    • 交换机、队列、绑定配置文件代码
    • 生产者代码
    • 消费者代码
    • 延时队列优化
    • 添加普通队列配置代码
    • 生产者发送消息是进行设置消息的ttl
  • 通过MQ 插件实现延时队列
    • 代码架构图
    • 配置交换机
    • 生产者代码
    • 消费者代码
    • 测试发送

添加依赖

 <!-- rabbitMQ 集成 spring boot --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.47</version></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></dependency><!--swagger--><dependency><groupId>io.springfox</groupId><artifactId>springfox-swagger2</artifactId><version>2.9.2</version></dependency><dependency><groupId>io.springfox</groupId><artifactId>springfox-swagger-ui</artifactId><version>2.9.2</version></dependency><!--RabbitMQ 测试依赖--><dependency><groupId>org.springframework.amqp</groupId><artifactId>spring-rabbit-test</artifactId><scope>test</scope></dependency>

配置文件

spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest

队列TTL

代码架构图

创建两个队列 QA 和 QB,两者队列 TTL 分别设置为 10S 和 40S,然后在创建一个交换机 X 和死信交换机 Y,它们的类型都是 direct,创建一个死信队列 QD,它们的绑定关系如下:
在这里插入图片描述

交换机、队列、绑定配置文件代码

package com.wlj.rabbitmq.sbmq.confing;import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.util.HashMap;/***@创建人 wlj*@创建时间 2023/8/16*@描述 MQ配置*/
@Configuration
public class TtlQueueConfig {//X交换机public static final String X_EXCHANGE = "X";//QA队列public static final String QUEUE_A = "QA";//QB队列public static final String QUEUE_B = "QB";//Y死信交换机public static final String Y_DEAD_LETTER_EXCHANGE = "Y";//QD死信队列public static final String DEAD_LETTER_QUEUE = "QD";//声明x交换机@Bean("xExchange")public DirectExchange xExchange(){return new DirectExchange(X_EXCHANGE);}//声明y交换机@Bean("yExchange")public DirectExchange yExchange(){return new DirectExchange(Y_DEAD_LETTER_EXCHANGE);}//声明QA队列 ttl为10秒 并绑定对应的死信交换机@Bean("queueA")public Queue queueA(){HashMap<String, Object> args = new HashMap<>();//声明当前队列绑定的死信交换机args.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);//声明当前队列的死信路由 keyargs.put("x-dead-letter-routing-key", "YD");//声明队列的 TTLargs.put("x-message-ttl", 10000);return QueueBuilder.durable(QUEUE_A).withArguments(args).build();}//声明QB队列 ttl为40秒 并绑定对应的死信交换机@Bean("queueB")public Queue queueB(){HashMap<String, Object> args = new HashMap<>();//声明当前队列绑定的死信交换机args.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);//声明当前队列的死信路由 keyargs.put("x-dead-letter-routing-key", "YD");//声明队列的 TTLargs.put("x-message-ttl", 40000);return QueueBuilder.durable(QUEUE_B).withArguments(args).build();}//设置队列QA 绑定交换机X@Beanpublic Binding  queueaBindingX(@Qualifier("queueA")Queue queueA,@Qualifier("xExchange")DirectExchange xExchange){return BindingBuilder.bind(queueA).to(xExchange).with("XA");}//设置队列QB 绑定交换机X@Beanpublic Binding  queuebBindingX(@Qualifier("queueB")Queue queueB,@Qualifier("xExchange")DirectExchange xExchange){return BindingBuilder.bind(queueB).to(xExchange).with("XB");}//声明死信队列QD@Bean("queueD")public  Queue queueD(){return QueueBuilder.durable(DEAD_LETTER_QUEUE).build();}//死信队列和死信交换机绑定@Beanpublic Binding queuedBind(@Qualifier("queueD") Queue queueD,@Qualifier("yExchange") DirectExchange yExchange){return  BindingBuilder.bind(queueD).to(yExchange).with("YD");}
}

生产者代码

package com.wlj.rabbitmq.sbmq.confing.controller;import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;import javax.annotation.Resource;
import java.util.Date;/***@创建人 wlj*@创建时间 2023/8/16*@描述 生产者controller*/
@Slf4j
@RestController
@RequestMapping("ttl")
public class MsgController {@ResourceRabbitTemplate rabbitTemplate;@GetMapping("/send/{msg}")public void sendMsg(@PathVariable String msg){log.info("当前时间:{},发送一条信息给两个 TTL 队列:{}", new Date(), msg);rabbitTemplate.convertAndSend("X","XA","发送的消息,延时10秒: "+msg);rabbitTemplate.convertAndSend("X","XB","发送的消息,延时40秒: "+msg);}
}

消费者代码

package com.wlj.rabbitmq.sbmq.confing.dead;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;import java.util.Date;/***@创建人 wlj*@创建时间 2023/8/16*@描述 消费者*/
@Component
@Slf4j
public class DeadLetterQueueConsumer {@RabbitListener(queues = "QD")public void getMsg(Message msg, Channel channel){System.out.println(new String(msg.getBody()));log.info("当前时间:{},收到死信队列信息{}", new Date().toString(), msg);}
}

发送测试
http://localhost:8080/ttl/send/嘻嘻嘻
以上代码声明队列的时候,设置队列的延时时间是10秒和40秒,意味着所有进入队列的消息都是根据队列的延时时间的。这就会有一个问题,如果说业务需要延时20秒、15秒、一分钟、等等等等,难道都需要创建每一种延时队列吗?那岂不是要增加无数个队列才能满足需求。下面就进行优化延时队列

延时队列优化

代码架构图
在这里插入图片描述
声明一个普通的队列,只需要在生产消息的时候设置消息的延时时间即可。

添加普通队列配置代码

  //声明普通队列QC代码@Bean("queueC")public  Queue queueC(){HashMap<String, Object> args = new HashMap<>();//声明当前队列绑定的死信交换机args.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);//声明当前队列的死信路由 keyargs.put("x-dead-letter-routing-key", "YD");return  QueueBuilder.durable(QUEUE_C).withArguments(args).build();}//设置QC队列和X交换机绑定@Beanpublic  Binding queuecBindX(@Qualifier("queueC")Queue queueC,@Qualifier("xExchange")DirectExchange xExchange){return BindingBuilder.bind(queueC).to(xExchange).with("XC");}

生产者发送消息是进行设置消息的ttl

@GetMapping("/send/{msg}/{ttl}")public void sendMsgTtl(@PathVariable String msg,@PathVariable String ttl){rabbitTemplate.convertAndSend("X","XC",msg,correlationData->{correlationData.getMessageProperties().setExpiration(ttl);return correlationData;});log.info("当前时间:{},发送一条时长{}毫秒 TTL 信息给队列 C:{}", new Date(),ttl, msg);}

发送测试
http://localhost:8080/ttl/send/嘻嘻嘻/20000
http://localhost:8080/ttl/send/哈哈哈/2000

在这里插入图片描述
消息可能并不会按时“死亡“,因为 RabbitMQ 只会检查第一个消息是否过期,如果过期则丢到死信队列,
如果第一个消息的延时时长很长,而第二个消息的延时时长很短,第二个消息并不会优先得到执行

通过MQ 插件实现延时队列

Windows 安装MQ延时插件,请查看
Linux 安装MQ延时插件,请查看

代码架构图

延时队列是交换机进行把控消息的ttl。ttl到期才会发送到对应到队列
在这里新增了一个队列 delayed.queue,一个自定义交换机 delayed.exchange,绑定关系如下
在这里插入图片描述

配置交换机

package com.wlj.rabbitmq.sbmq.confing.plugins;import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;import java.util.HashMap;/***@创建人 wlj*@创建时间 2023/8/16*@描述 基于插件实现延时消息发送*/
@Component
public class DelayedQueueConfig {//队列public static final String DELAYED_QUEUE_NAME = "delayed.queue";//交换机public static final String DELAYED_EXCHANGE_NAME = "delayed.exchange";//routingkeypublic static final String DELAYED_ROUTING_KEY = "delayed.routingkey";//声明队列@Beanpublic Queue delayedQueue(){return QueueBuilder.durable(DELAYED_QUEUE_NAME).build();}/***   声明交换机*   因为交换机的类型没有延时类型 所以使用自定义交换机*/@Beanpublic CustomExchange delayedExchange(){HashMap<String, Object> args = new HashMap<>();//自定义交换机的类型args.put("x-delayed-type", "direct");// 对应参数: 交换机的名称 x-delayed-message说明是延时消息交换机 是否序列化 是否自动删除,参数return  new CustomExchange(DELAYED_EXCHANGE_NAME,"x-delayed-message",true,false,args);}//进行绑定@Beanpublic Binding bindingDelayedQueue(@Qualifier("delayedQueue") Queue queue,@Qualifier("delayedExchange") CustomExchangedelayedExchange) {return BindingBuilder.bind(queue).to(delayedExchange).with(DELAYED_ROUTING_KEY).noargs();}
}

生产者代码

 public static final String DELAYED_EXCHANGE_NAME = "delayed.exchange";public static final String DELAYED_ROUTING_KEY = "delayed.routingkey";@GetMapping("sendDelayMsg/{message}/{delayTime}")public void sendMsg(@PathVariable String message,@PathVariable Integer delayTime) {rabbitTemplate.convertAndSend(DELAYED_EXCHANGE_NAME, DELAYED_ROUTING_KEY, message,correlationData ->{correlationData.getMessageProperties().setDelay(delayTime);return correlationData;});log.info(" 当 前 时 间 : {}, 发送一条延迟 {} 毫秒的信息给队列 delayed.queue:{}", new Date(),delayTime, message);}

消费者代码

  public static final String DELAYED_QUEUE_NAME = "delayed.queue";@RabbitListener(queues = DELAYED_QUEUE_NAME)public void receiveDelayedQueue(Message message){String msg = new String(message.getBody());log.info("当前时间:{},收到延时队列的消息:{}", new Date().toString(), msg);}

测试发送

http://localhost:8080/ttl/sendDelayMsg/qwer/20000
http://localhost:8080/ttl/sendDelayMsg/121212/2000

相关文章:

Springboot 整合MQ实现延时队列入门

延时队列 添加依赖配置文件队列TTL代码架构图交换机、队列、绑定配置文件代码生产者代码消费者代码延时队列优化添加普通队列配置代码生产者发送消息是进行设置消息的ttl 通过MQ 插件实现延时队列代码架构图配置交换机生产者代码消费者代码测试发送 添加依赖 <!-- rabbitMQ …...

前端基础(Vue框架)

前言&#xff1a;前端开发框架——Vue框架学习。 准备工作&#xff1a;添加Vue devtools扩展工具 具体可查看下面的这篇博客 添加vue devtools扩展工具添加后F12不显示Vue图标_MRJJ_9的博客-CSDN博客 Vue官方学习文档 Vue.js - 渐进式 JavaScript 框架 | Vue.js 目录 MV…...

【实用插件】ArcGIS for AutoCAD插件分享下载

ArcGIS包含一系列功能&#xff0c;其中ArcGIS for AutoCAD一个免费的可下载的AutoCAD插件&#xff0c;它可简化将CAD和GIS数据整合在一起的过程提供互操作性。 ArcGIS for AutoCAD互操作性平台将连接AutoCAD和 ArcGIS&#xff0c;以增强使用地理环境设计CAD工程图时的用户体验…...

GaussDB数据库SQL系列-子查询

目录 一、前言 二、GaussDB SQL子查询表达式 1、EXISTS/NOT EXISTS 2、IN/NOT IN 3、ANY/SOME 4、ALL 三、GaussDB SQL子查询实验示例 1、创建实验表 2、EXISTS/NOT EXISTS示例 3、IN/NOT IN 示例 4、ANY/SOME 示例 5、ALL示例 四、注意事项及建议 五、小结 一、…...

Kafka 什么速度那么快

批量发送消息 Kafka 采用了批量发送消息的方式&#xff0c;通过将多条消息按照分区进行分组&#xff0c;然后每次发送一个消息集合&#xff0c;看似很平常的一个手段&#xff0c;其实它大大提升了 Kafka 的吞吐量。 消息压缩 消息压缩的目的是为了进一步减少网络传输带宽。而…...

环形链表笔记(自用)

环形链表 不管怎么样slow最多走半圈了&#xff0c; 快慢指针slow走一步&#xff0c;fast走两步最合适&#xff0c;因为假设fast和slow相差n每一次他们前进&#xff0c;就会相差n-1步&#xff0c;这样他们一定会相遇&#xff0c;如果是环形链表的话。 代码 /*** Definition for…...

js循环中发起请求数据不一致问题

项目场景&#xff1a; 在公司的一个项目中需要使用循环更改查询条件&#xff0c;然后查询子表数据&#xff0c;但是在查询过程中for下面的key变化了之后&#xff0c;查询中的key却并没有变化&#xff0c;导致查询的参数不一致&#xff0c;从未结果数据出错 for(let i 0;i<…...

工作流自动化:提升效率、节约成本的重要工具

在现代社会中&#xff0c;软件和技术的运用使得我们的日常活动变得更加简单和高效。然而&#xff0c;这些技术也有自身的特点和独特之处。尽管我们使用这些工具来简化工作&#xff0c;但有时仍需要一些人工干预&#xff0c;比如手动数据录入。在工作场所中&#xff0c;手动数据…...

仿牛客论坛项目day7|Kafka

一、阻塞队列 创建了一个生产者线程和一个消费者线程。生产者线程向队列中放入元素&#xff0c;消费者线程从队列中取出元素。我们可以看到&#xff0c;当队列为空时&#xff0c;消费者线程会被阻塞&#xff0c;直到生产者线程向队列中放入新的元素。 二、Kafka入门 发布、订阅…...

[SpringCloud] 组件性能优化技巧

Feign 配置优化hystrix配置 优化ribbon 优化Servlet 容器 优化Zuul配置 优化 文章目录 1.Servlet 容器 优化2.Feign 配置优化3.Zuul配置 优化4.hystrix配置 优化5.ribbon 优化 1.Servlet 容器 优化 默认情况下, Spring Boot 使用 Tomcat 来作为内嵌的 Servlet 容器, 可以将 We…...

okhttp下载文件 Java下载文件 javaokhttp下载文件 下载文件 java下载 okhttp下载 okhttp

okhttp下载文件 Java下载文件 javaokhttp下载文件 下载文件 java下载 okhttp下载 okhttp 1、引入Maven1.1、okhttp发起请求官网Demo 2、下载文件3、扩充&#xff0c;读写 txt文件内容3.1读写内容 示例 http客户端 用的是 okhttp&#xff0c;也可以用 UrlConnetcion或者apache …...

Oracle/PL/SQL奇技淫巧之Json转表

在Oracle中&#xff0c;有些时候我们需要在一个json文档中查数据 这个时候我们可以通过JSON_TABLE函数来把 json文档 提取成一张可以执行正常查询操作的表 先看JSON_TABLE函数的基础用法&#xff1a; JSON_TABLE(json_data, $.json_path COLUMNS (column_definitions))其中&a…...

每日一学——网络安全

网络安全设计、原则、审计等知识点的精讲如下&#xff1a; 网络安全设计与原则&#xff1a; 网络安全设计是指在系统或网络的设计过程中考虑到安全性&#xff0c;并采取相应的安全措施来保护系统或网络不受威胁。安全设计原则包括最小权限原则&#xff08;Least Privilege Prin…...

python中的lstm:介绍和基本使用方法

python中的lstm&#xff1a;介绍和基本使用方法 未使用插件 LSTM&#xff08;Long Short-Term Memory&#xff09;是一种循环神经网络&#xff08;RNN&#xff09;的变体&#xff0c;专门用于处理序列数据。LSTM 可以记忆序列中的长期依赖关系&#xff0c;这使得它非常适合于各…...

【Flink】Flink窗口触发器

数据进入到窗口的时候,窗口是否触发后续的计算由窗口触发器决定,每种类型的窗口都有对应的窗口触发机制。WindowAssigner 默认的 Trigger通常可解决大多数的情况。我们通常使用方式如下,调用trigger()方法把我们想执行触发器传递进去: SingleOutputStreamOperator<Produ…...

深度云化时代,什么样的云网络才是企业的“心头好”?

科技云报道原创。 近年来企业上云的快速推进&#xff0c;对云网络提出了更多需求。 最初&#xff0c;云网络只是满足互联网业务公网接入。 随着移动互联网的发展&#xff0c;企业对云上网络安全隔离能力和互访能力、企业数据中心与云上网络互联、构建混合云的能力&#xff0…...

【快应用】快应用广告学习之激励视频广告

【关键词】 快应用、激励视频广告、广告接入 【介绍】 一、关于激励视频广告 定义&#xff1a;用户通过观看完整的视频广告&#xff0c;获得应用内相关的奖励。适用场景&#xff1a;游戏/快游戏的通关、继续机会、道具获取、积分等场景中&#xff0c;阅读、影音等应用的权益体系…...

国产化系统中遇到的视频花屏、卡顿以及延迟问题的记录与总结

目录 1、国产化系统概述 1.1、国产化操作系统与国产化CPU 1.2、国产化服务器操作系统 1.3、当前国产化系统的主流配置 2、视频解码花屏与卡顿问题 2.1、视频解码花屏 2.2、视频解码卡顿 2.3、关于I帧和P帧的说明 3、国产显卡处理速度慢导致图像卡顿问题 3.1、视频延…...

go内存管理机制

golang内存管理基本是参考tcmalloc来进行的。go内存管理本质上是一个内存池&#xff0c;只不过内部做了很多优化&#xff1a;自动伸缩内存池大小&#xff0c;合理切割内存块。 基本概念&#xff1a; Page&#xff1a;页&#xff0c;一块 8 K大小的内存空间。Go向操作系统申请和…...

【Python】Web学习笔记_flask(5)——会话cookie对象

HTTP是无状态协议&#xff0c;一次请求响应结束后&#xff0c;服务器不会留下对方信息&#xff0c;对于大部分web程序来说&#xff0c;是不方便的&#xff0c;所以有了cookie技术&#xff0c;通过在请求和响应保温中添加cookie数据来保存客户端的状态。 html代码&#xff1a; …...

内存分配函数malloc kmalloc vmalloc

内存分配函数malloc kmalloc vmalloc malloc实现步骤: 1)请求大小调整:首先,malloc 需要调整用户请求的大小,以适应内部数据结构(例如,可能需要存储额外的元数据)。通常,这包括对齐调整,确保分配的内存地址满足特定硬件要求(如对齐到8字节或16字节边界)。 2)空闲…...

调用支付宝接口响应40004 SYSTEM_ERROR问题排查

在对接支付宝API的时候&#xff0c;遇到了一些问题&#xff0c;记录一下排查过程。 Body:{"datadigital_fincloud_generalsaas_face_certify_initialize_response":{"msg":"Business Failed","code":"40004","sub_msg…...

K8S认证|CKS题库+答案| 11. AppArmor

目录 11. AppArmor 免费获取并激活 CKA_v1.31_模拟系统 题目 开始操作&#xff1a; 1&#xff09;、切换集群 2&#xff09;、切换节点 3&#xff09;、切换到 apparmor 的目录 4&#xff09;、执行 apparmor 策略模块 5&#xff09;、修改 pod 文件 6&#xff09;、…...

PHP和Node.js哪个更爽?

先说结论&#xff0c;rust完胜。 php&#xff1a;laravel&#xff0c;swoole&#xff0c;webman&#xff0c;最开始在苏宁的时候写了几年php&#xff0c;当时觉得php真的是世界上最好的语言&#xff0c;因为当初活在舒适圈里&#xff0c;不愿意跳出来&#xff0c;就好比当初活在…...

在Ubuntu中设置开机自动运行(sudo)指令的指南

在Ubuntu系统中&#xff0c;有时需要在系统启动时自动执行某些命令&#xff0c;特别是需要 sudo权限的指令。为了实现这一功能&#xff0c;可以使用多种方法&#xff0c;包括编写Systemd服务、配置 rc.local文件或使用 cron任务计划。本文将详细介绍这些方法&#xff0c;并提供…...

Neo4j 集群管理:原理、技术与最佳实践深度解析

Neo4j 的集群技术是其企业级高可用性、可扩展性和容错能力的核心。通过深入分析官方文档,本文将系统阐述其集群管理的核心原理、关键技术、实用技巧和行业最佳实践。 Neo4j 的 Causal Clustering 架构提供了一个强大而灵活的基石,用于构建高可用、可扩展且一致的图数据库服务…...

多模态大语言模型arxiv论文略读(108)

CROME: Cross-Modal Adapters for Efficient Multimodal LLM ➡️ 论文标题&#xff1a;CROME: Cross-Modal Adapters for Efficient Multimodal LLM ➡️ 论文作者&#xff1a;Sayna Ebrahimi, Sercan O. Arik, Tejas Nama, Tomas Pfister ➡️ 研究机构: Google Cloud AI Re…...

全志A40i android7.1 调试信息打印串口由uart0改为uart3

一&#xff0c;概述 1. 目的 将调试信息打印串口由uart0改为uart3。 2. 版本信息 Uboot版本&#xff1a;2014.07&#xff1b; Kernel版本&#xff1a;Linux-3.10&#xff1b; 二&#xff0c;Uboot 1. sys_config.fex改动 使能uart3(TX:PH00 RX:PH01)&#xff0c;并让boo…...

【Redis】笔记|第8节|大厂高并发缓存架构实战与优化

缓存架构 代码结构 代码详情 功能点&#xff1a; 多级缓存&#xff0c;先查本地缓存&#xff0c;再查Redis&#xff0c;最后才查数据库热点数据重建逻辑使用分布式锁&#xff0c;二次查询更新缓存采用读写锁提升性能采用Redis的发布订阅机制通知所有实例更新本地缓存适用读多…...

Caliper 配置文件解析:fisco-bcos.json

config.yaml 文件 config.yaml 是 Caliper 的主配置文件,通常包含以下内容: test:name: fisco-bcos-test # 测试名称description: Performance test of FISCO-BCOS # 测试描述workers:type: local # 工作进程类型number: 5 # 工作进程数量monitor:type: - docker- pro…...