当前位置: 首页 > news >正文

RabbitMQ日常使用小结

一、使用场景

削峰、解耦、异步。
基于AMQP(高级消息队列协议)协议来统一数据交互,通过channel(网络信道)传递信息。erlang语言开发,并发量12000,支持持久化,稳定性好,集群不支持动态扩展。

RabbitMQ的基本概念

二、组成及工作流程

1.主要组成

Broker(消息代理): 消息队列服务进程,一个Broker以开多个虚拟主机(VirtualHost)。
VirtualHost(虚拟主机):虚拟主机,用于进行逻辑隔离,一个虚拟主机可以有若干个Exchange和Queue
Exchange(交换机):消息队列交换机,按一定的规则将消息路由转发到某个队列。
Queue:消息队列。

2.工作流程

RabbitMQ的工作流程

生产者发送消息流程:
1、和Broker建立TCP连接。
2、和Broker建立通道。
3、通过通道消息发送给Broker,由Exchange将消息进行转发。
4、Exchange将消息转发到指定的Queue(队列)。消费者接收消息流程:
1、和Broker建立TCP连接
2、和Broker建立通道
3、监听指定的Queue(队列)
4、当有消息到达Queue时Broker默认将消息推送给消费者。
5、接收到消息。
6、ack回复。

三、交换机Exchange(默认direct)

交换机,接受消息,根据路由键发送消息到绑定的队列(不具备消息存储的能力)。

1.交换机种类

Direct: 单播直连交换机,Exchange将消息完全匹配路由键(routing key)的方式绑定消息,获取信息时也要匹配Exchange和路由键。
直连交换机
fanout: 广播式交换机(Publish/subscribe),不管消息的路由键(routing key),Exchange都会将消息转发给所有绑定的Queue。
广播/扇形交换机
topic: 主题交换机,工作方式类似于组播,Exchange会将消息转发和路由键(routing key)符合匹配模式的所有队列,如: routing_key为user.stock的Message会转发给绑定匹配模式为 *.stock 、user.stock* 、 #.user.stock.#的队列。(*表是匹配一个任意词组,#表示匹配0个或多个词组)。
主题交换机
headers: 头交换机,无Binding Key;当然也无Routing Key。根据发送的消息内容中的headers属性进行匹配。

2.交换机属性

Name:交换机名称
Durability:持久化标志,表明此交换机是否是持久化的
Auto-delete:删除标志,表明当所有队列在完成使用此exchange时,是否删除
Arguments:依赖代理本身。

3.交换机状态

持久(durable)
暂存(transient)

4.消息确认机制(ACK)

自动ACK:消息一旦被接收,消费者自动发送ACK。
手动ACK:消息接收后,不会发送ACK,需要手动调用。

四、rabbitmq 客户端的使用

1.引入依赖

       <!-- rabbitmq 客户端依赖 --><dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.8.0</version></dependency>

2.创建连接工具

import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;public class MyRabbitMQUtils {public static Connection getConnel() throws Exception{//1 创建 ConnectionFactoryConnectionFactory factory = new ConnectionFactory() ;factory.setHost("127.0.0.1");//端口factory.setPort(5672);//设置虚拟机,一个mq服务可以设置多个虚拟机,每个虚拟机就相当于一个独立的mqfactory.setVirtualHost("/");factory.setUsername("guest");factory.setPassword("guest");factory.setAutomaticRecoveryEnabled(true);factory.setNetworkRecoveryInterval(3000);Connection connection = factory.newConnection();//   Channel channel = connection.createChannel();return connection;}
}

3.生产者

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;public class Producer {// 交换机名称private final static String EXCHANGE_NAME = "simple_exchange";// 队列名称private final static String QUEUE_NAME = "simple_queue";public static void main(String[]args) throws Exception{Connection produceConnection = MyRabbitMQUtils.getConnel();Channel produceChannel = produceConnection.createChannel();// 建立交换机(广播)produceChannel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT,true);/** 1、queue 队列名称* 2、durable 是否持久化* 3、exclusive 是否独占连接,队列只允许在该连接中访问,如果connection连接关闭队列则自动删除,如果将此参数设置true可用于临时队列的创建* 4、autoDelete 自动删除,队列不再使用时是否自动删除此队列,如果将此参数和exclusive参数设置为true就可以实现临时队列(队列不用了就自动删除)* 5、arguments 参数,可以设置一个队列的扩展参数,如:可设置存活时间*/produceChannel.queueDeclare(QUEUE_NAME, false, false, false, null);/** 1、exchange,交换机,如果不指定将使用mq的默认交换机(设置为"")* 2、routingKey,路由key,交换机根据路由key来将消息转发到指定的队列,如果使用默认交换机,routingKey设置为队列的名称* 3、props,消息的属性* 4、body,消息内容*/for(int i=0;i<10;i++){String message = "生产者发布的消息---!";message = message+i;produceChannel.basicPublish(EXCHANGE_NAME, QUEUE_NAME, null, message.getBytes());System.out.println(" Producer 发布'" + message + "'");}//关闭通道和连接(资源关闭最好用try-catch-finally语句处理)produceChannel.close();produceConnection.close();}
}

4.消费者

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import java.io.IOException;public class Comsumer {// 队列名称private final static String QUEUE_NAME = "simple_queue";public static void main(String[] argv) throws Exception {Connection comsumerConnection = MyRabbitMQUtils.getConnel();Channel comsumerChannel = comsumerConnection.createChannel();/** 参数明细* 1、queue 队列名称* 2、durable 是否持久化* 3、exclusive 是否独占连接,队列只允许在该连接中访问,如果connection连接关闭队列则自动删除,如果将此参数设置true可用于临时队列的创建* 4、autoDelete 自动删除,队列不再使用时是否自动删除此队列,如果将此参数和exclusive参数设置为true就可以实现临时队列(队列不用了就自动删除)* 5、arguments 参数,可以设置一个队列的扩展参数,如:可设置存活时间*/comsumerChannel.queueDeclare(QUEUE_NAME, false, false, false, null);//实现消费方法DefaultConsumer consumer = new DefaultConsumer(comsumerChannel){/** 当接收到消息后此方法将被调用* @param consumerTag  消费者标签,用来标识消费者的,在监听队列时设置channel.basicConsume* @param envelope 信封,通过envelope* @param properties 消息属性* @param body 消息内容*/@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {//交换机String exchange = envelope.getExchange();//消息id,mq在channel中用来标识消息的id,可用于确认消息已接收long deliveryTag = envelope.getDeliveryTag();// body 即消息体String msg = new String(body,"utf-8");System.out.println("Comsumer 获得: " + msg + "!");// 手动 ACKcomsumerChannel.basicAck(envelope.getDeliveryTag(),false);}};// 监听队列,第二个参数:是否自动进行消息确认。//参数:String queue, boolean autoAck, Consumer callback/*** 参数明细:* 1、queue 队列名称* 2、autoAck 自动回复,当消费者接收到消息后要告诉mq消息已接收,如果将此参数设置为tru表示会自动回复mq,如果设置为false要通过编程实现回复。* 3、callback,消费方法,当消费者接收到消息要执行的方法。*/comsumerChannel.basicConsume(QUEUE_NAME, false, consumer);}
}

五、Spring中使用RabbitMQ

1.引入依赖

        <!-- AMQP 依赖 --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId><version>2.2.7.RELEASE</version></dependency><!--springboot测试依赖--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><version>2.5.6</version></dependency>

2.更改配置

spring:rabbitmq:host: 127.0.0.1port: 5672username: guestpassword: guest

3.把交换机、和队列加入IOC容器中

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Exchange;
import org.springframework.amqp.core.ExchangeBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class RabbitMQConfig {// email队列public static final String QUEUE_EMAIL = "queue_email";// sms队列public static final String QUEUE_SMS = "queue_sms";// topics类型交换机public static final String EXCHANGE_NAME="topic.exchange";public static final String ROUTINGKEY_EMAIL="topic.#.email.#";public static final String ROUTINGKEY_SMS="topic.#.sms.#";//声明交换机@Bean(EXCHANGE_NAME)public Exchange exchange(){//durable(true) 持久化,mq重启之后交换机还在return ExchangeBuilder.topicExchange(EXCHANGE_NAME).durable(true).build();}//声明email队列/***   new Queue(QUEUE_EMAIL,true,false,false)*   durable="true" 持久化 rabbitmq重启的时候不需要创建新的队列*   auto-delete 表示消息队列没有在使用时将被自动删除 默认是false*   exclusive  表示该消息队列是否只在当前connection生效,默认是false*/@Bean(QUEUE_EMAIL)public Queue emailQueue(){return new Queue(QUEUE_EMAIL);}//声明sms队列@Bean(QUEUE_SMS)public Queue smsQueue(){return new Queue(QUEUE_SMS);}//ROUTINGKEY_EMAIL队列绑定交换机,指定routingKey@Beanpublic Binding bindingEmail(@Qualifier(QUEUE_EMAIL) Queue queue, @Qualifier(EXCHANGE_NAME) Exchange exchange){return BindingBuilder.bind(queue).to(exchange).with(ROUTINGKEY_EMAIL).noargs();}//ROUTINGKEY_SMS队列绑定交换机,指定routingKey@Beanpublic Binding bindingSMS(@Qualifier(QUEUE_SMS) Queue queue, @Qualifier(EXCHANGE_NAME) Exchange exchange){return BindingBuilder.bind(queue).to(exchange).with(ROUTINGKEY_SMS).noargs();}}

4.模拟业务发送消息

import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;@SpringBootTest
public class Send {@AutowiredRabbitTemplate rabbitTemplate;@Testpublic void sendMsgByTopics(){/*** 参数:* 1、交换机名称* 2、routingKey* 3、消息内容*/for (int i=0;i<5;i++){String message = "恭喜您,注册成功!userid="+i;rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME,"topic.sms.email",message);System.out.println(" [x] Sent '" + message + "'");}}
}

5.消息的监听及处理

import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Component
public class Receive {//监听邮件队列@RabbitListener(bindings = @QueueBinding(value = @Queue(value = "queue_email", durable = "true"),exchange = @Exchange(value = "topic.exchange", ignoreDeclarationExceptions = "true", type = ExchangeTypes.TOPIC),key = {"topic.#.email.#","email.*"}))public void rece_email(String msg){System.out.println(" [邮件服务] received : " + msg + "!");}//监听短信队列@RabbitListener(bindings = @QueueBinding(value = @Queue(value = "queue_sms", durable = "true"),exchange = @Exchange(value = "topic.exchange", ignoreDeclarationExceptions = "true", type = ExchangeTypes.TOPIC),key = {"topic.#.sms.#"}))public void rece_sms(String msg){System.out.println(" [短信服务] received : " + msg + "!");}}

相关文章:

RabbitMQ日常使用小结

一、使用场景 削峰、解耦、异步。 基于AMQP(高级消息队列协议)协议来统一数据交互,通过channel(网络信道)传递信息。erlang语言开发&#xff0c;并发量12000&#xff0c;支持持久化&#xff0c;稳定性好&#xff0c;集群不支持动态扩展。 RabbitMQ的基本概念 二、组成及工作流…...

​​​​​​​博物馆文物馆藏环境空气质量无线监控系统方案

博物馆文物馆藏环境空气质量无线监控系统方案 博物馆无线环境测控系统 博物馆恒温恒湿消毒净化系统 现代化博物馆空气质量一体化3D可视化管控平台 博物馆温湿度在线监控系统 博物馆光照在线监控系统 博物馆二氧化碳在线监控系统 博物馆在线监控系统 博物馆紫外线在线监控…...

测试理论----Bug的严重程度(Severity)和优先级(Priority)的分类

【原文链接】测试理论----Bug的严重程度&#xff08;Severity&#xff09;和优先级&#xff08;Priority&#xff09;的分类 一、Bug的严重程度&#xff08;Severity&#xff09; Bug的Severity&#xff08;严重程度&#xff09;指的是一个Bug对软件系统功能影响的程度&#…...

斯坦福、Nautilus Chain等联合主办的 Hackathon 活动,现已接受报名

由 Stanford Blockchain Accelerator、Zebec Protocol、 Nautilus Chain、Rootz Lab 共同主办的黑客松活动&#xff0c;现已接受优秀项目提交参赛申请。 在加密行业发展早期&#xff0c;密码极客们就始终在对区块链世界基础设施&#xff0c;在发展方向的无限可能性进行探索。而…...

00后卷王,把我们这些老油条卷的辞职信都写好了........

现在的小年轻真的卷得过分了。 前段时间我们公司来了个00年的&#xff0c;工作没两年&#xff0c;跳槽到我们公司起薪18K&#xff0c;都快接近我了。 后来才知道人家是个卷王&#xff0c;从早干到晚就差搬张床到工位睡觉了。 最近和他聊了一次天&#xff0c;原来这位小老弟家…...

JavaEE(系列12) -- 常见锁策略

目录 1. 乐观锁和悲观锁 2. 轻量级锁与重量级锁 3. 自旋锁和挂起等待锁 4. 互斥锁和读写锁 5. 可重入锁与不可重入锁 6. 死锁 6.1 死锁的必要条件 6.2 如何避免死锁 7. 公平锁和非公平锁 8. Synchronized原理及加锁过程 8.1 Synchronized 小结 8.2 加锁工作过程 8.2.1 偏向锁…...

前端nginx接口跨域

前提&#xff1a;vue项目本地接口通过proxy都可使用&#xff0c;但是项目部署在服务器上后发现所有接口出现503如下状况 简而言之&#xff1a;页面部署在域名为https://aa.bb.cc.com/vehicle/#/下&#xff0c;但是我接口需访问的是https:// azz.qqv.com/permission/company/gro…...

【国产虚拟仪器】基于 ZYNQ 的电能质量系统高速数据采集系统设计

随着电网中非线性负荷用户的不断增加 &#xff0c; 电能质量问题日益严重 。 高精度数据采集系统能够为电能质 量分析提供准确的数据支持 &#xff0c; 是解决电能质量问题的关键依据 。 通过对比现有高速采集系统的设计方案 &#xff0c; 主 控电路多以 ARM 微控制器搭配…...

Java前缀和算法

一.什么是前缀和算法 通俗来讲&#xff0c;前缀和算法就是使用一个新数组来储存原数组中前n-1个元素的和&#xff08;如果新数组的当前元素的下标为n&#xff0c;计算当前元素的值为原数组中从0到n-1下标数组元素的和&#xff09;&#xff0c;可能这样讲起来有点抽象&#xff0…...

pico 的两个双核相关函数的延时问题

pico高级API函数中&#xff0c; multicore_fifo_pop_timeout_us 和 multicore_fifo_push_timeout_us 的延时参数&#xff0c; 如修改为500微秒以上时&#xff0c;其延时似乎远远超过设定值&#xff0c;其反馈速度似乎被主核的交互所左右 &#xff0c;而修改为200以下时&#x…...

Doxygen源码分析: QCString类依赖的qstr系列C函数浅析

2023-05-20 17:02:21 ChrisZZ imzhuofoxmailcom Hompage https://github.com/zchrissirhcz 文章目录 1. doxygen 版本2. QCString 类简介3. qstr 系列函数浅析qmemmove()qsnprintfqstrdup()qstrfree()qstrlen()qstrcpy()qstrncpy()qisempty()qstrcmp()qstrncmp()qisspace()qstr…...

华为OD机试之一种字符串压缩表示的解压(Java源码)

一种字符串压缩表示的解压 题目描述 有一种简易压缩算法&#xff1a;针对全部由小写英文字母组成的字符串&#xff0c;将其中连续超过两个相同字母的部分压缩为连续个数加该字母&#xff0c;其他部分保持原样不变。 例如&#xff1a;字符串“aaabbccccd”经过压缩成为字符串“…...

Microsoft Project Online部署方案

目录 一、前言 二、Microsoft Project Online简介 三、Microsoft Project Online的优势 1、云端部署 2、多设备支持...

飞浆AI studio人工智能课程学习(3)-在具体场景下优化Prompt

文章目录 在具体场景下优化Prompt营销场景办公效率场景日常生活场景海报背景图生成办公效率场景预设Prompt 生活场景中日常学习Prompt: 给写完的代码做文档 将优质Prompt模板化Prompt 1:Prompt 1:Prompt 2步骤文本过长而导致遗失信息的示例修改后 特殊示例 如何提升安全性主要目…...

企业工程行业管理系统源码-专业的工程管理软件-提供一站式服务

Java版工程项目管理系统 Spring CloudSpring BootMybatisVueElementUI前后端分离 功能清单如下&#xff1a; 首页 工作台&#xff1a;待办工作、消息通知、预警信息&#xff0c;点击可进入相应的列表 项目进度图表&#xff1a;选择&#xff08;总体或单个&#xff09;项目显示1…...

Ehcache 整合Spring 使用页面、对象缓存

Ehcache在很多项目中都出现过&#xff0c;用法也比较简单。一般的加些配置就可以了&#xff0c;而且Ehcache可以对页面、对象、数据进行缓存&#xff0c;同时支持集群/分布式缓存。如果整合Spring、Hibernate也非常的简单&#xff0c;Spring对Ehcache的支持也非常好。EHCache支…...

Spring Cloud中的服务路由与负载均衡

Spring Cloud中的服务路由与负载均衡 一、服务路由1. 服务发现2. 服务注册3. 服务消费4. 服务提供5. 服务路由实现 二、负载均衡1. 负载均衡的概念2. 负载均衡算法3. 负载均衡实现4. 负载均衡策略5. 使用Spring Cloud实现负载均衡 三、服务路由与负载均衡的集成1. 集成背景2. 集…...

rails routes的使用

Rails routes 是用于确定应该将请求发送到哪个控制器和操作的一种机制。在 Rails 应用程序中&#xff0c;可以通过定义路由来映射 URL 到控制器操作。可以使用 rails routes 命令查看当前应用程序中定义的所有路由。 以下是一些常见的用法&#xff1a; 查看所有路由&#xff…...

Linux基础内容(21)—— 进程消息队列和信号量

Linux基础内容&#xff08;20&#xff09;—— 共享内存_哈里沃克的博客-CSDN博客 目录 1.消息队列 1.定义 2.操作 2.信号量 1.定义 2.细节 3.延申 4.操作 3.IPC的特点共性 1.消息队列 1.定义 定义&#xff1a;是操作系统提供的内核级队列 2.操作 msgget&#xff1a;…...

STM32实现基于RS485的简单的Modbus协议

背景 我这里用STM32实现&#xff0c;其实可以搬移到其他MCU&#xff0c;之前有项目使用STM32实现Modbus协议 这个场景比较正常&#xff0c;很多时候都能碰到 这里主要是Modbus和变频器通信 最常见的是使用Modbus实现传感器数据的采集&#xff0c;我记得之前用过一些传感器都…...

第19节 Node.js Express 框架

Express 是一个为Node.js设计的web开发框架&#xff0c;它基于nodejs平台。 Express 简介 Express是一个简洁而灵活的node.js Web应用框架, 提供了一系列强大特性帮助你创建各种Web应用&#xff0c;和丰富的HTTP工具。 使用Express可以快速地搭建一个完整功能的网站。 Expre…...

椭圆曲线密码学(ECC)

一、ECC算法概述 椭圆曲线密码学&#xff08;Elliptic Curve Cryptography&#xff09;是基于椭圆曲线数学理论的公钥密码系统&#xff0c;由Neal Koblitz和Victor Miller在1985年独立提出。相比RSA&#xff0c;ECC在相同安全强度下密钥更短&#xff08;256位ECC ≈ 3072位RSA…...

Unity3D中Gfx.WaitForPresent优化方案

前言 在Unity中&#xff0c;Gfx.WaitForPresent占用CPU过高通常表示主线程在等待GPU完成渲染&#xff08;即CPU被阻塞&#xff09;&#xff0c;这表明存在GPU瓶颈或垂直同步/帧率设置问题。以下是系统的优化方案&#xff1a; 对惹&#xff0c;这里有一个游戏开发交流小组&…...

STM32标准库-DMA直接存储器存取

文章目录 一、DMA1.1简介1.2存储器映像1.3DMA框图1.4DMA基本结构1.5DMA请求1.6数据宽度与对齐1.7数据转运DMA1.8ADC扫描模式DMA 二、数据转运DMA2.1接线图2.2代码2.3相关API 一、DMA 1.1简介 DMA&#xff08;Direct Memory Access&#xff09;直接存储器存取 DMA可以提供外设…...

3403. 从盒子中找出字典序最大的字符串 I

3403. 从盒子中找出字典序最大的字符串 I 题目链接&#xff1a;3403. 从盒子中找出字典序最大的字符串 I 代码如下&#xff1a; class Solution { public:string answerString(string word, int numFriends) {if (numFriends 1) {return word;}string res;for (int i 0;i &…...

如何理解 IP 数据报中的 TTL?

目录 前言理解 前言 面试灵魂一问&#xff1a;说说对 IP 数据报中 TTL 的理解&#xff1f;我们都知道&#xff0c;IP 数据报由首部和数据两部分组成&#xff0c;首部又分为两部分&#xff1a;固定部分和可变部分&#xff0c;共占 20 字节&#xff0c;而即将讨论的 TTL 就位于首…...

佰力博科技与您探讨热释电测量的几种方法

热释电的测量主要涉及热释电系数的测定&#xff0c;这是表征热释电材料性能的重要参数。热释电系数的测量方法主要包括静态法、动态法和积分电荷法。其中&#xff0c;积分电荷法最为常用&#xff0c;其原理是通过测量在电容器上积累的热释电电荷&#xff0c;从而确定热释电系数…...

【Go语言基础【12】】指针:声明、取地址、解引用

文章目录 零、概述&#xff1a;指针 vs. 引用&#xff08;类比其他语言&#xff09;一、指针基础概念二、指针声明与初始化三、指针操作符1. &&#xff1a;取地址&#xff08;拿到内存地址&#xff09;2. *&#xff1a;解引用&#xff08;拿到值&#xff09; 四、空指针&am…...

面向无人机海岸带生态系统监测的语义分割基准数据集

描述&#xff1a;海岸带生态系统的监测是维护生态平衡和可持续发展的重要任务。语义分割技术在遥感影像中的应用为海岸带生态系统的精准监测提供了有效手段。然而&#xff0c;目前该领域仍面临一个挑战&#xff0c;即缺乏公开的专门面向海岸带生态系统的语义分割基准数据集。受…...

七、数据库的完整性

七、数据库的完整性 主要内容 7.1 数据库的完整性概述 7.2 实体完整性 7.3 参照完整性 7.4 用户定义的完整性 7.5 触发器 7.6 SQL Server中数据库完整性的实现 7.7 小结 7.1 数据库的完整性概述 数据库完整性的含义 正确性 指数据的合法性 有效性 指数据是否属于所定…...