当前位置: 首页 > news >正文

构建高性能异步任务引擎:FastAPI + Celery + Redis

在现代应用开发中,异步任务处理是一个常见的需求。无论是数据处理、图像生成,还是复杂的计算任务,异步执行都能显著提升系统的响应速度和吞吐量。今天,我们将通过一个实际项目,探索如何使用 FastAPICeleryRedis 构建一个高性能的异步任务引擎。

项目背景

技术栈介绍

  • FastAPI:一个现代、高性能的 Web 框架,基于 Python 3.7+ 的异步编程特性构建。它支持自动生成 OpenAPI 文档和 Swagger UI,能够快速构建 RESTful API,并且具有极低的延迟和高并发处理能力。
  • Celery:一个分布式任务队列系统,主要用于处理异步任务和定时任务。它支持多种消息传输机制,能够将任务分发到多个工作节点上并行处理,从而提高系统的吞吐量和响应速度。
  • Redis:一个高性能的键值存储系统,常用于缓存、消息队列和分布式锁等场景。在 Celery 中,Redis 通常作为消息代理(Broker)和结果存储(Backend),负责任务的分发和结果的持久化。

项目目标

通过 FastAPI、Celery 和 Redis 的结合,构建一个能够高效处理用户提交的 Python 代码的异步任务引擎。用户可以通过 API 提交代码,系统会异步执行代码,并返回任务的执行结果。

项目目录结构

project/
├── main.py
├── utils.py
├── schemas.py
└── app/├── __init__.py├── config.py└── tasks/├── __init__.py└── tasks.py

代码功能深度解析

1. main.py:FastAPI 应用的核心

main.py 是项目的核心入口文件,负责定义 FastAPI 应用的接口和逻辑。

FastAPI 应用初始化
app = FastAPI(title="Async Task API", description="", version="1.0.0")

这里我们创建了一个 FastAPI 应用,命名为 Async Task API,版本为 1.0.0

自定义 Swagger UI
def swagger_monkey_patch(*args, **kwargs):return get_swagger_ui_html(*args,**kwargs,swagger_js_url="https://cdn.bootcdn.net/ajax/libs/swagger-ui/5.6.2/swagger-ui-bundle.js",swagger_css_url="https://cdn.bootcdn.net/ajax/libs/swagger-ui/5.6.2/swagger-ui.min.css",)
applications.get_swagger_ui_html = swagger_monkey_patch

通过 Monkey Patch 的方式,我们自定义了 Swagger UI 的资源加载路径,使用了国内的 CDN 加速资源,提升文档加载速度。

全局异常处理
@app.exception_handler(Exception)
def validation_exception_handler(request, err):base_error_message = f"Failed to execute: {request.method}: {request.url}"return JSONResponse(status_code=400, content={"message": f"{base_error_message}. Detail: {err}"})

我们定义了一个全局异常处理器,捕获所有未处理的异常,并返回一个包含错误信息的 JSON 响应。

HTTP 中间件:计算请求处理时间
@app.middleware("http")
async def add_process_time_header(request, call_next):start_time = time.time()response = await call_next(request)process_time = time.time() - start_timeresponse.headers["X-Process-Time"] = str(f'{process_time:0.4f} sec')return response

这个中间件用于计算每个请求的处理时间,并将处理时间添加到响应头 X-Process-Time 中,方便调试和性能优化。

创建任务的 API
@app.post('/tasks')
def create_pytask(task: schemas.PyTask):code = task.codetime_limit = task.time_limitexpires = task.expiresresult = execute_python_code.apply_async(args=(code,), time_limit=time_limit, expires=expires)return JSONResponse(content={"task_id": result.id})

用户可以通过 /tasks 接口提交 Python 代码,代码会被异步执行。任务的执行结果可以通过 /tasks/{task_id} 接口查询。

查询任务结果的 API
@app.get('/tasks/{task_id}', response_model=schemas.PyTaskResult)
def get_task_result(task_id: str):return get_task_info(task_id)

用户可以通过 /tasks/{task_id} 接口查询任务的执行结果和状态。

2. utils.py:任务信息获取工具

utils.py 文件定义了一个工具函数 get_task_info,用于获取 Celery 任务的状态和结果。

def get_task_info(task_id):task_result = AsyncResult(task_id, app=app)result = {"task_id": task_id,"task_status": task_result.status,"task_result": task_result.result}return result

通过 AsyncResult,我们可以获取任务的当前状态(如 PENDINGSUCCESSFAILURE 等)和执行结果。

3. schemas.py:数据模型定义

schemas.py 文件定义了 Pydantic 模型,用于验证和序列化请求和响应的数据。

任务请求模型
class PyTask(BaseModel):code: strexpires: Optional[int] = Nonetime_limit: Optional[int] = None

用户提交的任务请求包含以下字段:

  • code: 任务的 Python 代码。
  • expires: 任务的过期时间(可选)。
  • time_limit: 任务的时间限制(可选)。
任务结果模型
class PyProgramResult(BaseModel):status: stroutput: Optional[str] = Noneerror: Optional[str] = None

任务的执行结果包含以下字段:

  • status: 任务的执行状态(如 successfailure)。
  • output: 任务的标准输出(可选)。
  • error: 任务的错误输出(可选)。
任务结果响应模型
class PyTaskResult(BaseModel):task_id: strtask_status: strtask_result: Optional[PyProgramResult] = None

任务的查询结果包含以下字段:

  • task_id: 任务的 ID。
  • task_status: 任务的状态(如 PENDINGSUCCESS 等)。
  • task_result: 任务的执行结果(可选)。

4. app/__init__.py:Celery 应用初始化

app/__init__.py 文件是 Celery 应用的初始化文件,主要用于配置 Celery 应用和任务的自动发现。

创建 Celery 应用
app = Celery('my_celery_project')

我们创建了一个名为 my_celery_project 的 Celery 应用。

加载配置
app.config_from_object('app.config')

app.config 文件中加载 Celery 的配置。

自动发现任务
app.autodiscover_tasks(['app.tasks'])

自动发现 app.tasks 模块中的任务。

Worker 和 Beat 初始化
@worker_init.connect
def worker_initialization(**kwargs):print("Worker 初始化开始")@beat_init.connect
def beat_initialization(**kwargs):print("Beat 初始化开始")

定义了 Worker 和 Beat 的初始化函数,分别在 Worker 和 Beat 启动时执行。

5. app/config.py:Celery 配置

app/config.py 文件定义了 Celery 的配置。

消息代理和结果存储
broker_url = 'redis://:redisisthebest@redis:6379/0'
result_backend = 'redis://:redisisthebest@redis:6379/0'

使用 Redis 作为消息代理和结果存储。

任务结果过期时间
result_expires = 3600

任务结果在 Redis 中保存 1 小时后过期。

序列化配置
task_serializer = 'json'
result_serializer = 'json'
accept_content = ['json']

使用 JSON 作为任务和结果的序列化格式。

时区配置
timezone = 'Asia/Shanghai'
enable_utc = True

设置时区为 Asia/Shanghai,并启用 UTC 时间。

6. app/tasks/tasks.py:任务执行逻辑

app/tasks/tasks.py 文件定义了一个 Celery 任务 execute_python_code,用于执行用户提交的 Python 代码。

@app.task
def execute_python_code(code_string):temp_file = "temp_code.py"with open(temp_file, "w") as f:f.write(code_string)try:result = subprocess.run(["python3", temp_file],stdout=subprocess.PIPE,stderr=subprocess.PIPE,text=True)if result.stderr:return {"status": "failure", "error": result.stderr}else:return {"status": "success", "output": result.stdout}finally:if os.path.exists(temp_file):os.remove(temp_file)

该任务将用户提交的代码字符串保存为临时文件,然后使用 subprocess.run 执行该文件,捕获标准输出和错误输出。如果执行成功,返回 success 状态和标准输出;如果执行失败,返回 failure 状态和错误输出。最后,删除临时文件。

部署分析

version: '3.8'services:fastapi:image: lab:python-packagescontainer_name: fastapiports:- 8080:8080volumes:- D:\dockerMount\code\celery:/home/codeworking_dir: /home/codecommand: python3 main.pyrestart: unless-stoppednetworks:- mynetcelery-worker:image: lab:python-packagescontainer_name: celery-workervolumes:- D:\dockerMount\code\celery:/home/celeryworking_dir: /home/celerycommand: celery -A app worker --concurrency=4 --loglevel=inforestart: unless-stoppednetworks:- mynetcelery-flower:image: lab:python-packagescontainer_name: celery-flowerports:- 5555:5555volumes:- D:\dockerMount\code\celery:/home/celeryworking_dir: /home/celerycommand: celery -A app flower --port=5555restart: unless-stoppednetworks:- mynetredis:image: bitnami/redis:7.2.4-debian-12-r16container_name: redisenvironment:- REDIS_PASSWORD=redisisthebestnetworks:- mynetnetworks:mynet:external: false

在这个 Docker Compose 配置中,我们定义了三个服务:

  • fastapi:FastAPI 应用,负责接收用户请求并分发任务。
  • celery-worker:Celery 工作节点,负责执行异步任务。
  • celery-flower:Celery 的监控工具,提供任务执行的可视化界面。
  • redis:Redis 服务,作为 Celery 的消息代理和结果存储。

代码的功能和价值

功能

  1. 异步任务执行

    • 用户可以通过 /tasks 接口提交 Python 代码,代码会被异步执行。
    • 任务的执行结果可以通过 /tasks/{task_id} 接口查询。
  2. 任务状态管理

    • 任务的状态(如 PENDINGSUCCESSFAILURE 等)可以通过 /tasks/{task_id} 接口查询。
  3. 高性能和可扩展性

    • 使用 FastAPI 和 Celery 构建的异步任务引擎能够处理高并发的任务请求。
    • Celery 的分布式特性使得系统可以轻松扩展以应对更多的任务。
  4. 安全性

    • 通过设置 time_limitexpires,可以限制任务的执行时间和过期时间,防止恶意代码的长时间执行。
  5. 易用性

    • FastAPI 自动生成的 Swagger UI 使得 API 的使用和调试更加方便。
    • Pydantic 模型确保了请求和响应数据的类型安全。

价值

  1. 高效的任务处理

    • 该系统能够高效地处理大量异步任务,适用于需要异步执行代码的场景,如在线代码执行、数据处理、图像处理等。
  2. 可扩展性

    • 通过 Celery 的分布式任务队列,系统可以轻松扩展以处理更多的任务,适合高并发场景。
  3. 安全性

    • 通过限制任务的执行时间和过期时间,系统能够有效防止恶意代码的滥用。
  4. 易用性

    • FastAPI 和 Pydantic 的结合使得 API 的开发和维护更加简单,同时提供了自动生成的文档和类型检查。
  5. 灵活性

    • 系统支持自定义任务的执行逻辑,可以根据业务需求扩展任务类型和功能。

总结

通过 FastAPI、Celery 和 Redis 的结合,我们构建了一个高性能、可扩展的分布式异步任务引擎。它能够高效地处理用户提交的 Python 代码,并提供任务状态查询功能。该系统适用于需要异步执行代码的场景,具有高效、安全、易用和灵活的特点。

无论是构建一个在线代码执行平台,还是处理复杂的计算任务,这个项目都为你提供了一个强大的基础。希望这篇文章能为你带来启发,让你在异步任务处理的道路上走得更远!

附图

发送任务
在这里插入图片描述
查询结果
在这里插入图片描述

相关文章:

构建高性能异步任务引擎:FastAPI + Celery + Redis

在现代应用开发中,异步任务处理是一个常见的需求。无论是数据处理、图像生成,还是复杂的计算任务,异步执行都能显著提升系统的响应速度和吞吐量。今天,我们将通过一个实际项目,探索如何使用 FastAPI、Celery 和 Redis …...

永磁同步电机无速度算法--全阶滑模观测器

一、原理介绍 在采用传统滑模观测器求取电机角度时通常存在系统抖振、低通滤波器导致角度相位滞后、角度的求取等问题。针对上述问题,本文采用全阶滑模观测器,该全阶滑模观测器具有二阶低通滤波器的特性,能有效滤除反电动势中的高频噪声&…...

部署开源大模型的硬件配置全面指南

目录 第一章:理解大型模型的硬件需求 1.1 模型部署需求分析 第二章:GPU资源平台 2.1 免费GPU资源 2.1.1 阿里云人工智能PAI 2.1.2 阿里天池实验室 2.1.3 Kaggle 2.1.4 Google Colab 2.2 付费GPU服务 2.2.1 AutoDL 2.2.2 Gpushare Cloud 2.2.3 Featurize 2.2.4 A…...

三、使用langchain搭建RAG:金融问答机器人--检索增强生成

经过前面2节数据准备后,现在来构建检索 加载向量数据库 from langchain.vectorstores import Chroma from langchain_huggingface import HuggingFaceEmbeddings import os# 定义 Embeddings embeddings HuggingFaceEmbeddings(model_name"m3e-base")#…...

Day13 用Excel表体验梯度下降法

Day13 用Excel表体验梯度下降法 用所学公式创建Excel表 用Excel表体验梯度下降法 详见本Day文章顶部附带资源里的Excel表《梯度下降法》,可以对照表里的单元格公式进行理解,还可以多尝试几次不同的学习率 η \eta η来感受,只需要更改学习率…...

计算机组成原理的学习笔记(5)--数据的表示与运算·其四 浮点数的储存和加减/内存对齐/大端小端

学习笔记 前言 本文主要是对于b站尚硅谷的计算机组成原理的学习笔记,仅用于学习交流。 1. 浮点数的表示与运算 规格化数: 浮点数的存储格式为 ,其中: 为符号位。 为尾数,通常在0和1之间(规格化形式为1.xx…...

华为IPD流程6大阶段370个流程活动详解_第二阶段:计划阶段 — 86个活动

华为IPD流程涵盖了产品从概念到上市的完整过程,各阶段活动明确且相互衔接。在概念启动阶段,产品经理和项目经理分析可行性,PAC评审后成立PDT。概念阶段则包括产品描述、市场定位、投资期望等内容的确定,同时组建PDT核心组并准备项目环境。团队培训涵盖团队建设、流程、业务…...

如何使用 Flask 框架创建简单的 Web 应用?

Flask是一个轻量级的Web应用框架,用Python编写,非常适合快速开发和原型设计。 它提供了必要的工具和技术来构建一个Web应用,同时保持核心简单,不强制使用特定的工具或库。 二、创建第一个Flask应用 安装Flask 首先&#xff0c…...

将Minio设置为Django的默认Storage(django-storages)

这里写自定义目录标题 前置说明静态文件收集静态文件 使用django-storages来使Django集成Minio安装依赖settings.py测试收集静态文件测试媒体文件 前置说明 静态文件 Django默认的Storage是本地,项目中的CSS、图片、JS都是静态文件。一般会将静态文件放到一个单独…...

sed | 一些关于 sed 的笔记

sed 总结 sed 语法sed [-hnV] [-e<script>] [-f<script文件>] [文本文件]--- 参数&#xff1a;-e<script> 以选项中指定的script 来处理输入的文本文件-f<script文件> 以选项中指定的script 文件来处理输入的文本文件-n 禁用 pattern space 的默认输出…...

wtforms+flask_sqlalchemy在flask-admin视图下实现日期的修改与更新

背景&#xff1a; 在flask-admin 的modelview视图下实现自定义视图的表单修改/编辑是件不太那么容易的事情&#xff0c;特别是想不自定义前端view的情况下。 材料&#xff1a; wtformsflask_sqlalchemy 制作&#xff1a; 上代码 1、模型代码 from .exts import db from …...

AI的进阶之路:从机器学习到深度学习的演变(三)

&#xff08;承接上集&#xff1a;AI的进阶之路&#xff1a;从机器学习到深度学习的演变&#xff08;二&#xff09;&#xff09; 四、深度学习&#xff08;DL&#xff09;&#xff1a;机器学习的革命性突破 深度学习&#xff08;DL&#xff09;作为机器学习的一个重要分支&am…...

thinkphp 多选框

视图 <div class"form-group"><label for"c-flag" class"control-label col-xs-12 col-sm-2 col-md-4">{:__(Flag)}</label><div class"col-xs-12 col-sm-8 col-md-8"><!--formatter:off--><select …...

机器学习《西瓜书》学习笔记《待续》

如果说&#xff0c;计算机科学是研究关于“算法”的学问&#xff0c;那么机器学习就是研究关于“学习算法”的学问。 目录 绪论引言基本术语 扩展向量的张成-span使用Markdown语法编写数学公式希腊字母的LaTex语法插入一些数学的结构插入定界符插入一些可变大小的符号插入一些函…...

STM32HAL I2C函数

8.5 使用IIC协议读写EEPROM 硬件方式实现 &#xff08;HAL库&#xff09; **HAL_I2C_Mem_Write() :这种方法可以写1个或者多个字节 ** /*** brief 以阻塞模式向指定的内存地址写入数据* param hi2c 指向 I2C_HandleTypeDef 结构体的指针&#xff0c;包含指定 I2C 的配置信息…...

洛谷 P1644 跳马问题 C语言

题目&#xff1a; P1644 跳马问题 - 洛谷 | 计算机科学教育新生态 题目背景 在爱与愁的故事第一弹第三章出来前先练练四道基本的回溯/搜索题吧…… 题目描述 中国象棋半张棋盘如图 1 所示。马自左下角 (0,0) 向右上角 (m,n) 跳。规定只能往右跳&#xff0c;不准往左跳。比…...

每天40分玩转Django:实操在线商城

实操在线商城 一、今日学习内容概述 模块重要程度主要内容商品模型⭐⭐⭐⭐⭐商品信息、分类管理购物车系统⭐⭐⭐⭐⭐购物车功能实现订单系统⭐⭐⭐⭐⭐订单处理、支付集成用户中心⭐⭐⭐⭐订单管理、个人信息 二、模型设计 # models.py from django.db import models fro…...

Bug解决!ImportError: cannot import name MutableMapping from collections

省流&#xff1a;python版本更新 而一些生态库的变量命名没更新变化导致的问题 起因是在win环境下装spark 但是发现这是python底层的问题 于是想写一篇这个错误的博客警戒世人 py实在是太多生态库了 但并不是所有的都维护的很好 大概可以理解成 python原先有个东西叫col…...

【Rust自学】4.5. 切片(Slice)

4.5.0. 写在正文之前 这是第四章的最后一篇文章了&#xff0c;在这里也顺便对这章做一个总结&#xff1a; 所有权、借用和切片的概念确保 Rust 程序在编译时的内存安全。 Rust语言让程序员能够以与其他系统编程语言相同的方式控制内存使用情况&#xff0c;但是当数据所有者超…...

医学图像 三维重建,原图与灰度图叠加,原图与多图叠加显示;多图像融合显示,彩色灰度图像融合

Part1: Summary 我们在做图像分割或融合时&#xff0c;有时需要显示多份数据进行叠加显示&#xff1b;可能需要这种效果&#xff1a; 四视图&#xff1a; 基于这个&#xff0c;我看一下网上的实现总结了一下&#xff1b;实现了以下几种效果&#xff1a; Part2&#xff1a;多种…...

手游刚开服就被攻击怎么办?如何防御DDoS?

开服初期是手游最脆弱的阶段&#xff0c;极易成为DDoS攻击的目标。一旦遭遇攻击&#xff0c;可能导致服务器瘫痪、玩家流失&#xff0c;甚至造成巨大经济损失。本文为开发者提供一套简洁有效的应急与防御方案&#xff0c;帮助快速应对并构建长期防护体系。 一、遭遇攻击的紧急应…...

Linux 文件类型,目录与路径,文件与目录管理

文件类型 后面的字符表示文件类型标志 普通文件&#xff1a;-&#xff08;纯文本文件&#xff0c;二进制文件&#xff0c;数据格式文件&#xff09; 如文本文件、图片、程序文件等。 目录文件&#xff1a;d&#xff08;directory&#xff09; 用来存放其他文件或子目录。 设备…...

【Linux】shell脚本忽略错误继续执行

在 shell 脚本中&#xff0c;可以使用 set -e 命令来设置脚本在遇到错误时退出执行。如果你希望脚本忽略错误并继续执行&#xff0c;可以在脚本开头添加 set e 命令来取消该设置。 举例1 #!/bin/bash# 取消 set -e 的设置 set e# 执行命令&#xff0c;并忽略错误 rm somefile…...

51c自动驾驶~合集58

我自己的原文哦~ https://blog.51cto.com/whaosoft/13967107 #CCA-Attention 全局池化局部保留&#xff0c;CCA-Attention为LLM长文本建模带来突破性进展 琶洲实验室、华南理工大学联合推出关键上下文感知注意力机制&#xff08;CCA-Attention&#xff09;&#xff0c;…...

前端倒计时误差!

提示:记录工作中遇到的需求及解决办法 文章目录 前言一、误差从何而来?二、五大解决方案1. 动态校准法(基础版)2. Web Worker 计时3. 服务器时间同步4. Performance API 高精度计时5. 页面可见性API优化三、生产环境最佳实践四、终极解决方案架构前言 前几天听说公司某个项…...

大数据零基础学习day1之环境准备和大数据初步理解

学习大数据会使用到多台Linux服务器。 一、环境准备 1、VMware 基于VMware构建Linux虚拟机 是大数据从业者或者IT从业者的必备技能之一也是成本低廉的方案 所以VMware虚拟机方案是必须要学习的。 &#xff08;1&#xff09;设置网关 打开VMware虚拟机&#xff0c;点击编辑…...

1.3 VSCode安装与环境配置

进入网址Visual Studio Code - Code Editing. Redefined下载.deb文件&#xff0c;然后打开终端&#xff0c;进入下载文件夹&#xff0c;键入命令 sudo dpkg -i code_1.100.3-1748872405_amd64.deb 在终端键入命令code即启动vscode 需要安装插件列表 1.Chinese简化 2.ros …...

【ROS】Nav2源码之nav2_behavior_tree-行为树节点列表

1、行为树节点分类 在 Nav2(Navigation2)的行为树框架中,行为树节点插件按照功能分为 Action(动作节点)、Condition(条件节点)、Control(控制节点) 和 Decorator(装饰节点) 四类。 1.1 动作节点 Action 执行具体的机器人操作或任务,直接与硬件、传感器或外部系统…...

Linux云原生安全:零信任架构与机密计算

Linux云原生安全&#xff1a;零信任架构与机密计算 构建坚不可摧的云原生防御体系 引言&#xff1a;云原生安全的范式革命 随着云原生技术的普及&#xff0c;安全边界正在从传统的网络边界向工作负载内部转移。Gartner预测&#xff0c;到2025年&#xff0c;零信任架构将成为超…...

libfmt: 现代C++的格式化工具库介绍与酷炫功能

libfmt: 现代C的格式化工具库介绍与酷炫功能 libfmt 是一个开源的C格式化库&#xff0c;提供了高效、安全的文本格式化功能&#xff0c;是C20中引入的std::format的基础实现。它比传统的printf和iostream更安全、更灵活、性能更好。 基本介绍 主要特点 类型安全&#xff1a…...