rabbitmq入门学习
写在前面
本文看下rabbit mq的基础概念以及使用。
1:简单介绍
为了不同进程间通信的解耦,出现了消息队列,为了规范消息队列的具体实现,Java制定了jms规范,这是一套基于接口的规范,因此是绑定语言的,即只能通过Java语言来实现和使用,与jms类似还有基于net的nms,这也是一套规范接口,只不过是基于.net开发语言的。不管是jms还是nms,它们都有一个通病,就是无法实现跨语言,这个时候amqp就应用而生了,可以将其理解为一种应用层的协议,构建在tcp之上,因此就可以实现跨语言的消息通信,参考下图:

amqp协议通信模型如下:

2:基础环境准备
2.1:服务安装
参考docker安装rabbitmq 。
2.2:创建Virtual host和用户
- 创建virtual host

- 创建admin用户

- 设置admin权限

添加成功:

3:正戏
本文主要看其提供的5种队列,如下图:

3.1:简单队列
简单队列就是一个生产者一个消费者的队列方式,如下图:

- 生产消息:
public class SimpleSend {private final static String QUEUE_NAME = "q_test_01";public static void main(String[] argv) throws Exception {// 获取到连接以及mq通道Connection connection = ConnectionUtil.getConnection();// 从连接中创建通道Channel channel = connection.createChannel();// 声明(创建)队列channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 消息内容String message = "Hello World come here!!!";channel.basicPublish("", QUEUE_NAME, null, message.getBytes());System.out.println(" [x] Sent '" + message + "'");//关闭通道和连接channel.close();connection.close();}
}
生产后:

点击q_test_01:

- 消费消息
public class SimpleRecv {private final static String QUEUE_NAME = "q_test_01";public static void main(String[] argv) throws Exception {// 获取到连接以及mq通道Connection connection = ConnectionUtil.getConnection();// 从连接中创建通道Channel channel = connection.createChannel();// 声明队列channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 定义队列的消费者QueueingConsumer consumer = new QueueingConsumer(channel);// 监听队列 true 这里使用自动确认,及消息消费默认消费成功,这种方式效率高,但是容易丢失消息// 如果某些场景允许部分消息丢失,但是要求执行效率,则可以考虑将该值设置为true,否则设置为false,即手动确认// 最周通过执行channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);完成手动确认,多一次网络通信channel.basicConsume(QUEUE_NAME, true, consumer);// 获取消息while (true) {QueueingConsumer.Delivery delivery = consumer.nextDelivery();String message = new String(delivery.getBody());System.out.println(" [x] Received '" + message + "'");}}
}
输出:
[INFO] --- exec-maven-plugin:3.1.0:exec (default-cli) @ rabbitmq-study ---[x] Received 'Hello World come here!!!'

3.2:work队列
一个生产者多个消费者,如下图:

work队列看起来和简单队列相比只是多起了几个消费者而已。
- 生产者
public class WorkSend {private final static String QUEUE_NAME = "test_queue_work";public static void main(String[] argv) throws Exception {// 获取到连接以及mq通道Connection connection = ConnectionUtil.getConnection();Channel channel = connection.createChannel();// 声明队列channel.queueDeclare(QUEUE_NAME, false, false, false, null);for (int i = 0; i < 100; i++) {// 消息内容String message = "" + i;channel.basicPublish("", QUEUE_NAME, null, message.getBytes());System.out.println(" [x] Sent '" + message + "'");Thread.sleep(i * 10);}channel.close();connection.close();}
}
- 消费者
public class WorkRecv {private final static String QUEUE_NAME = "test_queue_work";public static void main(String[] argv) throws Exception {// 获取到连接以及mq通道Connection connection = ConnectionUtil.getConnection();Channel channel = connection.createChannel();// 声明队列channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 同一时刻服务器只会发一条消息给消费者// 该值设置为1,配合手动确认,则可以实现一条一条消费,确认一条后消费下一条,且多个消费者时谁的消费能力强,谁消费的消息多// 消费者之间消费消息不相互影响channel.basicQos(1);// 定义队列的消费者QueueingConsumer consumer = new QueueingConsumer(channel);// 监听队列,false表示手动返回完成状态,true表示自动channel.basicConsume(QUEUE_NAME, false, consumer);
// channel.basicConsume(QUEUE_NAME, true, consumer);// 获取消息while (true) {QueueingConsumer.Delivery delivery = consumer.nextDelivery();String message = new String(delivery.getBody());System.out.println(" [y] Received '" + message + "'");//休眠Thread.sleep(10);// 返回确认状态,注释掉表示使用自动确认模式channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);}}
}
3.3:订阅模式
这种这种模式可以实现一个消息同时被多个消费者消费(广播),但是具体的实现需要依赖于交换器exchange,生产者端将消息发送到交换器,之后消费者端只需要将某个消息队列绑定到交换器,交换器会将消息发送到绑定的所有队列,消费者端就可以从队列中获取到对应的消息,但需要注意一个队列的消息还是只可以获取一次,如下图们:

- 生产者端
public class SubscribeSend {private final static String EXCHANGE_NAME = "test_exchange_fanout111";public static void main(String[] argv) throws Exception {// 获取到连接以及mq通道Connection connection = ConnectionUtil.getConnection();Channel channel = connection.createChannel();// 声明exchangechannel.exchangeDeclare(EXCHANGE_NAME, "fanout");// 消息内容String message = "Hello World!";channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());System.out.println(" [x] Sent '" + message + "'");channel.close();connection.close();}
}
消费者端2个:
public class SubscribeRecv {private final static String QUEUE_NAME = "test_queue_work1";private final static String EXCHANGE_NAME = "test_exchange_fanout111";public static void main(String[] argv) throws Exception {// 获取到连接以及mq通道Connection connection = ConnectionUtil.getConnection();Channel channel = connection.createChannel();// 声明队列channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 绑定队列到交换机channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");// 同一时刻服务器只会发一条消息给消费者channel.basicQos(1);// 定义队列的消费者QueueingConsumer consumer = new QueueingConsumer(channel);// 监听队列,手动返回完成channel.basicConsume(QUEUE_NAME, false, consumer);// 获取消息while (true) {QueueingConsumer.Delivery delivery = consumer.nextDelivery();String message = new String(delivery.getBody());System.out.println(" [Recv] Received '" + message + "'");Thread.sleep(10);channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);}}
}public class SubscribeRecv2 {private final static String QUEUE_NAME = "test_queue_work2";private final static String EXCHANGE_NAME = "test_exchange_fanout";public static void main(String[] argv) throws Exception {// 获取到连接以及mq通道Connection connection = ConnectionUtil.getConnection();Channel channel = connection.createChannel();// 声明队列channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 绑定队列到交换机channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");// 同一时刻服务器只会发一条消息给消费者channel.basicQos(1);// 定义队列的消费者QueueingConsumer consumer = new QueueingConsumer(channel);// 监听队列,手动返回完成channel.basicConsume(QUEUE_NAME, false, consumer);// 获取消息while (true) {QueueingConsumer.Delivery delivery = consumer.nextDelivery();String message = new String(delivery.getBody());System.out.println(" [Recv2] Received '" + message + "'");Thread.sleep(10);channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);}}
}
-
启动2个消费者

-
启动生产者

[INFO] --- exec-maven-plugin:3.1.0:exec (default-cli) @ rabbitmq-study ---[Recv2] Received 'Hello World!'[INFO] --- exec-maven-plugin:3.1.0:exec (default-cli) @ rabbitmq-study ---[Recv] Received 'Hello World!'
接着看下转换器和队列的绑定关系:

注意以下的问题:
1:因为交换器本身不具备数据存储的能力,所以如果是某个交换器上没有绑定任何的队列,则该消息将会丢失。
2:因为交换器本身不存储数据,所以在具体的消息队列绑定到交换器(即消费者启动前),不要生产消息到交换器,否则消息将会丢失。
3.4:路由模式
这种方式类似于订阅模式,也需要转换器作为中间商,但是并不会直接无脑的发送消息,而是会根据消费者额外指定的路由key,生产者在向转换器发送消息时会带着routeKey,消费者在消费消息时会指定自己期望的routeKey只有二者匹配时,才会从队列中消费对应的消息,l另外注意的时交换机类型设置为direct,如下图:

- 生产者
public class RoutingSend {private final static String EXCHANGE_NAME = "test_exchange_direct123";public static void main(String[] argv) throws Exception {// 获取到连接以及mq通道Connection connection = ConnectionUtil.getConnection();Channel channel = connection.createChannel();// 声明exchange 设置交换器为direct,即路由模式channel.exchangeDeclare(EXCHANGE_NAME, "direct");// 消息内容String message = "Hello World!";// 设置路由key为"update",即只有对应队列上设置了update路由key的消费者才会消费到消息channel.basicPublish(EXCHANGE_NAME, "update", null, message.getBytes());System.out.println(" [x] Sent '" + message + "'");channel.close();connection.close();}
}
- 2个消费者
public class RoutingRecv {private final static String QUEUE_NAME = "test_queue_work_route";private final static String EXCHANGE_NAME = "test_exchange_direct123";public static void main(String[] argv) throws Exception {// 获取到连接以及mq通道Connection connection = ConnectionUtil.getConnection();Channel channel = connection.createChannel();// 声明队列channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 绑定队列到交换机 设置路由key为delete,updatechannel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "delete");channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "update");// 同一时刻服务器只会发一条消息给消费者channel.basicQos(1);// 定义队列的消费者QueueingConsumer consumer = new QueueingConsumer(channel);// 监听队列,手动返回完成channel.basicConsume(QUEUE_NAME, false, consumer);// 获取消息while (true) {QueueingConsumer.Delivery delivery = consumer.nextDelivery();String message = new String(delivery.getBody());System.out.println(" [x] Received '" + message + "'");Thread.sleep(10);channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);}}
}public class RoutingRecv2 {private final static String QUEUE_NAME = "test_queue_work_route2";private final static String EXCHANGE_NAME = "test_exchange_direct";public static void main(String[] argv) throws Exception {// 获取到连接以及mq通道Connection connection = ConnectionUtil.getConnection();Channel channel = connection.createChannel();// 声明队列channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 绑定队列到交换机channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "delete");channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "insert");// 同一时刻服务器只会发一条消息给消费者channel.basicQos(1);// 定义队列的消费者QueueingConsumer consumer = new QueueingConsumer(channel);// 监听队列,手动返回完成channel.basicConsume(QUEUE_NAME, false, consumer);// 获取消息while (true) {QueueingConsumer.Delivery delivery = consumer.nextDelivery();String message = new String(delivery.getBody());System.out.println(" [x] Received '" + message + "'");Thread.sleep(10);channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);}}
}


先启动消费者再启动生产者测试可。

写在后面
参考文章列表
RabbitMQ使用教程(超详细)
docker安装rabbitmq
RabbitMq的一些概念,JMS、AMQP、MQ 。
相关文章:
rabbitmq入门学习
写在前面 本文看下rabbit mq的基础概念以及使用。 1:简单介绍 为了不同进程间通信的解耦,出现了消息队列,为了规范消息队列的具体实现,Java制定了jms规范,这是一套基于接口的规范,因此是绑定语言的&…...
说说对Fiber架构的理解?解决了什么问题?
一、问题 JavaScript引擎和页面渲染引擎两个线程是互斥的,当其中一个线程执行时,另一个线程只能挂起等待 如果 JavaScript 线程长时间地占用了主线程,那么渲染层面的更新就不得不长时间地等待,界面长时间不更新,会导…...
Spring Security笔记
Spring Security 是 Spring家族中的一个安全管理框架。 一般来说中大型的项目都是使用 SpringSecurity 来做安全框架,小项目用相对简单的Shiro。认证、授权是 SpringSecurity 作为安全框架的核心功能。 认证:通过用户名密码验证当前访问系统的是不是本…...
快速教程|如何在 AWS EC2上使用 Walrus 部署 GitLab
Walrus 是一款基于平台工程理念的开源应用管理平台,致力于解决应用交付领域的深切痛点。借助 Walrus 将云原生的能力和最佳实践扩展到非容器化环境,并支持任意应用形态统一编排部署,降低使用基础设施的复杂度,为研发和运维团队提供…...
[vmware]vmware虚拟机压缩空间清理空间
vmware中的ubuntu使用如果拷贝文件进去在删除,vmare镜像文件并不会减少日积月累会不断是的真实物理磁盘空间大幅度减少,比如我以前windows操作系统本来只有30GB最后居然占道硬盘200GB,清理方法有2种。 第一种:vmware界面操作 第二…...
一篇文章带你使用(MMKV--基于 mmap 的高性能通用 key-value 组件)
一、MMKV是什么? MMKV 是基于 mmap 内存映射的 key-value 组件,底层序列化/反序列化使用 protobuf 实现,性能高,稳定性强。也是腾讯微信团队使用的技术。 支持的数据类型 支持以下 Java 语言基础类型: boolean、int…...
Pytorch 里面torch.no_grad 和model.eval(), model.train() 的作用
torch.no_grad: 影响模型的自微分器,使得其停止工作;这样的话,数据计算的数据就会变快,内存占用也会变小,因为没有了反向梯度计算,当然,我哦们也无法做反向传播。 model.eval() 和model.train()…...
Ozon产品内容评级功能上线,妙手ERP实力助力Ozon卖家全方位打造爆款产品!
产品内容评级,可以直接反映产品质量的高低,也是影响产品排名的关键。具有较高内容评级的产品,将有更大机会显示在搜索结果和类目的前几页中,从而引起买家的关注,促进销售。 为帮助卖家打造高质量产品,妙手…...
Linux 下最主流的文件系统格式——ext
硬盘分成相同大小的单元,我们称为块(Block)。一块的大小是扇区大小的整数倍,默认是 4K。在格式化的时候,这个值是可以设定的。 一大块硬盘被分成了一个个小的块,用来存放文件的数据部分。这样一来…...
变量环境、变量提升和暂时性死区
JavaScript中的提升 在JavaScript中,“Hoisting”(提升)是一种特性,它将变量和函数的声明移动到作用域的顶部。这意味着可以在声明之前使用这些变量和函数,而不会报错。 当JavaScript代码执行时,会经过两个…...
yolov8+多算法多目标追踪+实例分割+目标检测+姿态估计(代码+教程)
多目标追踪实例分割目标检测 YOLO (You Only Look Once) 是一个流行的目标检测算法,它能够在图像中准确地定位和识别多个物体。 本项目是基于 YOLO 算法的目标跟踪系统,它将 YOLO 的目标检测功能与目标跟踪技术相结合,实现了实时的多目标跟…...
【神经网络】【GoogleNet】
1、引言 卷积神经网络是当前最热门的技术,我想深入地学习这门技术,从他的发展历史开始,了解神经网络算法的兴衰起伏;同时了解他在发展过程中的**里程碑式算法**,能更好的把握神经网络发展的未来趋势,了解神…...
网络安全深入学习第八课——正向代理(工具:ReGeorg)
文章目录 一、环境配置二、开始模拟1、拿下跳板机的Webshell权限,并上传shell文件1.1、查看跳板机网络环境1.2、查看arp表 2、使用ReGeorg来建立连接2.1、生产ReGeorg隧道文件2.2、上传ReGeorg隧道的PHP脚本到跳板机2.3、连接隧道2.4、尝试浏览器连接 3、使用Proxif…...
Jmeter全流程性能测试实战
项目背景: 我们的平台为全国某行业监控平台,经过3轮功能测试、接口测试后,98%的问题已经关闭,决定对省平台向全国平台上传数据的接口进行性能测试。 01、测试步骤 1、编写性能测试方案 由于我是刚进入此项目组不久,…...
Python算法例8 将整数A转换为B
1. 问题描述 给定整数A和B,求出将整数A转换为B,需要改变bit的位数。 2. 问题示例 把31转换为14,需要改变2个bit位,即:(31)10(11111)2,(14&…...
一个基于百度飞桨封装的.NET版本OCR工具类库 - PaddleOCRSharp
前言 大家有使用过.NET开发过OCR工具吗?今天给大家推荐一个基于百度飞桨封装的.NET版本OCR工具类库:PaddleOCRSharp。 OCR工具有什么用? OCR(Optical Character Recognition)工具可以将图像或扫描文件中的文本内容转…...
在 CelebA 数据集上训练的 PyTorch 中的基本变分自动编码器
摩西西珀博士 一、说明 我最近发现自己需要一种方法将图像编码到潜在嵌入中,调整嵌入,然后生成新图像。有一些强大的方法可以创建嵌入或从嵌入生成。如果你想同时做到这两点,一种自然且相当简单的方法是使用变分自动编码器。 这样的深度网络不…...
利用Ansible实现批量Linux服务器安全配置
1.摘要 在上一篇<<初步利用Ansible实现批量服务器自动化管理>>文章中, 我初步实现了通过编写清单和剧本来实现多台服务器的自动化管理,在本章节中, 我将利用Ansible的剧本来实现更实用、更复杂一点的功能, 主要功能包括三个:1.同时在三台服务器中增加IP访问控制,只…...
读书笔记:彼得·德鲁克《认识管理》第8章 战略规划:企业家技能
一、章节内容概述 战略规划帮助做好当前的业务以迎接未来。战略规划需要思考业务应该是什么,当前必须做什么才能赢得未来。战略规划需要进行风险决策,需要有组织地抛弃过去的业务,要求清晰界定和明确安排为实现理想的未来而开展的工作。战略…...
HarmonyOS应用开发-视频播放器与弹窗
Viedo组件 在手机、平板或是智慧屏这些终端设备上,媒体功能可以算作是我们最常用的场景之一。无论是实现音频的播放、录制、采集,还是视频的播放、切换、循环,亦或是相机的预览、拍照等功能,媒体组件都是必不可少的。以视频功能为…...
龙虎榜——20250610
上证指数放量收阴线,个股多数下跌,盘中受消息影响大幅波动。 深证指数放量收阴线形成顶分型,指数短线有调整的需求,大概需要一两天。 2025年6月10日龙虎榜行业方向分析 1. 金融科技 代表标的:御银股份、雄帝科技 驱动…...
Java 语言特性(面试系列1)
一、面向对象编程 1. 封装(Encapsulation) 定义:将数据(属性)和操作数据的方法绑定在一起,通过访问控制符(private、protected、public)隐藏内部实现细节。示例: public …...
基于Flask实现的医疗保险欺诈识别监测模型
基于Flask实现的医疗保险欺诈识别监测模型 项目截图 项目简介 社会医疗保险是国家通过立法形式强制实施,由雇主和个人按一定比例缴纳保险费,建立社会医疗保险基金,支付雇员医疗费用的一种医疗保险制度, 它是促进社会文明和进步的…...
python执行测试用例,allure报乱码且未成功生成报告
allure执行测试用例时显示乱码:‘allure’ �����ڲ����ⲿ���Ҳ���ǿ�&am…...
安宝特方案丨船舶智造的“AR+AI+作业标准化管理解决方案”(装配)
船舶制造装配管理现状:装配工作依赖人工经验,装配工人凭借长期实践积累的操作技巧完成零部件组装。企业通常制定了装配作业指导书,但在实际执行中,工人对指导书的理解和遵循程度参差不齐。 船舶装配过程中的挑战与需求 挑战 (1…...
深入浅出深度学习基础:从感知机到全连接神经网络的核心原理与应用
文章目录 前言一、感知机 (Perceptron)1.1 基础介绍1.1.1 感知机是什么?1.1.2 感知机的工作原理 1.2 感知机的简单应用:基本逻辑门1.2.1 逻辑与 (Logic AND)1.2.2 逻辑或 (Logic OR)1.2.3 逻辑与非 (Logic NAND) 1.3 感知机的实现1.3.1 简单实现 (基于阈…...
MySQL:分区的基本使用
目录 一、什么是分区二、有什么作用三、分类四、创建分区五、删除分区 一、什么是分区 MySQL 分区(Partitioning)是一种将单张表的数据逻辑上拆分成多个物理部分的技术。这些物理部分(分区)可以独立存储、管理和优化,…...
wpf在image控件上快速显示内存图像
wpf在image控件上快速显示内存图像https://www.cnblogs.com/haodafeng/p/10431387.html 如果你在寻找能够快速在image控件刷新大图像(比如分辨率3000*3000的图像)的办法,尤其是想把内存中的裸数据(只有图像的数据,不包…...
stm32wle5 lpuart DMA数据不接收
配置波特率9600时,需要使用外部低速晶振...
嵌入式学习之系统编程(九)OSI模型、TCP/IP模型、UDP协议网络相关编程(6.3)
目录 一、网络编程--OSI模型 二、网络编程--TCP/IP模型 三、网络接口 四、UDP网络相关编程及主要函数 编辑编辑 UDP的特征 socke函数 bind函数 recvfrom函数(接收函数) sendto函数(发送函数) 五、网络编程之 UDP 用…...
