【消息队列开发】 实现ConsumerManager类——消费消息的核心逻辑
文章目录
- 🍃前言
- 🌴扫描线程的实现
- 🌲实现消费消息
- 🌳实现addConsumer()方法
- 🎋VirtualHost类订阅消息的完善
- ⭕总结
🍃前言
本次开发目标
- 实现消费消息的核心逻辑
🌴扫描线程的实现
我们先给ConsumerManager类注入一些基础的属性
我们定义一个构造方法,里面就是我们的扫描线程,确保我们程序启动时,就可以一直对我们的阻塞队列进行扫描。
传入的对象为我们前面所构造的 虚拟机类的对象,我们只需要在VirtualHost 类里面添加实例化并传参即可
扫描线程逻辑如下:
- 阻塞队列里元素就取出,取出来的元素我们称它为令牌
- 根据该令牌我们可以找到需要被消费消息的队列
- 若该队列存在,我们便可以调用相应的消费方法进行消费
我们还需要将该线程设为后台线程
为了线程安全,必要的地方我们进行加锁操作。
并让这个线程永远的扫描下去,代码实现如下:
public ConsumerManager(VirtualHost p) {parent = p;scannerThread = new Thread(() -> {while (true) {try {// 1. 拿到令牌String queueName = tokenQueue.take();// 2. 根据令牌, 找到队列MSGQueue queue = parent.getMemoryDataCenter().getQueue(queueName);if (queue == null) {throw new MqException("[ConsumerManager] 取令牌后发现, 该队列名不存在! queueName=" + queueName);}// 3. 从这个队列中消费一个消息.synchronized (queue) {consumeMessage(queue);}} catch (InterruptedException | MqException e) {e.printStackTrace();}}});// 把线程设为后台线程.scannerThread.setDaemon(true);scannerThread.start();
}
🌲实现消费消息
扫描后,就需要交给线程池进行负责执行回调函数消费消息
流程如下:
- 按照轮询的方式找出消费者
- 若消费者存在,则从队列中取出一个消息
- 若该队列中有消息,我们则交给线程池来执行回调函数消费消息
在线程池内我们需要做的操作有:
- 把消息放到待确认的集合中. 这个操作势必在执行回调之前.
- 执行回调操作,若出现问题,则不会执行后续应答操作,若正常执行,我们则需要进行应答
- 应答又分为自动应答与手动应答如果当前是 “自动应答” , 就可以直接把消息删除了。如果当前是 “手动应答” , 则先不处理, 交给后续消费者调用VirtualHost类 basicAck 方法来处理.
我们先来看一下自动应答的情况:
删除消息时,我们需要删除三处的存储:硬盘、待确认集合、内存中心的消息
需要注意的是:在删除硬盘上的数据时,我们需要查看该消息是否要持久化,若需要持久化,则不要删除.
实现代码如下:
private void consumeMessage(MSGQueue queue) {// 1. 按照轮询的方式, 找个消费者出来.ConsumerEnv luckyDog = queue.chooseConsumer();if (luckyDog == null) {// 当前队列没有消费者, 暂时不消费. 等后面有消费者出现再说.return;}// 2. 从队列中取出一个消息Message message = parent.getMemoryDataCenter().pollMessage(queue.getName());if (message == null) {// 当前队列中还没有消息, 也不需要消费.return;}// 3. 把消息带入到消费者的回调方法中, 丢给线程池执行.workerPool.submit(() -> {try {// 1. 把消息放到待确认的集合中. 这个操作势必在执行回调之前.parent.getMemoryDataCenter().addMessageWaitAck(queue.getName(), message);// 2. 真正执行回调操作luckyDog.getConsumer().handleDelivery(luckyDog.getConsumerTag(), message.getBasicProperties(),message.getBody());// 3. 如果当前是 "自动应答" , 就可以直接把消息删除了.// 如果当前是 "手动应答" , 则先不处理, 交给后续消费者调用 basicAck 方法来处理.if (luckyDog.isAutoAck()) {// 1) 删除硬盘上的消息if (message.getDeliverMode() == 2) {parent.getDiskDataCenter().deleteMessage(queue, message);}// 2) 删除上面的待确认集合中的消息parent.getMemoryDataCenter().removeMessageWaitAck(queue.getName(), message.getMessageId());// 3) 删除内存中消息中心里的消息parent.getMemoryDataCenter().removeMessage(message.getMessageId());System.out.println("[ConsumerManager] 消息被成功消费! queueName=" + queue.getName());}} catch (Exception e) {e.printStackTrace();}});
}
关于手动应答的实现步骤我们分为以下四步:
- 获取到消息和队列
- 删除硬盘上的数据
- 删除消息中心中的数据
- 删除待确认的集合中的数据
代码实现如下:
public boolean basicAck(String queueName, String messageId) {queueName = virtualHostName + queueName;try {// 1. 获取到消息和队列Message message = memoryDataCenter.getMessage(messageId);if (message == null) {throw new MqException("[VirtualHost] 要确认的消息不存在! messageId=" + messageId);}MSGQueue queue = memoryDataCenter.getQueue(queueName);if (queue == null) {throw new MqException("[VirtualHost] 要确认的队列不存在! queueName=" + queueName);}// 2. 删除硬盘上的数据if (message.getDeliverMode() == 2) {diskDataCenter.deleteMessage(queue, message);}// 3. 删除消息中心中的数据memoryDataCenter.removeMessage(messageId);// 4. 删除待确认的集合中的数据memoryDataCenter.removeMessageWaitAck(queueName, messageId);System.out.println("[VirtualHost] basicAck 成功! 消息被成功确认! queueName=" + queueName+ ", messageId=" + messageId);return true;} catch (Exception e) {System.out.println("[VirtualHost] basicAck 失败! 消息确认失败! queueName=" + queueName+ ", messageId=" + messageId);e.printStackTrace();return false;}
}
🌳实现addConsumer()方法
该方法是为了实现前面我们添加订阅者的方法
在这个方法里,我们需要实现的是:
- 根据队列名查找该队列是否存在
- 若存在,构造相应的消费者进行添加
- 并且若该队列里存在未消费的消息,我们就直接进行消费掉
代码实现如下:
public void addConsumer(String consumerTag, String queueName, boolean autoAck, Consumer consumer) throws MqException {// 找到对应的队列.MSGQueue queue = parent.getMemoryDataCenter().getQueue(queueName);if (queue == null) {throw new MqException("[ConsumerManager] 队列不存在! queueName=" + queueName);}ConsumerEnv consumerEnv = new ConsumerEnv(consumerTag,queueName,autoAck,consumer);synchronized (queue) {queue.addConsumerEnv(consumerEnv);// 如果当前队列中已经有了一些消息了, 需要立即就消费掉.int n = parent.getMemoryDataCenter().getMessageCount(queueName);for (int i = 0; i < n; i++) {// 这个方法调用一次就消费一条消息.consumeMessage(queue);}}
}
🎋VirtualHost类订阅消息的完善
直接调用即可,代码实现如下:
// 订阅消息.// 添加一个队列的订阅者, 当队列收到消息之后, 就要把消息推送给对应的订阅者.// consumerTag: 消费者的身份标识// autoAck: 消息被消费完成后, 应答的方式. 为 true 自动应答. 为 false 手动应答.// consumer: 是一个回调函数. 此处类型设定成函数式接口. 这样后续调用 basicConsume 并且传实参的时候, 就可以写作 lambda 样子了.public boolean basicConsume(String consumerTag, String queueName, boolean autoAck, Consumer consumer) {// 构造一个 ConsumerEnv 对象, 把这个对应的队列找到, 再把这个 Consumer 对象添加到该队列中.queueName = virtualHostName + queueName;try {consumerManager.addConsumer(consumerTag, queueName, autoAck, consumer);System.out.println("[VirtualHost] basicConsume 成功! queueName=" + queueName);return true;} catch (Exception e) {System.out.println("[VirtualHost] basicConsume 失败! queueName=" + queueName);e.printStackTrace();return false;}}
⭕总结
关于《【消息队列开发】 实现ConsumerManager类——消费消息的核心逻辑》就讲解到这儿,感谢大家的支持,欢迎各位留言交流以及批评指正,如果文章对您有帮助或者觉得作者写的还不错可以点一下关注,点赞,收藏支持一下
相关文章:

【消息队列开发】 实现ConsumerManager类——消费消息的核心逻辑
文章目录 🍃前言🌴扫描线程的实现🌲实现消费消息🌳实现addConsumer()方法🎋VirtualHost类订阅消息的完善⭕总结 🍃前言 本次开发目标 实现消费消息的核心逻辑 🌴扫描线程的实现 我们先给Cons…...

【Three.js】使用精灵图Sprite创建面朝相机的文本标注
目录 🐝前言 🐝canvas创建文字 🐝将canvas作为纹理贴图加载到sprite中 🐝封装方法 🐝前言 在Three.js中精灵Sprite是一个总是面朝摄像机的平面,它通常和纹理贴图结合使用,贴图可以是一张图…...
C++中的类模板
C中的类模板 类模板 类模板在C中是一种非常强大的工具,它允许程序员编写与数据类型无关的代码。简单来说,类模板允许你定义一个蓝图,这个蓝图可以用来生成具体类型的类。使用类模板可以提高代码的复用性,减少重复代码࿰…...

【每日一题】好子数组的最大分数
Tag 【单调栈】【暴力枚举】【数组】【2024-03-19】 题目来源 1793. 好子数组的最大分数 解题思路 本题和 84. 柱状图中最大的矩形 一样,计算的都是最大矩形的面积。只不过多了一个约束:矩形必须包含下标 k。 以下的方法一和方法二是 84. 柱状图中最…...

Vue2(七):超详细vue开发环境搭建(win7),nodejs下载与安装,安装淘宝镜像(报错已解决),配置脚手架
一、安装node.js 本来想粗略写一下的,但是搭建脚手架的时候,遇到了很多问题,浪费快两天时间,记录一下自己的解决办法希望对你们有帮助! 1.下载nodejs 安装包下载链接【CNPM Binaries Mirror】 下载我划线的这个&am…...

【Web】记录CISCN 2021 总决赛 ezj4va题目复现——AspectJWeaver
目录 前言 原理分析 step 0 step 1 EXP 前文:【Web】浅聊Java反序列化之AspectJWeaver——任意文件写入-CSDN博客 前言 这就是当年传说中的零解题嘛😭,快做🤮了 有了之前的经验,思路顺挺快的,中间不…...

视频技术1:使用ABLMediaServer推流rtsp
ABLMediaServer定位是高性能、高稳定、开箱即用、商用级别的流媒体服务器 下边展示了如何把1个mp3作为输入源,转换为rtsp流的过程。 作用:用rtsp模拟摄像头的视频流 1、启动ABLMediaServer ABLMediaServer-2024-03-13\WinX64\ABLMediaServer.exe 配…...

HTML5+CSS3+JS小实例:创意罗盘时钟
实例:创意罗盘时钟 技术栈:HTML+CSS+JS 效果: 源码: 【HTML】 <!DOCTYPE html> <html lang="zh-CN"><head><meta charset="UTF-8"><meta name="viewport" content="width=device-width, initial-scale=…...

设计数据库之内部模式:SQL基本操作
Chapter4:设计数据库之内部模式:SQL基本操作 笔记来源: 1.《漫画数据库》—科学出版社 2.SQL | DDL, DQL, DML, DCL and TCL Commands 设计数据库的步骤: 概念模式 概念模式(conceptual schema)是指将现实世界模型化的阶段进而&…...

Git浅谈配置文件和免密登录
一、文章内容 简述git三种配置ssh免密登录以及遇见的问题git可忽略文件git remote 相关操作 二、Git三种配置 项目配置文件(局部):项目路径/.git/config 文件 git config --local user.name name git config --local user.email 123qq.cc全局配置文(所有用户): …...
【好玩的经典游戏】Docker环境下部署RPG网页小游戏
【好玩的经典游戏】Docker环境下部署RPG网页小游戏 一、react-tetris小游戏介绍1.1 react-tetris小游戏简介1.2 项目预览二、本次实践介绍2.1 本地环境规划2.2 本次实践介绍三、本地环境检查3.1 安装Docker环境3.2 检查Docker服务状态3.3 检查Docker版本3.4 检查docker compose…...
前端逻辑错误或UI崩溃解决问题
全屏错误覆盖层或UI崩溃 VueReact(错误边界) Vue Vue的全屏错误覆盖层解决,其实只需要配置Error就好,在开发服务器的client.overlay中设置关闭全屏覆盖层 module.exports {devServer: {client: {overlay: {warnings: false,error…...

python爬取QQ音乐评论信息
python爬取QQ音乐评论信息 python爬取QQ音乐评论信息1.随便选个音乐python爬取QQ音乐评论信息 1.随便选个音乐 https://y.qq.com/n/yqq/song/0039MnYb0qxYhV.html 当前的后台调试页面显示如下: 找到评论的数据接口: https://c.y.qq.com/base/fcgi-bin/fcg_global_comme…...
Unity构建详解(1)——SBP介绍
【前言】 Unity的资源工作流程分为导入、创建、构建、分发、加载。我们说的是其中的构建步骤。 构建是指将项目工程中的资源文件和代码整合程可执行文件的过程,构建的结果是生成可执行文件,在win平台上是exe,在Android平台上是apkÿ…...

贪心算法(算法竞赛、蓝桥杯)--奶牛晒衣服
1、B站视频链接:A28 贪心算法 P1843 奶牛晒衣服_哔哩哔哩_bilibili 题目链接:奶牛晒衣服 - 洛谷 #include <bits/stdc.h> using namespace std; priority_queue<int> q;//用大根堆维护湿度的最大值 int n,a,b; int tim,maxn;int main(){s…...
Redis列表:高效消息通信与实时数据处理的利器
Redis是一个强大的开源内存数据库,被广泛应用于缓存、会话存储、队列等各种场景中。在Redis中,列表(List)是一种非常重要的数据结构,它提供了存储、获取、操作有序元素集合的功能。本文将深入探讨Redis列表的特性、使用…...

Redis中的缓存雪崩
缓存雪崩 🤔现象分析 缓存雪崩是指在同一时段大量的缓存key同时失效或者缓存服务(Redis等)宕机,导致大量请求到达数据库,带来巨大压力。 👊 解决方案 利用Redis集群提高服务的可用性,避免缓存服务宕机给缓存业务添…...

使用远程工具连接Mysql
(若想要远程连接Mysql需要下面解决四个问题) 1、目标地址 直接查询 2、端口号 3306 3、防火墙关闭 [rootlocalhost date]# systemctl stop firewalld.service 4、授权mysql数据库root用户权限(因为mysql开始不允许其他IP访问࿰…...

2024不起眼的“致富”野路子,不想打工了,做做这些暴利创业项目。2024个人创业做什么项目好;最适合白手起家的创业项目
经济大环境差,并不代表就没有机会。相反,主流经济不好正是另一些人所看重的千载难逢的机会。就像股票市场一样,有人靠做多赚钱,有人靠做空赚钱。下面我们就来分析一下哪些行业会在这个时候崛起。 首先二手行业会迅速崛起ÿ…...

从后端获取文件数据并导出
导出文件的公共方法 export const download (res, tools) > {const { message, hide } tools;const fileReader: any new FileReader();console.log(fileReader-res>>>, res);fileReader.onload (e) > {if (res?.data?.type application/json) {try {co…...
浅谈 React Hooks
React Hooks 是 React 16.8 引入的一组 API,用于在函数组件中使用 state 和其他 React 特性(例如生命周期方法、context 等)。Hooks 通过简洁的函数接口,解决了状态与 UI 的高度解耦,通过函数式编程范式实现更灵活 Rea…...
day52 ResNet18 CBAM
在深度学习的旅程中,我们不断探索如何提升模型的性能。今天,我将分享我在 ResNet18 模型中插入 CBAM(Convolutional Block Attention Module)模块,并采用分阶段微调策略的实践过程。通过这个过程,我不仅提升…...
DockerHub与私有镜像仓库在容器化中的应用与管理
哈喽,大家好,我是左手python! Docker Hub的应用与管理 Docker Hub的基本概念与使用方法 Docker Hub是Docker官方提供的一个公共镜像仓库,用户可以在其中找到各种操作系统、软件和应用的镜像。开发者可以通过Docker Hub轻松获取所…...

visual studio 2022更改主题为深色
visual studio 2022更改主题为深色 点击visual studio 上方的 工具-> 选项 在选项窗口中,选择 环境 -> 常规 ,将其中的颜色主题改成深色 点击确定,更改完成...

最新SpringBoot+SpringCloud+Nacos微服务框架分享
文章目录 前言一、服务规划二、架构核心1.cloud的pom2.gateway的异常handler3.gateway的filter4、admin的pom5、admin的登录核心 三、code-helper分享总结 前言 最近有个活蛮赶的,根据Excel列的需求预估的工时直接打骨折,不要问我为什么,主要…...
【git】把本地更改提交远程新分支feature_g
创建并切换新分支 git checkout -b feature_g 添加并提交更改 git add . git commit -m “实现图片上传功能” 推送到远程 git push -u origin feature_g...
Rust 异步编程
Rust 异步编程 引言 Rust 是一种系统编程语言,以其高性能、安全性以及零成本抽象而著称。在多核处理器成为主流的今天,异步编程成为了一种提高应用性能、优化资源利用的有效手段。本文将深入探讨 Rust 异步编程的核心概念、常用库以及最佳实践。 异步编程基础 什么是异步…...

成都鼎讯硬核科技!雷达目标与干扰模拟器,以卓越性能制胜电磁频谱战
在现代战争中,电磁频谱已成为继陆、海、空、天之后的 “第五维战场”,雷达作为电磁频谱领域的关键装备,其干扰与抗干扰能力的较量,直接影响着战争的胜负走向。由成都鼎讯科技匠心打造的雷达目标与干扰模拟器,凭借数字射…...

OPenCV CUDA模块图像处理-----对图像执行 均值漂移滤波(Mean Shift Filtering)函数meanShiftFiltering()
操作系统:ubuntu22.04 OpenCV版本:OpenCV4.9 IDE:Visual Studio Code 编程语言:C11 算法描述 在 GPU 上对图像执行 均值漂移滤波(Mean Shift Filtering),用于图像分割或平滑处理。 该函数将输入图像中的…...
MySQL账号权限管理指南:安全创建账户与精细授权技巧
在MySQL数据库管理中,合理创建用户账号并分配精确权限是保障数据安全的核心环节。直接使用root账号进行所有操作不仅危险且难以审计操作行为。今天我们来全面解析MySQL账号创建与权限分配的专业方法。 一、为何需要创建独立账号? 最小权限原则…...