SpringBoot 整合 RabbitMQ 实现流量消峰
RabbitMQ 即一个消息队列,主要是用来实现应用程序的异步和解耦,同时也能起到消息缓冲,消息分发的作用。
消息中间件在互联网公司的使用中越来越多,刚才还看到新闻阿里将 RocketMQ 捐献给了 Apache,当然了今天的主角还是讲 RabbitMQ。消息中间件最主要的作用是解耦,中间件最标准的用法是生产者生产消息传送到队列,消费者从队列中拿取消息并处理,生产者不用关心是谁来消费,消费者不用关心谁在生产消息,从而达到解耦的目的。在分布式的系统中,消息队列也会被用在很多其它的方面,比如:分布式事务的支持,RPC 的调用等等。
以前一直使用的是 ActiveMQ,在实际的生产使用中也出现了一些小问题,在网络查阅了很多的资料后,决定尝试使用 RabbitMQ 来替换 ActiveMQ,RabbitMQ 的高可用性、高性能、灵活性等一些特点吸引了我们,查阅了一些资料整理出此文。
RabbitMQ 介绍
RabbitMQ 是实现 AMQP(高级消息队列协议)的消息中间件的一种,最初起源于金融系统,用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。RabbitMQ 主要是为了实现系统之间的双向解耦而实现的。当生产者大量产生数据时,消费者无法快速消费,那么需要一个中间层。保存这个数据。
AMQP,即 Advanced Message Queuing Protocol,高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。消息中间件主要用于组件之间的解耦,消息的发送者无需知道消息使用者的存在,反之亦然。AMQP 的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。
RabbitMQ 是一个开源的 AMQP 实现,服务器端用Erlang语言编写,支持多种客户端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP 等,支持 AJAX。用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。
相关概念
通常我们谈到队列服务, 会有三个概念:发消息者、队列、收消息者,RabbitMQ 在这个基本概念之上, 多做了一层抽象, 在发消息者和 队列之间, 加入了交换器 (Exchange). 这样发消息者和队列就没有直接联系, 转而变成发消息者把消息给交换器, 交换器根据调度策略再把消息再给队列。
图片
-
左侧 P 代表 生产者,也就是往 RabbitMQ 发消息的程序。
-
中间即是 RabbitMQ,其中包括了 交换机 和 队列。
-
右侧 C 代表 消费者,也就是往 RabbitMQ 拿消息的程序。
那么,其中比较重要的概念有 4 个,分别为:虚拟主机,交换机,队列,和绑定。
-
虚拟主机:一个虚拟主机持有一组交换机、队列和绑定。为什么需要多个虚拟主机呢?很简单, RabbitMQ 当中,用户只能在虚拟主机的粒度进行权限控制。因此,如果需要禁止A组访问B组的交换机/队列/绑定,必须为A和B分别创建一个虚拟主机。每一个 RabbitMQ 服务器都有一个默认的虚拟主机“/”。
-
交换机:Exchange 用于转发消息,但是它不会做存储 ,如果没有 Queue bind 到 Exchange 的话,它会直接丢弃掉 Producer 发送过来的消息。这里有一个比较重要的概念:路由键 。消息到交换机的时候,交互机会转发到对应的队列中,那么究竟转发到哪个队列,就要根据该路由键。
-
绑定:也就是交换机需要和队列相绑定,这其中如上图所示,是多对多的关系。
交换机(Exchange)
交换机的功能主要是接收消息并且转发到绑定的队列,交换机不存储消息,在启用ack模式后,交换机找不到队列会返回错误。交换机有四种类型:Direct, topic, Headers and Fanout
-
Direct:direct 类型的行为是"先匹配, 再投送". 即在绑定时设定一个 routingkey, 消息的routingkey 匹配时, 才会被交换器投送到绑定的队列中去.
-
Topic:按规则转发消息(最灵活)
-
Headers:设置 header attribute 参数类型的交换机
-
Fanout:转发消息到所有绑定队列
Direct Exchange
Direct Exchange 是 RabbitMQ 默认的交换机模式,也是最简单的模式,根据key全文匹配去寻找队列。
图片
第一个 X - Q1 就有一个 binding key,名字为 orange;X - Q2 就有 2 个 binding key,名字为 black 和 green。当消息中的 路由键 和 这个 binding key 对应上的时候,那么就知道了该消息去到哪一个队列中。
Ps:为什么 X 到 Q2 要有 black,green,2个 binding key呢,一个不就行了吗?- 这个主要是因为可能又有 Q3,而Q3只接受 black 的信息,而Q2不仅接受black 的信息,还接受 green 的信息。
Topic Exchange
Topic Exchange 转发消息主要是根据通配符。在这种交换机下,队列和交换机的绑定会定义一种路由模式,那么,通配符就要在这种路由模式和路由键之间匹配后交换机才能转发消息。
在这种交换机模式下:
-
路由键必须是一串字符,用句号( .) 隔开,比如说 agreements.us,或者 agreements.eu.stockholm 等。
-
路由模式必须包含一个 星号(
*
),主要用于匹配路由键指定位置的一个单词,比如说,一个路由模式是这样子:agreements..b.*,那么就只能匹配路由键是这样子的:第一个单词是 agreements,第四个单词是 b。井号(#)就表示相当于一个或者多个单词,例如一个匹配模式是 agreements.eu.berlin.#,那么,以agreements.eu.berlin 开头的路由键都是可以的。
具体代码发送的时候还是一样,第一个参数表示交换机,第二个参数表示 routing key,第三个参数即消息。如下:
rabbitTemplate.convertAndSend("testTopicExchange","key1.a.c.key2", " this is RabbitMQ!");
topic 和 direct 类似, 只是匹配上支持了"模式", 在"点分"的 routing_key 形式中, 可以使用两个通配符:
-
*
表示一个词. -
#
表示零个或多个词.
Headers Exchange
headers 也是根据规则匹配, 相较于 direct 和 topic 固定地使用 routing_key , headers 则是一个自定义匹配规则的类型. 在队列与交换器绑定时, 会设定一组键值对规则, 消息中也包括一组键值对( headers 属性), 当这些键值对有一对, 或全部匹配时, 消息被投送到对应队列.
Fanout Exchange
Fanout Exchange 消息广播的模式,不管路由键或者是路由模式,会把消息发给绑定给它的全部队列,如果配置了 routing_key 会被忽略。
Spring Boot 集成 RabbitMQ
Spring Boot 集成 RabbitMQ 非常简单,如果只是简单的使用配置非常少,Spring Boot 提供了 spring-boot-starter-amqp 项目对消息各种支持。
简单使用
1、配置 Pom 包,主要是添加 spring-boot-starter-amqp 的支持
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
2、配置文件
配置 RabbitMQ 的安装地址、端口以及账户信息
spring.application.name=Spring-boot-rabbitmqspring.rabbitmq.host=192.168.0.86
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=123456
3、队列配置
@Configuration
public class RabbitConfig {@Beanpublic Queue Queue() {return new Queue("hello");}}
4、发送者
rabbitTemplate 是 Spring Boot 提供的默认实现
@component
public class HelloSender {@Autowiredprivate AmqpTemplate rabbitTemplate;public void send() {String context = "hello " + new Date();System.out.println("Sender : " + context);this.rabbitTemplate.convertAndSend("hello", context);}}
4、接收者
@Component
@RabbitListener(queues = "hello")
public class HelloReceiver {@RabbitHandlerpublic void process(String hello) {System.out.println("Receiver : " + hello);}}
5、测试
@RunWith(SpringRunner.class)
@SpringBootTest
public class RabbitMqHelloTest {@Autowiredprivate HelloSender helloSender;@Testpublic void hello() throws Exception {helloSender.send();}}
注意,发送者和接收者的 queue name 必须一致,不然不能接收
多对多使用
一个发送者,N 个接收者或者 N 个发送者和 N 个接收者会出现什么情况呢?
一对多发送
对上面的代码进行了小改造,接收端注册了两个 Receiver,Receiver1 和 Receiver2,发送端加入参数计数,接收端打印接收到的参数,下面是测试代码,发送一百条消息,来观察两个接收端的执行效果
@Test
public void oneToMany() throws Exception {for (int i=0;i<100;i++){neoSender.send(i);}
}
结果如下:
Receiver 1: Spring boot test queue ****** 11
Receiver 2: Spring boot test queue ****** 12
Receiver 2: Spring boot test queue ****** 14
Receiver 1: Spring boot test queue ****** 13
Receiver 2: Spring boot test queue ****** 15
Receiver 1: Spring boot test queue ****** 16
Receiver 1: Spring boot test queue ****** 18
Receiver 2: Spring boot test queue ****** 17
Receiver 2: Spring boot test queue ****** 19
Receiver 1: Spring boot test queue ****** 20
根据返回结果得到以下结论
一个发送者,N个接受者,经过测试会均匀的将消息发送到N个接收者中
多对多发送
复制了一份发送者,加入标记,在一百个循环中相互交替发送
@Test
public void manyToMany() throws Exception {for (int i=0;i<100;i++){neoSender.send(i);neoSender2.send(i);}
}
结果如下:
Receiver 1: Spring boot test queue ****** 20
Receiver 2: Spring boot test queue ****** 20
Receiver 1: Spring boot test queue ****** 21
Receiver 2: Spring boot test queue ****** 21
Receiver 1: Spring boot test queue ****** 22
Receiver 2: Spring boot test queue ****** 22
Receiver 1: Spring boot test queue ****** 23
Receiver 2: Spring boot test queue ****** 23
Receiver 1: Spring boot test queue ****** 24
Receiver 2: Spring boot test queue ****** 24
Receiver 1: Spring boot test queue ****** 25
Receiver 2: Spring boot test queue ****** 25
结论:和一对多一样,接收端仍然会均匀接收到消息
高级使用
对象的支持
Spring Boot 以及完美的支持对象的发送和接收,不需要格外的配置。
//发送者
public void send(User user) {System.out.println("Sender object: " + user.toString());this.rabbitTemplate.convertAndSend("object", user);
}...//接收者
@RabbitHandler
public void process(User user) {System.out.println("Receiver object : " + user);
}
结果如下:
Sender object: User{name='test', pass='123456'}
Receiver object : User{name='test', pass='123456'}
Topic Exchange
topic 是 RabbitMQ 中最灵活的一种方式,可以根据 routing_key 自由的绑定不同的队列
首先对 topic 规则配置,这里使用两个队列来测试
@Configuration
public class TopicRabbitConfig {final static String message = "topic.message";final static String messages = "topic.messages";@Beanpublic Queue queueMessage() {return new Queue(TopicRabbitConfig.message);}@Beanpublic Queue queueMessages() {return new Queue(TopicRabbitConfig.messages);}@BeanTopicExchange exchange() {return new TopicExchange("exchange");}@BeanBinding bindingExchangeMessage(Queue queueMessage, TopicExchange exchange) {return BindingBuilder.bind(queueMessage).to(exchange).with("topic.message");}@BeanBinding bindingExchangeMessages(Queue queueMessages, TopicExchange exchange) {return BindingBuilder.bind(queueMessages).to(exchange).with("topic.#");}
}
使用 queueMessages 同时匹配两个队列,queueMessage 只匹配 "topic.message" 队列
public void send1() {String context = "hi, i am message 1";System.out.println("Sender : " + context);this.rabbitTemplate.convertAndSend("exchange", "topic.message", context);}public void send2() {String context = "hi, i am messages 2";System.out.println("Sender : " + context);this.rabbitTemplate.convertAndSend("exchange", "topic.messages", context);}
发送send1会匹配到topic.#和topic.message 两个Receiver都可以收到消息,发送send2只有topic.#可以匹配所有只有Receiver2监听到消息
Fanout Exchange
Fanout 就是我们熟悉的广播模式或者订阅模式,给 Fanout 交换机发送消息,绑定了这个交换机的所有队列都收到这个消息。
Fanout 相关配置
@Configuration
public class FanoutRabbitConfig {@Beanpublic Queue AMessage() {return new Queue("fanout.A");}@Beanpublic Queue BMessage() {return new Queue("fanout.B");}@Beanpublic Queue CMessage() {return new Queue("fanout.C");}@BeanFanoutExchange fanoutExchange() {return new FanoutExchange("fanoutExchange");}@BeanBinding bindingExchangeA(Queue AMessage,FanoutExchange fanoutExchange) {return BindingBuilder.bind(AMessage).to(fanoutExchange);}@BeanBinding bindingExchangeB(Queue BMessage, FanoutExchange fanoutExchange) {return BindingBuilder.bind(BMessage).to(fanoutExchange);}@BeanBinding bindingExchangeC(Queue CMessage, FanoutExchange fanoutExchange) {return BindingBuilder.bind(CMessage).to(fanoutExchange);}}
这里使用了 A、B、C 三个队列绑定到 Fanout 交换机上面,发送端的 routing_key 写任何字符都会被忽略:
public void send() {String context = "hi, fanout msg ";System.out.println("Sender : " + context);this.rabbitTemplate.convertAndSend("fanoutExchange","", context);
}
结果如下:
Sender : hi, fanout msg
...
fanout Receiver B: hi, fanout msg
fanout Receiver A : hi, fanout msg
fanout Receiver C: hi, fanout msg
结果说明,绑定到 fanout 交换机上面的队列都收到了消息。
相关文章:

SpringBoot 整合 RabbitMQ 实现流量消峰
RabbitMQ 即一个消息队列,主要是用来实现应用程序的异步和解耦,同时也能起到消息缓冲,消息分发的作用。 消息中间件在互联网公司的使用中越来越多,刚才还看到新闻阿里将 RocketMQ 捐献给了 Apache,当然了今天的主角还…...

大数据挖掘建模平台案例分享
大数据挖掘建模平台是由泰迪自主研发,面向企业级用户的大数据挖掘建模平台。平台采用可视化操作方式,通过丰富内置算法,帮助用户快速、一站式地进行数据分析及挖掘建模,可应用于处理海量数据、高复杂性的数据挖掘任务,…...
MySQL数据表的管理
1.创建表 语法: create table 表名( 字段名 字段里保存数据的类型【(数据的长度) 约束】, 字段名 字段里保存数据的类型【(数据的长度) 约束】, 字段名 字段里保存数据的类型【(数据的长度) 约束】 ...... ); 注意:数据类型和约束,接下来用…...

SpringBoot【十三(实战篇)】集成在线接口文档Swagger2
一、前言🔥 环境说明:Windows10 Idea2021.3.2 Jdk1.8 SpringBoot 2.3.1.RELEASE 二、如何生成Swagger文档 上一期我们已经能正常访问swagger在线文档,但是文档空空如也,对不对,接下来我就教大家怎么把相关的接口都给…...

【C++初阶】第8课—标准模板库STL(string_2)
文章目录 1. string类对象遍历操作1.1 标准库中的成员函数begin( )和end( )1.2 标准库中的成员函数rbegin( )和rend( )1.3 C11引入的4个标准库中的成员函数 2. string类对象的访问2.1 operator[ ]运算符重载访问字符串字符2.2 公有成员函数at访问字符2.3 公有成员函数back()和f…...
【arm】程序跑飞,SWD端口不可用修复(N32G435CBL7)
项目场景: 国民N32G43X系列,烧录了一个测试程序,在DEBUG中不知什么原因挂掉,然后就无法连接SWD或JLINK。 问题描述 在SWD配置中不可见芯片型号,无法connect,无法烧录。但基本判断是芯片没有损坏。怀疑是程…...
https证书生成、linux 生成https证书、nginx 配置https证书
1. 检查 Certbot 是否已安装 which certbot 2. 安装 Certbot 2.1启用 EPEL 仓库(如果尚未启用): sudo yum install epel-release 2.2 安装 Certbot 和 Nginx 插件: sudo yum install certbot python3-certbot-nginx 2.3验证安…...
Halcon随机贴图生成缺陷图片脚本
halcon随机贴图生成缺陷图片,用于深度学习训练: read_image (Image, C:/Users/61082/Desktop/bentiiamge/omron/S06-1211/ok/ok_images/D246B_CPFNNUBA8LT0SX_AAA_S2412001793_C1216_1733895885320066.jpg) get_image_size (Image, Width, Height) gen_rectangle1 …...

[ZMQ] -- ZMQ通信Protobuf数据结构 1
1、前言背景 工作需要域间实现zmq通信,刚开始需要比较简单的数据结构,比如两个bool,后面可能就需要传输比较大的数据,所以记录下实现流程,至于为啥选择proto数据结构去做大数据传输,可能是地平线也用这个&…...

大数据平台
大数据行业应用持续升温,特别是企业级大数据市场正在进入快速发展时期。越来越多的企业期望实现数据孤岛的打通,整合海量的数据资源,挖掘并沉淀有价值的数据,进而驱动更智能的商业。随着公司数据爆发式增长,原有的数据…...
《C++解锁机器学习特征工程:构建智能数据基石》
在当今机器学习蓬勃发展的浪潮中,特征工程犹如一座坚实的基石,奠定了模型成功的基础。而 C以其卓越的性能和强大的底层控制能力,在实现机器学习特征工程方面发挥着独特且关键的作用。 特征工程的核心目标是从原始数据中提取和构建最具代表性…...

《机器学习》3.7-4.3end if 启发式 uci数据集klda方法——非线性可分的分类器
目录 uci数据集 klda方法——非线性可分的分类器 计算 步骤 1: 选择核函数 步骤 2: 计算核矩阵 步骤 4: 解广义特征值问题 と支持向量机(svm) 目标: 方法: 核技巧的应用: 区别: 使用 OvR MvM 将…...

【Linux】VMware 安装 Ubuntu18.04.2
ISO镜像安装步骤 选择语言 English 选择键盘布局 English 选择系统 Ubuntu 虚拟机网卡地址,默认即可 代理地址,默认空即可 镜像地址,修改成阿里云地址 选择第二项,LVM 磁盘扩容技术 第一块硬盘名sda,默认…...

人员离岗监测摄像机智能人员睡岗、逃岗监测 Python 语言结合 OpenCV
在安全生产领域,人员的在岗状态直接关系到生产流程的顺利进行和工作环境的安全稳定。人员离岗监测摄像机的出现,为智能人员睡岗、逃岗监测提供了高效精准的解决方案,而其中的核心技术如AI识别睡岗脱岗以及相关的算法盒子和常见的安全生产AI算…...
【Spark】Spark数据倾斜解决方案、大表join小表及大表join大表优化思路
如果觉得这篇文章对您有帮助,别忘了点赞、分享或关注哦!您的一点小小支持,不仅能帮助更多人找到有价值的内容,还能鼓励我持续分享更多精彩的技术文章。感谢您的支持,让我们一起在技术的世界中不断进步! Sp…...

探索 Cesium 的未来:3D Tiles Next 标准解析
探索 Cesium 的未来:3D Tiles Next 标准解析 随着地理信息系统(GIS)和 3D 空间数据的快速发展,Cesium 作为领先的开源 3D 地球可视化平台,已成为展示大规模三维数据和进行实时渲染的强大工具。近年来,随着…...

每日一站技術架構解析之-cc手機桌布網
# 網站技術架構解析: ## 一、整體架構概述https://tw.ccwallpaper.com是一個提供手機壁紙、桌布免費下載的網站,其技術架構設計旨在實現高效的圖片資源管理與用戶訪問體驗優化。 ### (一)前端展示 1. **HTML/CSS/JavaScript基礎構…...

prometheus监控之黑盒(blackbox)监控
1.简单介绍 blackbox-exporter项目地址:https://github.com/prometheus/blackbox_exporter blackbox-exporter是Prometheus官方提供的一个黑盒监控解决方案,blackbox-exporter无须安装在被监控的目标环境中,用户只需要将其安装在与Promethe…...

计算机网络之传输层协议TCP
个人主页:C忠实粉丝 欢迎 点赞👍 收藏✨ 留言✉ 加关注💓本文由 C忠实粉丝 原创 计算机网络之传输层协议TCP 收录于专栏【计算机网络】 本专栏旨在分享学习计算机网络的一点学习笔记,欢迎大家在评论区交流讨论💌 目…...

子查询与嵌套查询
title: 子查询与嵌套查询 date: 2024/12/13 updated: 2024/12/13 author: cmdragon excerpt: 子查询和嵌套查询是关系型数据库中强大的查询工具,允许用户在一个查询的结果中再进行查询。通过使用子查询,用户能够简化复杂的SQL语句,增强查询的灵活性和可读性。本节将探讨子…...
Java如何权衡是使用无序的数组还是有序的数组
在 Java 中,选择有序数组还是无序数组取决于具体场景的性能需求与操作特点。以下是关键权衡因素及决策指南: ⚖️ 核心权衡维度 维度有序数组无序数组查询性能二分查找 O(log n) ✅线性扫描 O(n) ❌插入/删除需移位维护顺序 O(n) ❌直接操作尾部 O(1) ✅内存开销与无序数组相…...
蓝桥杯 2024 15届国赛 A组 儿童节快乐
P10576 [蓝桥杯 2024 国 A] 儿童节快乐 题目描述 五彩斑斓的气球在蓝天下悠然飘荡,轻快的音乐在耳边持续回荡,小朋友们手牵着手一同畅快欢笑。在这样一片安乐祥和的氛围下,六一来了。 今天是六一儿童节,小蓝老师为了让大家在节…...
Python爬虫(二):爬虫完整流程
爬虫完整流程详解(7大核心步骤实战技巧) 一、爬虫完整工作流程 以下是爬虫开发的完整流程,我将结合具体技术点和实战经验展开说明: 1. 目标分析与前期准备 网站技术分析: 使用浏览器开发者工具(F12&…...
C++ 基础特性深度解析
目录 引言 一、命名空间(namespace) C 中的命名空间 与 C 语言的对比 二、缺省参数 C 中的缺省参数 与 C 语言的对比 三、引用(reference) C 中的引用 与 C 语言的对比 四、inline(内联函数…...
【Web 进阶篇】优雅的接口设计:统一响应、全局异常处理与参数校验
系列回顾: 在上一篇中,我们成功地为应用集成了数据库,并使用 Spring Data JPA 实现了基本的 CRUD API。我们的应用现在能“记忆”数据了!但是,如果你仔细审视那些 API,会发现它们还很“粗糙”:有…...
C++中string流知识详解和示例
一、概览与类体系 C 提供三种基于内存字符串的流,定义在 <sstream> 中: std::istringstream:输入流,从已有字符串中读取并解析。std::ostringstream:输出流,向内部缓冲区写入内容,最终取…...

JUC笔记(上)-复习 涉及死锁 volatile synchronized CAS 原子操作
一、上下文切换 即使单核CPU也可以进行多线程执行代码,CPU会给每个线程分配CPU时间片来实现这个机制。时间片非常短,所以CPU会不断地切换线程执行,从而让我们感觉多个线程是同时执行的。时间片一般是十几毫秒(ms)。通过时间片分配算法执行。…...
C++八股 —— 单例模式
文章目录 1. 基本概念2. 设计要点3. 实现方式4. 详解懒汉模式 1. 基本概念 线程安全(Thread Safety) 线程安全是指在多线程环境下,某个函数、类或代码片段能够被多个线程同时调用时,仍能保证数据的一致性和逻辑的正确性…...
音视频——I2S 协议详解
I2S 协议详解 I2S (Inter-IC Sound) 协议是一种串行总线协议,专门用于在数字音频设备之间传输数字音频数据。它由飞利浦(Philips)公司开发,以其简单、高效和广泛的兼容性而闻名。 1. 信号线 I2S 协议通常使用三根或四根信号线&a…...

Kubernetes 节点自动伸缩(Cluster Autoscaler)原理与实践
在 Kubernetes 集群中,如何在保障应用高可用的同时有效地管理资源,一直是运维人员和开发者关注的重点。随着微服务架构的普及,集群内各个服务的负载波动日趋明显,传统的手动扩缩容方式已无法满足实时性和弹性需求。 Cluster Auto…...