SpringBoot(三十九)SpringBoot集成RabbitMQ实现流量削峰添谷
前边我们有具体的学习过RabbitMQ的安装和基本使用的情况。
但是呢,没有演示具体应用到项目中的实例。
这里使用RabbitMQ来实现流量的削峰添谷。
一:添加pom依赖
<!--rabbitmq-需要的 AMQP 依赖-->
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
二:yml配置
spring:
#配置rabbitmq 服务器
rabbitmq:virtual-host: /host: 1.15.157.156port: 5672username: xxxxxpassword: xxxxx# 开启发布确认机制#SIMPLE, // 使用 RabbitTemplate#waitForConfirms() 或 waitForConfirmsOrDie()#CORRELATED, // 使用 CorrelationData 关联确认与发送的消息#NONE // 不启用发布确认publisher-confirm-type: correlated# publisher-confirms 消息的可靠投递, confirm 确认模式 默认为false#publisher-confirms: true# 添加发布确认返回, return 回退模式 默认为falsepublisher-returns: true### listenerlistener:# 每次从队列中预取5条消息prefetch: 20# 最小消费者数量concurrency: 1# 最大的消费者数量max-concurrency: 10simple:# 设置预取数量为1 每次取一个prefetch: 1# manual:手动 ack,需要在业务代码结束后,调用 api 发送 ack,虽灵活但会提高编码复杂度。# auto:自动 ack,没有异常则返回 ack;抛出异常则返回 nack,消息重新入队,一直到没有异常为止,也可以设置最大重试次数,超过次数后发送到专门收集错误消息的队列进一步处理# none:关闭ack,MQ 假定消费者获取消息后会成功处理,因此消息投递后立即被删除(消息投递是不可靠的,可能丢失)acknowledge-mode: manual# 失败重试retry:# 开启消费者失败重试enabled: true# 初始的失败等待时长为1秒initial-interval: 1000# 失败的等待时长倍数,下次等待时长 = multiplier * last-intervalmultiplier: 3# 最大重试次数max-attempts: 4# true无状态;false有状态。如果业务中包含事务,这里改为falsestateless: true
具体的配置都有对应的注释,参照即可。
三:编写config配置类
package com.modules.config;import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;@Configuration
public class RabbitMQConfig
{@Value("${spring.rabbitmq.host}")private String host;@Value("${spring.rabbitmq.port}")private int port;@Value("${spring.rabbitmq.username}")private String userName;@Value("${spring.rabbitmq.password}")private String password;@Value("${spring.rabbitmq.listener.prefetch}")private int prefetch;@Value("${spring.rabbitmq.listener.concurrency}")private int concurrentConsumers;@Value("${spring.rabbitmq.listener.max-concurrency}")private int maxConcurrentConsumers;/*** 链接RabbitMQ* @return*/@Beanpublic ConnectionFactory connectionDirectFactory(){CachingConnectionFactory connectionFactory = new CachingConnectionFactory();connectionFactory.setHost(host);connectionFactory.setPort(port);connectionFactory.setUsername(userName);connectionFactory.setPassword(password);connectionFactory.setPublisherConfirms(true); //必须要设置return connectionFactory;}/*** 配置RabbitMQ参数* @return*/@Beanpublic SimpleRabbitListenerContainerFactory rabbitDirectListenerContainerFactory(){SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();factory.setConnectionFactory(connectionDirectFactory());//设置最小并发的消费者数量factory.setConcurrentConsumers(concurrentConsumers);//设置最大并发的消费者数量factory.setMaxConcurrentConsumers(maxConcurrentConsumers);//限流,单位时间内消费多少条记录factory.setPrefetchCount(prefetch);// json转消息//factory.setMessageConverter(new Jackson2JsonMessageConverter());//设置rabbit 确认消息的模式,默认是自动确认//factory.setAcknowledgeMode(AcknowledgeMode.AUTO);//设置rabbit 确认消息的模式,默认是自动确认factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);return factory;}/*** 回调函数* @param connectionFactory* @return*/@Beanpublic RabbitTemplate createDirectRabbitTemplate(ConnectionFactory connectionFactory){RabbitTemplate rabbitTemplate = new RabbitTemplate();rabbitTemplate.setConnectionFactory(connectionFactory);//设置开启Manatory,才能触发回调函数,无论消息推送结果怎么样都会强制调用回调函数rabbitTemplate.setMandatory(true);// 设置确认发送到交换机的回调函数 =》 消息推送到server,但是在server里找不到交换机 / 消息推送到sever,交换机和队列啥都没找到 / 消息推送到server,找到交换机了,但是没找到队列 / 消息推送成功rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {if(ack){System.out.println("发送者消息确认成功!");}else{System.out.println("发送者消息确认是呗,考虑重发:"+cause);}//System.out.println("相关数据:"+correlationData);//System.out.println("确认情况:"+ack);//System.out.println("原因:"+cause);//System.out.println("===============================");});//设置确认消息已发送到队列的回调 =》 消息推送到server,找到交换机了,但是没找到队列 触发这个回调函数rabbitTemplate.setReturnsCallback(returnedMessage -> {System.out.println("交换机为:"+returnedMessage.getExchange());System.out.println("返回消息为:"+returnedMessage.getMessage());System.out.println("路由键为:"+returnedMessage.getRoutingKey());System.out.println("回应消息为:"+returnedMessage.getReplyText());System.out.println("回应代码为:"+returnedMessage.getReplyCode());System.out.println("===============================");});return rabbitTemplate;}@BeanQueue trafficSpikedQueue(){return new Queue("trafficSpikedQueue", true);}@BeanDirectExchange trafficSpikedExchange(){return new DirectExchange("trafficSpikedExchange");}@BeanBinding binding(Queue trafficSpikedQueue, DirectExchange trafficSpikedExchange){return BindingBuilder.bind(trafficSpikedQueue).to(trafficSpikedExchange).with("trafficSpikedKey");}//*/
}
四:创建生产者
package com.modules.controller.rabbitmq;import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;@RestController
public class TrafficController {@Autowiredprivate RabbitTemplate rabbitTemplate;@GetMapping("/java/traffic")public String sendTrafficMessage(@RequestParam String message){for (int i = 1; i <= 100; i++){// 使用java多线程来模拟多用户并发请求final int temp = i;new Thread(()->{// 给RabbitMQ发送消息rabbitTemplate.convertAndSend("trafficExchange","trafficKey","hello world:"+temp,new MessagePostProcessor() {@Overridepublic Message postProcessMessage(Message message) throws AmqpException{// System.out.println("发送回调:"+temp);System.out.println(message);return message;}});}).start();}// rabbitTemplate.convertAndSend("trafficSpikedExchange", "trafficSpikedKey", message);return "Message sent";}
}
五:创建消费者
package com.modules.controller.rabbitmq;import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import org.springframework.amqp.core.Message;
import com.rabbitmq.client.*;
import java.io.IOException;@Component
public class TrafficSpikedConsumer {@RabbitListener(queues = "trafficQueue")public void receiveMessage(Message message, Channel channel) throws InterruptedException, IOException{// 为了演示一个一个消费的情况,这里使用线程暂停来延迟控制台输出Thread.sleep(100);// =========================================// 处理消息,例如写入数据库或进行计算System.out.println("Received message: " + new String(message.getBody()));//System.out.println("channel: " + channel);// =========================================// 成功处理后手动确认消息long deliveryTag = message.getMessageProperties().getDeliveryTag();//System.out.println("deliveryTag:"+deliveryTag);channel.basicAck(deliveryTag, false);}
}
控制台输出的数据比较多。我这里就不做展示了。
PS:我这里测试的时候遇到一个小问题,发现消费者最后消费的数量跟生产者生产的数量对不上。我百思不得其解。这问题出在哪里呢?
后来,我才发现,我测试是在本地做的测试,对应的代码,我服务器端打包的jar里边也有一份,也就是说,我一个生产者,对应两个消费者(本地+服务器)这也是我本地消费者消费的数量跟生产数量不一致的原因。
以上大概就是Springboot集成RabbitMQ实现流量削峰添谷的一个小例子。
通过RabbitMQ的队列机制,可以有效地缓解高峰期的流量压力。
有好的建议,请在下方输入你的评论。
相关文章:
SpringBoot(三十九)SpringBoot集成RabbitMQ实现流量削峰添谷
前边我们有具体的学习过RabbitMQ的安装和基本使用的情况。 但是呢,没有演示具体应用到项目中的实例。 这里使用RabbitMQ来实现流量的削峰添谷。 一:添加pom依赖 <!--rabbitmq-需要的 AMQP 依赖--> <dependency><groupId>org.springfr…...
前端 Vue 3 后端 Node.js 和Express 结合cursor常见提示词结构
cursor 提示词 后端提示词 请为我开发一个基于 Node.js 和Express 框架的 Todo List 后端项目。项目需要实现以下四个 RESTful API 接口: 查询所有待办事项 接口名: GET /api/get-todo功能: 从数据库的’list’集合中查询并返回所有待办事项参数: 无返回: 包含所…...
类和对象(下):点亮编程星河的类与对象进阶之光
再探构造函数 在实现构造函数时,对成员变量进行初始化主要有两种方式: 一种是常见的在函数体内赋值进行初始化;另一种则是通过初始化列表来完成初始化。 之前我们在构造函数中经常采用在函数体内对成员变量赋值的方式来给予它们初始值。例如&…...
42.接雨水
目录 题目过程解法 题目 给定 n 个非负整数表示每个宽度为 1 的柱子的高度图,计算按此排列的柱子,下雨之后能接多少雨水。 过程 发现有特殊情况就是,最高峰的地方,如果右边小于他,然后再右边也都很小的话,…...
使用Java代码操作Kafka(五):Kafka消费 offset API,包含指定 Offset 消费以及指定时间消费
文章目录 1、指定 Offset 消费2、指定时间消费 1、指定 Offset 消费 auto.offset.reset earliest | latest | none 默认是 latest (1)earliest:自动将偏移量重置为最早的偏移量,–from-beginning (2)lates…...
Ubuntu安装不同版本的opencv,并任意切换使用
参考: opencv笔记:ubuntu安装opencv以及多版本共存 | 高深远的博客 https://zhuanlan.zhihu.com/p/604658181 安装不同版本opencv及共存、切换并验证。_pkg-config opencv --modversion-CSDN博客 Ubuntu下多版本OpenCV共存和切换_ubuntu20如同时安装o…...
突破内存限制:Mac Mini M2 服务器化实践指南
本篇文章,我们聊聊如何使用 Mac Mini M2 来实现比上篇文章性价比更高的内存服务器使用,分享背后的一些小的思考。 希望对有类似需求的你有帮助。 写在前面 在上文《ThinkPad Redis:构建亿级数据毫秒级查询的平民方案》中,我们…...
【排版教程】Word、WPS 分节符(奇数页等) 自动变成 分节符(下一页) 解决办法
毕业设计排版时,一般要求每章节的起始页为奇数页,空白页不显示页眉和页脚。具体做法如下: 1 Word 在一个章节的内容完成后,在【布局】中,点击【分隔符】,然后选择【奇数页】 这样在下一章节开始的时&…...
【在Linux世界中追寻伟大的One Piece】多线程(二)
目录 1 -> 分离线程 2 -> Linux线程互斥 2.1 -> 进程线程间的互斥相关背景概念 2.2 -> 互斥量mutex 2.3 -> 互斥量的接口 2.4 -> 互斥量实现原理探究 3 -> 可重入VS线程安全 3.1 -> 概念 3.2 -> 常见的线程不安全的情况 3.3 -> 常见的…...
flink学习(8)——窗口函数
增量聚合函数 ——指窗口每进入一条数据就计算一次 例如:要计算数字之和,进去一个12 计算结果为20, 再进入一个7 ——结果为27 reduce aggregate(aggregateFunction) package com.bigdata.day04;public class _04_agg函数 {public static …...
「实战应用」如何用图表控件LightningChart .NET实现散点图?(一)
LightningChart .NET完全由GPU加速,并且性能经过优化,可用于实时显示海量数据-超过10亿个数据点。 LightningChart包括广泛的2D,高级3D,Polar,Smith,3D饼/甜甜圈,地理地图和GIS图表以及适用于科…...
鸿蒙Native使用Demo
DevecoStudio使用Native 今天,给大家带来的是关于DevecoStudio中使用Native进行开发 个人拙见:为什么要使用Native?无论是JS还是TS在复杂的情况下运行速度,肯定不如直接操作内存的C/C的运行速度快,所以,会选择使用Native;这里面的过程是什么?通过映射转化,使用napi提供的接口…...
29.UE5蓝图的网络通讯,多人自定义事件,变量同步
3-9 蓝图的网络通讯、多人自定义事件、变量同步_哔哩哔哩_bilibili 目录 1.网络通讯 1.1玩家Pawn之间的同步 1.2事件同步 1.3UI同步 1.4组播 1.5变量同步 1.网络通讯 1.1玩家Pawn之间的同步 创建一个第三人称项目 将网络模式更改为监听服务器,即将房主作为…...
Scala—列表(可变ListBuffer、不可变List)用法详解
Scala集合概述-链接 大家可以点击上方链接,先对Scala的集合有一个整体的概念🤣🤣🤣 在 Scala 中,列表(List)分为不可变列表(List)和可变列表(ListBuffer&…...
【论文复现】偏标记学习+图像分类
📝个人主页🌹:Eternity._ 🌹🌹期待您的关注 🌹🌹 ❀ 偏标记学习图像分类 概述算法原理核心逻辑效果演示使用方式参考文献 概述 本文复现论文 Progressive Identification of True Labels for Pa…...
C嘎嘎探索篇:栈与队列的交响:C++中的结构艺术
C嘎嘎探索篇:栈与队列的交响:C中的结构艺术 前言: 小编在之前刚完成了C中栈和队列(stack和queue)的讲解,忘记的小伙伴可以去我上一篇文章看一眼的,今天小编将会带领大家吹奏栈和队列的交响&am…...
AIGC-----AIGC在虚拟现实中的应用前景
AIGC在虚拟现实中的应用前景 引言 随着人工智能生成内容(AIGC)的快速发展,虚拟现实(VR)技术的应用也迎来了新的契机。AIGC与VR的结合为创造沉浸式体验带来了全新的可能性,这种组合不仅极大地降低了VR内容的…...
Django 路由层
1. 路由基础概念 URLconf (URL 配置):Django 的路由系统是基于 urls.py 文件定义的。路径匹配:通过模式匹配 URL,并将请求传递给对应的视图处理函数。命名路由:每个路由可以定义一个名称,用于反向解析。 2. 基本路由配…...
《硬件架构的艺术》笔记(八):消抖技术
简介 在电子设备中两个金属触点随着触点的断开闭合便产生了多个信号,这就是抖动。 消抖是用来确保每一次断开或闭合触点时只有一个信号起作用的硬件设备或软件。(就是每次断开闭合只对应一个操作)。 抖动在某些模拟和逻辑电路中可能产生问…...
Spring 与 Spring MVC 与 Spring Boot三者之间的区别与联系
一.什么是Spring?它解决了什么问题? 1.1什么是Spring? Spring,一般指代的是Spring Framework 它是一个开源的应用程序框架,提供了一个简易的开发方式,通过这种开发方式,将避免那些可能致使代码…...
嵌入式Linux实战:全志T3+vsftpd实现轻量级文件传输(含WinSCP连接教程)
嵌入式Linux实战:全志T3vsftpd实现轻量级文件传输(含WinSCP连接教程) 在物联网设备开发中,文件传输是一个看似简单却充满挑战的环节。当你的开发板是全志T3这样的资源受限平台时,如何在有限的存储和内存条件下搭建一个…...
3分钟上手!Balena Etcher:安全烧录系统镜像的终极解决方案
3分钟上手!Balena Etcher:安全烧录系统镜像的终极解决方案 【免费下载链接】etcher Flash OS images to SD cards & USB drives, safely and easily. 项目地址: https://gitcode.com/GitHub_Trending/et/etcher 你是否曾因烧录系统镜像而丢失…...
收藏!小白程序员必看:轻松掌握大模型核心技术,解决领域与时间限制难题!
通用大模型的两个硬伤——领域限制(不知道企业内部数据)和时间限制(无法获取最新信息)。 产品设计的第一步,不是写提示词,是厘清"模型不知道什么"。这与传统软件开发思维完全不同——传统软件是&…...
别再只调API了!用Langchain4j的RAG功能,5分钟给你的Java应用加上专属知识库
用Langchain4j的RAG功能为Java应用快速构建智能知识库 在当今信息爆炸的时代,企业内部的文档资料往往分散在各个角落,员工需要花费大量时间查找相关信息。传统的全文检索方式虽然能解决部分问题,但当用户用自然语言提问时,往往难…...
SpringBoot 静态资源加载失败:favicon.ico 缺失问题解析
1. 为什么你的SpringBoot项目总在报favicon.ico缺失? 每次启动SpringBoot项目时,控制台总是刷出一堆红色警告,其中最让人头疼的就是"No static resource favicon.ico"这个错误。作为一个踩过无数次坑的老司机,我可以负…...
别再手动打字了!用uniapp+百度语音识别,5分钟搞定语音转文字功能(附完整代码)
用UniApp百度语音识别实现高效语音转文字功能 在移动应用开发中,语音输入正逐渐成为提升用户体验的关键功能。想象一下,用户无需费力敲击虚拟键盘,只需轻按按钮说话,文字就能自动出现在输入框中——这种交互方式不仅自然流畅&…...
【云原生Java冷启动优化黄金法则】:20年实战提炼的7步精准调优路径(含GraalVM+Quarkus实测数据)
第一章:云原生Java函数计算冷启动问题的本质剖析云原生Java函数计算中的冷启动并非单纯由JVM启动耗时导致,而是多层资源调度与运行时初始化耦合引发的系统性延迟现象。其本质在于函数实例生命周期与请求到达时间的异步解耦——当无活跃实例可用时&#x…...
3月技术风暴:程序员的范式革命——2026年3月科技大事件记录
2025年3月:颠覆性技术狂潮与程序员认知升维全纪录 3月结束,你感受到“版本迭代”的压力了吗? 2025年的春天不是春暖花开,而是技术奇点的“温度骤升”。本文绝非一份普通事件清单,而是用程序员的第一性原理,…...
Qwen3-TTS声音克隆入门指南:上传音频→选择语种→生成自然语音三步走
Qwen3-TTS声音克隆入门指南:上传音频→选择语种→生成自然语音三步走 想不想让AI用你自己的声音说话?或者,想不想用一段短短的录音,就克隆出能说十几种语言的“数字分身”?今天,我们就来手把手教你&#x…...
如何用dashdot打造高颜值服务器监控面板?完整配置教程
如何用dashdot打造高颜值服务器监控面板?完整配置教程 【免费下载链接】dashdot A simple, modern server dashboard, primarily used by smaller private servers 项目地址: https://gitcode.com/gh_mirrors/da/dashdot dashdot是一款现代化的服务器监控面板…...
