当前位置: 首页 > news >正文

RabbitMQ 开发指南

连接RabbitMQ

连接方式一:
在这里插入图片描述
也可以选择使用URI的方式来实现

连接方式二:
在这里插入图片描述
Connection接口被用来创建一个Channel,在创建之后,Channel可以用来发送或者接收消息。

Channel channel = conn.createChannel();

使用交换器和队列

声明一个交换器和队列

channel.exchangeDeclare(exchangeName,"direct",true);
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName,exchangeName,routingKey);

上面创建了一个持久化的、非自动删除的、绑定类型为direct的交换器,同时也创建了一个非持久化的、排他的、自动删除的队列(队列名称由RabbitMQ自动生成),这里的交换器和队列都没有设置特殊的参数。

上面声明的队列具备如下特性:只对当前应用中同一个Connection层面可用,同一个Connection的不同Channel可共用,并且也会在应用连接断开时自动删除。

如果要在应用中共享一个队列,可做如下声明:

channel.exchangeDeclare(exchangeName,"direct",true);
channel.queueDeclare(queueName,true,false,false,null);
channel.queueBind(queueName,exchangeName,routingKey);

这里的队列被声明为持久化的、非排他的、非自动删除的,而且也被分配给另一个确定的已知名称。

exchangeDeclare方法详解

exchangeDeclare有多个重载方法,这些重载方法都是由下面这个方法中缺省的某些参数构成的。

Exchange.DeclareOK exchangeDeclare(String exchange,String type,
boolean durable,boolean autoDelete,boolean internal,Map<String,Object> arguments)throws Exception;
  • exchange: 交换器名称
  • type: 交换器类型,常见的有fanout、direct、topic
  • durable: 是否持久化,durable设置true表示持久化,持久化后可以将交换器存盘。
  • autoDelete:是否自动删除,表示一旦这个交换器上没有任何绑定(即没有任何队列与其相连),RabbitMQ会自动删除这个交换机。这对于临时的、生命周期与某些特定队列或应用程序密切相关的交换机非常有用。
  • internal :如果为true,表明是内置的交换器,客户端程序无法直接发送消息到这个交换器,只能通过交换器路由到交换器这种方式。
  • argument:其他一些结构化参数,比如alternate-exchange。

此外,其他声明交换机的方法

  • exchangeDeclareNoWait: 表示在声明exchange时候,不需要服务器返回。这可能会导致一种情况,在声明完一个交换器之后(实际服务器还并未完成交换器的创建),客户端紧接着使用这个交换器,必然会发生异常。
  • exchangeDeclarePassive:可以用来检测交换机是否存在,如果存在则正常返回,如果不存在则抛出异常。

queueDeclare方法详解

queueDeclare方法,只有如下两个重载方法

1. Queue.DeclareOK queueDeclare() throws IOException;
2. Queue.DeclareOK queueDeclare(String queue,boolean durable,
boolean exclusive,boolean autoDelete,Map<String,Object> arguments)
throws IOException;

不带任何参数的queueDeclare方法默认创建一个由RabbitMQ 自动生成名称 的、排他的自动删除非持久化 的队列。

方法参数详解

  • queue:队列名称
  • durable:设置是否持久化,持久化的队列会存盘,在服务器重启的时候保证不丢失消息。
  • exclusive:排他队列
  • autoDelete:自动删除,当队列中没有任何活跃的消费者,RabbitMQ会在一段时间后自动删除该队列。当一个队列删除时,其上持久化的消息也会随之被删除。
  • arguments : 设置队列的其他参数

排他队列的特点和行为

  • 独占性:一旦某个连接声明了一个排他队列,任何其他连接都无法访问或者声明同名的队列。
  • 自动删除:当声明排他队列的连接关闭时,RabbitMQ会自动删除这个队列,即使队列被声明为持久化的。
    同一连接通道共享:虽然排他队列对其他连接不可见,但同一连接内的不同通道可以共享访问这个排他队列。
  • 排他队列常用于那些希望队列仅被当前线程或应用实例使用的场景。

注意:生产者和消费者都能够使用queueDeclare声明一个队列,如果消费者在同一个信道上订阅了另一个队列,就无法在声明队列了一个消费者只能订阅一个队列)。必须先取消订阅,然后将channel设置为“传输模式”,之后才能声明队列。

交换机持久化和队列持久化的区别

  • 交换机持久化主要关注的是交换机的配置和元数据的长期存储,确保重启后配置不变。
  • 队列持久化关注队列自身及其消息的长期存储,需要结合消息的持久化设置来防止消息丢失。

queueBind方法详解

queueBind方法如下

1. Queue.BindOK queueBind(String queue,String exchange,String routingKey) 
throws IOException;2. Queue.BindOK queueBind(String queue,String exchange,String routingKey,
Map<String,Object> arguments) throws IOException;3. void queueBindNoWait(String queue,String exchange,String routingKey,
Map<String,Object> arguments) throws IOException;

参数详解

  • queue:队列名称
  • exchange:交换机名称
  • routingKey:用来绑定队列和交换机的路由键
  • argument:定义绑定的一些参数

exchangeBind方法详解

exchangeBind方法如下

 1. Exchange.BindOK exchangeBind(String destination,String source,String routingKey)throws IOException;2. Exchange.BindOK exchangeBind(String destination,String source,String routingKey,
Map<String,Object> arguments) throws IOException;3. Exchange.BindOK exchangeBindNoWait(String destination,String source,
String routingKey,Map<String,Object> arguments) throws IOException;

绑定以后,消息从source交换机转发到destination交换机。某种程度上可以将destination交换机看作一个队列。
在这里插入图片描述

发送消息

如果要发送一个消息,可以使用Channel类的basicPublish方法。

示例:

byte[] messageBodyBytes = "Hello,World!".getBytes();
channel.basicPublish(exchangeName,routingKey,null,messageBodyBytes);

对于basicPublish而言,有几个重载方法

1. void basicPublish(String exchange,String routingKey,
BasicProperties props,byte[] body) throws IOException;2.void basicPublish(String exchange,String routingKey,boolean mandatory,
BasicProperties props,byte[] body) throws IOException;3. void basicPublish(String exchange,String routingKey,boolean mandatory,
boolean immediate,BasicProperties props,byte[] body) throws IOException;
  • exchange :交换机的名称,指明消息需要发送到哪个交换机中,如果设置为空,则消息会被发送到RabbitMQ默认的交换机中。
  • routingKey:路由键,交换机根据路由键将消息存储到相应的队列中。
  • props:消息的基本属性集,包括contentType、deliveryMode等
  • byte[] body: 真正要发送的消息内容

消费消息

消费模式分为两种:Push模式和Pull模式。推模式采用Basic.Consume进行消费,拉模式采用Basic.Get进行消费。

推模式

当调用Consumer相关API方法时,不同的订阅采用不同的消费者标签(consumerTag)来区分彼此,在同一个Channel中的消费者也需要通过唯一的消费者标签以作区分,关键消费代码如下所示

在这里插入图片描述
对于消费者来说,显示的设置autoAck为false,接收消息之后进行显示ack操作,这个设置是非常必要的。可以防止消息不必要的丢失。

核心方法

String basicConsume(String queue,boolean autoack,String consumerTag,
boolean noLocal,boolean exclusive,Map<String,Object> arguments,Consumer callback) 
throws IOException;
  • queue: 队列的名称
  • autoAck: 设置是否自动确认
  • consumerTag:消费者标签,用来区分多个消费者
  • noLocal:设置true表示不能将一个Connection中生产者发送的消息发送给这个Connection中的消费者。
  • exclusive:设置是否排他,确保该队列仅对创建他的消费者可见。
  • arguments:设置消费者的其他参数
  • callback:设置消费者的回调函数。

每个Channel都拥有自己独立的线程,最常用的做法是一个Channel对应一个消费者,也就意味着消费者彼此之间没有关联,也可以在Channel中维持多个消费者,但是,如果Channel中一个消费者一直在运行,那其他消费者的callback会被耽搁。
在这里插入图片描述

拉模式

通过channel.basicGet方法可以单条获取消息,其返回值是GetResponse。核心方法如下:

GetResponse basicGet(String queue,boolean autoAck) throws IOException;

如果autoAck为false,那么同样需要调用channel.basicAck来确认消息已经被成功接受。

GetResponse response = channel.basicGet(QueueName,false);System.out.println(new String(response.getBody()));channel.basicAck(response.getEnvelope().getDeliveryTag(),false);

注意:Basic.Consume将信道(Channel)置为接收模式,直到取消队列的订阅为止,接收模式期间,RabbitMQ会不断将消息推送给消费者,推送消息的个数受到Basic.QoS的限制。如果只想从队列里获取单条消息而不是持续订阅,建议使用Basic.Get进行消费。但是不能将Basic.Get放在一个循环里代替Basic.Consume,这样做会严重影响MQ性能。
在这里插入图片描述

消费端的确认和拒绝

消息确认

RabbitMQ为了保证消息从队列可靠的到达消费者,提供了消息确认机制。消费者在订阅队列时,可以指定autoAck参数,当autoAck等于false时,RabbitMQ会等待消费者显示的回复确认信号后才从内存(或者磁盘)中移除消息(即使消息配置了持久化,在被ack以后仍然要被删除)。当autoAck等于true时,RabbitMQ会自动把发送出去的消息设置为确认,然后从内存中删除,而不管消费者是否真正消费这些消息。

把消息确认设置为false,消费者就有足够的时间处理消息,不用担心处理过程中消费者进程挂掉后消息丢失的问题,因为RabbitMQ会一直等待持有消息直到消费者显示调用Basic.Ack命令为止。

当autoAck参数置为false,对于MQ服务端而言,队列中的消息分成了两个部分:

  1. 等待投递给消费者的消息
  2. 已经投递给消费者,但是还没有收到消费者确认信号的消息:如果一直没有收到确认信号,消费此消息的消费者已经断开连接,则MQ会重新安排该消息进入队列,等待投递给下一个消费者。

MQ不会为未确认的消息设置过期时间,他判断此消息是否需要重新投递给消费者的唯一依据是消费该消息的消费者连接是否已经断开。如此设计,是因为RabbitMQ允许消费者消费一条消息的时间可以很久很久。

消息拒绝

Channel类中的basicReject方法定义如下

void basicReject(long deliveryTag,boolean requeue) throws IOException;

其中,deliveryTag可以看作消息的编号,如果requeue为true,则RabbitMQ会重新将这条消息存入队列,以便发送给下一个订阅的消费者。

如果想要批量拒绝消息,可以使用Basic.Nack这个命令。

void basicNack(long deliveryTag,boolean multiple,boolean requeue) throws IOException;

multiple参数为false表示只拒绝编号为deliveryTag的这条消息,如果设置为true,表示拒绝编号deliveryTag之前所有未被消费者确认的消息。

关闭连接

在应用程序使用完毕之后,需要关闭连接,释放资源

channel.close();
connection.close();

AMQP协议中connection和channel采用同样的方式来管理网络失败、内部错误和显示关闭连接。connection和channel所具备的生命周期如下:

  • Open:开启状态,代表当前对象可以使用
  • Closing:正在关闭状态,当前对象被显示通知调用关闭方法,这样就产生了一个关闭请求让其内部对象进行相应的操作,并等待这些关闭操作的完成
  • Closed:已经关闭,当前对象已经接收到所有内部对象以完成关闭动作的通知,并且其也关闭了自身。

与关闭操作相关的方法

  • addShutdownListener(ShutdownListener listener): 当connection或者channel状态转变为closed的时候调用ShutdownListener,而如果将一个ShutdownListener注册到一个已经处于Closed状态的对象(特指Connection或者Channel对象),会立刻调用ShutdownListener。
  • removeShutdownListener(ShutdownListener listener)
  • getCloseReason: 获取connection或者channel关闭原因
  • isOpen:检测当前对象是否开启
  • close(int closeCode,String closeMessage):显示通知连接执行关闭操作。

代码清单
在这里插入图片描述
当触发ShutdownListener时候,可以获取到ShutdownSingalException,这个信号包含了关闭的原因。

ShutdownSingalException 提供多个方法来分析关闭原因,isHardError方法可以知道是Connection错误还是Channel错误;getReason可以获取Cause相关的信息。

在这里插入图片描述

相关文章:

RabbitMQ 开发指南

连接RabbitMQ 连接方式一&#xff1a; 也可以选择使用URI的方式来实现 连接方式二&#xff1a; Connection接口被用来创建一个Channel&#xff0c;在创建之后&#xff0c;Channel可以用来发送或者接收消息。 Channel channel conn.createChannel();使用交换器和队列 声明…...

ElasticSearch学习笔记(二)文档操作、RestHighLevelClient的使用

文章目录 前言3 文档操作3.1 新增文档3.2 查询文档3.3 修改文档3.3.1 全量修改3.3.2 增量修改 3.4 删除文档 4 RestAPI4.1 创建数据库和表4.2 创建项目4.3 mapping映射分析4.4 初始化客户端4.5 创建索引库4.6 判断索引库是否存在4.7 删除索引库 5 RestClient操作文档5.1 准备工…...

python离线安装第三方库、及其依赖库(单个安装,非批量移植)

文章目录 1.外网下载第三方库、依赖库2.内网安装第三方库3.补充附录内网中离线安装python第三方库,这时候只能去外网手动下载第三方库,再传回内网进行安装。 问题是python第三方库往往有其前置依赖包,你很难清楚某个第三方库依赖的是哪些依赖包,更难受的是依赖包可能还有其…...

昨天发的 npm 包,却因为 registry 同步问题无法安装使用

用过 HBuilderX 云打包的都知道&#xff0c;云上面的 Android 环境很有限&#xff0c;其实并不能覆盖 uniapp 生态所有的版本&#xff0c;甚至说只能覆盖最新的一两个版本。 如果你需要用到 HBuilderX 安卓云打包&#xff0c;就必须及时跟进 HBuilderX 的版本更新&#xff0c;…...

Redis 数据恢复及持久化策略分析

在分布式系统中&#xff0c;Redis作为高性能的键值存储数据库&#xff0c;广泛应用于缓存、会话管理、消息队列等场景。对于Redis数据的可靠性&#xff0c;持久化是至关重要的一环。当Redis宕机时&#xff0c;如何恢复数据成为一个关键问题。这篇文章将详细分析Redis的数据恢复…...

vscode 快捷键侧边栏

_____ 配置 vscode 快捷键 visual studio code - open explorer and close sidebar with the same key - Stack Overflow { "key": "ctrlshifte", // when Explorer not open // "command": "workbench.view.explorer", // either…...

FreeRTOS:1、任务通知vTaskNotifyGiveFromISR保证实时性

文章目录 背景解释意义 背景 首先&#xff0c;我们看以下代码&#xff1a; #include "FreeRTOS.h" #include "task.h"TaskHandle_t s_task_handle NULL;void vTaskFunction(void *pvParameters) {for (;;) {// 等待通知ulTaskNotifyTake(pdTRUE, portMA…...

监督学习:从数据中学习预测模型的艺术与科学

目录 引言 一、监督学习的基本概念 1、数据集 2、特征 3、标签 4、模型 二、监督学习的原理和方法 1、基本原理 2、常用方法 三、监督学习的定义与分类 1、 定义 2.、分类 四、为什么是监督学习&#xff1f; 1、 明确的学习目标 2、高准确率 3、易于评估 4、 …...

深入理解Java虚拟机(JVM)中的垃圾回收器

垃圾回收&#xff08;Garbage Collection, GC&#xff09;是现代编程语言中用于管理内存的重要机制&#xff0c;特别是在Java虚拟机&#xff08;JVM&#xff09;中。 它的基本原理是自动检测和释放不再被程序使用的内存&#xff0c;以避免内存泄漏和提高程序执行效率。 1.GC的基…...

视频集市新增支持多格式流媒体拉流预览

流媒体除了常用实时流外还有大部分是以文件的形式存在&#xff0c;做融合预览必须要考虑多种兼容性能力&#xff0c;借用现有的ffmpeg生态可以迅速实现多种格式的支持&#xff0c;现在我们将按需拉流预览功能进行了拓展&#xff0c;正式支持了ffmpeg的功能&#xff0c;可快捷方…...

定时器-前端使用定时器3s轮询状态接口,2min为接口超时

背景 众所周知&#xff0c;后端是处理不了复杂的任务的&#xff0c;所以经过人家的技术讨论之后&#xff0c;把业务放在前端来实现。记录一下这次的离大谱需求吧。 如图所示&#xff0c;这个页面有5个列表&#xff0c;默认加载计划列表。但是由于后端的种种原因&#xff0c;这…...

python实践笔记(二): 类和对象

1. 写在前面 最近在重构之前的后端代码&#xff0c;借着这个机会又重新补充了关于python的一些知识&#xff0c; 学习到了一些高效编写代码的方法和心得&#xff0c;比如构建大项目来讲&#xff0c;要明确捕捉异常机制的重要性&#xff0c; 学会使用try...except..finally&…...

指定GPU跑模型

加上一个CUDA_VISIBLE_DEVICES0,2就行了&#xff0c;使用0卡和2卡跑模型&#xff0c;注意多卡有时候比单卡慢&#xff0c;4090无NVlink&#xff0c;数据似乎是通过串行的方式传输到多个gpu的&#xff0c;只不过单个gpu是并行计算&#xff0c;数据在gpu与gpu之间似乎是串行传输的…...

Windows桌面运维----第五天

1、华为路由怎们配置IP、划分vlan、互通&#xff1a; 1、用户模式→系统模式&#xff1b; 2、进入相关端口&#xff0c;配置IP地址&#xff1b; 3、开通相应vlan,设置vlanX、IP地址&#xff1b; 4、绑定相关端口&#xff0c;设置端口类型&#xff1b; 5、电脑设置IP&#…...

bash和dash的区别(及示例)

什么是bash、dash Bash(GNU Bourne-Again Shell)是许多Linux平台的内定Shell&#xff0c;事实上&#xff0c;还有许多传统UNIX上用的Shell&#xff0c;像tcsh、csh、ash、bsh、ksh等等。 GNU/Linux 操作系统中的 /bin/sh 本是 bash (Bourne-Again Shell) 的符号链接&#xff0…...

Java基础入门day65

day65 web项目 页面设计 仿照小米官网&#xff0c;将首页保存到本地为一个html页面&#xff0c;再将html页面保存为jsp页面&#xff0c;在项目中的web.xml文件中配置了欢迎页 <welcome-file-list><welcome-file>TypesServlet</welcome-file> </welcome-…...

解密制度的规定和解密工作的具体流程

解密制度是指对于某些敏感的文件或资料,经过一定的时间后,根据相关规定和程序,可以进行解密,解除文件的保密状态,使其可以被公众查阅或利用。解密制度的目的在于确保涉密信息的保密等级与其重要程度相适应,防止涉密信息的泄露和使用不当,同时促进信息公开、传播历史知识…...

实际中常用的网络相关命令

一、ping命令 ping是个使用频率极高的实用程序&#xff0c;主要用于确定网络的连通性。这对确定网络是否正确连接&#xff0c;以及网络连接的状况十分有用。 简单的说&#xff0c;ping就是一个测试程序&#xff0c;如果ping运行正确&#xff0c;大体上就可以排除网络访问层、网…...

机器学习补充

一、数据抽样 数据预处理阶段&#xff1a;对数据集进行抽样可以帮助减少数据量&#xff0c;加快模型训练的速度/减少计算资源的消耗&#xff0c;特别是当数据集非常庞大时&#xff0c;比如设置sample_rate0.8.平衡数据集&#xff1a;通过抽样平衡正负样本&#xff0c;提升模型…...

机器学习——RNN、LSTM

RNN 特点&#xff1a;输入层是层层相关联的&#xff0c;输入包括上一个隐藏层的输出h1和外界输入x2&#xff0c;然后融合一个张量&#xff0c;通过全连接得到h2&#xff0c;重复 优点&#xff1a;结构简单&#xff0c;参数总量少&#xff0c;在短序列任务上性能好 缺点&#x…...

观成科技:隐蔽隧道工具Ligolo-ng加密流量分析

1.工具介绍 Ligolo-ng是一款由go编写的高效隧道工具&#xff0c;该工具基于TUN接口实现其功能&#xff0c;利用反向TCP/TLS连接建立一条隐蔽的通信信道&#xff0c;支持使用Let’s Encrypt自动生成证书。Ligolo-ng的通信隐蔽性体现在其支持多种连接方式&#xff0c;适应复杂网…...

C++_核心编程_多态案例二-制作饮品

#include <iostream> #include <string> using namespace std;/*制作饮品的大致流程为&#xff1a;煮水 - 冲泡 - 倒入杯中 - 加入辅料 利用多态技术实现本案例&#xff0c;提供抽象制作饮品基类&#xff0c;提供子类制作咖啡和茶叶*//*基类*/ class AbstractDr…...

进程地址空间(比特课总结)

一、进程地址空间 1. 环境变量 1 &#xff09;⽤户级环境变量与系统级环境变量 全局属性&#xff1a;环境变量具有全局属性&#xff0c;会被⼦进程继承。例如当bash启动⼦进程时&#xff0c;环 境变量会⾃动传递给⼦进程。 本地变量限制&#xff1a;本地变量只在当前进程(ba…...

相机Camera日志实例分析之二:相机Camx【专业模式开启直方图拍照】单帧流程日志详解

【关注我&#xff0c;后续持续新增专题博文&#xff0c;谢谢&#xff01;&#xff01;&#xff01;】 上一篇我们讲了&#xff1a; 这一篇我们开始讲&#xff1a; 目录 一、场景操作步骤 二、日志基础关键字分级如下 三、场景日志如下&#xff1a; 一、场景操作步骤 操作步…...

Cesium1.95中高性能加载1500个点

一、基本方式&#xff1a; 图标使用.png比.svg性能要好 <template><div id"cesiumContainer"></div><div class"toolbar"><button id"resetButton">重新生成点</button><span id"countDisplay&qu…...

基于Flask实现的医疗保险欺诈识别监测模型

基于Flask实现的医疗保险欺诈识别监测模型 项目截图 项目简介 社会医疗保险是国家通过立法形式强制实施&#xff0c;由雇主和个人按一定比例缴纳保险费&#xff0c;建立社会医疗保险基金&#xff0c;支付雇员医疗费用的一种医疗保险制度&#xff0c; 它是促进社会文明和进步的…...

UR 协作机器人「三剑客」:精密轻量担当(UR7e)、全能协作主力(UR12e)、重型任务专家(UR15)

UR协作机器人正以其卓越性能在现代制造业自动化中扮演重要角色。UR7e、UR12e和UR15通过创新技术和精准设计满足了不同行业的多样化需求。其中&#xff0c;UR15以其速度、精度及人工智能准备能力成为自动化领域的重要突破。UR7e和UR12e则在负载规格和市场定位上不断优化&#xf…...

RNN避坑指南:从数学推导到LSTM/GRU工业级部署实战流程

本文较长&#xff0c;建议点赞收藏&#xff0c;以免遗失。更多AI大模型应用开发学习视频及资料&#xff0c;尽在聚客AI学院。 本文全面剖析RNN核心原理&#xff0c;深入讲解梯度消失/爆炸问题&#xff0c;并通过LSTM/GRU结构实现解决方案&#xff0c;提供时间序列预测和文本生成…...

有限自动机到正规文法转换器v1.0

1 项目简介 这是一个功能强大的有限自动机&#xff08;Finite Automaton, FA&#xff09;到正规文法&#xff08;Regular Grammar&#xff09;转换器&#xff0c;它配备了一个直观且完整的图形用户界面&#xff0c;使用户能够轻松地进行操作和观察。该程序基于编译原理中的经典…...

(一)单例模式

一、前言 单例模式属于六大创建型模式,即在软件设计过程中,主要关注创建对象的结果,并不关心创建对象的过程及细节。创建型设计模式将类对象的实例化过程进行抽象化接口设计,从而隐藏了类对象的实例是如何被创建的,封装了软件系统使用的具体对象类型。 六大创建型模式包括…...