Linux - 进程间通信(2)
目录
2、进程池
1)理解进程池
2)进程池的实现
整体框架:
a. 加载任务
b. 先描述,再组织
I. 先描述
II. 再组织
c. 创建信道和子进程
d. 通过channel控制子进程
e. 回收管道和子进程
问题1:
解答1:
问题2:
解答2:
f. 将进程池本身和任务文件本身进行解耦
3)完整代码
processpool.cc:
Task.hpp:
Makefile:
命令行中的 | ,就是匿名管道
它们的父进程都是bash
2、进程池
1)理解进程池
a. 可以将任务写入管道来给到子进程,从而可以提前创建子进程想让哪个子进程完成任务,我就让写入到哪个子进程相对的管道中
b. 管道里面没有数据,worker进程就在阻塞等待,等待务的到来!!
c. master向哪一个管道进行写入,就是唤醒哪一个子进程来处理任务
d. 均衡的向后端子进程划分任务,称之为负载均衡父进程要进行后端任务的负载均衡
父进程直接向管道里写入固定长度的四字节(int)数组下标(任务码)
函数指针数组中元素分别指向不同的任务,以便master控制worker完成指定工作
2)进程池的实现
整体框架:
// ./processpool 3
int main(int argc, char *argv[])
{if (argc != 2){std::cerr << "Usage: " << argv[0] << " processnum" << std::endl;}int num = std::stoi(argv[1]);LoadTask(); // 加载任务std::vector<Channel> channels; // 将管道组织起来// 1.创建信道和子进程CreateChannelAndSub(num, &channels);// 2.通过channel控制子进程CtrlProcess(channels, 10);// 3.回收管道和子进程 a.关闭所有的写端 b.回收子进程ClearUpChannel(channels);return 0;
}
a. 加载任务
我这里用的是打印的方式来模拟任务的分配,通过打印从而了解子进程执行任务的情况,通过种下随机数种子,产生随机数,进而随机的向子进程分配任务,work即为子进程需要做的工作
Task.hpp:
#pragma once#include <iostream> #include <ctime> #include <cstdlib> #include <sys/types.h> #include <unistd.h>#define TaskNum 3typedef void (*task_t)(); // task_t 函数指针void Print() {std::cout << "I am print task" << std::endl; } void DownLoad() {std::cout << "I am a download task" << std::endl; } void Flush() {std::cout << "I am a flush task" << std::endl; }task_t tasks[TaskNum];void LoadTask() {srand(time(nullptr) ^ getpid() ^ 177); // 种一个随机种子tasks[0] = Print;tasks[1] = DownLoad;tasks[2] = Flush; }void ExcuteTask(int number) {if(number < 0 || number > 2) return;tasks[number](); }int SelectTask() {return rand() % TaskNum; }void work(int rfd) {while (true){int command = 0;int n = read(rfd, &command, sizeof(command));if (n == sizeof(int)){std::cout << "pid is: " << getpid() << " handler task" << std::endl;ExcuteTask(command);}else if (n == 0){std::cout << "sub process: " << getpid() << " quit" << std::endl;break;}} }
// 命令行规范 --> ./processpool 3
int main(int argc, char *argv[])
{if (argc != 2){std::cerr << "Usage: " << argv[0] << " processnum" << std::endl;}int num = std::stoi(argv[1]);LoadTask(); // 加载任务return 0;
}
b. 先描述,再组织
I. 先描述
需要控制的信道(即发送端wfd)数量多且繁琐,需要管理起来从而方便控制给子进程发送任务
class Channel { private:int _wfd;int _subprocessid;std::string _name; };
在信道中,我们需要知道发送的文件描述符wfd,还有知道子进程的pid,以及信道的命名(用来区分信道)
II. 再组织
std::vector<Channel> channels;
我们通过用一个vector数组将所有的Channel存储起来,从而实现对它们的增删查改,以方便管理
c. 创建信道和子进程
void CreateChannelAndSub(int num, std::vector<Channel> *channels)
{for (int i = 0; i < num; ++i){// 1.创建管道int pipefd[2] = {0};int n = pipe(pipefd);if (n < 0) exit(1); // 创建管道失败// 2.创建子进程pid_t id = fork();if (id == 0){// Childclose(pipefd[1]);work();close(pipefd[0]);exit(0);}// 3.构建一个名字std::string channel_name = "Channel-" + std::to_string(i);// Fatherclose(pipefd[0]);// 拿到了 a.子进程的pid b.父进程需要的管道的w端channels->push_back(Channel(pipefd[1], id, channel_name));}
}
用命令行参数的方式传入得到的argv[1]即为输入命令需要的子进程和管道个数
通过for循环,创建 num个 pipe管道以及子进程,当创建完子进程时,需要关闭掉不需要的文件描述符(即wfd -- pipefd[1])(当然,父进程也需要关闭不需要的fd -- rfd),在执行完work(子进程的工作)之后,关闭掉rfd(即工作完成了,关闭其管道),然后exit(0)退出进程,等待父进程回收
d. 通过channel控制子进程
// 轮询方案 -- 负载均衡
int NextChannel(int channelnum)
{static int next = 0;int channel = next;next++;next %= channelnum;return channel;
}// 发送相应的任务码到对应管道内
void SendTaskCommand(Channel &channel, int taskCommand)
{write(channel.GetWfd(), &taskCommand, sizeof(taskCommand));
}
void CtrlProcessOnce(std::vector<Channel> &channels)
{sleep(1);// a. 选择一个任务int taskcommand = SelectTask();// b. 选择一个信道和进程int channel_index = NextChannel(channels.size());// c. 发送任务SendTaskCommand(channels[channel_index], taskcommand);std::cout << "=================================" << std::endl;std::cout << "taskcommand: " << taskcommand << " channel: " << channels[channel_index].GetName() << " sub process: " << channels[channel_index].GetProcessId() << std::endl;
}void CtrlProcess(std::vector<Channel> &channels, int times = -1)
{if (times > 0){while (times--){CtrlProcessOnce(channels);}}else{while (true){CtrlProcessOnce(channels);}}
}
向其发送任务之前,我们需要先选择一个任务,通过随机种子随机数的方式,随机选择我们的一个任务,拿到其任务码(即指针数组下标),然后选择相应的信道和进程(信道和进程一体的),从而向管道发送任务码给子进程
e. 回收管道和子进程
void ClearUpChannel(std::vector<Channel> &channels)
{for (auto &channel : channels){channel.CloseChannel();}for (auto &channel : channels){channel.Wait();}
}
我们先将所有的信道关闭,然后在逐个将子进程等待回收
问题1:
那为什么不能边关闭信道边回收呢??
即
解答1:
在我们创建子进程的过程中,由于父子进程之间的继承,从而导致子进程会拥有父进程的文件描述符内容(即指向同一地方),如果我们边关闭边回收的话,如上图所示,当我们关闭父进程的第一个管道的wfd时,这时候第一个管道的读端的引用计数并未清0,因为子进程2它继承了父进程指向第一个管道的wfd(读端),从而使得读端阻塞,进程不退出,然后wait子进程的时候就会阻塞
在work结束后,才会到下一步close和exit退出子进程;
work结束需要的条件是 n == 0,即读端返回值为0,即
因此上述那种边关闭信道,边wait子进程的方法会阻塞
问题2:
为什么这种方法又能成功回收呢??
解答2:
因为当我们将所有信道关闭时,关闭到最后一个子进程对应的管道的wfd的时候,该管道的读端的引用计数就会为0,从而读端读到0,该子进程退出,随子进程退出就会使得该子进程指向的前面管道的读端回收,就不会造成前面那种情况
f. 将进程池本身和任务文件本身进行解耦
用回调函数可以很好的改善代码的耦合性
通过文件描述符重定向 dup2,将标准输入(文件描述符 0)重定向到 rfd 所代表的文件,然后再回调task()函数
// 重定向
这样做可以彻底的让我们的子进程执行对应的work时,再也不需要知道有什么管道的读端
(不用管从哪里接收信息,直接认为从标准输入拿到信息即可)
--- 将管道的逻辑和执行任务的逻辑进一步进行解耦
// task_t task : 回调函数
有了它,我们进程池本身的代码和我们任务本身两个文件就彻底解耦了
--- 即既不关心从哪个文件描述符,直接默认从0里面去读,也不关心将来谁调它,因为子进程会自动回调它
3)完整代码
processpool.cc:
#include <iostream>
#include <string>
#include <unistd.h>
#include <vector>
#include <sys/types.h>
#include <sys/wait.h>
#include "Task.hpp"class Channel
{
public:Channel(int wfd, pid_t id, const std::string name): _wfd(wfd), _subprocessid(id), _name(name){}int GetWfd() { return _wfd; }pid_t GetProcessId() { return _subprocessid; }std::string GetName() { return _name; }void CloseChannel() { close(_wfd); }void Wait(){pid_t rid = waitpid(_subprocessid, nullptr, 0);if (rid > 0){std::cout << "wait " << rid << " success" << std::endl;}}~Channel(){}private:int _wfd;int _subprocessid;std::string _name;
};// 形参和命名规范
// const & : 输入型参数
// & : 输入输出型参数
// * : 输出型参数// task_t task : 回调函数
// 有了它,我们进程池本身的代码和我们任务本身两个文件就彻底解耦了
// --- 即既不关心从哪个文件描述符,直接默认从0里面去读,也不关心将来谁调它,因为子进程会自动回调它
void CreateChannelAndSub(int num, std::vector<Channel> *channels, task_t task)
{for (int i = 0; i < num; ++i){// 1.创建管道int pipefd[2] = {0};int n = pipe(pipefd);if (n < 0)exit(1); // 创建管道失败// 2.创建子进程pid_t id = fork();if (id == 0){// Childclose(pipefd[1]);dup2(pipefd[0], 0);task();close(pipefd[0]);exit(0);}// 3.构建一个名字std::string channel_name = "Channel-" + std::to_string(i);// Fatherclose(pipefd[0]);// 拿到了 a.子进程的pid b.父进程需要的管道的w端channels->push_back(Channel(pipefd[1], id, channel_name));}
}int NextChannel(int channelnum) // 轮询方案 -- 负载均衡
{static int next = 0;int channel = next;next++;next %= channelnum;return channel;
}void SendTaskCommand(Channel &channel, int taskCommand)
{write(channel.GetWfd(), &taskCommand, sizeof(taskCommand));
}void CtrlProcessOnce(std::vector<Channel> &channels)
{sleep(1);// a. 选择一个任务int taskcommand = SelectTask();// b. 选择一个信道和进程int channel_index = NextChannel(channels.size());// c. 发送任务SendTaskCommand(channels[channel_index], taskcommand);std::cout << "=================================" << std::endl;std::cout << "taskcommand: " << taskcommand << " channel: " << channels[channel_index].GetName()<< " sub process: " << channels[channel_index].GetProcessId() << std::endl;
}
void CtrlProcess(std::vector<Channel> &channels, int times = -1)
{if (times > 0){while (times--){CtrlProcessOnce(channels);}}else{while (true){CtrlProcessOnce(channels);}}
}void ClearUpChannel(std::vector<Channel> &channels)
{// for (auto &channel : channels)// {// channel.CloseChannel();// channel.Wait();// }// int num = channels.size() -1;// while(num >= 0)// {// channels[num].CloseChannel();// channels[num--].Wait();// }for (auto &channel : channels){channel.CloseChannel();}for (auto &channel : channels){channel.Wait();}
}// ./processpool 3
int main(int argc, char *argv[])
{if (argc != 2){std::cerr << "Usage: " << argv[0] << " processnum" << std::endl;}int num = std::stoi(argv[1]);LoadTask(); // 加载任务std::vector<Channel> channels; // 将管道组织起来// 1.创建信道和子进程CreateChannelAndSub(num, &channels, work);// 2.通过channel控制子进程CtrlProcess(channels, 10);// 3.回收管道和子进程 a.关闭所有的写端 b.回收子进程ClearUpChannel(channels);// // for test// for(auto &channel : channels)// {// std::cout << "====================" << std::endl;// std::cout << channel.GetName() << std::endl;// std::cout << channel.GetWfd() << std::endl;// std::cout << channel.GetProcessId() << std::endl;// }// sleep(100);return 0;
}
Task.hpp:
#pragma once#include <iostream>
#include <ctime>
#include <cstdlib>
#include <sys/types.h>
#include <unistd.h>#define TaskNum 3typedef void (*task_t)(); // task_t 函数指针void Print()
{std::cout << "I am print task" << std::endl;
}
void DownLoad()
{std::cout << "I am a download task" << std::endl;
}
void Flush()
{std::cout << "I am a flush task" << std::endl;
}task_t tasks[TaskNum];void LoadTask()
{srand(time(nullptr) ^ getpid() ^ 177); // 种一个随机种子tasks[0] = Print;tasks[1] = DownLoad;tasks[2] = Flush;
}void ExcuteTask(int number)
{if(number < 0 || number > 2) return;tasks[number]();
}int SelectTask()
{return rand() % TaskNum;
}// void work(int rfd)
// {
// while (true)
// {
// int command = 0;
// int n = read(rfd, &command, sizeof(command));
// if (n == sizeof(int))
// {
// std::cout << "pid is: " << getpid() << " handler task" << std::endl;
// ExcuteTask(command);
// }
// else if (n == 0)
// {
// std::cout << "sub process: " << getpid() << " quit" << std::endl;
// break;
// }
// }
// }// 这样做可以彻底的让我们的子进程执行对应的work时,再也不需要知道有什么管道的读端
// (不用管从哪里接收信息,直接认为从标准输入拿到信息即可)
// 将管道的逻辑和执行任务的逻辑进一步进行解耦
void work()
{while (true){int command = 0;int n = read(0, &command, sizeof(command));if (n == sizeof(int)){std::cout << "pid is: " << getpid() << " handler task" << std::endl;ExcuteTask(command);}else if (n == 0){std::cout << "sub process: " << getpid() << " quit" << std::endl;break;}}
}
Makefile:
processpool:processpool.ccg++ -o $@ $^ -std=c++11
.PHONY:clean
clean:rm -f processpool
相关文章:

Linux - 进程间通信(2)
目录 2、进程池 1)理解进程池 2)进程池的实现 整体框架: a. 加载任务 b. 先描述,再组织 I. 先描述 II. 再组织 c. 创建信道和子进程 d. 通过channel控制子进程 e. 回收管道和子进程 问题1: 解答1ÿ…...

Kafka 消费端反复 Rebalance: `Attempt to heartbeat failed since group is rebalancing`
文章目录 Kafka 消费端反复 Rebalance: Attempt to heartbeat failed since group is rebalancing1. Rebalance 过程概述2. 错误原因分析2.1 消费者组频繁加入或退出2.1.1 消费者故障导致频繁重启2.1.2. 消费者加入和退出导致的 Rebalance2.1.3 消费者心跳超时导致的 Rebalance…...

SpringBoot+Electron教务管理系统 附带详细运行指导视频
文章目录 一、项目演示二、项目介绍三、运行截图四、主要代码1.查询课程表代码2.保存学生信息代码3.用户登录代码 一、项目演示 项目演示地址: 视频地址 二、项目介绍 项目描述:这是一个基于SpringBootElectron框架开发的教务管理系统。首先ÿ…...

操作系统(Linux Kernel 0.11Linux Kernel 0.12)解读整理——内核初始化(main init)之控制台工作
前言 在 Linux 内核中,字符设备主要包括控制终端设备和串行终端设备,对这些设备的输入输出涉及控制台驱动程序,这包括键盘中断驱动程序 keyboard.S 和控制台显示驱动程序 console.c,还有终端驱动程序与上层程序之间的接口部分。 终端驱动程序…...

Autogen_core: Message and Communication
目录 完整代码代码解释1. 消息的数据类:2. 创建代理人(MyAgent):3. 创建和运行代理人的运行时环境:4. 根据发送者路由消息的代理(RoutedBySenderAgent):5. 创建和运行带路由的代理&a…...

ComfyUI工作流教程、软件使用、开发指导、模型下载
在人工智能和设计技术迅速发展的今天,AI赋能的工作流已成为创意设计与生产的重要工具。无论是图片处理、服装试穿,还是室内设计与3D建模,这些智能化的解决方案极大地提高了效率和创作质量。 为了帮助设计师、开发者以及AI技术爱好者更好地利用这些工具,我们整理了一份详尽…...

零基础Vue学习1——Vue学习前环境准备
目录 环境准备 创建Vue项目 项目目录说明 后续开发过程中常用命令 环境准备 安装开发工具:vscode、webstorm、idea都可以安装node:V22以上版本即可安装pnpm 不知道怎么安装的可以私信我教你方法 创建Vue项目 本地新建一个文件夹,之后在文件夹下打开…...

定西市建筑房屋轮廓数据shp格式gis无偏移坐标(字段有高度和楼层)内容测评
定西市建筑房屋轮廓数据是GIS(Geographic Information System,地理信息系统)领域的重要资源,用于城市规划、土地管理、环境保护等多个方面。这份2022年的数据集采用shp(Shapefile)格式,这是一种…...

汉语向编程指南
汉语向编程指南 一、引言王阳明代数与流形学习理论慢道缓行理性人类型指标系统为己之学与意气实体过程晏殊几何学半可分离相如矩阵与生成气质邻域镶嵌气度曲面细分生成气质邻域镶嵌气度曲面细分社会科学概论琴生生物机械科技工业研究所软凝聚态物理开发工具包琴生生物机械 报告…...

Writing an Efficient Vulkan Renderer
本文出自GPU Zen 2。 Vulkan 是一个新的显式跨平台图形 API。它引入了许多新概念,即使是经验丰富的图形程序员也可能不熟悉。Vulkan 的主要目标是性能——然而,获得良好的性能需要深入了解这些概念及其高效应用方法,以及特定驱动程序实现的实…...

AI常见的算法
人工智能(AI)中常见的算法分为多个领域,如机器学习、深度学习、强化学习、自然语言处理和计算机视觉等。以下是一些常见的算法及其用途: 1. 机器学习 (Machine Learning) 监督学习 (Supervised Learning) 线性回归 (Linear Regr…...

LibreChat
文章目录 一、关于 LibreChat✨特点 二、使用LibreChat🪶多合一AI对话 一、关于 LibreChat LibreChat 是增强的ChatGPT克隆:Features Agents, Anthropic, AWS, OpenAI, Assistants API, Azure, Groq, o1, GPT-4o, Mistral, OpenRouter, Vertex AI, Gemi…...

Spring Boot 日志:项目的“行车记录仪”
一、什么是Spring Boot日志 (一)日志引入 在正式介绍日志之前,我们先来看看上篇文章中(Spring Boot 配置文件)中的验证码功能的一个代码片段: 这是一段校验用户输入的验证码是否正确的后端代码,…...

Spring Boot 实现文件上传和下载
文章目录 Spring Boot 实现文件上传和下载一、引言二、文件上传1、配置Spring Boot项目2、创建文件上传控制器3、配置文件上传大小限制 三、文件下载1、创建文件下载控制器 四、使用示例1、文件上传2、文件下载 五、总结 Spring Boot 实现文件上传和下载 一、引言 在现代Web应…...

慕课:若鱼1919的视频课程:Java秒杀系统方案优化 高性能高并发实战,启动文档
代码: Javahhhh/miaosha191: 运行成功了慕课若鱼1919的视频课程:Java秒杀系统方案优化 高性能高并发实战https://github.com/Javahhhh/miaosha191 https://github.com/Javahhhh/miaosha191 miaosha项目启动文档 需安装的配置环境: VMwar…...

React第二十七章(Suspense)
Suspense Suspense 是一种异步渲染机制,其核心理念是在组件加载或数据获取过程中,先展示一个占位符(loading state),从而实现更自然流畅的用户界面更新体验。 应用场景 异步组件加载:通过代码分包实现组件…...

虚幻基础08:组件接口
能帮到你的话,就给个赞吧 😘 文章目录 作用 作用 组件接口:可以直接调用对方的组件接口,而无需转换为actor。 实现对象间的通知。 A 通知 B 做什么。...

iPhone SE(第三代) 设备详情图
目录 产品宣传图内部图——后设备详细信息 产品宣传图 内部图——后 设备详细信息 信息收集于HubWeb.cn...

2025苹果CMS v10短剧模板源码
文件不到70kb,加载非常快 无配置,没有详情页,上传就可以直接使用 使用教程:上传到网站template目录并解压、进入网站后台选择模板 注意:默认调用ID为1的数据和扩展分类,建议新建站使用 源码下载…...

2007-2020年各省国内专利申请授权量数据
2007-2020年各省国内专利申请授权量数据 1、时间:2007-2020年 2、来源:国家统计局、统计年鉴 3、指标:行政区划代码、地区名称、年份、国内专利申请授权量(项) 4、范围:31省 5、指标解释:专利是专利权的简称&…...

第一天-嵌入式应用开发介绍
首先,我们来介绍一下嵌入式的发展路线,虽然嵌入式的知识点众多,但是总体上来说,嵌入式分为以下两条主要路线: 单片机开发ArmLinux开发 当然,还有其他的一些例如FPGA这种的我们就不计算在内了,F…...

约瑟夫问题(信息学奥赛一本通-2037)
【题目描述】 N个人围成一圈,从第一个人开始报数,数到M的人出圈;再由下一个人开始报数,数到M 的人出圈;…输出依次出圈的人的编号。 【输入】 输入N和M。 【输出】 输出一行,依次出圈的人的编号。 【输入样…...

WPF5-x名称空间
1. x名称空间2. x名称空间内容3. x名称空间内容分类 3.1. x:Name3.2. x:Key3.3. x:Class3.4. x:TypeArguments 4. 总结 1. x名称空间 “x名称空间”的x是映射XAML名称空间时给它取的名字(取XAML的首字母),里面的成员(如x:Class、…...

一个python项目中的文件和目录的作用是什么?scripts,venv,predict的具体含义
今天学习SadTalker的项目,但目录和文件不知道都是干什么的,总结记录下,方便后续使用。 目录 1. docs: 作用: 这个文件夹通常包含项目的文档。文档可能包括用户指南、API 文档、开发文档等。 2. examples: 作用: 这里通常包含一些示例代码…...

python学opencv|读取图像(四十八)使用cv2.bitwise_xor()函数实现图像按位异或运算
【0】基础定义 按位与运算:两个等长度二进制数上下对齐,全1取1,其余取0。 按位或运算:两个等长度二进制数上下对齐,有1取1,其余取0。 按位取反运算:一个二进制数,0变1,1变0。 按…...

YOLOv11-ultralytics-8.3.67部分代码阅读笔记-block.py
block.py ultralytics\nn\modules\block.py 目录 block.py 1.所需的库和模块 2.class DFL(nn.Module): 3.class Proto(nn.Module): 4.class HGStem(nn.Module): 5.class HGBlock(nn.Module): 6.class SPP(nn.Module): 7.class SPPF(nn.Module): 8.class C1(nn…...

c++多态
1.多态的概念 通俗来说,就是多种形态,具体点就是去完成某个行为,当不同的对象去完成时会产生出不同 的状态。 2.多态的定义及实现 2.1多态的构成条件 多态是在不同继承关系的类对象,去调用同一函数,产生了不同的行为…...

ResNeSt: Split-Attention Networks 参考论文
参考文献 [1] Tensorflow Efficientnet. https://github.com/tensorflow/tpu/tree/master/models/official/efficientnet. Accessed: 2020-03-04. 中文翻译:[1] TensorFlow EfficientNet. https://github.com/tensorflow/tpu/tree/master/models/official/efficien…...

Blazor-选择循环语句
今天我们来说说Blazor选择语句和循环语句。 下面我们以一个简单的例子来讲解相关的语法,我已经创建好了一个Student类,以此类来进行语法的运用 因为我们需要交互性所以我们将类创建在*.client目录下 if 我们做一个学生信息的显示,Gender为…...

从AD的原理图自动提取引脚网络的小工具
这里跟大家分享一个我自己写的小软件,实现从AD的原理图里自动找出网络名称和引脚的对应。存成文本方便后续做表格或是使用简单行列编辑生成引脚约束文件(如.XDC .UCF .TCL等)。 我们在FPGA设计中需要引脚锁定文件,就是指示TOP层…...