当前位置: 首页 > news >正文

RabbitMQ介绍与使用

RabbitMQ官网

RabbitMQ 介绍

RabbitMQ 是一个开源的消息代理和队列服务器,基于 AMQP(高级消息队列协议)标准,使用 Erlang 编程语言构建。它是消息队列(MQ)的一种,广泛应用于分布式系统中,用于实现应用程序之间的异步消息传递。RabbitMQ 具有高可靠性、易扩展、高可用和功能丰富的特点,支持多种编程语言客户端,如 Java、Python、Ruby、C# 等。

RabbitMQ 的核心概念

  • Producer(生产者):消息的生产者,负责将消息发送到 RabbitMQ 中的 Exchange。
  • Consumer(消费者):消息的消费者,负责从队列中获取并处理消息。
  • Connection:生产者/消费者和 Broker 之间的 TCP 连接。
  • Channel:在 Connection 内部建立的逻辑连接,用于减少操作系统建立 TCP 连接的开销。
  • Broker:接收和分发消息的应用,RabbitMQ Server 就是 Message Broker。
  • Virtual Host:出于多租户和安全因素设计的,把 AMQP 的基本组件划分到一个虚拟的分组中。
  • Exchange:消息到达 Broker 的第一站,根据分发规则,匹配查询表中的 routing key,分发消息到队列中去。常用的类型有 direct、topic 和 fanout。
  • Queue:消息最终被送到这里等待消费者取走。

RabbitMq的交换机类型

1. Direct Exchange

  • 描述:Direct 交换机是最简单的交换机类型。它根据消息的 routing key 将消息路由到一个特定的队列。如果队列的 binding key 与消息的 routing key 完全匹配,则消息会被路由到该队列。
  • 特点
    • 一对一匹配:消息的 routing key 必须与队列的 binding key 完全相同。
    • 简单直接:适用于一对一的消息传递场景。
  • 示例
    • 生产者发送消息时指定 routing key 为 info
    • 队列 A 绑定到交换机时,binding key 也为 info
    • 消息将被路由到队列 A。

2. Topic Exchange

  • 描述:Topic 交换机允许更复杂的路由模式。消息的 routing key 和队列的 binding key 可以包含通配符,从而实现更灵活的路由规则。
  • 特点
    • 模式匹配:支持通配符 *(匹配一个单词)和 #(匹配多个单词)。
    • 灵活多变:适用于多对多的消息传递场景,可以实现复杂的路由逻辑。
  • 示例
    • 生产者发送消息时指定 routing key 为 user.info
    • 队列 A 绑定到交换机时,binding key 为 user.*
    • 队列 B 绑定到交换机时,binding key 为 *.info
    • 消息将被路由到队列 A 和队列 B。

3. Fanout Exchange

  • 描述:Fanout 交换机是最简单的广播交换机。它不关心消息的 routing key,将消息广播到所有绑定到该交换机的队列。
  • 特点
    • 广播消息:消息会被发送到所有绑定的队列,无论队列的 binding key 是什么。
    • 简单高效:适用于需要将消息广播到多个消费者的情况。
  • 示例
    • 生产者发送消息时,不指定 routing key。
    • 队列 A、队列 B 和队列 C 都绑定到该交换机。
    • 消息将被路由到队列 A、队列 B 和队列 C。

RabbitMQ 的主要特点

  • 可靠性:使用消息确认机制,确保消息的可靠传递。生产者在发送消息后会收到一个确认,消费者在处理完消息后会发送一个确认。如果消息发送或处理失败,RabbitMQ 会重新发送消息,直到确认为止。
  • 灵活性:支持多种消息传递模式,包括点对点、发布/订阅和消息路由等。
  • 可扩展性:可以通过添加更多的节点来实现水平扩展,以处理更大的消息负载。它还支持集群和镜像队列,提供高可用性和负载均衡。
  • 多语言支持:提供了多种编程语言的客户端库,包括 Java、Python、Ruby、C# 等。

MQ选型对比

RabbitMQActiveMQRocketMQKafka
公司/社区RabbitApache阿里Apache
开发语言ErlangJavaJavaScala&Java
可用性高,基于主从架构实现高可用高,基于主从架构实现高可用非常高,分布式架构非常高,分布式,一个数据多个副本,少数机器宕机,不会丢失数据,不会导致不可用
单机吞吐量万级万级十万级十万级以上
消息延迟微秒级毫秒级毫秒级毫秒级以内
消息可靠性较高,基本不丢较低,有丢大概率经过参数优化配置,可以做到 0 丢失经过参数优化配置,可以做到 0 丢失

RabbitMQ安装

环境:Centos7.9,基于docker安装

1.使用docker run命令创建容器并安装mq

docker run \-e RABBITMQ_DEFAULT_USER=mqadmin \-e RABBITMQ_DEFAULT_PASS=mqadmin \-v mq-plugins:/plugins \--name mq \--hostname mq \-p 15672:15672 \-p 5672:5672 \--network mq-net\-d \rabbitmq:3.8-management

2.开放端口,或关闭防火墙(如果访问不了)

方法1:开放端口

#1.开放mq端口
firewall-cmd --zone=public --add-port=15672/tcp --add-port=5672/tcp --permanent
#2.重新加载防火墙配置
firewall-cmd --reload

方法2:临时关闭防火墙

systemctl stop firewalld

3.访问RabbitMQ控制台并登录

账号密码就是创建mq容器时指定的RABBITMQ_DEFAULT_USER和RABBITMQ_DEFAULT_PASS

访问IP地址:主机ip:15672

 

RabbitMQ控制台使用

1.收发消息

1.1创建消息队列

 1.2创建一个交换机

 1.3讲交换机与队列绑定

 1.4发送消息

1.5查看消息

2.数据隔离

当我们只部署了一个mq的话,当多个不同项目同时使用。这个时候为了避免互相干扰, 我们会利用virtual host的隔离特性,将不同项目隔离。

实现步骤:

1.在我们的用户管理创建一个新用户

可以看到我们的用户创建成功,但是刚创建的用户是没有虚拟主机的

2.登录新创建的用户,配置虚拟主机

我们可以通过右上角选择自己的虚拟主机

 可以看到,在我们选择我们当前用户的虚拟机主机之后,就看不到我们之前用/创建的队列了

SpringAMQP使用

将来我们开发业务功能的时候,肯定不会在控制台收发消息,而是应该基于编程的方式。由于RabbitMQ采用了AMQP协议,因此它具备跨语言的特性。任何语言只要遵循AMQP协议收发消息,都可以与RabbitMQ交互。并且RabbitMQ官方也提供了各种不同语言的客户端。

但是,RabbitMQ官方提供的Java客户端编码相对复杂,一般生产环境下我们更多会结合Spring来使用。而Spring的官方刚好基于RabbitMQ提供了这样一套消息收发的模板工具:SpringAMQP。并且还基于SpringBoot对其实现了自动装配,使用起来非常方便。

Spring AMQPSpringAmqp的官方地址:Spring AMQP

SpringAMQP提供了三个功能:

  • 自动声明队列、交换机及其绑定关系

  • 基于注解的监听器模式,异步接收消息

  • 封装了RabbitTemplate工具,用于发送消息

1.创建一个Maven的mqDemo项目

2.创建两个子模块publisher(消息的发送者)、consumer(消息的消费者)

3.在父模块的pom.xml中导入以下配置:

    <groupId>cn.mq.demo</groupId><artifactId>mq-demo</artifactId><version>1.0-SNAPSHOT</version><modules><module>publisher</module><module>consumer</module></modules><packaging>pom</packaging><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.7.12</version><relativePath/></parent><properties><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target></properties><dependencies><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></dependency><!--AMQP依赖,包含RabbitMQ--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><!--单元测试--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId></dependency><dependency><groupId>com.fasterxml.jackson.core</groupId><artifactId>jackson-databind</artifactId></dependency></dependencies>

4.在子模块pom.xml中分别导入以下配置:

publisher

    <parent><artifactId>mq-demo</artifactId><groupId>cn.mq.demo</groupId><version>1.0-SNAPSHOT</version></parent><modelVersion>4.0.0</modelVersion><artifactId>publisher</artifactId><properties><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target></properties>

consumer

    <parent><artifactId>mq-demo</artifactId><groupId>cn.mq.demo</groupId><version>1.0-SNAPSHOT</version></parent><modelVersion>4.0.0</modelVersion><artifactId>consumer</artifactId><properties><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target></properties>

5.在两个子模块的application.yaml中加入以下配置:

logging:pattern:dateformat: MM-dd HH:mm:ss:SSS
spring:rabbitmq:host: 192.168.181.32 # 你的虚拟机IPport: 5672 # 端口virtual-host: /test # 虚拟主机username: testuser # 用户名password: testuser # 密码

6.创建交换机、队列,并监听消息

方式1基于配置类创建交换机、队列并绑定:

在consumer下创建一个configuration类

@Configuration
public class FanoutConfiguration {@Beanpublic FanoutExchange fanoutExchange() {//创建交换机return new FanoutExchange("test.fanout");}@Beanpublic Queue fanoutQueue1() {//创建队列return new Queue("fanout.queue1");}@Beanpublic Queue fanoutQueue2() {//创建队列return new Queue("fanout.queue2");}@Beanpublic Binding binDingQueue1(Queue fanoutQueue1, FanoutExchange fanoutExchange) {//将队列与交换机进行绑定return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);}@Beanpublic Binding binDingQueue2(Queue fanoutQueue2, FanoutExchange fanoutExchange) {//将队列与交换机进行绑定return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);}
}

然后创建一个监听类,监听消息

@Slf4j
@Component
public class SpringRabbitListener {@RabbitListener(queues = "fanout.queue1")//监听的队列public void listenerFanoutQueue1(String message) throws InterruptedException {System.out.println("消费者1接收到test.fanout消息:" + message + "," + LocalTime.now());}@RabbitListener(queues = "fanout.queue2")//监听的队列public void listenerFanoutQueue2(String message) throws InterruptedException {System.out.println("消费者2接收到test.fanout消息:" + message + "," + LocalTime.now());}
}

方式2基于注解创建交换机、队列并绑定:

@Slf4j
@Component
public class SpringRabbitListener {
@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "fanout.quque1"),exchange = @Exchange(name = "test.fanout", type = ExchangeTypes.FANOUT)))public void listenerFanoutQueue1(String message) throws InterruptedException {System.err.println("消费者1接收到test.fanout消息:" + message + "," + LocalTime.now());}@RabbitListener(bindings = @QueueBinding(value = @Queue(value = "fanout.queue1"),exchange = @Exchange(name = "test.fanout", type = ExchangeTypes.FANOUT)))public void listenerFanoutQueue2(String message) throws InterruptedException {System.err.println("消费者2接收到test.fanout消息:" + message + "," + LocalTime.now());}
}

 启动ConsumerApplication类,查看rabbitmq控制台查看是否已经创建交换机和队列成功,并且正确绑定(上面的方式实现一种即可)

 6.发送消息

在publisher创建测试类发送消息

@Slf4j
@SpringBootTest
class SpringAmqpTest {@AutowiredRabbitTemplate rabbitTemplate;@Testpublic void testFanoutQueue() {String exchangeName = "test.fanout";//交换机名称String message = "hello,everyone!";//发送的消息rabbitTemplate.convertAndSend(exchangeName, "", message);}
}

8.测试

1.运行ConsumerApplication启动类,保持运行状态

2.运行testFanoutQueue测试类的方法

3.查看控制台输出

正确接收到消息! 

相关文章:

RabbitMQ介绍与使用

RabbitMQ官网 RabbitMQ 介绍 RabbitMQ 是一个开源的消息代理和队列服务器&#xff0c;基于 AMQP&#xff08;高级消息队列协议&#xff09;标准&#xff0c;使用 Erlang 编程语言构建。它是消息队列&#xff08;MQ&#xff09;的一种&#xff0c;广泛应用于分布式系统中&#x…...

从0到机器视觉工程师(六):配置OpenCV和Qt环境

CMake配置OpenCV CMakeLists.txt文件的编写 cmake_minimum_required(VERSION 3.20) project(test_opencv LANGUAGES CXX) #寻找Opencv库 FIND_PACKAGE(OpenCV REQUIRED) include_directories(test_opencv ${OpenCV_INCLUDE_DIRS}) add_executable(test_opencv main.cpp) TARGE…...

计算机毕业设计Python机器学习农作物健康识别系统 人工智能 图像识别 机器学习 大数据毕业设计 算法

温馨提示&#xff1a;文末有 CSDN 平台官方提供的学长联系方式的名片&#xff01; 温馨提示&#xff1a;文末有 CSDN 平台官方提供的学长联系方式的名片&#xff01; 温馨提示&#xff1a;文末有 CSDN 平台官方提供的学长联系方式的名片&#xff01; 作者简介&#xff1a;Java领…...

(Arxiv-2023)LORA-FA:针对大型语言模型微调的内存高效低秩自适应

LORA-FA&#xff1a;针对大型语言模型微调的内存高效低秩自适应 paper是香港浸会大学发表在Arxiv 2023的工作 paper title&#xff1a;LORA-FA: MEMORY-EFFICIENT LOW-RANK ADAPTATION FOR LARGE LANGUAGE MODELS FINE-TUNING ABSTRACT 低秩自适应 (LoRA) 方法可以大大减少微调…...

huggingface/bert/transformer的模型默认下载路径以及自定义路径

当使用 BertTokenizer.from_pretrained(bert-base-uncased) 加载预训练的 BERT 模型时&#xff0c;Hugging Face 的 transformers 库会从 Hugging Face Model Hub 下载所需的模型文件和分词器文件&#xff08;如果它们不在本地缓存中&#xff09;。 默认情况下&#xff0c;这些…...

从 0 开始上手 Solana 智能合约

Solana CLI 基础知识 Solana CLI 是一个命令行界面工具&#xff0c;提供了一系列用于与 Solana Cluster 交互的命令。 我们将介绍一些最常见的命令&#xff0c;但你始终可以通过运行 solana --help 查看所有可能的 Solana CLI 命令列表。 Solana CLI 配置 Solana CLI 存储了…...

(六)CAN总线通讯

文章目录 CAN总线回环测试第一种基于板载CAN测试第一步确认板载是否支持第二步关闭 CAN 接口将 CAN 接口置于非活动状态第三步 配置 CAN 接口第一步 设置 CAN 接口比特率第二步 设置 CAN 启用回环模式第三步 启用 CAN 接口 第四步 测试CAN总线回环捕获 CAN 消息发送 CAN 消息 第…...

新一代智能工控系统网络安全合规解决方案

01.新一代智能工控系统概述 新一代智能工控系统是工业自动化的核心&#xff0c;它通过集成人工智能、工业大模型、物联网、5G等技术&#xff0c;实现生产过程的智能化管理和控制。这些系统具备实时监控、自动化优化、灵活调整等特点&#xff0c;能够提升生产效率、保证产品质量…...

Vivado中Tri_mode_ethernet_mac的时序约束、分析、调整——(一)时序约束的基本概念

1、基本概念 推荐阅读&#xff0c;Ally Zhou编写的《Vivado使用误区与进阶》系列文章&#xff0c;熟悉基本概念、tcl语句的使用。 《Vivado使用误区与进阶》电子书开放下载&#xff01;&#xff01; 2、Vivado中的语法例程 1&#xff09;语法例程 约束的语句可以参考vivado…...

车载网络:现代汽车的数字心跳

在汽车领域&#xff0c;“智能汽车”一词毫不夸张。如今的汽车已不再是原始的机械工程&#xff0c;而是通过先进的车载网络无缝连接的精密数字生态系统。这些滚动计算机由复杂的电子控制单元(ECU)网络提供动力&#xff0c;ECU是负责管理从发动机性能到信息娱乐系统等一切事务的…...

python基础和redis

1. Map函数 2. filter函数 numbers generate_numbers() filtered_numbers filter(lambda x: x % 2 0, numbers) for _ in range(5):print(next(filtered_numbers)) # 输出: 0 2 4 6 83. filter map 和 reduce 4. picking and unpicking 5. python 没有函数的重载&#xff0…...

w~自动驾驶~合集16

我自己的原文哦~ https://blog.51cto.com/whaosoft/12765612 #SIMPL 用于自动驾驶的简单高效的多智能体运动预测基准 原标题&#xff1a;SIMPL: A Simple and Efficient Multi-agent Motion Prediction Baseline for Autonomous Driving 论文链接&#xff1a;https://ar…...

最长的指定瑕疵度的元音子串

一、题目 最长的指定瑕疵度的元音子串 定义&#xff1a;开头和结尾都是元音字母&#xff08;aeiouAEIOU&#xff09;的字符串为 元音字符串 &#xff0c;其中混杂的非元音字母数量为其 瑕疵度 。比如: “a” 、 "aa"是元音字符串&#xff0c;其瑕疵度都为0 "aiu…...

每日算法Day15【组合、组合总和III、电话号码的字母组合】

77. 组合 算法链接: 77. 组合 - 力扣&#xff08;LeetCode&#xff09; 类型: 回溯 难度: 中等 回溯三步法&#xff1a; 1、确定参数返回值 2、确定终止条件 3、单层搜索逻辑 剪枝操作&#xff1a; 当path容量超过k时的数据可以不用遍历&#xff0c;故遍历边界条件判断: …...

C语言教程——指针进阶(2)

目录 一、函数指针数组 1.1函数指针数组写法 1.2函数指针用途 二、指向函数指针数组的指针 2.1概念 三、回调函数 3.1用法 3.2qsort排序 总结 前言 我们接着上一篇的函数指针往下学习。 一、函数指针数组 1.1函数指针数组写法 我们都知道指针数组&#xff0c;里面可以…...

调和级数不为整数的证明

文章目录 1. 问题引入2. 证明2.1 引理12.2 引理22.3 引理3&#xff1a;2.4 核心证明&#xff1a; 3. 参考 1. 问题引入 s ( n ) 1 1 2 1 3 ⋯ 1 n , n ∈ N ∗ , n ≥ 2 s(n) 1\frac{1}{2}\frac{1}{3}\cdots\frac{1}{n}, \quad \\n \in N^*, n \ge2 s(n)121​31​⋯n1​,…...

基于微信小程序的在线学习系统springboot+论文源码调试讲解

第4章 系统设计 一个成功设计的系统在内容上必定是丰富的&#xff0c;在系统外观或系统功能上必定是对用户友好的。所以为了提升系统的价值&#xff0c;吸引更多的访问者访问系统&#xff0c;以及让来访用户可以花费更多时间停留在系统上&#xff0c;则表明该系统设计得比较专…...

基于 Boost.Asio 和 Boost.Beast 的异步 HTTP 服务器(学习记录)

已完成功能&#xff1a; 支持 GET 和 POST 请求的路由与回调处理。 解析URL请求。 单例模式 管理核心业务逻辑。 异步 I/O 技术和 定时器 控制超时。 通过回调函数注册机制&#xff0c;可以灵活地为不同的 URL 路由注册处理函数。 1. 项目背景 1.1 项目简介 本项目是一个基于…...

有机物谱图信息的速查技巧有哪些?

谱图信息是化学家解读分子世界的“语言”&#xff0c;它们在化学研究的各个领域都发挥着不可或缺的作用。它们是理解和确定分子结构的关键&#xff0c;对化学家来说极为重要&#xff0c;每一种谱学技术都提供了不同的视角来观察分子&#xff0c;从而揭示其独特的化学和物理特性…...

Eureka缓存机制

一、Eureka的CAP特性 Eureka是一个AP系统&#xff0c;它优先保证可用性&#xff08;A&#xff09;和分区容错性&#xff08;P&#xff09;&#xff0c;而不保证强一致性&#xff08;C&#xff09;。这种设计使得Eureka在分布式系统中能够应对各种故障和分区情况&#xff0c;保…...

Objective-C常用命名规范总结

【OC】常用命名规范总结 文章目录 【OC】常用命名规范总结1.类名&#xff08;Class Name)2.协议名&#xff08;Protocol Name)3.方法名&#xff08;Method Name)4.属性名&#xff08;Property Name&#xff09;5.局部变量/实例变量&#xff08;Local / Instance Variables&…...

大语言模型如何处理长文本?常用文本分割技术详解

为什么需要文本分割? 引言:为什么需要文本分割?一、基础文本分割方法1. 按段落分割(Paragraph Splitting)2. 按句子分割(Sentence Splitting)二、高级文本分割策略3. 重叠分割(Sliding Window)4. 递归分割(Recursive Splitting)三、生产级工具推荐5. 使用LangChain的…...

ffmpeg(四):滤镜命令

FFmpeg 的滤镜命令是用于音视频处理中的强大工具&#xff0c;可以完成剪裁、缩放、加水印、调色、合成、旋转、模糊、叠加字幕等复杂的操作。其核心语法格式一般如下&#xff1a; ffmpeg -i input.mp4 -vf "滤镜参数" output.mp4或者带音频滤镜&#xff1a; ffmpeg…...

工业自动化时代的精准装配革新:迁移科技3D视觉系统如何重塑机器人定位装配

AI3D视觉的工业赋能者 迁移科技成立于2017年&#xff0c;作为行业领先的3D工业相机及视觉系统供应商&#xff0c;累计完成数亿元融资。其核心技术覆盖硬件设计、算法优化及软件集成&#xff0c;通过稳定、易用、高回报的AI3D视觉系统&#xff0c;为汽车、新能源、金属制造等行…...

AspectJ 在 Android 中的完整使用指南

一、环境配置&#xff08;Gradle 7.0 适配&#xff09; 1. 项目级 build.gradle // 注意&#xff1a;沪江插件已停更&#xff0c;推荐官方兼容方案 buildscript {dependencies {classpath org.aspectj:aspectjtools:1.9.9.1 // AspectJ 工具} } 2. 模块级 build.gradle plu…...

Go语言多线程问题

打印零与奇偶数&#xff08;leetcode 1116&#xff09; 方法1&#xff1a;使用互斥锁和条件变量 package mainimport ("fmt""sync" )type ZeroEvenOdd struct {n intzeroMutex sync.MutexevenMutex sync.MutexoddMutex sync.Mutexcurrent int…...

使用SSE解决获取状态不一致问题

使用SSE解决获取状态不一致问题 1. 问题描述2. SSE介绍2.1 SSE 的工作原理2.2 SSE 的事件格式规范2.3 SSE与其他技术对比2.4 SSE 的优缺点 3. 实战代码 1. 问题描述 目前做的一个功能是上传多个文件&#xff0c;这个上传文件是整体功能的一部分&#xff0c;文件在上传的过程中…...

yaml读取写入常见错误 (‘cannot represent an object‘, 117)

错误一&#xff1a;yaml.representer.RepresenterError: (‘cannot represent an object’, 117) 出现这个问题一直没找到原因&#xff0c;后面把yaml.safe_dump直接替换成yaml.dump&#xff0c;确实能保存&#xff0c;但出现乱码&#xff1a; 放弃yaml.dump&#xff0c;又切…...

【实施指南】Android客户端HTTPS双向认证实施指南

&#x1f510; 一、所需准备材料 证书文件&#xff08;6类核心文件&#xff09; 类型 格式 作用 Android端要求 CA根证书 .crt/.pem 验证服务器/客户端证书合法性 需预置到Android信任库 服务器证书 .crt 服务器身份证明 客户端需持有以验证服务器 客户端证书 .crt 客户端身份…...

对象回调初步研究

_OBJECT_TYPE结构分析 在介绍什么是对象回调前&#xff0c;首先要熟悉下结构 以我们上篇线程回调介绍过的导出的PsProcessType 结构为例&#xff0c;用_OBJECT_TYPE这个结构来解析它&#xff0c;0x80处就是今天要介绍的回调链表&#xff0c;但是先不着急&#xff0c;先把目光…...