当前位置: 首页 > news >正文

【消息队列开发】 实现ConsumerManager类——消费消息的核心逻辑

文章目录

  • 🍃前言
  • 🌴扫描线程的实现
  • 🌲实现消费消息
  • 🌳实现addConsumer()方法
  • 🎋VirtualHost类订阅消息的完善
  • ⭕总结

🍃前言

本次开发目标

  • 实现消费消息的核心逻辑

在这里插入图片描述

🌴扫描线程的实现

我们先给ConsumerManager类注入一些基础的属性

在这里插入图片描述
我们定义一个构造方法,里面就是我们的扫描线程,确保我们程序启动时,就可以一直对我们的阻塞队列进行扫描。

传入的对象为我们前面所构造的 虚拟机类的对象,我们只需要在VirtualHost 类里面添加实例化并传参即可
在这里插入图片描述
扫描线程逻辑如下:

  1. 阻塞队列里元素就取出,取出来的元素我们称它为令牌
  2. 根据该令牌我们可以找到需要被消费消息的队列
  3. 若该队列存在,我们便可以调用相应的消费方法进行消费

我们还需要将该线程设为后台线程

为了线程安全,必要的地方我们进行加锁操作。

并让这个线程永远的扫描下去,代码实现如下:

public ConsumerManager(VirtualHost p) {parent = p;scannerThread = new Thread(() -> {while (true) {try {// 1. 拿到令牌String queueName = tokenQueue.take();// 2. 根据令牌, 找到队列MSGQueue queue = parent.getMemoryDataCenter().getQueue(queueName);if (queue == null) {throw new MqException("[ConsumerManager] 取令牌后发现, 该队列名不存在! queueName=" + queueName);}// 3. 从这个队列中消费一个消息.synchronized (queue) {consumeMessage(queue);}} catch (InterruptedException | MqException e) {e.printStackTrace();}}});// 把线程设为后台线程.scannerThread.setDaemon(true);scannerThread.start();
}

🌲实现消费消息

扫描后,就需要交给线程池进行负责执行回调函数消费消息

流程如下:

  1. 按照轮询的方式找出消费者
  2. 若消费者存在,则从队列中取出一个消息
  3. 若该队列中有消息,我们则交给线程池来执行回调函数消费消息

在线程池内我们需要做的操作有:

  1. 把消息放到待确认的集合中. 这个操作势必在执行回调之前.
  2. 执行回调操作,若出现问题,则不会执行后续应答操作,若正常执行,我们则需要进行应答
  3. 应答又分为自动应答与手动应答如果当前是 “自动应答” , 就可以直接把消息删除了。如果当前是 “手动应答” , 则先不处理, 交给后续消费者调用VirtualHost类 basicAck 方法来处理.

我们先来看一下自动应答的情况:

删除消息时,我们需要删除三处的存储:硬盘、待确认集合、内存中心的消息

需要注意的是:在删除硬盘上的数据时,我们需要查看该消息是否要持久化,若需要持久化,则不要删除.

实现代码如下:

private void consumeMessage(MSGQueue queue) {// 1. 按照轮询的方式, 找个消费者出来.ConsumerEnv luckyDog = queue.chooseConsumer();if (luckyDog == null) {// 当前队列没有消费者, 暂时不消费. 等后面有消费者出现再说.return;}// 2. 从队列中取出一个消息Message message = parent.getMemoryDataCenter().pollMessage(queue.getName());if (message == null) {// 当前队列中还没有消息, 也不需要消费.return;}// 3. 把消息带入到消费者的回调方法中, 丢给线程池执行.workerPool.submit(() -> {try {// 1. 把消息放到待确认的集合中. 这个操作势必在执行回调之前.parent.getMemoryDataCenter().addMessageWaitAck(queue.getName(), message);// 2. 真正执行回调操作luckyDog.getConsumer().handleDelivery(luckyDog.getConsumerTag(), message.getBasicProperties(),message.getBody());// 3. 如果当前是 "自动应答" , 就可以直接把消息删除了.//    如果当前是 "手动应答" , 则先不处理, 交给后续消费者调用 basicAck 方法来处理.if (luckyDog.isAutoAck()) {// 1) 删除硬盘上的消息if (message.getDeliverMode() == 2) {parent.getDiskDataCenter().deleteMessage(queue, message);}// 2) 删除上面的待确认集合中的消息parent.getMemoryDataCenter().removeMessageWaitAck(queue.getName(), message.getMessageId());// 3) 删除内存中消息中心里的消息parent.getMemoryDataCenter().removeMessage(message.getMessageId());System.out.println("[ConsumerManager] 消息被成功消费! queueName=" + queue.getName());}} catch (Exception e) {e.printStackTrace();}});
}

关于手动应答的实现步骤我们分为以下四步:

  1. 获取到消息和队列
  2. 删除硬盘上的数据
  3. 删除消息中心中的数据
  4. 删除待确认的集合中的数据

代码实现如下:

public boolean basicAck(String queueName, String messageId) {queueName = virtualHostName + queueName;try {// 1. 获取到消息和队列Message message = memoryDataCenter.getMessage(messageId);if (message == null) {throw new MqException("[VirtualHost] 要确认的消息不存在! messageId=" + messageId);}MSGQueue queue = memoryDataCenter.getQueue(queueName);if (queue == null) {throw new MqException("[VirtualHost] 要确认的队列不存在! queueName=" + queueName);}// 2. 删除硬盘上的数据if (message.getDeliverMode() == 2) {diskDataCenter.deleteMessage(queue, message);}// 3. 删除消息中心中的数据memoryDataCenter.removeMessage(messageId);// 4. 删除待确认的集合中的数据memoryDataCenter.removeMessageWaitAck(queueName, messageId);System.out.println("[VirtualHost] basicAck 成功! 消息被成功确认! queueName=" + queueName+ ", messageId=" + messageId);return true;} catch (Exception e) {System.out.println("[VirtualHost] basicAck 失败! 消息确认失败! queueName=" + queueName+ ", messageId=" + messageId);e.printStackTrace();return false;}
}

🌳实现addConsumer()方法

该方法是为了实现前面我们添加订阅者的方法

在这个方法里,我们需要实现的是:

  1. 根据队列名查找该队列是否存在
  2. 若存在,构造相应的消费者进行添加
  3. 并且若该队列里存在未消费的消息,我们就直接进行消费掉

代码实现如下:

public void addConsumer(String consumerTag, String queueName, boolean autoAck, Consumer consumer) throws MqException {// 找到对应的队列.MSGQueue queue = parent.getMemoryDataCenter().getQueue(queueName);if (queue == null) {throw new MqException("[ConsumerManager] 队列不存在! queueName=" + queueName);}ConsumerEnv consumerEnv = new ConsumerEnv(consumerTag,queueName,autoAck,consumer);synchronized (queue) {queue.addConsumerEnv(consumerEnv);// 如果当前队列中已经有了一些消息了, 需要立即就消费掉.int n = parent.getMemoryDataCenter().getMessageCount(queueName);for (int i = 0; i < n; i++) {// 这个方法调用一次就消费一条消息.consumeMessage(queue);}}
}

🎋VirtualHost类订阅消息的完善

直接调用即可,代码实现如下:

 // 订阅消息.// 添加一个队列的订阅者, 当队列收到消息之后, 就要把消息推送给对应的订阅者.// consumerTag: 消费者的身份标识// autoAck: 消息被消费完成后, 应答的方式. 为 true 自动应答. 为 false 手动应答.// consumer: 是一个回调函数. 此处类型设定成函数式接口. 这样后续调用 basicConsume 并且传实参的时候, 就可以写作 lambda 样子了.public boolean basicConsume(String consumerTag, String queueName, boolean autoAck, Consumer consumer) {// 构造一个 ConsumerEnv 对象, 把这个对应的队列找到, 再把这个 Consumer 对象添加到该队列中.queueName = virtualHostName + queueName;try {consumerManager.addConsumer(consumerTag, queueName, autoAck, consumer);System.out.println("[VirtualHost] basicConsume 成功! queueName=" + queueName);return true;} catch (Exception e) {System.out.println("[VirtualHost] basicConsume 失败! queueName=" + queueName);e.printStackTrace();return false;}}

⭕总结

关于《【消息队列开发】 实现ConsumerManager类——消费消息的核心逻辑》就讲解到这儿,感谢大家的支持,欢迎各位留言交流以及批评指正,如果文章对您有帮助或者觉得作者写的还不错可以点一下关注,点赞,收藏支持一下

相关文章:

【消息队列开发】 实现ConsumerManager类——消费消息的核心逻辑

文章目录 &#x1f343;前言&#x1f334;扫描线程的实现&#x1f332;实现消费消息&#x1f333;实现addConsumer()方法&#x1f38b;VirtualHost类订阅消息的完善⭕总结 &#x1f343;前言 本次开发目标 实现消费消息的核心逻辑 &#x1f334;扫描线程的实现 我们先给Cons…...

【Three.js】使用精灵图Sprite创建面朝相机的文本标注

目录 &#x1f41d;前言 &#x1f41d;canvas创建文字 &#x1f41d;将canvas作为纹理贴图加载到sprite中 &#x1f41d;封装方法 &#x1f41d;前言 在Three.js中精灵Sprite是一个总是面朝摄像机的平面&#xff0c;它通常和纹理贴图结合使用&#xff0c;贴图可以是一张图…...

C++中的类模板

C中的类模板 类模板 类模板在C中是一种非常强大的工具&#xff0c;它允许程序员编写与数据类型无关的代码。简单来说&#xff0c;类模板允许你定义一个蓝图&#xff0c;这个蓝图可以用来生成具体类型的类。使用类模板可以提高代码的复用性&#xff0c;减少重复代码&#xff0…...

【每日一题】好子数组的最大分数

Tag 【单调栈】【暴力枚举】【数组】【2024-03-19】 题目来源 1793. 好子数组的最大分数 解题思路 本题和 84. 柱状图中最大的矩形 一样&#xff0c;计算的都是最大矩形的面积。只不过多了一个约束&#xff1a;矩形必须包含下标 k。 以下的方法一和方法二是 84. 柱状图中最…...

Vue2(七):超详细vue开发环境搭建(win7),nodejs下载与安装,安装淘宝镜像(报错已解决),配置脚手架

一、安装node.js 本来想粗略写一下的&#xff0c;但是搭建脚手架的时候&#xff0c;遇到了很多问题&#xff0c;浪费快两天时间&#xff0c;记录一下自己的解决办法希望对你们有帮助&#xff01; 1.下载nodejs 安装包下载链接【CNPM Binaries Mirror】 下载我划线的这个&am…...

【Web】记录CISCN 2021 总决赛 ezj4va题目复现——AspectJWeaver

目录 前言 原理分析 step 0 step 1 EXP 前文&#xff1a;【Web】浅聊Java反序列化之AspectJWeaver——任意文件写入-CSDN博客 前言 这就是当年传说中的零解题嘛&#x1f62d;&#xff0c;快做&#x1f92e;了 有了之前的经验&#xff0c;思路顺挺快的&#xff0c;中间不…...

视频技术1:使用ABLMediaServer推流rtsp

ABLMediaServer定位是高性能、高稳定、开箱即用、商用级别的流媒体服务器 下边展示了如何把1个mp3作为输入源&#xff0c;转换为rtsp流的过程。 作用&#xff1a;用rtsp模拟摄像头的视频流 1、启动ABLMediaServer ABLMediaServer-2024-03-13\WinX64\ABLMediaServer.exe 配…...

HTML5+CSS3+JS小实例:创意罗盘时钟

实例:创意罗盘时钟 技术栈:HTML+CSS+JS 效果: 源码: 【HTML】 <!DOCTYPE html> <html lang="zh-CN"><head><meta charset="UTF-8"><meta name="viewport" content="width=device-width, initial-scale=…...

设计数据库之内部模式:SQL基本操作

Chapter4&#xff1a;设计数据库之内部模式&#xff1a;SQL基本操作 笔记来源&#xff1a; 1.《漫画数据库》—科学出版社 2.SQL | DDL, DQL, DML, DCL and TCL Commands 设计数据库的步骤&#xff1a; 概念模式 概念模式(conceptual schema)是指将现实世界模型化的阶段进而&…...

Git浅谈配置文件和免密登录

一、文章内容 简述git三种配置ssh免密登录以及遇见的问题git可忽略文件git remote 相关操作 二、Git三种配置 项目配置文件(局部)&#xff1a;项目路径/.git/config 文件 git config --local user.name name git config --local user.email 123qq.cc全局配置文(所有用户): …...

【好玩的经典游戏】Docker环境下部署RPG网页小游戏

【好玩的经典游戏】Docker环境下部署RPG网页小游戏 一、react-tetris小游戏介绍1.1 react-tetris小游戏简介1.2 项目预览二、本次实践介绍2.1 本地环境规划2.2 本次实践介绍三、本地环境检查3.1 安装Docker环境3.2 检查Docker服务状态3.3 检查Docker版本3.4 检查docker compose…...

前端逻辑错误或UI崩溃解决问题

全屏错误覆盖层或UI崩溃 VueReact&#xff08;错误边界&#xff09; Vue Vue的全屏错误覆盖层解决&#xff0c;其实只需要配置Error就好&#xff0c;在开发服务器的client.overlay中设置关闭全屏覆盖层 module.exports {devServer: {client: {overlay: {warnings: false,error…...

python爬取QQ音乐评论信息

python爬取QQ音乐评论信息 python爬取QQ音乐评论信息1.随便选个音乐python爬取QQ音乐评论信息 1.随便选个音乐 https://y.qq.com/n/yqq/song/0039MnYb0qxYhV.html 当前的后台调试页面显示如下: 找到评论的数据接口: https://c.y.qq.com/base/fcgi-bin/fcg_global_comme…...

Unity构建详解(1)——SBP介绍

【前言】 Unity的资源工作流程分为导入、创建、构建、分发、加载。我们说的是其中的构建步骤。 构建是指将项目工程中的资源文件和代码整合程可执行文件的过程&#xff0c;构建的结果是生成可执行文件&#xff0c;在win平台上是exe&#xff0c;在Android平台上是apk&#xff…...

贪心算法(算法竞赛、蓝桥杯)--奶牛晒衣服

1、B站视频链接&#xff1a;A28 贪心算法 P1843 奶牛晒衣服_哔哩哔哩_bilibili 题目链接&#xff1a;奶牛晒衣服 - 洛谷 #include <bits/stdc.h> using namespace std; priority_queue<int> q;//用大根堆维护湿度的最大值 int n,a,b; int tim,maxn;int main(){s…...

Redis列表:高效消息通信与实时数据处理的利器

Redis是一个强大的开源内存数据库&#xff0c;被广泛应用于缓存、会话存储、队列等各种场景中。在Redis中&#xff0c;列表&#xff08;List&#xff09;是一种非常重要的数据结构&#xff0c;它提供了存储、获取、操作有序元素集合的功能。本文将深入探讨Redis列表的特性、使用…...

Redis中的缓存雪崩

缓存雪崩 &#x1f914;现象分析 缓存雪崩是指在同一时段大量的缓存key同时失效或者缓存服务(Redis等)宕机&#xff0c;导致大量请求到达数据库&#xff0c;带来巨大压力。 &#x1f44a; 解决方案 利用Redis集群提高服务的可用性&#xff0c;避免缓存服务宕机给缓存业务添…...

使用远程工具连接Mysql

&#xff08;若想要远程连接Mysql需要下面解决四个问题&#xff09; 1、目标地址 直接查询 2、端口号 3306 3、防火墙关闭 [rootlocalhost date]# systemctl stop firewalld.service 4、授权mysql数据库root用户权限&#xff08;因为mysql开始不允许其他IP访问&#xff0…...

2024不起眼的“致富”野路子,不想打工了,做做这些暴利创业项目。2024个人创业做什么项目好;最适合白手起家的创业项目

经济大环境差&#xff0c;并不代表就没有机会。相反&#xff0c;主流经济不好正是另一些人所看重的千载难逢的机会。就像股票市场一样&#xff0c;有人靠做多赚钱&#xff0c;有人靠做空赚钱。下面我们就来分析一下哪些行业会在这个时候崛起。 首先二手行业会迅速崛起&#xff…...

从后端获取文件数据并导出

导出文件的公共方法 export const download (res, tools) > {const { message, hide } tools;const fileReader: any new FileReader();console.log(fileReader-res>>>, res);fileReader.onload (e) > {if (res?.data?.type application/json) {try {co…...

浅谈 React Hooks

React Hooks 是 React 16.8 引入的一组 API&#xff0c;用于在函数组件中使用 state 和其他 React 特性&#xff08;例如生命周期方法、context 等&#xff09;。Hooks 通过简洁的函数接口&#xff0c;解决了状态与 UI 的高度解耦&#xff0c;通过函数式编程范式实现更灵活 Rea…...

day52 ResNet18 CBAM

在深度学习的旅程中&#xff0c;我们不断探索如何提升模型的性能。今天&#xff0c;我将分享我在 ResNet18 模型中插入 CBAM&#xff08;Convolutional Block Attention Module&#xff09;模块&#xff0c;并采用分阶段微调策略的实践过程。通过这个过程&#xff0c;我不仅提升…...

DockerHub与私有镜像仓库在容器化中的应用与管理

哈喽&#xff0c;大家好&#xff0c;我是左手python&#xff01; Docker Hub的应用与管理 Docker Hub的基本概念与使用方法 Docker Hub是Docker官方提供的一个公共镜像仓库&#xff0c;用户可以在其中找到各种操作系统、软件和应用的镜像。开发者可以通过Docker Hub轻松获取所…...

visual studio 2022更改主题为深色

visual studio 2022更改主题为深色 点击visual studio 上方的 工具-> 选项 在选项窗口中&#xff0c;选择 环境 -> 常规 &#xff0c;将其中的颜色主题改成深色 点击确定&#xff0c;更改完成...

最新SpringBoot+SpringCloud+Nacos微服务框架分享

文章目录 前言一、服务规划二、架构核心1.cloud的pom2.gateway的异常handler3.gateway的filter4、admin的pom5、admin的登录核心 三、code-helper分享总结 前言 最近有个活蛮赶的&#xff0c;根据Excel列的需求预估的工时直接打骨折&#xff0c;不要问我为什么&#xff0c;主要…...

【git】把本地更改提交远程新分支feature_g

创建并切换新分支 git checkout -b feature_g 添加并提交更改 git add . git commit -m “实现图片上传功能” 推送到远程 git push -u origin feature_g...

Rust 异步编程

Rust 异步编程 引言 Rust 是一种系统编程语言,以其高性能、安全性以及零成本抽象而著称。在多核处理器成为主流的今天,异步编程成为了一种提高应用性能、优化资源利用的有效手段。本文将深入探讨 Rust 异步编程的核心概念、常用库以及最佳实践。 异步编程基础 什么是异步…...

成都鼎讯硬核科技!雷达目标与干扰模拟器,以卓越性能制胜电磁频谱战

在现代战争中&#xff0c;电磁频谱已成为继陆、海、空、天之后的 “第五维战场”&#xff0c;雷达作为电磁频谱领域的关键装备&#xff0c;其干扰与抗干扰能力的较量&#xff0c;直接影响着战争的胜负走向。由成都鼎讯科技匠心打造的雷达目标与干扰模拟器&#xff0c;凭借数字射…...

OPenCV CUDA模块图像处理-----对图像执行 均值漂移滤波(Mean Shift Filtering)函数meanShiftFiltering()

操作系统&#xff1a;ubuntu22.04 OpenCV版本&#xff1a;OpenCV4.9 IDE:Visual Studio Code 编程语言&#xff1a;C11 算法描述 在 GPU 上对图像执行 均值漂移滤波&#xff08;Mean Shift Filtering&#xff09;&#xff0c;用于图像分割或平滑处理。 该函数将输入图像中的…...

MySQL账号权限管理指南:安全创建账户与精细授权技巧

在MySQL数据库管理中&#xff0c;合理创建用户账号并分配精确权限是保障数据安全的核心环节。直接使用root账号进行所有操作不仅危险且难以审计操作行为。今天我们来全面解析MySQL账号创建与权限分配的专业方法。 一、为何需要创建独立账号&#xff1f; 最小权限原则&#xf…...