当前位置: 首页 > news >正文

【消息队列开发】 实现ConsumerManager类——消费消息的核心逻辑

文章目录

  • 🍃前言
  • 🌴扫描线程的实现
  • 🌲实现消费消息
  • 🌳实现addConsumer()方法
  • 🎋VirtualHost类订阅消息的完善
  • ⭕总结

🍃前言

本次开发目标

  • 实现消费消息的核心逻辑

在这里插入图片描述

🌴扫描线程的实现

我们先给ConsumerManager类注入一些基础的属性

在这里插入图片描述
我们定义一个构造方法,里面就是我们的扫描线程,确保我们程序启动时,就可以一直对我们的阻塞队列进行扫描。

传入的对象为我们前面所构造的 虚拟机类的对象,我们只需要在VirtualHost 类里面添加实例化并传参即可
在这里插入图片描述
扫描线程逻辑如下:

  1. 阻塞队列里元素就取出,取出来的元素我们称它为令牌
  2. 根据该令牌我们可以找到需要被消费消息的队列
  3. 若该队列存在,我们便可以调用相应的消费方法进行消费

我们还需要将该线程设为后台线程

为了线程安全,必要的地方我们进行加锁操作。

并让这个线程永远的扫描下去,代码实现如下:

public ConsumerManager(VirtualHost p) {parent = p;scannerThread = new Thread(() -> {while (true) {try {// 1. 拿到令牌String queueName = tokenQueue.take();// 2. 根据令牌, 找到队列MSGQueue queue = parent.getMemoryDataCenter().getQueue(queueName);if (queue == null) {throw new MqException("[ConsumerManager] 取令牌后发现, 该队列名不存在! queueName=" + queueName);}// 3. 从这个队列中消费一个消息.synchronized (queue) {consumeMessage(queue);}} catch (InterruptedException | MqException e) {e.printStackTrace();}}});// 把线程设为后台线程.scannerThread.setDaemon(true);scannerThread.start();
}

🌲实现消费消息

扫描后,就需要交给线程池进行负责执行回调函数消费消息

流程如下:

  1. 按照轮询的方式找出消费者
  2. 若消费者存在,则从队列中取出一个消息
  3. 若该队列中有消息,我们则交给线程池来执行回调函数消费消息

在线程池内我们需要做的操作有:

  1. 把消息放到待确认的集合中. 这个操作势必在执行回调之前.
  2. 执行回调操作,若出现问题,则不会执行后续应答操作,若正常执行,我们则需要进行应答
  3. 应答又分为自动应答与手动应答如果当前是 “自动应答” , 就可以直接把消息删除了。如果当前是 “手动应答” , 则先不处理, 交给后续消费者调用VirtualHost类 basicAck 方法来处理.

我们先来看一下自动应答的情况:

删除消息时,我们需要删除三处的存储:硬盘、待确认集合、内存中心的消息

需要注意的是:在删除硬盘上的数据时,我们需要查看该消息是否要持久化,若需要持久化,则不要删除.

实现代码如下:

private void consumeMessage(MSGQueue queue) {// 1. 按照轮询的方式, 找个消费者出来.ConsumerEnv luckyDog = queue.chooseConsumer();if (luckyDog == null) {// 当前队列没有消费者, 暂时不消费. 等后面有消费者出现再说.return;}// 2. 从队列中取出一个消息Message message = parent.getMemoryDataCenter().pollMessage(queue.getName());if (message == null) {// 当前队列中还没有消息, 也不需要消费.return;}// 3. 把消息带入到消费者的回调方法中, 丢给线程池执行.workerPool.submit(() -> {try {// 1. 把消息放到待确认的集合中. 这个操作势必在执行回调之前.parent.getMemoryDataCenter().addMessageWaitAck(queue.getName(), message);// 2. 真正执行回调操作luckyDog.getConsumer().handleDelivery(luckyDog.getConsumerTag(), message.getBasicProperties(),message.getBody());// 3. 如果当前是 "自动应答" , 就可以直接把消息删除了.//    如果当前是 "手动应答" , 则先不处理, 交给后续消费者调用 basicAck 方法来处理.if (luckyDog.isAutoAck()) {// 1) 删除硬盘上的消息if (message.getDeliverMode() == 2) {parent.getDiskDataCenter().deleteMessage(queue, message);}// 2) 删除上面的待确认集合中的消息parent.getMemoryDataCenter().removeMessageWaitAck(queue.getName(), message.getMessageId());// 3) 删除内存中消息中心里的消息parent.getMemoryDataCenter().removeMessage(message.getMessageId());System.out.println("[ConsumerManager] 消息被成功消费! queueName=" + queue.getName());}} catch (Exception e) {e.printStackTrace();}});
}

关于手动应答的实现步骤我们分为以下四步:

  1. 获取到消息和队列
  2. 删除硬盘上的数据
  3. 删除消息中心中的数据
  4. 删除待确认的集合中的数据

代码实现如下:

public boolean basicAck(String queueName, String messageId) {queueName = virtualHostName + queueName;try {// 1. 获取到消息和队列Message message = memoryDataCenter.getMessage(messageId);if (message == null) {throw new MqException("[VirtualHost] 要确认的消息不存在! messageId=" + messageId);}MSGQueue queue = memoryDataCenter.getQueue(queueName);if (queue == null) {throw new MqException("[VirtualHost] 要确认的队列不存在! queueName=" + queueName);}// 2. 删除硬盘上的数据if (message.getDeliverMode() == 2) {diskDataCenter.deleteMessage(queue, message);}// 3. 删除消息中心中的数据memoryDataCenter.removeMessage(messageId);// 4. 删除待确认的集合中的数据memoryDataCenter.removeMessageWaitAck(queueName, messageId);System.out.println("[VirtualHost] basicAck 成功! 消息被成功确认! queueName=" + queueName+ ", messageId=" + messageId);return true;} catch (Exception e) {System.out.println("[VirtualHost] basicAck 失败! 消息确认失败! queueName=" + queueName+ ", messageId=" + messageId);e.printStackTrace();return false;}
}

🌳实现addConsumer()方法

该方法是为了实现前面我们添加订阅者的方法

在这个方法里,我们需要实现的是:

  1. 根据队列名查找该队列是否存在
  2. 若存在,构造相应的消费者进行添加
  3. 并且若该队列里存在未消费的消息,我们就直接进行消费掉

代码实现如下:

public void addConsumer(String consumerTag, String queueName, boolean autoAck, Consumer consumer) throws MqException {// 找到对应的队列.MSGQueue queue = parent.getMemoryDataCenter().getQueue(queueName);if (queue == null) {throw new MqException("[ConsumerManager] 队列不存在! queueName=" + queueName);}ConsumerEnv consumerEnv = new ConsumerEnv(consumerTag,queueName,autoAck,consumer);synchronized (queue) {queue.addConsumerEnv(consumerEnv);// 如果当前队列中已经有了一些消息了, 需要立即就消费掉.int n = parent.getMemoryDataCenter().getMessageCount(queueName);for (int i = 0; i < n; i++) {// 这个方法调用一次就消费一条消息.consumeMessage(queue);}}
}

🎋VirtualHost类订阅消息的完善

直接调用即可,代码实现如下:

 // 订阅消息.// 添加一个队列的订阅者, 当队列收到消息之后, 就要把消息推送给对应的订阅者.// consumerTag: 消费者的身份标识// autoAck: 消息被消费完成后, 应答的方式. 为 true 自动应答. 为 false 手动应答.// consumer: 是一个回调函数. 此处类型设定成函数式接口. 这样后续调用 basicConsume 并且传实参的时候, 就可以写作 lambda 样子了.public boolean basicConsume(String consumerTag, String queueName, boolean autoAck, Consumer consumer) {// 构造一个 ConsumerEnv 对象, 把这个对应的队列找到, 再把这个 Consumer 对象添加到该队列中.queueName = virtualHostName + queueName;try {consumerManager.addConsumer(consumerTag, queueName, autoAck, consumer);System.out.println("[VirtualHost] basicConsume 成功! queueName=" + queueName);return true;} catch (Exception e) {System.out.println("[VirtualHost] basicConsume 失败! queueName=" + queueName);e.printStackTrace();return false;}}

⭕总结

关于《【消息队列开发】 实现ConsumerManager类——消费消息的核心逻辑》就讲解到这儿,感谢大家的支持,欢迎各位留言交流以及批评指正,如果文章对您有帮助或者觉得作者写的还不错可以点一下关注,点赞,收藏支持一下

相关文章:

【消息队列开发】 实现ConsumerManager类——消费消息的核心逻辑

文章目录 &#x1f343;前言&#x1f334;扫描线程的实现&#x1f332;实现消费消息&#x1f333;实现addConsumer()方法&#x1f38b;VirtualHost类订阅消息的完善⭕总结 &#x1f343;前言 本次开发目标 实现消费消息的核心逻辑 &#x1f334;扫描线程的实现 我们先给Cons…...

【Three.js】使用精灵图Sprite创建面朝相机的文本标注

目录 &#x1f41d;前言 &#x1f41d;canvas创建文字 &#x1f41d;将canvas作为纹理贴图加载到sprite中 &#x1f41d;封装方法 &#x1f41d;前言 在Three.js中精灵Sprite是一个总是面朝摄像机的平面&#xff0c;它通常和纹理贴图结合使用&#xff0c;贴图可以是一张图…...

C++中的类模板

C中的类模板 类模板 类模板在C中是一种非常强大的工具&#xff0c;它允许程序员编写与数据类型无关的代码。简单来说&#xff0c;类模板允许你定义一个蓝图&#xff0c;这个蓝图可以用来生成具体类型的类。使用类模板可以提高代码的复用性&#xff0c;减少重复代码&#xff0…...

【每日一题】好子数组的最大分数

Tag 【单调栈】【暴力枚举】【数组】【2024-03-19】 题目来源 1793. 好子数组的最大分数 解题思路 本题和 84. 柱状图中最大的矩形 一样&#xff0c;计算的都是最大矩形的面积。只不过多了一个约束&#xff1a;矩形必须包含下标 k。 以下的方法一和方法二是 84. 柱状图中最…...

Vue2(七):超详细vue开发环境搭建(win7),nodejs下载与安装,安装淘宝镜像(报错已解决),配置脚手架

一、安装node.js 本来想粗略写一下的&#xff0c;但是搭建脚手架的时候&#xff0c;遇到了很多问题&#xff0c;浪费快两天时间&#xff0c;记录一下自己的解决办法希望对你们有帮助&#xff01; 1.下载nodejs 安装包下载链接【CNPM Binaries Mirror】 下载我划线的这个&am…...

【Web】记录CISCN 2021 总决赛 ezj4va题目复现——AspectJWeaver

目录 前言 原理分析 step 0 step 1 EXP 前文&#xff1a;【Web】浅聊Java反序列化之AspectJWeaver——任意文件写入-CSDN博客 前言 这就是当年传说中的零解题嘛&#x1f62d;&#xff0c;快做&#x1f92e;了 有了之前的经验&#xff0c;思路顺挺快的&#xff0c;中间不…...

视频技术1:使用ABLMediaServer推流rtsp

ABLMediaServer定位是高性能、高稳定、开箱即用、商用级别的流媒体服务器 下边展示了如何把1个mp3作为输入源&#xff0c;转换为rtsp流的过程。 作用&#xff1a;用rtsp模拟摄像头的视频流 1、启动ABLMediaServer ABLMediaServer-2024-03-13\WinX64\ABLMediaServer.exe 配…...

HTML5+CSS3+JS小实例:创意罗盘时钟

实例:创意罗盘时钟 技术栈:HTML+CSS+JS 效果: 源码: 【HTML】 <!DOCTYPE html> <html lang="zh-CN"><head><meta charset="UTF-8"><meta name="viewport" content="width=device-width, initial-scale=…...

设计数据库之内部模式:SQL基本操作

Chapter4&#xff1a;设计数据库之内部模式&#xff1a;SQL基本操作 笔记来源&#xff1a; 1.《漫画数据库》—科学出版社 2.SQL | DDL, DQL, DML, DCL and TCL Commands 设计数据库的步骤&#xff1a; 概念模式 概念模式(conceptual schema)是指将现实世界模型化的阶段进而&…...

Git浅谈配置文件和免密登录

一、文章内容 简述git三种配置ssh免密登录以及遇见的问题git可忽略文件git remote 相关操作 二、Git三种配置 项目配置文件(局部)&#xff1a;项目路径/.git/config 文件 git config --local user.name name git config --local user.email 123qq.cc全局配置文(所有用户): …...

【好玩的经典游戏】Docker环境下部署RPG网页小游戏

【好玩的经典游戏】Docker环境下部署RPG网页小游戏 一、react-tetris小游戏介绍1.1 react-tetris小游戏简介1.2 项目预览二、本次实践介绍2.1 本地环境规划2.2 本次实践介绍三、本地环境检查3.1 安装Docker环境3.2 检查Docker服务状态3.3 检查Docker版本3.4 检查docker compose…...

前端逻辑错误或UI崩溃解决问题

全屏错误覆盖层或UI崩溃 VueReact&#xff08;错误边界&#xff09; Vue Vue的全屏错误覆盖层解决&#xff0c;其实只需要配置Error就好&#xff0c;在开发服务器的client.overlay中设置关闭全屏覆盖层 module.exports {devServer: {client: {overlay: {warnings: false,error…...

python爬取QQ音乐评论信息

python爬取QQ音乐评论信息 python爬取QQ音乐评论信息1.随便选个音乐python爬取QQ音乐评论信息 1.随便选个音乐 https://y.qq.com/n/yqq/song/0039MnYb0qxYhV.html 当前的后台调试页面显示如下: 找到评论的数据接口: https://c.y.qq.com/base/fcgi-bin/fcg_global_comme…...

Unity构建详解(1)——SBP介绍

【前言】 Unity的资源工作流程分为导入、创建、构建、分发、加载。我们说的是其中的构建步骤。 构建是指将项目工程中的资源文件和代码整合程可执行文件的过程&#xff0c;构建的结果是生成可执行文件&#xff0c;在win平台上是exe&#xff0c;在Android平台上是apk&#xff…...

贪心算法(算法竞赛、蓝桥杯)--奶牛晒衣服

1、B站视频链接&#xff1a;A28 贪心算法 P1843 奶牛晒衣服_哔哩哔哩_bilibili 题目链接&#xff1a;奶牛晒衣服 - 洛谷 #include <bits/stdc.h> using namespace std; priority_queue<int> q;//用大根堆维护湿度的最大值 int n,a,b; int tim,maxn;int main(){s…...

Redis列表:高效消息通信与实时数据处理的利器

Redis是一个强大的开源内存数据库&#xff0c;被广泛应用于缓存、会话存储、队列等各种场景中。在Redis中&#xff0c;列表&#xff08;List&#xff09;是一种非常重要的数据结构&#xff0c;它提供了存储、获取、操作有序元素集合的功能。本文将深入探讨Redis列表的特性、使用…...

Redis中的缓存雪崩

缓存雪崩 &#x1f914;现象分析 缓存雪崩是指在同一时段大量的缓存key同时失效或者缓存服务(Redis等)宕机&#xff0c;导致大量请求到达数据库&#xff0c;带来巨大压力。 &#x1f44a; 解决方案 利用Redis集群提高服务的可用性&#xff0c;避免缓存服务宕机给缓存业务添…...

使用远程工具连接Mysql

&#xff08;若想要远程连接Mysql需要下面解决四个问题&#xff09; 1、目标地址 直接查询 2、端口号 3306 3、防火墙关闭 [rootlocalhost date]# systemctl stop firewalld.service 4、授权mysql数据库root用户权限&#xff08;因为mysql开始不允许其他IP访问&#xff0…...

2024不起眼的“致富”野路子,不想打工了,做做这些暴利创业项目。2024个人创业做什么项目好;最适合白手起家的创业项目

经济大环境差&#xff0c;并不代表就没有机会。相反&#xff0c;主流经济不好正是另一些人所看重的千载难逢的机会。就像股票市场一样&#xff0c;有人靠做多赚钱&#xff0c;有人靠做空赚钱。下面我们就来分析一下哪些行业会在这个时候崛起。 首先二手行业会迅速崛起&#xff…...

从后端获取文件数据并导出

导出文件的公共方法 export const download (res, tools) > {const { message, hide } tools;const fileReader: any new FileReader();console.log(fileReader-res>>>, res);fileReader.onload (e) > {if (res?.data?.type application/json) {try {co…...

生成xcframework

打包 XCFramework 的方法 XCFramework 是苹果推出的一种多平台二进制分发格式&#xff0c;可以包含多个架构和平台的代码。打包 XCFramework 通常用于分发库或框架。 使用 Xcode 命令行工具打包 通过 xcodebuild 命令可以打包 XCFramework。确保项目已经配置好需要支持的平台…...

Lombok 的 @Data 注解失效,未生成 getter/setter 方法引发的HTTP 406 错误

HTTP 状态码 406 (Not Acceptable) 和 500 (Internal Server Error) 是两类完全不同的错误&#xff0c;它们的含义、原因和解决方法都有显著区别。以下是详细对比&#xff1a; 1. HTTP 406 (Not Acceptable) 含义&#xff1a; 客户端请求的内容类型与服务器支持的内容类型不匹…...

论文解读:交大港大上海AI Lab开源论文 | 宇树机器人多姿态起立控制强化学习框架(二)

HoST框架核心实现方法详解 - 论文深度解读(第二部分) 《Learning Humanoid Standing-up Control across Diverse Postures》 系列文章: 论文深度解读 + 算法与代码分析(二) 作者机构: 上海AI Lab, 上海交通大学, 香港大学, 浙江大学, 香港中文大学 论文主题: 人形机器人…...

理解 MCP 工作流:使用 Ollama 和 LangChain 构建本地 MCP 客户端

&#x1f31f; 什么是 MCP&#xff1f; 模型控制协议 (MCP) 是一种创新的协议&#xff0c;旨在无缝连接 AI 模型与应用程序。 MCP 是一个开源协议&#xff0c;它标准化了我们的 LLM 应用程序连接所需工具和数据源并与之协作的方式。 可以把它想象成你的 AI 模型 和想要使用它…...

解锁数据库简洁之道:FastAPI与SQLModel实战指南

在构建现代Web应用程序时&#xff0c;与数据库的交互无疑是核心环节。虽然传统的数据库操作方式&#xff08;如直接编写SQL语句与psycopg2交互&#xff09;赋予了我们精细的控制权&#xff0c;但在面对日益复杂的业务逻辑和快速迭代的需求时&#xff0c;这种方式的开发效率和可…...

Leetcode 3577. Count the Number of Computer Unlocking Permutations

Leetcode 3577. Count the Number of Computer Unlocking Permutations 1. 解题思路2. 代码实现 题目链接&#xff1a;3577. Count the Number of Computer Unlocking Permutations 1. 解题思路 这一题其实就是一个脑筋急转弯&#xff0c;要想要能够将所有的电脑解锁&#x…...

Cilium动手实验室: 精通之旅---20.Isovalent Enterprise for Cilium: Zero Trust Visibility

Cilium动手实验室: 精通之旅---20.Isovalent Enterprise for Cilium: Zero Trust Visibility 1. 实验室环境1.1 实验室环境1.2 小测试 2. The Endor System2.1 部署应用2.2 检查现有策略 3. Cilium 策略实体3.1 创建 allow-all 网络策略3.2 在 Hubble CLI 中验证网络策略源3.3 …...

HBuilderX安装(uni-app和小程序开发)

下载HBuilderX 访问官方网站&#xff1a;https://www.dcloud.io/hbuilderx.html 根据您的操作系统选择合适版本&#xff1a; Windows版&#xff08;推荐下载标准版&#xff09; Windows系统安装步骤 运行安装程序&#xff1a; 双击下载的.exe安装文件 如果出现安全提示&…...

#Uniapp篇:chrome调试unapp适配

chrome调试设备----使用Android模拟机开发调试移动端页面 Chrome://inspect/#devices MuMu模拟器Edge浏览器&#xff1a;Android原生APP嵌入的H5页面元素定位 chrome://inspect/#devices uniapp单位适配 根路径下 postcss.config.js 需要装这些插件 “postcss”: “^8.5.…...

招商蛇口 | 执笔CID,启幕低密生活新境

作为中国城市生长的力量&#xff0c;招商蛇口以“美好生活承载者”为使命&#xff0c;深耕全球111座城市&#xff0c;以央企担当匠造时代理想人居。从深圳湾的开拓基因到西安高新CID的战略落子&#xff0c;招商蛇口始终与城市发展同频共振&#xff0c;以建筑诠释对土地与生活的…...