当前位置: 首页 > article >正文

进程间通信(消息队列)

目录

一 原理

二 API

1. ftok

2. msgget

3. msgctl

4. msgsnd

5. msgrcv

三 demo代码

四 基于责任链模式和消息队列对数据处理

1. 什么是责任链模式

2. 下面基于责任链模式来对消息队列获取的消息进行处理


前置

其实system v 版本的进程间通信,设计的接口都类似,所以会用一个也就会用其他的了,比如前面的共享内存的shmget获取shmid,本章讲的消息队列提供的msgget获取msgid是一样的,接口都大同小异。

不过消息队列不同于共享内存的是,发送的数据是有类型的,不像共享内存没有类型的数据发送。

下面的消息结构

struct msgbuf {long mtype;     // 谁发送的char mtext[N];  // 发送的消息};

一 原理

1. 和共享内存一样通过系统调用并让物理内存和共享区进行映射。

2. 但不一样的是共享内存通信的时候不需要系统调用,但消息队列需要,所以消息队列读写操作,默认没数据读会阻塞,缓冲区满了写会阻塞,自带同步与互斥。

3. 先通过系统调用建立共享区与物理内存的映射,后续在通过系统调用进行发送和接收消息,发送消息的时候要携带消息id,代表是谁发的,后续收消息要根据这个id区分拿谁发的消息,毕竟总不能自己发自己收吧。

二 API

1. ftok
#include <sys/types.h>
#include <sys/ipc.h>key_t ftok(const char *pathname, int proj_id);

和共享内存一样这里就不详细介绍了,形成唯一的 key_t 类型标识唯一性。

2. msgget
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>int msgget(key_t key,   // ftok的返回值int msgflg   // 和共享内存一样 IPC_CREAT/IPC_EXCL// IPC_CREAT            存在则返回已经存在的,否则创建新的// IPC_CREAT | IPC_EXCL 不纯在则创建新的,纯在则出错返回);// 返回值为0正常,-1错误
3. msgctl
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>int msgctl(int msqid,             // msgget的返回值int cmd,               // 对已经存在的消息队列 CURD 操作struct msqid_ds *buf   // 消息队列的属性字段);// 返回值为0正常,失败-1
4. msgsnd
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>int msgsnd(int msqid,          // msgget的返回值const void *msgp,   // 消息的数据类型结构size_t msgsz,       // 大小(不携带该结构第一个字段 (long mtype))int msgflg          // 怎么发,阻塞/非阻塞等);// 返回值0正常,-1错误
5. msgrcv
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>ssize_t msgrcv(int msqid,     // msgget的返回值 void *msgp,    // 消息结构类型size_t msgsz,  // 大小(不携带类型第一个字段)long msgtyp,   // 该类型第一个字段,表示收谁的消息int msgflg     // 怎么收,阻塞/非阻塞等);// 返回值>0实际读到的数据个数,为-1读出错

三 demo代码

#pragma once#include <iostream>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#include <unistd.h>#include <cstring>// 初始化数据
const int mydefault = -1;// 标识谁发的消息
const int Ser = 1;
const int Cli = 2;// 协同 ftok 形成的 key_t 类型
const static std::string path = "/home/CD/linux/message_queue/Demo_mes";
const static int proj_id = 1234;// 缓冲区大小
const size_t mes_buff = 4096;// 消息类型
struct mymessage
{long mtype;char mtext[mes_buff];
};// 转16进制
void To_Hex(int val)
{char buff[1024] = {0};snprintf(buff, sizeof(buff), "0x%x", val);std::cout << buff << std::endl;return;
}// 获取 key_t
key_t Getkey(const std::string &mypath, int myproj)
{key_t key = ftok(mypath.c_str(), myproj);return key;
}// 公共方法
class Message_Queue
{
public:Message_Queue(const std::string &mypath, int myproj) : _msqid(mydefault), _path(mypath), _proj_id(myproj), _key(mydefault){_key = Getkey(_path, _proj_id);if (_key == -1){std::cout << "Get key failed" << std::endl;exit(-1);}To_Hex(_key);}// 创建者创建消息队列void Create(){// 创建并设置权限为 读写_msqid = msgget(_key, IPC_CREAT | IPC_EXCL | 0666);if (_msqid == -1){std::cout << "Create _msqid failed" << std::endl;exit(-2);}}// 获取已经存在的消息独立额void User(){_msqid = msgget(_key, IPC_CREAT);if (_msqid == -1){std::cout << "User get failed" << std::endl;exit(-2);}}// 释放消息队列void destroy(){if (_msqid != -1){int n = msgctl(_msqid, IPC_RMID, nullptr);if (n == -1){std::cout << "remove messque falied" << std::endl;exit(3);}}}// 发送消息void Sendmessage(){// 定义消息对象并初始化mymessage data;memset(&data, 0, sizeof(data));// 标明谁发的data.mtype = Ser;while (true){std::string s;std::getline(std::cin, s);memcpy(&data.mtext, s.c_str(), sizeof(s));//  发送消息                   不能携带第一个字段 缓冲区满了就阻塞int n = msgsnd(_msqid, &data, sizeof(data.mtext), 0);if (n == -1){std::cout << "msgsnd failed" << std::endl;}if (data.mtext[0] == 'q')break;}}// 接收消息void Recvmessage(){// 定义消息对象mymessage data;while (true){//   接收消息                 不能携带第一个字段 收谁的消息 缓冲区为空就阻塞int n = msgrcv(_msqid, &data, sizeof(data.mtext), Ser, 0);data.mtext[n] = 0;if (n == -1){std::cout << "msgrcv failed" << std::endl;}if (data.mtext[0] == 'q')break;std::cout << data.mtext << std::endl;}}~Message_Queue(){}private:// 标识消息队列idint _msqid;// 形成的 key_tstd::string _path;int _proj_id;key_t _key;
};// 创建者/收消息
class server : public Message_Queue
{
public:server() : Message_Queue(path, proj_id){Message_Queue::Create();Message_Queue::Recvmessage();}~server(){Message_Queue::destroy();}
};// 获取者/发消息
class client : public Message_Queue
{
public:client() : Message_Queue(path, proj_id){Message_Queue::User();Message_Queue::Sendmessage();}~client() {}
};

四 基于责任链模式和消息队列对数据处理

1. 什么是责任链模式

责任链属于行为类设计模式,也就是程序在运行的时候每个模块之间都有任务和优先级,哪些任务完成或者不完成的结果要交给下一个节点(处理点),也可以不启动这个任务,就好比食堂打饭打完这个菜,后面的菜可打可不打。

2. 下面基于责任链模式来对消息队列获取的消息进行处理
  • 给消息加上时间戳
  • 把消息保存到文件
  • 如果文件行数超过一个范围则重命名

基于多态实现的责任链

  • 先分别创建3个任务类继承自剧中调度类
  • 因为都继承了这个类,在这个类定义一个指针,在把这个指针指向下一个节点的剧中调度类
  • 调用这个父类被继承的方法,就会构成多态转而去调用子类的方法进行数据处理

3. demo代码

Chai_of_responsibility.hpp

#include <iostream>
#include <memory>
#include <ctime>
#include <string>
#include <unistd.h>
#include <fstream>
#include <cstdio>class base
{
public:base() : _status(true) {}virtual ~base() {}virtual void hander(const std::string &message) = 0;public:void setnext(std::shared_ptr<base> next){_next = next;}void setstatusfalse(){_status = false;}void setstatustrue(){_status = true;}protected:std::shared_ptr<base> _next;bool _status;
};class format : public base
{
public:format() {}~format() {}void hander(const std::string &message) override{std::string str;if (_status == true){// 拼上时间戳/pidstr += std::to_string(time(nullptr));str += ' ';str += std::to_string(getpid()) + '\n';std::cout << "拼上 时间戳和pid" << std::endl;}str += message;std::cout << "str -> " << str << std::endl;if (_next != nullptr){_next->hander(str);}else{std::cout << "format hander over" << std::endl;}}
};const std::string default_path = "/home/CD/linux/message_queue/Demo_mes/test_path/";
const std::string default_name = "111.txt";class save : public base
{
public:save(const std::string &path = default_path, const std::string &name = default_name): _path(path), _name(name){}~save() {}void hander(const std::string &message) override{// 保存到文件std::ofstream os;if (_status == true){std::string str = _path + _name;os.open(str, std::ios::app);if (!os.is_open()){std::cout << "save file failed" << std::endl;}os << message;std::cout << "保存成功" << std::endl;}if (_next != nullptr){_next->hander(message);}else{std::cout << "save hander over" << std::endl;}os.close();}private:std::string _path;std::string _name;
};const int range = 5;
class backup : public base
{
public:backup(const std::string &path = default_path, const std::string &name = default_name): _path(path), _name(name){}~backup() {}private:bool line_range(const std::string &file_path){std::ifstream ifs;ifs.open(file_path);int cnt = 0;std::string s;while (std::getline(ifs, s)){cnt++;}ifs.close();return cnt > range;}public:void hander(const std::string &message) override{// 重命名/备份std::string str = _path + _name;std::string sname = _name + std::to_string(time(nullptr));if (_status == true){if (line_range(_path + _name)){if (rename(str.c_str(), (_path + sname).c_str()) == -1){std::cout << "rename failed" << std::endl;}std::cout << "重命名成功" << std::endl;}}if (_next != nullptr){_next->hander(str);}else{std::cout << "backup hander over" << std::endl;}}private:std::string _path;std::string _name;
};class enter
{
public:enter(){_fm = std::make_shared<format>();_sa = std::make_shared<save>();_bk = std::make_shared<backup>();_fm->setnext(_sa);_sa->setnext(_bk);_bk->setnext(nullptr);}void Choose(bool fm, bool sa, bool bk){fm ? _fm->setstatustrue() : _fm->setstatusfalse();sa ? _sa->setstatustrue() : _sa->setstatusfalse();bk ? _bk->setstatustrue() : _bk->setstatusfalse();}void run(const std::string messgae){_fm->hander(messgae);}~enter(){}private:std::shared_ptr<format> _fm;std::shared_ptr<save> _sa;std::shared_ptr<backup> _bk;
};

相关文章:

进程间通信(消息队列)

目录 一 原理 二 API 1. ftok 2. msgget 3. msgctl 4. msgsnd 5. msgrcv 三 demo代码 四 基于责任链模式和消息队列对数据处理 1. 什么是责任链模式 2. 下面基于责任链模式来对消息队列获取的消息进行处理 前置 其实system v 版本的进程间通信&#xff0c;设计的接…...

Linux gron 命令使用详解

简介 gron 是一个独特的命令行工具&#xff0c;用于将 JSON 数据转换为离散的、易于 grep 处理的赋值语句格式。它的名字来源于 “grepable on” 或 “grepable JSON”&#xff0c;主要解决在命令行中处理复杂 JSON 数据的难题。 核心价值 gron 的核心是将 JSON 数据展平为类…...

Nginx--手写脚本压缩和切分日志(也适用于docker)

原文网址&#xff1a;Nginx--手写脚本压缩和切分日志&#xff08;也适用于docker&#xff09;_IT利刃出鞘的博客-CSDN博客 简介 本文介绍nginx如何手写脚本压缩和切分日志。 1.创建切分日志的脚本 创建脚本文件&#xff1a;/work/tmp/nginx-log_sh&#xff08;后边要用run-…...

OpenCv高阶(十八)——dlib人脸检测与识别

文章目录 一、dlib库是什么&#xff1f;二、opencv库与dlib库的优缺点对比1、opencv优缺点2、dlib库优缺点 三、dlib库的安装1、在线安装2、本地安装 四、dlib库的人脸检测器1. 基于 HOG 的检测器2. 基于 CNN 的检测器 五、dlib人脸检测的简单使用1、导入必要库2、初始化人脸检…...

中山大学无人机具身导航新突破!FlightGPT:迈向通用性和可解释性的无人机视觉语言导航

作者&#xff1a;Hengxing Cai 1 , 2 ^{1,2} 1,2, Jinhan Dong 2 , 3 ^{2,3} 2,3, Jingjun Tan 1 ^{1} 1, Jingcheng Deng 4 ^{4} 4, Sihang Li 2 ^{2} 2, Zhifeng Gao 2 ^{2} 2, Haidong Wang 1 ^{1} 1, Zicheng Su 5 ^{5} 5, Agachai Sumalee 6 ^{6} 6, Renxin Zhong 1 ^{1} …...

WIN11+CUDA11.8+VS2019配置BundleFusion

参考&#xff1a; BundleFusion:VS2019 2017 ,CUDA11.5,win11&#xff0c;Realsense D435i离线数据包跑通&#xff0c;环境搭建 - 知乎 Win10VS2017CUDA10.1环境下配置BundleFusion - 知乎 BundleFusionWIN11VS2019 CUDA11.7环境配置-CSDN博客 我的环境&#xff1a;Win 11…...

WPF prism

Prism Prism.Dryloc 包 安装 Nuget 包 - Prism.DryIoc 1. 修改 App.xaml 修改 App.xaml 文件&#xff0c;添加 prism 命名空间, 继承由 Application → PrismApplication&#xff0c;删除默认启动 url, StartupUri“MainWindow.xaml” <dryioc:PrismApplicationx:Class…...

实时同步缓存,与阶段性同步缓存——补充理解《补充》

根据 Redis 缓存的数据与 DBMS 中数据的同步性划分&#xff0c;缓存一般可划分为两类&#xff1a;实时同步缓存&#xff0c;与阶段性同步缓存。 实时同步缓存是指&#xff0c;DBMS 中数据更新后&#xff0c;Redis 缓存中的存放的相关数据会被立即清 除&#xff0c;以促使再有对…...

[Redis] Redis:高性能内存数据库与分布式架构设计

标题&#xff1a;[Redis] 浅谈分布式系统 水墨不写bug 文章目录 一、什么是Redis&#xff1f;一、核心定位二、核心优势三、典型应用场景四、Redis vs 传统数据库 二、架构选择与设计1、单机架构&#xff08;应用程序 数据库服务器&#xff09;2、应用程序和数据库服务器分离3…...

Mobaxterm解锁Docker

Mobaxterm是一款功能强大的终端模拟器和SSH客户端&#xff0c;它支持Windows、Linux和Mac操作系统&#xff0c;对于使用Docker的开发者和运维人员来说&#xff0c;Mobaxterm是一个非常有用的工具。本文将深入解析Mobaxterm&#xff0c;并分享一些使用Docker时的高效技巧。 Mob…...

React 第四十九节 Router中useNavigation的具体使用详解及注意事项

前言 useNavigation 是 React Router 中一个强大的钩子&#xff0c;用于获取当前页面导航的状态信息。 它可以帮助开发者根据导航状态优化用户体验&#xff0c;如显示加载指示器、防止重复提交等。 一、useNavigation核心用途 检测导航状态&#xff1a;判断当前是否正在进行…...

【JavaEE】Spring事务

目录 一、事务简介二、Spring事务的实现2.1 事务的操作2.2 分类2.2.1 Spring编程式事务2.2.2 Spring 声明式事务 Transactional2.2.2.1 Transactional 详解2.2.2.1.1 rollbackFor2.2.2.1.2 Isolation2.2.2.1.3 propagation 一、事务简介 事务&#xff1a;事务是⼀组操作的集合…...

Flink 状态管理深度解析:类型与后端的全面探索

在流处理场景中,数据往往是连续且无界的,为了准确处理这些数据并维持计算的连续性,Flink 引入了状态管理机制。Flink 的状态管理包含状态类型和状态后端两大部分,它们相辅相成,共同为作业的可靠性、容错性和性能提供保障。接下来,我们将深入探究 Flink 状态管理中状态类型…...

Android15 userdebug版本不能remount

背景描述&#xff1a; 最近调试Android Vendor Hal的时候发现一个奇怪的现象: android userdebug版本刷到设备中&#xff0c;执行adb root没提示错误&#xff0c;但是没有获取到root权限。 Android设备运行的系统版本有三种情况&#xff1a;user版本、userdebug版本和eng版本…...

R包安装报错解决案例系列|R包使用及ARM架构解决data.table安装错误问题

有不少同学是Mac系统的&#xff0c;分析过程中会发现部分R包总是安装不成功&#xff0c;这是因为部分R包基于windowsx86架构编译的&#xff0c;最常见的就是含 C/C/Fortran 的包&#xff0c;对于初学者都是建议linux和win去做&#xff0c;Windows 通常直接安装预编译好的二进制…...

k8s Headless Service

Kubernetes 无头服务&#xff08;Headless Service&#xff09;配置与使用场景 1.无头服务概述 无头服务&#xff08;Headless Service&#xff09;是 Kubernetes 中的一种特殊服务类型&#xff0c;它**不分配集群 IP&#xff08;ClusterIP&#xff09;&#xff0c;而是直接暴露…...

Linux上安装MongoDB

目录 一、在Linux系统安装MongoDB服务器 1、下载MongoDB 2、上传MongoDB并解压 3、创建必要目录 4、配置环境变量 5、创建配置文件 6、启动命令 7、验证安装 二、在Linux系统安装MongoDB客户端Shell 1、下载MongoDB Shell 2、上传MongoDB Shell并解压 3、配置环境变…...

Redis最佳实践——安全与稳定性保障之访问控制详解

Redis 在电商应用的安全与稳定性保障之访问控制全面详解 一、安全访问控制体系架构 1. 多层级防护体系 #mermaid-svg-jpkDj2nKxCq9AXIW {font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;fill:#333;}#mermaid-svg-jpkDj2nKxCq9AXIW .error-ico…...

【华为开发者空间 x DeepSeek】服务器运行Ollama并在本地调用

文章概述 本文介绍了如何在 华为开发者空间 中快速部署并使用 Ollama 模型运行框架&#xff0c;并结合 deepseek-r1 模型进行本地或远程交互推理。内容涵盖环境准备、模型配置、网卡绑定、内网穿透、API调用等多个环节&#xff0c;适合希望在华为云上快速搭建本地类大模型推理…...

Halcon

regiongrowing — Segment an image using regiongrowing. get_obj_class:获取图像的类别名 get_region_points&#xff1a;获取区域的像素 get_contour_xld&#xff1a;获取xld像素点坐标 get_polygon_xld&#xff1a;获取多边形的数据 get_region_polygon:计算一个区域的…...

STM32之IIC(重点)和OLED屏

内部集成电路概述 基本概念 内部集成电路&#xff08;Inter Integrated Circuit&#xff09;的简称叫做IIC或者I2C&#xff0c;是一种简单的、半双工同步通信的串行通信接口&#xff0c;IIC总线是上世纪80年代&#xff08;1982年&#xff09;由飞利浦公司设计出来&#xff0c…...

学习海康VisionMaster之表面缺陷滤波

一&#xff1a;进一步学习了 今天学习下VisionMaster中的表面缺陷滤波&#xff1a;简单、无纹理背景的表面缺陷检测&#xff0c;可以检测表面的异物&#xff0c;缺陷&#xff0c;划伤等 二&#xff1a;开始学习 1&#xff1a;什么表面缺陷滤波&#xff1f; 表面缺陷滤波的核心…...

游戏引擎学习第314天:将精灵拆分成多个层

回顾并为今天的工作做准备 我们今天继续昨天开始的工作&#xff0c;现在我们要回到渲染中处理 Z 值的最终环节。我们目前已经有一个我们认为还算合理的排序方式&#xff0c;虽然可能还需要在接下来的过程中进行一些调整&#xff0c;但总体上已经有了一个明确的方向。 我们已经…...

【学习笔记】深度学习-梯度概念

一、定义 梯度向量不仅表示函数变化的速度&#xff0c;还表示函数增长最快的方向 二、【问】为什么说它表示方向&#xff1f; 三、【问】那在深度学习梯度下降的时候&#xff0c;还要判断梯度是正是负来更新参数吗&#xff1f; 假设某个参数是 w&#xff0c;损失函数对它的…...

【数据结构】图的存储(邻接矩阵与邻接表)

图的存储结构 因为图中既有节点&#xff0c;又有边(节点与节点之间的关系)&#xff0c;因此&#xff0c;在图的存储中&#xff0c;只需要保存&#xff1a;节点和边关系即可。 节点保存比较简单&#xff0c;只需要一段连续空间即可&#xff0c;那边关系该怎么保存呢&#xff1…...

tomcat yum安装

使用yum安装 yum install -y java-1.7.0-openjdk* tomcat* --disablerepoepel## java-1.7.0-openjdk* 注意&#xff1a;最终安装的是java-1.8.0版本## --disablerepoepel 禁用&#xff1a;EPEL源&#xff0c;防止版本冲突 java -version (2) 启停&#xff1a;Tomcat 7 s…...

【Elasticsearch】suggest_mode

suggest_mode 是 Elasticsearch 中 term suggester 和 phrase suggester 的一个参数&#xff0c;用于控制建议的生成方式。它有以下三种模式&#xff1a; 1. missing&#xff1a;默认值。仅对索引中不存在的词项提供建议。如果输入的词已经在索引中存在&#xff0c;则不会生成建…...

数据库只更新特定字段的两种方式(先读后写 vs. 动态组织 SQL)-golang SQLx 实现代码(动态组织 SQL)

文章目录 数据库只更新特定字段的两种方式&#xff08;先读后写 vs. 动态组织 SQL&#xff09;go语言例子使用GORM的示例&#xff08;最常用的Go ORM库&#xff09;使用SQLx的两种更新方式实现golang SQLx 实现代码&#xff08;动态组织 SQL&#xff09; 数据库只更新特定字段的…...

从翻译后修饰角度解析人工合成途径与底盘细胞的适配性-文献精读136

Compatibility between synthetic pathway and chassis cells from the viewpoint of post-translational modifications 从翻译后修饰角度解析人工合成途径与底盘细胞的适配性 摘要 揭示工程化设计的人工合成途径与底盘细胞整体代谢网络的交互作用及适配性机制是合成生物学研…...

Cesium快速入门到精通系列教程一

一、打造第一个Cesium应用 1、官方渠道下载Cesium&#xff08;可选择历史版本&#xff09; ​​GitHub Releases页面​​ 访问 Cesium GitHub Releases&#xff0c;此处列出了所有正式发布的版本。 通过标签&#xff08;如 v1.95.0&#xff09;选择目标版本&#xff0c;下载…...