SpringBoot(三十九)SpringBoot集成RabbitMQ实现流量削峰添谷
前边我们有具体的学习过RabbitMQ的安装和基本使用的情况。
但是呢,没有演示具体应用到项目中的实例。
这里使用RabbitMQ来实现流量的削峰添谷。
一:添加pom依赖
<!--rabbitmq-需要的 AMQP 依赖-->
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
二:yml配置
spring:
#配置rabbitmq 服务器
rabbitmq:virtual-host: /host: 1.15.157.156port: 5672username: xxxxxpassword: xxxxx# 开启发布确认机制#SIMPLE, // 使用 RabbitTemplate#waitForConfirms() 或 waitForConfirmsOrDie()#CORRELATED, // 使用 CorrelationData 关联确认与发送的消息#NONE // 不启用发布确认publisher-confirm-type: correlated# publisher-confirms 消息的可靠投递, confirm 确认模式 默认为false#publisher-confirms: true# 添加发布确认返回, return 回退模式 默认为falsepublisher-returns: true### listenerlistener:# 每次从队列中预取5条消息prefetch: 20# 最小消费者数量concurrency: 1# 最大的消费者数量max-concurrency: 10simple:# 设置预取数量为1 每次取一个prefetch: 1# manual:手动 ack,需要在业务代码结束后,调用 api 发送 ack,虽灵活但会提高编码复杂度。# auto:自动 ack,没有异常则返回 ack;抛出异常则返回 nack,消息重新入队,一直到没有异常为止,也可以设置最大重试次数,超过次数后发送到专门收集错误消息的队列进一步处理# none:关闭ack,MQ 假定消费者获取消息后会成功处理,因此消息投递后立即被删除(消息投递是不可靠的,可能丢失)acknowledge-mode: manual# 失败重试retry:# 开启消费者失败重试enabled: true# 初始的失败等待时长为1秒initial-interval: 1000# 失败的等待时长倍数,下次等待时长 = multiplier * last-intervalmultiplier: 3# 最大重试次数max-attempts: 4# true无状态;false有状态。如果业务中包含事务,这里改为falsestateless: true
具体的配置都有对应的注释,参照即可。
三:编写config配置类
package com.modules.config;import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;@Configuration
public class RabbitMQConfig
{@Value("${spring.rabbitmq.host}")private String host;@Value("${spring.rabbitmq.port}")private int port;@Value("${spring.rabbitmq.username}")private String userName;@Value("${spring.rabbitmq.password}")private String password;@Value("${spring.rabbitmq.listener.prefetch}")private int prefetch;@Value("${spring.rabbitmq.listener.concurrency}")private int concurrentConsumers;@Value("${spring.rabbitmq.listener.max-concurrency}")private int maxConcurrentConsumers;/*** 链接RabbitMQ* @return*/@Beanpublic ConnectionFactory connectionDirectFactory(){CachingConnectionFactory connectionFactory = new CachingConnectionFactory();connectionFactory.setHost(host);connectionFactory.setPort(port);connectionFactory.setUsername(userName);connectionFactory.setPassword(password);connectionFactory.setPublisherConfirms(true); //必须要设置return connectionFactory;}/*** 配置RabbitMQ参数* @return*/@Beanpublic SimpleRabbitListenerContainerFactory rabbitDirectListenerContainerFactory(){SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();factory.setConnectionFactory(connectionDirectFactory());//设置最小并发的消费者数量factory.setConcurrentConsumers(concurrentConsumers);//设置最大并发的消费者数量factory.setMaxConcurrentConsumers(maxConcurrentConsumers);//限流,单位时间内消费多少条记录factory.setPrefetchCount(prefetch);// json转消息//factory.setMessageConverter(new Jackson2JsonMessageConverter());//设置rabbit 确认消息的模式,默认是自动确认//factory.setAcknowledgeMode(AcknowledgeMode.AUTO);//设置rabbit 确认消息的模式,默认是自动确认factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);return factory;}/*** 回调函数* @param connectionFactory* @return*/@Beanpublic RabbitTemplate createDirectRabbitTemplate(ConnectionFactory connectionFactory){RabbitTemplate rabbitTemplate = new RabbitTemplate();rabbitTemplate.setConnectionFactory(connectionFactory);//设置开启Manatory,才能触发回调函数,无论消息推送结果怎么样都会强制调用回调函数rabbitTemplate.setMandatory(true);// 设置确认发送到交换机的回调函数 =》 消息推送到server,但是在server里找不到交换机 / 消息推送到sever,交换机和队列啥都没找到 / 消息推送到server,找到交换机了,但是没找到队列 / 消息推送成功rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {if(ack){System.out.println("发送者消息确认成功!");}else{System.out.println("发送者消息确认是呗,考虑重发:"+cause);}//System.out.println("相关数据:"+correlationData);//System.out.println("确认情况:"+ack);//System.out.println("原因:"+cause);//System.out.println("===============================");});//设置确认消息已发送到队列的回调 =》 消息推送到server,找到交换机了,但是没找到队列 触发这个回调函数rabbitTemplate.setReturnsCallback(returnedMessage -> {System.out.println("交换机为:"+returnedMessage.getExchange());System.out.println("返回消息为:"+returnedMessage.getMessage());System.out.println("路由键为:"+returnedMessage.getRoutingKey());System.out.println("回应消息为:"+returnedMessage.getReplyText());System.out.println("回应代码为:"+returnedMessage.getReplyCode());System.out.println("===============================");});return rabbitTemplate;}@BeanQueue trafficSpikedQueue(){return new Queue("trafficSpikedQueue", true);}@BeanDirectExchange trafficSpikedExchange(){return new DirectExchange("trafficSpikedExchange");}@BeanBinding binding(Queue trafficSpikedQueue, DirectExchange trafficSpikedExchange){return BindingBuilder.bind(trafficSpikedQueue).to(trafficSpikedExchange).with("trafficSpikedKey");}//*/
}
四:创建生产者
package com.modules.controller.rabbitmq;import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;@RestController
public class TrafficController {@Autowiredprivate RabbitTemplate rabbitTemplate;@GetMapping("/java/traffic")public String sendTrafficMessage(@RequestParam String message){for (int i = 1; i <= 100; i++){// 使用java多线程来模拟多用户并发请求final int temp = i;new Thread(()->{// 给RabbitMQ发送消息rabbitTemplate.convertAndSend("trafficExchange","trafficKey","hello world:"+temp,new MessagePostProcessor() {@Overridepublic Message postProcessMessage(Message message) throws AmqpException{// System.out.println("发送回调:"+temp);System.out.println(message);return message;}});}).start();}// rabbitTemplate.convertAndSend("trafficSpikedExchange", "trafficSpikedKey", message);return "Message sent";}
}
五:创建消费者
package com.modules.controller.rabbitmq;import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import org.springframework.amqp.core.Message;
import com.rabbitmq.client.*;
import java.io.IOException;@Component
public class TrafficSpikedConsumer {@RabbitListener(queues = "trafficQueue")public void receiveMessage(Message message, Channel channel) throws InterruptedException, IOException{// 为了演示一个一个消费的情况,这里使用线程暂停来延迟控制台输出Thread.sleep(100);// =========================================// 处理消息,例如写入数据库或进行计算System.out.println("Received message: " + new String(message.getBody()));//System.out.println("channel: " + channel);// =========================================// 成功处理后手动确认消息long deliveryTag = message.getMessageProperties().getDeliveryTag();//System.out.println("deliveryTag:"+deliveryTag);channel.basicAck(deliveryTag, false);}
}
控制台输出的数据比较多。我这里就不做展示了。
PS:我这里测试的时候遇到一个小问题,发现消费者最后消费的数量跟生产者生产的数量对不上。我百思不得其解。这问题出在哪里呢?
后来,我才发现,我测试是在本地做的测试,对应的代码,我服务器端打包的jar里边也有一份,也就是说,我一个生产者,对应两个消费者(本地+服务器)这也是我本地消费者消费的数量跟生产数量不一致的原因。
以上大概就是Springboot集成RabbitMQ实现流量削峰添谷的一个小例子。
通过RabbitMQ的队列机制,可以有效地缓解高峰期的流量压力。
有好的建议,请在下方输入你的评论。
相关文章:
SpringBoot(三十九)SpringBoot集成RabbitMQ实现流量削峰添谷
前边我们有具体的学习过RabbitMQ的安装和基本使用的情况。 但是呢,没有演示具体应用到项目中的实例。 这里使用RabbitMQ来实现流量的削峰添谷。 一:添加pom依赖 <!--rabbitmq-需要的 AMQP 依赖--> <dependency><groupId>org.springfr…...
前端 Vue 3 后端 Node.js 和Express 结合cursor常见提示词结构
cursor 提示词 后端提示词 请为我开发一个基于 Node.js 和Express 框架的 Todo List 后端项目。项目需要实现以下四个 RESTful API 接口: 查询所有待办事项 接口名: GET /api/get-todo功能: 从数据库的’list’集合中查询并返回所有待办事项参数: 无返回: 包含所…...
类和对象(下):点亮编程星河的类与对象进阶之光
再探构造函数 在实现构造函数时,对成员变量进行初始化主要有两种方式: 一种是常见的在函数体内赋值进行初始化;另一种则是通过初始化列表来完成初始化。 之前我们在构造函数中经常采用在函数体内对成员变量赋值的方式来给予它们初始值。例如&…...
42.接雨水
目录 题目过程解法 题目 给定 n 个非负整数表示每个宽度为 1 的柱子的高度图,计算按此排列的柱子,下雨之后能接多少雨水。 过程 发现有特殊情况就是,最高峰的地方,如果右边小于他,然后再右边也都很小的话,…...
使用Java代码操作Kafka(五):Kafka消费 offset API,包含指定 Offset 消费以及指定时间消费
文章目录 1、指定 Offset 消费2、指定时间消费 1、指定 Offset 消费 auto.offset.reset earliest | latest | none 默认是 latest (1)earliest:自动将偏移量重置为最早的偏移量,–from-beginning (2)lates…...
Ubuntu安装不同版本的opencv,并任意切换使用
参考: opencv笔记:ubuntu安装opencv以及多版本共存 | 高深远的博客 https://zhuanlan.zhihu.com/p/604658181 安装不同版本opencv及共存、切换并验证。_pkg-config opencv --modversion-CSDN博客 Ubuntu下多版本OpenCV共存和切换_ubuntu20如同时安装o…...
突破内存限制:Mac Mini M2 服务器化实践指南
本篇文章,我们聊聊如何使用 Mac Mini M2 来实现比上篇文章性价比更高的内存服务器使用,分享背后的一些小的思考。 希望对有类似需求的你有帮助。 写在前面 在上文《ThinkPad Redis:构建亿级数据毫秒级查询的平民方案》中,我们…...
【排版教程】Word、WPS 分节符(奇数页等) 自动变成 分节符(下一页) 解决办法
毕业设计排版时,一般要求每章节的起始页为奇数页,空白页不显示页眉和页脚。具体做法如下: 1 Word 在一个章节的内容完成后,在【布局】中,点击【分隔符】,然后选择【奇数页】 这样在下一章节开始的时&…...
【在Linux世界中追寻伟大的One Piece】多线程(二)
目录 1 -> 分离线程 2 -> Linux线程互斥 2.1 -> 进程线程间的互斥相关背景概念 2.2 -> 互斥量mutex 2.3 -> 互斥量的接口 2.4 -> 互斥量实现原理探究 3 -> 可重入VS线程安全 3.1 -> 概念 3.2 -> 常见的线程不安全的情况 3.3 -> 常见的…...
flink学习(8)——窗口函数
增量聚合函数 ——指窗口每进入一条数据就计算一次 例如:要计算数字之和,进去一个12 计算结果为20, 再进入一个7 ——结果为27 reduce aggregate(aggregateFunction) package com.bigdata.day04;public class _04_agg函数 {public static …...
「实战应用」如何用图表控件LightningChart .NET实现散点图?(一)
LightningChart .NET完全由GPU加速,并且性能经过优化,可用于实时显示海量数据-超过10亿个数据点。 LightningChart包括广泛的2D,高级3D,Polar,Smith,3D饼/甜甜圈,地理地图和GIS图表以及适用于科…...
鸿蒙Native使用Demo
DevecoStudio使用Native 今天,给大家带来的是关于DevecoStudio中使用Native进行开发 个人拙见:为什么要使用Native?无论是JS还是TS在复杂的情况下运行速度,肯定不如直接操作内存的C/C的运行速度快,所以,会选择使用Native;这里面的过程是什么?通过映射转化,使用napi提供的接口…...
29.UE5蓝图的网络通讯,多人自定义事件,变量同步
3-9 蓝图的网络通讯、多人自定义事件、变量同步_哔哩哔哩_bilibili 目录 1.网络通讯 1.1玩家Pawn之间的同步 1.2事件同步 1.3UI同步 1.4组播 1.5变量同步 1.网络通讯 1.1玩家Pawn之间的同步 创建一个第三人称项目 将网络模式更改为监听服务器,即将房主作为…...
Scala—列表(可变ListBuffer、不可变List)用法详解
Scala集合概述-链接 大家可以点击上方链接,先对Scala的集合有一个整体的概念🤣🤣🤣 在 Scala 中,列表(List)分为不可变列表(List)和可变列表(ListBuffer&…...
【论文复现】偏标记学习+图像分类
📝个人主页🌹:Eternity._ 🌹🌹期待您的关注 🌹🌹 ❀ 偏标记学习图像分类 概述算法原理核心逻辑效果演示使用方式参考文献 概述 本文复现论文 Progressive Identification of True Labels for Pa…...
C嘎嘎探索篇:栈与队列的交响:C++中的结构艺术
C嘎嘎探索篇:栈与队列的交响:C中的结构艺术 前言: 小编在之前刚完成了C中栈和队列(stack和queue)的讲解,忘记的小伙伴可以去我上一篇文章看一眼的,今天小编将会带领大家吹奏栈和队列的交响&am…...
AIGC-----AIGC在虚拟现实中的应用前景
AIGC在虚拟现实中的应用前景 引言 随着人工智能生成内容(AIGC)的快速发展,虚拟现实(VR)技术的应用也迎来了新的契机。AIGC与VR的结合为创造沉浸式体验带来了全新的可能性,这种组合不仅极大地降低了VR内容的…...
Django 路由层
1. 路由基础概念 URLconf (URL 配置):Django 的路由系统是基于 urls.py 文件定义的。路径匹配:通过模式匹配 URL,并将请求传递给对应的视图处理函数。命名路由:每个路由可以定义一个名称,用于反向解析。 2. 基本路由配…...
《硬件架构的艺术》笔记(八):消抖技术
简介 在电子设备中两个金属触点随着触点的断开闭合便产生了多个信号,这就是抖动。 消抖是用来确保每一次断开或闭合触点时只有一个信号起作用的硬件设备或软件。(就是每次断开闭合只对应一个操作)。 抖动在某些模拟和逻辑电路中可能产生问…...
Spring 与 Spring MVC 与 Spring Boot三者之间的区别与联系
一.什么是Spring?它解决了什么问题? 1.1什么是Spring? Spring,一般指代的是Spring Framework 它是一个开源的应用程序框架,提供了一个简易的开发方式,通过这种开发方式,将避免那些可能致使代码…...
AI-调查研究-01-正念冥想有用吗?对健康的影响及科学指南
点一下关注吧!!!非常感谢!!持续更新!!! 🚀 AI篇持续更新中!(长期更新) 目前2025年06月05日更新到: AI炼丹日志-28 - Aud…...
智能在线客服平台:数字化时代企业连接用户的 AI 中枢
随着互联网技术的飞速发展,消费者期望能够随时随地与企业进行交流。在线客服平台作为连接企业与客户的重要桥梁,不仅优化了客户体验,还提升了企业的服务效率和市场竞争力。本文将探讨在线客服平台的重要性、技术进展、实际应用,并…...
GruntJS-前端自动化任务运行器从入门到实战
Grunt 完全指南:从入门到实战 一、Grunt 是什么? Grunt是一个基于 Node.js 的前端自动化任务运行器,主要用于自动化执行项目开发中重复性高的任务,例如文件压缩、代码编译、语法检查、单元测试、文件合并等。通过配置简洁的任务…...
虚拟电厂发展三大趋势:市场化、技术主导、车网互联
市场化:从政策驱动到多元盈利 政策全面赋能 2025年4月,国家发改委、能源局发布《关于加快推进虚拟电厂发展的指导意见》,首次明确虚拟电厂为“独立市场主体”,提出硬性目标:2027年全国调节能力≥2000万千瓦࿰…...
【 java 虚拟机知识 第一篇 】
目录 1.内存模型 1.1.JVM内存模型的介绍 1.2.堆和栈的区别 1.3.栈的存储细节 1.4.堆的部分 1.5.程序计数器的作用 1.6.方法区的内容 1.7.字符串池 1.8.引用类型 1.9.内存泄漏与内存溢出 1.10.会出现内存溢出的结构 1.内存模型 1.1.JVM内存模型的介绍 内存模型主要分…...
Unity中的transform.up
2025年6月8日,周日下午 在Unity中,transform.up是Transform组件的一个属性,表示游戏对象在世界空间中的“上”方向(Y轴正方向),且会随对象旋转动态变化。以下是关键点解析: 基本定义 transfor…...
区块链技术概述
区块链技术是一种去中心化、分布式账本技术,通过密码学、共识机制和智能合约等核心组件,实现数据不可篡改、透明可追溯的系统。 一、核心技术 1. 去中心化 特点:数据存储在网络中的多个节点(计算机),而非…...
sshd代码修改banner
sshd服务连接之后会收到字符串: SSH-2.0-OpenSSH_9.5 容易被hacker识别此服务为sshd服务。 是否可以通过修改此banner达到让人无法识别此服务的目的呢? 不能。因为这是写的SSH的协议中的。 也就是协议规定了banner必须这么写。 SSH- 开头,…...
Matlab实现任意伪彩色图像可视化显示
Matlab实现任意伪彩色图像可视化显示 1、灰度原始图像2、RGB彩色原始图像 在科研研究中,如何展示好看的实验结果图像非常重要!!! 1、灰度原始图像 灰度图像每个像素点只有一个数值,代表该点的亮度(或…...
TCP/IP 网络编程 | 服务端 客户端的封装
设计模式 文章目录 设计模式一、socket.h 接口(interface)二、socket.cpp 实现(implementation)三、server.cpp 使用封装(main 函数)四、client.cpp 使用封装(main 函数)五、退出方法…...
