构建高性能异步任务引擎:FastAPI + Celery + Redis
在现代应用开发中,异步任务处理是一个常见的需求。无论是数据处理、图像生成,还是复杂的计算任务,异步执行都能显著提升系统的响应速度和吞吐量。今天,我们将通过一个实际项目,探索如何使用 FastAPI、Celery 和 Redis 构建一个高性能的异步任务引擎。
项目背景
技术栈介绍
- FastAPI:一个现代、高性能的 Web 框架,基于 Python 3.7+ 的异步编程特性构建。它支持自动生成 OpenAPI 文档和 Swagger UI,能够快速构建 RESTful API,并且具有极低的延迟和高并发处理能力。
- Celery:一个分布式任务队列系统,主要用于处理异步任务和定时任务。它支持多种消息传输机制,能够将任务分发到多个工作节点上并行处理,从而提高系统的吞吐量和响应速度。
- Redis:一个高性能的键值存储系统,常用于缓存、消息队列和分布式锁等场景。在 Celery 中,Redis 通常作为消息代理(Broker)和结果存储(Backend),负责任务的分发和结果的持久化。
项目目标
通过 FastAPI、Celery 和 Redis 的结合,构建一个能够高效处理用户提交的 Python 代码的异步任务引擎。用户可以通过 API 提交代码,系统会异步执行代码,并返回任务的执行结果。
项目目录结构
project/
├── main.py
├── utils.py
├── schemas.py
└── app/├── __init__.py├── config.py└── tasks/├── __init__.py└── tasks.py
代码功能深度解析
1. main.py
:FastAPI 应用的核心
main.py
是项目的核心入口文件,负责定义 FastAPI 应用的接口和逻辑。
FastAPI 应用初始化
app = FastAPI(title="Async Task API", description="", version="1.0.0")
这里我们创建了一个 FastAPI 应用,命名为 Async Task API
,版本为 1.0.0
。
自定义 Swagger UI
def swagger_monkey_patch(*args, **kwargs):return get_swagger_ui_html(*args,**kwargs,swagger_js_url="https://cdn.bootcdn.net/ajax/libs/swagger-ui/5.6.2/swagger-ui-bundle.js",swagger_css_url="https://cdn.bootcdn.net/ajax/libs/swagger-ui/5.6.2/swagger-ui.min.css",)
applications.get_swagger_ui_html = swagger_monkey_patch
通过 Monkey Patch 的方式,我们自定义了 Swagger UI 的资源加载路径,使用了国内的 CDN 加速资源,提升文档加载速度。
全局异常处理
@app.exception_handler(Exception)
def validation_exception_handler(request, err):base_error_message = f"Failed to execute: {request.method}: {request.url}"return JSONResponse(status_code=400, content={"message": f"{base_error_message}. Detail: {err}"})
我们定义了一个全局异常处理器,捕获所有未处理的异常,并返回一个包含错误信息的 JSON 响应。
HTTP 中间件:计算请求处理时间
@app.middleware("http")
async def add_process_time_header(request, call_next):start_time = time.time()response = await call_next(request)process_time = time.time() - start_timeresponse.headers["X-Process-Time"] = str(f'{process_time:0.4f} sec')return response
这个中间件用于计算每个请求的处理时间,并将处理时间添加到响应头 X-Process-Time
中,方便调试和性能优化。
创建任务的 API
@app.post('/tasks')
def create_pytask(task: schemas.PyTask):code = task.codetime_limit = task.time_limitexpires = task.expiresresult = execute_python_code.apply_async(args=(code,), time_limit=time_limit, expires=expires)return JSONResponse(content={"task_id": result.id})
用户可以通过 /tasks
接口提交 Python 代码,代码会被异步执行。任务的执行结果可以通过 /tasks/{task_id}
接口查询。
查询任务结果的 API
@app.get('/tasks/{task_id}', response_model=schemas.PyTaskResult)
def get_task_result(task_id: str):return get_task_info(task_id)
用户可以通过 /tasks/{task_id}
接口查询任务的执行结果和状态。
2. utils.py
:任务信息获取工具
utils.py
文件定义了一个工具函数 get_task_info
,用于获取 Celery 任务的状态和结果。
def get_task_info(task_id):task_result = AsyncResult(task_id, app=app)result = {"task_id": task_id,"task_status": task_result.status,"task_result": task_result.result}return result
通过 AsyncResult
,我们可以获取任务的当前状态(如 PENDING
、SUCCESS
、FAILURE
等)和执行结果。
3. schemas.py
:数据模型定义
schemas.py
文件定义了 Pydantic 模型,用于验证和序列化请求和响应的数据。
任务请求模型
class PyTask(BaseModel):code: strexpires: Optional[int] = Nonetime_limit: Optional[int] = None
用户提交的任务请求包含以下字段:
code
: 任务的 Python 代码。expires
: 任务的过期时间(可选)。time_limit
: 任务的时间限制(可选)。
任务结果模型
class PyProgramResult(BaseModel):status: stroutput: Optional[str] = Noneerror: Optional[str] = None
任务的执行结果包含以下字段:
status
: 任务的执行状态(如success
或failure
)。output
: 任务的标准输出(可选)。error
: 任务的错误输出(可选)。
任务结果响应模型
class PyTaskResult(BaseModel):task_id: strtask_status: strtask_result: Optional[PyProgramResult] = None
任务的查询结果包含以下字段:
task_id
: 任务的 ID。task_status
: 任务的状态(如PENDING
、SUCCESS
等)。task_result
: 任务的执行结果(可选)。
4. app/__init__.py
:Celery 应用初始化
app/__init__.py
文件是 Celery 应用的初始化文件,主要用于配置 Celery 应用和任务的自动发现。
创建 Celery 应用
app = Celery('my_celery_project')
我们创建了一个名为 my_celery_project
的 Celery 应用。
加载配置
app.config_from_object('app.config')
从 app.config
文件中加载 Celery 的配置。
自动发现任务
app.autodiscover_tasks(['app.tasks'])
自动发现 app.tasks
模块中的任务。
Worker 和 Beat 初始化
@worker_init.connect
def worker_initialization(**kwargs):print("Worker 初始化开始")@beat_init.connect
def beat_initialization(**kwargs):print("Beat 初始化开始")
定义了 Worker 和 Beat 的初始化函数,分别在 Worker 和 Beat 启动时执行。
5. app/config.py
:Celery 配置
app/config.py
文件定义了 Celery 的配置。
消息代理和结果存储
broker_url = 'redis://:redisisthebest@redis:6379/0'
result_backend = 'redis://:redisisthebest@redis:6379/0'
使用 Redis 作为消息代理和结果存储。
任务结果过期时间
result_expires = 3600
任务结果在 Redis 中保存 1 小时后过期。
序列化配置
task_serializer = 'json'
result_serializer = 'json'
accept_content = ['json']
使用 JSON 作为任务和结果的序列化格式。
时区配置
timezone = 'Asia/Shanghai'
enable_utc = True
设置时区为 Asia/Shanghai
,并启用 UTC 时间。
6. app/tasks/tasks.py
:任务执行逻辑
app/tasks/tasks.py
文件定义了一个 Celery 任务 execute_python_code
,用于执行用户提交的 Python 代码。
@app.task
def execute_python_code(code_string):temp_file = "temp_code.py"with open(temp_file, "w") as f:f.write(code_string)try:result = subprocess.run(["python3", temp_file],stdout=subprocess.PIPE,stderr=subprocess.PIPE,text=True)if result.stderr:return {"status": "failure", "error": result.stderr}else:return {"status": "success", "output": result.stdout}finally:if os.path.exists(temp_file):os.remove(temp_file)
该任务将用户提交的代码字符串保存为临时文件,然后使用 subprocess.run
执行该文件,捕获标准输出和错误输出。如果执行成功,返回 success
状态和标准输出;如果执行失败,返回 failure
状态和错误输出。最后,删除临时文件。
部署分析
version: '3.8'services:fastapi:image: lab:python-packagescontainer_name: fastapiports:- 8080:8080volumes:- D:\dockerMount\code\celery:/home/codeworking_dir: /home/codecommand: python3 main.pyrestart: unless-stoppednetworks:- mynetcelery-worker:image: lab:python-packagescontainer_name: celery-workervolumes:- D:\dockerMount\code\celery:/home/celeryworking_dir: /home/celerycommand: celery -A app worker --concurrency=4 --loglevel=inforestart: unless-stoppednetworks:- mynetcelery-flower:image: lab:python-packagescontainer_name: celery-flowerports:- 5555:5555volumes:- D:\dockerMount\code\celery:/home/celeryworking_dir: /home/celerycommand: celery -A app flower --port=5555restart: unless-stoppednetworks:- mynetredis:image: bitnami/redis:7.2.4-debian-12-r16container_name: redisenvironment:- REDIS_PASSWORD=redisisthebestnetworks:- mynetnetworks:mynet:external: false
在这个 Docker Compose 配置中,我们定义了三个服务:
- fastapi:FastAPI 应用,负责接收用户请求并分发任务。
- celery-worker:Celery 工作节点,负责执行异步任务。
- celery-flower:Celery 的监控工具,提供任务执行的可视化界面。
- redis:Redis 服务,作为 Celery 的消息代理和结果存储。
代码的功能和价值
功能
-
异步任务执行:
- 用户可以通过
/tasks
接口提交 Python 代码,代码会被异步执行。 - 任务的执行结果可以通过
/tasks/{task_id}
接口查询。
- 用户可以通过
-
任务状态管理:
- 任务的状态(如
PENDING
、SUCCESS
、FAILURE
等)可以通过/tasks/{task_id}
接口查询。
- 任务的状态(如
-
高性能和可扩展性:
- 使用 FastAPI 和 Celery 构建的异步任务引擎能够处理高并发的任务请求。
- Celery 的分布式特性使得系统可以轻松扩展以应对更多的任务。
-
安全性:
- 通过设置
time_limit
和expires
,可以限制任务的执行时间和过期时间,防止恶意代码的长时间执行。
- 通过设置
-
易用性:
- FastAPI 自动生成的 Swagger UI 使得 API 的使用和调试更加方便。
- Pydantic 模型确保了请求和响应数据的类型安全。
价值
-
高效的任务处理:
- 该系统能够高效地处理大量异步任务,适用于需要异步执行代码的场景,如在线代码执行、数据处理、图像处理等。
-
可扩展性:
- 通过 Celery 的分布式任务队列,系统可以轻松扩展以处理更多的任务,适合高并发场景。
-
安全性:
- 通过限制任务的执行时间和过期时间,系统能够有效防止恶意代码的滥用。
-
易用性:
- FastAPI 和 Pydantic 的结合使得 API 的开发和维护更加简单,同时提供了自动生成的文档和类型检查。
-
灵活性:
- 系统支持自定义任务的执行逻辑,可以根据业务需求扩展任务类型和功能。
总结
通过 FastAPI、Celery 和 Redis 的结合,我们构建了一个高性能、可扩展的分布式异步任务引擎。它能够高效地处理用户提交的 Python 代码,并提供任务状态查询功能。该系统适用于需要异步执行代码的场景,具有高效、安全、易用和灵活的特点。
无论是构建一个在线代码执行平台,还是处理复杂的计算任务,这个项目都为你提供了一个强大的基础。希望这篇文章能为你带来启发,让你在异步任务处理的道路上走得更远!
附图
发送任务
查询结果
相关文章:

构建高性能异步任务引擎:FastAPI + Celery + Redis
在现代应用开发中,异步任务处理是一个常见的需求。无论是数据处理、图像生成,还是复杂的计算任务,异步执行都能显著提升系统的响应速度和吞吐量。今天,我们将通过一个实际项目,探索如何使用 FastAPI、Celery 和 Redis …...

永磁同步电机无速度算法--全阶滑模观测器
一、原理介绍 在采用传统滑模观测器求取电机角度时通常存在系统抖振、低通滤波器导致角度相位滞后、角度的求取等问题。针对上述问题,本文采用全阶滑模观测器,该全阶滑模观测器具有二阶低通滤波器的特性,能有效滤除反电动势中的高频噪声&…...
部署开源大模型的硬件配置全面指南
目录 第一章:理解大型模型的硬件需求 1.1 模型部署需求分析 第二章:GPU资源平台 2.1 免费GPU资源 2.1.1 阿里云人工智能PAI 2.1.2 阿里天池实验室 2.1.3 Kaggle 2.1.4 Google Colab 2.2 付费GPU服务 2.2.1 AutoDL 2.2.2 Gpushare Cloud 2.2.3 Featurize 2.2.4 A…...

三、使用langchain搭建RAG:金融问答机器人--检索增强生成
经过前面2节数据准备后,现在来构建检索 加载向量数据库 from langchain.vectorstores import Chroma from langchain_huggingface import HuggingFaceEmbeddings import os# 定义 Embeddings embeddings HuggingFaceEmbeddings(model_name"m3e-base")#…...

Day13 用Excel表体验梯度下降法
Day13 用Excel表体验梯度下降法 用所学公式创建Excel表 用Excel表体验梯度下降法 详见本Day文章顶部附带资源里的Excel表《梯度下降法》,可以对照表里的单元格公式进行理解,还可以多尝试几次不同的学习率 η \eta η来感受,只需要更改学习率…...
计算机组成原理的学习笔记(5)--数据的表示与运算·其四 浮点数的储存和加减/内存对齐/大端小端
学习笔记 前言 本文主要是对于b站尚硅谷的计算机组成原理的学习笔记,仅用于学习交流。 1. 浮点数的表示与运算 规格化数: 浮点数的存储格式为 ,其中: 为符号位。 为尾数,通常在0和1之间(规格化形式为1.xx…...

华为IPD流程6大阶段370个流程活动详解_第二阶段:计划阶段 — 86个活动
华为IPD流程涵盖了产品从概念到上市的完整过程,各阶段活动明确且相互衔接。在概念启动阶段,产品经理和项目经理分析可行性,PAC评审后成立PDT。概念阶段则包括产品描述、市场定位、投资期望等内容的确定,同时组建PDT核心组并准备项目环境。团队培训涵盖团队建设、流程、业务…...
如何使用 Flask 框架创建简单的 Web 应用?
Flask是一个轻量级的Web应用框架,用Python编写,非常适合快速开发和原型设计。 它提供了必要的工具和技术来构建一个Web应用,同时保持核心简单,不强制使用特定的工具或库。 二、创建第一个Flask应用 安装Flask 首先,…...

将Minio设置为Django的默认Storage(django-storages)
这里写自定义目录标题 前置说明静态文件收集静态文件 使用django-storages来使Django集成Minio安装依赖settings.py测试收集静态文件测试媒体文件 前置说明 静态文件 Django默认的Storage是本地,项目中的CSS、图片、JS都是静态文件。一般会将静态文件放到一个单独…...

sed | 一些关于 sed 的笔记
sed 总结 sed 语法sed [-hnV] [-e<script>] [-f<script文件>] [文本文件]--- 参数:-e<script> 以选项中指定的script 来处理输入的文本文件-f<script文件> 以选项中指定的script 文件来处理输入的文本文件-n 禁用 pattern space 的默认输出…...

wtforms+flask_sqlalchemy在flask-admin视图下实现日期的修改与更新
背景: 在flask-admin 的modelview视图下实现自定义视图的表单修改/编辑是件不太那么容易的事情,特别是想不自定义前端view的情况下。 材料: wtformsflask_sqlalchemy 制作: 上代码 1、模型代码 from .exts import db from …...

AI的进阶之路:从机器学习到深度学习的演变(三)
(承接上集:AI的进阶之路:从机器学习到深度学习的演变(二)) 四、深度学习(DL):机器学习的革命性突破 深度学习(DL)作为机器学习的一个重要分支&am…...
thinkphp 多选框
视图 <div class"form-group"><label for"c-flag" class"control-label col-xs-12 col-sm-2 col-md-4">{:__(Flag)}</label><div class"col-xs-12 col-sm-8 col-md-8"><!--formatter:off--><select …...

机器学习《西瓜书》学习笔记《待续》
如果说,计算机科学是研究关于“算法”的学问,那么机器学习就是研究关于“学习算法”的学问。 目录 绪论引言基本术语 扩展向量的张成-span使用Markdown语法编写数学公式希腊字母的LaTex语法插入一些数学的结构插入定界符插入一些可变大小的符号插入一些函…...
STM32HAL I2C函数
8.5 使用IIC协议读写EEPROM 硬件方式实现 (HAL库) **HAL_I2C_Mem_Write() :这种方法可以写1个或者多个字节 ** /*** brief 以阻塞模式向指定的内存地址写入数据* param hi2c 指向 I2C_HandleTypeDef 结构体的指针,包含指定 I2C 的配置信息…...

洛谷 P1644 跳马问题 C语言
题目: P1644 跳马问题 - 洛谷 | 计算机科学教育新生态 题目背景 在爱与愁的故事第一弹第三章出来前先练练四道基本的回溯/搜索题吧…… 题目描述 中国象棋半张棋盘如图 1 所示。马自左下角 (0,0) 向右上角 (m,n) 跳。规定只能往右跳,不准往左跳。比…...

每天40分玩转Django:实操在线商城
实操在线商城 一、今日学习内容概述 模块重要程度主要内容商品模型⭐⭐⭐⭐⭐商品信息、分类管理购物车系统⭐⭐⭐⭐⭐购物车功能实现订单系统⭐⭐⭐⭐⭐订单处理、支付集成用户中心⭐⭐⭐⭐订单管理、个人信息 二、模型设计 # models.py from django.db import models fro…...

Bug解决!ImportError: cannot import name MutableMapping from collections
省流:python版本更新 而一些生态库的变量命名没更新变化导致的问题 起因是在win环境下装spark 但是发现这是python底层的问题 于是想写一篇这个错误的博客警戒世人 py实在是太多生态库了 但并不是所有的都维护的很好 大概可以理解成 python原先有个东西叫col…...

【Rust自学】4.5. 切片(Slice)
4.5.0. 写在正文之前 这是第四章的最后一篇文章了,在这里也顺便对这章做一个总结: 所有权、借用和切片的概念确保 Rust 程序在编译时的内存安全。 Rust语言让程序员能够以与其他系统编程语言相同的方式控制内存使用情况,但是当数据所有者超…...

医学图像 三维重建,原图与灰度图叠加,原图与多图叠加显示;多图像融合显示,彩色灰度图像融合
Part1: Summary 我们在做图像分割或融合时,有时需要显示多份数据进行叠加显示;可能需要这种效果: 四视图: 基于这个,我看一下网上的实现总结了一下;实现了以下几种效果: Part2:多种…...
零门槛NAS搭建:WinNAS如何让普通电脑秒变私有云?
一、核心优势:专为Windows用户设计的极简NAS WinNAS由深圳耘想存储科技开发,是一款收费低廉但功能全面的Windows NAS工具,主打“无学习成本部署” 。与其他NAS软件相比,其优势在于: 无需硬件改造:将任意W…...

工业安全零事故的智能守护者:一体化AI智能安防平台
前言: 通过AI视觉技术,为船厂提供全面的安全监控解决方案,涵盖交通违规检测、起重机轨道安全、非法入侵检测、盗窃防范、安全规范执行监控等多个方面,能够实现对应负责人反馈机制,并最终实现数据的统计报表。提升船厂…...

相机Camera日志实例分析之二:相机Camx【专业模式开启直方图拍照】单帧流程日志详解
【关注我,后续持续新增专题博文,谢谢!!!】 上一篇我们讲了: 这一篇我们开始讲: 目录 一、场景操作步骤 二、日志基础关键字分级如下 三、场景日志如下: 一、场景操作步骤 操作步…...

WordPress插件:AI多语言写作与智能配图、免费AI模型、SEO文章生成
厌倦手动写WordPress文章?AI自动生成,效率提升10倍! 支持多语言、自动配图、定时发布,让内容创作更轻松! AI内容生成 → 不想每天写文章?AI一键生成高质量内容!多语言支持 → 跨境电商必备&am…...
土地利用/土地覆盖遥感解译与基于CLUE模型未来变化情景预测;从基础到高级,涵盖ArcGIS数据处理、ENVI遥感解译与CLUE模型情景模拟等
🔍 土地利用/土地覆盖数据是生态、环境和气象等诸多领域模型的关键输入参数。通过遥感影像解译技术,可以精准获取历史或当前任何一个区域的土地利用/土地覆盖情况。这些数据不仅能够用于评估区域生态环境的变化趋势,还能有效评价重大生态工程…...
【HTML-16】深入理解HTML中的块元素与行内元素
HTML元素根据其显示特性可以分为两大类:块元素(Block-level Elements)和行内元素(Inline Elements)。理解这两者的区别对于构建良好的网页布局至关重要。本文将全面解析这两种元素的特性、区别以及实际应用场景。 1. 块元素(Block-level Elements) 1.1 基本特性 …...

AI,如何重构理解、匹配与决策?
AI 时代,我们如何理解消费? 作者|王彬 封面|Unplash 人们通过信息理解世界。 曾几何时,PC 与移动互联网重塑了人们的购物路径:信息变得唾手可得,商品决策变得高度依赖内容。 但 AI 时代的来…...
【SSH疑难排查】轻松解决新版OpenSSH连接旧服务器的“no matching...“系列算法协商失败问题
【SSH疑难排查】轻松解决新版OpenSSH连接旧服务器的"no matching..."系列算法协商失败问题 摘要: 近期,在使用较新版本的OpenSSH客户端连接老旧SSH服务器时,会遇到 "no matching key exchange method found", "n…...
HybridVLA——让单一LLM同时具备扩散和自回归动作预测能力:训练时既扩散也回归,但推理时则扩散
前言 如上一篇文章《dexcap升级版之DexWild》中的前言部分所说,在叠衣服的过程中,我会带着团队对比各种模型、方法、策略,毕竟针对各个场景始终寻找更优的解决方案,是我个人和我司「七月在线」的职责之一 且个人认为,…...

【Linux】Linux安装并配置RabbitMQ
目录 1. 安装 Erlang 2. 安装 RabbitMQ 2.1.添加 RabbitMQ 仓库 2.2.安装 RabbitMQ 3.配置 3.1.启动和管理服务 4. 访问管理界面 5.安装问题 6.修改密码 7.修改端口 7.1.找到文件 7.2.修改文件 1. 安装 Erlang 由于 RabbitMQ 是用 Erlang 编写的,需要先安…...