【消息队列开发】 实现ConsumerManager类——消费消息的核心逻辑
文章目录
- 🍃前言
- 🌴扫描线程的实现
- 🌲实现消费消息
- 🌳实现addConsumer()方法
- 🎋VirtualHost类订阅消息的完善
- ⭕总结
🍃前言
本次开发目标
- 实现消费消息的核心逻辑
🌴扫描线程的实现
我们先给ConsumerManager类注入一些基础的属性
我们定义一个构造方法,里面就是我们的扫描线程,确保我们程序启动时,就可以一直对我们的阻塞队列进行扫描。
传入的对象为我们前面所构造的 虚拟机类的对象,我们只需要在VirtualHost 类里面添加实例化并传参即可
扫描线程逻辑如下:
- 阻塞队列里元素就取出,取出来的元素我们称它为令牌
- 根据该令牌我们可以找到需要被消费消息的队列
- 若该队列存在,我们便可以调用相应的消费方法进行消费
我们还需要将该线程设为后台线程
为了线程安全,必要的地方我们进行加锁操作。
并让这个线程永远的扫描下去,代码实现如下:
public ConsumerManager(VirtualHost p) {parent = p;scannerThread = new Thread(() -> {while (true) {try {// 1. 拿到令牌String queueName = tokenQueue.take();// 2. 根据令牌, 找到队列MSGQueue queue = parent.getMemoryDataCenter().getQueue(queueName);if (queue == null) {throw new MqException("[ConsumerManager] 取令牌后发现, 该队列名不存在! queueName=" + queueName);}// 3. 从这个队列中消费一个消息.synchronized (queue) {consumeMessage(queue);}} catch (InterruptedException | MqException e) {e.printStackTrace();}}});// 把线程设为后台线程.scannerThread.setDaemon(true);scannerThread.start();
}
🌲实现消费消息
扫描后,就需要交给线程池进行负责执行回调函数消费消息
流程如下:
- 按照轮询的方式找出消费者
- 若消费者存在,则从队列中取出一个消息
- 若该队列中有消息,我们则交给线程池来执行回调函数消费消息
在线程池内我们需要做的操作有:
- 把消息放到待确认的集合中. 这个操作势必在执行回调之前.
- 执行回调操作,若出现问题,则不会执行后续应答操作,若正常执行,我们则需要进行应答
- 应答又分为自动应答与手动应答如果当前是 “自动应答” , 就可以直接把消息删除了。如果当前是 “手动应答” , 则先不处理, 交给后续消费者调用VirtualHost类 basicAck 方法来处理.
我们先来看一下自动应答的情况:
删除消息时,我们需要删除三处的存储:硬盘、待确认集合、内存中心的消息
需要注意的是:在删除硬盘上的数据时,我们需要查看该消息是否要持久化,若需要持久化,则不要删除.
实现代码如下:
private void consumeMessage(MSGQueue queue) {// 1. 按照轮询的方式, 找个消费者出来.ConsumerEnv luckyDog = queue.chooseConsumer();if (luckyDog == null) {// 当前队列没有消费者, 暂时不消费. 等后面有消费者出现再说.return;}// 2. 从队列中取出一个消息Message message = parent.getMemoryDataCenter().pollMessage(queue.getName());if (message == null) {// 当前队列中还没有消息, 也不需要消费.return;}// 3. 把消息带入到消费者的回调方法中, 丢给线程池执行.workerPool.submit(() -> {try {// 1. 把消息放到待确认的集合中. 这个操作势必在执行回调之前.parent.getMemoryDataCenter().addMessageWaitAck(queue.getName(), message);// 2. 真正执行回调操作luckyDog.getConsumer().handleDelivery(luckyDog.getConsumerTag(), message.getBasicProperties(),message.getBody());// 3. 如果当前是 "自动应答" , 就可以直接把消息删除了.// 如果当前是 "手动应答" , 则先不处理, 交给后续消费者调用 basicAck 方法来处理.if (luckyDog.isAutoAck()) {// 1) 删除硬盘上的消息if (message.getDeliverMode() == 2) {parent.getDiskDataCenter().deleteMessage(queue, message);}// 2) 删除上面的待确认集合中的消息parent.getMemoryDataCenter().removeMessageWaitAck(queue.getName(), message.getMessageId());// 3) 删除内存中消息中心里的消息parent.getMemoryDataCenter().removeMessage(message.getMessageId());System.out.println("[ConsumerManager] 消息被成功消费! queueName=" + queue.getName());}} catch (Exception e) {e.printStackTrace();}});
}
关于手动应答的实现步骤我们分为以下四步:
- 获取到消息和队列
- 删除硬盘上的数据
- 删除消息中心中的数据
- 删除待确认的集合中的数据
代码实现如下:
public boolean basicAck(String queueName, String messageId) {queueName = virtualHostName + queueName;try {// 1. 获取到消息和队列Message message = memoryDataCenter.getMessage(messageId);if (message == null) {throw new MqException("[VirtualHost] 要确认的消息不存在! messageId=" + messageId);}MSGQueue queue = memoryDataCenter.getQueue(queueName);if (queue == null) {throw new MqException("[VirtualHost] 要确认的队列不存在! queueName=" + queueName);}// 2. 删除硬盘上的数据if (message.getDeliverMode() == 2) {diskDataCenter.deleteMessage(queue, message);}// 3. 删除消息中心中的数据memoryDataCenter.removeMessage(messageId);// 4. 删除待确认的集合中的数据memoryDataCenter.removeMessageWaitAck(queueName, messageId);System.out.println("[VirtualHost] basicAck 成功! 消息被成功确认! queueName=" + queueName+ ", messageId=" + messageId);return true;} catch (Exception e) {System.out.println("[VirtualHost] basicAck 失败! 消息确认失败! queueName=" + queueName+ ", messageId=" + messageId);e.printStackTrace();return false;}
}
🌳实现addConsumer()方法
该方法是为了实现前面我们添加订阅者的方法
在这个方法里,我们需要实现的是:
- 根据队列名查找该队列是否存在
- 若存在,构造相应的消费者进行添加
- 并且若该队列里存在未消费的消息,我们就直接进行消费掉
代码实现如下:
public void addConsumer(String consumerTag, String queueName, boolean autoAck, Consumer consumer) throws MqException {// 找到对应的队列.MSGQueue queue = parent.getMemoryDataCenter().getQueue(queueName);if (queue == null) {throw new MqException("[ConsumerManager] 队列不存在! queueName=" + queueName);}ConsumerEnv consumerEnv = new ConsumerEnv(consumerTag,queueName,autoAck,consumer);synchronized (queue) {queue.addConsumerEnv(consumerEnv);// 如果当前队列中已经有了一些消息了, 需要立即就消费掉.int n = parent.getMemoryDataCenter().getMessageCount(queueName);for (int i = 0; i < n; i++) {// 这个方法调用一次就消费一条消息.consumeMessage(queue);}}
}
🎋VirtualHost类订阅消息的完善
直接调用即可,代码实现如下:
// 订阅消息.// 添加一个队列的订阅者, 当队列收到消息之后, 就要把消息推送给对应的订阅者.// consumerTag: 消费者的身份标识// autoAck: 消息被消费完成后, 应答的方式. 为 true 自动应答. 为 false 手动应答.// consumer: 是一个回调函数. 此处类型设定成函数式接口. 这样后续调用 basicConsume 并且传实参的时候, 就可以写作 lambda 样子了.public boolean basicConsume(String consumerTag, String queueName, boolean autoAck, Consumer consumer) {// 构造一个 ConsumerEnv 对象, 把这个对应的队列找到, 再把这个 Consumer 对象添加到该队列中.queueName = virtualHostName + queueName;try {consumerManager.addConsumer(consumerTag, queueName, autoAck, consumer);System.out.println("[VirtualHost] basicConsume 成功! queueName=" + queueName);return true;} catch (Exception e) {System.out.println("[VirtualHost] basicConsume 失败! queueName=" + queueName);e.printStackTrace();return false;}}
⭕总结
关于《【消息队列开发】 实现ConsumerManager类——消费消息的核心逻辑》就讲解到这儿,感谢大家的支持,欢迎各位留言交流以及批评指正,如果文章对您有帮助或者觉得作者写的还不错可以点一下关注,点赞,收藏支持一下
相关文章:

【消息队列开发】 实现ConsumerManager类——消费消息的核心逻辑
文章目录 🍃前言🌴扫描线程的实现🌲实现消费消息🌳实现addConsumer()方法🎋VirtualHost类订阅消息的完善⭕总结 🍃前言 本次开发目标 实现消费消息的核心逻辑 🌴扫描线程的实现 我们先给Cons…...

【Three.js】使用精灵图Sprite创建面朝相机的文本标注
目录 🐝前言 🐝canvas创建文字 🐝将canvas作为纹理贴图加载到sprite中 🐝封装方法 🐝前言 在Three.js中精灵Sprite是一个总是面朝摄像机的平面,它通常和纹理贴图结合使用,贴图可以是一张图…...
C++中的类模板
C中的类模板 类模板 类模板在C中是一种非常强大的工具,它允许程序员编写与数据类型无关的代码。简单来说,类模板允许你定义一个蓝图,这个蓝图可以用来生成具体类型的类。使用类模板可以提高代码的复用性,减少重复代码࿰…...

【每日一题】好子数组的最大分数
Tag 【单调栈】【暴力枚举】【数组】【2024-03-19】 题目来源 1793. 好子数组的最大分数 解题思路 本题和 84. 柱状图中最大的矩形 一样,计算的都是最大矩形的面积。只不过多了一个约束:矩形必须包含下标 k。 以下的方法一和方法二是 84. 柱状图中最…...

Vue2(七):超详细vue开发环境搭建(win7),nodejs下载与安装,安装淘宝镜像(报错已解决),配置脚手架
一、安装node.js 本来想粗略写一下的,但是搭建脚手架的时候,遇到了很多问题,浪费快两天时间,记录一下自己的解决办法希望对你们有帮助! 1.下载nodejs 安装包下载链接【CNPM Binaries Mirror】 下载我划线的这个&am…...

【Web】记录CISCN 2021 总决赛 ezj4va题目复现——AspectJWeaver
目录 前言 原理分析 step 0 step 1 EXP 前文:【Web】浅聊Java反序列化之AspectJWeaver——任意文件写入-CSDN博客 前言 这就是当年传说中的零解题嘛😭,快做🤮了 有了之前的经验,思路顺挺快的,中间不…...

视频技术1:使用ABLMediaServer推流rtsp
ABLMediaServer定位是高性能、高稳定、开箱即用、商用级别的流媒体服务器 下边展示了如何把1个mp3作为输入源,转换为rtsp流的过程。 作用:用rtsp模拟摄像头的视频流 1、启动ABLMediaServer ABLMediaServer-2024-03-13\WinX64\ABLMediaServer.exe 配…...

HTML5+CSS3+JS小实例:创意罗盘时钟
实例:创意罗盘时钟 技术栈:HTML+CSS+JS 效果: 源码: 【HTML】 <!DOCTYPE html> <html lang="zh-CN"><head><meta charset="UTF-8"><meta name="viewport" content="width=device-width, initial-scale=…...

设计数据库之内部模式:SQL基本操作
Chapter4:设计数据库之内部模式:SQL基本操作 笔记来源: 1.《漫画数据库》—科学出版社 2.SQL | DDL, DQL, DML, DCL and TCL Commands 设计数据库的步骤: 概念模式 概念模式(conceptual schema)是指将现实世界模型化的阶段进而&…...

Git浅谈配置文件和免密登录
一、文章内容 简述git三种配置ssh免密登录以及遇见的问题git可忽略文件git remote 相关操作 二、Git三种配置 项目配置文件(局部):项目路径/.git/config 文件 git config --local user.name name git config --local user.email 123qq.cc全局配置文(所有用户): …...
【好玩的经典游戏】Docker环境下部署RPG网页小游戏
【好玩的经典游戏】Docker环境下部署RPG网页小游戏 一、react-tetris小游戏介绍1.1 react-tetris小游戏简介1.2 项目预览二、本次实践介绍2.1 本地环境规划2.2 本次实践介绍三、本地环境检查3.1 安装Docker环境3.2 检查Docker服务状态3.3 检查Docker版本3.4 检查docker compose…...
前端逻辑错误或UI崩溃解决问题
全屏错误覆盖层或UI崩溃 VueReact(错误边界) Vue Vue的全屏错误覆盖层解决,其实只需要配置Error就好,在开发服务器的client.overlay中设置关闭全屏覆盖层 module.exports {devServer: {client: {overlay: {warnings: false,error…...

python爬取QQ音乐评论信息
python爬取QQ音乐评论信息 python爬取QQ音乐评论信息1.随便选个音乐python爬取QQ音乐评论信息 1.随便选个音乐 https://y.qq.com/n/yqq/song/0039MnYb0qxYhV.html 当前的后台调试页面显示如下: 找到评论的数据接口: https://c.y.qq.com/base/fcgi-bin/fcg_global_comme…...
Unity构建详解(1)——SBP介绍
【前言】 Unity的资源工作流程分为导入、创建、构建、分发、加载。我们说的是其中的构建步骤。 构建是指将项目工程中的资源文件和代码整合程可执行文件的过程,构建的结果是生成可执行文件,在win平台上是exe,在Android平台上是apkÿ…...

贪心算法(算法竞赛、蓝桥杯)--奶牛晒衣服
1、B站视频链接:A28 贪心算法 P1843 奶牛晒衣服_哔哩哔哩_bilibili 题目链接:奶牛晒衣服 - 洛谷 #include <bits/stdc.h> using namespace std; priority_queue<int> q;//用大根堆维护湿度的最大值 int n,a,b; int tim,maxn;int main(){s…...
Redis列表:高效消息通信与实时数据处理的利器
Redis是一个强大的开源内存数据库,被广泛应用于缓存、会话存储、队列等各种场景中。在Redis中,列表(List)是一种非常重要的数据结构,它提供了存储、获取、操作有序元素集合的功能。本文将深入探讨Redis列表的特性、使用…...

Redis中的缓存雪崩
缓存雪崩 🤔现象分析 缓存雪崩是指在同一时段大量的缓存key同时失效或者缓存服务(Redis等)宕机,导致大量请求到达数据库,带来巨大压力。 👊 解决方案 利用Redis集群提高服务的可用性,避免缓存服务宕机给缓存业务添…...

使用远程工具连接Mysql
(若想要远程连接Mysql需要下面解决四个问题) 1、目标地址 直接查询 2、端口号 3306 3、防火墙关闭 [rootlocalhost date]# systemctl stop firewalld.service 4、授权mysql数据库root用户权限(因为mysql开始不允许其他IP访问࿰…...

2024不起眼的“致富”野路子,不想打工了,做做这些暴利创业项目。2024个人创业做什么项目好;最适合白手起家的创业项目
经济大环境差,并不代表就没有机会。相反,主流经济不好正是另一些人所看重的千载难逢的机会。就像股票市场一样,有人靠做多赚钱,有人靠做空赚钱。下面我们就来分析一下哪些行业会在这个时候崛起。 首先二手行业会迅速崛起ÿ…...

从后端获取文件数据并导出
导出文件的公共方法 export const download (res, tools) > {const { message, hide } tools;const fileReader: any new FileReader();console.log(fileReader-res>>>, res);fileReader.onload (e) > {if (res?.data?.type application/json) {try {co…...
生成xcframework
打包 XCFramework 的方法 XCFramework 是苹果推出的一种多平台二进制分发格式,可以包含多个架构和平台的代码。打包 XCFramework 通常用于分发库或框架。 使用 Xcode 命令行工具打包 通过 xcodebuild 命令可以打包 XCFramework。确保项目已经配置好需要支持的平台…...

Lombok 的 @Data 注解失效,未生成 getter/setter 方法引发的HTTP 406 错误
HTTP 状态码 406 (Not Acceptable) 和 500 (Internal Server Error) 是两类完全不同的错误,它们的含义、原因和解决方法都有显著区别。以下是详细对比: 1. HTTP 406 (Not Acceptable) 含义: 客户端请求的内容类型与服务器支持的内容类型不匹…...
论文解读:交大港大上海AI Lab开源论文 | 宇树机器人多姿态起立控制强化学习框架(二)
HoST框架核心实现方法详解 - 论文深度解读(第二部分) 《Learning Humanoid Standing-up Control across Diverse Postures》 系列文章: 论文深度解读 + 算法与代码分析(二) 作者机构: 上海AI Lab, 上海交通大学, 香港大学, 浙江大学, 香港中文大学 论文主题: 人形机器人…...

理解 MCP 工作流:使用 Ollama 和 LangChain 构建本地 MCP 客户端
🌟 什么是 MCP? 模型控制协议 (MCP) 是一种创新的协议,旨在无缝连接 AI 模型与应用程序。 MCP 是一个开源协议,它标准化了我们的 LLM 应用程序连接所需工具和数据源并与之协作的方式。 可以把它想象成你的 AI 模型 和想要使用它…...
解锁数据库简洁之道:FastAPI与SQLModel实战指南
在构建现代Web应用程序时,与数据库的交互无疑是核心环节。虽然传统的数据库操作方式(如直接编写SQL语句与psycopg2交互)赋予了我们精细的控制权,但在面对日益复杂的业务逻辑和快速迭代的需求时,这种方式的开发效率和可…...
Leetcode 3577. Count the Number of Computer Unlocking Permutations
Leetcode 3577. Count the Number of Computer Unlocking Permutations 1. 解题思路2. 代码实现 题目链接:3577. Count the Number of Computer Unlocking Permutations 1. 解题思路 这一题其实就是一个脑筋急转弯,要想要能够将所有的电脑解锁&#x…...

Cilium动手实验室: 精通之旅---20.Isovalent Enterprise for Cilium: Zero Trust Visibility
Cilium动手实验室: 精通之旅---20.Isovalent Enterprise for Cilium: Zero Trust Visibility 1. 实验室环境1.1 实验室环境1.2 小测试 2. The Endor System2.1 部署应用2.2 检查现有策略 3. Cilium 策略实体3.1 创建 allow-all 网络策略3.2 在 Hubble CLI 中验证网络策略源3.3 …...

HBuilderX安装(uni-app和小程序开发)
下载HBuilderX 访问官方网站:https://www.dcloud.io/hbuilderx.html 根据您的操作系统选择合适版本: Windows版(推荐下载标准版) Windows系统安装步骤 运行安装程序: 双击下载的.exe安装文件 如果出现安全提示&…...
#Uniapp篇:chrome调试unapp适配
chrome调试设备----使用Android模拟机开发调试移动端页面 Chrome://inspect/#devices MuMu模拟器Edge浏览器:Android原生APP嵌入的H5页面元素定位 chrome://inspect/#devices uniapp单位适配 根路径下 postcss.config.js 需要装这些插件 “postcss”: “^8.5.…...

招商蛇口 | 执笔CID,启幕低密生活新境
作为中国城市生长的力量,招商蛇口以“美好生活承载者”为使命,深耕全球111座城市,以央企担当匠造时代理想人居。从深圳湾的开拓基因到西安高新CID的战略落子,招商蛇口始终与城市发展同频共振,以建筑诠释对土地与生活的…...