Flask与Celery实现Python调度服务
文章目录
- Flask与Celery实现Python调度服务
- 一、前言
- 1.组件
- 2.场景说明
- 3.环境
- 二、安装依赖
- 1.安装Anaconda
- 3.安装redis
- 2.安装依赖包
- 三、具体实现
- 1.目录结构
- 2.业务流程
- 3.配置文件
- 4.Celery程序
- 5.Flask程序
- 6.测试脚本
- 7.程序启动
- 1)Windows开发调试
- 2)Linux服务器部署
- 3)订阅通道
- 4)测试
Flask与Celery实现Python调度服务
一、前言
1.组件
-
Flask:Flask 是一个轻量级的 Python Web 应用框架。Flask 提供了开发 Web 应用所需的基本功能,并且设计灵活,开发者可以根据需要扩展其功能。由于其核心简洁,它特别适合小型应用和微服务架构。同时,Flask 具有很高的可扩展性,通过各种扩展库可以轻松添加数据库集成、表单处理、身份验证等功能。
-
Celery:Celery 是一个异步任务队列/作业队列,基于分布式消息传递的系统。Celery 用于处理异步任务和调度定时任务,非常适合在后台处理耗时的操作,比如发送邮件、生成报告或与外部 API 交互。它通过任务队列和工作进程分离工作负载,可以提高应用程序的性能和响应速度。
-
uWSGI:uWSGI 是一个应用服务器,用于运行 Python Web 应用。uWSGI 旨在高效地服务 Python 应用,并提供了 WSGI 协议的实现。它通常用于在生产环境中部署 Python 应用。uWSGI 支持多种协议和功能,包括负载均衡、进程管理等。它与 Web 服务器(如 Nginx 或 Apache)配合使用,以提高应用的性能和可靠性。
-
Redis:Redis 是一个开源的内存数据结构存储系统,可以用作数据库、缓存和消息中间件。Redis 通过键值对的方式存储数据,支持丰富的数据结构如字符串、哈希、列表、集合和有序集合。它具有极高的读写性能,常用于缓存数据库查询结果、会话存储、队列管理等场景,以减少数据库的压力并提高系统的响应速度。Redis 还支持持久化,可以在重启后恢复数据。
2.场景说明
- 解决在 java 项目中执行 python 脚本的问题;之前实现的方法是通过 Java 程序模拟命令行的交互环境执行 python 脚本,但是无法控制执行脚本的并发,导致服务器硬件资源占用过高的情况时常发生。
3.环境
- Windows 版本(开发):Windows 10 专业版
- Linux 发行版(部署):CentOS-7-x86_64-DVD-1804.iso
- Postman for Windows Version:11.3.2
- flask:2.2.5
- celery:5.4.0
- redis 客户端:5.0.4
- redis 服务端:7.0.12
- uwsgi:2.0.21
Postman 下载:https://www.postman.com/downloads/
flask框架入门和使用实践:https://blog.csdn.net/u011424614/article/details/112548442
Java执行Python脚本:https://blog.csdn.net/u011424614/article/details/114199102
[Windows] Anacoda安装和使用:https://blog.csdn.net/u011424614/article/details/105579502CentOS7安装部署Anaconda:https://blog.csdn.net/u011424614/article/details/140253920
CentOS7安装部署Redis7:https://blog.csdn.net/u011424614/article/details/132418619
二、安装依赖
1.安装Anaconda
- 安装参考:《CentOS7安装部署Anaconda》
- 安装目录:
/opt/anaconda/install
3.安装redis
- 安装参考:《CentOS7安装部署Redis7 》
- 安装目录:
/opt/redis/redis-7.0.12/src
2.安装依赖包
# 创建环境
conda create -n dev01 python==3.10.14
# 激活环境
conda activate dev01# 安装依赖包(指定国内镜像源)
pip install flask==2.2.5 -i https://pypi.tuna.tsinghua.edu.cn/simple
pip install celery==5.4.0 -i https://pypi.tuna.tsinghua.edu.cn/simple
pip install redis==5.0.4 -i https://pypi.tuna.tsinghua.edu.cn/simple
pip install werkzeug==2.3.8 -i https://pypi.tuna.tsinghua.edu.cn/simple
# Windows 开发环境不需要安装,Linux 部署时使用
conda install uwsgi==2.0.21 -i https://pypi.tuna.tsinghua.edu.cn/simple
三、具体实现
1.目录结构
/opt/scheduler-service
├── logs
├── config
│ ├── redis_config.ini
│ └── uwsgi.ini
├── test
│ ├── my_script.py
├── celery_service.py
├── celery_worker_start.py
├── flask_service.py
└── flask_service_start.py
2.业务流程
执行方式:异步执行
- Postman 发送请求后,调用 flask 程序
- flask 将根据接口参数,通过 celery 异步执行指定脚本,并传递脚本所需参数;celery 异步执行后,获取 celery 任务的 task_id;最后通过 flask 将 task_id 返回给 Postman
- celery 执行脚本成功或失败,都会将执行结果(包含 task_id)发布到 Redis 的指定通道
- Redis 客户端提前订阅通道,监听通道的消息(通过 task_id 匹配执行任务和结果)
3.配置文件
- redis_config.ini
[celery]
# 配置 Celery 的消息代理 URL
broker_url = redis://127.0.0.1:6379/2# 配置 Celery 的结果后端 URL
backend_url = redis://127.0.0.1:6379/3[redis]
# 配置 Redis 的主机地址
host = 127.0.0.1# 配置 Redis 的端口
port = 6379# 配置 Redis 的数据库索引
db = 4# 配置 Redis 的密码,如果没有密码留空
password =[channel]
# 配置消息发布的频道名称
name = schedulerChannel
- uwsgi.ini【Windows 开发环境不需要,Linux部署时使用】
[uwsgi]
# 使用 HTTP 协议,监听 6000 端口
http = :6000# 设置 uWSGI 的工作目录为 Flask 应用所在目录
chdir = ./# 指定 Flask 应用所在的 Python 文件和变量名
module = flask_app:app# 启用主进程
master = true# 启动的工作进程数,推荐设置为 CPU 核心数的 2-4 倍
processes = 4# 线程数,可以根据需要设置
threads = 2# 指定静态文件的目录,如果有静态文件服务的需求
# static-map = /static=/path/to/static/files# 设置 Python 的自动加载,使得每次修改代码后无需重启服务
py-autoreload = 1# 设置日志文件路径
daemonize = /opt/scheduler-service/logs/uwsgi.log
4.Celery程序
- celery_app.py
from celery import Celery
from celery.signals import task_success, task_failure
import importlib.machinery
import redis
from dataclasses import dataclass, asdict
import json
import configparser# 读取配置文件
config = configparser.ConfigParser()
config.read('config/redis_config.ini', encoding='utf-8')# 初始化 Celery 应用
# 设置消息代理(Broker)和结果后端(Backend)
app = Celery('tasks',broker=config['celery']['broker_url'],backend=config['celery']['backend_url']
)# 创建 Redis 连接
client = redis.Redis(host=config['redis']['host'],port=config.getint('redis', 'port'),db=config.getint('redis', 'db'),password=config['redis']['password'])# 频道名称
channel_name = config['channel']['name']# 定义消息数据模型
@dataclass
class Message:success: boolmsg: strtaskId: strresult: str# 任务成功完成时触发的事件
@task_success.connect
def task_success_handler(sender=None, result=None, **kwargs):print(f"任务 {sender.request.id} 执行成功.")# 构建消息message = Message(success=True, msg='', taskId=sender.request.id, result=result)# 发布消息到频道msg_json = json.dumps(asdict(message), ensure_ascii=False)client.publish(channel_name, msg_json)# 任务失败时触发的事件
@task_failure.connect
def task_failure_handler(sender=None, task_id=None, exception=None, args=None, kwargs=None, traceback=None, einfo=None, **kw):print(f"任务 {task_id} 执行失败,异常: {exception}")# 构建消息message = Message(success=False, msg=str(exception), taskId=task_id, result=None)# 发布消息到频道msg_json = json.dumps(asdict(message), ensure_ascii=False)client.publish(channel_name, msg_json)# 定义 Celery 任务
@app.task
def execute_script(script_path, *args, **kwargs):try:# 使用 SourceFileLoader 加载模块loader = importlib.machinery.SourceFileLoader("module.name", script_path)module = loader.load_module()# 执行模块中的 main 函数result = module.main(*args, **kwargs)return resultexcept Exception as e:print(f"执行过程异常: {e}", exc_info=True)raise
- celery_worker_start.py
from celery_app import appif __name__ == '__main__':# 在 Windows 环境下运行 Celery Worker# 使用 solo 模式以避免 Windows 下多进程问题app.worker_main(argv=['worker', '--loglevel=info', '--pool=solo'])# 在 Linux 环境下运行 Celery Worker# 使用 8 个并发工作进程# app.worker_main(argv=['worker', '--loglevel=info', '--concurrency=8'])
5.Flask程序
- flask_app.py
from flask import Flask, request
from celery_app import execute_script
from dataclasses import dataclass, asdict
import json# 初始化 Flask 应用实例
app = Flask(__name__)# 定义数据模型
@dataclass
class Message:success: boolmsg: strtaskId: str# 定义执行脚本的接口
@app.route('/execute_script', methods=['POST'])
def run_script():# 获取请求数据data = request.get_json()# 异步执行脚本并获取任务 IDtask = execute_script.delay(data['script_path'], *data['args'])task_id = task.idprint(f"flask_app > task_id: {task_id}")try:# 创建成功消息message = Message(success=True, msg='', taskId=task_id)msg_json = json.dumps(asdict(message), ensure_ascii=False)return msg_json, 200except Exception as e:# 捕获并处理异常,返回错误消息message = Message(success=False, msg=str(e), taskId=task_id)msg_json = json.dumps(asdict(message), ensure_ascii=False)return msg_json, 500
- flask_app_start.py
from flask_app import app# 运行 Flask 应用
if __name__ == '__main__':# 以调试模式启动 Flask 应用# 设置 host 为 '0.0.0.0' 以允许外部访问# 端口号为 6000# 使用 reloader 以便在代码更改时自动重启服务器app.run(debug=True, host='0.0.0.0', port=6000, use_reloader=True)
6.测试脚本
- test_script.py
import time# main 方法作为入口
def main(x, y):# 阻塞 5 秒time.sleep(5)# 计算结果result = x * y# 打印乘法计算结果print(f"乘法计算: {x} X {y} = {result}")# 返回计算结果return result
7.程序启动
1)Windows开发调试
- 运行
flask_app_start.py
和celery_worker_start.py
2)Linux服务器部署
- 未安装 uwsgi
nohup python celery_worker_start.py > logs/celery.$(date +%Y-%m-%d).log 2>&1 &
nohup python flask_app_start.py > logs/flask.$(date +%Y-%m-%d).log 2>&1 &
- 已安装 uwsgi
nohup python celery_worker_start.py > logs/celery.$(date +%Y-%m-%d).log 2>&1 &
uwsgi --ini config/uwsgi.ini
3)订阅通道
- Redis 客户端订阅通道
redis-cli subscribe schedulerChannel
4)测试
- 使用 Springboot 等可以发送请求的程序进行测试,不限编程语言
- 使用 Postman for Windows 发送请求进行测试(当前文章使用的方法)
Postman配置:
- POST 请求:
http://192.168.28.179:6000/execute_script
- Headers 配置(配置 Key 和 Value ):
Content-Type
:application/json
Accept
:application/json
- Body - raw 配置(测试脚本和参数):
{"script_path": "E:\\scheduler-service\\test\\test_script.py","args": [1, 5]
}
相关文章:
Flask与Celery实现Python调度服务
文章目录 Flask与Celery实现Python调度服务一、前言1.组件2.场景说明3.环境 二、安装依赖1.安装Anaconda3.安装redis2.安装依赖包 三、具体实现1.目录结构2.业务流程3.配置文件4.Celery程序5.Flask程序6.测试脚本7.程序启动1)Windows开发调试2)Linux服务…...
Eureka应用场景和优势
Eureka是一款由Netflix开源的服务注册与发现框架,在微服务架构中扮演着至关重要的角色。以下是Eureka的应用场景和优势: Eureka的应用场景 Eureka主要应用于微服务架构中,特别是在大型、复杂的分布式系统中,用于管理和发现服务。…...
prompt第三讲-PromptTemplate
文章目录 前提回顾PromptTemplateprompt 模板定义以f-string渲染格式以mustache渲染格式以jinja2渲染格式直接实例化PromptTemplatePromptTemplate核心变量 prompt value生成invokeformat_prompt(不建议使用)format(不建议使用) batchstreamainvoke PromptTemplate核心方法part…...

卷积神经网络图像识别车辆类型
卷积神经网络图像识别车辆类型 1、图像 自行车: 汽车: 摩托车: 2、数据集目录 3、流程 1、获取数据,把图像转成矩阵,并随机划分训练集、测试集 2、把标签转为数值,将标签向量转换为二值矩阵 3、图像数据归一化,0-1之间的值 4、构造卷积神经网络 5、设置图像输入…...

【接口设计】用 Swagger 实现接口文档
用 Swagger 实现接口文档 1.配置 Swagger1.1 添加 Swagger 依赖1.2 创建 Swagger 配置类 2.编写接口文档 在项目开发中,一般都是由前后端工程师共同定义接口,编写接口文档,之后大家根据这个接口文档进行开发、维护。为了便于编写和维护稳定&a…...

TensorFlow系列:第四讲:MobileNetV2实战
一. 加载数据集 编写工具类,实现数据集的加载 import keras""" 加载数据集工具类 """class DatasetLoader:def __init__(self, path_url, image_size(224, 224), batch_size32, class_modecategorical):self.path_url path_urlself…...

Redis+Caffeine 实现两级缓存实战
RedisCaffeine 实现两级缓存 背景 事情的开始是这样的,前段时间接了个需求,给公司的商城官网提供一个查询预计送达时间的接口。接口很简单,根据请求传的城市仓库发货时间查询快递的预计送达时间。因为商城下单就会调用这个接口ÿ…...

SpringBoot:SpringBoot中如何实现对Http接口进行监控
一、前言 Spring Boot Actuator是Spring Boot提供的一个模块,用于监控和管理Spring Boot应用程序的运行时信息。它提供了一组监控端点(endpoints),用于获取应用程序的健康状态、性能指标、配置信息等,并支持通过 HTTP …...

STM32-I2C硬件外设
本博文建议与我上一篇I2C 通信协议共同理解 合成一套关于I2C软硬件体系 STM32内部集成了硬件I2C收发电路,可以由硬件自动执行时钟生成、起始终止条件生成、应答位收发、数据收发等功能,减轻CPU的负担 特点: 多主机功能&#x…...

暑假第一次作业
第一步:给R1,R2,R3,R4配IP [R1-GigabitEthernet0/0/0]ip address 192.168.1.1 24 [R1-Serial4/0/0]ip address 15.0.0.1 24 [R2-GigabitEthernet0/0/0]ip address 192.168.2.1 24 [R2-Serial4/0/0]ip address 25.0.0.1 24 [R3-GigabitEthernet0/0/0]ip address 192.…...

【算法专题】快速排序
1. 颜色分类 75. 颜色分类 - 力扣(LeetCode) 依据题意,我们需要把只包含0、1、2的数组划分为三个部分,事实上,在我们前面学习过的【算法专题】双指针算法-CSDN博客中,有一道题叫做移动零,题目要…...

debian 12 PXE Server 批量部署系统
pxe server 前言 PXE(Preboot eXecution Environment,预启动执行环境)是一种网络启动协议,允许计算机通过网络启动而不是使用本地硬盘。PXE服务器是实现这一功能的服务器,它提供了启动镜像和引导加载程序,…...

【Pytorch】RNN for Image Classification
文章目录 1 RNN 的定义2 RNN 输入 input, h_03 RNN 输出 output, h_n4 多层5 小试牛刀 学习参考来自 pytorch中nn.RNN()总结RNN for Image Classification(RNN图片分类–MNIST数据集)pytorch使用-nn.RNNBuilding RNNs is Fun with PyTorch and Google Colab 1 RNN 的定义 nn.…...

基于Java的飞机大战游戏的设计与实现论文
点击下载源码 基于Java的飞机大战游戏的设计与实现 摘 要 现如今,随着智能手机的兴起与普及,加上4G(the 4th Generation mobile communication ,第四代移动通信技术)网络的深入,越来越多的IT行业开始向手机…...

初识影刀:EXCEL根据部门筛选低值易耗品
第一次知道这个办公自动化的软件还是在招聘网站上,了解之后发现对于办公中重复性的工作还是挺有帮助的,特别是那些操作非EXCEL的重复性工作,当然用在EXCEL上更加方便,有些操作比写VBA便捷。 下面就是一个了解基本操作后ÿ…...

nginx的四层负载均衡实战
目录 1 环境准备 1.1 mysql 部署 1.2 nginx 部署 1.3 关闭防火墙和selinux 2 nginx配置 2.1 修改nginx主配置文件 2.2 创建stream配置文件 2.3 重启nginx 3 测试四层代理是否轮循成功 3.1 远程链接通过代理服务器访问 3.2 动图演示 4 四层反向代理算法介绍 4.1 轮询࿰…...

中职网络安全B模块Cenots6.8数据库
任务环境说明: ✓ 服务器场景:CentOS6.8(开放链接) ✓ 用户名:root;密码:123456 进入虚拟机操作系统:CentOS 6.8,登陆数据库(用户名:root&#x…...
BGP笔记的基本概要
技术背景: 在只有IGP(诸如OSPF、IS-IS、RIP等协议,因为最初是被设计在一个单域中进行一个路由操纵,因此被统一称为Interior Gateway Protocol,内部网关协议)的时代,域间路由无法实现一个全局路由…...

【Redis】复制(Replica)
文章目录 一、复制是什么?二、 基本命令三、 配置(分为配置文件和命令配置)3.1 配置文件3.2 命令配置3.3 嵌套连接3.4 关闭从属关系 四、 复制原理五、 缺点 以下是本篇文章正文内容 一、复制是什么? 主从复制 masterÿ…...

封装了一个仿照抖音效果的iOS评论弹窗
需求背景 开发一个类似抖音评论弹窗交互效果的弹窗,支持滑动消失, 滑动查看评论 效果如下图 思路 创建一个视图,该视图上面放置一个tableView, 该视图上添加一个滑动手势,同时设置代理,实现代理方法 (BOOL)gestur…...
SkyWalking 10.2.0 SWCK 配置过程
SkyWalking 10.2.0 & SWCK 配置过程 skywalking oap-server & ui 使用Docker安装在K8S集群以外,K8S集群中的微服务使用initContainer按命名空间将skywalking-java-agent注入到业务容器中。 SWCK有整套的解决方案,全安装在K8S群集中。 具体可参…...

基于FPGA的PID算法学习———实现PID比例控制算法
基于FPGA的PID算法学习 前言一、PID算法分析二、PID仿真分析1. PID代码2.PI代码3.P代码4.顶层5.测试文件6.仿真波形 总结 前言 学习内容:参考网站: PID算法控制 PID即:Proportional(比例)、Integral(积分&…...
day52 ResNet18 CBAM
在深度学习的旅程中,我们不断探索如何提升模型的性能。今天,我将分享我在 ResNet18 模型中插入 CBAM(Convolutional Block Attention Module)模块,并采用分阶段微调策略的实践过程。通过这个过程,我不仅提升…...
QMC5883L的驱动
简介 本篇文章的代码已经上传到了github上面,开源代码 作为一个电子罗盘模块,我们可以通过I2C从中获取偏航角yaw,相对于六轴陀螺仪的yaw,qmc5883l几乎不会零飘并且成本较低。 参考资料 QMC5883L磁场传感器驱动 QMC5883L磁力计…...
Nginx server_name 配置说明
Nginx 是一个高性能的反向代理和负载均衡服务器,其核心配置之一是 server 块中的 server_name 指令。server_name 决定了 Nginx 如何根据客户端请求的 Host 头匹配对应的虚拟主机(Virtual Host)。 1. 简介 Nginx 使用 server_name 指令来确定…...

SpringBoot+uniapp 的 Champion 俱乐部微信小程序设计与实现,论文初版实现
摘要 本论文旨在设计并实现基于 SpringBoot 和 uniapp 的 Champion 俱乐部微信小程序,以满足俱乐部线上活动推广、会员管理、社交互动等需求。通过 SpringBoot 搭建后端服务,提供稳定高效的数据处理与业务逻辑支持;利用 uniapp 实现跨平台前…...
大学生职业发展与就业创业指导教学评价
这里是引用 作为软工2203/2204班的学生,我们非常感谢您在《大学生职业发展与就业创业指导》课程中的悉心教导。这门课程对我们即将面临实习和就业的工科学生来说至关重要,而您认真负责的教学态度,让课程的每一部分都充满了实用价值。 尤其让我…...

OPENCV形态学基础之二腐蚀
一.腐蚀的原理 (图1) 数学表达式:dst(x,y) erode(src(x,y)) min(x,y)src(xx,yy) 腐蚀也是图像形态学的基本功能之一,腐蚀跟膨胀属于反向操作,膨胀是把图像图像变大,而腐蚀就是把图像变小。腐蚀后的图像变小变暗淡。 腐蚀…...
高效线程安全的单例模式:Python 中的懒加载与自定义初始化参数
高效线程安全的单例模式:Python 中的懒加载与自定义初始化参数 在软件开发中,单例模式(Singleton Pattern)是一种常见的设计模式,确保一个类仅有一个实例,并提供一个全局访问点。在多线程环境下,实现单例模式时需要注意线程安全问题,以防止多个线程同时创建实例,导致…...
#Uniapp篇:chrome调试unapp适配
chrome调试设备----使用Android模拟机开发调试移动端页面 Chrome://inspect/#devices MuMu模拟器Edge浏览器:Android原生APP嵌入的H5页面元素定位 chrome://inspect/#devices uniapp单位适配 根路径下 postcss.config.js 需要装这些插件 “postcss”: “^8.5.…...