RabbitMQ日常使用小结
一、使用场景
削峰、解耦、异步。
基于AMQP(高级消息队列协议)协议来统一数据交互,通过channel(网络信道)传递信息。erlang语言开发,并发量12000,支持持久化,稳定性好,集群不支持动态扩展。

二、组成及工作流程
1.主要组成
Broker(消息代理): 消息队列服务进程,一个Broker以开多个虚拟主机(VirtualHost)。 VirtualHost(虚拟主机):虚拟主机,用于进行逻辑隔离,一个虚拟主机可以有若干个Exchange和Queue Exchange(交换机):消息队列交换机,按一定的规则将消息路由转发到某个队列。 Queue:消息队列。
2.工作流程

生产者发送消息流程: 1、和Broker建立TCP连接。 2、和Broker建立通道。 3、通过通道消息发送给Broker,由Exchange将消息进行转发。 4、Exchange将消息转发到指定的Queue(队列)。消费者接收消息流程: 1、和Broker建立TCP连接 2、和Broker建立通道 3、监听指定的Queue(队列) 4、当有消息到达Queue时Broker默认将消息推送给消费者。 5、接收到消息。 6、ack回复。
三、交换机Exchange(默认direct)
交换机,接受消息,根据路由键发送消息到绑定的队列(不具备消息存储的能力)。
1.交换机种类
Direct: 单播直连交换机,Exchange将消息完全匹配路由键(routing key)的方式绑定消息,获取信息时也要匹配Exchange和路由键。

fanout: 广播式交换机(Publish/subscribe),不管消息的路由键(routing key),Exchange都会将消息转发给所有绑定的Queue。

topic: 主题交换机,工作方式类似于组播,Exchange会将消息转发和路由键(routing key)符合匹配模式的所有队列,如: routing_key为user.stock的Message会转发给绑定匹配模式为 *.stock 、user.stock* 、 #.user.stock.#的队列。(*表是匹配一个任意词组,#表示匹配0个或多个词组)。

headers: 头交换机,无Binding Key;当然也无Routing Key。根据发送的消息内容中的headers属性进行匹配。
2.交换机属性
Name:交换机名称 Durability:持久化标志,表明此交换机是否是持久化的 Auto-delete:删除标志,表明当所有队列在完成使用此exchange时,是否删除 Arguments:依赖代理本身。
3.交换机状态
持久(durable)
暂存(transient)
4.消息确认机制(ACK)
自动ACK:消息一旦被接收,消费者自动发送ACK。 手动ACK:消息接收后,不会发送ACK,需要手动调用。
四、rabbitmq 客户端的使用
1.引入依赖
<!-- rabbitmq 客户端依赖 --><dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.8.0</version></dependency>
2.创建连接工具
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;public class MyRabbitMQUtils {public static Connection getConnel() throws Exception{//1 创建 ConnectionFactoryConnectionFactory factory = new ConnectionFactory() ;factory.setHost("127.0.0.1");//端口factory.setPort(5672);//设置虚拟机,一个mq服务可以设置多个虚拟机,每个虚拟机就相当于一个独立的mqfactory.setVirtualHost("/");factory.setUsername("guest");factory.setPassword("guest");factory.setAutomaticRecoveryEnabled(true);factory.setNetworkRecoveryInterval(3000);Connection connection = factory.newConnection();// Channel channel = connection.createChannel();return connection;}
}
3.生产者
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;public class Producer {// 交换机名称private final static String EXCHANGE_NAME = "simple_exchange";// 队列名称private final static String QUEUE_NAME = "simple_queue";public static void main(String[]args) throws Exception{Connection produceConnection = MyRabbitMQUtils.getConnel();Channel produceChannel = produceConnection.createChannel();// 建立交换机(广播)produceChannel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT,true);/** 1、queue 队列名称* 2、durable 是否持久化* 3、exclusive 是否独占连接,队列只允许在该连接中访问,如果connection连接关闭队列则自动删除,如果将此参数设置true可用于临时队列的创建* 4、autoDelete 自动删除,队列不再使用时是否自动删除此队列,如果将此参数和exclusive参数设置为true就可以实现临时队列(队列不用了就自动删除)* 5、arguments 参数,可以设置一个队列的扩展参数,如:可设置存活时间*/produceChannel.queueDeclare(QUEUE_NAME, false, false, false, null);/** 1、exchange,交换机,如果不指定将使用mq的默认交换机(设置为"")* 2、routingKey,路由key,交换机根据路由key来将消息转发到指定的队列,如果使用默认交换机,routingKey设置为队列的名称* 3、props,消息的属性* 4、body,消息内容*/for(int i=0;i<10;i++){String message = "生产者发布的消息---!";message = message+i;produceChannel.basicPublish(EXCHANGE_NAME, QUEUE_NAME, null, message.getBytes());System.out.println(" Producer 发布'" + message + "'");}//关闭通道和连接(资源关闭最好用try-catch-finally语句处理)produceChannel.close();produceConnection.close();}
}
4.消费者
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import java.io.IOException;public class Comsumer {// 队列名称private final static String QUEUE_NAME = "simple_queue";public static void main(String[] argv) throws Exception {Connection comsumerConnection = MyRabbitMQUtils.getConnel();Channel comsumerChannel = comsumerConnection.createChannel();/** 参数明细* 1、queue 队列名称* 2、durable 是否持久化* 3、exclusive 是否独占连接,队列只允许在该连接中访问,如果connection连接关闭队列则自动删除,如果将此参数设置true可用于临时队列的创建* 4、autoDelete 自动删除,队列不再使用时是否自动删除此队列,如果将此参数和exclusive参数设置为true就可以实现临时队列(队列不用了就自动删除)* 5、arguments 参数,可以设置一个队列的扩展参数,如:可设置存活时间*/comsumerChannel.queueDeclare(QUEUE_NAME, false, false, false, null);//实现消费方法DefaultConsumer consumer = new DefaultConsumer(comsumerChannel){/** 当接收到消息后此方法将被调用* @param consumerTag 消费者标签,用来标识消费者的,在监听队列时设置channel.basicConsume* @param envelope 信封,通过envelope* @param properties 消息属性* @param body 消息内容*/@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {//交换机String exchange = envelope.getExchange();//消息id,mq在channel中用来标识消息的id,可用于确认消息已接收long deliveryTag = envelope.getDeliveryTag();// body 即消息体String msg = new String(body,"utf-8");System.out.println("Comsumer 获得: " + msg + "!");// 手动 ACKcomsumerChannel.basicAck(envelope.getDeliveryTag(),false);}};// 监听队列,第二个参数:是否自动进行消息确认。//参数:String queue, boolean autoAck, Consumer callback/*** 参数明细:* 1、queue 队列名称* 2、autoAck 自动回复,当消费者接收到消息后要告诉mq消息已接收,如果将此参数设置为tru表示会自动回复mq,如果设置为false要通过编程实现回复。* 3、callback,消费方法,当消费者接收到消息要执行的方法。*/comsumerChannel.basicConsume(QUEUE_NAME, false, consumer);}
}
五、Spring中使用RabbitMQ
1.引入依赖
<!-- AMQP 依赖 --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId><version>2.2.7.RELEASE</version></dependency><!--springboot测试依赖--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><version>2.5.6</version></dependency>
2.更改配置
spring:rabbitmq:host: 127.0.0.1port: 5672username: guestpassword: guest
3.把交换机、和队列加入IOC容器中
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Exchange;
import org.springframework.amqp.core.ExchangeBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class RabbitMQConfig {// email队列public static final String QUEUE_EMAIL = "queue_email";// sms队列public static final String QUEUE_SMS = "queue_sms";// topics类型交换机public static final String EXCHANGE_NAME="topic.exchange";public static final String ROUTINGKEY_EMAIL="topic.#.email.#";public static final String ROUTINGKEY_SMS="topic.#.sms.#";//声明交换机@Bean(EXCHANGE_NAME)public Exchange exchange(){//durable(true) 持久化,mq重启之后交换机还在return ExchangeBuilder.topicExchange(EXCHANGE_NAME).durable(true).build();}//声明email队列/*** new Queue(QUEUE_EMAIL,true,false,false)* durable="true" 持久化 rabbitmq重启的时候不需要创建新的队列* auto-delete 表示消息队列没有在使用时将被自动删除 默认是false* exclusive 表示该消息队列是否只在当前connection生效,默认是false*/@Bean(QUEUE_EMAIL)public Queue emailQueue(){return new Queue(QUEUE_EMAIL);}//声明sms队列@Bean(QUEUE_SMS)public Queue smsQueue(){return new Queue(QUEUE_SMS);}//ROUTINGKEY_EMAIL队列绑定交换机,指定routingKey@Beanpublic Binding bindingEmail(@Qualifier(QUEUE_EMAIL) Queue queue, @Qualifier(EXCHANGE_NAME) Exchange exchange){return BindingBuilder.bind(queue).to(exchange).with(ROUTINGKEY_EMAIL).noargs();}//ROUTINGKEY_SMS队列绑定交换机,指定routingKey@Beanpublic Binding bindingSMS(@Qualifier(QUEUE_SMS) Queue queue, @Qualifier(EXCHANGE_NAME) Exchange exchange){return BindingBuilder.bind(queue).to(exchange).with(ROUTINGKEY_SMS).noargs();}}
4.模拟业务发送消息
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;@SpringBootTest
public class Send {@AutowiredRabbitTemplate rabbitTemplate;@Testpublic void sendMsgByTopics(){/*** 参数:* 1、交换机名称* 2、routingKey* 3、消息内容*/for (int i=0;i<5;i++){String message = "恭喜您,注册成功!userid="+i;rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME,"topic.sms.email",message);System.out.println(" [x] Sent '" + message + "'");}}
}
5.消息的监听及处理
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Component
public class Receive {//监听邮件队列@RabbitListener(bindings = @QueueBinding(value = @Queue(value = "queue_email", durable = "true"),exchange = @Exchange(value = "topic.exchange", ignoreDeclarationExceptions = "true", type = ExchangeTypes.TOPIC),key = {"topic.#.email.#","email.*"}))public void rece_email(String msg){System.out.println(" [邮件服务] received : " + msg + "!");}//监听短信队列@RabbitListener(bindings = @QueueBinding(value = @Queue(value = "queue_sms", durable = "true"),exchange = @Exchange(value = "topic.exchange", ignoreDeclarationExceptions = "true", type = ExchangeTypes.TOPIC),key = {"topic.#.sms.#"}))public void rece_sms(String msg){System.out.println(" [短信服务] received : " + msg + "!");}}
相关文章:

RabbitMQ日常使用小结
一、使用场景 削峰、解耦、异步。 基于AMQP(高级消息队列协议)协议来统一数据交互,通过channel(网络信道)传递信息。erlang语言开发,并发量12000,支持持久化,稳定性好,集群不支持动态扩展。 RabbitMQ的基本概念 二、组成及工作流…...

博物馆文物馆藏环境空气质量无线监控系统方案
博物馆文物馆藏环境空气质量无线监控系统方案 博物馆无线环境测控系统 博物馆恒温恒湿消毒净化系统 现代化博物馆空气质量一体化3D可视化管控平台 博物馆温湿度在线监控系统 博物馆光照在线监控系统 博物馆二氧化碳在线监控系统 博物馆在线监控系统 博物馆紫外线在线监控…...

测试理论----Bug的严重程度(Severity)和优先级(Priority)的分类
【原文链接】测试理论----Bug的严重程度(Severity)和优先级(Priority)的分类 一、Bug的严重程度(Severity) Bug的Severity(严重程度)指的是一个Bug对软件系统功能影响的程度&#…...

斯坦福、Nautilus Chain等联合主办的 Hackathon 活动,现已接受报名
由 Stanford Blockchain Accelerator、Zebec Protocol、 Nautilus Chain、Rootz Lab 共同主办的黑客松活动,现已接受优秀项目提交参赛申请。 在加密行业发展早期,密码极客们就始终在对区块链世界基础设施,在发展方向的无限可能性进行探索。而…...

00后卷王,把我们这些老油条卷的辞职信都写好了........
现在的小年轻真的卷得过分了。 前段时间我们公司来了个00年的,工作没两年,跳槽到我们公司起薪18K,都快接近我了。 后来才知道人家是个卷王,从早干到晚就差搬张床到工位睡觉了。 最近和他聊了一次天,原来这位小老弟家…...

JavaEE(系列12) -- 常见锁策略
目录 1. 乐观锁和悲观锁 2. 轻量级锁与重量级锁 3. 自旋锁和挂起等待锁 4. 互斥锁和读写锁 5. 可重入锁与不可重入锁 6. 死锁 6.1 死锁的必要条件 6.2 如何避免死锁 7. 公平锁和非公平锁 8. Synchronized原理及加锁过程 8.1 Synchronized 小结 8.2 加锁工作过程 8.2.1 偏向锁…...

前端nginx接口跨域
前提:vue项目本地接口通过proxy都可使用,但是项目部署在服务器上后发现所有接口出现503如下状况 简而言之:页面部署在域名为https://aa.bb.cc.com/vehicle/#/下,但是我接口需访问的是https:// azz.qqv.com/permission/company/gro…...

【国产虚拟仪器】基于 ZYNQ 的电能质量系统高速数据采集系统设计
随着电网中非线性负荷用户的不断增加 , 电能质量问题日益严重 。 高精度数据采集系统能够为电能质 量分析提供准确的数据支持 , 是解决电能质量问题的关键依据 。 通过对比现有高速采集系统的设计方案 , 主 控电路多以 ARM 微控制器搭配…...

Java前缀和算法
一.什么是前缀和算法 通俗来讲,前缀和算法就是使用一个新数组来储存原数组中前n-1个元素的和(如果新数组的当前元素的下标为n,计算当前元素的值为原数组中从0到n-1下标数组元素的和),可能这样讲起来有点抽象࿰…...
pico 的两个双核相关函数的延时问题
pico高级API函数中, multicore_fifo_pop_timeout_us 和 multicore_fifo_push_timeout_us 的延时参数, 如修改为500微秒以上时,其延时似乎远远超过设定值,其反馈速度似乎被主核的交互所左右 ,而修改为200以下时&#x…...

Doxygen源码分析: QCString类依赖的qstr系列C函数浅析
2023-05-20 17:02:21 ChrisZZ imzhuofoxmailcom Hompage https://github.com/zchrissirhcz 文章目录 1. doxygen 版本2. QCString 类简介3. qstr 系列函数浅析qmemmove()qsnprintfqstrdup()qstrfree()qstrlen()qstrcpy()qstrncpy()qisempty()qstrcmp()qstrncmp()qisspace()qstr…...
华为OD机试之一种字符串压缩表示的解压(Java源码)
一种字符串压缩表示的解压 题目描述 有一种简易压缩算法:针对全部由小写英文字母组成的字符串,将其中连续超过两个相同字母的部分压缩为连续个数加该字母,其他部分保持原样不变。 例如:字符串“aaabbccccd”经过压缩成为字符串“…...
Microsoft Project Online部署方案
目录 一、前言 二、Microsoft Project Online简介 三、Microsoft Project Online的优势 1、云端部署 2、多设备支持...

飞浆AI studio人工智能课程学习(3)-在具体场景下优化Prompt
文章目录 在具体场景下优化Prompt营销场景办公效率场景日常生活场景海报背景图生成办公效率场景预设Prompt 生活场景中日常学习Prompt: 给写完的代码做文档 将优质Prompt模板化Prompt 1:Prompt 1:Prompt 2步骤文本过长而导致遗失信息的示例修改后 特殊示例 如何提升安全性主要目…...

企业工程行业管理系统源码-专业的工程管理软件-提供一站式服务
Java版工程项目管理系统 Spring CloudSpring BootMybatisVueElementUI前后端分离 功能清单如下: 首页 工作台:待办工作、消息通知、预警信息,点击可进入相应的列表 项目进度图表:选择(总体或单个)项目显示1…...
Ehcache 整合Spring 使用页面、对象缓存
Ehcache在很多项目中都出现过,用法也比较简单。一般的加些配置就可以了,而且Ehcache可以对页面、对象、数据进行缓存,同时支持集群/分布式缓存。如果整合Spring、Hibernate也非常的简单,Spring对Ehcache的支持也非常好。EHCache支…...
Spring Cloud中的服务路由与负载均衡
Spring Cloud中的服务路由与负载均衡 一、服务路由1. 服务发现2. 服务注册3. 服务消费4. 服务提供5. 服务路由实现 二、负载均衡1. 负载均衡的概念2. 负载均衡算法3. 负载均衡实现4. 负载均衡策略5. 使用Spring Cloud实现负载均衡 三、服务路由与负载均衡的集成1. 集成背景2. 集…...

rails routes的使用
Rails routes 是用于确定应该将请求发送到哪个控制器和操作的一种机制。在 Rails 应用程序中,可以通过定义路由来映射 URL 到控制器操作。可以使用 rails routes 命令查看当前应用程序中定义的所有路由。 以下是一些常见的用法: 查看所有路由ÿ…...

Linux基础内容(21)—— 进程消息队列和信号量
Linux基础内容(20)—— 共享内存_哈里沃克的博客-CSDN博客 目录 1.消息队列 1.定义 2.操作 2.信号量 1.定义 2.细节 3.延申 4.操作 3.IPC的特点共性 1.消息队列 1.定义 定义:是操作系统提供的内核级队列 2.操作 msgget:…...

STM32实现基于RS485的简单的Modbus协议
背景 我这里用STM32实现,其实可以搬移到其他MCU,之前有项目使用STM32实现Modbus协议 这个场景比较正常,很多时候都能碰到 这里主要是Modbus和变频器通信 最常见的是使用Modbus实现传感器数据的采集,我记得之前用过一些传感器都…...

wordpress后台更新后 前端没变化的解决方法
使用siteground主机的wordpress网站,会出现更新了网站内容和修改了php模板文件、js文件、css文件、图片文件后,网站没有变化的情况。 不熟悉siteground主机的新手,遇到这个问题,就很抓狂,明明是哪都没操作错误&#x…...

Unity3D中Gfx.WaitForPresent优化方案
前言 在Unity中,Gfx.WaitForPresent占用CPU过高通常表示主线程在等待GPU完成渲染(即CPU被阻塞),这表明存在GPU瓶颈或垂直同步/帧率设置问题。以下是系统的优化方案: 对惹,这里有一个游戏开发交流小组&…...

Debian系统简介
目录 Debian系统介绍 Debian版本介绍 Debian软件源介绍 软件包管理工具dpkg dpkg核心指令详解 安装软件包 卸载软件包 查询软件包状态 验证软件包完整性 手动处理依赖关系 dpkg vs apt Debian系统介绍 Debian 和 Ubuntu 都是基于 Debian内核 的 Linux 发行版ÿ…...
css3笔记 (1) 自用
outline: none 用于移除元素获得焦点时默认的轮廓线 broder:0 用于移除边框 font-size:0 用于设置字体不显示 list-style: none 消除<li> 标签默认样式 margin: xx auto 版心居中 width:100% 通栏 vertical-align 作用于行内元素 / 表格单元格ÿ…...

【C++进阶篇】智能指针
C内存管理终极指南:智能指针从入门到源码剖析 一. 智能指针1.1 auto_ptr1.2 unique_ptr1.3 shared_ptr1.4 make_shared 二. 原理三. shared_ptr循环引用问题三. 线程安全问题四. 内存泄漏4.1 什么是内存泄漏4.2 危害4.3 避免内存泄漏 五. 最后 一. 智能指针 智能指…...
CRMEB 中 PHP 短信扩展开发:涵盖一号通、阿里云、腾讯云、创蓝
目前已有一号通短信、阿里云短信、腾讯云短信扩展 扩展入口文件 文件目录 crmeb\services\sms\Sms.php 默认驱动类型为:一号通 namespace crmeb\services\sms;use crmeb\basic\BaseManager; use crmeb\services\AccessTokenServeService; use crmeb\services\sms\…...
6个月Python学习计划 Day 16 - 面向对象编程(OOP)基础
第三周 Day 3 🎯 今日目标 理解类(class)和对象(object)的关系学会定义类的属性、方法和构造函数(init)掌握对象的创建与使用初识封装、继承和多态的基本概念(预告) &a…...

华为云Flexus+DeepSeek征文 | 基于Dify构建具备联网搜索能力的知识库问答助手
华为云FlexusDeepSeek征文 | 基于Dify构建具备联网搜索能力的知识库问答助手 一、构建知识库问答助手引言二、构建知识库问答助手环境2.1 基于FlexusX实例的Dify平台2.2 基于MaaS的模型API商用服务 三、构建知识库问答助手实战3.1 配置Dify环境3.2 创建知识库问答助手3.3 使用知…...

联邦学习带宽资源分配
带宽资源分配是指在网络中如何合理分配有限的带宽资源,以满足各个通信任务和用户的需求,尤其是在多用户共享带宽的情况下,如何确保各个设备或用户的通信需求得到高效且公平的满足。带宽是网络中的一个重要资源,通常指的是单位时间…...

持续交付的进化:从DevOps到AI驱动的IT新动能
文章目录 一、持续交付的本质:从手动到自动的交付飞跃关键特性案例:电商平台的高效部署 二、持续交付的演进:从CI到AI驱动的未来发展历程 中国…...