当前位置: 首页 > news >正文

Flask与Celery实现Python调度服务

文章目录

  • Flask与Celery实现Python调度服务
  • 一、前言
    • 1.组件
    • 2.场景说明
    • 3.环境
  • 二、安装依赖
    • 1.安装Anaconda
    • 3.安装redis
    • 2.安装依赖包
  • 三、具体实现
    • 1.目录结构
    • 2.业务流程
    • 3.配置文件
    • 4.Celery程序
    • 5.Flask程序
    • 6.测试脚本
    • 7.程序启动
      • 1)Windows开发调试
      • 2)Linux服务器部署
      • 3)订阅通道
      • 4)测试

Flask与Celery实现Python调度服务

一、前言

1.组件

  • Flask:Flask 是一个轻量级的 Python Web 应用框架。Flask 提供了开发 Web 应用所需的基本功能,并且设计灵活,开发者可以根据需要扩展其功能。由于其核心简洁,它特别适合小型应用和微服务架构。同时,Flask 具有很高的可扩展性,通过各种扩展库可以轻松添加数据库集成、表单处理、身份验证等功能。

  • Celery:Celery 是一个异步任务队列/作业队列,基于分布式消息传递的系统。Celery 用于处理异步任务和调度定时任务,非常适合在后台处理耗时的操作,比如发送邮件、生成报告或与外部 API 交互。它通过任务队列和工作进程分离工作负载,可以提高应用程序的性能和响应速度。

  • uWSGI:uWSGI 是一个应用服务器,用于运行 Python Web 应用。uWSGI 旨在高效地服务 Python 应用,并提供了 WSGI 协议的实现。它通常用于在生产环境中部署 Python 应用。uWSGI 支持多种协议和功能,包括负载均衡、进程管理等。它与 Web 服务器(如 Nginx 或 Apache)配合使用,以提高应用的性能和可靠性。

  • Redis:Redis 是一个开源的内存数据结构存储系统,可以用作数据库、缓存和消息中间件。Redis 通过键值对的方式存储数据,支持丰富的数据结构如字符串、哈希、列表、集合和有序集合。它具有极高的读写性能,常用于缓存数据库查询结果、会话存储、队列管理等场景,以减少数据库的压力并提高系统的响应速度。Redis 还支持持久化,可以在重启后恢复数据。

2.场景说明

  • 解决在 java 项目中执行 python 脚本的问题;之前实现的方法是通过 Java 程序模拟命令行的交互环境执行 python 脚本,但是无法控制执行脚本的并发,导致服务器硬件资源占用过高的情况时常发生。

3.环境

  • Windows 版本(开发):Windows 10 专业版
  • Linux 发行版(部署):CentOS-7-x86_64-DVD-1804.iso
  • Postman for Windows Version:11.3.2
  • flask:2.2.5
  • celery:5.4.0
  • redis 客户端:5.0.4
  • redis 服务端:7.0.12
  • uwsgi:2.0.21

Postman 下载:https://www.postman.com/downloads/

flask框架入门和使用实践:https://blog.csdn.net/u011424614/article/details/112548442

Java执行Python脚本:https://blog.csdn.net/u011424614/article/details/114199102
[Windows] Anacoda安装和使用:https://blog.csdn.net/u011424614/article/details/105579502

CentOS7安装部署Anaconda:https://blog.csdn.net/u011424614/article/details/140253920

CentOS7安装部署Redis7:https://blog.csdn.net/u011424614/article/details/132418619

二、安装依赖

1.安装Anaconda

  • 安装参考:《CentOS7安装部署Anaconda》
  • 安装目录:/opt/anaconda/install

3.安装redis

  • 安装参考:《CentOS7安装部署Redis7 》
  • 安装目录:/opt/redis/redis-7.0.12/src

2.安装依赖包

# 创建环境
conda create -n dev01 python==3.10.14
# 激活环境
conda activate dev01# 安装依赖包(指定国内镜像源)
pip install flask==2.2.5 -i https://pypi.tuna.tsinghua.edu.cn/simple
pip install celery==5.4.0 -i https://pypi.tuna.tsinghua.edu.cn/simple
pip install redis==5.0.4 -i https://pypi.tuna.tsinghua.edu.cn/simple
pip install werkzeug==2.3.8 -i https://pypi.tuna.tsinghua.edu.cn/simple
# Windows 开发环境不需要安装,Linux 部署时使用
conda install uwsgi==2.0.21 -i https://pypi.tuna.tsinghua.edu.cn/simple

三、具体实现

1.目录结构

/opt/scheduler-service
├── logs
├── config
│   ├── redis_config.ini
│   └── uwsgi.ini
├── test
│   ├── my_script.py
├── celery_service.py
├── celery_worker_start.py
├── flask_service.py
└── flask_service_start.py

2.业务流程

执行方式:异步执行

  1. Postman 发送请求后,调用 flask 程序
  2. flask 将根据接口参数,通过 celery 异步执行指定脚本,并传递脚本所需参数;celery 异步执行后,获取 celery 任务的 task_id;最后通过 flask 将 task_id 返回给 Postman
  3. celery 执行脚本成功或失败,都会将执行结果(包含 task_id)发布到 Redis 的指定通道
  4. Redis 客户端提前订阅通道,监听通道的消息(通过 task_id 匹配执行任务和结果)

3.配置文件

  • redis_config.ini
[celery]
# 配置 Celery 的消息代理 URL
broker_url = redis://127.0.0.1:6379/2# 配置 Celery 的结果后端 URL
backend_url = redis://127.0.0.1:6379/3[redis]
# 配置 Redis 的主机地址
host = 127.0.0.1# 配置 Redis 的端口
port = 6379# 配置 Redis 的数据库索引
db = 4# 配置 Redis 的密码,如果没有密码留空
password =[channel]
# 配置消息发布的频道名称
name = schedulerChannel
  • uwsgi.ini【Windows 开发环境不需要,Linux部署时使用】
[uwsgi]
# 使用 HTTP 协议,监听 6000 端口
http = :6000# 设置 uWSGI 的工作目录为 Flask 应用所在目录
chdir = ./# 指定 Flask 应用所在的 Python 文件和变量名
module = flask_app:app# 启用主进程
master = true# 启动的工作进程数,推荐设置为 CPU 核心数的 2-4 倍
processes = 4# 线程数,可以根据需要设置
threads = 2# 指定静态文件的目录,如果有静态文件服务的需求
# static-map = /static=/path/to/static/files# 设置 Python 的自动加载,使得每次修改代码后无需重启服务
py-autoreload = 1# 设置日志文件路径
daemonize = /opt/scheduler-service/logs/uwsgi.log

4.Celery程序

  • celery_app.py
from celery import Celery
from celery.signals import task_success, task_failure
import importlib.machinery
import redis
from dataclasses import dataclass, asdict
import json
import configparser# 读取配置文件
config = configparser.ConfigParser()
config.read('config/redis_config.ini', encoding='utf-8')# 初始化 Celery 应用
# 设置消息代理(Broker)和结果后端(Backend)
app = Celery('tasks',broker=config['celery']['broker_url'],backend=config['celery']['backend_url']
)# 创建 Redis 连接
client = redis.Redis(host=config['redis']['host'],port=config.getint('redis', 'port'),db=config.getint('redis', 'db'),password=config['redis']['password'])# 频道名称
channel_name = config['channel']['name']# 定义消息数据模型
@dataclass
class Message:success: boolmsg: strtaskId: strresult: str# 任务成功完成时触发的事件
@task_success.connect
def task_success_handler(sender=None, result=None, **kwargs):print(f"任务 {sender.request.id} 执行成功.")# 构建消息message = Message(success=True, msg='', taskId=sender.request.id, result=result)# 发布消息到频道msg_json = json.dumps(asdict(message), ensure_ascii=False)client.publish(channel_name, msg_json)# 任务失败时触发的事件
@task_failure.connect
def task_failure_handler(sender=None, task_id=None, exception=None, args=None, kwargs=None, traceback=None, einfo=None, **kw):print(f"任务 {task_id} 执行失败,异常: {exception}")# 构建消息message = Message(success=False, msg=str(exception), taskId=task_id, result=None)# 发布消息到频道msg_json = json.dumps(asdict(message), ensure_ascii=False)client.publish(channel_name, msg_json)# 定义 Celery 任务
@app.task
def execute_script(script_path, *args, **kwargs):try:# 使用 SourceFileLoader 加载模块loader = importlib.machinery.SourceFileLoader("module.name", script_path)module = loader.load_module()# 执行模块中的 main 函数result = module.main(*args, **kwargs)return resultexcept Exception as e:print(f"执行过程异常: {e}", exc_info=True)raise
  • celery_worker_start.py
from celery_app import appif __name__ == '__main__':# 在 Windows 环境下运行 Celery Worker# 使用 solo 模式以避免 Windows 下多进程问题app.worker_main(argv=['worker', '--loglevel=info', '--pool=solo'])# 在 Linux 环境下运行 Celery Worker# 使用 8 个并发工作进程# app.worker_main(argv=['worker', '--loglevel=info', '--concurrency=8'])

5.Flask程序

  • flask_app.py
from flask import Flask, request
from celery_app import execute_script
from dataclasses import dataclass, asdict
import json# 初始化 Flask 应用实例
app = Flask(__name__)# 定义数据模型
@dataclass
class Message:success: boolmsg: strtaskId: str# 定义执行脚本的接口
@app.route('/execute_script', methods=['POST'])
def run_script():# 获取请求数据data = request.get_json()# 异步执行脚本并获取任务 IDtask = execute_script.delay(data['script_path'], *data['args'])task_id = task.idprint(f"flask_app > task_id: {task_id}")try:# 创建成功消息message = Message(success=True, msg='', taskId=task_id)msg_json = json.dumps(asdict(message), ensure_ascii=False)return msg_json, 200except Exception as e:# 捕获并处理异常,返回错误消息message = Message(success=False, msg=str(e), taskId=task_id)msg_json = json.dumps(asdict(message), ensure_ascii=False)return msg_json, 500
  • flask_app_start.py
from flask_app import app# 运行 Flask 应用
if __name__ == '__main__':# 以调试模式启动 Flask 应用# 设置 host 为 '0.0.0.0' 以允许外部访问# 端口号为 6000# 使用 reloader 以便在代码更改时自动重启服务器app.run(debug=True, host='0.0.0.0', port=6000, use_reloader=True)

6.测试脚本

  • test_script.py
import time# main 方法作为入口
def main(x, y):# 阻塞 5 秒time.sleep(5)# 计算结果result = x * y# 打印乘法计算结果print(f"乘法计算: {x} X {y} = {result}")# 返回计算结果return result

7.程序启动

1)Windows开发调试

  • 运行 flask_app_start.pycelery_worker_start.py

2)Linux服务器部署

  • 未安装 uwsgi
nohup python celery_worker_start.py > logs/celery.$(date +%Y-%m-%d).log 2>&1 &
nohup python flask_app_start.py > logs/flask.$(date +%Y-%m-%d).log 2>&1 &
  • 已安装 uwsgi
nohup python celery_worker_start.py > logs/celery.$(date +%Y-%m-%d).log 2>&1 &
uwsgi --ini config/uwsgi.ini

3)订阅通道

  • Redis 客户端订阅通道
redis-cli subscribe schedulerChannel

4)测试

  1. 使用 Springboot 等可以发送请求的程序进行测试,不限编程语言
  2. 使用 Postman for Windows 发送请求进行测试(当前文章使用的方法

Postman配置:

  1. POST 请求:http://192.168.28.179:6000/execute_script
  2. Headers 配置(配置 Key 和 Value ):
  • Content-Type : application/json
  • Accept : application/json
  1. Body - raw 配置(测试脚本和参数):
{"script_path": "E:\\scheduler-service\\test\\test_script.py","args": [1, 5]
}

相关文章:

Flask与Celery实现Python调度服务

文章目录 Flask与Celery实现Python调度服务一、前言1.组件2.场景说明3.环境 二、安装依赖1.安装Anaconda3.安装redis2.安装依赖包 三、具体实现1.目录结构2.业务流程3.配置文件4.Celery程序5.Flask程序6.测试脚本7.程序启动1)Windows开发调试2)Linux服务…...

Eureka应用场景和优势

Eureka是一款由Netflix开源的服务注册与发现框架,在微服务架构中扮演着至关重要的角色。以下是Eureka的应用场景和优势: Eureka的应用场景 Eureka主要应用于微服务架构中,特别是在大型、复杂的分布式系统中,用于管理和发现服务。…...

prompt第三讲-PromptTemplate

文章目录 前提回顾PromptTemplateprompt 模板定义以f-string渲染格式以mustache渲染格式以jinja2渲染格式直接实例化PromptTemplatePromptTemplate核心变量 prompt value生成invokeformat_prompt(不建议使用)format(不建议使用) batchstreamainvoke PromptTemplate核心方法part…...

卷积神经网络图像识别车辆类型

卷积神经网络图像识别车辆类型 1、图像 自行车: 汽车: 摩托车: 2、数据集目录 3、流程 1、获取数据,把图像转成矩阵,并随机划分训练集、测试集 2、把标签转为数值,将标签向量转换为二值矩阵 3、图像数据归一化,0-1之间的值 4、构造卷积神经网络 5、设置图像输入…...

【接口设计】用 Swagger 实现接口文档

用 Swagger 实现接口文档 1.配置 Swagger1.1 添加 Swagger 依赖1.2 创建 Swagger 配置类 2.编写接口文档 在项目开发中,一般都是由前后端工程师共同定义接口,编写接口文档,之后大家根据这个接口文档进行开发、维护。为了便于编写和维护稳定&a…...

TensorFlow系列:第四讲:MobileNetV2实战

一. 加载数据集 编写工具类,实现数据集的加载 import keras""" 加载数据集工具类 """class DatasetLoader:def __init__(self, path_url, image_size(224, 224), batch_size32, class_modecategorical):self.path_url path_urlself…...

Redis+Caffeine 实现两级缓存实战

RedisCaffeine 实现两级缓存 背景 ​ 事情的开始是这样的,前段时间接了个需求,给公司的商城官网提供一个查询预计送达时间的接口。接口很简单,根据请求传的城市仓库发货时间查询快递的预计送达时间。因为商城下单就会调用这个接口&#xff…...

SpringBoot:SpringBoot中如何实现对Http接口进行监控

一、前言 Spring Boot Actuator是Spring Boot提供的一个模块,用于监控和管理Spring Boot应用程序的运行时信息。它提供了一组监控端点(endpoints),用于获取应用程序的健康状态、性能指标、配置信息等,并支持通过 HTTP …...

STM32-I2C硬件外设

本博文建议与我上一篇I2C 通信协议​​​​​​共同理解 合成一套关于I2C软硬件体系 STM32内部集成了硬件I2C收发电路,可以由硬件自动执行时钟生成、起始终止条件生成、应答位收发、数据收发等功能,减轻CPU的负担 特点: 多主机功能&#x…...

暑假第一次作业

第一步:给R1,R2,R3,R4配IP [R1-GigabitEthernet0/0/0]ip address 192.168.1.1 24 [R1-Serial4/0/0]ip address 15.0.0.1 24 [R2-GigabitEthernet0/0/0]ip address 192.168.2.1 24 [R2-Serial4/0/0]ip address 25.0.0.1 24 [R3-GigabitEthernet0/0/0]ip address 192.…...

【算法专题】快速排序

1. 颜色分类 75. 颜色分类 - 力扣(LeetCode) 依据题意,我们需要把只包含0、1、2的数组划分为三个部分,事实上,在我们前面学习过的【算法专题】双指针算法-CSDN博客中,有一道题叫做移动零,题目要…...

debian 12 PXE Server 批量部署系统

pxe server 前言 PXE(Preboot eXecution Environment,预启动执行环境)是一种网络启动协议,允许计算机通过网络启动而不是使用本地硬盘。PXE服务器是实现这一功能的服务器,它提供了启动镜像和引导加载程序,…...

【Pytorch】RNN for Image Classification

文章目录 1 RNN 的定义2 RNN 输入 input, h_03 RNN 输出 output, h_n4 多层5 小试牛刀 学习参考来自 pytorch中nn.RNN()总结RNN for Image Classification(RNN图片分类–MNIST数据集)pytorch使用-nn.RNNBuilding RNNs is Fun with PyTorch and Google Colab 1 RNN 的定义 nn.…...

基于Java的飞机大战游戏的设计与实现论文

点击下载源码 基于Java的飞机大战游戏的设计与实现 摘 要 现如今,随着智能手机的兴起与普及,加上4G(the 4th Generation mobile communication ,第四代移动通信技术)网络的深入,越来越多的IT行业开始向手机…...

初识影刀:EXCEL根据部门筛选低值易耗品

第一次知道这个办公自动化的软件还是在招聘网站上,了解之后发现对于办公中重复性的工作还是挺有帮助的,特别是那些操作非EXCEL的重复性工作,当然用在EXCEL上更加方便,有些操作比写VBA便捷。 下面就是一个了解基本操作后&#xff…...

nginx的四层负载均衡实战

目录 1 环境准备 1.1 mysql 部署 1.2 nginx 部署 1.3 关闭防火墙和selinux 2 nginx配置 2.1 修改nginx主配置文件 2.2 创建stream配置文件 2.3 重启nginx 3 测试四层代理是否轮循成功 3.1 远程链接通过代理服务器访问 3.2 动图演示 4 四层反向代理算法介绍 4.1 轮询&#xff0…...

中职网络安全B模块Cenots6.8数据库

任务环境说明: ✓ 服务器场景:CentOS6.8(开放链接) ✓ 用户名:root;密码:123456 进入虚拟机操作系统:CentOS 6.8,登陆数据库(用户名:root&#x…...

BGP笔记的基本概要

技术背景: 在只有IGP(诸如OSPF、IS-IS、RIP等协议,因为最初是被设计在一个单域中进行一个路由操纵,因此被统一称为Interior Gateway Protocol,内部网关协议)的时代,域间路由无法实现一个全局路由…...

【Redis】复制(Replica)

文章目录 一、复制是什么?二、 基本命令三、 配置(分为配置文件和命令配置)3.1 配置文件3.2 命令配置3.3 嵌套连接3.4 关闭从属关系 四、 复制原理五、 缺点 以下是本篇文章正文内容 一、复制是什么? 主从复制 master&#xff…...

封装了一个仿照抖音效果的iOS评论弹窗

需求背景 开发一个类似抖音评论弹窗交互效果的弹窗,支持滑动消失, 滑动查看评论 效果如下图 思路 创建一个视图,该视图上面放置一个tableView, 该视图上添加一个滑动手势,同时设置代理,实现代理方法 (BOOL)gestur…...

shell脚本--常见案例

1、自动备份文件或目录 2、批量重命名文件 3、查找并删除指定名称的文件: 4、批量删除文件 5、查找并替换文件内容 6、批量创建文件 7、创建文件夹并移动文件 8、在文件夹中查找文件...

以下是对华为 HarmonyOS NETX 5属性动画(ArkTS)文档的结构化整理,通过层级标题、表格和代码块提升可读性:

一、属性动画概述NETX 作用:实现组件通用属性的渐变过渡效果,提升用户体验。支持属性:width、height、backgroundColor、opacity、scale、rotate、translate等。注意事项: 布局类属性(如宽高)变化时&#…...

UDP(Echoserver)

网络命令 Ping 命令 检测网络是否连通 使用方法: ping -c 次数 网址ping -c 3 www.baidu.comnetstat 命令 netstat 是一个用来查看网络状态的重要工具. 语法:netstat [选项] 功能:查看网络状态 常用选项: n 拒绝显示别名&#…...

Unit 1 深度强化学习简介

Deep RL Course ——Unit 1 Introduction 从理论和实践层面深入学习深度强化学习。学会使用知名的深度强化学习库,例如 Stable Baselines3、RL Baselines3 Zoo、Sample Factory 和 CleanRL。在独特的环境中训练智能体,比如 SnowballFight、Huggy the Do…...

Java面试专项一-准备篇

一、企业简历筛选规则 一般企业的简历筛选流程:首先由HR先筛选一部分简历后,在将简历给到对应的项目负责人后再进行下一步的操作。 HR如何筛选简历 例如:Boss直聘(招聘方平台) 直接按照条件进行筛选 例如&#xff1a…...

MySQL的pymysql操作

本章是MySQL的最后一章,MySQL到此完结,下一站Hadoop!!! 这章很简单,完整代码在最后,详细讲解之前python课程里面也有,感兴趣的可以往前找一下 一、查询操作 我们需要打开pycharm …...

【Linux】Linux安装并配置RabbitMQ

目录 1. 安装 Erlang 2. 安装 RabbitMQ 2.1.添加 RabbitMQ 仓库 2.2.安装 RabbitMQ 3.配置 3.1.启动和管理服务 4. 访问管理界面 5.安装问题 6.修改密码 7.修改端口 7.1.找到文件 7.2.修改文件 1. 安装 Erlang 由于 RabbitMQ 是用 Erlang 编写的,需要先安…...

2025年低延迟业务DDoS防护全攻略:高可用架构与实战方案

一、延迟敏感行业面临的DDoS攻击新挑战 2025年,金融交易、实时竞技游戏、工业物联网等低延迟业务成为DDoS攻击的首要目标。攻击呈现三大特征: AI驱动的自适应攻击:攻击流量模拟真实用户行为,差异率低至0.5%,传统规则引…...

文件上传漏洞防御全攻略

要全面防范文件上传漏洞,需构建多层防御体系,结合技术验证、存储隔离与权限控制: 🔒 一、基础防护层 前端校验(仅辅助) 通过JavaScript限制文件后缀名(白名单)和大小,提…...

高效的后台管理系统——可进行二次开发

随着互联网技术的迅猛发展,企业的数字化管理变得愈加重要。后台管理系统作为数据存储与业务管理的核心,成为了现代企业不可或缺的一部分。今天我们要介绍的是一款名为 若依后台管理框架 的系统,它不仅支持跨平台应用,还能提供丰富…...