RabbitMQ - 简单案例
目录
0.引用
1.Hello world
2.轮训分发消息
2.1 抽取工具类
2.2 启动两个工作线程接受消息
2.4 结果展示
3.消息应答
3.1 自动应答
3.2 手动消息应答的方法
3.3 消息自动重新入队
3.4 消息手动应答代码
4.RabbitMQ 持久化
4.1 队列如何实现持久化
4.2 消息实现持久化
5.不公平分发
6.预取值分发
0.引用
https://note.oddfar.com/rabbitmq/
1.Hello world
1.1 依赖引用
<dependencies><!--rabbitmq 依赖客户端--><dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.8.0</version></dependency><!--操作文件流的一个依赖--><dependency><groupId>commons-io</groupId><artifactId>commons-io</artifactId><version>2.6</version></dependency>
</dependencies>
1.2 消息生产者
package com.example.one;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;public class Producer {private final static String QUEUE_NAME = "quque";public static void main(String[] args) throws Exception {//创建一个连接工厂ConnectionFactory factory = new ConnectionFactory();factory.setHost("192.168.2.17");factory.setUsername("admin");factory.setPassword("admin");//channel 实现了自动 close 接口 自动关闭 不需要显示关闭//创建连接Connection connection = factory.newConnection();//获取信道Channel channel = connection.createChannel();/*** 生成一个队列* 1.QUEUE_NAME 队列名称* 2.durable 队列里面的消息是否持久化 也就是是否用完就删除* 3.exclusive 该队列是否只供一个消费者进行消费 是否进行共享 true 可以多个消费者消费* 4.autoDelete是否自动删除 最后一个消费者端开连接以后 该队列是否自动删除 true 自动删除* 5.其他参数*/Boolean durable = true;Boolean exclusive = false;Boolean autoDelete = false;Map<String, Object> arguments = null;channel.queueDeclare(QUEUE_NAME,durable,exclusive,autoDelete, null);String message = "hello world";/*** 发送一个消息* 1.发送到那个交换机* 2.路由的 key 是哪个* 3.其他的参数信息* 4.发送消息的消息体*/channel.basicPublish("", QUEUE_NAME, null, message.getBytes());System.out.println("消息发送完毕");}}
1.3 消息消费者
package com.example.one;
import com.rabbitmq.client.*;public class Consumer {private final static String QUEUE_NAME = "quque";public static void main(String[] args) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("192.168.2.17");factory.setUsername("admin");factory.setPassword("admin");Connection connection = factory.newConnection();Channel channel = connection.createChannel();System.out.println("等待接收消息.........");//推送的消息如何进行消费的接口回调DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody());System.out.println(message);};//取消消费的一个回调接口 如在消费的时候队列被删除掉了CancelCallback cancelCallback = (consumerTag) -> {System.out.println("消息消费被中断");};/*** 消费者消费消息 - 接受消息* 1.消费哪个队列* 2.消费成功之后是否要自动应答 true 代表自动应答 false 手动应答* 3.消费者未成功消费的回调* 4.消息被取消时的回调*/channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);}}
2.轮训分发消息
2.1 抽取工具类
package com.example.utils;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;public class RabbitMqUtils {//得到一个连接的 channelpublic static Channel getChannel() throws Exception {//创建一个连接工厂ConnectionFactory factory = new ConnectionFactory();factory.setHost("192.168.2.17");factory.setUsername("admin");factory.setPassword("admin");Connection connection = factory.newConnection();Channel channel = connection.createChannel();return channel;}
}
2.2 启动两个工作线程接受消息
package com.example.two;import com.oddfar.utils.RabbitMqUtils;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;public class Worker01 {private static final String QUEUE_NAME = "quque";public static void main(String[] args) throws Exception {Channel channel = RabbitMqUtils.getChannel();//消息接受DeliverCallback deliverCallback = (consumerTag, delivery) -> {String receivedMessage = new String(delivery.getBody());System.out.println("接收到消息:" + receivedMessage);};//消息被取消CancelCallback cancelCallback = (consumerTag) -> {System.out.println(consumerTag + "消费者取消消费接口回调逻辑");};System.out.println("C1 消费者启动等待消费.................. ");channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);}
}
选中 Allow multiple instances

启动后

2.3 启动一个发送消息线程
public class Task01 {public static final String QUEUE_NAME = "quque";public static void main(String[] args) throws Exception {Channel channel = RabbitMqUtils.getChannel();Scanner scanner = new Scanner(System.in);while (scanner.hasNext()) {String message = scanner.next();channel.basicPublish("", QUEUE_NAME, null, message.getBytes());System.out.println("消息发送完成:" + message);}}
}
2.4 结果展示
通过程序执行发现生产者总共发送 4 个消息,消费者 1 和消费者 2 分别分得两个消息,并且是按照有序的一个接收一次消息
3.消息应答
3.1 自动应答
消息发送后立即被认为已经传送成功
3.2 手动消息应答的方法
- Channel.basicAck(用于肯定确认)
- Channel.basicNack(用于否定确认)
- Channel.basicReject(用于否定确认)
Multiple 的解释:
手动应答的好处是可以批量应答并且减少网络拥堵

- true 代表批量应答 channel 上未应答的消息
- false 同上面相比只会应答 tag=8 的消息 5,6,7 这三个消息依然不会被确认收到消息应答

3.3 消息自动重新入队
3.4 消息手动应答代码
消费者在上面代码的基础上增加了以下内容
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
4.RabbitMQ 持久化
4.1 队列如何实现持久化
//让队列持久化
boolean durable = true;
//声明队列
channel.queueDeclare(TASK_QUEUE_NAME, durable, false, false, null);
4.2 消息实现持久化
需要在消息生产者修改代码,MessageProperties.PERSISTENT_TEXT_PLAIN 添加这个属性

5.不公平分发
为了避免这种情况,在消费者中消费之前,我们可以设置参数 channel.basicQos(1);
//不公平分发
int prefetchCount = 1;
channel.basicQos(prefetchCount);//采用手动应答
boolean autoAck = false;
channel.basicConsume(TASK_QUEUE_NAME, autoAck, deliverCallback, cancelCallback);
6.预取值分发
本身消息的发送就是异步发送的,所以在任何时候,channel 上肯定不止只有一个消息另外来自消费 者的手动确认本质上也是异步的。因此这里就存在一个未确认的消息缓冲区,因此希望开发人员能限制此缓冲区的大小,以避免缓冲区里面无限制的未确认消息问题。这个时候就可以通过使用 basic.qos 方法设 置“预取计数”值来完成的
相关文章:
RabbitMQ - 简单案例
目录 0.引用 1.Hello world 2.轮训分发消息 2.1 抽取工具类 2.2 启动两个工作线程接受消息 2.4 结果展示 3.消息应答 3.1 自动应答 3.2 手动消息应答的方法 3.3 消息自动重新入队 3.4 消息手动应答代码 4.RabbitMQ 持久化 4.1 队列如何实现持久化 4.2 消息实现持久化 5.不…...
《吐血整理》高级系列教程-吃透Fiddler抓包教程(30)-Fiddler如何抓Android7.0以上的Https包-番外篇
1.简介 通过宏哥前边几篇文章的讲解和介绍想必大家都知道android7.0以上,有android的机制不在信任用户证书,导致https协议无法抓包。除非把证书装在系统信任的证书里,此时手机需要root权限。但是大家都知道root手机是非常繁琐的且不安全&…...
服务器被攻击了怎么办?
服务器被攻击是无法避免的,但是我们能通过做好防护措施,提高服务器的安全性,降低被攻击的几率。那么当服务器已经被 攻击了,怎样才能降低损失呢?该怎样补救? 断开网络 全部的攻击都来自于网络,因…...
P1156 垃圾陷阱(背包变形)
垃圾陷阱 题目描述 卡门――农夫约翰极其珍视的一条 Holsteins 奶牛――已经落了到 “垃圾井” 中。“垃圾井” 是农夫们扔垃圾的地方,它的深度为 D D D( 2 ≤ D ≤ 100 2 \le D \le 100 2≤D≤100)英尺。 卡门想把垃圾堆起来,…...
[Docker实现测试部署CI/CD----构建成功后钉钉告警(7)]
目录 15、钉钉告警创建项目群,然后添加机器人添加机器人Jenkins 系统配置项目配置修改Jenkinsfile文件,添加钉钉提示信息测试 不修改Jenkinsfile文件,添加钉钉提示信息测试 15、钉钉告警 创建项目群,然后添加机器人 首先需要在钉…...
UE5 半透明覆层材质
文章目录 前言介绍示例1示例2示例3 前言 本文采用虚幻5.2.1版本演示,介绍半透明覆层材质(覆层材质)。 介绍 半透明覆层材质是 UE5.1 版本 更新的功能,使用半透明覆层材质,可以轻松的给物体表面附着一层材质。 在UE5…...
在Raspberry Pi 4上安装Ubuntu 20.04 + ROS noetic(不带显示器)
在Raspberry Pi 4上安装Ubuntu 20.04 ROS noetic(不带显示器) 1. 所需设备 所需设备: 树莓派 4 B 型 wifi microSD 卡:最小 32GB MicroSD 转 SD 适配器 (可选)显示器,鼠标等 2. 树莓派…...
CommStudio for .NET Crack
CommStudio for .NET Crack CommStudio for.NET使您的应用程序可以轻松地使用串行端口和调制解调器进行通信。CommStudio for.NET是一套全面的组件和可视化调试工具,可将远程系统和设备与visual Studio 2005和visual Studio 2008集成。开发与遗留系统和外部设备集成…...
蓝桥杯上岸考点清单 (冲刺版)!!!
大家好 我是寸铁💪 真题千千万万遍,蓝桥省一自然现! ✌️ 日更3000里,蓝桥眷顾你 🌟 暴力出奇迹,打表过样例 👊 冲刺蓝桥杯省一模板大全来啦 🔥 蓝桥杯4月8号就要开始了 &#…...
AI一键生成短视频
AI一键生成推文短视频 阅读时长:10分钟 本文内容: 结合开源AI,一键生成短视频发布到常见的某音,某手平台,狠狠赚一笔 前置知识: 1.基本的 python 编程知识 2.chatGPT 使用过 3.stable diffution 使用过 成果…...
基于MATLAB长时间序列遥感数据分析(以MODIS数据处理为例)
MATLAB MATLAB是美国MathWorks公司出品的商业数学软件,用于数据分析、无线通信、深度学习、图像处理与计算机视觉、信号处理、量化金融与风险管理、机器人,控制系统等领域。 [1] MATLAB是matrix&laboratory两个词的组合,意为矩阵工厂&a…...
postgresql之内存池-AllocsetContext
一、简介 postgresql大部分的内存分配管理都是通过MemoryContext进行操作的, 多个相关的MemoryContext构成了一个树型结构, 多个树构成了一个森林。 实现了三种MemoryContext: SlabContextGenerationContextAllocSetContext 使用全局变量CurrentMemo…...
QT 使用单例模式
目录 1. 单例模式介绍 2.单例模式实现 1. 单例模式介绍 有些时候我们在做 qt 项目的时候,要用到很多类. 例如我们用到的类有 A,B,C,D. 其中,A 是 B,C,D 中都需要用到的类,A 类非常的抢手. 但是,A 类非常的占内存,定义一个 A 对象需要 500M 内存,假如在 B,C,D 中都定义一个 A 类…...
接口测试——postman接口测试(三)
目录 1. postman介绍与安装 2. postman发送get请求 3. postman发送post请求 1. postman介绍与安装 安装网址:Postman安装教程:留言找我要即可 2. postman发送get请求 import pymysql from flask import Flask,request# 这里是mysql的基本连接信息 c…...
react中hooks的理解与使用
一、作用 我们知道react组件有两种写法一种是类组件,另一种是函数组件。而函数组件是无状态组件,如果我们要想改变组件中的状态就无法实现了。为此,在react16.8版本后官方推出hooks,用于函数组件更改状态。 二、常用API 1、use…...
STM32的电动自行车信息采集上报系统(学习)
摘要 针对电动自行车实时监管不便的问题,设计了一种基于STM32的电动自行车信息采集系统,通过获取电池、位置和行驶状态信息并上报到服务器中,实现实时监管。 通过多路串口请求电池、行驶状态和位置信息,以并发方式进行数据接收、…...
蓝桥杯上岸每日N题 第七期(小猫爬山)!!!
蓝桥杯上岸每日N题 第七期(小猫爬山)!!! 同步收录 👇 蓝桥杯上岸必背!!!(第四期DFS) 大家好 我是寸铁💪 冲刺蓝桥杯省一模板大全来啦 🔥 蓝桥杯4月8号就要开始了 &a…...
【Linux系统编程】冯诺依曼体系结构
目录 前言 什么是冯诺依曼体系结构? 冯诺依曼体系结构如何进行数据处理的? 存储器在冯诺依曼体系中有什么作用? 冯诺依曼体系结构为什么要这样设计? 冯诺依曼结构总结 前言 相信对于冯诺依曼这个人的名字大家一定不会感到陌…...
数据结构--动态顺序表
文章目录 线性表动态顺序表数组与顺序表 接口实现初始化:尾插:尾删头插头删指定位置插入指定位置删除查找摧毁 完整代码 线性表 线性表是数据结构中最基本、最简单也是最常用的一种数据结构。线性表是指由n个具有相同数据类型的元素组成的有限序列。 线…...
笔试数据结构选填题
目录 卡特兰数Catalan:出栈序列/二叉树数 树 二叉树 N01N2 哈夫曼树(最优二叉树)Huffman 度m的哈夫曼树只有度为0和m的结点:Nm(n-1)/(m-1) 平衡二叉树AVL Nh表示深度为h最少结点数,则N00,N11&#…...
浏览器访问 AWS ECS 上部署的 Docker 容器(监听 80 端口)
✅ 一、ECS 服务配置 Dockerfile 确保监听 80 端口 EXPOSE 80 CMD ["nginx", "-g", "daemon off;"]或 EXPOSE 80 CMD ["python3", "-m", "http.server", "80"]任务定义(Task Definition&…...
华为云AI开发平台ModelArts
华为云ModelArts:重塑AI开发流程的“智能引擎”与“创新加速器”! 在人工智能浪潮席卷全球的2025年,企业拥抱AI的意愿空前高涨,但技术门槛高、流程复杂、资源投入巨大的现实,却让许多创新构想止步于实验室。数据科学家…...
装饰模式(Decorator Pattern)重构java邮件发奖系统实战
前言 现在我们有个如下的需求,设计一个邮件发奖的小系统, 需求 1.数据验证 → 2. 敏感信息加密 → 3. 日志记录 → 4. 实际发送邮件 装饰器模式(Decorator Pattern)允许向一个现有的对象添加新的功能,同时又不改变其…...
应用升级/灾备测试时使用guarantee 闪回点迅速回退
1.场景 应用要升级,当升级失败时,数据库回退到升级前. 要测试系统,测试完成后,数据库要回退到测试前。 相对于RMAN恢复需要很长时间, 数据库闪回只需要几分钟。 2.技术实现 数据库设置 2个db_recovery参数 创建guarantee闪回点,不需要开启数据库闪回。…...
大话软工笔记—需求分析概述
需求分析,就是要对需求调研收集到的资料信息逐个地进行拆分、研究,从大量的不确定“需求”中确定出哪些需求最终要转换为确定的“功能需求”。 需求分析的作用非常重要,后续设计的依据主要来自于需求分析的成果,包括: 项目的目的…...
反向工程与模型迁移:打造未来商品详情API的可持续创新体系
在电商行业蓬勃发展的当下,商品详情API作为连接电商平台与开发者、商家及用户的关键纽带,其重要性日益凸显。传统商品详情API主要聚焦于商品基本信息(如名称、价格、库存等)的获取与展示,已难以满足市场对个性化、智能…...
CMake基础:构建流程详解
目录 1.CMake构建过程的基本流程 2.CMake构建的具体步骤 2.1.创建构建目录 2.2.使用 CMake 生成构建文件 2.3.编译和构建 2.4.清理构建文件 2.5.重新配置和构建 3.跨平台构建示例 4.工具链与交叉编译 5.CMake构建后的项目结构解析 5.1.CMake构建后的目录结构 5.2.构…...
在四层代理中还原真实客户端ngx_stream_realip_module
一、模块原理与价值 PROXY Protocol 回溯 第三方负载均衡(如 HAProxy、AWS NLB、阿里 SLB)发起上游连接时,将真实客户端 IP/Port 写入 PROXY Protocol v1/v2 头。Stream 层接收到头部后,ngx_stream_realip_module 从中提取原始信息…...
AI,如何重构理解、匹配与决策?
AI 时代,我们如何理解消费? 作者|王彬 封面|Unplash 人们通过信息理解世界。 曾几何时,PC 与移动互联网重塑了人们的购物路径:信息变得唾手可得,商品决策变得高度依赖内容。 但 AI 时代的来…...
《C++ 模板》
目录 函数模板 类模板 非类型模板参数 模板特化 函数模板特化 类模板的特化 模板,就像一个模具,里面可以将不同类型的材料做成一个形状,其分为函数模板和类模板。 函数模板 函数模板可以简化函数重载的代码。格式:templa…...
