RabbitMQ 介绍与 SpringBootAMQP使用
一、MQ概述
异步通信的优点:
- 耦合度低
- 吞吐量提升
- 故障隔离
- 流量削峰
异步通信的缺点:
- 依赖于Broker的可靠性、安全性、吞吐能力
- 架构复杂,业务么有明显的流程线,不方便追踪管理
什么是的MQ
MQ(Message Queue),消息队列,就是放消息的队列。也是事件驱动架构中的Broker。
RabbitMQ | ActiveMQ | RocketMQ | Kafka | |
---|---|---|---|---|
公司/社区 | Rabbit | Apache | 阿里 | Apache |
开发语言 | Erlang | Java | Java | Scala&Java |
协议支持 | AMQP、XMPP、 SMTP、STOMP | OpenWire、STOMP、 REST、XMPP、AMQP | 自定义协议 | 自定义协议 |
可用性 | 高 | 一般 | 高 | 高 |
单机吞吐量 | 一般 | 差 | 高 | 非常高 |
消息延迟 | 微秒级 | 毫秒级 | 毫秒级 | 毫秒级 |
消息可靠性 | 高 | 一般 | 高 | 一般 |
二、RabbitMQ概述
1. RabbitMQ的结构和概念
- Channel:操作MQ的工具
- Exchange:路由消息到队列中
- Queue:缓存消息
- Virtual Host:虚拟主机,是对Queue、Exchange等资源的逻辑分组
2. 常见消息模型
-
基本消息队列(BasicQueue)
- Publisher:消息发布者,将消息发送到队列Queue
- Queue:消息队列,负责接受并缓存消息
- Consumer:订阅队列,处理队列中的消息
-
工作消息队列(WorkQueue)
Work queues,也被称为(Task queues),任务模型。简单来说就是让多个消费者绑定到一个队列,共同消费队列中的消息。当消息处理比较耗时的时候,可能生产消息的速度会远远大于消息的消费速度。长此以往,消息就会堆积越来越多,无法及时处理。此时就可以使用work 模型,多个消费者共同进行消息处理,提高消费速度。
-
发布订阅(Publish、Subscribe),根据交换机类型不同分为三种:
-
Publisher:生产者,也就是要发送消息的程序,但是不再发送到队列中,而是发给X(交换机)
-
Exchange:交换机,图中的X。一方面,接收生产者发送的消息。另一方面,知道如何处理消息,例如递交给某个特别队列、递交给所有队列、或是将消息丢弃。到底如何操作,取决于Exchange的类型。Exchange有以下3种类型:
- Fanout:广播,将消息交给所有绑定到交换机的队列
- Direct:定向,把消息交给符合指定routing key 的队列
- Topic:通配符,把消息交给符合routing pattern(路由模式) 的队列
-
Consumer:消费者,与以前一样,订阅队列,没有变化
-
Queue:消息队列也与以前一样,接收消息、缓存消息。
Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息会丢失!
-
Fanout Exchange: 广播
在广播模式下,消息发送流程:- 1) 可以有多个队列 - 2) 每个队列都要绑定到Exchange(交换机) - 3) 生产者发送的消息,只能发送到交换机,交换机来决定要发给哪个队列,生产者无法决定 - 4) 交换机把消息发送给绑定过的所有队列 - 5) 订阅队列的消费者都能拿到消息
-
Direct Exchange:路由
在Fanout模式中,一条消息会被所有订阅的队列都消费。但是,在某些场景下,希望不同的消息被不同的队列消费。这时就要用到Direct类型的Exchange。
在Direct模型下:
- 队列与交换机的绑定,不能是任意绑定了,而是要指定一个
RoutingKey
(路由key) - 消息的发送方在 向 Exchange发送消息时,也必须指定消息的
RoutingKey
。 - Exchange不再把消息交给每一个绑定的队列,而是根据消息的
Routing Key
进行判断,只有队列的Routingkey
与消息的Routing key
完全一致,才会接收到消息
- 队列与交换机的绑定,不能是任意绑定了,而是要指定一个
-
Topic Exchange:主题
Topic
类型的Exchange
与Direct
相比,都是可以根据RoutingKey
把消息路由到不同的队列。只不过Topic
类型Exchange
可以让队列在绑定Routing key
的时候使用通配符。
通配符规则:
#
:匹配一个或多个词*
:匹配不多不少恰好1个词
如下图:
- Queue1:绑定的是
china.#
,因此凡是以china.
开头的routing key
都会被匹配到。包括china.news和china.weather - Queue2:绑定的是
#.news
,因此凡是以.news
结尾的routing key
都会被匹配。包括china.news和japan.news
3. RabbitMQ的安装
1、安装Erlang:RabbitMQ是用Erlang编写的,因此首先需要安装Erlang运行环境(注意Erlang与RabbitMQ的对应版本)。运行以下命令进行安装:sudo apt install erlang
2、在线拉取镜像:docker pull rabbitmq:3-management
3、运行以下命令来下载并启动RabbitMQ Docker镜像:docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3-management
4、浏览器访问RabbitMQ管理页面:http://IP:15672/
(注意:若网页无法访问,可能是rabbitmq_management
插件未启用)
5、进入sbin目录下,查看插件,命令:rabbitmq-plugins list
6、 若 rabbitmq_management 插件未启用(状态无 * ),通过命令启用该插件:rabbitmq-plugins enable rabbitmq_management
7、启用后,重新访问地址,用户名/密码默认:guest/guest
三、SpringAMQP
AMQP,Adanced Message Queuing Protocol,是用于在应用程序之间传递业务消息的开发标准,与语言和平台无关。
SpringAMQP是基于RabbitMQ封装的一套模板,并且还利用SpringBoot对其实现了自动装配,使用起来非常方便。
SpringAmqp的官方地址:https://spring.io/projects/spring-amqp
SpringAMQP提供了三个功能:
- 自动声明队列、交换机及其绑定关系
- 基于注解的监听器模式,异步接收消息
- 封装了RabbitTemplate工具,用于发送消息
1、使用SpringBootAMQP- SimpleQueue的步骤
- 引入AMQP的Starter依赖
<!--AMQP依赖,包含RabbitMQ-->
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
- 配置RabbitMQ地址
logging:pattern:dateformat: MM-dd HH:mm:ss:SSS
spring:rabbitmq:host: 127.0.0.1port: 5672username: guestpassword: guestvirtual-host: /listener:simple:prefetch: 3 # 每次只能获取一条消息,处理完成才能获取下一个消息
- 利用RabbitTemplate的convertAndSend方法
package com.example.rabbitmq_demo;import org.junit.jupiter.api.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;@RunWith(SpringRunner.class)
@SpringBootTest
class RabbitmqDemoApplicationTests {@Autowiredprivate RabbitTemplate rabbitTemplate;@Testpublic void testSimpleQueue() {//queueNameString queueName = "ty.simple.queue";//messageString message = "hello world ";//send MessagerabbitTemplate.convertAndSend(queueName, message);}}
2、使用SpringBootAMQP- FanoutExchange的步骤
- 创建Spring配置类,绑定交换机 - 队列
package com.example.rabbitmq_demo.consumer.config;import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class FanoutConfig {@Beanpublic FanoutExchange fanoutExchange(){return new FanoutExchange("ty.fanout");}@Beanpublic Queue fanoutQueue1(){return new Queue("fanout.queue1");}@Beanpublic Queue fanoutQueue2(){return new Queue("fanout.queue2");}/*** 绑定交换机与队列*/@Beanpublic Binding fanoutBinding1(Queue fanoutQueue1, FanoutExchange fanoutExchange){return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);}@Beanpublic Binding fanoutBinding2(Queue fanoutQueue2, FanoutExchange fanoutExchange){return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);}
}
- 利用RabbitTemplate的convertAndSend方法
package com.example.rabbitmq_demo;import org.junit.jupiter.api.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;@RunWith(SpringRunner.class)
@SpringBootTest
class RabbitmqDemoApplicationTests {@Autowiredprivate RabbitTemplate rabbitTemplate;@Testpublic void testSendFanoutExchange() {//exchangeNameString exchangeName = "ty.fanout";//messageString message = "hello world fanout";//send MessagerabbitTemplate.convertAndSend(exchangeName, "", message);}}
2、使用SpringBootAMQP- Direct的步骤
- 基于注解来声明队列和交换机
package com.example.rabbitmq_demo.consumer;import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Component
public class ConsumerDemo {/*** 基于@Bean的方式声明队列和交换机比较麻烦,Spring还提供了基于注解方式来声明* 在consumer的SpringRabbitListener中添加两个消费者,同时基于注解来声明队列和交换机* @param msg*/@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "ty.direct.queue1"),exchange = @Exchange(name = "ty.direct", type = ExchangeTypes.DIRECT),key = {"red", "green"}))public void listenDirectQueue1(String msg){System.out.println("listener ty.direct.queue1 Get message : " + msg);}@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "ty.direct.queue2"),exchange = @Exchange(name = "ty.direct", type = ExchangeTypes.DIRECT),key = {"red", "blue"}))public void listenDirectQueue2(String msg){System.out.println("listener ty.direct.queue2 Get message : " + msg);}
}
- 通过convertAndSend发送消息,会根据的
RoutingKey
,将消息发送至指定队列。
package com.example.rabbitmq_demo;import org.junit.jupiter.api.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;@RunWith(SpringRunner.class)
@SpringBootTest
class RabbitmqDemoApplicationTests {@Autowiredprivate RabbitTemplate rabbitTemplate;@Testpublic void testSendDirectExchange(){String exchangeName = "ty.direct";String message = "hello ty";rabbitTemplate.convertAndSend(exchangeName, "red", message);}
}
3、使用SpringBootAMQP- Tpic的步骤
- 基于注解来声明队列和交换机
package com.example.rabbitmq_demo.consumer;import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Component
public class ConsumerDemo {/*** Topic类型的Exchange与Direct相比,都是可以根据RoutingKey把消息路由到不同的队列。只不过Topic类型Exchange可以让队列在绑定Routing key的时候使用通配符* @param msg*/@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "ty.topic.queue1"),exchange = @Exchange(name = "ty.topic", type = ExchangeTypes.TOPIC),key = "ty.#"))public void listenTopicQueue1(String msg){System.out.println("listener ty.topic.queue1 Get message : " + msg);}@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "ty.topic.queue2"),exchange = @Exchange(name = "ty.topic", type = ExchangeTypes.TOPIC),key = "#.tyty"))public void listenTopicQueue2(String msg){System.out.println("listener ty.topic.queue2 Get message : " + msg);}
}
- 根据
RoutingKey
通配符,发送到对应Queue
package com.example.rabbitmq_demo;import org.junit.jupiter.api.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;@RunWith(SpringRunner.class)
@SpringBootTest
class RabbitmqDemoApplicationTests {@Autowiredprivate RabbitTemplate rabbitTemplate;@Testpublic void testSendTopicExchange(){String exchangeName = "ty.topic";String message = "hello ty";rabbitTemplate.convertAndSend(exchangeName, "ty.tyty", message);}}
4、SpringBootAMQP对象序列化
SpringBootAMQP默认使用的是 x-java-serialized-object
,JDK序列化数据体积过大、有安全漏洞,且可读性差。
可通过配置JSON转换器,使用Json的方式做序列化和反序列化。
- 引入jar
<dependency><groupId>com.fasterxml.jackson.dataformat</groupId><artifactId>jackson-dataformat-xml</artifactId><version>2.9.10</version>
</dependency>
- 配置类中增加Bean
@Bean
public MessageConverter jsonMessageConverter(){return new Jackson2JsonMessageConverter();
}
相关文章:

RabbitMQ 介绍与 SpringBootAMQP使用
一、MQ概述 异步通信的优点: 耦合度低吞吐量提升故障隔离流量削峰 异步通信的缺点: 依赖于Broker的可靠性、安全性、吞吐能力架构复杂,业务么有明显的流程线,不方便追踪管理 什么是的MQ MQ(Message Queue…...

企业门户的必备选择,WorkPlus的定制化解决方案
在当今数字化时代,企业门户成为了企业内外沟通与协作的重要基础设施。WorkPlus作为领先的品牌,为企业提供了一站式的企业门户解决方案,旨在提升企业形象、改善内外部沟通与协作效率。本文将深入探讨WorkPlus如何通过定制化的设计,…...

基于maven的项目搭建(已跑通)
1、直接选择archetype-webapp即可 (这里很多人会觉得很慢–解决方案:https://blog.csdn.net/qq_45591895/article/details/133705674?spm1001.2014.3001.5501) 2、手动添加一个java目录即可。 3、添加Tomcat 3、这就跑通了,可以…...

L1-035 情人节 c++解法
题目再现 以上是朋友圈中一奇葩贴:“2月14情人节了,我决定造福大家。第2个赞和第14个赞的,我介绍你俩认识…………咱三吃饭…你俩请…”。现给出此贴下点赞的朋友名单,请你找出那两位要请客的倒霉蛋。 输入格式: 输入…...

DecimalFormat 多语言、本地化指定Locale
DecimalFormat再未指定Locale会使用默认的Locale,不同的Locale会导致格式化时出现出乎预期的现象。如Locale为西班牙时,小数点符号为",“千位分隔符为”."。 所以在多语言或者需要本地化的情况下,使用DecimalFormat最好指定Locale避…...

冲刺第十五届蓝桥杯P0003倍数问题
文章目录 原题连接解析代码 原题连接 倍数问题 解析 需要找出三个数字,三个数字之和是k的倍数,并且这个数字需要最大,很容易想到的就是将数组进行倒叙排序,然后三层for循环解决问题,但是这样会导致**时间复杂度很高…...

操作系统备考学习 day7 (2.3.4 ~ 2.3.5)
操作系统备考学习 day7 第二章 进程与线程2.3 同步与互斥2.3.4 信号量 用信号量实现进程互斥、同步、前驱关系信号量机制实现进程互斥信号量机制实现进程同步信号量机制实现前驱关系 2.3.5 经典同步问题生产者-消费者问题多生产者和多消费者模型抽烟者问题读者-写者问题哲学家进…...

HRM人力资源管理系统源码
HRM人力资源管理系统源码 运行环境:PHP8.1或以上 MYSQL5.7或以上 php扩展要求 fileinfo imagemagick 功能介绍: 综合仪表板 它通过其综合仪表板提供了员工总数、工单和帐户余额的概览。 您可以轻松访问组织中的缺席者以及详细的公告和预定会议列…...

基于SSM的旅游网站设计与实现
末尾获取源码 开发语言:Java Java开发工具:JDK1.8 后端框架:SSM 前端:采用JSP技术开发 数据库:MySQL5.7和Navicat管理工具结合 服务器:Tomcat8.5 开发软件:IDEA / Eclipse 是否Maven项目&#x…...

大厂秋招真题【BFS+DP】华为20230921秋招T3-PCB印刷电路板布线(留学生专场)
华为20230921秋招T3-PCB印刷电路板布线(留学生专场) 题目描述与示例 题目描述 在PCB印刷电路板设计中,器件之间的连线,要避免线路的阻抗值增大,而且器件之间还有别的器任和别的干扰源,在布线时我们希望受…...

OpenCV Python – 使用SIFT算法实现两张图片的特征匹配
OpenCV Python – 使用SIFT算法实现两张图片的特征匹配 1.要实现在大图中找到任意旋转、缩放等情况下的小图位置,可以使用特征匹配算法,如 SIFT (尺度不变特征变换) 或 SURF (加速稳健特征)。这些算法可以在不同尺度和旋转情况下寻找匹配的特征点 impo…...

doc转html后添加style和导航
public static void main(String[] args) throws Exception {docxToHtml(); } public static void docxToHtml() throws Exception {//D:\zpdtolly\工作总结文档\zpd使用文档\v4\用户使用手册\客户端使用手册String sourceFileName "C:\\Users\\luoguoqing\\Desktop\\202…...

Python中跨越多个文件使用全局变量
嗨喽,大家好呀~这里是爱看美女的茜茜呐 这个琐碎的指南是关于在 Python 中跨多个文件使用全局变量。 但是在进入主题之前,让我们简单地看看全局变量和它们在多个文件中的用途。 👇 👇 👇 更多精彩机密、教程ÿ…...

设计模式 - 解释器模式
目录 一. 前言 二. 实现 三. 优缺点 一. 前言 解释器模式(Interpreter Pattern)指给定一门语言,定义它的文法的一种表示,并定义一个解释器,该解释器使用该表示来解释语言中的句子,属于行为型设计模式。是…...

javascript禁止鼠标右键和复制功能
要禁止鼠标右键和复制功能,可以编写如下的封装函数: function preventDefaultCopy(event) {// 禁止右键 菜单和复制event.preventDefault();event.stopPropagation();return false; }// 在需要禁止复制的元素上添加该事件监听器 element.addEventListen…...

WebDAV之π-Disk派盘 + 咕咚云图
咕咚云图是一款强大的图床传图软件,它能够让您高效地对手机中的各种图片进行github传输,多个平台快速编码上传,支持远程删除不需要的图片,传输过程安全稳定,让您可以很好的进行玩机或者其他操作。 可帮你上传手机图片到图床上,并生成 markdown 链接,支持七牛云、阿里云…...

C语言-数组
C 语言支持数组数据结构,数组是一个由若干相同类型变量组成的有序集合。 这里的有序是指数组元素在内存中的存放方式是有序的,即所有的数组都是由连续的内存位置组成。最低的地址对应第一个元素,最高的地址对应最后一个元素。 在 C 语言中&am…...

Linux UWB Stack实现——MCPS调度接口(API)
在上一篇文章中,介绍了MCPS调度接口涉及的相关数据结构实现MCPS调度接口(数据结构),本文继续介绍调度相关的方法的实现。 1. 域处理 1.1 域注册与注销 注册/注销一个mcps802154_region,分别在模块加载(mo…...

el-tree中插入图标并且带提示信息
<template><div class"left"><!-- default-expanded-keys 默认展开 --><!-- expand-on-click-node 只有点击箭头才会展开树 --><el-tree :data"list" :props"defaultProps" node-click"handleNodeClick" :…...

竞赛选题 深度学习 YOLO 实现车牌识别算法
文章目录 0 前言1 课题介绍2 算法简介2.1网络架构 3 数据准备4 模型训练5 实现效果5.1 图片识别效果5.2视频识别效果 6 部分关键代码7 最后 0 前言 🔥 优质竞赛项目系列,今天要分享的是 🚩 基于yolov5的深度学习车牌识别系统实现 该项目较…...

Direct3D网格(一)
创建网格 我们可以用D3DXCreateMeshFVF函数创建一个"空"网格对象 ,空网格对象是指我们指定了网格的面片总数和顶点总数,然后由该函数为顶点缓存、索引缓存和属性缓存分配大小合适的内存,之后即可手工填入网格数据。 HRESULT WINA…...

C语言打印菱形
一、运行结果图 二、源代码 # define _CRT_SECURE_NO_WARNINGS # include <stdio.h>int main() {//初始化变量值;int line 0;int i 0;int j 0;//获取变量值;scanf("%d", &line);//循环打印上半部分;for (i 0; i <…...

ElasticSearch搜索引擎:数据的写入流程
一、ElasticSearch 写数据的总体流程: (1)ES 客户端选择一个节点 node 发送请求过去,这个节点就是协调节点 coordinating node (2)协调节点对 document 进行路由,通过 hash 算法计算出数据应该…...

python3 调用 另外一个python脚本
3种python调用其他脚本脚本的方法_python 调用python脚本_linjingyg的博客-CSDN博客 Python之系统交互(调用系统命令)subprocess_subprocess.getoutput(cmd) 参数格式不正确-CSDN博客 subprocess.call()只能返回状态码。subprocess.getoutput(cmd)只能输出命令结果。 str(py…...

【13】c++设计模式——>简单工厂模式
工厂模式的定义 c中的工厂模式是一种创建型设计模式,它提供一种创建对象的接口,但具体创建的对象类型可以在运行时决定,这样可以将对象的创建与使用代码分离,提高代码的灵活性和可维护性。 在c中实现工厂模式,通常会定…...

系统架构设计:2 论软件设计方法及其应用
目录 一 软件设计方法 1结构化设计 2信息工程 3面向对象设计 4原型设计...

基于Winform的UDP通信
1、文件结构 2、UdpReceiver.cs using System; using System.Collections.Generic; using System.Linq; using System.Net; using System.Net.Sockets; using System.Text; using System.Threading.Tasks;namespace UDPTest.Udp {public class UdpStateEventArgs : EventArgs…...

掌握 BERT:自然语言处理 (NLP) 从初级到高级的综合指南(1)
简介 BERT(来自 Transformers 的双向编码器表示)是 Google 开发的革命性自然语言处理 (NLP) 模型。它改变了语言理解任务的格局,使机器能够理解语言的上下文和细微差别。在本文[1]中,我们将带您踏上从 BERT 基础知识到高级概念的旅…...

Linux Ftrace介绍
文章目录 一、简介二、内核函数调用跟踪参考链接: 一、简介 Ftrace 是 Linux 官方提供的跟踪工具,在 Linux 2.6.27 版本中引入。Ftrace 可在不引入任何前端工具的情况下使用,让其可以适合在任何系统环境中使用。 Ftrace 可用来快速排查以下相…...

Go语言进阶------>init()函数
Init()包初始化 执行优先级 Init()函数的执行优先级比main()函数的执行优先级要高,也就是说程序会优先执行Init()函数之后再执行main()函数. 代码如下 package mainimport "fmt"func init() {fmt.Println("执行了Init()函数") }func main() {fmt.Println…...