Docker启动RabbitMQ,实现生产者与消费者
目录
一、Docker拉取镜像并启动RabbitMQ
二、Hello World
(一)依赖导入
(二)消息生产者
(三)消息消费者
三、实现轮训分发消息
(一)抽取工具类
(二)启动两个工作线程
(三)启动发送线程
四、实现手动应答
(一)消息应答概念
(二)消息应答的方法
(三)消息自动重新入队
(四)消息手动应答代码
1、生产者
2、睡眠工具类模拟业务执行
3、消费者
一、Docker拉取镜像并启动RabbitMQ
拉取镜像
docker pull rabbitmq:3.8.8-management
查看镜像
docker images rabbitmq
启动镜像
docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3.8.8-management
Linux虚拟机记得开放5672端口或者关闭防火墙,在window通过 主机ip:15672 访问rabbitmq控制台

用户名密码默认为guest

二、Hello World
(一)依赖导入
<!--指定 jdk 编译版本--><build><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><configuration><source>8</source><target>8</target></configuration></plugin></plugins></build><dependencies><!--rabbitmq 依赖客户端--><dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.8.0</version></dependency><!--操作文件流的一个依赖--><dependency><groupId>commons-io</groupId><artifactId>commons-io</artifactId><version>2.6</version></dependency></dependencies>
(二)消息生产者
工作原理

- Broker:接收和分发消息的应用,RabbitMQ Server 就是 Message Broker
- Connection:publisher/consumer 和 broker 之间的 TCP 连接
- Channel:如果每一次访问 RabbitMQ 都建立一个 Connection,在消息量大的时候建立 TCP Connection 的开销将是巨大的,效率也较低。Channel 是在 connection 内部建立的逻辑连接,如果应用程序支持多线程,通常每个 thread 创建单独的 channel 进行通讯,AMQP method 包含了 channel id 帮助客户端和 message broker 识别 channel,所以 channel 之间是完全隔离的。Channel 作为轻量级的 Connection 极大减少了操作系统建立 TCP connection 的开销
- Exchange:message 到达 broker 的第一站,根据分发规则,匹配查询表中的 routing key,分发消息到 queue 中去。常用的类型有:direct (point-to-point), topic (publish-subscribe) and fanout (multicast)
- Queue:消息最终被送到这里等待 consumer 取走
我们需要先获取连接(Connection),然后通过连接获取信道(Channel),这里我们演示简单例子,可以直接跳过交换机(Exchange)发送队列(Queue)
public class Producer {private final static String QUEUE_NAME = "hello";public static void main(String[] args) throws Exception {//创建一个连接工厂ConnectionFactory factory = new ConnectionFactory();// 设置主机ipfactory.setHost("182.92.234.71");// 设置用户名factory.setUsername("guest");// 设置密码factory.setPassword("guest");//channel 实现了自动 close 接口 自动关闭 不需要显示关闭Connection connection = factory.newConnection();// 获取信道Channel channel = connection.createChannel();/** 生成一个队列* queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,Map<String, Object> arguments)* 1.队列名称* 2.队列里面的消息是否持久化 默认消息存储在内存中* 3.该队列是否只供一个消费者进行消费 是否进行共享 true 可以多个消费者消费* 4.是否自动删除 最后一个消费者端开连接以后 该队列是否自动删除 true 自动删除* 5.其他参数**/channel.queueDeclare(QUEUE_NAME, false, false, false, null);String message = "hello rabbitmq";/** 发送一个消息* basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)* 1.发送到哪个交换机* 2.路由的key是哪个* 3.其他的参数信息* 4.发送消息的消息体***/channel.basicPublish("", QUEUE_NAME, null, message.getBytes());System.out.println("发送成功");}
}
(三)消息消费者
public class Consumer {private static final String QUEUE_NAME = "hello";public static void main(String[] args) throws Exception {//创建一个连接工厂ConnectionFactory factory = new ConnectionFactory();// 设置主机ipfactory.setHost("182.92.234.71");// 设置用户名factory.setUsername("guest");// 设置密码factory.setPassword("guest");//channel 实现了自动 close 接口 自动关闭 不需要显示关闭Connection connection = factory.newConnection();// 获取信道Channel channel = connection.createChannel();// 推送的消息如何进行消费的回调接口DeliverCallback deliverCallback = (consumerTag, message) -> {System.out.println(new String(message.getBody()));};// 取消消费的一个回调接口,如在消费的时候队列被删除了CancelCallback cancelCallback = (consumerTag) -> {System.out.println("消息消费被中断");};/** 消费者消费消息* basicConsume(String queue, boolean autoAck, * DeliverCallback deliverCallback, CancelCallback cancelCallback)* 1.消费哪个队列* 2.消费成功之后是否要自动应答 true 代表自动应答 false 手动应答* 3.消费者未成功消费的回调**/channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);}
}
三、实现轮训分发消息
(一)抽取工具类
可以发现,上面获取连接工厂,然后获取连接,再获取信道的步骤是一致的,我们可以抽取成一个工具类来调用,并使用单例模式-饿汉式完成信道的初始化
public class RabbitMqUtils {private static Channel channel;static {ConnectionFactory factory = new ConnectionFactory();// 设置ip地址factory.setHost("192.168.23.100");// 设置用户名factory.setUsername("guest");// 设置密码factory.setPassword("guest");try {// 创建连接Connection connection = factory.newConnection();// 获取信道channel = connection.createChannel();} catch (Exception e) {System.out.println("创建信道失败,错误信息:" + e.getMessage());}}public static Channel getChannel() {return channel;}
}
(二)启动两个工作线程
相当于前面的消费者,我们只需要写一个类,通过ideal实现多线程启动即可模拟两个线程
public class Worker01 {private final static String QUEUE_NAME = "hello";public static void main(String[] args) throws Exception {Channel channel = RabbitMqUtils.getChannel();DeliverCallback deliverCallback = ( consumerTag, message) -> {System.out.println("接受到消息:" + new String(message.getBody()));};CancelCallback cancelCallback = (cunsumerTag) -> {System.out.println("消费者取消消费接口回调逻辑");};// 启动两次,第一次为C1, 第二次为C2System.out.println("C2消费者等待消费消息");channel.basicConsume(QUEUE_NAME, true, deliverCallback,cancelCallback);}
}
(三)启动发送线程
public class Test01 {private final static String QUEUE_NAME = "hello";public static void main(String[] args) throws IOException {Channel channel = RabbitMqUtils.getChannel();channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 通过控制台输入充当消息,使轮训演示更明显Scanner scanner = new Scanner(System.in);while(scanner.hasNext()) {String message = scanner.next();channel.basicPublish("", QUEUE_NAME,null, message.getBytes() );System.out.println("消息发送完成:" + message);}}
}
结果

四、实现手动应答
(一)消息应答概念
自动应答:消费者发送后立即被认为已经传送成功。这种模式需要在高吞吐量和数据传输安全性方面做权衡,因为这种模式如果消息在接收到之前,消费者那边出现连接或者 channel 关闭,那么消息就丢失了。
当然另一方面这种模式消费者那边可以传递过载的消息,没有对传递的消息数量进行限制,当然这样有可能使得消费者这边由于接收太多还来不及处理的消息,导致这些消息的积压,最终 使得内存耗尽,最终这些消费者线程被操作系统杀死,所以这种模式仅适用在消费者可以高效并以某种速率能够处理这些消息的情况下使用。
手动应答:消费者接受到消息并顺利完成业务后再调用方法进行确认,rabbitmq 才可以把该消息删除
(二)消息应答的方法
- Channel.basicAck(用于肯定确认)
- RabbitMQ 已知道该消息并且成功的处理消息,可以将其丢弃了
- Channel.basicNack(用于否定确认)
- Channel.basicReject(用于否定确认)
- 与 Channel.basicNack 相比少一个参数Multiple
- multiple 的 true 和 false 代表不同意思 true 代表批量应答 channel 上未应答的消息比如说 channel 上有传送 tag 的消息 5,6,7,8 当前 tag 是 8 那么此时5-8 的这些还未应答的消息都会被确认收到消息应答false 同上面相比只会应答 tag=8 的消息 5,6,7 这三个消息依然不会被确认收到消息应答
- multiple 的 true 和 false 代表不同意思
- 不处理该消息了直接拒绝,可以将其丢弃了
- 与 Channel.basicNack 相比少一个参数Multiple

(三)消息自动重新入队
(四)消息手动应答代码
1、生产者
public class Test01 {private final static String QUEUE_NAME = "ack";public static void main(String[] args) throws IOException {Channel channel = RabbitMqUtils.getChannel();channel.queueDeclare(QUEUE_NAME, false, false, false, null);Scanner scanner = new Scanner(System.in);while(scanner.hasNext()) {String message = scanner.next();channel.basicPublish("", QUEUE_NAME,null, message.getBytes() );System.out.println("消息发送完成:" + message);}}
}
2、睡眠工具类模拟业务执行
public class SleepUtils {public static void sleep(int second) {try {Thread.sleep(1000 * second);} catch (InterruptedException _ignored) {Thread.currentThread().interrupt();}}
}
3、消费者
public class Worker01 {private final static String QUEUE_NAME = "ack";public static void main(String[] args) throws Exception {System.out.println("C1,业务时间短");Channel channel = RabbitMqUtils.getChannel();DeliverCallback deliverCallback = ( consumerTag, message) -> {SleepUtils.sleep(1); // 模拟业务执行1秒System.out.println("接受到消息:" + new String(message.getBody()));/** 1、消息标识* 2、是否启动批量确认,false:否。* 启用批量有可能造成消息丢失,比如未消费的消息提前被确然删除,后面业务消费该消息* 时出现异常会导致该消息的丢失*/channel.basicAck(message.getEnvelope().getDeliveryTag(), false);};CancelCallback cancelCallback = (cunsumerTag) -> {System.out.println("消费者取消消费接口回调逻辑");};boolean autoAck = false;channel.basicConsume(QUEUE_NAME, autoAck, deliverCallback,cancelCallback);}
}==============================================================================
public class Worker02 {private final static String QUEUE_NAME = "ack";public static void main(String[] args) throws Exception {System.out.println("C2,业务时间长");Channel channel = RabbitMqUtils.getChannel();DeliverCallback deliverCallback = ( consumerTag, message) -> {SleepUtils.sleep(15); // 模拟业务执行15秒System.out.println("接受到消息:" + new String(message.getBody()));/** 1、消息标识* 2、是否启动批量确认,false:否。* 启用批量有可能造成消息丢失,比如未消费的消息提前被确然删除,后面业务消费该消息* 时出现异常会导致该消息的丢失*/channel.basicAck(message.getEnvelope().getDeliveryTag(), false);};CancelCallback cancelCallback = (cunsumerTag) -> {System.out.println("消费者取消消费接口回调逻辑");};boolean autoAck = false;channel.basicConsume(QUEUE_NAME, autoAck, deliverCallback,cancelCallback);}
}
worker01业务时间短,worker02业务时间长,我们提前终止worker02模拟出异常,可以看到消息dd会被放回队列由worker01接收处理。
注意:这里需要先启动生产者声明队列ack,不然启动消费者会报错

最后一个案例我们可以看到消息轮训+消息自动重新入队+手动应答。
相关文章:
Docker启动RabbitMQ,实现生产者与消费者
目录 一、Docker拉取镜像并启动RabbitMQ 二、Hello World (一)依赖导入 (二)消息生产者 (三)消息消费者 三、实现轮训分发消息 (一)抽取工具类 (二)启…...
【C语言】函数栈帧的创建与销毁
Yan-英杰的主页 悟已往之不谏 知来者之可追 目录 0.ebp和esp是如何来维护栈帧的呢? 1.为什么局部变量的值不初始化是随机的? 2.局部变量是怎么创建的? 3 .函数是如何传参的?传参的顺序是怎样的 4.函数是如何调用的 …...
【Git】使用Git上传项目到远程仓库Gitee码云步骤详解
电脑里存放了很多项目,有的备份,有的没备份,如果不仔细分类管理的话,时间一长,到时看到那就会觉得非常杂乱,很难整理,这里有一个叫源代码托管,用过它的都知道,方便管理和…...
Head First设计模式---3.装饰者模式
3.1装饰者模式 亦称: 装饰者模式、装饰器模式、Wrapper、Decorator 装饰模式是一种结构型设计模式, 允许你通过将对象放入包含行为的特殊封装对象中来为原对象绑定新的行为。 举个例子:天气很冷,我们一件一件穿衣服,…...
Python 算法交易实验48 表字段设计
说明 虽然说的是表,实际上用的是Mongo集合 基于ADBS(APIFunc DataBase Service)可以构造一个供后续研究、生产长时间使用的数据基础,这个基础包括了: 1 队列服务。通过队列,数据可以通过API实现零担和批量两种模式的快速存储。2 …...
库存管理系统-课后程序(JAVA基础案例教程-黑马程序员编著-第六章-课后作业)
【案例6-1】 库存管理系统 【案例介绍】 1.任务描述 像商城和超市这样的地方,都需要有自己的库房,并且库房商品的库存变化有专人记录,这样才能保证商城和超市正常运转。 本例要求编写一个程序,模拟库存管理系统。该系统主要包…...
【极海APM32替代笔记】HAL库低功耗STOP停止模式的串口唤醒(解决进入以后立马唤醒、串口唤醒和回调无法一起使用、接收数据不全的问题)
【极海APM32替代笔记】HAL库低功耗STOP停止模式的串口唤醒(解决进入以后立马唤醒、串口唤醒和回调无法一起使用、接收数据不全的问题) 【STM32笔记】低功耗模式配置及避坑汇总 前文: blog.csdn.net/weixin_53403301/article/details/128216…...
Python类变量和实例变量(类属性和实例属性)
无论是类属性还是类方法,都无法像普通变量或者函数那样,在类的外部直接使用它们。我们可以将类看做一个独立的空间,则类属性其实就是在类体中定义的变量,类方法是在类体中定义的函数。 在类体中,根据变量定义的位置不…...
Glide加载图片
使用Glide加载图片,默认情况下在内存中缓存该图片。这样的情况下如果我们保存头像在某个路径,当再次更换头像时可能由于缓存问题,UI上更新的不及时。 默认加载图片方式: Glide.with(context).load(coverPath).error(R.drawable.a…...
有关时间复杂度和空间复杂度的练习
目录 一、消失的数字 二、轮转数组 三、 单选题 一、消失的数字 数组nums包含从0到n的所有整数,但其中缺了一个。请编写代码找出那个缺失的整数。你有办法在 O(n) 时间内完成吗? 注意:本题相对书上原题稍作改动 示例 1: 输入…...
linux服务器nfs数据挂载
参考:https://blog.csdn.net/qq_43721935/article/details/119889841?from_wecom1 1、NFS 介绍 NFS 即网络文件系统(Network File-System),可以通过网络让不同机器、不同系统之间可以实现文件共享。通过 NFS,可以访问…...
Python 自动化测试必会技能板块—unittest框架
说到 Python 的单元测试框架,想必接触过 Python 的朋友脑袋里第一个想到的就是 unittest。的确,作为 Python 的标准库,它很优秀,并被广泛应用于各个项目。但其实在 Python 众多项目中,主流的单元测试框架远不止这一个。…...
mysql存储引擎、事务、索引
目录MySQL进阶存储引擎什么是存储引擎常用存储引擎事务什么是事务怎么理解提交事务 和回滚事务事务特性事务的隔离级别索引什么是索引索引的实现原理什么条件下,我们会考虑给字段添加索引呢?什么条件下,索引会失效?索引分类MySQL进阶 存储引…...
毕业论文图片格式、分辨率选择及高质量Word转PDF方法
已知1:毕业论文盲评通常需要提交PDF文件。 已知2:PDF文件太大可能会导致翻页卡顿以及上传盲评网站失败。 已知3:Word转PDF方法不当可能会导致图像模糊。 已知4:打印机分辨率通常为300dpi。 问题1:论文插图分辨率设置…...
华为外包测试2年,不甘被替换,168天的学习转岗成正式员工
我25岁的时候,华为外包测试,薪资13.5k,人在深圳。 内卷什么的就不说了,而且人在外包那些高级精英年薪大几十的咱也接触不到,就说说外包吧。假设以我为界限,25岁一线城市13.5k,那22-24大部分情况…...
简单的C++:【运算符重载】新手易学
学过C语言的同志们应该都知道位运算符>> 和 << (右移左移),但是这两个运算符在C中还是我们的输入和输出流操作符,那么这是为什么呢?,了解完本篇文章之后,我们再来回答这个问题。 C为…...
NPE:记一次脑残NPE的排查过程
目录 碎碎念: 如下这行报NPE: 排查过程: 解解方案: 小结: 空指针出现的几种情况: 如何从根源避免空指针: 赋值时自动拆箱出现空指针: 1、变量赋值自动拆箱出现的空指针 2、…...
canvas样式与颜色,字体,图片,状态,形变
色彩 fillStyle color 设置图形的填充颜色。 strokeStyle color 设置图形轮廓的颜色。 备注: 一旦您设置了 strokeStyle 或者 fillStyle 的值,那么这个新值就会成为新绘制的图形的默认值。如果你要给每个图形上不同的颜色,你需要重新设置…...
重识html
html 重识html 万维网用url统一资源定位符标识分布因特网上的各种文档 各种概念 URL: 统一资源定位器 它是WWW的统一资源定位标志,就是指网络地址 在WWW上,每一信息资源都有统一的且在网上唯一的地址 网页: 由文字 图片 视频 音乐各种元素排列组…...
Redis:缓存一致性问题(缓存更新策略)
Redis缓存的一致性1. 缓存1.1 缓存的作用:1.2 缓存的成本:2. 缓存模型3. 缓存一致性问题3.1 引入3.2 解决(1) 先更新数据库,再手动删除缓存(2) 使用事务保证原子性(3) 以Redis中的TTL为兜底3.3 案例:商铺信息查询和更新(1) 查询商…...
docker详细操作--未完待续
docker介绍 docker官网: Docker:加速容器应用程序开发 harbor官网:Harbor - Harbor 中文 使用docker加速器: Docker镜像极速下载服务 - 毫秒镜像 是什么 Docker 是一种开源的容器化平台,用于将应用程序及其依赖项(如库、运行时环…...
Vue2 第一节_Vue2上手_插值表达式{{}}_访问数据和修改数据_Vue开发者工具
文章目录 1.Vue2上手-如何创建一个Vue实例,进行初始化渲染2. 插值表达式{{}}3. 访问数据和修改数据4. vue响应式5. Vue开发者工具--方便调试 1.Vue2上手-如何创建一个Vue实例,进行初始化渲染 准备容器引包创建Vue实例 new Vue()指定配置项 ->渲染数据 准备一个容器,例如: …...
Frozen-Flask :将 Flask 应用“冻结”为静态文件
Frozen-Flask 是一个用于将 Flask 应用“冻结”为静态文件的 Python 扩展。它的核心用途是:将一个 Flask Web 应用生成成纯静态 HTML 文件,从而可以部署到静态网站托管服务上,如 GitHub Pages、Netlify 或任何支持静态文件的网站服务器。 &am…...
【碎碎念】宝可梦 Mesh GO : 基于MESH网络的口袋妖怪 宝可梦GO游戏自组网系统
目录 游戏说明《宝可梦 Mesh GO》 —— 局域宝可梦探索Pokmon GO 类游戏核心理念应用场景Mesh 特性 宝可梦玩法融合设计游戏构想要素1. 地图探索(基于物理空间 广播范围)2. 野生宝可梦生成与广播3. 对战系统4. 道具与通信5. 延伸玩法 安全性设计 技术选…...
优选算法第十二讲:队列 + 宽搜 优先级队列
优选算法第十二讲:队列 宽搜 && 优先级队列 1.N叉树的层序遍历2.二叉树的锯齿型层序遍历3.二叉树最大宽度4.在每个树行中找最大值5.优先级队列 -- 最后一块石头的重量6.数据流中的第K大元素7.前K个高频单词8.数据流的中位数 1.N叉树的层序遍历 2.二叉树的锯…...
QT3D学习笔记——圆台、圆锥
类名作用Qt3DWindow3D渲染窗口容器QEntity场景中的实体(对象或容器)QCamera控制观察视角QPointLight点光源QConeMesh圆锥几何网格QTransform控制实体的位置/旋转/缩放QPhongMaterialPhong光照材质(定义颜色、反光等)QFirstPersonC…...
LangChain知识库管理后端接口:数据库操作详解—— 构建本地知识库系统的基础《二》
这段 Python 代码是一个完整的 知识库数据库操作模块,用于对本地知识库系统中的知识库进行增删改查(CRUD)操作。它基于 SQLAlchemy ORM 框架 和一个自定义的装饰器 with_session 实现数据库会话管理。 📘 一、整体功能概述 该模块…...
Mysql8 忘记密码重置,以及问题解决
1.使用免密登录 找到配置MySQL文件,我的文件路径是/etc/mysql/my.cnf,有的人的是/etc/mysql/mysql.cnf 在里最后加入 skip-grant-tables重启MySQL服务 service mysql restartShutting down MySQL… SUCCESS! Starting MySQL… SUCCESS! 重启成功 2.登…...
iview框架主题色的应用
1.下载 less要使用3.0.0以下的版本 npm install less2.7.3 npm install less-loader4.0.52./src/config/theme.js文件 module.exports {yellow: {theme-color: #FDCE04},blue: {theme-color: #547CE7} }在sass中使用theme配置的颜色主题,无需引入,直接可…...
比较数据迁移后MySQL数据库和OceanBase数据仓库中的表
设计一个MySQL数据库和OceanBase数据仓库的表数据比较的详细程序流程,两张表是相同的结构,都有整型主键id字段,需要每次从数据库分批取得2000条数据,用于比较,比较操作的同时可以再取2000条数据,等上一次比较完成之后,开始比较,直到比较完所有的数据。比较操作需要比较…...
