Linux - 进程间通信(2)
目录
2、进程池
1)理解进程池
2)进程池的实现
整体框架:
a. 加载任务
b. 先描述,再组织
I. 先描述
II. 再组织
c. 创建信道和子进程
d. 通过channel控制子进程
e. 回收管道和子进程
问题1:
解答1:
问题2:
解答2:
f. 将进程池本身和任务文件本身进行解耦
3)完整代码
processpool.cc:
Task.hpp:
Makefile:
命令行中的 | ,就是匿名管道
它们的父进程都是bash
2、进程池
1)理解进程池
a. 可以将任务写入管道来给到子进程,从而可以提前创建子进程想让哪个子进程完成任务,我就让写入到哪个子进程相对的管道中
b. 管道里面没有数据,worker进程就在阻塞等待,等待务的到来!!
c. master向哪一个管道进行写入,就是唤醒哪一个子进程来处理任务
d. 均衡的向后端子进程划分任务,称之为负载均衡父进程要进行后端任务的负载均衡
父进程直接向管道里写入固定长度的四字节(int)数组下标(任务码)
函数指针数组中元素分别指向不同的任务,以便master控制worker完成指定工作
2)进程池的实现
整体框架:
// ./processpool 3
int main(int argc, char *argv[])
{if (argc != 2){std::cerr << "Usage: " << argv[0] << " processnum" << std::endl;}int num = std::stoi(argv[1]);LoadTask(); // 加载任务std::vector<Channel> channels; // 将管道组织起来// 1.创建信道和子进程CreateChannelAndSub(num, &channels);// 2.通过channel控制子进程CtrlProcess(channels, 10);// 3.回收管道和子进程 a.关闭所有的写端 b.回收子进程ClearUpChannel(channels);return 0;
}
a. 加载任务
我这里用的是打印的方式来模拟任务的分配,通过打印从而了解子进程执行任务的情况,通过种下随机数种子,产生随机数,进而随机的向子进程分配任务,work即为子进程需要做的工作
Task.hpp:
#pragma once#include <iostream> #include <ctime> #include <cstdlib> #include <sys/types.h> #include <unistd.h>#define TaskNum 3typedef void (*task_t)(); // task_t 函数指针void Print() {std::cout << "I am print task" << std::endl; } void DownLoad() {std::cout << "I am a download task" << std::endl; } void Flush() {std::cout << "I am a flush task" << std::endl; }task_t tasks[TaskNum];void LoadTask() {srand(time(nullptr) ^ getpid() ^ 177); // 种一个随机种子tasks[0] = Print;tasks[1] = DownLoad;tasks[2] = Flush; }void ExcuteTask(int number) {if(number < 0 || number > 2) return;tasks[number](); }int SelectTask() {return rand() % TaskNum; }void work(int rfd) {while (true){int command = 0;int n = read(rfd, &command, sizeof(command));if (n == sizeof(int)){std::cout << "pid is: " << getpid() << " handler task" << std::endl;ExcuteTask(command);}else if (n == 0){std::cout << "sub process: " << getpid() << " quit" << std::endl;break;}} }
// 命令行规范 --> ./processpool 3
int main(int argc, char *argv[])
{if (argc != 2){std::cerr << "Usage: " << argv[0] << " processnum" << std::endl;}int num = std::stoi(argv[1]);LoadTask(); // 加载任务return 0;
}
b. 先描述,再组织
I. 先描述
需要控制的信道(即发送端wfd)数量多且繁琐,需要管理起来从而方便控制给子进程发送任务
class Channel { private:int _wfd;int _subprocessid;std::string _name; };在信道中,我们需要知道发送的文件描述符wfd,还有知道子进程的pid,以及信道的命名(用来区分信道)
II. 再组织
std::vector<Channel> channels;我们通过用一个vector数组将所有的Channel存储起来,从而实现对它们的增删查改,以方便管理
c. 创建信道和子进程
void CreateChannelAndSub(int num, std::vector<Channel> *channels)
{for (int i = 0; i < num; ++i){// 1.创建管道int pipefd[2] = {0};int n = pipe(pipefd);if (n < 0) exit(1); // 创建管道失败// 2.创建子进程pid_t id = fork();if (id == 0){// Childclose(pipefd[1]);work();close(pipefd[0]);exit(0);}// 3.构建一个名字std::string channel_name = "Channel-" + std::to_string(i);// Fatherclose(pipefd[0]);// 拿到了 a.子进程的pid b.父进程需要的管道的w端channels->push_back(Channel(pipefd[1], id, channel_name));}
}
用命令行参数的方式传入得到的argv[1]即为输入命令需要的子进程和管道个数
通过for循环,创建 num个 pipe管道以及子进程,当创建完子进程时,需要关闭掉不需要的文件描述符(即wfd -- pipefd[1])(当然,父进程也需要关闭不需要的fd -- rfd),在执行完work(子进程的工作)之后,关闭掉rfd(即工作完成了,关闭其管道),然后exit(0)退出进程,等待父进程回收
d. 通过channel控制子进程
// 轮询方案 -- 负载均衡
int NextChannel(int channelnum)
{static int next = 0;int channel = next;next++;next %= channelnum;return channel;
}// 发送相应的任务码到对应管道内
void SendTaskCommand(Channel &channel, int taskCommand)
{write(channel.GetWfd(), &taskCommand, sizeof(taskCommand));
}
void CtrlProcessOnce(std::vector<Channel> &channels)
{sleep(1);// a. 选择一个任务int taskcommand = SelectTask();// b. 选择一个信道和进程int channel_index = NextChannel(channels.size());// c. 发送任务SendTaskCommand(channels[channel_index], taskcommand);std::cout << "=================================" << std::endl;std::cout << "taskcommand: " << taskcommand << " channel: " << channels[channel_index].GetName() << " sub process: " << channels[channel_index].GetProcessId() << std::endl;
}void CtrlProcess(std::vector<Channel> &channels, int times = -1)
{if (times > 0){while (times--){CtrlProcessOnce(channels);}}else{while (true){CtrlProcessOnce(channels);}}
}
向其发送任务之前,我们需要先选择一个任务,通过随机种子随机数的方式,随机选择我们的一个任务,拿到其任务码(即指针数组下标),然后选择相应的信道和进程(信道和进程一体的),从而向管道发送任务码给子进程
e. 回收管道和子进程
void ClearUpChannel(std::vector<Channel> &channels)
{for (auto &channel : channels){channel.CloseChannel();}for (auto &channel : channels){channel.Wait();}
}
我们先将所有的信道关闭,然后在逐个将子进程等待回收
问题1:
那为什么不能边关闭信道边回收呢??
即
解答1:


在我们创建子进程的过程中,由于父子进程之间的继承,从而导致子进程会拥有父进程的文件描述符内容(即指向同一地方),如果我们边关闭边回收的话,如上图所示,当我们关闭父进程的第一个管道的wfd时,这时候第一个管道的读端的引用计数并未清0,因为子进程2它继承了父进程指向第一个管道的wfd(读端),从而使得读端阻塞,进程不退出,然后wait子进程的时候就会阻塞
在work结束后,才会到下一步close和exit退出子进程;
work结束需要的条件是 n == 0,即读端返回值为0,即
因此上述那种边关闭信道,边wait子进程的方法会阻塞
问题2:
为什么这种方法又能成功回收呢??
解答2:
因为当我们将所有信道关闭时,关闭到最后一个子进程对应的管道的wfd的时候,该管道的读端的引用计数就会为0,从而读端读到0,该子进程退出,随子进程退出就会使得该子进程指向的前面管道的读端回收,就不会造成前面那种情况

f. 将进程池本身和任务文件本身进行解耦
用回调函数可以很好的改善代码的耦合性

通过文件描述符重定向 dup2,将标准输入(文件描述符 0)重定向到 rfd 所代表的文件,然后再回调task()函数
// 重定向
这样做可以彻底的让我们的子进程执行对应的work时,再也不需要知道有什么管道的读端
(不用管从哪里接收信息,直接认为从标准输入拿到信息即可)
--- 将管道的逻辑和执行任务的逻辑进一步进行解耦
// task_t task : 回调函数
有了它,我们进程池本身的代码和我们任务本身两个文件就彻底解耦了
--- 即既不关心从哪个文件描述符,直接默认从0里面去读,也不关心将来谁调它,因为子进程会自动回调它
3)完整代码
processpool.cc:
#include <iostream>
#include <string>
#include <unistd.h>
#include <vector>
#include <sys/types.h>
#include <sys/wait.h>
#include "Task.hpp"class Channel
{
public:Channel(int wfd, pid_t id, const std::string name): _wfd(wfd), _subprocessid(id), _name(name){}int GetWfd() { return _wfd; }pid_t GetProcessId() { return _subprocessid; }std::string GetName() { return _name; }void CloseChannel() { close(_wfd); }void Wait(){pid_t rid = waitpid(_subprocessid, nullptr, 0);if (rid > 0){std::cout << "wait " << rid << " success" << std::endl;}}~Channel(){}private:int _wfd;int _subprocessid;std::string _name;
};// 形参和命名规范
// const & : 输入型参数
// & : 输入输出型参数
// * : 输出型参数// task_t task : 回调函数
// 有了它,我们进程池本身的代码和我们任务本身两个文件就彻底解耦了
// --- 即既不关心从哪个文件描述符,直接默认从0里面去读,也不关心将来谁调它,因为子进程会自动回调它
void CreateChannelAndSub(int num, std::vector<Channel> *channels, task_t task)
{for (int i = 0; i < num; ++i){// 1.创建管道int pipefd[2] = {0};int n = pipe(pipefd);if (n < 0)exit(1); // 创建管道失败// 2.创建子进程pid_t id = fork();if (id == 0){// Childclose(pipefd[1]);dup2(pipefd[0], 0);task();close(pipefd[0]);exit(0);}// 3.构建一个名字std::string channel_name = "Channel-" + std::to_string(i);// Fatherclose(pipefd[0]);// 拿到了 a.子进程的pid b.父进程需要的管道的w端channels->push_back(Channel(pipefd[1], id, channel_name));}
}int NextChannel(int channelnum) // 轮询方案 -- 负载均衡
{static int next = 0;int channel = next;next++;next %= channelnum;return channel;
}void SendTaskCommand(Channel &channel, int taskCommand)
{write(channel.GetWfd(), &taskCommand, sizeof(taskCommand));
}void CtrlProcessOnce(std::vector<Channel> &channels)
{sleep(1);// a. 选择一个任务int taskcommand = SelectTask();// b. 选择一个信道和进程int channel_index = NextChannel(channels.size());// c. 发送任务SendTaskCommand(channels[channel_index], taskcommand);std::cout << "=================================" << std::endl;std::cout << "taskcommand: " << taskcommand << " channel: " << channels[channel_index].GetName()<< " sub process: " << channels[channel_index].GetProcessId() << std::endl;
}
void CtrlProcess(std::vector<Channel> &channels, int times = -1)
{if (times > 0){while (times--){CtrlProcessOnce(channels);}}else{while (true){CtrlProcessOnce(channels);}}
}void ClearUpChannel(std::vector<Channel> &channels)
{// for (auto &channel : channels)// {// channel.CloseChannel();// channel.Wait();// }// int num = channels.size() -1;// while(num >= 0)// {// channels[num].CloseChannel();// channels[num--].Wait();// }for (auto &channel : channels){channel.CloseChannel();}for (auto &channel : channels){channel.Wait();}
}// ./processpool 3
int main(int argc, char *argv[])
{if (argc != 2){std::cerr << "Usage: " << argv[0] << " processnum" << std::endl;}int num = std::stoi(argv[1]);LoadTask(); // 加载任务std::vector<Channel> channels; // 将管道组织起来// 1.创建信道和子进程CreateChannelAndSub(num, &channels, work);// 2.通过channel控制子进程CtrlProcess(channels, 10);// 3.回收管道和子进程 a.关闭所有的写端 b.回收子进程ClearUpChannel(channels);// // for test// for(auto &channel : channels)// {// std::cout << "====================" << std::endl;// std::cout << channel.GetName() << std::endl;// std::cout << channel.GetWfd() << std::endl;// std::cout << channel.GetProcessId() << std::endl;// }// sleep(100);return 0;
}
Task.hpp:
#pragma once#include <iostream>
#include <ctime>
#include <cstdlib>
#include <sys/types.h>
#include <unistd.h>#define TaskNum 3typedef void (*task_t)(); // task_t 函数指针void Print()
{std::cout << "I am print task" << std::endl;
}
void DownLoad()
{std::cout << "I am a download task" << std::endl;
}
void Flush()
{std::cout << "I am a flush task" << std::endl;
}task_t tasks[TaskNum];void LoadTask()
{srand(time(nullptr) ^ getpid() ^ 177); // 种一个随机种子tasks[0] = Print;tasks[1] = DownLoad;tasks[2] = Flush;
}void ExcuteTask(int number)
{if(number < 0 || number > 2) return;tasks[number]();
}int SelectTask()
{return rand() % TaskNum;
}// void work(int rfd)
// {
// while (true)
// {
// int command = 0;
// int n = read(rfd, &command, sizeof(command));
// if (n == sizeof(int))
// {
// std::cout << "pid is: " << getpid() << " handler task" << std::endl;
// ExcuteTask(command);
// }
// else if (n == 0)
// {
// std::cout << "sub process: " << getpid() << " quit" << std::endl;
// break;
// }
// }
// }// 这样做可以彻底的让我们的子进程执行对应的work时,再也不需要知道有什么管道的读端
// (不用管从哪里接收信息,直接认为从标准输入拿到信息即可)
// 将管道的逻辑和执行任务的逻辑进一步进行解耦
void work()
{while (true){int command = 0;int n = read(0, &command, sizeof(command));if (n == sizeof(int)){std::cout << "pid is: " << getpid() << " handler task" << std::endl;ExcuteTask(command);}else if (n == 0){std::cout << "sub process: " << getpid() << " quit" << std::endl;break;}}
}
Makefile:
processpool:processpool.ccg++ -o $@ $^ -std=c++11
.PHONY:clean
clean:rm -f processpool
相关文章:
Linux - 进程间通信(2)
目录 2、进程池 1)理解进程池 2)进程池的实现 整体框架: a. 加载任务 b. 先描述,再组织 I. 先描述 II. 再组织 c. 创建信道和子进程 d. 通过channel控制子进程 e. 回收管道和子进程 问题1: 解答1ÿ…...
langchain基础(二)
一、输出解析器(Output Parser) 作用:(1)让模型按照指定的格式输出; (2)解析模型输出,提取所需的信息 1、逗号分隔列表 CommaSeparatedListOutputParser:…...
解除阿里云盘压缩包分享限制的最新工具(2025年更新)
前言 前段时间,为了在阿里云盘分享一些资料,尝试了好多种方法:改文件名后缀,打包自解压,使用将压缩文件追加在图片文件后,还有的一些工具,虽然能伪装文件但并不太好用,最后自己写了…...
2025神奇的数字—新年快乐
2025年,一个神奇的数字,承载着数学的奥秘与无限可能。它是45的平方(45),上一个这样的年份是1936年(44),下一个则是2116年(46),一生仅此一次。2025…...
PWM频率测量方法
测量PWM(脉宽调制)信号的频率是嵌入式系统中的常见需求,尤其是在电机控制、LED调光、传感器信号处理等场景中。 在这里介绍两种测量PWM频率的方法:测频法与测周法。 1、测频(率)法 原理:在闸门…...
神经网络|(七)概率论基础知识-贝叶斯公式
【1】引言 前序我们已经了解了一些基础知识。 古典概型:有限个元素参与抽样,每个元素被抽样的概率相等。 条件概率:在某条件已经达成的前提下,新事件发生的概率。实际计算的时候,应注意区分,如果是计算综…...
JavaScript系列(47)--音频处理系统详解
JavaScript音频处理系统详解 🎵 今天,让我们深入探讨JavaScript的音频处理系统。Web Audio API为我们提供了强大的音频处理和合成能力,让我们能够在浏览器中实现复杂的音频应用。 音频系统基础概念 🌟 💡 小知识&…...
【解决方案】VMware虚拟机adb连接宿主机夜神模拟器
1、本机(宿主机,系统windows10)ip为192.168.31.108 2、运行模拟器后本机cmd查看端口为62026 3、VMware虚拟机(系统,kali)adb连接192.168.31.108:62026报错 failed to connect to 192.168.31.108:16416: Co…...
DroneXtract:一款针对无人机的网络安全数字取证工具
关于DroneXtract DroneXtract是一款使用 Golang 开发的适用于DJI无人机的综合数字取证套件,该工具可用于分析无人机传感器值和遥测数据、可视化无人机飞行地图、审计威胁活动以及提取多种文件格式中的相关数据。 功能介绍 DroneXtract 具有四个用于无人机取证和审…...
基于springboot+vue的流浪动物救助系统的设计与实现
开发语言:Java框架:springbootJDK版本:JDK1.8服务器:tomcat7数据库:mysql 5.7(一定要5.7版本)数据库工具:Navicat11开发软件:eclipse/myeclipse/ideaMaven包:…...
利用ue5制作CG动画笔记
tips: 按住鼠标中键可以拖动枢轴点 在曲线编辑器中按住shift可以使曲线编辑保持在x轴 专业术语: CGI:计算机生成图象(computer-generated imagery)真实的不算,计算机生成的 Compositing:合…...
AI 图片涌入百度图库
在这个信息爆炸的时代,我们习惯了通过搜索引擎来获取各种想要的信息和图片。然而,现在打开搜索引擎看到的却是许多真假难辨的信息——AI图片,这部分数据正以惊人的速度涌入百度图库,让小编不禁想问:未来打开百度图库不…...
《多阶段渐进式图像修复》学习笔记
paper:2102.02808 GitHub:swz30/MPRNet: [CVPR 2021] Multi-Stage Progressive Image Restoration. SOTA results for Image deblurring, deraining, and denoising. 目录 摘要 1、介绍 2、相关工作 2.1 单阶段方法 2.2 多阶段方法 2.3 注意力机…...
uniapp使用uni.navigateBack返回页面时携带参数到上个页面
我们平时开发中也经常遇到这种场景,跳转一个页面会进行一些操作,操作完成后再返回上个页面同时要携带着一些参数 其实也很简单,也来记录一下吧 假设从A页面 跳转到 B页面 A页面 直接上完整代码了哈,很简单: <t…...
2025.1.26机器学习笔记:C-RNN-GAN文献阅读
2025.1.26周报 文献阅读题目信息摘要Abstract创新点网络架构实验结论缺点以及后续展望 总结 文献阅读 题目信息 题目: C-RNN-GAN: Continuous recurrent neural networks with adversarial training会议期刊: NIPS作者: Olof Mogren发表时间…...
goframe 多语言国际化解决方案
项目背景 本项目采用基于JSON配置的多语言国际化(i18n)解决方案,支持多种语言的无缝切换和本地化。 目录结构 manifest/ └── i18n/├── zh.json # 简体中文├── zh-tw.json # 繁体中文├── en.json # 英语├…...
Deepseek R1 的大模拟考试
本文章同步发布于洛谷专栏。 前情提要:联网,R1。 Summary P4896 OIer们的烦恼:WA 30pts。P1580 yyy loves Easter_Egg I:WA 0pts。P5006 [yLOI2018] 大美江湖:AC。P2830 写程序:WA 33pts。 总 AC 题数&…...
机器人介绍
以下是关于机器人的介绍: 定义 机器人是一种能够自动执行任务的机器系统,它集成了机电、机构学、材料学及仿生学等多个学科技术,可以接受人类指挥,运行预先编排的程序,或根据人工智能技术制定的原则纲领行动…...
设置jmeter界面图标字体大小
设置jmeter界面图标字体大小 方法:点击“选项” -> 点击放大、缩小。(可进行全局的菜单、左侧目录结构树、元件界面显示等字体图标的放大、缩小。)...
JavaScript逆向高阶指南:突破基础,掌握核心逆向技术
JavaScript逆向高阶指南:突破基础,掌握核心逆向技术 JavaScript逆向工程是Web开发者和安全分析师的核心竞争力。无论是解析混淆代码、分析压缩脚本,还是逆向Web应用架构,掌握高阶逆向技术都将助您深入理解复杂JavaScript逻辑。本…...
使用 MSYS2 qemu 尝鲜Arm64架构国产Linux系统
近期,我的师弟咨询我关于Arm64架构的国产CPU国产OS开发工具链问题。他们公司因为接手了一个国企的单子,需要在这类环境下开发程序。说实在的我也没有用过这个平台,但是基于常识,推测只要基于C和Qt,应该问题不大。 1. …...
RocketMQ实战—1.订单系统面临的技术挑战
大纲 1.一个订单系统的整体架构、业务流程及负载情况 2.订单系统面临的技术问题一:下订单的同时还要发券、发红包、Push推送等导致性能太差 3.订单系统面临的技术问题二:订单退款时经常流程失败导致无法完成退款 4.订单系统面临的技术问题三…...
【QT】- QUdpSocket
QUdpSocket 是 Qt 自带的一个类,属于 Qt 网络模块,用于进行 UDP(用户数据报协议) 通信。它提供了简便的接口来发送和接收 UDP 数据报(datagrams)。 UDP 是一种无连接的协议,适用于那些不需要确…...
赚钱的究极认识
1、赚钱的本质是提供了价值或者价值想象 价值: 比如小米手机靠什么?“性价比”,什么饥饿营销,创新,用户参与,生态供应链,品牌这些不能说不重要,但是加在一起都没有“性价比”这3字重…...
Linux学习笔记——用户管理
一、用户管理命令 useradd #用户增加命令 usermod #用户修改命令 passwd #密码修改命令 userdel #用户删除命令 su #用户提权命令 1、useradd命令(加用户): 创建并设置用户信息,使用us…...
【AI】【本地部署】OpenWebUI的升级并移植旧有用户信息
【背景】 OpenWebUI的版本升级频率很高,并会修改旧版本的Bug,不过对于已经在使用的系统,升级后现有用户信息都会丢失,于是研究如何在升级后将现有的用户信息移植到升级后版本。 【准备工作】 OpenWebUI的升级步骤在Docker中有现…...
从synchronized到ReentrantLock_Java锁机制的演进与选择
1 引言 在Java并发编程中,锁机制是确保线程安全的关键。synchronized关键字和显式锁(如ReentrantLock)是两种常用的锁机制。本文将深入探讨这两种锁的工作原理、优缺点,并分析它们在不同场景下的最佳选择,帮助开发者做出明智的选择。 2 synchronized关键字详解 synchro…...
PyCharm接入DeepSeek实现AI编程
目录 效果演示 创建API key 在PyCharm中下载CodeGPT插件 配置Continue DeepSeek 是一家专注于人工智能技术研发的公司,致力于开发高性能、低成本的 AI 模型。DeepSeek-V3 是 DeepSeek 公司推出的最新一代 AI 模型。其前身是 DeepSeek-V2.5,经过持续的…...
21款炫酷烟花合集
系列专栏 《Python趣味编程》《C/C趣味编程》《HTML趣味编程》《Java趣味编程》 写在前面 Python、C/C、HTML、Java等4种语言实现18款炫酷烟花的代码。 Python Python烟花① 完整代码:Python动漫烟花(完整代码) Python烟花② 完整…...
数论问题75
命题,证明:存在K∈N,使得对于每个n∈N,Kx2^n1都是合数。 证明:设n2^m,当m0,1,2,3,4时,a(m)2^(2^m)1都是素数。 a(0)213,a(1)2^215,a(2)2^4117&…...










