当前位置: 首页 > news >正文

构建高性能异步任务引擎:FastAPI + Celery + Redis

在现代应用开发中,异步任务处理是一个常见的需求。无论是数据处理、图像生成,还是复杂的计算任务,异步执行都能显著提升系统的响应速度和吞吐量。今天,我们将通过一个实际项目,探索如何使用 FastAPICeleryRedis 构建一个高性能的异步任务引擎。

项目背景

技术栈介绍

  • FastAPI:一个现代、高性能的 Web 框架,基于 Python 3.7+ 的异步编程特性构建。它支持自动生成 OpenAPI 文档和 Swagger UI,能够快速构建 RESTful API,并且具有极低的延迟和高并发处理能力。
  • Celery:一个分布式任务队列系统,主要用于处理异步任务和定时任务。它支持多种消息传输机制,能够将任务分发到多个工作节点上并行处理,从而提高系统的吞吐量和响应速度。
  • Redis:一个高性能的键值存储系统,常用于缓存、消息队列和分布式锁等场景。在 Celery 中,Redis 通常作为消息代理(Broker)和结果存储(Backend),负责任务的分发和结果的持久化。

项目目标

通过 FastAPI、Celery 和 Redis 的结合,构建一个能够高效处理用户提交的 Python 代码的异步任务引擎。用户可以通过 API 提交代码,系统会异步执行代码,并返回任务的执行结果。

项目目录结构

project/
├── main.py
├── utils.py
├── schemas.py
└── app/├── __init__.py├── config.py└── tasks/├── __init__.py└── tasks.py

代码功能深度解析

1. main.py:FastAPI 应用的核心

main.py 是项目的核心入口文件,负责定义 FastAPI 应用的接口和逻辑。

FastAPI 应用初始化
app = FastAPI(title="Async Task API", description="", version="1.0.0")

这里我们创建了一个 FastAPI 应用,命名为 Async Task API,版本为 1.0.0

自定义 Swagger UI
def swagger_monkey_patch(*args, **kwargs):return get_swagger_ui_html(*args,**kwargs,swagger_js_url="https://cdn.bootcdn.net/ajax/libs/swagger-ui/5.6.2/swagger-ui-bundle.js",swagger_css_url="https://cdn.bootcdn.net/ajax/libs/swagger-ui/5.6.2/swagger-ui.min.css",)
applications.get_swagger_ui_html = swagger_monkey_patch

通过 Monkey Patch 的方式,我们自定义了 Swagger UI 的资源加载路径,使用了国内的 CDN 加速资源,提升文档加载速度。

全局异常处理
@app.exception_handler(Exception)
def validation_exception_handler(request, err):base_error_message = f"Failed to execute: {request.method}: {request.url}"return JSONResponse(status_code=400, content={"message": f"{base_error_message}. Detail: {err}"})

我们定义了一个全局异常处理器,捕获所有未处理的异常,并返回一个包含错误信息的 JSON 响应。

HTTP 中间件:计算请求处理时间
@app.middleware("http")
async def add_process_time_header(request, call_next):start_time = time.time()response = await call_next(request)process_time = time.time() - start_timeresponse.headers["X-Process-Time"] = str(f'{process_time:0.4f} sec')return response

这个中间件用于计算每个请求的处理时间,并将处理时间添加到响应头 X-Process-Time 中,方便调试和性能优化。

创建任务的 API
@app.post('/tasks')
def create_pytask(task: schemas.PyTask):code = task.codetime_limit = task.time_limitexpires = task.expiresresult = execute_python_code.apply_async(args=(code,), time_limit=time_limit, expires=expires)return JSONResponse(content={"task_id": result.id})

用户可以通过 /tasks 接口提交 Python 代码,代码会被异步执行。任务的执行结果可以通过 /tasks/{task_id} 接口查询。

查询任务结果的 API
@app.get('/tasks/{task_id}', response_model=schemas.PyTaskResult)
def get_task_result(task_id: str):return get_task_info(task_id)

用户可以通过 /tasks/{task_id} 接口查询任务的执行结果和状态。

2. utils.py:任务信息获取工具

utils.py 文件定义了一个工具函数 get_task_info,用于获取 Celery 任务的状态和结果。

def get_task_info(task_id):task_result = AsyncResult(task_id, app=app)result = {"task_id": task_id,"task_status": task_result.status,"task_result": task_result.result}return result

通过 AsyncResult,我们可以获取任务的当前状态(如 PENDINGSUCCESSFAILURE 等)和执行结果。

3. schemas.py:数据模型定义

schemas.py 文件定义了 Pydantic 模型,用于验证和序列化请求和响应的数据。

任务请求模型
class PyTask(BaseModel):code: strexpires: Optional[int] = Nonetime_limit: Optional[int] = None

用户提交的任务请求包含以下字段:

  • code: 任务的 Python 代码。
  • expires: 任务的过期时间(可选)。
  • time_limit: 任务的时间限制(可选)。
任务结果模型
class PyProgramResult(BaseModel):status: stroutput: Optional[str] = Noneerror: Optional[str] = None

任务的执行结果包含以下字段:

  • status: 任务的执行状态(如 successfailure)。
  • output: 任务的标准输出(可选)。
  • error: 任务的错误输出(可选)。
任务结果响应模型
class PyTaskResult(BaseModel):task_id: strtask_status: strtask_result: Optional[PyProgramResult] = None

任务的查询结果包含以下字段:

  • task_id: 任务的 ID。
  • task_status: 任务的状态(如 PENDINGSUCCESS 等)。
  • task_result: 任务的执行结果(可选)。

4. app/__init__.py:Celery 应用初始化

app/__init__.py 文件是 Celery 应用的初始化文件,主要用于配置 Celery 应用和任务的自动发现。

创建 Celery 应用
app = Celery('my_celery_project')

我们创建了一个名为 my_celery_project 的 Celery 应用。

加载配置
app.config_from_object('app.config')

app.config 文件中加载 Celery 的配置。

自动发现任务
app.autodiscover_tasks(['app.tasks'])

自动发现 app.tasks 模块中的任务。

Worker 和 Beat 初始化
@worker_init.connect
def worker_initialization(**kwargs):print("Worker 初始化开始")@beat_init.connect
def beat_initialization(**kwargs):print("Beat 初始化开始")

定义了 Worker 和 Beat 的初始化函数,分别在 Worker 和 Beat 启动时执行。

5. app/config.py:Celery 配置

app/config.py 文件定义了 Celery 的配置。

消息代理和结果存储
broker_url = 'redis://:redisisthebest@redis:6379/0'
result_backend = 'redis://:redisisthebest@redis:6379/0'

使用 Redis 作为消息代理和结果存储。

任务结果过期时间
result_expires = 3600

任务结果在 Redis 中保存 1 小时后过期。

序列化配置
task_serializer = 'json'
result_serializer = 'json'
accept_content = ['json']

使用 JSON 作为任务和结果的序列化格式。

时区配置
timezone = 'Asia/Shanghai'
enable_utc = True

设置时区为 Asia/Shanghai,并启用 UTC 时间。

6. app/tasks/tasks.py:任务执行逻辑

app/tasks/tasks.py 文件定义了一个 Celery 任务 execute_python_code,用于执行用户提交的 Python 代码。

@app.task
def execute_python_code(code_string):temp_file = "temp_code.py"with open(temp_file, "w") as f:f.write(code_string)try:result = subprocess.run(["python3", temp_file],stdout=subprocess.PIPE,stderr=subprocess.PIPE,text=True)if result.stderr:return {"status": "failure", "error": result.stderr}else:return {"status": "success", "output": result.stdout}finally:if os.path.exists(temp_file):os.remove(temp_file)

该任务将用户提交的代码字符串保存为临时文件,然后使用 subprocess.run 执行该文件,捕获标准输出和错误输出。如果执行成功,返回 success 状态和标准输出;如果执行失败,返回 failure 状态和错误输出。最后,删除临时文件。

部署分析

version: '3.8'services:fastapi:image: lab:python-packagescontainer_name: fastapiports:- 8080:8080volumes:- D:\dockerMount\code\celery:/home/codeworking_dir: /home/codecommand: python3 main.pyrestart: unless-stoppednetworks:- mynetcelery-worker:image: lab:python-packagescontainer_name: celery-workervolumes:- D:\dockerMount\code\celery:/home/celeryworking_dir: /home/celerycommand: celery -A app worker --concurrency=4 --loglevel=inforestart: unless-stoppednetworks:- mynetcelery-flower:image: lab:python-packagescontainer_name: celery-flowerports:- 5555:5555volumes:- D:\dockerMount\code\celery:/home/celeryworking_dir: /home/celerycommand: celery -A app flower --port=5555restart: unless-stoppednetworks:- mynetredis:image: bitnami/redis:7.2.4-debian-12-r16container_name: redisenvironment:- REDIS_PASSWORD=redisisthebestnetworks:- mynetnetworks:mynet:external: false

在这个 Docker Compose 配置中,我们定义了三个服务:

  • fastapi:FastAPI 应用,负责接收用户请求并分发任务。
  • celery-worker:Celery 工作节点,负责执行异步任务。
  • celery-flower:Celery 的监控工具,提供任务执行的可视化界面。
  • redis:Redis 服务,作为 Celery 的消息代理和结果存储。

代码的功能和价值

功能

  1. 异步任务执行

    • 用户可以通过 /tasks 接口提交 Python 代码,代码会被异步执行。
    • 任务的执行结果可以通过 /tasks/{task_id} 接口查询。
  2. 任务状态管理

    • 任务的状态(如 PENDINGSUCCESSFAILURE 等)可以通过 /tasks/{task_id} 接口查询。
  3. 高性能和可扩展性

    • 使用 FastAPI 和 Celery 构建的异步任务引擎能够处理高并发的任务请求。
    • Celery 的分布式特性使得系统可以轻松扩展以应对更多的任务。
  4. 安全性

    • 通过设置 time_limitexpires,可以限制任务的执行时间和过期时间,防止恶意代码的长时间执行。
  5. 易用性

    • FastAPI 自动生成的 Swagger UI 使得 API 的使用和调试更加方便。
    • Pydantic 模型确保了请求和响应数据的类型安全。

价值

  1. 高效的任务处理

    • 该系统能够高效地处理大量异步任务,适用于需要异步执行代码的场景,如在线代码执行、数据处理、图像处理等。
  2. 可扩展性

    • 通过 Celery 的分布式任务队列,系统可以轻松扩展以处理更多的任务,适合高并发场景。
  3. 安全性

    • 通过限制任务的执行时间和过期时间,系统能够有效防止恶意代码的滥用。
  4. 易用性

    • FastAPI 和 Pydantic 的结合使得 API 的开发和维护更加简单,同时提供了自动生成的文档和类型检查。
  5. 灵活性

    • 系统支持自定义任务的执行逻辑,可以根据业务需求扩展任务类型和功能。

总结

通过 FastAPI、Celery 和 Redis 的结合,我们构建了一个高性能、可扩展的分布式异步任务引擎。它能够高效地处理用户提交的 Python 代码,并提供任务状态查询功能。该系统适用于需要异步执行代码的场景,具有高效、安全、易用和灵活的特点。

无论是构建一个在线代码执行平台,还是处理复杂的计算任务,这个项目都为你提供了一个强大的基础。希望这篇文章能为你带来启发,让你在异步任务处理的道路上走得更远!

附图

发送任务
在这里插入图片描述
查询结果
在这里插入图片描述

相关文章:

构建高性能异步任务引擎:FastAPI + Celery + Redis

在现代应用开发中,异步任务处理是一个常见的需求。无论是数据处理、图像生成,还是复杂的计算任务,异步执行都能显著提升系统的响应速度和吞吐量。今天,我们将通过一个实际项目,探索如何使用 FastAPI、Celery 和 Redis …...

永磁同步电机无速度算法--全阶滑模观测器

一、原理介绍 在采用传统滑模观测器求取电机角度时通常存在系统抖振、低通滤波器导致角度相位滞后、角度的求取等问题。针对上述问题,本文采用全阶滑模观测器,该全阶滑模观测器具有二阶低通滤波器的特性,能有效滤除反电动势中的高频噪声&…...

部署开源大模型的硬件配置全面指南

目录 第一章:理解大型模型的硬件需求 1.1 模型部署需求分析 第二章:GPU资源平台 2.1 免费GPU资源 2.1.1 阿里云人工智能PAI 2.1.2 阿里天池实验室 2.1.3 Kaggle 2.1.4 Google Colab 2.2 付费GPU服务 2.2.1 AutoDL 2.2.2 Gpushare Cloud 2.2.3 Featurize 2.2.4 A…...

三、使用langchain搭建RAG:金融问答机器人--检索增强生成

经过前面2节数据准备后,现在来构建检索 加载向量数据库 from langchain.vectorstores import Chroma from langchain_huggingface import HuggingFaceEmbeddings import os# 定义 Embeddings embeddings HuggingFaceEmbeddings(model_name"m3e-base")#…...

Day13 用Excel表体验梯度下降法

Day13 用Excel表体验梯度下降法 用所学公式创建Excel表 用Excel表体验梯度下降法 详见本Day文章顶部附带资源里的Excel表《梯度下降法》,可以对照表里的单元格公式进行理解,还可以多尝试几次不同的学习率 η \eta η来感受,只需要更改学习率…...

计算机组成原理的学习笔记(5)--数据的表示与运算·其四 浮点数的储存和加减/内存对齐/大端小端

学习笔记 前言 本文主要是对于b站尚硅谷的计算机组成原理的学习笔记,仅用于学习交流。 1. 浮点数的表示与运算 规格化数: 浮点数的存储格式为 ,其中: 为符号位。 为尾数,通常在0和1之间(规格化形式为1.xx…...

华为IPD流程6大阶段370个流程活动详解_第二阶段:计划阶段 — 86个活动

华为IPD流程涵盖了产品从概念到上市的完整过程,各阶段活动明确且相互衔接。在概念启动阶段,产品经理和项目经理分析可行性,PAC评审后成立PDT。概念阶段则包括产品描述、市场定位、投资期望等内容的确定,同时组建PDT核心组并准备项目环境。团队培训涵盖团队建设、流程、业务…...

如何使用 Flask 框架创建简单的 Web 应用?

Flask是一个轻量级的Web应用框架,用Python编写,非常适合快速开发和原型设计。 它提供了必要的工具和技术来构建一个Web应用,同时保持核心简单,不强制使用特定的工具或库。 二、创建第一个Flask应用 安装Flask 首先&#xff0c…...

将Minio设置为Django的默认Storage(django-storages)

这里写自定义目录标题 前置说明静态文件收集静态文件 使用django-storages来使Django集成Minio安装依赖settings.py测试收集静态文件测试媒体文件 前置说明 静态文件 Django默认的Storage是本地,项目中的CSS、图片、JS都是静态文件。一般会将静态文件放到一个单独…...

sed | 一些关于 sed 的笔记

sed 总结 sed 语法sed [-hnV] [-e<script>] [-f<script文件>] [文本文件]--- 参数&#xff1a;-e<script> 以选项中指定的script 来处理输入的文本文件-f<script文件> 以选项中指定的script 文件来处理输入的文本文件-n 禁用 pattern space 的默认输出…...

wtforms+flask_sqlalchemy在flask-admin视图下实现日期的修改与更新

背景&#xff1a; 在flask-admin 的modelview视图下实现自定义视图的表单修改/编辑是件不太那么容易的事情&#xff0c;特别是想不自定义前端view的情况下。 材料&#xff1a; wtformsflask_sqlalchemy 制作&#xff1a; 上代码 1、模型代码 from .exts import db from …...

AI的进阶之路:从机器学习到深度学习的演变(三)

&#xff08;承接上集&#xff1a;AI的进阶之路&#xff1a;从机器学习到深度学习的演变&#xff08;二&#xff09;&#xff09; 四、深度学习&#xff08;DL&#xff09;&#xff1a;机器学习的革命性突破 深度学习&#xff08;DL&#xff09;作为机器学习的一个重要分支&am…...

thinkphp 多选框

视图 <div class"form-group"><label for"c-flag" class"control-label col-xs-12 col-sm-2 col-md-4">{:__(Flag)}</label><div class"col-xs-12 col-sm-8 col-md-8"><!--formatter:off--><select …...

机器学习《西瓜书》学习笔记《待续》

如果说&#xff0c;计算机科学是研究关于“算法”的学问&#xff0c;那么机器学习就是研究关于“学习算法”的学问。 目录 绪论引言基本术语 扩展向量的张成-span使用Markdown语法编写数学公式希腊字母的LaTex语法插入一些数学的结构插入定界符插入一些可变大小的符号插入一些函…...

STM32HAL I2C函数

8.5 使用IIC协议读写EEPROM 硬件方式实现 &#xff08;HAL库&#xff09; **HAL_I2C_Mem_Write() :这种方法可以写1个或者多个字节 ** /*** brief 以阻塞模式向指定的内存地址写入数据* param hi2c 指向 I2C_HandleTypeDef 结构体的指针&#xff0c;包含指定 I2C 的配置信息…...

洛谷 P1644 跳马问题 C语言

题目&#xff1a; P1644 跳马问题 - 洛谷 | 计算机科学教育新生态 题目背景 在爱与愁的故事第一弹第三章出来前先练练四道基本的回溯/搜索题吧…… 题目描述 中国象棋半张棋盘如图 1 所示。马自左下角 (0,0) 向右上角 (m,n) 跳。规定只能往右跳&#xff0c;不准往左跳。比…...

每天40分玩转Django:实操在线商城

实操在线商城 一、今日学习内容概述 模块重要程度主要内容商品模型⭐⭐⭐⭐⭐商品信息、分类管理购物车系统⭐⭐⭐⭐⭐购物车功能实现订单系统⭐⭐⭐⭐⭐订单处理、支付集成用户中心⭐⭐⭐⭐订单管理、个人信息 二、模型设计 # models.py from django.db import models fro…...

Bug解决!ImportError: cannot import name MutableMapping from collections

省流&#xff1a;python版本更新 而一些生态库的变量命名没更新变化导致的问题 起因是在win环境下装spark 但是发现这是python底层的问题 于是想写一篇这个错误的博客警戒世人 py实在是太多生态库了 但并不是所有的都维护的很好 大概可以理解成 python原先有个东西叫col…...

【Rust自学】4.5. 切片(Slice)

4.5.0. 写在正文之前 这是第四章的最后一篇文章了&#xff0c;在这里也顺便对这章做一个总结&#xff1a; 所有权、借用和切片的概念确保 Rust 程序在编译时的内存安全。 Rust语言让程序员能够以与其他系统编程语言相同的方式控制内存使用情况&#xff0c;但是当数据所有者超…...

医学图像 三维重建,原图与灰度图叠加,原图与多图叠加显示;多图像融合显示,彩色灰度图像融合

Part1: Summary 我们在做图像分割或融合时&#xff0c;有时需要显示多份数据进行叠加显示&#xff1b;可能需要这种效果&#xff1a; 四视图&#xff1a; 基于这个&#xff0c;我看一下网上的实现总结了一下&#xff1b;实现了以下几种效果&#xff1a; Part2&#xff1a;多种…...

递归实现指数型枚举(递归)

92. 递归实现指数型枚举 - AcWing题库 每个数有选和不选两种情况 我们把每个数看成每层&#xff0c;可以画出一个递归搜索树 叶子节点就是我们的答案 很容易写出每dfs函数 dfs传入一个u表示层数 当层数大于我们n时&#xff0c;去判断每个数字的选择情况&#xff0c;输出被选…...

Unity实现Root Motion动画的Navigation自动导航

Root motion动画可以将角色的根节点&#xff08;通常是角色的骨盆或脚部&#xff09;的运动直接应用到游戏对象上&#xff0c;从而实现角色的自然移动和旋转&#xff0c;避免出现脚底打滑的现象。采用Root motion动画的游戏对象&#xff0c;通常是重载了onAnimatorMove函数&…...

[react]不能将类型“string | undefined”分配给类型“To”。 不能将类型“undefined”分配给类型“To”

场景, 封装组件的时候, 想通过外部传进去一个路由地址, 再用<Link to{}>跳转, 显示这个, 有四种方法解决 第一种 合并运算符 ?? ?? 是 空值合并运算符&#xff08;Nullish Coalescing Operator&#xff09;&#xff0c;它是 JavaScript 和 TypeScript 中的一种逻辑…...

python实现基于RPC协议的接口自动化测试

01 什么是RPC RPC&#xff08;Remote Procedure Call&#xff09;远程过程调用协议是一个用于建立适当框架的协议。从本质上讲&#xff0c;它使一台机器上的程序能够调用另一台机器上的子程序&#xff0c;而不会意识到它是远程的。 RPC 是一种软件通信协议&#xff0c;一个程…...

如何使用PSQL Tool还原pg数据库(sql格式)

新建一个数据库用来还原&#xff1b;选择新建的数据库&#xff0c;右键选择【PSQL Tool】&#xff0c;打开PSQL Tool命令行界面&#xff1b;赋予pg库对sql文件的执行权限&#xff0c;否则会报“Permission denied”的错误&#xff0c;命令如下&#xff1a; chmod urwx D://NoS…...

uni-app商品搜索页面

目录 一:功能概述 二:功能实现 一:功能概述 商品搜索页面,可以根据商品品牌,商品分类,商品价格等信息实现商品搜索和列表展示。 二:功能实现 1:商品搜索数据 <view class="search-map padding-main bg-base"> <view class…...

【深度学习】零基础介绍循环神经网络(RNN)

RNN介绍 零基础介绍语言处理技术基本介绍分词算法词法分析工具文本分类与聚类情感分析 自然语言处理词向量词向量学习模型1. 神经网络语言模型2. CBOW 和 skip-gram3. 层次化softmax方法4. 负采样方法 RNN介绍RNN的变种&#xff1a;LSTM1. Forget Gate2. Input Gate3. Update M…...

青少年编程与数学 02-004 Go语言Web编程 13课题、模板引擎

青少年编程与数学 02-004 Go语言Web编程 13课题、模板引擎 一、模板引擎模板引擎的主要特点包括&#xff1a;模板引擎的应用场景&#xff1a;Go语言中的模板引擎&#xff1a;示例&#xff1a;使用Go的html/template包 二、工作流程1. 创建模板文件2. 准备数据3. 加载模板4. 渲染…...

如何优雅的关闭GoWeb服务器

以下内容均为Let’s Go Further内容节选以及作者本人理解。 这里创建了一个后台进程用于捕获关闭信号&#xff0c;在后台进程中&#xff0c;主要内容为&#xff1a; 创建一个缓冲通道 quit使用signal.Notify函数监听并捕获关机信号SIGINT,SIGTERM&#xff0c;在捕获关机信号后…...

AI程序员,开源的Devin,OpenHands 如何使用HuggingFace Inference API

我用了一下&#xff0c;界面这样子&#xff1a; Github&#xff1a;https://github.com/All-Hands-AI/OpenHands OpenHands 如何使用HuggingFace Inference API huggingface/meta-llama/Llama-3.3-70B-Instruct 而不是 meta-llama/Llama-3.3-70B-Instruct 不要设置base URL&…...