Linux - 进程间通信(2)
目录
2、进程池
1)理解进程池
2)进程池的实现
整体框架:
a. 加载任务
b. 先描述,再组织
I. 先描述
II. 再组织
c. 创建信道和子进程
d. 通过channel控制子进程
e. 回收管道和子进程
问题1:
解答1:
问题2:
解答2:
f. 将进程池本身和任务文件本身进行解耦
3)完整代码
processpool.cc:
Task.hpp:
Makefile:
命令行中的 | ,就是匿名管道
它们的父进程都是bash
2、进程池
1)理解进程池
a. 可以将任务写入管道来给到子进程,从而可以提前创建子进程想让哪个子进程完成任务,我就让写入到哪个子进程相对的管道中
b. 管道里面没有数据,worker进程就在阻塞等待,等待务的到来!!
c. master向哪一个管道进行写入,就是唤醒哪一个子进程来处理任务
d. 均衡的向后端子进程划分任务,称之为负载均衡父进程要进行后端任务的负载均衡
父进程直接向管道里写入固定长度的四字节(int)数组下标(任务码)
函数指针数组中元素分别指向不同的任务,以便master控制worker完成指定工作
2)进程池的实现
整体框架:
// ./processpool 3
int main(int argc, char *argv[])
{if (argc != 2){std::cerr << "Usage: " << argv[0] << " processnum" << std::endl;}int num = std::stoi(argv[1]);LoadTask(); // 加载任务std::vector<Channel> channels; // 将管道组织起来// 1.创建信道和子进程CreateChannelAndSub(num, &channels);// 2.通过channel控制子进程CtrlProcess(channels, 10);// 3.回收管道和子进程 a.关闭所有的写端 b.回收子进程ClearUpChannel(channels);return 0;
}
a. 加载任务
我这里用的是打印的方式来模拟任务的分配,通过打印从而了解子进程执行任务的情况,通过种下随机数种子,产生随机数,进而随机的向子进程分配任务,work即为子进程需要做的工作
Task.hpp:
#pragma once#include <iostream> #include <ctime> #include <cstdlib> #include <sys/types.h> #include <unistd.h>#define TaskNum 3typedef void (*task_t)(); // task_t 函数指针void Print() {std::cout << "I am print task" << std::endl; } void DownLoad() {std::cout << "I am a download task" << std::endl; } void Flush() {std::cout << "I am a flush task" << std::endl; }task_t tasks[TaskNum];void LoadTask() {srand(time(nullptr) ^ getpid() ^ 177); // 种一个随机种子tasks[0] = Print;tasks[1] = DownLoad;tasks[2] = Flush; }void ExcuteTask(int number) {if(number < 0 || number > 2) return;tasks[number](); }int SelectTask() {return rand() % TaskNum; }void work(int rfd) {while (true){int command = 0;int n = read(rfd, &command, sizeof(command));if (n == sizeof(int)){std::cout << "pid is: " << getpid() << " handler task" << std::endl;ExcuteTask(command);}else if (n == 0){std::cout << "sub process: " << getpid() << " quit" << std::endl;break;}} }
// 命令行规范 --> ./processpool 3
int main(int argc, char *argv[])
{if (argc != 2){std::cerr << "Usage: " << argv[0] << " processnum" << std::endl;}int num = std::stoi(argv[1]);LoadTask(); // 加载任务return 0;
}
b. 先描述,再组织
I. 先描述
需要控制的信道(即发送端wfd)数量多且繁琐,需要管理起来从而方便控制给子进程发送任务
class Channel { private:int _wfd;int _subprocessid;std::string _name; };在信道中,我们需要知道发送的文件描述符wfd,还有知道子进程的pid,以及信道的命名(用来区分信道)
II. 再组织
std::vector<Channel> channels;我们通过用一个vector数组将所有的Channel存储起来,从而实现对它们的增删查改,以方便管理
c. 创建信道和子进程
void CreateChannelAndSub(int num, std::vector<Channel> *channels)
{for (int i = 0; i < num; ++i){// 1.创建管道int pipefd[2] = {0};int n = pipe(pipefd);if (n < 0) exit(1); // 创建管道失败// 2.创建子进程pid_t id = fork();if (id == 0){// Childclose(pipefd[1]);work();close(pipefd[0]);exit(0);}// 3.构建一个名字std::string channel_name = "Channel-" + std::to_string(i);// Fatherclose(pipefd[0]);// 拿到了 a.子进程的pid b.父进程需要的管道的w端channels->push_back(Channel(pipefd[1], id, channel_name));}
}
用命令行参数的方式传入得到的argv[1]即为输入命令需要的子进程和管道个数
通过for循环,创建 num个 pipe管道以及子进程,当创建完子进程时,需要关闭掉不需要的文件描述符(即wfd -- pipefd[1])(当然,父进程也需要关闭不需要的fd -- rfd),在执行完work(子进程的工作)之后,关闭掉rfd(即工作完成了,关闭其管道),然后exit(0)退出进程,等待父进程回收
d. 通过channel控制子进程
// 轮询方案 -- 负载均衡
int NextChannel(int channelnum)
{static int next = 0;int channel = next;next++;next %= channelnum;return channel;
}// 发送相应的任务码到对应管道内
void SendTaskCommand(Channel &channel, int taskCommand)
{write(channel.GetWfd(), &taskCommand, sizeof(taskCommand));
}
void CtrlProcessOnce(std::vector<Channel> &channels)
{sleep(1);// a. 选择一个任务int taskcommand = SelectTask();// b. 选择一个信道和进程int channel_index = NextChannel(channels.size());// c. 发送任务SendTaskCommand(channels[channel_index], taskcommand);std::cout << "=================================" << std::endl;std::cout << "taskcommand: " << taskcommand << " channel: " << channels[channel_index].GetName() << " sub process: " << channels[channel_index].GetProcessId() << std::endl;
}void CtrlProcess(std::vector<Channel> &channels, int times = -1)
{if (times > 0){while (times--){CtrlProcessOnce(channels);}}else{while (true){CtrlProcessOnce(channels);}}
}
向其发送任务之前,我们需要先选择一个任务,通过随机种子随机数的方式,随机选择我们的一个任务,拿到其任务码(即指针数组下标),然后选择相应的信道和进程(信道和进程一体的),从而向管道发送任务码给子进程
e. 回收管道和子进程
void ClearUpChannel(std::vector<Channel> &channels)
{for (auto &channel : channels){channel.CloseChannel();}for (auto &channel : channels){channel.Wait();}
}
我们先将所有的信道关闭,然后在逐个将子进程等待回收
问题1:
那为什么不能边关闭信道边回收呢??
即
解答1:


在我们创建子进程的过程中,由于父子进程之间的继承,从而导致子进程会拥有父进程的文件描述符内容(即指向同一地方),如果我们边关闭边回收的话,如上图所示,当我们关闭父进程的第一个管道的wfd时,这时候第一个管道的读端的引用计数并未清0,因为子进程2它继承了父进程指向第一个管道的wfd(读端),从而使得读端阻塞,进程不退出,然后wait子进程的时候就会阻塞
在work结束后,才会到下一步close和exit退出子进程;
work结束需要的条件是 n == 0,即读端返回值为0,即
因此上述那种边关闭信道,边wait子进程的方法会阻塞
问题2:
为什么这种方法又能成功回收呢??
解答2:
因为当我们将所有信道关闭时,关闭到最后一个子进程对应的管道的wfd的时候,该管道的读端的引用计数就会为0,从而读端读到0,该子进程退出,随子进程退出就会使得该子进程指向的前面管道的读端回收,就不会造成前面那种情况

f. 将进程池本身和任务文件本身进行解耦
用回调函数可以很好的改善代码的耦合性

通过文件描述符重定向 dup2,将标准输入(文件描述符 0)重定向到 rfd 所代表的文件,然后再回调task()函数
// 重定向
这样做可以彻底的让我们的子进程执行对应的work时,再也不需要知道有什么管道的读端
(不用管从哪里接收信息,直接认为从标准输入拿到信息即可)
--- 将管道的逻辑和执行任务的逻辑进一步进行解耦
// task_t task : 回调函数
有了它,我们进程池本身的代码和我们任务本身两个文件就彻底解耦了
--- 即既不关心从哪个文件描述符,直接默认从0里面去读,也不关心将来谁调它,因为子进程会自动回调它
3)完整代码
processpool.cc:
#include <iostream>
#include <string>
#include <unistd.h>
#include <vector>
#include <sys/types.h>
#include <sys/wait.h>
#include "Task.hpp"class Channel
{
public:Channel(int wfd, pid_t id, const std::string name): _wfd(wfd), _subprocessid(id), _name(name){}int GetWfd() { return _wfd; }pid_t GetProcessId() { return _subprocessid; }std::string GetName() { return _name; }void CloseChannel() { close(_wfd); }void Wait(){pid_t rid = waitpid(_subprocessid, nullptr, 0);if (rid > 0){std::cout << "wait " << rid << " success" << std::endl;}}~Channel(){}private:int _wfd;int _subprocessid;std::string _name;
};// 形参和命名规范
// const & : 输入型参数
// & : 输入输出型参数
// * : 输出型参数// task_t task : 回调函数
// 有了它,我们进程池本身的代码和我们任务本身两个文件就彻底解耦了
// --- 即既不关心从哪个文件描述符,直接默认从0里面去读,也不关心将来谁调它,因为子进程会自动回调它
void CreateChannelAndSub(int num, std::vector<Channel> *channels, task_t task)
{for (int i = 0; i < num; ++i){// 1.创建管道int pipefd[2] = {0};int n = pipe(pipefd);if (n < 0)exit(1); // 创建管道失败// 2.创建子进程pid_t id = fork();if (id == 0){// Childclose(pipefd[1]);dup2(pipefd[0], 0);task();close(pipefd[0]);exit(0);}// 3.构建一个名字std::string channel_name = "Channel-" + std::to_string(i);// Fatherclose(pipefd[0]);// 拿到了 a.子进程的pid b.父进程需要的管道的w端channels->push_back(Channel(pipefd[1], id, channel_name));}
}int NextChannel(int channelnum) // 轮询方案 -- 负载均衡
{static int next = 0;int channel = next;next++;next %= channelnum;return channel;
}void SendTaskCommand(Channel &channel, int taskCommand)
{write(channel.GetWfd(), &taskCommand, sizeof(taskCommand));
}void CtrlProcessOnce(std::vector<Channel> &channels)
{sleep(1);// a. 选择一个任务int taskcommand = SelectTask();// b. 选择一个信道和进程int channel_index = NextChannel(channels.size());// c. 发送任务SendTaskCommand(channels[channel_index], taskcommand);std::cout << "=================================" << std::endl;std::cout << "taskcommand: " << taskcommand << " channel: " << channels[channel_index].GetName()<< " sub process: " << channels[channel_index].GetProcessId() << std::endl;
}
void CtrlProcess(std::vector<Channel> &channels, int times = -1)
{if (times > 0){while (times--){CtrlProcessOnce(channels);}}else{while (true){CtrlProcessOnce(channels);}}
}void ClearUpChannel(std::vector<Channel> &channels)
{// for (auto &channel : channels)// {// channel.CloseChannel();// channel.Wait();// }// int num = channels.size() -1;// while(num >= 0)// {// channels[num].CloseChannel();// channels[num--].Wait();// }for (auto &channel : channels){channel.CloseChannel();}for (auto &channel : channels){channel.Wait();}
}// ./processpool 3
int main(int argc, char *argv[])
{if (argc != 2){std::cerr << "Usage: " << argv[0] << " processnum" << std::endl;}int num = std::stoi(argv[1]);LoadTask(); // 加载任务std::vector<Channel> channels; // 将管道组织起来// 1.创建信道和子进程CreateChannelAndSub(num, &channels, work);// 2.通过channel控制子进程CtrlProcess(channels, 10);// 3.回收管道和子进程 a.关闭所有的写端 b.回收子进程ClearUpChannel(channels);// // for test// for(auto &channel : channels)// {// std::cout << "====================" << std::endl;// std::cout << channel.GetName() << std::endl;// std::cout << channel.GetWfd() << std::endl;// std::cout << channel.GetProcessId() << std::endl;// }// sleep(100);return 0;
}
Task.hpp:
#pragma once#include <iostream>
#include <ctime>
#include <cstdlib>
#include <sys/types.h>
#include <unistd.h>#define TaskNum 3typedef void (*task_t)(); // task_t 函数指针void Print()
{std::cout << "I am print task" << std::endl;
}
void DownLoad()
{std::cout << "I am a download task" << std::endl;
}
void Flush()
{std::cout << "I am a flush task" << std::endl;
}task_t tasks[TaskNum];void LoadTask()
{srand(time(nullptr) ^ getpid() ^ 177); // 种一个随机种子tasks[0] = Print;tasks[1] = DownLoad;tasks[2] = Flush;
}void ExcuteTask(int number)
{if(number < 0 || number > 2) return;tasks[number]();
}int SelectTask()
{return rand() % TaskNum;
}// void work(int rfd)
// {
// while (true)
// {
// int command = 0;
// int n = read(rfd, &command, sizeof(command));
// if (n == sizeof(int))
// {
// std::cout << "pid is: " << getpid() << " handler task" << std::endl;
// ExcuteTask(command);
// }
// else if (n == 0)
// {
// std::cout << "sub process: " << getpid() << " quit" << std::endl;
// break;
// }
// }
// }// 这样做可以彻底的让我们的子进程执行对应的work时,再也不需要知道有什么管道的读端
// (不用管从哪里接收信息,直接认为从标准输入拿到信息即可)
// 将管道的逻辑和执行任务的逻辑进一步进行解耦
void work()
{while (true){int command = 0;int n = read(0, &command, sizeof(command));if (n == sizeof(int)){std::cout << "pid is: " << getpid() << " handler task" << std::endl;ExcuteTask(command);}else if (n == 0){std::cout << "sub process: " << getpid() << " quit" << std::endl;break;}}
}
Makefile:
processpool:processpool.ccg++ -o $@ $^ -std=c++11
.PHONY:clean
clean:rm -f processpool
相关文章:
Linux - 进程间通信(2)
目录 2、进程池 1)理解进程池 2)进程池的实现 整体框架: a. 加载任务 b. 先描述,再组织 I. 先描述 II. 再组织 c. 创建信道和子进程 d. 通过channel控制子进程 e. 回收管道和子进程 问题1: 解答1ÿ…...
Kafka 消费端反复 Rebalance: `Attempt to heartbeat failed since group is rebalancing`
文章目录 Kafka 消费端反复 Rebalance: Attempt to heartbeat failed since group is rebalancing1. Rebalance 过程概述2. 错误原因分析2.1 消费者组频繁加入或退出2.1.1 消费者故障导致频繁重启2.1.2. 消费者加入和退出导致的 Rebalance2.1.3 消费者心跳超时导致的 Rebalance…...
SpringBoot+Electron教务管理系统 附带详细运行指导视频
文章目录 一、项目演示二、项目介绍三、运行截图四、主要代码1.查询课程表代码2.保存学生信息代码3.用户登录代码 一、项目演示 项目演示地址: 视频地址 二、项目介绍 项目描述:这是一个基于SpringBootElectron框架开发的教务管理系统。首先ÿ…...
操作系统(Linux Kernel 0.11Linux Kernel 0.12)解读整理——内核初始化(main init)之控制台工作
前言 在 Linux 内核中,字符设备主要包括控制终端设备和串行终端设备,对这些设备的输入输出涉及控制台驱动程序,这包括键盘中断驱动程序 keyboard.S 和控制台显示驱动程序 console.c,还有终端驱动程序与上层程序之间的接口部分。 终端驱动程序…...
Autogen_core: Message and Communication
目录 完整代码代码解释1. 消息的数据类:2. 创建代理人(MyAgent):3. 创建和运行代理人的运行时环境:4. 根据发送者路由消息的代理(RoutedBySenderAgent):5. 创建和运行带路由的代理&a…...
ComfyUI工作流教程、软件使用、开发指导、模型下载
在人工智能和设计技术迅速发展的今天,AI赋能的工作流已成为创意设计与生产的重要工具。无论是图片处理、服装试穿,还是室内设计与3D建模,这些智能化的解决方案极大地提高了效率和创作质量。 为了帮助设计师、开发者以及AI技术爱好者更好地利用这些工具,我们整理了一份详尽…...
零基础Vue学习1——Vue学习前环境准备
目录 环境准备 创建Vue项目 项目目录说明 后续开发过程中常用命令 环境准备 安装开发工具:vscode、webstorm、idea都可以安装node:V22以上版本即可安装pnpm 不知道怎么安装的可以私信我教你方法 创建Vue项目 本地新建一个文件夹,之后在文件夹下打开…...
定西市建筑房屋轮廓数据shp格式gis无偏移坐标(字段有高度和楼层)内容测评
定西市建筑房屋轮廓数据是GIS(Geographic Information System,地理信息系统)领域的重要资源,用于城市规划、土地管理、环境保护等多个方面。这份2022年的数据集采用shp(Shapefile)格式,这是一种…...
汉语向编程指南
汉语向编程指南 一、引言王阳明代数与流形学习理论慢道缓行理性人类型指标系统为己之学与意气实体过程晏殊几何学半可分离相如矩阵与生成气质邻域镶嵌气度曲面细分生成气质邻域镶嵌气度曲面细分社会科学概论琴生生物机械科技工业研究所软凝聚态物理开发工具包琴生生物机械 报告…...
Writing an Efficient Vulkan Renderer
本文出自GPU Zen 2。 Vulkan 是一个新的显式跨平台图形 API。它引入了许多新概念,即使是经验丰富的图形程序员也可能不熟悉。Vulkan 的主要目标是性能——然而,获得良好的性能需要深入了解这些概念及其高效应用方法,以及特定驱动程序实现的实…...
AI常见的算法
人工智能(AI)中常见的算法分为多个领域,如机器学习、深度学习、强化学习、自然语言处理和计算机视觉等。以下是一些常见的算法及其用途: 1. 机器学习 (Machine Learning) 监督学习 (Supervised Learning) 线性回归 (Linear Regr…...
LibreChat
文章目录 一、关于 LibreChat✨特点 二、使用LibreChat🪶多合一AI对话 一、关于 LibreChat LibreChat 是增强的ChatGPT克隆:Features Agents, Anthropic, AWS, OpenAI, Assistants API, Azure, Groq, o1, GPT-4o, Mistral, OpenRouter, Vertex AI, Gemi…...
Spring Boot 日志:项目的“行车记录仪”
一、什么是Spring Boot日志 (一)日志引入 在正式介绍日志之前,我们先来看看上篇文章中(Spring Boot 配置文件)中的验证码功能的一个代码片段: 这是一段校验用户输入的验证码是否正确的后端代码,…...
Spring Boot 实现文件上传和下载
文章目录 Spring Boot 实现文件上传和下载一、引言二、文件上传1、配置Spring Boot项目2、创建文件上传控制器3、配置文件上传大小限制 三、文件下载1、创建文件下载控制器 四、使用示例1、文件上传2、文件下载 五、总结 Spring Boot 实现文件上传和下载 一、引言 在现代Web应…...
慕课:若鱼1919的视频课程:Java秒杀系统方案优化 高性能高并发实战,启动文档
代码: Javahhhh/miaosha191: 运行成功了慕课若鱼1919的视频课程:Java秒杀系统方案优化 高性能高并发实战https://github.com/Javahhhh/miaosha191 https://github.com/Javahhhh/miaosha191 miaosha项目启动文档 需安装的配置环境: VMwar…...
React第二十七章(Suspense)
Suspense Suspense 是一种异步渲染机制,其核心理念是在组件加载或数据获取过程中,先展示一个占位符(loading state),从而实现更自然流畅的用户界面更新体验。 应用场景 异步组件加载:通过代码分包实现组件…...
虚幻基础08:组件接口
能帮到你的话,就给个赞吧 😘 文章目录 作用 作用 组件接口:可以直接调用对方的组件接口,而无需转换为actor。 实现对象间的通知。 A 通知 B 做什么。...
iPhone SE(第三代) 设备详情图
目录 产品宣传图内部图——后设备详细信息 产品宣传图 内部图——后 设备详细信息 信息收集于HubWeb.cn...
2025苹果CMS v10短剧模板源码
文件不到70kb,加载非常快 无配置,没有详情页,上传就可以直接使用 使用教程:上传到网站template目录并解压、进入网站后台选择模板 注意:默认调用ID为1的数据和扩展分类,建议新建站使用 源码下载…...
2007-2020年各省国内专利申请授权量数据
2007-2020年各省国内专利申请授权量数据 1、时间:2007-2020年 2、来源:国家统计局、统计年鉴 3、指标:行政区划代码、地区名称、年份、国内专利申请授权量(项) 4、范围:31省 5、指标解释:专利是专利权的简称&…...
浏览器访问 AWS ECS 上部署的 Docker 容器(监听 80 端口)
✅ 一、ECS 服务配置 Dockerfile 确保监听 80 端口 EXPOSE 80 CMD ["nginx", "-g", "daemon off;"]或 EXPOSE 80 CMD ["python3", "-m", "http.server", "80"]任务定义(Task Definition&…...
【JavaEE】-- HTTP
1. HTTP是什么? HTTP(全称为"超文本传输协议")是一种应用非常广泛的应用层协议,HTTP是基于TCP协议的一种应用层协议。 应用层协议:是计算机网络协议栈中最高层的协议,它定义了运行在不同主机上…...
Python如何给视频添加音频和字幕
在Python中,给视频添加音频和字幕可以使用电影文件处理库MoviePy和字幕处理库Subtitles。下面将详细介绍如何使用这些库来实现视频的音频和字幕添加,包括必要的代码示例和详细解释。 环境准备 在开始之前,需要安装以下Python库:…...
ArcGIS Pro制作水平横向图例+多级标注
今天介绍下载ArcGIS Pro中如何设置水平横向图例。 之前我们介绍了ArcGIS的横向图例制作:ArcGIS横向、多列图例、顺序重排、符号居中、批量更改图例符号等等(ArcGIS出图图例8大技巧),那这次我们看看ArcGIS Pro如何更加快捷的操作。…...
全面解析各类VPN技术:GRE、IPsec、L2TP、SSL与MPLS VPN对比
目录 引言 VPN技术概述 GRE VPN 3.1 GRE封装结构 3.2 GRE的应用场景 GRE over IPsec 4.1 GRE over IPsec封装结构 4.2 为什么使用GRE over IPsec? IPsec VPN 5.1 IPsec传输模式(Transport Mode) 5.2 IPsec隧道模式(Tunne…...
LabVIEW双光子成像系统技术
双光子成像技术的核心特性 双光子成像通过双低能量光子协同激发机制,展现出显著的技术优势: 深层组织穿透能力:适用于活体组织深度成像 高分辨率观测性能:满足微观结构的精细研究需求 低光毒性特点:减少对样本的损伤…...
windows系统MySQL安装文档
概览:本文讨论了MySQL的安装、使用过程中涉及的解压、配置、初始化、注册服务、启动、修改密码、登录、退出以及卸载等相关内容,为学习者提供全面的操作指导。关键要点包括: 解压 :下载完成后解压压缩包,得到MySQL 8.…...
Spring Security 认证流程——补充
一、认证流程概述 Spring Security 的认证流程基于 过滤器链(Filter Chain),核心组件包括 UsernamePasswordAuthenticationFilter、AuthenticationManager、UserDetailsService 等。整个流程可分为以下步骤: 用户提交登录请求拦…...
统计学(第8版)——统计抽样学习笔记(考试用)
一、统计抽样的核心内容与问题 研究内容 从总体中科学抽取样本的方法利用样本数据推断总体特征(均值、比率、总量)控制抽样误差与非抽样误差 解决的核心问题 在成本约束下,用少量样本准确推断总体特征量化估计结果的可靠性(置…...
MLP实战二:MLP 实现图像数字多分类
任务 实战(二):MLP 实现图像多分类 基于 mnist 数据集,建立 mlp 模型,实现 0-9 数字的十分类 task: 1、实现 mnist 数据载入,可视化图形数字; 2、完成数据预处理:图像数据维度转换与…...










