当前位置: 首页 > news >正文

Flask使用Celery与多进程管理:优雅处理长时间任务与子进程终止技巧(multiprocessing)(subprocess)

在许多任务处理系统中,我们需要使用异步任务队列来处理繁重的计算或长时间运行的任务,如模型训练。Celery是一个广泛使用的分布式任务队列,而在某些任务中,尤其是涉及到调用独立脚本的场景中,我们需要混合使用multiprocessingsubprocess模块来启动和管理这些任务进程。然而,这种组合有时会带来一些挑战,如进程冲突和子进程无法正确终止的问题。

本文将讨论如何使用Celery、Multiprocessing和Subprocess来处理这些问题,并在需要时正确关闭子进程,实现完美的进程管理与切换。

问题描述

当通过celery.control.revoke来终止Celery任务时,如果任务启动了多个子进程,例如使用multiprocessingsubprocess模块,这些子进程不会被立即终止。在某些情况下,子进程会继续运行,导致任务无法彻底停止,并可能造成系统资源浪费。

解决方案

我们可以通过组合使用psutil库来实现对子进程的监控和终止,从而确保所有相关的进程都能正确关闭。以下是具体实现步骤。

安装必要的库

确保你已经安装psutil库:

pip install psutil

修改代码实现

1. Celery任务与Multiprocessing结合Subprocess

首先,我们创建一个Celery任务。当任务启动时,它会使用multiprocessing模块启动一个新的进程,该进程将执行独立的Python脚本。代码如下:

import json
import os
import psutil
import multiprocessing
import subprocess
from celery_app import celeryimport torch.multiprocessing as mpmp.set_start_method('spawn', True)def run_script(json_test_path, uid):command = f"python training.py {json_test_path}"process = subprocess.Popen(command, shell=True)print("===========================================PID:", process.pid)print("===========================================uid:", uid)process.wait()return process.pid@celery.task(bind=True)
def lora_train_task(self, json_test_demo):# 将json_test保存到临时文件中json_test_path = f"training_config_{json_test_demo['uid']}.json"json_test_path = os.path.abspath(json_test_path)with open(json_test_path, 'w') as f:json.dump(json_test_demo, f)# 使用多进程调用独立脚本p = multiprocessing.Process(target=run_script, args=(json_test_path, json_test_demo['uid']))p.start()p.join()return 0
2. 使用psutil关闭子进程

我们通过调用psutil库来监控并关闭所有相关的子进程。以下是实现终止任务和子进程的代码示例:

import psutildef terminate_process_tree(pid):try:parent = psutil.Process(pid)for child in parent.children(recursive=True):  # This will recursively find all child processeschild.terminate()parent.terminate()except psutil.NoSuchProcess:pass# 示例:终止任务时调用终止子进程函数
celery.control.revoke(args['task_id'], terminate=True, signal='SIGKILL')
if args['pid_id']:terminate_process_tree(int(args['pid_id']))

完整示例

将上述代码组合起来,我们得到完成的实现。如下所示:

import json
import os
import psutil
import multiprocessing
import subprocess
from celery_app import celery
import torch.multiprocessing as mpmp.set_start_method('spawn', True)def run_script(json_test_path, uid):command = f"python training.py {json_test_path}"process = subprocess.Popen(command, shell=True)print("===========================================PID:", process.pid)print("===========================================uid:", uid)process.wait()return process.pid@celery.task(bind=True)
def lora_train_task(self, json_test_demo):# 将json_test保存到临时文件中json_test_path = f"training_config_{json_test_demo['uid']}.json"json_test_path = os.path.abspath(json_test_path)with open(json_test_path, 'w') as f:json.dump(json_test_demo, f)# 使用多进程调用独立脚本process = multiprocessing.Process(target=run_script, args=(json_test_path, json_test_demo['uid']))process.start()process.join()return 0def terminate_process_tree(pid):try:parent = psutil.Process(pid)for child in parent.children(recursive=True):  # 递归找到所有子进程child.terminate()parent.terminate()except psutil.NoSuchProcess:pass# 示例:终止任务时调用终止子进程函数
celery.control.revoke(args['task_id'], terminate=True, signal='SIGKILL')
if args['pid_id']:terminate_process_tree(int(args['pid_id']))

结论

本文示范了如何通过混合使用Celery、Multiprocessing与Subprocess来处理复杂的任务执行场景,同时介绍了通过psutil库来正确管理和终止子进程。这种方法能够确保系统资源的合理使用,并避免出现僵尸进程问题。希望本文对你在实际项目中处理类似问题时有所帮助。

相关文章:

Flask使用Celery与多进程管理:优雅处理长时间任务与子进程终止技巧(multiprocessing)(subprocess)

在许多任务处理系统中,我们需要使用异步任务队列来处理繁重的计算或长时间运行的任务,如模型训练。Celery是一个广泛使用的分布式任务队列,而在某些任务中,尤其是涉及到调用独立脚本的场景中,我们需要混合使用multipro…...

Django模板系统

1.常用语法 Django模板中只需要记两种特殊符号: {{ }}和 {% %} {{ }}表示变量,在模板渲染的时候替换成值,{% %}表示逻辑相关的操作。 2.变量 {{ 变量名 }} 变量名由字母数字和下划线组成。 点(.)在模板语言中有…...

15. 文件操作

一、什么是文件 文件(file)通常是磁盘或固态硬盘上的一段已命名的存储区。它是指一组相关数据的有序集合。这个数据集合有一个名称,叫做文件名。文件名 是文件的唯一标识,以便用户识别和引用。文件名包括 3 个部分:文件…...

清风数学建模学习笔记——Topsis法

数模评价类(2)——Topsis法 概述 Topsis:Technique for Order Preference by Similarity to Ideal Solution 也称优劣解距离法,该方法的基本思想是,通过计算每个备选方案与理想解和负理想解之间的距离,从而评估每个…...

组合总和习题分析

习题:(leetcode39) 给你一个 无重复元素 的整数数组 candidates 和一个目标整数 target ,找出 candidates 中可以使数字和为目标数 target 的 所有 不同组合 ,并以列表形式返回。你可以按 任意顺序 返回这些组合。 c…...

基于eFramework车控车设中间件介绍

车设的发展,起源于汽车工业萌芽之初,经历了机械式操作的原始粗犷,到电子式调控技术的巨大飞跃,到如今智能化座舱普及,远程车控已然成为汽车标配,车设功能选项也呈现出爆发式增长,渐趋多元繁杂。…...

L17.【LeetCode笔记】另一棵树的子树

目录 1.题目 代码模板 2.分析 3.代码 4.提交结果 1.题目 https://leetcode.cn/problems/subtree-of-another-tree/description/ 给你两棵二叉树 root 和 subRoot 。检验 root 中是否包含和 subRoot 具有相同结构和节点值的子树。如果存在,返回 true &#xff…...

BGP通过route-policy路由策略调用ip-prefix网络前缀实现负载均衡与可靠性之AS-path属性

一、实验场景 1、loopback0与loopback1模拟企业实际环境中的某个网段。 2、本例目标总公司AR3的1.1.1.1/32网段到分公司AR4的3.3.3.3/32的流量从上方的AS500自治系统走。 3、本例目标总公司AR3的4.4.4.4/32网段到分公司AR4的2.2.2.2/32的流量从下面的AS300、AS400自治系统走。…...

每日速记10道java面试题14-MySQL篇

其他资料 每日速记10道java面试题01-CSDN博客 每日速记10道java面试题02-CSDN博客 每日速记10道java面试题03-CSDN博客 每日速记10道java面试题04-CSDN博客 每日速记10道java面试题05-CSDN博客 每日速记10道java面试题06-CSDN博客 每日速记10道java面试题07-CSDN博客 每…...

内存图及其画法

所有的文件都存在硬盘上,首次使用的时候才会进入内存 进程:有自己的Main方法,并且依赖自己Main运行起来的程序。独占一块内存区域,互不干扰。内存中有一个一个的进程。 操作系统只认识c语言。操作系统调度驱动管理硬件&#xff0…...

Ansys Maxwell:Qi 无线充电组件

Qi 无线充电采用感应充电技术,无需物理连接器或电缆,即可将电力从充电站传输到兼容设备。由 WPC 管理的 Qi 标准确保了不同无线充电产品之间的互操作性。以下是 Qi v1.3 标准的核心功能: Qi v1.3 标准的主要特点 身份验证:确保充…...

【Shell 脚本实现 HTTP 请求的接收、解析、处理逻辑】

以下是一个实现客户端对 Shell HTTP 服务发起 POST 请求并传入 JSON 参数的完整示例。Shell 服务会解析收到的 JSON 数据,根据内容执行操作。 服务端脚本:http_server.sh 以下脚本使用 netcat (nc) 来监听 HTTP 请求,并通过 jq 工具解析 JSO…...

【北京迅为】iTOP-4412全能版使用手册-第六十七章 USB鼠标驱动详解

iTOP-4412全能版采用四核Cortex-A9,主频为1.4GHz-1.6GHz,配备S5M8767 电源管理,集成USB HUB,选用高品质板对板连接器稳定可靠,大厂生产,做工精良。接口一应俱全,开发更简单,搭载全网通4G、支持WIFI、蓝牙、…...

【青牛科技】拥有两个独立的、高增益、内部相位补偿的双运算放大器,可适用于单电源或双电源工作——D4558

概述: D4558内部包括有两个独立的、高增益、内部相位补偿的双运算放大器,可适用于单电源或双电源工作。该电路具有电压增益高、噪声低等特点。主要应用于音频信号放大,有源滤波器等场合。 D4558采用DIP8、SOP8的封装形式 主要特点&#xff…...

Kafka 数据写入问题

目录标题 分析思路1. **生产者配置问题**:Kafka生产者的配置参数生产者和消费者的处理确定并优化 2. **网络问题**:3. **Kafka 集群配置问题**:unclean.leader.election.enable 4. **Zookeeper 配置问题**:5. **JVM 参数调优**&am…...

实战ansible-playbook(九)-profile配置- 确保 CUDA 和 MPI 环境变量正确设置并立即生效

Playbook 分析 --- - name: 确保 CUDA 和 MPI 环境变量正确设置并立即生效hosts: pod2 # 指定目标主机组或具体主机名become: yes # 使用特权提升(sudo),以root权限执行某些需要权限的任务remote_user: canopy # 远程连接使用的用户名vars: # 定义全局变量,用于Playbo…...

气膜馆:科技与环保融合的未来建筑新选择—轻空间

在全球城市化进程不断加快的背景下,传统建筑方式面临着越来越多的挑战。如何在有限的土地和资源条件下,快速、高效、环保地搭建符合多功能需求的建筑,成为现代建筑行业亟待解决的重要课题。而随着科技的进步与建筑材料的创新,一种…...

git回退到某个版本git checkout和git reset命令的区别

文章目录 1. git checkout <commit>2. git reset --hard <commit>两者的区别总结推荐使用场景* 在使用 Git 回退到某个版本时&#xff0c; git checkout <commit> 和 git reset --hard <commit> 是两种常见的方式&#xff0c;但它们的用途和影响有很…...

Preprocess

Preprocess数据预处理 文本 使用Tokenizer将文本转换为标记序列&#xff0c;创建标记的数值表示&#xff0c;并将它们组装成张量。 预处理文本数据的主要工具是标记器。标记器根据一组规则将文本拆分为标记。标记被转换为数字&#xff0c;然后转换为张量&#xff0c;这些张量…...

stm32 spi接口传输asm330l速率优化(及cpu和dma方式对比)

最近一段时间做了一个mems的项目&#xff0c;项目的方案是stm32g071做主控&#xff0c;读写3颗asm330l的硬件形态。最初是想放置4颗imu芯片&#xff0c;因为pcb空间布局的问题&#xff0c;改放了3颗。但对于软件方案来说无所谓&#xff0c;关键是如何优化spi的传输速率&#xf…...

LBE-LEX系列工业语音播放器|预警播报器|喇叭蜂鸣器的上位机配置操作说明

LBE-LEX系列工业语音播放器|预警播报器|喇叭蜂鸣器专为工业环境精心打造&#xff0c;完美适配AGV和无人叉车。同时&#xff0c;集成以太网与语音合成技术&#xff0c;为各类高级系统&#xff08;如MES、调度系统、库位管理、立库等&#xff09;提供高效便捷的语音交互体验。 L…...

ES6从入门到精通:前言

ES6简介 ES6&#xff08;ECMAScript 2015&#xff09;是JavaScript语言的重大更新&#xff0c;引入了许多新特性&#xff0c;包括语法糖、新数据类型、模块化支持等&#xff0c;显著提升了开发效率和代码可维护性。 核心知识点概览 变量声明 let 和 const 取代 var&#xf…...

【C++从零实现Json-Rpc框架】第六弹 —— 服务端模块划分

一、项目背景回顾 前五弹完成了Json-Rpc协议解析、请求处理、客户端调用等基础模块搭建。 本弹重点聚焦于服务端的模块划分与架构设计&#xff0c;提升代码结构的可维护性与扩展性。 二、服务端模块设计目标 高内聚低耦合&#xff1a;各模块职责清晰&#xff0c;便于独立开发…...

视觉slam十四讲实践部分记录——ch2、ch3

ch2 一、使用g++编译.cpp为可执行文件并运行(P30) g++ helloSLAM.cpp ./a.out运行 二、使用cmake编译 mkdir build cd build cmake .. makeCMakeCache.txt 文件仍然指向旧的目录。这表明在源代码目录中可能还存在旧的 CMakeCache.txt 文件,或者在构建过程中仍然引用了旧的路…...

RabbitMQ入门4.1.0版本(基于java、SpringBoot操作)

RabbitMQ 一、RabbitMQ概述 RabbitMQ RabbitMQ最初由LShift和CohesiveFT于2007年开发&#xff0c;后来由Pivotal Software Inc.&#xff08;现为VMware子公司&#xff09;接管。RabbitMQ 是一个开源的消息代理和队列服务器&#xff0c;用 Erlang 语言编写。广泛应用于各种分布…...

tomcat指定使用的jdk版本

说明 有时候需要对tomcat配置指定的jdk版本号&#xff0c;此时&#xff0c;我们可以通过以下方式进行配置 设置方式 找到tomcat的bin目录中的setclasspath.bat。如果是linux系统则是setclasspath.sh set JAVA_HOMEC:\Program Files\Java\jdk8 set JRE_HOMEC:\Program Files…...

前端中slice和splic的区别

1. slice slice 用于从数组中提取一部分元素&#xff0c;返回一个新的数组。 特点&#xff1a; 不修改原数组&#xff1a;slice 不会改变原数组&#xff0c;而是返回一个新的数组。提取数组的部分&#xff1a;slice 会根据指定的开始索引和结束索引提取数组的一部分。不包含…...

Kubernetes 网络模型深度解析:Pod IP 与 Service 的负载均衡机制,Service到底是什么?

Pod IP 的本质与特性 Pod IP 的定位 纯端点地址&#xff1a;Pod IP 是分配给 Pod 网络命名空间的真实 IP 地址&#xff08;如 10.244.1.2&#xff09;无特殊名称&#xff1a;在 Kubernetes 中&#xff0c;它通常被称为 “Pod IP” 或 “容器 IP”生命周期&#xff1a;与 Pod …...

SpringAI实战:ChatModel智能对话全解

一、引言&#xff1a;Spring AI 与 Chat Model 的核心价值 &#x1f680; 在 Java 生态中集成大模型能力&#xff0c;Spring AI 提供了高效的解决方案 &#x1f916;。其中 Chat Model 作为核心交互组件&#xff0c;通过标准化接口简化了与大语言模型&#xff08;LLM&#xff0…...

消防一体化安全管控平台:构建消防“一张图”和APP统一管理

在城市的某个角落&#xff0c;一场突如其来的火灾打破了平静。熊熊烈火迅速蔓延&#xff0c;滚滚浓烟弥漫开来&#xff0c;周围群众的生命财产安全受到严重威胁。就在这千钧一发之际&#xff0c;消防救援队伍迅速行动&#xff0c;而豪越科技消防一体化安全管控平台构建的消防“…...