当前位置: 首页 > news >正文

RabbitMQ相关概念介绍

        这篇文章主要介绍RabbitMQ中几个重要的概念,对于初学者来说,概念性的东西可能比较难以理解,但是对于理解和使用RabbitMQ却必不可少,初学阶段,现在脑海里留有印象,随着后续更加深入的学习,就会很容易理解。对于消息队列的高手,这篇文章如有阐述不到位的,可以一起交流。

        RabbitMQ整体上是一个生产者和消费者模型,如下图。

生产者和消费者

        RabbitMQ的Producer和其他消息队列的Producer没有什么不同,都是用来将消息发送到服务器,只是在实现上有所区别,关于RabbitMQ客户端的实现,包括API和网络模型等,后面会专门有文章介绍。

        RabbitMQ Consumer连接Broker,并订阅它关注的队列,只要队列上有消息,Consumer就会接收到消息并开始消费消息。RabbitMQ的消费端默认是推模型。

        有Producer、Consumer,那么就会有一个地方来存储、转发消息,RabbitMQ Broker完成这项工作,在这里,可以先简单的把一个Broker理解为一个RabbitMQ 节点或实例。

队列

        Queue是RabbitMQ的内部对象,是实际存储Producer发送的消息的地方,这点和Kafka存储消息的模型不一样。

        Producer发送消息并不是直接发送到Queue,而是在发送消息的请求中声明Exchange(交换器) 和 RoutingKey(绑定键),Broker会根据Exchange 和 RoutingKey找到相应的Queue,并保存消息内容到Queue。

        Consumer订阅的是Queue,所有直接从Queue上消费消息。RabbitMQ支持多个Consumer同时订阅一个Queue,这时Broker会轮询Consumer,把Queue中的消息均摊到所有订阅次Queue的Consumer。但是RabbitMQ不支持队列层面的广播消费。

交换器、路由键、绑定

        下面介绍的是RabbitMQ中非常重要的概念,生产和消费消息都是以此为基础,也是对AMQP协议的具体实现。

        交换器(Exchange)负责按照一定的规则分发消息,Producer发送的消息实际上是到Exchange,由Exchange将消息路由到一个或多个Queue,如果找不到Queue,则根据客户端配置,要么返回给Producer,要么直接丢弃。

        路由键(RoutingKey)指定了路由规则,在Producer发送消息的时候,一般会指定RoutingKey,这样就知道消息需要路由到哪里。

        绑定(BingKey)将Exchange和Queue关联起来,这样,RabbitMQ就知道如何正确的将消息路由到队列了。

        关于路由键和绑定键,对于初学者可能有点混淆,这里分享下我的理解:路由键是在客户端发送消息的时候,告诉服务器,我发送的消息根据我指定的路由键去找队列;而绑定键,是在创建的时候使用的,告诉服务器,交换器是如何与队列关联的。具体可以对照下面的代码示例来理解可能容易点。

交换器类型

        RabbitMQ提供了多个交换器类型来满足不同的需求:fanout、direct、topic、headers。

fanout

fanout exchange会把消息发送到所有与该交换器绑定的队列中,会忽略Procuder发送消息时申明的RoutingKey。如下图,Producer发送message给fanout_exchange,并制定了routing key 为 info,最终queue1和queue2都收到了这条消息。

 

    public void testFanoutExchange() throws IOException, TimeoutException {// fanout类型的 exchange, 在server端忽略 routing key,只要发送到 exchange,任何和这个 exchange 绑定的 queue都会收到这条消息String exchangeName = "fanout_exchange";String queue1 = "queue1";String queue2 = "queue2";String warning = "warning";String info = "info";ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");factory.setPort(5672);factory.setUsername("root");factory.setPassword("root");Connection connection = factory.newConnection();Channel channel = connection.createChannel();channel.exchangeDeclare(exchangeName, BuiltinExchangeType.FANOUT, true, false, null);channel.queueDeclare(queue1, true, false, false, null);channel.queueDeclare(queue2, true, false, false, null);channel.queueBind(queue1, exchangeName, warning);channel.queueBind(queue2, exchangeName, warning);channel.queueBind(queue2, exchangeName, info);String message = "info";channel.basicPublish(exchangeName, info, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());channel.close();connection.close();}

direct

direct exchange也是一种比较简单的exchange,在路由的时候,只有routing key 和binding key完全匹配的时候,才会路由到queue。如下图,Producer发送消息到direct_exchange,routing_key 是info,只有queue2才会收到消息。

    public void testDirectExchange() throws IOException, TimeoutException {String exchangeName = "direct_exchange";String queue1 = "direct_queue1";String queue2 = "direct_queue2";String warning = "warning";String info = "info";ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");factory.setPort(5672);factory.setUsername("root");factory.setPassword("root");Connection connection = factory.newConnection();Channel channel = connection.createChannel();channel.exchangeDeclare(exchangeName, BuiltinExchangeType.DIRECT, true, false, null);channel.queueDeclare(queue1, true, false, false, null);channel.queueDeclare(queue2, true, false, false, null);channel.queueBind(queue1, exchangeName, warning);channel.queueBind(queue2, exchangeName, warning);channel.queueBind(queue2, exchangeName, info);String message = "direct exchange";channel.basicPublish(exchangeName, info, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());channel.close();connection.close();}

topic

topic exchange 是比较灵活,实际项目中用的比较多的一种,它也是将消息路由到 routing key 和 binding key 匹配的队列中,匹配规则如下:

  • routing key 和 binding key 以点(.)为分隔符,被点号分割的字符串为一个独立的匹配单元,如com.rabbitmq.client,com.java.util 等等。
  • ‘#’ 和 ‘*’ 用于做模糊匹配,‘#’ 匹配0个或多个词,‘*’ 匹配0个词。

下图中topic_exchange绑定了两个queue,*.rabbitmq.* 绑定颅queue1, *.*.client 、 com.# 都绑定了queue2,当binding key 为 com.rabbitmq.client,匹配queue1和queue2,因此都会收到消息;当binding key 为 org.rabbitmq.server 时,只有queue1匹配,当 binding key 为 com.hidden.demo 时,只有queue2匹配,当bingding key 为 aaa 时,queue1 和 queue2 都不匹配。

 

 

    public void testTopicExchange() throws IOException, TimeoutException {// * 匹配一个单词,# 匹配0个或多个单词String exchangeName = "topic_exchange";String queue1 = "topic_queue1";String queue2 = "topic_queue2";String routing1 = "*.rabbitmq.*";String routing2 = "*.*.client";String routing3 = "com.#";ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");factory.setPort(5672);factory.setUsername("guest");factory.setPassword("guest");Connection connection = factory.newConnection();Channel channel = connection.createChannel();channel.exchangeDeclare(exchangeName, BuiltinExchangeType.TOPIC, true, false, null);channel.queueDeclare(queue1, true, false, false, null);channel.queueDeclare(queue2, true, false, false, null);channel.queueBind(queue1, exchangeName, routing1);channel.queueBind(queue2, exchangeName, routing2);channel.queueBind(queue2, exchangeName, routing3);String message = "topic exchange";// queue1, queue2channel.basicPublish(exchangeName, "com.rabbitmq.client", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());// queue1channel.basicPublish(exchangeName, "org.rabbitmq.server", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());// queue2channel.basicPublish(exchangeName, "com.hidden.demo", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());// NONEchannel.basicPublish(exchangeName, "aaaa", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());}

headers

headers exchange 不依赖路由键的匹配规则来路由消息,而是根据消息的headers属性值,RabbitMQ收到消息后,对比消息的headers中的属性值是否与queue、exchange绑定时指定的属性值一致,如果完全匹配,则路由消息到队列。由于headers exchange 性能很差,所以这里不做代码演示,感兴趣的小伙伴可以实验一下。

        以上就是对RabbitMQ中的一些概念做了一下介绍,小伙伴们可以多做实验加深理解,我也是在写了几个UnitTest之后,才理解这些概念,尤其是Exchange、RoutingKey 和 BindingKey。

        感谢各位小伙伴的阅览,也很高兴能和各位小伙伴一起钻研探讨技术问题。

 

相关文章:

RabbitMQ相关概念介绍

这篇文章主要介绍RabbitMQ中几个重要的概念,对于初学者来说,概念性的东西可能比较难以理解,但是对于理解和使用RabbitMQ却必不可少,初学阶段,现在脑海里留有印象,随着后续更加深入的学习,就会很…...

在jenkins容器内部使用docker

在jenkins容器内部使用docker 1.使用本地的docker 进入/var/run,找到docker.sock [rootnpy run]# ls auditd.pid containerd cryptsetup dmeventd-client docker.pid initramfs lvm netreport sepermit sudo tmpfiles.d user chro…...

分布式事务解决方案

数据不会无缘无故丢失,也不会莫名其妙增加 一、概述 1、曾几何时,知了在一家小公司做项目的时候,都是一个服务打天下,所以涉及到数据一致性的问题,都是直接用本地事务处理。 2、随着时间的推移,用户量增…...

2022黑马Redis跟学笔记.实战篇(三)

2022黑马Redis跟学笔记.实战篇 三4.2.商家查询的缓存功能4.3.1.认识缓存4.3.1.1.什么是缓存4.3.1.2.缓存的作用1.为什么要使用缓存2.如何使用缓存3. 添加商户缓存4. 缓存模型和思路4.3.1.3.缓存的成本4.3.2.添加redis缓存4.3.3.缓存更新策略4.3.3.1.三种策略(1).内存淘汰:Redis…...

hadoop环境新手安装教程

1、资源准备: (1)jdk安装包:我的是1.8.0_202 (2)hadoop安装包:我的是hadoop-3.3.1 注意这里不要下载成下面这个安装包了,我就一开始下载错了 错误示例: 2、主机网络相…...

数据结构与算法基础-学习-11-线性表之链栈的初始化、判断非空、压栈、获取栈长度、弹栈、获取栈顶元素

一、个人理解链栈相较于顺序栈不存在上溢(数据满)的情况,除非内存不足,但存储密度会低于顺序栈,因为会多存一个指针域,其他逻辑和顺序表一致。总结如下:头指针指向栈顶。链栈没有头节点直接就是…...

Hive内置函数

文章目录Hive内置函数字符串函数时间类型函数数学函数集合函数条件函数类型转换函数数据脱敏函数其他函数用户自定义函数Hive内置函数 查询内置函数用法: DESCRIBE FUNCTION EXTENDED 函数名;字符串函数 字符串连接函数:concat带分隔符字符串连接函数…...

Git如何快速入门

什么是Git?我们开发的项目,也需要一个合适的版本控制系统来协助我们更好地管理版本迭代,而Git正是因此而诞生的(有关Git的历史,这里就不多做阐述了,感兴趣的小伙伴可以自行了解,是一位顶级大佬在…...

netcore构建webservice以及调用的完整流程

目录构建前置准备编写服务挂载服务处理SoapHeader调用添加服务调用服务补充内容构建 前置准备 框架版本要求:netcore3.1以上 引入nuget包 SoapCore 编写服务 1.编写服务接口 示例 using System.ServiceModel;namespace Services;[ServiceContract(Namespace &…...

Mysql事务基础(解析)

并发事务带来的问题A和B是并发事务脏写(A被B覆盖)两个事务。B事务覆盖了A事务。解决:应该事务并行脏读(B读到了A的执行中间结果)A修改了东西。B看到了他的中间状态。解决:读写冲突。加锁,改完再…...

2023 年首轮土地销售活动来了 与 The Sandbox 一起体验「体素狂热」!

2 月 14 日晚上 11 点,开始你的体素冒险。 The Sandbox 很高兴推出 2023 年的第一次土地销售活动。欢迎来到「体素狂热 (Voxel Madness)」! 简要概括 土地销售抽奖活动将于北京时间 2 月 14 日星期二晚上 11 点开始 「体素狂热」 土地销售活动将于 2 月…...

vue AntD中栅格布局的四种大小xs,sm,md,lg

cssBootstrap栅格布局的四种大小xs,sm,md,lg前端为了页面在不同大小的设备上也能够正常显示,通常会使用栅格布局的方式来实现。使用bootStrap的网格系统时,常见到一下格式的类名col-*-*visible-*-*hidden_*_* 中间可为xs,xsm,md,lg等表示大小的单词的缩写…...

window.open()打开窗口全屏

window.open (page.html, page, height100, width400, top0, left0, toolbarno, menubarno, scrollbarsno, resizableno,locationn o, statusno, fullscreenyes); 参数解释: window.open() 弹出新窗口的命令; ‘page.html’ 弹出窗口的文件名&#xff…...

VFIO软件依赖——VFIO协议

文章目录背景PCI设备模拟PCI设备抽象VFIO协议实验Q&A背景 在虚拟化应用场景中,虚拟机想要在访问PCI设备时达到IO性能最优,最直接的方法就是将物理设备暴露给虚拟机,虚拟机对设备的访问不经过任何中间层的转换,没有虚拟化的损…...

C/C++【内存管理】

✨个人主页: Yohifo 🎉所属专栏: C修行之路 🎊每篇一句: 图片来源 Love is a choice. It is a conscious commitment. It is something you choose to make work every day with a person who has chosen the same thi…...

第8篇:Java编程语言的8大优势

目录 1、简单性 2、面向对象 3、编译解释性 4、稳健性 5、安全性 6、跨平台性...

STM32定时器实现红外接收与解码

1.NEC协议 红外遥控是一种比较常用的通讯方式,目前红外遥控的编码方式中,应用比较广泛的是NEC协议。NEC协议的特点如下: 载波频率为 38KHz8位地址和 8位指令长度地址和命令2次传输(确保可靠性)PWM 脉冲位置调制&#…...

18- Adaboost梯度提升树 (集成算法) (算法)

Adaboost 梯度提升树: from sklearn.ensemble import AdaBoostClassifier model AdaBoostClassifier(n_estimators500) model.fit(X_train,y_train) 1、Adaboost算法介绍 1.1、算法引出 AI 39年(公元1995年),扁鹊成立了一家专治某疑难杂症…...

zlink 介绍

zlink 是一个基于 flink 开发的分布式数据开发工具,提供简单的易用的操作界面,降低用户学习 flink 的成本,缩短任务配置时间,避免配置过程中出现错误。用户可以通过拖拉拽的方式实现数据的实时同步,支持多数据源之间的…...

C++之std::string的resize与reverse

std::string的resize与reverse前言1.resize2.reserve前言 在C中我们经常用std::string 来保存字符串,其中有两个比较常用但是却平时容易被搞混的两个函数,分别是resize和reserve,模糊意识里,这两个方法都是对std::string的容量或元…...

深入浅出Asp.Net Core MVC应用开发系列-AspNetCore中的日志记录

ASP.NET Core 是一个跨平台的开源框架,用于在 Windows、macOS 或 Linux 上生成基于云的新式 Web 应用。 ASP.NET Core 中的日志记录 .NET 通过 ILogger API 支持高性能结构化日志记录,以帮助监视应用程序行为和诊断问题。 可以通过配置不同的记录提供程…...

Spring Boot 实现流式响应(兼容 2.7.x)

在实际开发中,我们可能会遇到一些流式数据处理的场景,比如接收来自上游接口的 Server-Sent Events(SSE) 或 流式 JSON 内容,并将其原样中转给前端页面或客户端。这种情况下,传统的 RestTemplate 缓存机制会…...

【力扣数据库知识手册笔记】索引

索引 索引的优缺点 优点1. 通过创建唯一性索引,可以保证数据库表中每一行数据的唯一性。2. 可以加快数据的检索速度(创建索引的主要原因)。3. 可以加速表和表之间的连接,实现数据的参考完整性。4. 可以在查询过程中,…...

【Linux】C语言执行shell指令

在C语言中执行Shell指令 在C语言中&#xff0c;有几种方法可以执行Shell指令&#xff1a; 1. 使用system()函数 这是最简单的方法&#xff0c;包含在stdlib.h头文件中&#xff1a; #include <stdlib.h>int main() {system("ls -l"); // 执行ls -l命令retu…...

Day131 | 灵神 | 回溯算法 | 子集型 子集

Day131 | 灵神 | 回溯算法 | 子集型 子集 78.子集 78. 子集 - 力扣&#xff08;LeetCode&#xff09; 思路&#xff1a; 笔者写过很多次这道题了&#xff0c;不想写题解了&#xff0c;大家看灵神讲解吧 回溯算法套路①子集型回溯【基础算法精讲 14】_哔哩哔哩_bilibili 完…...

【大模型RAG】Docker 一键部署 Milvus 完整攻略

本文概要 Milvus 2.5 Stand-alone 版可通过 Docker 在几分钟内完成安装&#xff1b;只需暴露 19530&#xff08;gRPC&#xff09;与 9091&#xff08;HTTP/WebUI&#xff09;两个端口&#xff0c;即可让本地电脑通过 PyMilvus 或浏览器访问远程 Linux 服务器上的 Milvus。下面…...

条件运算符

C中的三目运算符&#xff08;也称条件运算符&#xff0c;英文&#xff1a;ternary operator&#xff09;是一种简洁的条件选择语句&#xff0c;语法如下&#xff1a; 条件表达式 ? 表达式1 : 表达式2• 如果“条件表达式”为true&#xff0c;则整个表达式的结果为“表达式1”…...

多模态商品数据接口:融合图像、语音与文字的下一代商品详情体验

一、多模态商品数据接口的技术架构 &#xff08;一&#xff09;多模态数据融合引擎 跨模态语义对齐 通过Transformer架构实现图像、语音、文字的语义关联。例如&#xff0c;当用户上传一张“蓝色连衣裙”的图片时&#xff0c;接口可自动提取图像中的颜色&#xff08;RGB值&…...

Python实现prophet 理论及参数优化

文章目录 Prophet理论及模型参数介绍Python代码完整实现prophet 添加外部数据进行模型优化 之前初步学习prophet的时候&#xff0c;写过一篇简单实现&#xff0c;后期随着对该模型的深入研究&#xff0c;本次记录涉及到prophet 的公式以及参数调优&#xff0c;从公式可以更直观…...

基于数字孪生的水厂可视化平台建设:架构与实践

分享大纲&#xff1a; 1、数字孪生水厂可视化平台建设背景 2、数字孪生水厂可视化平台建设架构 3、数字孪生水厂可视化平台建设成效 近几年&#xff0c;数字孪生水厂的建设开展的如火如荼。作为提升水厂管理效率、优化资源的调度手段&#xff0c;基于数字孪生的水厂可视化平台的…...