SpringBoot 整合 RabbitMQ 实现流量消峰
RabbitMQ 即一个消息队列,主要是用来实现应用程序的异步和解耦,同时也能起到消息缓冲,消息分发的作用。
消息中间件在互联网公司的使用中越来越多,刚才还看到新闻阿里将 RocketMQ 捐献给了 Apache,当然了今天的主角还是讲 RabbitMQ。消息中间件最主要的作用是解耦,中间件最标准的用法是生产者生产消息传送到队列,消费者从队列中拿取消息并处理,生产者不用关心是谁来消费,消费者不用关心谁在生产消息,从而达到解耦的目的。在分布式的系统中,消息队列也会被用在很多其它的方面,比如:分布式事务的支持,RPC 的调用等等。
以前一直使用的是 ActiveMQ,在实际的生产使用中也出现了一些小问题,在网络查阅了很多的资料后,决定尝试使用 RabbitMQ 来替换 ActiveMQ,RabbitMQ 的高可用性、高性能、灵活性等一些特点吸引了我们,查阅了一些资料整理出此文。
RabbitMQ 介绍
RabbitMQ 是实现 AMQP(高级消息队列协议)的消息中间件的一种,最初起源于金融系统,用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。RabbitMQ 主要是为了实现系统之间的双向解耦而实现的。当生产者大量产生数据时,消费者无法快速消费,那么需要一个中间层。保存这个数据。
AMQP,即 Advanced Message Queuing Protocol,高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。消息中间件主要用于组件之间的解耦,消息的发送者无需知道消息使用者的存在,反之亦然。AMQP 的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。
RabbitMQ 是一个开源的 AMQP 实现,服务器端用Erlang语言编写,支持多种客户端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP 等,支持 AJAX。用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。
相关概念
通常我们谈到队列服务, 会有三个概念:发消息者、队列、收消息者,RabbitMQ 在这个基本概念之上, 多做了一层抽象, 在发消息者和 队列之间, 加入了交换器 (Exchange). 这样发消息者和队列就没有直接联系, 转而变成发消息者把消息给交换器, 交换器根据调度策略再把消息再给队列。

图片
-
左侧 P 代表 生产者,也就是往 RabbitMQ 发消息的程序。
-
中间即是 RabbitMQ,其中包括了 交换机 和 队列。
-
右侧 C 代表 消费者,也就是往 RabbitMQ 拿消息的程序。
那么,其中比较重要的概念有 4 个,分别为:虚拟主机,交换机,队列,和绑定。
-
虚拟主机:一个虚拟主机持有一组交换机、队列和绑定。为什么需要多个虚拟主机呢?很简单, RabbitMQ 当中,用户只能在虚拟主机的粒度进行权限控制。因此,如果需要禁止A组访问B组的交换机/队列/绑定,必须为A和B分别创建一个虚拟主机。每一个 RabbitMQ 服务器都有一个默认的虚拟主机“/”。
-
交换机:Exchange 用于转发消息,但是它不会做存储 ,如果没有 Queue bind 到 Exchange 的话,它会直接丢弃掉 Producer 发送过来的消息。这里有一个比较重要的概念:路由键 。消息到交换机的时候,交互机会转发到对应的队列中,那么究竟转发到哪个队列,就要根据该路由键。
-
绑定:也就是交换机需要和队列相绑定,这其中如上图所示,是多对多的关系。
交换机(Exchange)
交换机的功能主要是接收消息并且转发到绑定的队列,交换机不存储消息,在启用ack模式后,交换机找不到队列会返回错误。交换机有四种类型:Direct, topic, Headers and Fanout
-
Direct:direct 类型的行为是"先匹配, 再投送". 即在绑定时设定一个 routingkey, 消息的routingkey 匹配时, 才会被交换器投送到绑定的队列中去.
-
Topic:按规则转发消息(最灵活)
-
Headers:设置 header attribute 参数类型的交换机
-
Fanout:转发消息到所有绑定队列
Direct Exchange
Direct Exchange 是 RabbitMQ 默认的交换机模式,也是最简单的模式,根据key全文匹配去寻找队列。

图片
第一个 X - Q1 就有一个 binding key,名字为 orange;X - Q2 就有 2 个 binding key,名字为 black 和 green。当消息中的 路由键 和 这个 binding key 对应上的时候,那么就知道了该消息去到哪一个队列中。
Ps:为什么 X 到 Q2 要有 black,green,2个 binding key呢,一个不就行了吗?- 这个主要是因为可能又有 Q3,而Q3只接受 black 的信息,而Q2不仅接受black 的信息,还接受 green 的信息。
Topic Exchange
Topic Exchange 转发消息主要是根据通配符。在这种交换机下,队列和交换机的绑定会定义一种路由模式,那么,通配符就要在这种路由模式和路由键之间匹配后交换机才能转发消息。
在这种交换机模式下:
-
路由键必须是一串字符,用句号( .) 隔开,比如说 agreements.us,或者 agreements.eu.stockholm 等。
-
路由模式必须包含一个 星号(
*),主要用于匹配路由键指定位置的一个单词,比如说,一个路由模式是这样子:agreements..b.*,那么就只能匹配路由键是这样子的:第一个单词是 agreements,第四个单词是 b。井号(#)就表示相当于一个或者多个单词,例如一个匹配模式是 agreements.eu.berlin.#,那么,以agreements.eu.berlin 开头的路由键都是可以的。
具体代码发送的时候还是一样,第一个参数表示交换机,第二个参数表示 routing key,第三个参数即消息。如下:
rabbitTemplate.convertAndSend("testTopicExchange","key1.a.c.key2", " this is RabbitMQ!");
topic 和 direct 类似, 只是匹配上支持了"模式", 在"点分"的 routing_key 形式中, 可以使用两个通配符:
-
*表示一个词. -
#表示零个或多个词.
Headers Exchange
headers 也是根据规则匹配, 相较于 direct 和 topic 固定地使用 routing_key , headers 则是一个自定义匹配规则的类型. 在队列与交换器绑定时, 会设定一组键值对规则, 消息中也包括一组键值对( headers 属性), 当这些键值对有一对, 或全部匹配时, 消息被投送到对应队列.
Fanout Exchange
Fanout Exchange 消息广播的模式,不管路由键或者是路由模式,会把消息发给绑定给它的全部队列,如果配置了 routing_key 会被忽略。
Spring Boot 集成 RabbitMQ
Spring Boot 集成 RabbitMQ 非常简单,如果只是简单的使用配置非常少,Spring Boot 提供了 spring-boot-starter-amqp 项目对消息各种支持。
简单使用
1、配置 Pom 包,主要是添加 spring-boot-starter-amqp 的支持
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
2、配置文件
配置 RabbitMQ 的安装地址、端口以及账户信息
spring.application.name=Spring-boot-rabbitmqspring.rabbitmq.host=192.168.0.86
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=123456
3、队列配置
@Configuration
public class RabbitConfig {@Beanpublic Queue Queue() {return new Queue("hello");}}
4、发送者
rabbitTemplate 是 Spring Boot 提供的默认实现
@component
public class HelloSender {@Autowiredprivate AmqpTemplate rabbitTemplate;public void send() {String context = "hello " + new Date();System.out.println("Sender : " + context);this.rabbitTemplate.convertAndSend("hello", context);}}
4、接收者
@Component
@RabbitListener(queues = "hello")
public class HelloReceiver {@RabbitHandlerpublic void process(String hello) {System.out.println("Receiver : " + hello);}}
5、测试
@RunWith(SpringRunner.class)
@SpringBootTest
public class RabbitMqHelloTest {@Autowiredprivate HelloSender helloSender;@Testpublic void hello() throws Exception {helloSender.send();}}
注意,发送者和接收者的 queue name 必须一致,不然不能接收
多对多使用
一个发送者,N 个接收者或者 N 个发送者和 N 个接收者会出现什么情况呢?
一对多发送
对上面的代码进行了小改造,接收端注册了两个 Receiver,Receiver1 和 Receiver2,发送端加入参数计数,接收端打印接收到的参数,下面是测试代码,发送一百条消息,来观察两个接收端的执行效果
@Test
public void oneToMany() throws Exception {for (int i=0;i<100;i++){neoSender.send(i);}
}
结果如下:
Receiver 1: Spring boot test queue ****** 11
Receiver 2: Spring boot test queue ****** 12
Receiver 2: Spring boot test queue ****** 14
Receiver 1: Spring boot test queue ****** 13
Receiver 2: Spring boot test queue ****** 15
Receiver 1: Spring boot test queue ****** 16
Receiver 1: Spring boot test queue ****** 18
Receiver 2: Spring boot test queue ****** 17
Receiver 2: Spring boot test queue ****** 19
Receiver 1: Spring boot test queue ****** 20
根据返回结果得到以下结论
一个发送者,N个接受者,经过测试会均匀的将消息发送到N个接收者中
多对多发送
复制了一份发送者,加入标记,在一百个循环中相互交替发送
@Test
public void manyToMany() throws Exception {for (int i=0;i<100;i++){neoSender.send(i);neoSender2.send(i);}
}
结果如下:
Receiver 1: Spring boot test queue ****** 20
Receiver 2: Spring boot test queue ****** 20
Receiver 1: Spring boot test queue ****** 21
Receiver 2: Spring boot test queue ****** 21
Receiver 1: Spring boot test queue ****** 22
Receiver 2: Spring boot test queue ****** 22
Receiver 1: Spring boot test queue ****** 23
Receiver 2: Spring boot test queue ****** 23
Receiver 1: Spring boot test queue ****** 24
Receiver 2: Spring boot test queue ****** 24
Receiver 1: Spring boot test queue ****** 25
Receiver 2: Spring boot test queue ****** 25
结论:和一对多一样,接收端仍然会均匀接收到消息
高级使用
对象的支持
Spring Boot 以及完美的支持对象的发送和接收,不需要格外的配置。
//发送者
public void send(User user) {System.out.println("Sender object: " + user.toString());this.rabbitTemplate.convertAndSend("object", user);
}...//接收者
@RabbitHandler
public void process(User user) {System.out.println("Receiver object : " + user);
}
结果如下:
Sender object: User{name='test', pass='123456'}
Receiver object : User{name='test', pass='123456'}
Topic Exchange
topic 是 RabbitMQ 中最灵活的一种方式,可以根据 routing_key 自由的绑定不同的队列
首先对 topic 规则配置,这里使用两个队列来测试
@Configuration
public class TopicRabbitConfig {final static String message = "topic.message";final static String messages = "topic.messages";@Beanpublic Queue queueMessage() {return new Queue(TopicRabbitConfig.message);}@Beanpublic Queue queueMessages() {return new Queue(TopicRabbitConfig.messages);}@BeanTopicExchange exchange() {return new TopicExchange("exchange");}@BeanBinding bindingExchangeMessage(Queue queueMessage, TopicExchange exchange) {return BindingBuilder.bind(queueMessage).to(exchange).with("topic.message");}@BeanBinding bindingExchangeMessages(Queue queueMessages, TopicExchange exchange) {return BindingBuilder.bind(queueMessages).to(exchange).with("topic.#");}
}
使用 queueMessages 同时匹配两个队列,queueMessage 只匹配 "topic.message" 队列
public void send1() {String context = "hi, i am message 1";System.out.println("Sender : " + context);this.rabbitTemplate.convertAndSend("exchange", "topic.message", context);}public void send2() {String context = "hi, i am messages 2";System.out.println("Sender : " + context);this.rabbitTemplate.convertAndSend("exchange", "topic.messages", context);}
发送send1会匹配到topic.#和topic.message 两个Receiver都可以收到消息,发送send2只有topic.#可以匹配所有只有Receiver2监听到消息
Fanout Exchange
Fanout 就是我们熟悉的广播模式或者订阅模式,给 Fanout 交换机发送消息,绑定了这个交换机的所有队列都收到这个消息。
Fanout 相关配置
@Configuration
public class FanoutRabbitConfig {@Beanpublic Queue AMessage() {return new Queue("fanout.A");}@Beanpublic Queue BMessage() {return new Queue("fanout.B");}@Beanpublic Queue CMessage() {return new Queue("fanout.C");}@BeanFanoutExchange fanoutExchange() {return new FanoutExchange("fanoutExchange");}@BeanBinding bindingExchangeA(Queue AMessage,FanoutExchange fanoutExchange) {return BindingBuilder.bind(AMessage).to(fanoutExchange);}@BeanBinding bindingExchangeB(Queue BMessage, FanoutExchange fanoutExchange) {return BindingBuilder.bind(BMessage).to(fanoutExchange);}@BeanBinding bindingExchangeC(Queue CMessage, FanoutExchange fanoutExchange) {return BindingBuilder.bind(CMessage).to(fanoutExchange);}}
这里使用了 A、B、C 三个队列绑定到 Fanout 交换机上面,发送端的 routing_key 写任何字符都会被忽略:
public void send() {String context = "hi, fanout msg ";System.out.println("Sender : " + context);this.rabbitTemplate.convertAndSend("fanoutExchange","", context);
}
结果如下:
Sender : hi, fanout msg
...
fanout Receiver B: hi, fanout msg
fanout Receiver A : hi, fanout msg
fanout Receiver C: hi, fanout msg
结果说明,绑定到 fanout 交换机上面的队列都收到了消息。
相关文章:
SpringBoot 整合 RabbitMQ 实现流量消峰
RabbitMQ 即一个消息队列,主要是用来实现应用程序的异步和解耦,同时也能起到消息缓冲,消息分发的作用。 消息中间件在互联网公司的使用中越来越多,刚才还看到新闻阿里将 RocketMQ 捐献给了 Apache,当然了今天的主角还…...
大数据挖掘建模平台案例分享
大数据挖掘建模平台是由泰迪自主研发,面向企业级用户的大数据挖掘建模平台。平台采用可视化操作方式,通过丰富内置算法,帮助用户快速、一站式地进行数据分析及挖掘建模,可应用于处理海量数据、高复杂性的数据挖掘任务,…...
MySQL数据表的管理
1.创建表 语法: create table 表名( 字段名 字段里保存数据的类型【(数据的长度) 约束】, 字段名 字段里保存数据的类型【(数据的长度) 约束】, 字段名 字段里保存数据的类型【(数据的长度) 约束】 ...... ); 注意:数据类型和约束,接下来用…...
SpringBoot【十三(实战篇)】集成在线接口文档Swagger2
一、前言🔥 环境说明:Windows10 Idea2021.3.2 Jdk1.8 SpringBoot 2.3.1.RELEASE 二、如何生成Swagger文档 上一期我们已经能正常访问swagger在线文档,但是文档空空如也,对不对,接下来我就教大家怎么把相关的接口都给…...
【C++初阶】第8课—标准模板库STL(string_2)
文章目录 1. string类对象遍历操作1.1 标准库中的成员函数begin( )和end( )1.2 标准库中的成员函数rbegin( )和rend( )1.3 C11引入的4个标准库中的成员函数 2. string类对象的访问2.1 operator[ ]运算符重载访问字符串字符2.2 公有成员函数at访问字符2.3 公有成员函数back()和f…...
【arm】程序跑飞,SWD端口不可用修复(N32G435CBL7)
项目场景: 国民N32G43X系列,烧录了一个测试程序,在DEBUG中不知什么原因挂掉,然后就无法连接SWD或JLINK。 问题描述 在SWD配置中不可见芯片型号,无法connect,无法烧录。但基本判断是芯片没有损坏。怀疑是程…...
https证书生成、linux 生成https证书、nginx 配置https证书
1. 检查 Certbot 是否已安装 which certbot 2. 安装 Certbot 2.1启用 EPEL 仓库(如果尚未启用): sudo yum install epel-release 2.2 安装 Certbot 和 Nginx 插件: sudo yum install certbot python3-certbot-nginx 2.3验证安…...
Halcon随机贴图生成缺陷图片脚本
halcon随机贴图生成缺陷图片,用于深度学习训练: read_image (Image, C:/Users/61082/Desktop/bentiiamge/omron/S06-1211/ok/ok_images/D246B_CPFNNUBA8LT0SX_AAA_S2412001793_C1216_1733895885320066.jpg) get_image_size (Image, Width, Height) gen_rectangle1 …...
[ZMQ] -- ZMQ通信Protobuf数据结构 1
1、前言背景 工作需要域间实现zmq通信,刚开始需要比较简单的数据结构,比如两个bool,后面可能就需要传输比较大的数据,所以记录下实现流程,至于为啥选择proto数据结构去做大数据传输,可能是地平线也用这个&…...
大数据平台
大数据行业应用持续升温,特别是企业级大数据市场正在进入快速发展时期。越来越多的企业期望实现数据孤岛的打通,整合海量的数据资源,挖掘并沉淀有价值的数据,进而驱动更智能的商业。随着公司数据爆发式增长,原有的数据…...
《C++解锁机器学习特征工程:构建智能数据基石》
在当今机器学习蓬勃发展的浪潮中,特征工程犹如一座坚实的基石,奠定了模型成功的基础。而 C以其卓越的性能和强大的底层控制能力,在实现机器学习特征工程方面发挥着独特且关键的作用。 特征工程的核心目标是从原始数据中提取和构建最具代表性…...
《机器学习》3.7-4.3end if 启发式 uci数据集klda方法——非线性可分的分类器
目录 uci数据集 klda方法——非线性可分的分类器 计算 步骤 1: 选择核函数 步骤 2: 计算核矩阵 步骤 4: 解广义特征值问题 と支持向量机(svm) 目标: 方法: 核技巧的应用: 区别: 使用 OvR MvM 将…...
【Linux】VMware 安装 Ubuntu18.04.2
ISO镜像安装步骤 选择语言 English 选择键盘布局 English 选择系统 Ubuntu 虚拟机网卡地址,默认即可 代理地址,默认空即可 镜像地址,修改成阿里云地址 选择第二项,LVM 磁盘扩容技术 第一块硬盘名sda,默认…...
人员离岗监测摄像机智能人员睡岗、逃岗监测 Python 语言结合 OpenCV
在安全生产领域,人员的在岗状态直接关系到生产流程的顺利进行和工作环境的安全稳定。人员离岗监测摄像机的出现,为智能人员睡岗、逃岗监测提供了高效精准的解决方案,而其中的核心技术如AI识别睡岗脱岗以及相关的算法盒子和常见的安全生产AI算…...
【Spark】Spark数据倾斜解决方案、大表join小表及大表join大表优化思路
如果觉得这篇文章对您有帮助,别忘了点赞、分享或关注哦!您的一点小小支持,不仅能帮助更多人找到有价值的内容,还能鼓励我持续分享更多精彩的技术文章。感谢您的支持,让我们一起在技术的世界中不断进步! Sp…...
探索 Cesium 的未来:3D Tiles Next 标准解析
探索 Cesium 的未来:3D Tiles Next 标准解析 随着地理信息系统(GIS)和 3D 空间数据的快速发展,Cesium 作为领先的开源 3D 地球可视化平台,已成为展示大规模三维数据和进行实时渲染的强大工具。近年来,随着…...
每日一站技術架構解析之-cc手機桌布網
# 網站技術架構解析: ## 一、整體架構概述https://tw.ccwallpaper.com是一個提供手機壁紙、桌布免費下載的網站,其技術架構設計旨在實現高效的圖片資源管理與用戶訪問體驗優化。 ### (一)前端展示 1. **HTML/CSS/JavaScript基礎構…...
prometheus监控之黑盒(blackbox)监控
1.简单介绍 blackbox-exporter项目地址:https://github.com/prometheus/blackbox_exporter blackbox-exporter是Prometheus官方提供的一个黑盒监控解决方案,blackbox-exporter无须安装在被监控的目标环境中,用户只需要将其安装在与Promethe…...
计算机网络之传输层协议TCP
个人主页:C忠实粉丝 欢迎 点赞👍 收藏✨ 留言✉ 加关注💓本文由 C忠实粉丝 原创 计算机网络之传输层协议TCP 收录于专栏【计算机网络】 本专栏旨在分享学习计算机网络的一点学习笔记,欢迎大家在评论区交流讨论💌 目…...
子查询与嵌套查询
title: 子查询与嵌套查询 date: 2024/12/13 updated: 2024/12/13 author: cmdragon excerpt: 子查询和嵌套查询是关系型数据库中强大的查询工具,允许用户在一个查询的结果中再进行查询。通过使用子查询,用户能够简化复杂的SQL语句,增强查询的灵活性和可读性。本节将探讨子…...
深入剖析AI大模型:大模型时代的 Prompt 工程全解析
今天聊的内容,我认为是AI开发里面非常重要的内容。它在AI开发里无处不在,当你对 AI 助手说 "用李白的风格写一首关于人工智能的诗",或者让翻译模型 "将这段合同翻译成商务日语" 时,输入的这句话就是 Prompt。…...
springboot 百货中心供应链管理系统小程序
一、前言 随着我国经济迅速发展,人们对手机的需求越来越大,各种手机软件也都在被广泛应用,但是对于手机进行数据信息管理,对于手机的各种软件也是备受用户的喜爱,百货中心供应链管理系统被用户普遍使用,为方…...
<6>-MySQL表的增删查改
目录 一,create(创建表) 二,retrieve(查询表) 1,select列 2,where条件 三,update(更新表) 四,delete(删除表…...
Zustand 状态管理库:极简而强大的解决方案
Zustand 是一个轻量级、快速和可扩展的状态管理库,特别适合 React 应用。它以简洁的 API 和高效的性能解决了 Redux 等状态管理方案中的繁琐问题。 核心优势对比 基本使用指南 1. 创建 Store // store.js import create from zustandconst useStore create((set)…...
以下是对华为 HarmonyOS NETX 5属性动画(ArkTS)文档的结构化整理,通过层级标题、表格和代码块提升可读性:
一、属性动画概述NETX 作用:实现组件通用属性的渐变过渡效果,提升用户体验。支持属性:width、height、backgroundColor、opacity、scale、rotate、translate等。注意事项: 布局类属性(如宽高)变化时&#…...
线程与协程
1. 线程与协程 1.1. “函数调用级别”的切换、上下文切换 1. 函数调用级别的切换 “函数调用级别的切换”是指:像函数调用/返回一样轻量地完成任务切换。 举例说明: 当你在程序中写一个函数调用: funcA() 然后 funcA 执行完后返回&…...
Python爬虫实战:研究feedparser库相关技术
1. 引言 1.1 研究背景与意义 在当今信息爆炸的时代,互联网上存在着海量的信息资源。RSS(Really Simple Syndication)作为一种标准化的信息聚合技术,被广泛用于网站内容的发布和订阅。通过 RSS,用户可以方便地获取网站更新的内容,而无需频繁访问各个网站。 然而,互联网…...
Cilium动手实验室: 精通之旅---20.Isovalent Enterprise for Cilium: Zero Trust Visibility
Cilium动手实验室: 精通之旅---20.Isovalent Enterprise for Cilium: Zero Trust Visibility 1. 实验室环境1.1 实验室环境1.2 小测试 2. The Endor System2.1 部署应用2.2 检查现有策略 3. Cilium 策略实体3.1 创建 allow-all 网络策略3.2 在 Hubble CLI 中验证网络策略源3.3 …...
基于Uniapp开发HarmonyOS 5.0旅游应用技术实践
一、技术选型背景 1.跨平台优势 Uniapp采用Vue.js框架,支持"一次开发,多端部署",可同步生成HarmonyOS、iOS、Android等多平台应用。 2.鸿蒙特性融合 HarmonyOS 5.0的分布式能力与原子化服务,为旅游应用带来…...
学校时钟系统,标准考场时钟系统,AI亮相2025高考,赛思时钟系统为教育公平筑起“精准防线”
2025年#高考 将在近日拉开帷幕,#AI 监考一度冲上热搜。当AI深度融入高考,#时间同步 不再是辅助功能,而是决定AI监考系统成败的“生命线”。 AI亮相2025高考,40种异常行为0.5秒精准识别 2025年高考即将拉开帷幕,江西、…...
