rabbitmq入门学习
写在前面
本文看下rabbit mq的基础概念以及使用。
1:简单介绍
为了不同进程间通信的解耦,出现了消息队列,为了规范消息队列的具体实现,Java制定了jms规范,这是一套基于接口的规范,因此是绑定语言的,即只能通过Java语言来实现和使用,与jms类似还有基于net的nms,这也是一套规范接口,只不过是基于.net开发语言的。不管是jms还是nms,它们都有一个通病,就是无法实现跨语言,这个时候amqp就应用而生了,可以将其理解为一种应用层的协议,构建在tcp之上,因此就可以实现跨语言的消息通信,参考下图:

amqp协议通信模型如下:

2:基础环境准备
2.1:服务安装
参考docker安装rabbitmq 。
2.2:创建Virtual host和用户
- 创建virtual host

- 创建admin用户

- 设置admin权限

添加成功:

3:正戏
本文主要看其提供的5种队列,如下图:

3.1:简单队列
简单队列就是一个生产者一个消费者的队列方式,如下图:

- 生产消息:
public class SimpleSend {private final static String QUEUE_NAME = "q_test_01";public static void main(String[] argv) throws Exception {// 获取到连接以及mq通道Connection connection = ConnectionUtil.getConnection();// 从连接中创建通道Channel channel = connection.createChannel();// 声明(创建)队列channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 消息内容String message = "Hello World come here!!!";channel.basicPublish("", QUEUE_NAME, null, message.getBytes());System.out.println(" [x] Sent '" + message + "'");//关闭通道和连接channel.close();connection.close();}
}
生产后:

点击q_test_01:

- 消费消息
public class SimpleRecv {private final static String QUEUE_NAME = "q_test_01";public static void main(String[] argv) throws Exception {// 获取到连接以及mq通道Connection connection = ConnectionUtil.getConnection();// 从连接中创建通道Channel channel = connection.createChannel();// 声明队列channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 定义队列的消费者QueueingConsumer consumer = new QueueingConsumer(channel);// 监听队列 true 这里使用自动确认,及消息消费默认消费成功,这种方式效率高,但是容易丢失消息// 如果某些场景允许部分消息丢失,但是要求执行效率,则可以考虑将该值设置为true,否则设置为false,即手动确认// 最周通过执行channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);完成手动确认,多一次网络通信channel.basicConsume(QUEUE_NAME, true, consumer);// 获取消息while (true) {QueueingConsumer.Delivery delivery = consumer.nextDelivery();String message = new String(delivery.getBody());System.out.println(" [x] Received '" + message + "'");}}
}
输出:
[INFO] --- exec-maven-plugin:3.1.0:exec (default-cli) @ rabbitmq-study ---[x] Received 'Hello World come here!!!'

3.2:work队列
一个生产者多个消费者,如下图:

work队列看起来和简单队列相比只是多起了几个消费者而已。
- 生产者
public class WorkSend {private final static String QUEUE_NAME = "test_queue_work";public static void main(String[] argv) throws Exception {// 获取到连接以及mq通道Connection connection = ConnectionUtil.getConnection();Channel channel = connection.createChannel();// 声明队列channel.queueDeclare(QUEUE_NAME, false, false, false, null);for (int i = 0; i < 100; i++) {// 消息内容String message = "" + i;channel.basicPublish("", QUEUE_NAME, null, message.getBytes());System.out.println(" [x] Sent '" + message + "'");Thread.sleep(i * 10);}channel.close();connection.close();}
}
- 消费者
public class WorkRecv {private final static String QUEUE_NAME = "test_queue_work";public static void main(String[] argv) throws Exception {// 获取到连接以及mq通道Connection connection = ConnectionUtil.getConnection();Channel channel = connection.createChannel();// 声明队列channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 同一时刻服务器只会发一条消息给消费者// 该值设置为1,配合手动确认,则可以实现一条一条消费,确认一条后消费下一条,且多个消费者时谁的消费能力强,谁消费的消息多// 消费者之间消费消息不相互影响channel.basicQos(1);// 定义队列的消费者QueueingConsumer consumer = new QueueingConsumer(channel);// 监听队列,false表示手动返回完成状态,true表示自动channel.basicConsume(QUEUE_NAME, false, consumer);
// channel.basicConsume(QUEUE_NAME, true, consumer);// 获取消息while (true) {QueueingConsumer.Delivery delivery = consumer.nextDelivery();String message = new String(delivery.getBody());System.out.println(" [y] Received '" + message + "'");//休眠Thread.sleep(10);// 返回确认状态,注释掉表示使用自动确认模式channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);}}
}
3.3:订阅模式
这种这种模式可以实现一个消息同时被多个消费者消费(广播),但是具体的实现需要依赖于交换器exchange,生产者端将消息发送到交换器,之后消费者端只需要将某个消息队列绑定到交换器,交换器会将消息发送到绑定的所有队列,消费者端就可以从队列中获取到对应的消息,但需要注意一个队列的消息还是只可以获取一次,如下图们:

- 生产者端
public class SubscribeSend {private final static String EXCHANGE_NAME = "test_exchange_fanout111";public static void main(String[] argv) throws Exception {// 获取到连接以及mq通道Connection connection = ConnectionUtil.getConnection();Channel channel = connection.createChannel();// 声明exchangechannel.exchangeDeclare(EXCHANGE_NAME, "fanout");// 消息内容String message = "Hello World!";channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());System.out.println(" [x] Sent '" + message + "'");channel.close();connection.close();}
}
消费者端2个:
public class SubscribeRecv {private final static String QUEUE_NAME = "test_queue_work1";private final static String EXCHANGE_NAME = "test_exchange_fanout111";public static void main(String[] argv) throws Exception {// 获取到连接以及mq通道Connection connection = ConnectionUtil.getConnection();Channel channel = connection.createChannel();// 声明队列channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 绑定队列到交换机channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");// 同一时刻服务器只会发一条消息给消费者channel.basicQos(1);// 定义队列的消费者QueueingConsumer consumer = new QueueingConsumer(channel);// 监听队列,手动返回完成channel.basicConsume(QUEUE_NAME, false, consumer);// 获取消息while (true) {QueueingConsumer.Delivery delivery = consumer.nextDelivery();String message = new String(delivery.getBody());System.out.println(" [Recv] Received '" + message + "'");Thread.sleep(10);channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);}}
}public class SubscribeRecv2 {private final static String QUEUE_NAME = "test_queue_work2";private final static String EXCHANGE_NAME = "test_exchange_fanout";public static void main(String[] argv) throws Exception {// 获取到连接以及mq通道Connection connection = ConnectionUtil.getConnection();Channel channel = connection.createChannel();// 声明队列channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 绑定队列到交换机channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");// 同一时刻服务器只会发一条消息给消费者channel.basicQos(1);// 定义队列的消费者QueueingConsumer consumer = new QueueingConsumer(channel);// 监听队列,手动返回完成channel.basicConsume(QUEUE_NAME, false, consumer);// 获取消息while (true) {QueueingConsumer.Delivery delivery = consumer.nextDelivery();String message = new String(delivery.getBody());System.out.println(" [Recv2] Received '" + message + "'");Thread.sleep(10);channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);}}
}
-
启动2个消费者

-
启动生产者

[INFO] --- exec-maven-plugin:3.1.0:exec (default-cli) @ rabbitmq-study ---[Recv2] Received 'Hello World!'[INFO] --- exec-maven-plugin:3.1.0:exec (default-cli) @ rabbitmq-study ---[Recv] Received 'Hello World!'
接着看下转换器和队列的绑定关系:

注意以下的问题:
1:因为交换器本身不具备数据存储的能力,所以如果是某个交换器上没有绑定任何的队列,则该消息将会丢失。
2:因为交换器本身不存储数据,所以在具体的消息队列绑定到交换器(即消费者启动前),不要生产消息到交换器,否则消息将会丢失。
3.4:路由模式
这种方式类似于订阅模式,也需要转换器作为中间商,但是并不会直接无脑的发送消息,而是会根据消费者额外指定的路由key,生产者在向转换器发送消息时会带着routeKey,消费者在消费消息时会指定自己期望的routeKey只有二者匹配时,才会从队列中消费对应的消息,l另外注意的时交换机类型设置为direct,如下图:

- 生产者
public class RoutingSend {private final static String EXCHANGE_NAME = "test_exchange_direct123";public static void main(String[] argv) throws Exception {// 获取到连接以及mq通道Connection connection = ConnectionUtil.getConnection();Channel channel = connection.createChannel();// 声明exchange 设置交换器为direct,即路由模式channel.exchangeDeclare(EXCHANGE_NAME, "direct");// 消息内容String message = "Hello World!";// 设置路由key为"update",即只有对应队列上设置了update路由key的消费者才会消费到消息channel.basicPublish(EXCHANGE_NAME, "update", null, message.getBytes());System.out.println(" [x] Sent '" + message + "'");channel.close();connection.close();}
}
- 2个消费者
public class RoutingRecv {private final static String QUEUE_NAME = "test_queue_work_route";private final static String EXCHANGE_NAME = "test_exchange_direct123";public static void main(String[] argv) throws Exception {// 获取到连接以及mq通道Connection connection = ConnectionUtil.getConnection();Channel channel = connection.createChannel();// 声明队列channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 绑定队列到交换机 设置路由key为delete,updatechannel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "delete");channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "update");// 同一时刻服务器只会发一条消息给消费者channel.basicQos(1);// 定义队列的消费者QueueingConsumer consumer = new QueueingConsumer(channel);// 监听队列,手动返回完成channel.basicConsume(QUEUE_NAME, false, consumer);// 获取消息while (true) {QueueingConsumer.Delivery delivery = consumer.nextDelivery();String message = new String(delivery.getBody());System.out.println(" [x] Received '" + message + "'");Thread.sleep(10);channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);}}
}public class RoutingRecv2 {private final static String QUEUE_NAME = "test_queue_work_route2";private final static String EXCHANGE_NAME = "test_exchange_direct";public static void main(String[] argv) throws Exception {// 获取到连接以及mq通道Connection connection = ConnectionUtil.getConnection();Channel channel = connection.createChannel();// 声明队列channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 绑定队列到交换机channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "delete");channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "insert");// 同一时刻服务器只会发一条消息给消费者channel.basicQos(1);// 定义队列的消费者QueueingConsumer consumer = new QueueingConsumer(channel);// 监听队列,手动返回完成channel.basicConsume(QUEUE_NAME, false, consumer);// 获取消息while (true) {QueueingConsumer.Delivery delivery = consumer.nextDelivery();String message = new String(delivery.getBody());System.out.println(" [x] Received '" + message + "'");Thread.sleep(10);channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);}}
}


先启动消费者再启动生产者测试可。

写在后面
参考文章列表
RabbitMQ使用教程(超详细)
docker安装rabbitmq
RabbitMq的一些概念,JMS、AMQP、MQ 。
相关文章:
rabbitmq入门学习
写在前面 本文看下rabbit mq的基础概念以及使用。 1:简单介绍 为了不同进程间通信的解耦,出现了消息队列,为了规范消息队列的具体实现,Java制定了jms规范,这是一套基于接口的规范,因此是绑定语言的&…...
说说对Fiber架构的理解?解决了什么问题?
一、问题 JavaScript引擎和页面渲染引擎两个线程是互斥的,当其中一个线程执行时,另一个线程只能挂起等待 如果 JavaScript 线程长时间地占用了主线程,那么渲染层面的更新就不得不长时间地等待,界面长时间不更新,会导…...
Spring Security笔记
Spring Security 是 Spring家族中的一个安全管理框架。 一般来说中大型的项目都是使用 SpringSecurity 来做安全框架,小项目用相对简单的Shiro。认证、授权是 SpringSecurity 作为安全框架的核心功能。 认证:通过用户名密码验证当前访问系统的是不是本…...
快速教程|如何在 AWS EC2上使用 Walrus 部署 GitLab
Walrus 是一款基于平台工程理念的开源应用管理平台,致力于解决应用交付领域的深切痛点。借助 Walrus 将云原生的能力和最佳实践扩展到非容器化环境,并支持任意应用形态统一编排部署,降低使用基础设施的复杂度,为研发和运维团队提供…...
[vmware]vmware虚拟机压缩空间清理空间
vmware中的ubuntu使用如果拷贝文件进去在删除,vmare镜像文件并不会减少日积月累会不断是的真实物理磁盘空间大幅度减少,比如我以前windows操作系统本来只有30GB最后居然占道硬盘200GB,清理方法有2种。 第一种:vmware界面操作 第二…...
一篇文章带你使用(MMKV--基于 mmap 的高性能通用 key-value 组件)
一、MMKV是什么? MMKV 是基于 mmap 内存映射的 key-value 组件,底层序列化/反序列化使用 protobuf 实现,性能高,稳定性强。也是腾讯微信团队使用的技术。 支持的数据类型 支持以下 Java 语言基础类型: boolean、int…...
Pytorch 里面torch.no_grad 和model.eval(), model.train() 的作用
torch.no_grad: 影响模型的自微分器,使得其停止工作;这样的话,数据计算的数据就会变快,内存占用也会变小,因为没有了反向梯度计算,当然,我哦们也无法做反向传播。 model.eval() 和model.train()…...
Ozon产品内容评级功能上线,妙手ERP实力助力Ozon卖家全方位打造爆款产品!
产品内容评级,可以直接反映产品质量的高低,也是影响产品排名的关键。具有较高内容评级的产品,将有更大机会显示在搜索结果和类目的前几页中,从而引起买家的关注,促进销售。 为帮助卖家打造高质量产品,妙手…...
Linux 下最主流的文件系统格式——ext
硬盘分成相同大小的单元,我们称为块(Block)。一块的大小是扇区大小的整数倍,默认是 4K。在格式化的时候,这个值是可以设定的。 一大块硬盘被分成了一个个小的块,用来存放文件的数据部分。这样一来…...
变量环境、变量提升和暂时性死区
JavaScript中的提升 在JavaScript中,“Hoisting”(提升)是一种特性,它将变量和函数的声明移动到作用域的顶部。这意味着可以在声明之前使用这些变量和函数,而不会报错。 当JavaScript代码执行时,会经过两个…...
yolov8+多算法多目标追踪+实例分割+目标检测+姿态估计(代码+教程)
多目标追踪实例分割目标检测 YOLO (You Only Look Once) 是一个流行的目标检测算法,它能够在图像中准确地定位和识别多个物体。 本项目是基于 YOLO 算法的目标跟踪系统,它将 YOLO 的目标检测功能与目标跟踪技术相结合,实现了实时的多目标跟…...
【神经网络】【GoogleNet】
1、引言 卷积神经网络是当前最热门的技术,我想深入地学习这门技术,从他的发展历史开始,了解神经网络算法的兴衰起伏;同时了解他在发展过程中的**里程碑式算法**,能更好的把握神经网络发展的未来趋势,了解神…...
网络安全深入学习第八课——正向代理(工具:ReGeorg)
文章目录 一、环境配置二、开始模拟1、拿下跳板机的Webshell权限,并上传shell文件1.1、查看跳板机网络环境1.2、查看arp表 2、使用ReGeorg来建立连接2.1、生产ReGeorg隧道文件2.2、上传ReGeorg隧道的PHP脚本到跳板机2.3、连接隧道2.4、尝试浏览器连接 3、使用Proxif…...
Jmeter全流程性能测试实战
项目背景: 我们的平台为全国某行业监控平台,经过3轮功能测试、接口测试后,98%的问题已经关闭,决定对省平台向全国平台上传数据的接口进行性能测试。 01、测试步骤 1、编写性能测试方案 由于我是刚进入此项目组不久,…...
Python算法例8 将整数A转换为B
1. 问题描述 给定整数A和B,求出将整数A转换为B,需要改变bit的位数。 2. 问题示例 把31转换为14,需要改变2个bit位,即:(31)10(11111)2,(14&…...
一个基于百度飞桨封装的.NET版本OCR工具类库 - PaddleOCRSharp
前言 大家有使用过.NET开发过OCR工具吗?今天给大家推荐一个基于百度飞桨封装的.NET版本OCR工具类库:PaddleOCRSharp。 OCR工具有什么用? OCR(Optical Character Recognition)工具可以将图像或扫描文件中的文本内容转…...
在 CelebA 数据集上训练的 PyTorch 中的基本变分自动编码器
摩西西珀博士 一、说明 我最近发现自己需要一种方法将图像编码到潜在嵌入中,调整嵌入,然后生成新图像。有一些强大的方法可以创建嵌入或从嵌入生成。如果你想同时做到这两点,一种自然且相当简单的方法是使用变分自动编码器。 这样的深度网络不…...
利用Ansible实现批量Linux服务器安全配置
1.摘要 在上一篇<<初步利用Ansible实现批量服务器自动化管理>>文章中, 我初步实现了通过编写清单和剧本来实现多台服务器的自动化管理,在本章节中, 我将利用Ansible的剧本来实现更实用、更复杂一点的功能, 主要功能包括三个:1.同时在三台服务器中增加IP访问控制,只…...
读书笔记:彼得·德鲁克《认识管理》第8章 战略规划:企业家技能
一、章节内容概述 战略规划帮助做好当前的业务以迎接未来。战略规划需要思考业务应该是什么,当前必须做什么才能赢得未来。战略规划需要进行风险决策,需要有组织地抛弃过去的业务,要求清晰界定和明确安排为实现理想的未来而开展的工作。战略…...
HarmonyOS应用开发-视频播放器与弹窗
Viedo组件 在手机、平板或是智慧屏这些终端设备上,媒体功能可以算作是我们最常用的场景之一。无论是实现音频的播放、录制、采集,还是视频的播放、切换、循环,亦或是相机的预览、拍照等功能,媒体组件都是必不可少的。以视频功能为…...
未来机器人的大脑:如何用神经网络模拟器实现更智能的决策?
编辑:陈萍萍的公主一点人工一点智能 未来机器人的大脑:如何用神经网络模拟器实现更智能的决策?RWM通过双自回归机制有效解决了复合误差、部分可观测性和随机动力学等关键挑战,在不依赖领域特定归纳偏见的条件下实现了卓越的预测准…...
Mybatis逆向工程,动态创建实体类、条件扩展类、Mapper接口、Mapper.xml映射文件
今天呢,博主的学习进度也是步入了Java Mybatis 框架,目前正在逐步杨帆旗航。 那么接下来就给大家出一期有关 Mybatis 逆向工程的教学,希望能对大家有所帮助,也特别欢迎大家指点不足之处,小生很乐意接受正确的建议&…...
MVC 数据库
MVC 数据库 引言 在软件开发领域,Model-View-Controller(MVC)是一种流行的软件架构模式,它将应用程序分为三个核心组件:模型(Model)、视图(View)和控制器(Controller)。这种模式有助于提高代码的可维护性和可扩展性。本文将深入探讨MVC架构与数据库之间的关系,以…...
React19源码系列之 事件插件系统
事件类别 事件类型 定义 文档 Event Event 接口表示在 EventTarget 上出现的事件。 Event - Web API | MDN UIEvent UIEvent 接口表示简单的用户界面事件。 UIEvent - Web API | MDN KeyboardEvent KeyboardEvent 对象描述了用户与键盘的交互。 KeyboardEvent - Web…...
在Ubuntu中设置开机自动运行(sudo)指令的指南
在Ubuntu系统中,有时需要在系统启动时自动执行某些命令,特别是需要 sudo权限的指令。为了实现这一功能,可以使用多种方法,包括编写Systemd服务、配置 rc.local文件或使用 cron任务计划。本文将详细介绍这些方法,并提供…...
IT供电系统绝缘监测及故障定位解决方案
随着新能源的快速发展,光伏电站、储能系统及充电设备已广泛应用于现代能源网络。在光伏领域,IT供电系统凭借其持续供电性好、安全性高等优势成为光伏首选,但在长期运行中,例如老化、潮湿、隐裂、机械损伤等问题会影响光伏板绝缘层…...
算法:模拟
1.替换所有的问号 1576. 替换所有的问号 - 力扣(LeetCode) 遍历字符串:通过外层循环逐一检查每个字符。遇到 ? 时处理: 内层循环遍历小写字母(a 到 z)。对每个字母检查是否满足: 与…...
基于SpringBoot在线拍卖系统的设计和实现
摘 要 随着社会的发展,社会的各行各业都在利用信息化时代的优势。计算机的优势和普及使得各种信息系统的开发成为必需。 在线拍卖系统,主要的模块包括管理员;首页、个人中心、用户管理、商品类型管理、拍卖商品管理、历史竞拍管理、竞拍订单…...
如何更改默认 Crontab 编辑器 ?
在 Linux 领域中,crontab 是您可能经常遇到的一个术语。这个实用程序在类 unix 操作系统上可用,用于调度在预定义时间和间隔自动执行的任务。这对管理员和高级用户非常有益,允许他们自动执行各种系统任务。 编辑 Crontab 文件通常使用文本编…...
2025年渗透测试面试题总结-腾讯[实习]科恩实验室-安全工程师(题目+回答)
安全领域各种资源,学习文档,以及工具分享、前沿信息分享、POC、EXP分享。不定期分享各种好玩的项目及好用的工具,欢迎关注。 目录 腾讯[实习]科恩实验室-安全工程师 一、网络与协议 1. TCP三次握手 2. SYN扫描原理 3. HTTPS证书机制 二…...
