当前位置: 首页 > news >正文

【RabbitMQ 项目】服务端:数据管理模块之消息队列管理

文章目录

  • 一.编写思路
  • 二.代码实践

一.编写思路

  1. 定义消息队列
    1. 名字
    2. 是否持久化
  2. 定义队列持久化类(持久化到 sqlite3)
    1. 构造函数(只能成功,不能失败)
      1. 如果数据库(文件)不存在则创建
      2. 打开数据库
      3. 打开 msg_queue_table 数据库表
    2. 插入队列
    3. 移除队列
    4. 将数据库中的队列恢复到内存中
      传入一个哈希表,key 为名字,value 为队列的智能指针,填充该哈希表
  3. 定义队列管理类(包含内存管理和持久化管理)
    1. 构造函数:从数据库中恢复队列
    2. 声明队列
    3. 移除队列
    4. 获取队列

二.代码实践

MsgQueue.hpp:

#pragma once
#include "../common/Log.hpp"
#include "../common/Util.hpp"
#include "../common/Util.hpp"
#include <memory>
#include <unordered_map>
#include <mutex>
namespace ns_data
{class MsgQueue;using MsgQueuePtr = std::shared_ptr<MsgQueue>;/************* 定义消息队列* ****************/struct MsgQueue{std::string _name;bool _isDurable;MsgQueue(const std::string &name, bool isDurable): _name(name),_isDurable(isDurable){}};/****************** 定义消息队列持久化类* ******************/class MsgQueueMapper{private:ns_util::Sqlite3Util _sqlite;public:MsgQueueMapper(const std::string &dbName): _sqlite(dbName){// 确保数据库文件已经存在,不存在就创建if (!ns_util::FileUtil::createFile(dbName)){LOG(FATAL) << "create database " << dbName << " fail" << endl;exit(1);}if (!_sqlite.open()){LOG(FATAL) << "open database " << dbName << " fail" << endl;exit(1);}createTable();}/************** 插入消息队列* *************/bool insertMsgQueue(MsgQueuePtr msgQueuePtr){char insertSql[1024];sprintf(insertSql, "insert into msg_queue_table values('%s', '%d');",msgQueuePtr->_name.c_str(), msgQueuePtr->_isDurable);if (!_sqlite.exec(insertSql, nullptr, nullptr)){LOG(WARNING) << "insert MsgQueue fail, MsgQueue: " << msgQueuePtr->_name << endl;return false;}return true;}/*********** 移除消息队列* ***************/void removeMsgQueue(const std::string &name){char deleteSql[1024];sprintf(deleteSql, "delete from msg_queue_table where name='%s';", name.c_str());if (!_sqlite.exec(deleteSql, nullptr, nullptr)){LOG(WARNING) << "remove MsgQueue fail, MsgQueue: " << name << endl;}}/************ 从数据库中恢复消息队列到内存* *****************/void recoverMsgQueue(std::unordered_map<std::string, MsgQueuePtr> *mapPtr){const std::string selectSql = "select * from msg_queue_table;";if (!_sqlite.exec(selectSql.c_str(), selectCallback, mapPtr)){LOG(FATAL) << "recover MsgQueue from msg_queue_table fail" << endl;exit(1);}}/*************** 删除数据库表(仅调试)* ***************/void removeTable(){const std::string dropSql = "drop table if exists msg_queue_table;";if (_sqlite.exec(dropSql.c_str(), nullptr, nullptr)){LOG(WARNING) << "remove table msg_queue_table fail" << endl;}}private:void createTable(){const std::string createSql = "create table if not exists msg_queue_table(\name varchar(32) primary key,\durable int\);";if (!_sqlite.exec(createSql.c_str(), nullptr, nullptr)){LOG(FATAL) << "create table msg_queue_table fail" << endl;exit(1);}}static int selectCallback(void *arg, int colNum, char **line, char **fields){auto mapPtr = static_cast<std::unordered_map<std::string, MsgQueuePtr> *>(arg);std::string name = line[0];bool isDurable = std::stoi(line[1]);auto msgQueuePtr = std::make_shared<MsgQueue>(name, isDurable);mapPtr->insert({name, msgQueuePtr});return 0;}};class MsgQueueManager{private:MsgQueueMapper _mapper;std::unordered_map<std::string, MsgQueuePtr> _msgQueues;std::mutex _mtx;public:MsgQueueManager(const std::string &dbName): _mapper(dbName){_mapper.recoverMsgQueue(&_msgQueues);}/************ 声明队列* ************/bool declareMsgQueue(const std::string &name, bool isDurable){std::unique_lock<std::mutex> lck(_mtx);if (_msgQueues.count(name)){return true;}auto msgQueuePtr = std::make_shared<MsgQueue>(name, isDurable);_msgQueues[name] = msgQueuePtr;if (isDurable){return _mapper.insertMsgQueue(msgQueuePtr);}return true;}/*********** 移除队列* ***********/void removeMsgQueue(const std::string &name){std::unique_lock<std::mutex> lck(_mtx);auto it = _msgQueues.find(name);if (it == _msgQueues.end()){return;}if (it->second->_isDurable){_mapper.removeMsgQueue(name);}_msgQueues.erase(name);}/************* 获取指定队列* ***************/MsgQueuePtr getMsgQueue(const std::string &name){std::unique_lock<std::mutex> lck(_mtx);if (_msgQueues.count(name) == 0){return nullptr;}return _msgQueues[name];}/************** 清理所有队列(仅调试)* ******************/void clearMsgQueues(){std::unique_lock<std::mutex> lck(_mtx);_msgQueues.clear();_mapper.removeTable();}};}

相关文章:

【RabbitMQ 项目】服务端:数据管理模块之消息队列管理

文章目录 一.编写思路二.代码实践 一.编写思路 定义消息队列 名字是否持久化 定义队列持久化类(持久化到 sqlite3) 构造函数(只能成功&#xff0c;不能失败) 如果数据库(文件)不存在则创建打开数据库打开 msg_queue_table 数据库表 插入队列移除队列将数据库中的队列恢复到内存…...

SDKMAN!软件开发工具包管理器

认识一下SDKMAN!(The Software Development Kit Manager)是您在Unix系统上轻松管理多个软件开发工具包的可靠伴侣。想象一下&#xff0c;有不同版本的SDK&#xff0c;需要一种无感知的方式在它们之间切换。SDKMAN拥有易于使用的命令行界面&#xff08;CLI&#xff09;和API。其…...

《使用 LangChain 进行大模型应用开发》学习笔记(四)

前言 本文是 Harrison Chase &#xff08;LangChain 创建者&#xff09;和吴恩达&#xff08;Andrew Ng&#xff09;的视频课程《LangChain for LLM Application Development》&#xff08;使用 LangChain 进行大模型应用开发&#xff09;的学习笔记。由于原课程为全英文视频课…...

gbase8s数据库常见的索引扫描方式

1 顺序扫描&#xff08;Sequential scan&#xff09;&#xff1a;数据库服务器按照物理顺序读取表中的所有记录。 常发生在表上无索引或者数据量很少或者一些无法使用索引的sql语句中 2 索引扫描&#xff08;Index scan&#xff09;&#xff1a;数据库服务器读取索引页&#…...

边缘智能-大模型架构初探

R2Cloud接口 机器人注册 请求和应答 注册是一个简单的 HTTP 接口&#xff0c;根据机器人/用户信息注册&#xff0c;创建一个新机器人。 请求 URL URLhttp://ip/robot/regTypePOSTHTTP Version1.1Content-Typeapplication/json 请求参数 Param含义Rule是否必须缺省roboti…...

《python语言程序设计》2018版第8章18题几何circle2D类(上部)

一、利用第7章的内容来做前5个点 第一章之1--从各种角度来测量第一章之2--各种结果第二章之1--建立了针对比对点在圆内的几段第二章之2--利用建立的对比代码&#xff0c;得出的第2点位置 第一章之1–从各种角度来测量 class Circle2D:def __init__(self, x, y, radius):self._…...

nginx upstream转发连接错误情况研究

本次测试用到3台服务器&#xff1a; 192.168.10.115&#xff1a;转发服务器A 192.168.10.209&#xff1a;upstream下服务器1 192.168.10.210&#xff1a;upstream下服务器2 1台客户端&#xff1a;192.168.10.112 服务器A中nginx主要配置如下&#xff1a; log_format main…...

alias 后门从入门到应急响应

目录 1. alias 后门介绍 2. alias 后门注入方式 2.1 方式一(以函数的方式执行) 2.2 方式二(执行python脚本) 3.应急响应 3.1 查看所有连接 3.2 通过PID查看异常连接的进程&#xff0c;以及该进程正在执行的命令行命令 3.3 查看别名 3.4 其他情况 3.5 那么检查这些…...

【远程调用PythonAPI-flask】

文章目录 前言一、Pycharm创建flask项目1.创建虚拟环境2.创建flask项目 二、远程调用PythonAPI——SpringBoot项目集成1.修改PyCharm的host配置2.防火墙设置3.SpringBoot远程调用PythonAPI 前言 解决Pycharm运行Flask指定ip、端口更改无效的问题 首先先创建一个新的flask项目&…...

[今日Arxiv] 思维迭代:利用内心对话进行自主大型语言模型推理

思维迭代&#xff1a;利用内心对话进行自主大型语言模型推理 Iteration of Thought: Leveraging Inner Dialogue for Autonomous Large Language Model Reasoning URL&#xff1a;https://arxiv.org/abs/2409.12618 注&#xff1a;翻译可能存在误差&#xff0c;详细内容建议…...

glTF格式:WebGL应用的3D资产优化解决方案

摘要 glTF作为一种高效的3D资产格式&#xff0c;为WebGL、OpenGL ES和OpenGL运行时的应用提供了强有力的支持。它不仅简化了3D模型的传输与加载流程&#xff0c;还通过优化资产大小&#xff0c;使得打包、解包更加便捷。本文将深入探讨glTF格式的优势&#xff0c;并提供实用的代…...

Unity3D入门(一) : 第一个Unity3D项目,实现矩形自动旋转,并导出到Android运行

1. Unity3D介绍 Unity3D是虚拟现实行业中&#xff0c;使用率较高的一款软件。 它有着强大的功能&#xff0c;是让玩家轻松创建三维视频游戏、建筑可视化、实时三维动画等互动内容的多平台、综合型 虚拟现实开发工具。是一个全面整合的专业引擎。 2. Unity安装 官网 : Unity…...

数据结构与算法——Java实现 8.习题——移除链表元素(值)

祝福你有前路坦途的好运&#xff0c;更祝愿你能保持内心光亮 纵有风雨&#xff0c;依然选择勇敢前行 —— 24.9.22 203. 移除链表元素 给你一个链表的头节点 head 和一个整数 val &#xff0c;请你删除链表中所有满足 Node.val val 的节点&#xff0c;并返回 新的头节点 。 示…...

如何理解MVCC

MVCC是什么&#xff1f; MVCC&#xff0c;是MultiVersion Concurrency Control的缩写&#xff0c;翻译成中文就是多版本并发控制&#xff0c;多个事务同时访问同一数据时&#xff0c;调控每一个事务获取到数据的具体版本。和数据库锁一样&#xff0c;它也是一种并发控制的解决…...

在 Qt 中使用 QLabel 设置 GIF 动态背景

文章目录 在 Qt 中使用 QLabel 设置 GIF 动态背景本文食用注意目标实现步骤1. 准备工作2. 修改头文件 widget.h3. 实现构造函数和析构函数4. 调整背景大小5. 完整代码分析6. 运行程序 总结 在 Qt 中使用 QLabel 设置 GIF 动态背景 在 Qt 中&#xff0c;如果希望在窗口中设置一…...

Flyway 数据库差异处理

Flyway 数据库差异处理详解 在软件开发过程中&#xff0c;数据库 schema 的变更是不可避免的&#xff0c;尤其是在多人协作、多环境部署时&#xff0c;不同环境中的数据库结构可能出现差异。Flyway 作为一个数据库迁移工具&#xff0c;通过版本控制和自动化迁移&#xff0c;确…...

CSS 选择器的分类与使用要点一

目录 非 VIP 用户可前往公众号进行免费阅读 标签选择器 id 选择器 类选择器 介绍 公共类 CSS 中优先用 class 选择器,慎用 id 选择器 后代选择器 交集选择器 以标签名作为开头 以类名作为开头 连续交集 并集选择器(分组选择器) 通配符* 儿子选择器 >(IE7…...

无人机集群路径规划:麻雀搜索算法(Sparrow Search Algorithm, SSA)​求解无人机集群路径规划,提供MATLAB代码

一、单个无人机路径规划模型介绍 无人机三维路径规划是指在三维空间中为无人机规划一条合理的飞行路径&#xff0c;使其能够安全、高效地完成任务。路径规划是无人机自主飞行的关键技术之一&#xff0c;它可以通过算法和模型来确定无人机的航迹&#xff0c;以避开障碍物、优化…...

harbor集成trivy镜像扫描工具

harbor项目地址:GitHub - goharbor/harbor: An open source trusted cloud native registry project that stores, signs, and scans content. 前置条件:安装好docker和docker-compose 一、安装harbor 1、下载harbor安装包并解压 wget https://github.com/goharbor/harbo…...

DMA学习

一、DMA简介 DMA是一种无需CPU的参与就可以让外设与系统内存之间进行双向数据传输的硬件机制。使用DMA可以使系统CPU从实际的I/O数据传输过程中摆脱出来&#xff0c;从而大大提高系统的吞吐率。 DMA方式的数据传输由DMA控制器&#xff08;DMAC&#xff09;控制&#xff0c;在传…...

【Axure高保真原型】引导弹窗

今天和大家中分享引导弹窗的原型模板&#xff0c;载入页面后&#xff0c;会显示引导弹窗&#xff0c;适用于引导用户使用页面&#xff0c;点击完成后&#xff0c;会显示下一个引导弹窗&#xff0c;直至最后一个引导弹窗完成后进入首页。具体效果可以点击下方视频观看或打开下方…...

vscode里如何用git

打开vs终端执行如下&#xff1a; 1 初始化 Git 仓库&#xff08;如果尚未初始化&#xff09; git init 2 添加文件到 Git 仓库 git add . 3 使用 git commit 命令来提交你的更改。确保在提交时加上一个有用的消息。 git commit -m "备注信息" 4 …...

linux之kylin系统nginx的安装

一、nginx的作用 1.可做高性能的web服务器 直接处理静态资源&#xff08;HTML/CSS/图片等&#xff09;&#xff0c;响应速度远超传统服务器类似apache支持高并发连接 2.反向代理服务器 隐藏后端服务器IP地址&#xff0c;提高安全性 3.负载均衡服务器 支持多种策略分发流量…...

智慧医疗能源事业线深度画像分析(上)

引言 医疗行业作为现代社会的关键基础设施,其能源消耗与环境影响正日益受到关注。随着全球"双碳"目标的推进和可持续发展理念的深入,智慧医疗能源事业线应运而生,致力于通过创新技术与管理方案,重构医疗领域的能源使用模式。这一事业线融合了能源管理、可持续发…...

C++:std::is_convertible

C++标志库中提供is_convertible,可以测试一种类型是否可以转换为另一只类型: template <class From, class To> struct is_convertible; 使用举例: #include <iostream> #include <string>using namespace std;struct A { }; struct B : A { };int main…...

ESP32读取DHT11温湿度数据

芯片&#xff1a;ESP32 环境&#xff1a;Arduino 一、安装DHT11传感器库 红框的库&#xff0c;别安装错了 二、代码 注意&#xff0c;DATA口要连接在D15上 #include "DHT.h" // 包含DHT库#define DHTPIN 15 // 定义DHT11数据引脚连接到ESP32的GPIO15 #define D…...

基于数字孪生的水厂可视化平台建设:架构与实践

分享大纲&#xff1a; 1、数字孪生水厂可视化平台建设背景 2、数字孪生水厂可视化平台建设架构 3、数字孪生水厂可视化平台建设成效 近几年&#xff0c;数字孪生水厂的建设开展的如火如荼。作为提升水厂管理效率、优化资源的调度手段&#xff0c;基于数字孪生的水厂可视化平台的…...

2025季度云服务器排行榜

在全球云服务器市场&#xff0c;各厂商的排名和地位并非一成不变&#xff0c;而是由其独特的优势、战略布局和市场适应性共同决定的。以下是根据2025年市场趋势&#xff0c;对主要云服务器厂商在排行榜中占据重要位置的原因和优势进行深度分析&#xff1a; 一、全球“三巨头”…...

深度学习水论文:mamba+图像增强

&#x1f9c0;当前视觉领域对高效长序列建模需求激增&#xff0c;对Mamba图像增强这方向的研究自然也逐渐火热。原因在于其高效长程建模&#xff0c;以及动态计算优势&#xff0c;在图像质量提升和细节恢复方面有难以替代的作用。 &#x1f9c0;因此短时间内&#xff0c;就有不…...

通过MicroSip配置自己的freeswitch服务器进行调试记录

之前用docker安装的freeswitch的&#xff0c;启动是正常的&#xff0c; 但用下面的Microsip连接不上 主要原因有可能一下几个 1、通过下面命令可以看 [rootlocalhost default]# docker exec -it freeswitch fs_cli -x "sofia status profile internal"Name …...