RabbitMQ - 简单案例
目录
0.引用
1.Hello world
2.轮训分发消息
2.1 抽取工具类
2.2 启动两个工作线程接受消息
2.4 结果展示
3.消息应答
3.1 自动应答
3.2 手动消息应答的方法
3.3 消息自动重新入队
3.4 消息手动应答代码
4.RabbitMQ 持久化
4.1 队列如何实现持久化
4.2 消息实现持久化
5.不公平分发
6.预取值分发
0.引用
https://note.oddfar.com/rabbitmq/
1.Hello world
1.1 依赖引用
<dependencies><!--rabbitmq 依赖客户端--><dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.8.0</version></dependency><!--操作文件流的一个依赖--><dependency><groupId>commons-io</groupId><artifactId>commons-io</artifactId><version>2.6</version></dependency>
</dependencies>
1.2 消息生产者
package com.example.one;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;public class Producer {private final static String QUEUE_NAME = "quque";public static void main(String[] args) throws Exception {//创建一个连接工厂ConnectionFactory factory = new ConnectionFactory();factory.setHost("192.168.2.17");factory.setUsername("admin");factory.setPassword("admin");//channel 实现了自动 close 接口 自动关闭 不需要显示关闭//创建连接Connection connection = factory.newConnection();//获取信道Channel channel = connection.createChannel();/*** 生成一个队列* 1.QUEUE_NAME 队列名称* 2.durable 队列里面的消息是否持久化 也就是是否用完就删除* 3.exclusive 该队列是否只供一个消费者进行消费 是否进行共享 true 可以多个消费者消费* 4.autoDelete是否自动删除 最后一个消费者端开连接以后 该队列是否自动删除 true 自动删除* 5.其他参数*/Boolean durable = true;Boolean exclusive = false;Boolean autoDelete = false;Map<String, Object> arguments = null;channel.queueDeclare(QUEUE_NAME,durable,exclusive,autoDelete, null);String message = "hello world";/*** 发送一个消息* 1.发送到那个交换机* 2.路由的 key 是哪个* 3.其他的参数信息* 4.发送消息的消息体*/channel.basicPublish("", QUEUE_NAME, null, message.getBytes());System.out.println("消息发送完毕");}}
1.3 消息消费者
package com.example.one;
import com.rabbitmq.client.*;public class Consumer {private final static String QUEUE_NAME = "quque";public static void main(String[] args) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("192.168.2.17");factory.setUsername("admin");factory.setPassword("admin");Connection connection = factory.newConnection();Channel channel = connection.createChannel();System.out.println("等待接收消息.........");//推送的消息如何进行消费的接口回调DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody());System.out.println(message);};//取消消费的一个回调接口 如在消费的时候队列被删除掉了CancelCallback cancelCallback = (consumerTag) -> {System.out.println("消息消费被中断");};/*** 消费者消费消息 - 接受消息* 1.消费哪个队列* 2.消费成功之后是否要自动应答 true 代表自动应答 false 手动应答* 3.消费者未成功消费的回调* 4.消息被取消时的回调*/channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);}}
2.轮训分发消息
2.1 抽取工具类
package com.example.utils;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;public class RabbitMqUtils {//得到一个连接的 channelpublic static Channel getChannel() throws Exception {//创建一个连接工厂ConnectionFactory factory = new ConnectionFactory();factory.setHost("192.168.2.17");factory.setUsername("admin");factory.setPassword("admin");Connection connection = factory.newConnection();Channel channel = connection.createChannel();return channel;}
}
2.2 启动两个工作线程接受消息
package com.example.two;import com.oddfar.utils.RabbitMqUtils;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;public class Worker01 {private static final String QUEUE_NAME = "quque";public static void main(String[] args) throws Exception {Channel channel = RabbitMqUtils.getChannel();//消息接受DeliverCallback deliverCallback = (consumerTag, delivery) -> {String receivedMessage = new String(delivery.getBody());System.out.println("接收到消息:" + receivedMessage);};//消息被取消CancelCallback cancelCallback = (consumerTag) -> {System.out.println(consumerTag + "消费者取消消费接口回调逻辑");};System.out.println("C1 消费者启动等待消费.................. ");channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);}
}
选中 Allow multiple instances

启动后

2.3 启动一个发送消息线程
public class Task01 {public static final String QUEUE_NAME = "quque";public static void main(String[] args) throws Exception {Channel channel = RabbitMqUtils.getChannel();Scanner scanner = new Scanner(System.in);while (scanner.hasNext()) {String message = scanner.next();channel.basicPublish("", QUEUE_NAME, null, message.getBytes());System.out.println("消息发送完成:" + message);}}
}
2.4 结果展示
通过程序执行发现生产者总共发送 4 个消息,消费者 1 和消费者 2 分别分得两个消息,并且是按照有序的一个接收一次消息
3.消息应答
3.1 自动应答
消息发送后立即被认为已经传送成功
3.2 手动消息应答的方法
- Channel.basicAck(用于肯定确认)
- Channel.basicNack(用于否定确认)
- Channel.basicReject(用于否定确认)
Multiple 的解释:
手动应答的好处是可以批量应答并且减少网络拥堵

- true 代表批量应答 channel 上未应答的消息
- false 同上面相比只会应答 tag=8 的消息 5,6,7 这三个消息依然不会被确认收到消息应答

3.3 消息自动重新入队
3.4 消息手动应答代码
消费者在上面代码的基础上增加了以下内容
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
4.RabbitMQ 持久化
4.1 队列如何实现持久化
//让队列持久化
boolean durable = true;
//声明队列
channel.queueDeclare(TASK_QUEUE_NAME, durable, false, false, null);
4.2 消息实现持久化
需要在消息生产者修改代码,MessageProperties.PERSISTENT_TEXT_PLAIN 添加这个属性

5.不公平分发
为了避免这种情况,在消费者中消费之前,我们可以设置参数 channel.basicQos(1);
//不公平分发
int prefetchCount = 1;
channel.basicQos(prefetchCount);//采用手动应答
boolean autoAck = false;
channel.basicConsume(TASK_QUEUE_NAME, autoAck, deliverCallback, cancelCallback);
6.预取值分发
本身消息的发送就是异步发送的,所以在任何时候,channel 上肯定不止只有一个消息另外来自消费 者的手动确认本质上也是异步的。因此这里就存在一个未确认的消息缓冲区,因此希望开发人员能限制此缓冲区的大小,以避免缓冲区里面无限制的未确认消息问题。这个时候就可以通过使用 basic.qos 方法设 置“预取计数”值来完成的
相关文章:
RabbitMQ - 简单案例
目录 0.引用 1.Hello world 2.轮训分发消息 2.1 抽取工具类 2.2 启动两个工作线程接受消息 2.4 结果展示 3.消息应答 3.1 自动应答 3.2 手动消息应答的方法 3.3 消息自动重新入队 3.4 消息手动应答代码 4.RabbitMQ 持久化 4.1 队列如何实现持久化 4.2 消息实现持久化 5.不…...
《吐血整理》高级系列教程-吃透Fiddler抓包教程(30)-Fiddler如何抓Android7.0以上的Https包-番外篇
1.简介 通过宏哥前边几篇文章的讲解和介绍想必大家都知道android7.0以上,有android的机制不在信任用户证书,导致https协议无法抓包。除非把证书装在系统信任的证书里,此时手机需要root权限。但是大家都知道root手机是非常繁琐的且不安全&…...
服务器被攻击了怎么办?
服务器被攻击是无法避免的,但是我们能通过做好防护措施,提高服务器的安全性,降低被攻击的几率。那么当服务器已经被 攻击了,怎样才能降低损失呢?该怎样补救? 断开网络 全部的攻击都来自于网络,因…...
P1156 垃圾陷阱(背包变形)
垃圾陷阱 题目描述 卡门――农夫约翰极其珍视的一条 Holsteins 奶牛――已经落了到 “垃圾井” 中。“垃圾井” 是农夫们扔垃圾的地方,它的深度为 D D D( 2 ≤ D ≤ 100 2 \le D \le 100 2≤D≤100)英尺。 卡门想把垃圾堆起来,…...
[Docker实现测试部署CI/CD----构建成功后钉钉告警(7)]
目录 15、钉钉告警创建项目群,然后添加机器人添加机器人Jenkins 系统配置项目配置修改Jenkinsfile文件,添加钉钉提示信息测试 不修改Jenkinsfile文件,添加钉钉提示信息测试 15、钉钉告警 创建项目群,然后添加机器人 首先需要在钉…...
UE5 半透明覆层材质
文章目录 前言介绍示例1示例2示例3 前言 本文采用虚幻5.2.1版本演示,介绍半透明覆层材质(覆层材质)。 介绍 半透明覆层材质是 UE5.1 版本 更新的功能,使用半透明覆层材质,可以轻松的给物体表面附着一层材质。 在UE5…...
在Raspberry Pi 4上安装Ubuntu 20.04 + ROS noetic(不带显示器)
在Raspberry Pi 4上安装Ubuntu 20.04 ROS noetic(不带显示器) 1. 所需设备 所需设备: 树莓派 4 B 型 wifi microSD 卡:最小 32GB MicroSD 转 SD 适配器 (可选)显示器,鼠标等 2. 树莓派…...
CommStudio for .NET Crack
CommStudio for .NET Crack CommStudio for.NET使您的应用程序可以轻松地使用串行端口和调制解调器进行通信。CommStudio for.NET是一套全面的组件和可视化调试工具,可将远程系统和设备与visual Studio 2005和visual Studio 2008集成。开发与遗留系统和外部设备集成…...
蓝桥杯上岸考点清单 (冲刺版)!!!
大家好 我是寸铁💪 真题千千万万遍,蓝桥省一自然现! ✌️ 日更3000里,蓝桥眷顾你 🌟 暴力出奇迹,打表过样例 👊 冲刺蓝桥杯省一模板大全来啦 🔥 蓝桥杯4月8号就要开始了 &#…...
AI一键生成短视频
AI一键生成推文短视频 阅读时长:10分钟 本文内容: 结合开源AI,一键生成短视频发布到常见的某音,某手平台,狠狠赚一笔 前置知识: 1.基本的 python 编程知识 2.chatGPT 使用过 3.stable diffution 使用过 成果…...
基于MATLAB长时间序列遥感数据分析(以MODIS数据处理为例)
MATLAB MATLAB是美国MathWorks公司出品的商业数学软件,用于数据分析、无线通信、深度学习、图像处理与计算机视觉、信号处理、量化金融与风险管理、机器人,控制系统等领域。 [1] MATLAB是matrix&laboratory两个词的组合,意为矩阵工厂&a…...
postgresql之内存池-AllocsetContext
一、简介 postgresql大部分的内存分配管理都是通过MemoryContext进行操作的, 多个相关的MemoryContext构成了一个树型结构, 多个树构成了一个森林。 实现了三种MemoryContext: SlabContextGenerationContextAllocSetContext 使用全局变量CurrentMemo…...
QT 使用单例模式
目录 1. 单例模式介绍 2.单例模式实现 1. 单例模式介绍 有些时候我们在做 qt 项目的时候,要用到很多类. 例如我们用到的类有 A,B,C,D. 其中,A 是 B,C,D 中都需要用到的类,A 类非常的抢手. 但是,A 类非常的占内存,定义一个 A 对象需要 500M 内存,假如在 B,C,D 中都定义一个 A 类…...
接口测试——postman接口测试(三)
目录 1. postman介绍与安装 2. postman发送get请求 3. postman发送post请求 1. postman介绍与安装 安装网址:Postman安装教程:留言找我要即可 2. postman发送get请求 import pymysql from flask import Flask,request# 这里是mysql的基本连接信息 c…...
react中hooks的理解与使用
一、作用 我们知道react组件有两种写法一种是类组件,另一种是函数组件。而函数组件是无状态组件,如果我们要想改变组件中的状态就无法实现了。为此,在react16.8版本后官方推出hooks,用于函数组件更改状态。 二、常用API 1、use…...
STM32的电动自行车信息采集上报系统(学习)
摘要 针对电动自行车实时监管不便的问题,设计了一种基于STM32的电动自行车信息采集系统,通过获取电池、位置和行驶状态信息并上报到服务器中,实现实时监管。 通过多路串口请求电池、行驶状态和位置信息,以并发方式进行数据接收、…...
蓝桥杯上岸每日N题 第七期(小猫爬山)!!!
蓝桥杯上岸每日N题 第七期(小猫爬山)!!! 同步收录 👇 蓝桥杯上岸必背!!!(第四期DFS) 大家好 我是寸铁💪 冲刺蓝桥杯省一模板大全来啦 🔥 蓝桥杯4月8号就要开始了 &a…...
【Linux系统编程】冯诺依曼体系结构
目录 前言 什么是冯诺依曼体系结构? 冯诺依曼体系结构如何进行数据处理的? 存储器在冯诺依曼体系中有什么作用? 冯诺依曼体系结构为什么要这样设计? 冯诺依曼结构总结 前言 相信对于冯诺依曼这个人的名字大家一定不会感到陌…...
数据结构--动态顺序表
文章目录 线性表动态顺序表数组与顺序表 接口实现初始化:尾插:尾删头插头删指定位置插入指定位置删除查找摧毁 完整代码 线性表 线性表是数据结构中最基本、最简单也是最常用的一种数据结构。线性表是指由n个具有相同数据类型的元素组成的有限序列。 线…...
笔试数据结构选填题
目录 卡特兰数Catalan:出栈序列/二叉树数 树 二叉树 N01N2 哈夫曼树(最优二叉树)Huffman 度m的哈夫曼树只有度为0和m的结点:Nm(n-1)/(m-1) 平衡二叉树AVL Nh表示深度为h最少结点数,则N00,N11&#…...
XML Group端口详解
在XML数据映射过程中,经常需要对数据进行分组聚合操作。例如,当处理包含多个物料明细的XML文件时,可能需要将相同物料号的明细归为一组,或对相同物料号的数量进行求和计算。传统实现方式通常需要编写脚本代码,增加了开…...
Debian系统简介
目录 Debian系统介绍 Debian版本介绍 Debian软件源介绍 软件包管理工具dpkg dpkg核心指令详解 安装软件包 卸载软件包 查询软件包状态 验证软件包完整性 手动处理依赖关系 dpkg vs apt Debian系统介绍 Debian 和 Ubuntu 都是基于 Debian内核 的 Linux 发行版ÿ…...
Keil 中设置 STM32 Flash 和 RAM 地址详解
文章目录 Keil 中设置 STM32 Flash 和 RAM 地址详解一、Flash 和 RAM 配置界面(Target 选项卡)1. IROM1(用于配置 Flash)2. IRAM1(用于配置 RAM)二、链接器设置界面(Linker 选项卡)1. 勾选“Use Memory Layout from Target Dialog”2. 查看链接器参数(如果没有勾选上面…...
IT供电系统绝缘监测及故障定位解决方案
随着新能源的快速发展,光伏电站、储能系统及充电设备已广泛应用于现代能源网络。在光伏领域,IT供电系统凭借其持续供电性好、安全性高等优势成为光伏首选,但在长期运行中,例如老化、潮湿、隐裂、机械损伤等问题会影响光伏板绝缘层…...
高防服务器能够抵御哪些网络攻击呢?
高防服务器作为一种有着高度防御能力的服务器,可以帮助网站应对分布式拒绝服务攻击,有效识别和清理一些恶意的网络流量,为用户提供安全且稳定的网络环境,那么,高防服务器一般都可以抵御哪些网络攻击呢?下面…...
【学习笔记】深入理解Java虚拟机学习笔记——第4章 虚拟机性能监控,故障处理工具
第2章 虚拟机性能监控,故障处理工具 4.1 概述 略 4.2 基础故障处理工具 4.2.1 jps:虚拟机进程状况工具 命令:jps [options] [hostid] 功能:本地虚拟机进程显示进程ID(与ps相同),可同时显示主类&#x…...
HashMap中的put方法执行流程(流程图)
1 put操作整体流程 HashMap 的 put 操作是其最核心的功能之一。在 JDK 1.8 及以后版本中,其主要逻辑封装在 putVal 这个内部方法中。整个过程大致如下: 初始判断与哈希计算: 首先,putVal 方法会检查当前的 table(也就…...
#Uniapp篇:chrome调试unapp适配
chrome调试设备----使用Android模拟机开发调试移动端页面 Chrome://inspect/#devices MuMu模拟器Edge浏览器:Android原生APP嵌入的H5页面元素定位 chrome://inspect/#devices uniapp单位适配 根路径下 postcss.config.js 需要装这些插件 “postcss”: “^8.5.…...
安宝特方案丨船舶智造的“AR+AI+作业标准化管理解决方案”(装配)
船舶制造装配管理现状:装配工作依赖人工经验,装配工人凭借长期实践积累的操作技巧完成零部件组装。企业通常制定了装配作业指导书,但在实际执行中,工人对指导书的理解和遵循程度参差不齐。 船舶装配过程中的挑战与需求 挑战 (1…...
推荐 github 项目:GeminiImageApp(图片生成方向,可以做一定的素材)
推荐 github 项目:GeminiImageApp(图片生成方向,可以做一定的素材) 这个项目能干嘛? 使用 gemini 2.0 的 api 和 google 其他的 api 来做衍生处理 简化和优化了文生图和图生图的行为(我的最主要) 并且有一些目标检测和切割(我用不到) 视频和 imagefx 因为没 a…...
