RabbitMQ日常使用小结
一、使用场景
削峰、解耦、异步。
基于AMQP(高级消息队列协议)协议来统一数据交互,通过channel(网络信道)传递信息。erlang语言开发,并发量12000,支持持久化,稳定性好,集群不支持动态扩展。
二、组成及工作流程
1.主要组成
Broker(消息代理): 消息队列服务进程,一个Broker以开多个虚拟主机(VirtualHost)。 VirtualHost(虚拟主机):虚拟主机,用于进行逻辑隔离,一个虚拟主机可以有若干个Exchange和Queue Exchange(交换机):消息队列交换机,按一定的规则将消息路由转发到某个队列。 Queue:消息队列。
2.工作流程
生产者发送消息流程: 1、和Broker建立TCP连接。 2、和Broker建立通道。 3、通过通道消息发送给Broker,由Exchange将消息进行转发。 4、Exchange将消息转发到指定的Queue(队列)。消费者接收消息流程: 1、和Broker建立TCP连接 2、和Broker建立通道 3、监听指定的Queue(队列) 4、当有消息到达Queue时Broker默认将消息推送给消费者。 5、接收到消息。 6、ack回复。
三、交换机Exchange(默认direct)
交换机,接受消息,根据路由键发送消息到绑定的队列(不具备消息存储的能力)。
1.交换机种类
Direct: 单播直连交换机,Exchange将消息完全匹配路由键(routing key)的方式绑定消息,获取信息时也要匹配Exchange和路由键。
fanout: 广播式交换机(Publish/subscribe),不管消息的路由键(routing key),Exchange都会将消息转发给所有绑定的Queue。
topic: 主题交换机,工作方式类似于组播,Exchange会将消息转发和路由键(routing key)符合匹配模式的所有队列,如: routing_key为user.stock的Message会转发给绑定匹配模式为 *.stock 、user.stock* 、 #.user.stock.#的队列。(*表是匹配一个任意词组,#表示匹配0个或多个词组)。
headers: 头交换机,无Binding Key;当然也无Routing Key。根据发送的消息内容中的headers属性进行匹配。
2.交换机属性
Name:交换机名称 Durability:持久化标志,表明此交换机是否是持久化的 Auto-delete:删除标志,表明当所有队列在完成使用此exchange时,是否删除 Arguments:依赖代理本身。
3.交换机状态
持久(durable)
暂存(transient)
4.消息确认机制(ACK)
自动ACK:消息一旦被接收,消费者自动发送ACK。 手动ACK:消息接收后,不会发送ACK,需要手动调用。
四、rabbitmq 客户端的使用
1.引入依赖
<!-- rabbitmq 客户端依赖 --><dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.8.0</version></dependency>
2.创建连接工具
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;public class MyRabbitMQUtils {public static Connection getConnel() throws Exception{//1 创建 ConnectionFactoryConnectionFactory factory = new ConnectionFactory() ;factory.setHost("127.0.0.1");//端口factory.setPort(5672);//设置虚拟机,一个mq服务可以设置多个虚拟机,每个虚拟机就相当于一个独立的mqfactory.setVirtualHost("/");factory.setUsername("guest");factory.setPassword("guest");factory.setAutomaticRecoveryEnabled(true);factory.setNetworkRecoveryInterval(3000);Connection connection = factory.newConnection();// Channel channel = connection.createChannel();return connection;}
}
3.生产者
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;public class Producer {// 交换机名称private final static String EXCHANGE_NAME = "simple_exchange";// 队列名称private final static String QUEUE_NAME = "simple_queue";public static void main(String[]args) throws Exception{Connection produceConnection = MyRabbitMQUtils.getConnel();Channel produceChannel = produceConnection.createChannel();// 建立交换机(广播)produceChannel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT,true);/** 1、queue 队列名称* 2、durable 是否持久化* 3、exclusive 是否独占连接,队列只允许在该连接中访问,如果connection连接关闭队列则自动删除,如果将此参数设置true可用于临时队列的创建* 4、autoDelete 自动删除,队列不再使用时是否自动删除此队列,如果将此参数和exclusive参数设置为true就可以实现临时队列(队列不用了就自动删除)* 5、arguments 参数,可以设置一个队列的扩展参数,如:可设置存活时间*/produceChannel.queueDeclare(QUEUE_NAME, false, false, false, null);/** 1、exchange,交换机,如果不指定将使用mq的默认交换机(设置为"")* 2、routingKey,路由key,交换机根据路由key来将消息转发到指定的队列,如果使用默认交换机,routingKey设置为队列的名称* 3、props,消息的属性* 4、body,消息内容*/for(int i=0;i<10;i++){String message = "生产者发布的消息---!";message = message+i;produceChannel.basicPublish(EXCHANGE_NAME, QUEUE_NAME, null, message.getBytes());System.out.println(" Producer 发布'" + message + "'");}//关闭通道和连接(资源关闭最好用try-catch-finally语句处理)produceChannel.close();produceConnection.close();}
}
4.消费者
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import java.io.IOException;public class Comsumer {// 队列名称private final static String QUEUE_NAME = "simple_queue";public static void main(String[] argv) throws Exception {Connection comsumerConnection = MyRabbitMQUtils.getConnel();Channel comsumerChannel = comsumerConnection.createChannel();/** 参数明细* 1、queue 队列名称* 2、durable 是否持久化* 3、exclusive 是否独占连接,队列只允许在该连接中访问,如果connection连接关闭队列则自动删除,如果将此参数设置true可用于临时队列的创建* 4、autoDelete 自动删除,队列不再使用时是否自动删除此队列,如果将此参数和exclusive参数设置为true就可以实现临时队列(队列不用了就自动删除)* 5、arguments 参数,可以设置一个队列的扩展参数,如:可设置存活时间*/comsumerChannel.queueDeclare(QUEUE_NAME, false, false, false, null);//实现消费方法DefaultConsumer consumer = new DefaultConsumer(comsumerChannel){/** 当接收到消息后此方法将被调用* @param consumerTag 消费者标签,用来标识消费者的,在监听队列时设置channel.basicConsume* @param envelope 信封,通过envelope* @param properties 消息属性* @param body 消息内容*/@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {//交换机String exchange = envelope.getExchange();//消息id,mq在channel中用来标识消息的id,可用于确认消息已接收long deliveryTag = envelope.getDeliveryTag();// body 即消息体String msg = new String(body,"utf-8");System.out.println("Comsumer 获得: " + msg + "!");// 手动 ACKcomsumerChannel.basicAck(envelope.getDeliveryTag(),false);}};// 监听队列,第二个参数:是否自动进行消息确认。//参数:String queue, boolean autoAck, Consumer callback/*** 参数明细:* 1、queue 队列名称* 2、autoAck 自动回复,当消费者接收到消息后要告诉mq消息已接收,如果将此参数设置为tru表示会自动回复mq,如果设置为false要通过编程实现回复。* 3、callback,消费方法,当消费者接收到消息要执行的方法。*/comsumerChannel.basicConsume(QUEUE_NAME, false, consumer);}
}
五、Spring中使用RabbitMQ
1.引入依赖
<!-- AMQP 依赖 --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId><version>2.2.7.RELEASE</version></dependency><!--springboot测试依赖--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><version>2.5.6</version></dependency>
2.更改配置
spring:rabbitmq:host: 127.0.0.1port: 5672username: guestpassword: guest
3.把交换机、和队列加入IOC容器中
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Exchange;
import org.springframework.amqp.core.ExchangeBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class RabbitMQConfig {// email队列public static final String QUEUE_EMAIL = "queue_email";// sms队列public static final String QUEUE_SMS = "queue_sms";// topics类型交换机public static final String EXCHANGE_NAME="topic.exchange";public static final String ROUTINGKEY_EMAIL="topic.#.email.#";public static final String ROUTINGKEY_SMS="topic.#.sms.#";//声明交换机@Bean(EXCHANGE_NAME)public Exchange exchange(){//durable(true) 持久化,mq重启之后交换机还在return ExchangeBuilder.topicExchange(EXCHANGE_NAME).durable(true).build();}//声明email队列/*** new Queue(QUEUE_EMAIL,true,false,false)* durable="true" 持久化 rabbitmq重启的时候不需要创建新的队列* auto-delete 表示消息队列没有在使用时将被自动删除 默认是false* exclusive 表示该消息队列是否只在当前connection生效,默认是false*/@Bean(QUEUE_EMAIL)public Queue emailQueue(){return new Queue(QUEUE_EMAIL);}//声明sms队列@Bean(QUEUE_SMS)public Queue smsQueue(){return new Queue(QUEUE_SMS);}//ROUTINGKEY_EMAIL队列绑定交换机,指定routingKey@Beanpublic Binding bindingEmail(@Qualifier(QUEUE_EMAIL) Queue queue, @Qualifier(EXCHANGE_NAME) Exchange exchange){return BindingBuilder.bind(queue).to(exchange).with(ROUTINGKEY_EMAIL).noargs();}//ROUTINGKEY_SMS队列绑定交换机,指定routingKey@Beanpublic Binding bindingSMS(@Qualifier(QUEUE_SMS) Queue queue, @Qualifier(EXCHANGE_NAME) Exchange exchange){return BindingBuilder.bind(queue).to(exchange).with(ROUTINGKEY_SMS).noargs();}}
4.模拟业务发送消息
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;@SpringBootTest
public class Send {@AutowiredRabbitTemplate rabbitTemplate;@Testpublic void sendMsgByTopics(){/*** 参数:* 1、交换机名称* 2、routingKey* 3、消息内容*/for (int i=0;i<5;i++){String message = "恭喜您,注册成功!userid="+i;rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME,"topic.sms.email",message);System.out.println(" [x] Sent '" + message + "'");}}
}
5.消息的监听及处理
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Component
public class Receive {//监听邮件队列@RabbitListener(bindings = @QueueBinding(value = @Queue(value = "queue_email", durable = "true"),exchange = @Exchange(value = "topic.exchange", ignoreDeclarationExceptions = "true", type = ExchangeTypes.TOPIC),key = {"topic.#.email.#","email.*"}))public void rece_email(String msg){System.out.println(" [邮件服务] received : " + msg + "!");}//监听短信队列@RabbitListener(bindings = @QueueBinding(value = @Queue(value = "queue_sms", durable = "true"),exchange = @Exchange(value = "topic.exchange", ignoreDeclarationExceptions = "true", type = ExchangeTypes.TOPIC),key = {"topic.#.sms.#"}))public void rece_sms(String msg){System.out.println(" [短信服务] received : " + msg + "!");}}
相关文章:
RabbitMQ日常使用小结
一、使用场景 削峰、解耦、异步。 基于AMQP(高级消息队列协议)协议来统一数据交互,通过channel(网络信道)传递信息。erlang语言开发,并发量12000,支持持久化,稳定性好,集群不支持动态扩展。 RabbitMQ的基本概念 二、组成及工作流…...
博物馆文物馆藏环境空气质量无线监控系统方案
博物馆文物馆藏环境空气质量无线监控系统方案 博物馆无线环境测控系统 博物馆恒温恒湿消毒净化系统 现代化博物馆空气质量一体化3D可视化管控平台 博物馆温湿度在线监控系统 博物馆光照在线监控系统 博物馆二氧化碳在线监控系统 博物馆在线监控系统 博物馆紫外线在线监控…...
测试理论----Bug的严重程度(Severity)和优先级(Priority)的分类
【原文链接】测试理论----Bug的严重程度(Severity)和优先级(Priority)的分类 一、Bug的严重程度(Severity) Bug的Severity(严重程度)指的是一个Bug对软件系统功能影响的程度&#…...
斯坦福、Nautilus Chain等联合主办的 Hackathon 活动,现已接受报名
由 Stanford Blockchain Accelerator、Zebec Protocol、 Nautilus Chain、Rootz Lab 共同主办的黑客松活动,现已接受优秀项目提交参赛申请。 在加密行业发展早期,密码极客们就始终在对区块链世界基础设施,在发展方向的无限可能性进行探索。而…...
00后卷王,把我们这些老油条卷的辞职信都写好了........
现在的小年轻真的卷得过分了。 前段时间我们公司来了个00年的,工作没两年,跳槽到我们公司起薪18K,都快接近我了。 后来才知道人家是个卷王,从早干到晚就差搬张床到工位睡觉了。 最近和他聊了一次天,原来这位小老弟家…...
JavaEE(系列12) -- 常见锁策略
目录 1. 乐观锁和悲观锁 2. 轻量级锁与重量级锁 3. 自旋锁和挂起等待锁 4. 互斥锁和读写锁 5. 可重入锁与不可重入锁 6. 死锁 6.1 死锁的必要条件 6.2 如何避免死锁 7. 公平锁和非公平锁 8. Synchronized原理及加锁过程 8.1 Synchronized 小结 8.2 加锁工作过程 8.2.1 偏向锁…...
前端nginx接口跨域
前提:vue项目本地接口通过proxy都可使用,但是项目部署在服务器上后发现所有接口出现503如下状况 简而言之:页面部署在域名为https://aa.bb.cc.com/vehicle/#/下,但是我接口需访问的是https:// azz.qqv.com/permission/company/gro…...
【国产虚拟仪器】基于 ZYNQ 的电能质量系统高速数据采集系统设计
随着电网中非线性负荷用户的不断增加 , 电能质量问题日益严重 。 高精度数据采集系统能够为电能质 量分析提供准确的数据支持 , 是解决电能质量问题的关键依据 。 通过对比现有高速采集系统的设计方案 , 主 控电路多以 ARM 微控制器搭配…...
Java前缀和算法
一.什么是前缀和算法 通俗来讲,前缀和算法就是使用一个新数组来储存原数组中前n-1个元素的和(如果新数组的当前元素的下标为n,计算当前元素的值为原数组中从0到n-1下标数组元素的和),可能这样讲起来有点抽象࿰…...
pico 的两个双核相关函数的延时问题
pico高级API函数中, multicore_fifo_pop_timeout_us 和 multicore_fifo_push_timeout_us 的延时参数, 如修改为500微秒以上时,其延时似乎远远超过设定值,其反馈速度似乎被主核的交互所左右 ,而修改为200以下时&#x…...
Doxygen源码分析: QCString类依赖的qstr系列C函数浅析
2023-05-20 17:02:21 ChrisZZ imzhuofoxmailcom Hompage https://github.com/zchrissirhcz 文章目录 1. doxygen 版本2. QCString 类简介3. qstr 系列函数浅析qmemmove()qsnprintfqstrdup()qstrfree()qstrlen()qstrcpy()qstrncpy()qisempty()qstrcmp()qstrncmp()qisspace()qstr…...
华为OD机试之一种字符串压缩表示的解压(Java源码)
一种字符串压缩表示的解压 题目描述 有一种简易压缩算法:针对全部由小写英文字母组成的字符串,将其中连续超过两个相同字母的部分压缩为连续个数加该字母,其他部分保持原样不变。 例如:字符串“aaabbccccd”经过压缩成为字符串“…...
Microsoft Project Online部署方案
目录 一、前言 二、Microsoft Project Online简介 三、Microsoft Project Online的优势 1、云端部署 2、多设备支持...
飞浆AI studio人工智能课程学习(3)-在具体场景下优化Prompt
文章目录 在具体场景下优化Prompt营销场景办公效率场景日常生活场景海报背景图生成办公效率场景预设Prompt 生活场景中日常学习Prompt: 给写完的代码做文档 将优质Prompt模板化Prompt 1:Prompt 1:Prompt 2步骤文本过长而导致遗失信息的示例修改后 特殊示例 如何提升安全性主要目…...
企业工程行业管理系统源码-专业的工程管理软件-提供一站式服务
Java版工程项目管理系统 Spring CloudSpring BootMybatisVueElementUI前后端分离 功能清单如下: 首页 工作台:待办工作、消息通知、预警信息,点击可进入相应的列表 项目进度图表:选择(总体或单个)项目显示1…...
Ehcache 整合Spring 使用页面、对象缓存
Ehcache在很多项目中都出现过,用法也比较简单。一般的加些配置就可以了,而且Ehcache可以对页面、对象、数据进行缓存,同时支持集群/分布式缓存。如果整合Spring、Hibernate也非常的简单,Spring对Ehcache的支持也非常好。EHCache支…...
Spring Cloud中的服务路由与负载均衡
Spring Cloud中的服务路由与负载均衡 一、服务路由1. 服务发现2. 服务注册3. 服务消费4. 服务提供5. 服务路由实现 二、负载均衡1. 负载均衡的概念2. 负载均衡算法3. 负载均衡实现4. 负载均衡策略5. 使用Spring Cloud实现负载均衡 三、服务路由与负载均衡的集成1. 集成背景2. 集…...
rails routes的使用
Rails routes 是用于确定应该将请求发送到哪个控制器和操作的一种机制。在 Rails 应用程序中,可以通过定义路由来映射 URL 到控制器操作。可以使用 rails routes 命令查看当前应用程序中定义的所有路由。 以下是一些常见的用法: 查看所有路由ÿ…...
Linux基础内容(21)—— 进程消息队列和信号量
Linux基础内容(20)—— 共享内存_哈里沃克的博客-CSDN博客 目录 1.消息队列 1.定义 2.操作 2.信号量 1.定义 2.细节 3.延申 4.操作 3.IPC的特点共性 1.消息队列 1.定义 定义:是操作系统提供的内核级队列 2.操作 msgget:…...
STM32实现基于RS485的简单的Modbus协议
背景 我这里用STM32实现,其实可以搬移到其他MCU,之前有项目使用STM32实现Modbus协议 这个场景比较正常,很多时候都能碰到 这里主要是Modbus和变频器通信 最常见的是使用Modbus实现传感器数据的采集,我记得之前用过一些传感器都…...
【Linux】C语言执行shell指令
在C语言中执行Shell指令 在C语言中,有几种方法可以执行Shell指令: 1. 使用system()函数 这是最简单的方法,包含在stdlib.h头文件中: #include <stdlib.h>int main() {system("ls -l"); // 执行ls -l命令retu…...
(二)原型模式
原型的功能是将一个已经存在的对象作为源目标,其余对象都是通过这个源目标创建。发挥复制的作用就是原型模式的核心思想。 一、源型模式的定义 原型模式是指第二次创建对象可以通过复制已经存在的原型对象来实现,忽略对象创建过程中的其它细节。 📌 核心特点: 避免重复初…...
cf2117E
原题链接:https://codeforces.com/contest/2117/problem/E 题目背景: 给定两个数组a,b,可以执行多次以下操作:选择 i (1 < i < n - 1),并设置 或,也可以在执行上述操作前执行一次删除任意 和 。求…...
基于matlab策略迭代和值迭代法的动态规划
经典的基于策略迭代和值迭代法的动态规划matlab代码,实现机器人的最优运输 Dynamic-Programming-master/Environment.pdf , 104724 Dynamic-Programming-master/README.md , 506 Dynamic-Programming-master/generalizedPolicyIteration.m , 1970 Dynamic-Programm…...
AI,如何重构理解、匹配与决策?
AI 时代,我们如何理解消费? 作者|王彬 封面|Unplash 人们通过信息理解世界。 曾几何时,PC 与移动互联网重塑了人们的购物路径:信息变得唾手可得,商品决策变得高度依赖内容。 但 AI 时代的来…...
处理vxe-table 表尾数据是单独一个接口,表格tableData数据更新后,需要点击两下,表尾才是正确的
修改bug思路: 分别把 tabledata 和 表尾相关数据 console.log() 发现 更新数据先后顺序不对 settimeout延迟查询表格接口 ——测试可行 升级↑:async await 等接口返回后再开始下一个接口查询 ________________________________________________________…...
招商蛇口 | 执笔CID,启幕低密生活新境
作为中国城市生长的力量,招商蛇口以“美好生活承载者”为使命,深耕全球111座城市,以央企担当匠造时代理想人居。从深圳湾的开拓基因到西安高新CID的战略落子,招商蛇口始终与城市发展同频共振,以建筑诠释对土地与生活的…...
C++.OpenGL (20/64)混合(Blending)
混合(Blending) 透明效果核心原理 #mermaid-svg-SWG0UzVfJms7Sm3e {font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;fill:#333;}#mermaid-svg-SWG0UzVfJms7Sm3e .error-icon{fill:#552222;}#mermaid-svg-SWG0UzVfJms7Sm3e .error-text{fill…...
【MATLAB代码】基于最大相关熵准则(MCC)的三维鲁棒卡尔曼滤波算法(MCC-KF),附源代码|订阅专栏后可直接查看
文章所述的代码实现了基于最大相关熵准则(MCC)的三维鲁棒卡尔曼滤波算法(MCC-KF),针对传感器观测数据中存在的脉冲型异常噪声问题,通过非线性加权机制提升滤波器的抗干扰能力。代码通过对比传统KF与MCC-KF在含异常值场景下的表现,验证了后者在状态估计鲁棒性方面的显著优…...
(一)单例模式
一、前言 单例模式属于六大创建型模式,即在软件设计过程中,主要关注创建对象的结果,并不关心创建对象的过程及细节。创建型设计模式将类对象的实例化过程进行抽象化接口设计,从而隐藏了类对象的实例是如何被创建的,封装了软件系统使用的具体对象类型。 六大创建型模式包括…...
