Linux - 进程间通信(2)
目录
2、进程池
1)理解进程池
2)进程池的实现
整体框架:
a. 加载任务
b. 先描述,再组织
I. 先描述
II. 再组织
c. 创建信道和子进程
d. 通过channel控制子进程
e. 回收管道和子进程
问题1:
解答1:
问题2:
解答2:
f. 将进程池本身和任务文件本身进行解耦
3)完整代码
processpool.cc:
Task.hpp:
Makefile:
命令行中的 | ,就是匿名管道
它们的父进程都是bash
2、进程池
1)理解进程池
a. 可以将任务写入管道来给到子进程,从而可以提前创建子进程想让哪个子进程完成任务,我就让写入到哪个子进程相对的管道中
b. 管道里面没有数据,worker进程就在阻塞等待,等待务的到来!!
c. master向哪一个管道进行写入,就是唤醒哪一个子进程来处理任务
d. 均衡的向后端子进程划分任务,称之为负载均衡父进程要进行后端任务的负载均衡
父进程直接向管道里写入固定长度的四字节(int)数组下标(任务码)
函数指针数组中元素分别指向不同的任务,以便master控制worker完成指定工作
2)进程池的实现
整体框架:
// ./processpool 3
int main(int argc, char *argv[])
{if (argc != 2){std::cerr << "Usage: " << argv[0] << " processnum" << std::endl;}int num = std::stoi(argv[1]);LoadTask(); // 加载任务std::vector<Channel> channels; // 将管道组织起来// 1.创建信道和子进程CreateChannelAndSub(num, &channels);// 2.通过channel控制子进程CtrlProcess(channels, 10);// 3.回收管道和子进程 a.关闭所有的写端 b.回收子进程ClearUpChannel(channels);return 0;
}
a. 加载任务
我这里用的是打印的方式来模拟任务的分配,通过打印从而了解子进程执行任务的情况,通过种下随机数种子,产生随机数,进而随机的向子进程分配任务,work即为子进程需要做的工作
Task.hpp:
#pragma once#include <iostream> #include <ctime> #include <cstdlib> #include <sys/types.h> #include <unistd.h>#define TaskNum 3typedef void (*task_t)(); // task_t 函数指针void Print() {std::cout << "I am print task" << std::endl; } void DownLoad() {std::cout << "I am a download task" << std::endl; } void Flush() {std::cout << "I am a flush task" << std::endl; }task_t tasks[TaskNum];void LoadTask() {srand(time(nullptr) ^ getpid() ^ 177); // 种一个随机种子tasks[0] = Print;tasks[1] = DownLoad;tasks[2] = Flush; }void ExcuteTask(int number) {if(number < 0 || number > 2) return;tasks[number](); }int SelectTask() {return rand() % TaskNum; }void work(int rfd) {while (true){int command = 0;int n = read(rfd, &command, sizeof(command));if (n == sizeof(int)){std::cout << "pid is: " << getpid() << " handler task" << std::endl;ExcuteTask(command);}else if (n == 0){std::cout << "sub process: " << getpid() << " quit" << std::endl;break;}} }
// 命令行规范 --> ./processpool 3
int main(int argc, char *argv[])
{if (argc != 2){std::cerr << "Usage: " << argv[0] << " processnum" << std::endl;}int num = std::stoi(argv[1]);LoadTask(); // 加载任务return 0;
}
b. 先描述,再组织
I. 先描述
需要控制的信道(即发送端wfd)数量多且繁琐,需要管理起来从而方便控制给子进程发送任务
class Channel { private:int _wfd;int _subprocessid;std::string _name; };
在信道中,我们需要知道发送的文件描述符wfd,还有知道子进程的pid,以及信道的命名(用来区分信道)
II. 再组织
std::vector<Channel> channels;
我们通过用一个vector数组将所有的Channel存储起来,从而实现对它们的增删查改,以方便管理
c. 创建信道和子进程
void CreateChannelAndSub(int num, std::vector<Channel> *channels)
{for (int i = 0; i < num; ++i){// 1.创建管道int pipefd[2] = {0};int n = pipe(pipefd);if (n < 0) exit(1); // 创建管道失败// 2.创建子进程pid_t id = fork();if (id == 0){// Childclose(pipefd[1]);work();close(pipefd[0]);exit(0);}// 3.构建一个名字std::string channel_name = "Channel-" + std::to_string(i);// Fatherclose(pipefd[0]);// 拿到了 a.子进程的pid b.父进程需要的管道的w端channels->push_back(Channel(pipefd[1], id, channel_name));}
}
用命令行参数的方式传入得到的argv[1]即为输入命令需要的子进程和管道个数
通过for循环,创建 num个 pipe管道以及子进程,当创建完子进程时,需要关闭掉不需要的文件描述符(即wfd -- pipefd[1])(当然,父进程也需要关闭不需要的fd -- rfd),在执行完work(子进程的工作)之后,关闭掉rfd(即工作完成了,关闭其管道),然后exit(0)退出进程,等待父进程回收
d. 通过channel控制子进程
// 轮询方案 -- 负载均衡
int NextChannel(int channelnum)
{static int next = 0;int channel = next;next++;next %= channelnum;return channel;
}// 发送相应的任务码到对应管道内
void SendTaskCommand(Channel &channel, int taskCommand)
{write(channel.GetWfd(), &taskCommand, sizeof(taskCommand));
}
void CtrlProcessOnce(std::vector<Channel> &channels)
{sleep(1);// a. 选择一个任务int taskcommand = SelectTask();// b. 选择一个信道和进程int channel_index = NextChannel(channels.size());// c. 发送任务SendTaskCommand(channels[channel_index], taskcommand);std::cout << "=================================" << std::endl;std::cout << "taskcommand: " << taskcommand << " channel: " << channels[channel_index].GetName() << " sub process: " << channels[channel_index].GetProcessId() << std::endl;
}void CtrlProcess(std::vector<Channel> &channels, int times = -1)
{if (times > 0){while (times--){CtrlProcessOnce(channels);}}else{while (true){CtrlProcessOnce(channels);}}
}
向其发送任务之前,我们需要先选择一个任务,通过随机种子随机数的方式,随机选择我们的一个任务,拿到其任务码(即指针数组下标),然后选择相应的信道和进程(信道和进程一体的),从而向管道发送任务码给子进程
e. 回收管道和子进程
void ClearUpChannel(std::vector<Channel> &channels)
{for (auto &channel : channels){channel.CloseChannel();}for (auto &channel : channels){channel.Wait();}
}
我们先将所有的信道关闭,然后在逐个将子进程等待回收
问题1:
那为什么不能边关闭信道边回收呢??
即
解答1:
在我们创建子进程的过程中,由于父子进程之间的继承,从而导致子进程会拥有父进程的文件描述符内容(即指向同一地方),如果我们边关闭边回收的话,如上图所示,当我们关闭父进程的第一个管道的wfd时,这时候第一个管道的读端的引用计数并未清0,因为子进程2它继承了父进程指向第一个管道的wfd(读端),从而使得读端阻塞,进程不退出,然后wait子进程的时候就会阻塞
在work结束后,才会到下一步close和exit退出子进程;
work结束需要的条件是 n == 0,即读端返回值为0,即
因此上述那种边关闭信道,边wait子进程的方法会阻塞
问题2:
为什么这种方法又能成功回收呢??
解答2:
因为当我们将所有信道关闭时,关闭到最后一个子进程对应的管道的wfd的时候,该管道的读端的引用计数就会为0,从而读端读到0,该子进程退出,随子进程退出就会使得该子进程指向的前面管道的读端回收,就不会造成前面那种情况
f. 将进程池本身和任务文件本身进行解耦
用回调函数可以很好的改善代码的耦合性
通过文件描述符重定向 dup2,将标准输入(文件描述符 0)重定向到 rfd 所代表的文件,然后再回调task()函数
// 重定向
这样做可以彻底的让我们的子进程执行对应的work时,再也不需要知道有什么管道的读端
(不用管从哪里接收信息,直接认为从标准输入拿到信息即可)
--- 将管道的逻辑和执行任务的逻辑进一步进行解耦
// task_t task : 回调函数
有了它,我们进程池本身的代码和我们任务本身两个文件就彻底解耦了
--- 即既不关心从哪个文件描述符,直接默认从0里面去读,也不关心将来谁调它,因为子进程会自动回调它
3)完整代码
processpool.cc:
#include <iostream>
#include <string>
#include <unistd.h>
#include <vector>
#include <sys/types.h>
#include <sys/wait.h>
#include "Task.hpp"class Channel
{
public:Channel(int wfd, pid_t id, const std::string name): _wfd(wfd), _subprocessid(id), _name(name){}int GetWfd() { return _wfd; }pid_t GetProcessId() { return _subprocessid; }std::string GetName() { return _name; }void CloseChannel() { close(_wfd); }void Wait(){pid_t rid = waitpid(_subprocessid, nullptr, 0);if (rid > 0){std::cout << "wait " << rid << " success" << std::endl;}}~Channel(){}private:int _wfd;int _subprocessid;std::string _name;
};// 形参和命名规范
// const & : 输入型参数
// & : 输入输出型参数
// * : 输出型参数// task_t task : 回调函数
// 有了它,我们进程池本身的代码和我们任务本身两个文件就彻底解耦了
// --- 即既不关心从哪个文件描述符,直接默认从0里面去读,也不关心将来谁调它,因为子进程会自动回调它
void CreateChannelAndSub(int num, std::vector<Channel> *channels, task_t task)
{for (int i = 0; i < num; ++i){// 1.创建管道int pipefd[2] = {0};int n = pipe(pipefd);if (n < 0)exit(1); // 创建管道失败// 2.创建子进程pid_t id = fork();if (id == 0){// Childclose(pipefd[1]);dup2(pipefd[0], 0);task();close(pipefd[0]);exit(0);}// 3.构建一个名字std::string channel_name = "Channel-" + std::to_string(i);// Fatherclose(pipefd[0]);// 拿到了 a.子进程的pid b.父进程需要的管道的w端channels->push_back(Channel(pipefd[1], id, channel_name));}
}int NextChannel(int channelnum) // 轮询方案 -- 负载均衡
{static int next = 0;int channel = next;next++;next %= channelnum;return channel;
}void SendTaskCommand(Channel &channel, int taskCommand)
{write(channel.GetWfd(), &taskCommand, sizeof(taskCommand));
}void CtrlProcessOnce(std::vector<Channel> &channels)
{sleep(1);// a. 选择一个任务int taskcommand = SelectTask();// b. 选择一个信道和进程int channel_index = NextChannel(channels.size());// c. 发送任务SendTaskCommand(channels[channel_index], taskcommand);std::cout << "=================================" << std::endl;std::cout << "taskcommand: " << taskcommand << " channel: " << channels[channel_index].GetName()<< " sub process: " << channels[channel_index].GetProcessId() << std::endl;
}
void CtrlProcess(std::vector<Channel> &channels, int times = -1)
{if (times > 0){while (times--){CtrlProcessOnce(channels);}}else{while (true){CtrlProcessOnce(channels);}}
}void ClearUpChannel(std::vector<Channel> &channels)
{// for (auto &channel : channels)// {// channel.CloseChannel();// channel.Wait();// }// int num = channels.size() -1;// while(num >= 0)// {// channels[num].CloseChannel();// channels[num--].Wait();// }for (auto &channel : channels){channel.CloseChannel();}for (auto &channel : channels){channel.Wait();}
}// ./processpool 3
int main(int argc, char *argv[])
{if (argc != 2){std::cerr << "Usage: " << argv[0] << " processnum" << std::endl;}int num = std::stoi(argv[1]);LoadTask(); // 加载任务std::vector<Channel> channels; // 将管道组织起来// 1.创建信道和子进程CreateChannelAndSub(num, &channels, work);// 2.通过channel控制子进程CtrlProcess(channels, 10);// 3.回收管道和子进程 a.关闭所有的写端 b.回收子进程ClearUpChannel(channels);// // for test// for(auto &channel : channels)// {// std::cout << "====================" << std::endl;// std::cout << channel.GetName() << std::endl;// std::cout << channel.GetWfd() << std::endl;// std::cout << channel.GetProcessId() << std::endl;// }// sleep(100);return 0;
}
Task.hpp:
#pragma once#include <iostream>
#include <ctime>
#include <cstdlib>
#include <sys/types.h>
#include <unistd.h>#define TaskNum 3typedef void (*task_t)(); // task_t 函数指针void Print()
{std::cout << "I am print task" << std::endl;
}
void DownLoad()
{std::cout << "I am a download task" << std::endl;
}
void Flush()
{std::cout << "I am a flush task" << std::endl;
}task_t tasks[TaskNum];void LoadTask()
{srand(time(nullptr) ^ getpid() ^ 177); // 种一个随机种子tasks[0] = Print;tasks[1] = DownLoad;tasks[2] = Flush;
}void ExcuteTask(int number)
{if(number < 0 || number > 2) return;tasks[number]();
}int SelectTask()
{return rand() % TaskNum;
}// void work(int rfd)
// {
// while (true)
// {
// int command = 0;
// int n = read(rfd, &command, sizeof(command));
// if (n == sizeof(int))
// {
// std::cout << "pid is: " << getpid() << " handler task" << std::endl;
// ExcuteTask(command);
// }
// else if (n == 0)
// {
// std::cout << "sub process: " << getpid() << " quit" << std::endl;
// break;
// }
// }
// }// 这样做可以彻底的让我们的子进程执行对应的work时,再也不需要知道有什么管道的读端
// (不用管从哪里接收信息,直接认为从标准输入拿到信息即可)
// 将管道的逻辑和执行任务的逻辑进一步进行解耦
void work()
{while (true){int command = 0;int n = read(0, &command, sizeof(command));if (n == sizeof(int)){std::cout << "pid is: " << getpid() << " handler task" << std::endl;ExcuteTask(command);}else if (n == 0){std::cout << "sub process: " << getpid() << " quit" << std::endl;break;}}
}
Makefile:
processpool:processpool.ccg++ -o $@ $^ -std=c++11
.PHONY:clean
clean:rm -f processpool
相关文章:

Linux - 进程间通信(2)
目录 2、进程池 1)理解进程池 2)进程池的实现 整体框架: a. 加载任务 b. 先描述,再组织 I. 先描述 II. 再组织 c. 创建信道和子进程 d. 通过channel控制子进程 e. 回收管道和子进程 问题1: 解答1ÿ…...
Kafka 消费端反复 Rebalance: `Attempt to heartbeat failed since group is rebalancing`
文章目录 Kafka 消费端反复 Rebalance: Attempt to heartbeat failed since group is rebalancing1. Rebalance 过程概述2. 错误原因分析2.1 消费者组频繁加入或退出2.1.1 消费者故障导致频繁重启2.1.2. 消费者加入和退出导致的 Rebalance2.1.3 消费者心跳超时导致的 Rebalance…...

SpringBoot+Electron教务管理系统 附带详细运行指导视频
文章目录 一、项目演示二、项目介绍三、运行截图四、主要代码1.查询课程表代码2.保存学生信息代码3.用户登录代码 一、项目演示 项目演示地址: 视频地址 二、项目介绍 项目描述:这是一个基于SpringBootElectron框架开发的教务管理系统。首先ÿ…...

操作系统(Linux Kernel 0.11Linux Kernel 0.12)解读整理——内核初始化(main init)之控制台工作
前言 在 Linux 内核中,字符设备主要包括控制终端设备和串行终端设备,对这些设备的输入输出涉及控制台驱动程序,这包括键盘中断驱动程序 keyboard.S 和控制台显示驱动程序 console.c,还有终端驱动程序与上层程序之间的接口部分。 终端驱动程序…...
Autogen_core: Message and Communication
目录 完整代码代码解释1. 消息的数据类:2. 创建代理人(MyAgent):3. 创建和运行代理人的运行时环境:4. 根据发送者路由消息的代理(RoutedBySenderAgent):5. 创建和运行带路由的代理&a…...
ComfyUI工作流教程、软件使用、开发指导、模型下载
在人工智能和设计技术迅速发展的今天,AI赋能的工作流已成为创意设计与生产的重要工具。无论是图片处理、服装试穿,还是室内设计与3D建模,这些智能化的解决方案极大地提高了效率和创作质量。 为了帮助设计师、开发者以及AI技术爱好者更好地利用这些工具,我们整理了一份详尽…...

零基础Vue学习1——Vue学习前环境准备
目录 环境准备 创建Vue项目 项目目录说明 后续开发过程中常用命令 环境准备 安装开发工具:vscode、webstorm、idea都可以安装node:V22以上版本即可安装pnpm 不知道怎么安装的可以私信我教你方法 创建Vue项目 本地新建一个文件夹,之后在文件夹下打开…...

定西市建筑房屋轮廓数据shp格式gis无偏移坐标(字段有高度和楼层)内容测评
定西市建筑房屋轮廓数据是GIS(Geographic Information System,地理信息系统)领域的重要资源,用于城市规划、土地管理、环境保护等多个方面。这份2022年的数据集采用shp(Shapefile)格式,这是一种…...

汉语向编程指南
汉语向编程指南 一、引言王阳明代数与流形学习理论慢道缓行理性人类型指标系统为己之学与意气实体过程晏殊几何学半可分离相如矩阵与生成气质邻域镶嵌气度曲面细分生成气质邻域镶嵌气度曲面细分社会科学概论琴生生物机械科技工业研究所软凝聚态物理开发工具包琴生生物机械 报告…...
Writing an Efficient Vulkan Renderer
本文出自GPU Zen 2。 Vulkan 是一个新的显式跨平台图形 API。它引入了许多新概念,即使是经验丰富的图形程序员也可能不熟悉。Vulkan 的主要目标是性能——然而,获得良好的性能需要深入了解这些概念及其高效应用方法,以及特定驱动程序实现的实…...
AI常见的算法
人工智能(AI)中常见的算法分为多个领域,如机器学习、深度学习、强化学习、自然语言处理和计算机视觉等。以下是一些常见的算法及其用途: 1. 机器学习 (Machine Learning) 监督学习 (Supervised Learning) 线性回归 (Linear Regr…...

LibreChat
文章目录 一、关于 LibreChat✨特点 二、使用LibreChat🪶多合一AI对话 一、关于 LibreChat LibreChat 是增强的ChatGPT克隆:Features Agents, Anthropic, AWS, OpenAI, Assistants API, Azure, Groq, o1, GPT-4o, Mistral, OpenRouter, Vertex AI, Gemi…...

Spring Boot 日志:项目的“行车记录仪”
一、什么是Spring Boot日志 (一)日志引入 在正式介绍日志之前,我们先来看看上篇文章中(Spring Boot 配置文件)中的验证码功能的一个代码片段: 这是一段校验用户输入的验证码是否正确的后端代码,…...

Spring Boot 实现文件上传和下载
文章目录 Spring Boot 实现文件上传和下载一、引言二、文件上传1、配置Spring Boot项目2、创建文件上传控制器3、配置文件上传大小限制 三、文件下载1、创建文件下载控制器 四、使用示例1、文件上传2、文件下载 五、总结 Spring Boot 实现文件上传和下载 一、引言 在现代Web应…...

慕课:若鱼1919的视频课程:Java秒杀系统方案优化 高性能高并发实战,启动文档
代码: Javahhhh/miaosha191: 运行成功了慕课若鱼1919的视频课程:Java秒杀系统方案优化 高性能高并发实战https://github.com/Javahhhh/miaosha191 https://github.com/Javahhhh/miaosha191 miaosha项目启动文档 需安装的配置环境: VMwar…...

React第二十七章(Suspense)
Suspense Suspense 是一种异步渲染机制,其核心理念是在组件加载或数据获取过程中,先展示一个占位符(loading state),从而实现更自然流畅的用户界面更新体验。 应用场景 异步组件加载:通过代码分包实现组件…...
虚幻基础08:组件接口
能帮到你的话,就给个赞吧 😘 文章目录 作用 作用 组件接口:可以直接调用对方的组件接口,而无需转换为actor。 实现对象间的通知。 A 通知 B 做什么。...

iPhone SE(第三代) 设备详情图
目录 产品宣传图内部图——后设备详细信息 产品宣传图 内部图——后 设备详细信息 信息收集于HubWeb.cn...

2025苹果CMS v10短剧模板源码
文件不到70kb,加载非常快 无配置,没有详情页,上传就可以直接使用 使用教程:上传到网站template目录并解压、进入网站后台选择模板 注意:默认调用ID为1的数据和扩展分类,建议新建站使用 源码下载…...

2007-2020年各省国内专利申请授权量数据
2007-2020年各省国内专利申请授权量数据 1、时间:2007-2020年 2、来源:国家统计局、统计年鉴 3、指标:行政区划代码、地区名称、年份、国内专利申请授权量(项) 4、范围:31省 5、指标解释:专利是专利权的简称&…...

《Qt C++ 与 OpenCV:解锁视频播放程序设计的奥秘》
引言:探索视频播放程序设计之旅 在当今数字化时代,多媒体应用已渗透到我们生活的方方面面,从日常的视频娱乐到专业的视频监控、视频会议系统,视频播放程序作为多媒体应用的核心组成部分,扮演着至关重要的角色。无论是在个人电脑、移动设备还是智能电视等平台上,用户都期望…...

Vue3 + Element Plus + TypeScript中el-transfer穿梭框组件使用详解及示例
使用详解 Element Plus 的 el-transfer 组件是一个强大的穿梭框组件,常用于在两个集合之间进行数据转移,如权限分配、数据选择等场景。下面我将详细介绍其用法并提供一个完整示例。 核心特性与用法 基本属性 v-model:绑定右侧列表的值&…...

YSYX学习记录(八)
C语言,练习0: 先创建一个文件夹,我用的是物理机: 安装build-essential 练习1: 我注释掉了 #include <stdio.h> 出现下面错误 在你的文本编辑器中打开ex1文件,随机修改或删除一部分,之后…...

【Redis技术进阶之路】「原理分析系列开篇」分析客户端和服务端网络诵信交互实现(服务端执行命令请求的过程 - 初始化服务器)
服务端执行命令请求的过程 【专栏简介】【技术大纲】【专栏目标】【目标人群】1. Redis爱好者与社区成员2. 后端开发和系统架构师3. 计算机专业的本科生及研究生 初始化服务器1. 初始化服务器状态结构初始化RedisServer变量 2. 加载相关系统配置和用户配置参数定制化配置参数案…...

视频字幕质量评估的大规模细粒度基准
大家读完觉得有帮助记得关注和点赞!!! 摘要 视频字幕在文本到视频生成任务中起着至关重要的作用,因为它们的质量直接影响所生成视频的语义连贯性和视觉保真度。尽管大型视觉-语言模型(VLMs)在字幕生成方面…...

图表类系列各种样式PPT模版分享
图标图表系列PPT模版,柱状图PPT模版,线状图PPT模版,折线图PPT模版,饼状图PPT模版,雷达图PPT模版,树状图PPT模版 图表类系列各种样式PPT模版分享:图表系列PPT模板https://pan.quark.cn/s/20d40aa…...

Springboot社区养老保险系统小程序
一、前言 随着我国经济迅速发展,人们对手机的需求越来越大,各种手机软件也都在被广泛应用,但是对于手机进行数据信息管理,对于手机的各种软件也是备受用户的喜爱,社区养老保险系统小程序被用户普遍使用,为方…...
日常一水C
多态 言简意赅:就是一个对象面对同一事件时做出的不同反应 而之前的继承中说过,当子类和父类的函数名相同时,会隐藏父类的同名函数转而调用子类的同名函数,如果要调用父类的同名函数,那么就需要对父类进行引用&#…...

保姆级【快数学会Android端“动画“】+ 实现补间动画和逐帧动画!!!
目录 补间动画 1.创建资源文件夹 2.设置文件夹类型 3.创建.xml文件 4.样式设计 5.动画设置 6.动画的实现 内容拓展 7.在原基础上继续添加.xml文件 8.xml代码编写 (1)rotate_anim (2)scale_anim (3)translate_anim 9.MainActivity.java代码汇总 10.效果展示 逐帧…...
在golang中如何将已安装的依赖降级处理,比如:将 go-ansible/v2@v2.2.0 更换为 go-ansible/@v1.1.7
在 Go 项目中降级 go-ansible 从 v2.2.0 到 v1.1.7 具体步骤: 第一步: 修改 go.mod 文件 // 原 v2 版本声明 require github.com/apenella/go-ansible/v2 v2.2.0 替换为: // 改为 v…...