【消息队列开发】 实现ConsumerManager类——消费消息的核心逻辑
文章目录
- 🍃前言
- 🌴扫描线程的实现
- 🌲实现消费消息
- 🌳实现addConsumer()方法
- 🎋VirtualHost类订阅消息的完善
- ⭕总结
🍃前言
本次开发目标
- 实现消费消息的核心逻辑

🌴扫描线程的实现
我们先给ConsumerManager类注入一些基础的属性

我们定义一个构造方法,里面就是我们的扫描线程,确保我们程序启动时,就可以一直对我们的阻塞队列进行扫描。
传入的对象为我们前面所构造的 虚拟机类的对象,我们只需要在VirtualHost 类里面添加实例化并传参即可

扫描线程逻辑如下:
- 阻塞队列里元素就取出,取出来的元素我们称它为令牌
- 根据该令牌我们可以找到需要被消费消息的队列
- 若该队列存在,我们便可以调用相应的消费方法进行消费
我们还需要将该线程设为后台线程
为了线程安全,必要的地方我们进行加锁操作。
并让这个线程永远的扫描下去,代码实现如下:
public ConsumerManager(VirtualHost p) {parent = p;scannerThread = new Thread(() -> {while (true) {try {// 1. 拿到令牌String queueName = tokenQueue.take();// 2. 根据令牌, 找到队列MSGQueue queue = parent.getMemoryDataCenter().getQueue(queueName);if (queue == null) {throw new MqException("[ConsumerManager] 取令牌后发现, 该队列名不存在! queueName=" + queueName);}// 3. 从这个队列中消费一个消息.synchronized (queue) {consumeMessage(queue);}} catch (InterruptedException | MqException e) {e.printStackTrace();}}});// 把线程设为后台线程.scannerThread.setDaemon(true);scannerThread.start();
}
🌲实现消费消息
扫描后,就需要交给线程池进行负责执行回调函数消费消息
流程如下:
- 按照轮询的方式找出消费者
- 若消费者存在,则从队列中取出一个消息
- 若该队列中有消息,我们则交给线程池来执行回调函数消费消息
在线程池内我们需要做的操作有:
- 把消息放到待确认的集合中. 这个操作势必在执行回调之前.
- 执行回调操作,若出现问题,则不会执行后续应答操作,若正常执行,我们则需要进行应答
- 应答又分为自动应答与手动应答如果当前是 “自动应答” , 就可以直接把消息删除了。如果当前是 “手动应答” , 则先不处理, 交给后续消费者调用VirtualHost类 basicAck 方法来处理.
我们先来看一下自动应答的情况:
删除消息时,我们需要删除三处的存储:硬盘、待确认集合、内存中心的消息
需要注意的是:在删除硬盘上的数据时,我们需要查看该消息是否要持久化,若需要持久化,则不要删除.
实现代码如下:
private void consumeMessage(MSGQueue queue) {// 1. 按照轮询的方式, 找个消费者出来.ConsumerEnv luckyDog = queue.chooseConsumer();if (luckyDog == null) {// 当前队列没有消费者, 暂时不消费. 等后面有消费者出现再说.return;}// 2. 从队列中取出一个消息Message message = parent.getMemoryDataCenter().pollMessage(queue.getName());if (message == null) {// 当前队列中还没有消息, 也不需要消费.return;}// 3. 把消息带入到消费者的回调方法中, 丢给线程池执行.workerPool.submit(() -> {try {// 1. 把消息放到待确认的集合中. 这个操作势必在执行回调之前.parent.getMemoryDataCenter().addMessageWaitAck(queue.getName(), message);// 2. 真正执行回调操作luckyDog.getConsumer().handleDelivery(luckyDog.getConsumerTag(), message.getBasicProperties(),message.getBody());// 3. 如果当前是 "自动应答" , 就可以直接把消息删除了.// 如果当前是 "手动应答" , 则先不处理, 交给后续消费者调用 basicAck 方法来处理.if (luckyDog.isAutoAck()) {// 1) 删除硬盘上的消息if (message.getDeliverMode() == 2) {parent.getDiskDataCenter().deleteMessage(queue, message);}// 2) 删除上面的待确认集合中的消息parent.getMemoryDataCenter().removeMessageWaitAck(queue.getName(), message.getMessageId());// 3) 删除内存中消息中心里的消息parent.getMemoryDataCenter().removeMessage(message.getMessageId());System.out.println("[ConsumerManager] 消息被成功消费! queueName=" + queue.getName());}} catch (Exception e) {e.printStackTrace();}});
}
关于手动应答的实现步骤我们分为以下四步:
- 获取到消息和队列
- 删除硬盘上的数据
- 删除消息中心中的数据
- 删除待确认的集合中的数据
代码实现如下:
public boolean basicAck(String queueName, String messageId) {queueName = virtualHostName + queueName;try {// 1. 获取到消息和队列Message message = memoryDataCenter.getMessage(messageId);if (message == null) {throw new MqException("[VirtualHost] 要确认的消息不存在! messageId=" + messageId);}MSGQueue queue = memoryDataCenter.getQueue(queueName);if (queue == null) {throw new MqException("[VirtualHost] 要确认的队列不存在! queueName=" + queueName);}// 2. 删除硬盘上的数据if (message.getDeliverMode() == 2) {diskDataCenter.deleteMessage(queue, message);}// 3. 删除消息中心中的数据memoryDataCenter.removeMessage(messageId);// 4. 删除待确认的集合中的数据memoryDataCenter.removeMessageWaitAck(queueName, messageId);System.out.println("[VirtualHost] basicAck 成功! 消息被成功确认! queueName=" + queueName+ ", messageId=" + messageId);return true;} catch (Exception e) {System.out.println("[VirtualHost] basicAck 失败! 消息确认失败! queueName=" + queueName+ ", messageId=" + messageId);e.printStackTrace();return false;}
}
🌳实现addConsumer()方法
该方法是为了实现前面我们添加订阅者的方法
在这个方法里,我们需要实现的是:
- 根据队列名查找该队列是否存在
- 若存在,构造相应的消费者进行添加
- 并且若该队列里存在未消费的消息,我们就直接进行消费掉
代码实现如下:
public void addConsumer(String consumerTag, String queueName, boolean autoAck, Consumer consumer) throws MqException {// 找到对应的队列.MSGQueue queue = parent.getMemoryDataCenter().getQueue(queueName);if (queue == null) {throw new MqException("[ConsumerManager] 队列不存在! queueName=" + queueName);}ConsumerEnv consumerEnv = new ConsumerEnv(consumerTag,queueName,autoAck,consumer);synchronized (queue) {queue.addConsumerEnv(consumerEnv);// 如果当前队列中已经有了一些消息了, 需要立即就消费掉.int n = parent.getMemoryDataCenter().getMessageCount(queueName);for (int i = 0; i < n; i++) {// 这个方法调用一次就消费一条消息.consumeMessage(queue);}}
}
🎋VirtualHost类订阅消息的完善
直接调用即可,代码实现如下:
// 订阅消息.// 添加一个队列的订阅者, 当队列收到消息之后, 就要把消息推送给对应的订阅者.// consumerTag: 消费者的身份标识// autoAck: 消息被消费完成后, 应答的方式. 为 true 自动应答. 为 false 手动应答.// consumer: 是一个回调函数. 此处类型设定成函数式接口. 这样后续调用 basicConsume 并且传实参的时候, 就可以写作 lambda 样子了.public boolean basicConsume(String consumerTag, String queueName, boolean autoAck, Consumer consumer) {// 构造一个 ConsumerEnv 对象, 把这个对应的队列找到, 再把这个 Consumer 对象添加到该队列中.queueName = virtualHostName + queueName;try {consumerManager.addConsumer(consumerTag, queueName, autoAck, consumer);System.out.println("[VirtualHost] basicConsume 成功! queueName=" + queueName);return true;} catch (Exception e) {System.out.println("[VirtualHost] basicConsume 失败! queueName=" + queueName);e.printStackTrace();return false;}}
⭕总结
关于《【消息队列开发】 实现ConsumerManager类——消费消息的核心逻辑》就讲解到这儿,感谢大家的支持,欢迎各位留言交流以及批评指正,如果文章对您有帮助或者觉得作者写的还不错可以点一下关注,点赞,收藏支持一下
相关文章:
【消息队列开发】 实现ConsumerManager类——消费消息的核心逻辑
文章目录 🍃前言🌴扫描线程的实现🌲实现消费消息🌳实现addConsumer()方法🎋VirtualHost类订阅消息的完善⭕总结 🍃前言 本次开发目标 实现消费消息的核心逻辑 🌴扫描线程的实现 我们先给Cons…...
【Three.js】使用精灵图Sprite创建面朝相机的文本标注
目录 🐝前言 🐝canvas创建文字 🐝将canvas作为纹理贴图加载到sprite中 🐝封装方法 🐝前言 在Three.js中精灵Sprite是一个总是面朝摄像机的平面,它通常和纹理贴图结合使用,贴图可以是一张图…...
C++中的类模板
C中的类模板 类模板 类模板在C中是一种非常强大的工具,它允许程序员编写与数据类型无关的代码。简单来说,类模板允许你定义一个蓝图,这个蓝图可以用来生成具体类型的类。使用类模板可以提高代码的复用性,减少重复代码࿰…...
【每日一题】好子数组的最大分数
Tag 【单调栈】【暴力枚举】【数组】【2024-03-19】 题目来源 1793. 好子数组的最大分数 解题思路 本题和 84. 柱状图中最大的矩形 一样,计算的都是最大矩形的面积。只不过多了一个约束:矩形必须包含下标 k。 以下的方法一和方法二是 84. 柱状图中最…...
Vue2(七):超详细vue开发环境搭建(win7),nodejs下载与安装,安装淘宝镜像(报错已解决),配置脚手架
一、安装node.js 本来想粗略写一下的,但是搭建脚手架的时候,遇到了很多问题,浪费快两天时间,记录一下自己的解决办法希望对你们有帮助! 1.下载nodejs 安装包下载链接【CNPM Binaries Mirror】 下载我划线的这个&am…...
【Web】记录CISCN 2021 总决赛 ezj4va题目复现——AspectJWeaver
目录 前言 原理分析 step 0 step 1 EXP 前文:【Web】浅聊Java反序列化之AspectJWeaver——任意文件写入-CSDN博客 前言 这就是当年传说中的零解题嘛😭,快做🤮了 有了之前的经验,思路顺挺快的,中间不…...
视频技术1:使用ABLMediaServer推流rtsp
ABLMediaServer定位是高性能、高稳定、开箱即用、商用级别的流媒体服务器 下边展示了如何把1个mp3作为输入源,转换为rtsp流的过程。 作用:用rtsp模拟摄像头的视频流 1、启动ABLMediaServer ABLMediaServer-2024-03-13\WinX64\ABLMediaServer.exe 配…...
HTML5+CSS3+JS小实例:创意罗盘时钟
实例:创意罗盘时钟 技术栈:HTML+CSS+JS 效果: 源码: 【HTML】 <!DOCTYPE html> <html lang="zh-CN"><head><meta charset="UTF-8"><meta name="viewport" content="width=device-width, initial-scale=…...
设计数据库之内部模式:SQL基本操作
Chapter4:设计数据库之内部模式:SQL基本操作 笔记来源: 1.《漫画数据库》—科学出版社 2.SQL | DDL, DQL, DML, DCL and TCL Commands 设计数据库的步骤: 概念模式 概念模式(conceptual schema)是指将现实世界模型化的阶段进而&…...
Git浅谈配置文件和免密登录
一、文章内容 简述git三种配置ssh免密登录以及遇见的问题git可忽略文件git remote 相关操作 二、Git三种配置 项目配置文件(局部):项目路径/.git/config 文件 git config --local user.name name git config --local user.email 123qq.cc全局配置文(所有用户): …...
【好玩的经典游戏】Docker环境下部署RPG网页小游戏
【好玩的经典游戏】Docker环境下部署RPG网页小游戏 一、react-tetris小游戏介绍1.1 react-tetris小游戏简介1.2 项目预览二、本次实践介绍2.1 本地环境规划2.2 本次实践介绍三、本地环境检查3.1 安装Docker环境3.2 检查Docker服务状态3.3 检查Docker版本3.4 检查docker compose…...
前端逻辑错误或UI崩溃解决问题
全屏错误覆盖层或UI崩溃 VueReact(错误边界) Vue Vue的全屏错误覆盖层解决,其实只需要配置Error就好,在开发服务器的client.overlay中设置关闭全屏覆盖层 module.exports {devServer: {client: {overlay: {warnings: false,error…...
python爬取QQ音乐评论信息
python爬取QQ音乐评论信息 python爬取QQ音乐评论信息1.随便选个音乐python爬取QQ音乐评论信息 1.随便选个音乐 https://y.qq.com/n/yqq/song/0039MnYb0qxYhV.html 当前的后台调试页面显示如下: 找到评论的数据接口: https://c.y.qq.com/base/fcgi-bin/fcg_global_comme…...
Unity构建详解(1)——SBP介绍
【前言】 Unity的资源工作流程分为导入、创建、构建、分发、加载。我们说的是其中的构建步骤。 构建是指将项目工程中的资源文件和代码整合程可执行文件的过程,构建的结果是生成可执行文件,在win平台上是exe,在Android平台上是apkÿ…...
贪心算法(算法竞赛、蓝桥杯)--奶牛晒衣服
1、B站视频链接:A28 贪心算法 P1843 奶牛晒衣服_哔哩哔哩_bilibili 题目链接:奶牛晒衣服 - 洛谷 #include <bits/stdc.h> using namespace std; priority_queue<int> q;//用大根堆维护湿度的最大值 int n,a,b; int tim,maxn;int main(){s…...
Redis列表:高效消息通信与实时数据处理的利器
Redis是一个强大的开源内存数据库,被广泛应用于缓存、会话存储、队列等各种场景中。在Redis中,列表(List)是一种非常重要的数据结构,它提供了存储、获取、操作有序元素集合的功能。本文将深入探讨Redis列表的特性、使用…...
Redis中的缓存雪崩
缓存雪崩 🤔现象分析 缓存雪崩是指在同一时段大量的缓存key同时失效或者缓存服务(Redis等)宕机,导致大量请求到达数据库,带来巨大压力。 👊 解决方案 利用Redis集群提高服务的可用性,避免缓存服务宕机给缓存业务添…...
使用远程工具连接Mysql
(若想要远程连接Mysql需要下面解决四个问题) 1、目标地址 直接查询 2、端口号 3306 3、防火墙关闭 [rootlocalhost date]# systemctl stop firewalld.service 4、授权mysql数据库root用户权限(因为mysql开始不允许其他IP访问࿰…...
2024不起眼的“致富”野路子,不想打工了,做做这些暴利创业项目。2024个人创业做什么项目好;最适合白手起家的创业项目
经济大环境差,并不代表就没有机会。相反,主流经济不好正是另一些人所看重的千载难逢的机会。就像股票市场一样,有人靠做多赚钱,有人靠做空赚钱。下面我们就来分析一下哪些行业会在这个时候崛起。 首先二手行业会迅速崛起ÿ…...
从后端获取文件数据并导出
导出文件的公共方法 export const download (res, tools) > {const { message, hide } tools;const fileReader: any new FileReader();console.log(fileReader-res>>>, res);fileReader.onload (e) > {if (res?.data?.type application/json) {try {co…...
golang如何实现备忘录模式_golang备忘录模式实现方案
Go中备忘录模式需用非导出结构体封装快照、接口作类型标记,发起者控制Save/Restore;只备份业务字段,避免指针/map共享;限制栈长度并置空引用助GC;测试用reflect.DeepEqual验证隔离性。备忘录模式在 Go 里没有语言原生支…...
Matplotlib数据可视化实战:从基础图表到高级定制
1. Matplotlib入门:为什么选择这个可视化工具? 第一次接触数据可视化时,我被各种工具搞得眼花缭乱。Excel、Tableau、Power BI...直到遇见Matplotlib,才发现这个Python库才是数据分析师的"瑞士军刀"。它最大的优势就是…...
MARY TTS信号处理核心技术:正弦分析与HNM算法的深度剖析
MARY TTS信号处理核心技术:正弦分析与HNM算法的深度剖析 【免费下载链接】marytts MARY TTS -- an open-source, multilingual text-to-speech synthesis system written in pure java 项目地址: https://gitcode.com/gh_mirrors/ma/marytts MARY TTS作为一款…...
Z-Image-Turbo-rinaiqiao-huiyewunv实操手册:生成图批量命名规则与文件夹自动归类脚本
Z-Image-Turbo-rinaiqiao-huiyewunv实操手册:生成图批量命名规则与文件夹自动归类脚本 1. 引言:从一张图到一百张图的烦恼 当你用Z-Image Turbo(辉夜大小姐-日奈娇)工具生成第一张精美的二次元人物图时,那种兴奋感是…...
MDCSwipeToChoose快速入门:5步创建你的第一个滑动卡片应用
MDCSwipeToChoose快速入门:5步创建你的第一个滑动卡片应用 【免费下载链接】MDCSwipeToChoose Swipe to "like" or "dislike" any view, just like Tinder.app. Build a flashcard app, a photo viewer, and more, in minutes, not hours! 项…...
天融信防火墙双机热备-备防火墙替换 NGFW4000G-UF(TG-56008-YL)
1.拿到空配置备机,PC连接防火墙设备eth0口(接口默认地址192.168.1.254/24),PC网口配置和设备同网段地址如192.168.1.253/24 2.PC去ping192.168.1.254地址是否能通,通则下一步。 3.打开浏览器输入https://192.168.1.25…...
Qwen2.5-7B-Instruct完整指南:从部署到应用,一站式解决方案
Qwen2.5-7B-Instruct完整指南:从部署到应用,一站式解决方案 1. 引言:为什么你需要关注Qwen2.5-7B-Instruct? 如果你正在寻找一个既强大又能在本地安全运行的AI对话助手,那么Qwen2.5-7B-Instruct绝对值得你花时间了解…...
局域网聊天工具选型:为什么企业办公场景更青睐 BeeWorks? - BeeWorks
在制造、政务、军工、大型集团等行业中,内网隔离、无外网办公已成为常态,一款专业的局域网聊天工具成为刚性需求。不同于依赖公有云服务器的通用即时通讯软件,局域网聊天工具将数据传输与存储完全限定在企业内部网络,从物理层面杜…...
Python实现GCJ-02与CGCS2000坐标转换的GUI工具开发
1. 为什么需要坐标转换工具 第一次接触地图开发的朋友可能会疑惑:为什么坐标还需要转换?这得从国内地图服务的特殊性说起。国内主流地图服务如高德、腾讯地图使用的GCJ-02坐标系(俗称火星坐标系),与全球通用的WGS84坐标…...
从零开始掌握YOLO——实时目标检测的技术详解
你正在打开手机相册,系统自动把所有照片按“人物”“风景”“宠物”整理好;你开车经过十字路口,路边的摄像头精准识别出车牌和车型;工厂流水线上,机械臂的“眼睛”实时锁定每一个瑕疵品——这些场景背后,几乎都站着一个名字:YOLO。 YOLO(You Only Look Once)自2015年…...
