当前位置: 首页 > article >正文

Flask与Celery 项目应用(shared_task使用)

目录

    • 1. 项目概述
      • 主要功能
      • 技术栈
    • 2. 项目结构
    • 3. 环境设置
      • 创建虚拟环境并安装依赖
      • 主要依赖
    • 4. 应用配置
      • Flask应用初始化 (`__init__.py`)
      • Celery应用初始化 (`make_celery.py`)
    • 5. 定义Celery任务 (`tasks.py`)
      • 任务说明
    • 6. 创建API端点 (`views.py`)
      • API端点说明
    • 7. 前端界面 (`index.html`)
      • 前端JavaScript实现
    • 8. 运行应用
    • 用docker启动redis
      • 启动Celery Worker
      • 启动Flask开发服务器
    • 9. 关键概念解析
      • Celery任务装饰器
      • 任务ID
      • 任务状态查询

1. 项目概述

本教程将指导您创建一个Flask应用,该应用使用Celery处理后台任务。我们将构建一个简单的Web界面,允许用户提交三种不同类型的任务,并通过JavaScript轮询查看任务结果。

主要功能

  • 简单计算任务:将两个数字相加
  • 阻塞任务:模拟长时间运行的任务
  • 进度报告任务:显示任务执行进度

技术栈

  • Flask: Web框架
  • Celery: 分布式任务队列
  • Redis: 消息代理和结果后端
  • JavaScript: 前端交互和轮询

2. 项目结构

flask_celery_app/
├── .venv/                  # 虚拟环境
├── README.md              # 项目说明
├── make_celery.py         # Celery应用初始化
├── pyproject.toml         # 项目配置
├── requirements.txt       # 依赖列表
└── src/└── task_app/├── __init__.py    # Flask应用初始化├── tasks.py       # Celery任务定义├── views.py       # Flask视图和API└── templates/└── index.html  # 前端界面

3. 环境设置

创建虚拟环境并安装依赖

uv venv
Using CPython 3.12.10
Creating virtual environment at: .venv
Activate with: source .venv/bin/activate
uv pip install -r requirements.txt 
uv pip install -e .

主要依赖

  • Flask
  • Celery
  • Redis

4. 应用配置

Flask应用初始化 (__init__.py)

from celery import Celery
from celery import Task
from flask import Flask
from flask import render_templatedef create_app() -> Flask:app = Flask(__name__)app.config.from_mapping(CELERY=dict(broker_url="redis://localhost",result_backend="redis://localhost",task_ignore_result=True,),)app.config.from_prefixed_env()celery_init_app(app)@app.route("/")def index() -> str:return render_template("index.html")from . import viewsapp.register_blueprint(views.bp)return appdef celery_init_app(app: Flask) -> Celery:class FlaskTask(Task):def __call__(self, *args: object, **kwargs: object) -> object:with app.app_context():return self.run(*args, **kwargs)celery_app = Celery(app.name, task_cls=FlaskTask)celery_app.config_from_object(app.config["CELERY"])celery_app.set_default()app.extensions["celery"] = celery_appreturn celery_app

Celery应用初始化 (make_celery.py)

from task_app import create_appflask_app = create_app()
celery_app = flask_app.extensions["celery"]

5. 定义Celery任务 (tasks.py)

import timefrom celery import shared_task
from celery import Task@shared_task(ignore_result=False)
def add(a: int, b: int) -> int:return a + b@shared_task()
def block() -> None:time.sleep(5)@shared_task(bind=True, ignore_result=False)
def process(self: Task, total: int) -> object:for i in range(total):self.update_state(state="PROGRESS", meta={"current": i + 1, "total": total})time.sleep(1)return {"current": total, "total": total}

任务说明

  1. add任务: 简单的加法计算,设置ignore_result=False以保存结果
  2. block任务: 模拟耗时操作,不返回结果
  3. process任务: 带进度报告的任务,使用update_state更新进度

6. 创建API端点 (views.py)

from celery.result import AsyncResult
from flask import Blueprint
from flask import requestfrom . import tasksbp = Blueprint("tasks", __name__, url_prefix="/tasks")@bp.get("/result/<id>")
def result(id: str) -> dict[str, object]:result = AsyncResult(id)ready = result.ready()return {"ready": ready,"successful": result.successful() if ready else None,"value": result.get() if ready else result.result,}@bp.post("/add")
def add() -> dict[str, object]:a = request.form.get("a", type=int)b = request.form.get("b", type=int)result = tasks.add.delay(a, b)return {"result_id": result.id}@bp.post("/block")
def block() -> dict[str, object]:result = tasks.block.delay()return {"result_id": result.id}@bp.post("/process")
def process() -> dict[str, object]:result = tasks.process.delay(total=request.form.get("total", type=int))return {"result_id": result.id}

API端点说明

  1. /tasks/result/: 获取任务结果
  2. /tasks/add: 提交加法任务
  3. /tasks/block: 提交阻塞任务
  4. /tasks/process: 提交带进度的任务

7. 前端界面 (index.html)

<!doctype html>
<html>
<head><meta charset=UTF-8><title>Celery Example</title>
</head>
<body>
<h2>Celery Example</h2>
Execute background tasks with Celery. Submits tasks and shows results using JavaScript.<hr>
<h4>Add</h4>
<p>Start a task to add two numbers, then poll for the result.
<form id=add method=post action="{{ url_for("tasks.add") }}"><label>A <input type=number name=a value=4></label><br><label>B <input type=number name=b value=2></label><br><input type=submit>
</form>
<p>Result: <span id=add-result></span></p><hr>
<h4>Block</h4>
<p>Start a task that takes 5 seconds. However, the response will return immediately.
<form id=block method=post action="{{ url_for("tasks.block") }}"><input type=submit>
</form>
<p id=block-result></p><hr>
<h4>Process</h4>
<p>Start a task that counts, waiting one second each time, showing progress.
<form id=process method=post action="{{ url_for("tasks.process") }}"><label>Total <input type=number name=total value="10"></label><br><input type=submit>
</form>
<p id=process-result></p><script>const taskForm = (formName, doPoll, report) => {document.forms[formName].addEventListener("submit", (event) => {event.preventDefault()fetch(event.target.action, {method: "POST",body: new FormData(event.target)}).then(response => response.json()).then(data => {report(null)const poll = () => {fetch(`/tasks/result/${data["result_id"]}`).then(response => response.json()).then(data => {report(data)if (!data["ready"]) {setTimeout(poll, 500)} else if (!data["successful"]) {console.error(formName, data)}})}if (doPoll) {poll()}})})}taskForm("add", true, data => {const el = document.getElementById("add-result")if (data === null) {el.innerText = "submitted"} else if (!data["ready"]) {el.innerText = "waiting"} else if (!data["successful"]) {el.innerText = "error, check console"} else {el.innerText = data["value"]}})taskForm("block", false, data => {document.getElementById("block-result").innerText = ("request finished, check celery log to see task finish in 5 seconds")})taskForm("process", true, data => {const el = document.getElementById("process-result")if (data === null) {el.innerText = "submitted"} else if (!data["ready"]) {el.innerText = `${data["value"]["current"]} / ${data["value"]["total"]}`} else if (!data["successful"]) {el.innerText = "error, check console"} else {el.innerText = "✅ done"}console.log(data)})</script>
</body>
</html>

前端JavaScript实现

前端使用JavaScript发送任务请求并轮询结果:

  1. 提交表单时阻止默认行为,使用fetch API发送POST请求
  2. 获取任务ID后,定期轮询/tasks/result/<id>端点
  3. 根据任务状态更新UI

8. 运行应用

用docker启动redis

docker run --name redis-server -p 6379:6379 -d redis

启动Celery Worker

# 在第一个终端窗口
celery -A make_celery worker --loglevel INFO

在这里插入图片描述

启动Flask开发服务器

# 在第二个终端窗口
flask -A task_app run --debug

在这里插入图片描述

访问 http://localhost:5000/ 使用应用。
在这里插入图片描述

9. 关键概念解析

Celery任务装饰器

  • @shared_task(ignore_result=False): 创建可共享的任务,并保存结果
  • bind=True: 将任务实例作为第一个参数传递给任务函数

任务ID

  • 每个任务都有一个自动生成的唯一ID
  • 通过result.id获取,用于后续查询任务状态

任务状态查询

  • AsyncResult(id): 通过ID获取任务结果对象
  • result.ready(): 检查任务是否完成
  • result.successful(): 检查任务是否成功完成
  • result.get(): 获取任务结果

项目链接:https://github.com/pallets/flask/tree/main/examples/celery
可用downgit单独下载项目中某个文件夹:https://minhaskamal.github.io/DownGit/#/home?url=https:%2F%2Fgithub.com%2Fpallets%2Fflask%2Ftree%2Fmain%2Fexamples%2Fcelery

相关文章:

Flask与Celery 项目应用(shared_task使用)

目录 1. 项目概述主要功能技术栈 2. 项目结构3. 环境设置创建虚拟环境并安装依赖主要依赖 4. 应用配置Flask应用初始化 (__init__.py)Celery应用初始化 (make_celery.py) 5. 定义Celery任务 (tasks.py)任务说明 6. 创建API端点 (views.py)API端点说明 7. 前端界面 (index.html)…...

Fetch API 使用详解:Bearer Token 与 localStorage 实践

Fetch API&#xff1a;现代浏览器内置的用于发送 HTTP 请求的 API&#xff0c;Bearer Token&#xff1a;一种基于令牌的身份验证方案&#xff0c;常用于 JWT 认证&#xff0c;localStorage&#xff1a;浏览器提供的持久化存储方案&#xff0c;用于在客户端存储数据。 token是我…...

vue3 vite.config.js 引入bem.scss文件报错

[sass] Can’t find stylesheet to import. ╷ 1 │ use “/bem.scss” as *; │ ^^^^^^^^^^^^^^^^^^^^^^ ╵ src\App.vue 1:1 root stylesheet 分析 我们遇到了一个在Vue3项目中使用Vite时&#xff0c;在vite.config.js中引入bem.scss文件报错的问题。错误信息指出在App.vue…...

二叉树-226.翻转链表-力扣(LeetCode)

一、题目解析 翻转可以理解为树的左右子树交换&#xff0c;从根到叶子节点&#xff0c;但是这里交换的是链接的指针&#xff0c;而不是单纯的交换值&#xff0c;当出现nullptr时&#xff0c;也是可以交换链接的&#xff0c;交换值的话就不行了。 二、算法原理 依旧的递归&…...

HarmonyOS Next 弹窗系列教程(3)

HarmonyOS Next 弹窗系列教程&#xff08;3&#xff09; 选择器弹窗 (PickerDialog) 介绍 选择器弹窗通常用于在用户进行某些操作&#xff08;如点击按钮&#xff09;时显示特定的信息或选项。让用户可以进行选择提供的固定的内容。 以下内容都属于选择器弹窗&#xff1a; …...

编程笔记---问题小计

编程笔记 qml ProgressBar 为什么valuemodel.progress / 100 在QML中&#xff0c;ProgressBar的value属性用于表示进度条的当前进度值&#xff0c;其范围通常为0到1&#xff08;或0%到100%&#xff09;。当使用model.progress / 100来设置value时&#xff0c;这样做的原因是为…...

【docker】Windows安装docker

环境及工具&#xff08;点击下载&#xff09; Docker Desktop Installer.exe &#xff08;windows 环境下运行docker的一款产品&#xff09; wsl_update_x64 &#xff08;Linux 内核包&#xff09; 前期准备 系统要求2&#xff1a; Windows 11&#xff1a;64 位系统&am…...

无人机避障——感知部分(Ubuntu 20.04 复现Vins Fusion跑数据集)胎教级教程

硬件环境&#xff1a;NVIDIA Jeston Orin nx 系统&#xff1a;Ubuntu 20.04 任务&#xff1a;跑通 EuRoC MAV Dataset 数据集 展示结果&#xff1a; 编译Vins Fusion 创建工作空间vins_ws # 创建目录结构 mkdir -p ~/vins_ws/srccd ~/vins_ws/src# 初始化工作空间&#xf…...

人工智能--大型语言模型的存储

好的&#xff0c;我现在需要回答用户关于GGUF文件和safetensors文件后缀的差别的问题。首先&#xff0c;我得先确认这两个文件格式的具体应用场景和它们各自的优缺点。用户可能是在处理大模型时遇到了这两种文件格式&#xff0c;想了解它们的区别以便正确使用。 首先&#xff…...

OD 算法题 B卷【删除字符串中出现次数最少的字符】

文章目录 删除字符串中出现次数最少的字符 删除字符串中出现次数最少的字符 实现删除字符串中出现次数最少的字符&#xff0c;若&#xff08;最少的&#xff09;有多个字符出现次数一样&#xff0c;则都删除。输出删除后的字符串&#xff0c;其他字符保持原有顺序&#xff1b;…...

如何安装并使用RustDesk

参考&#xff1a; 搭建 RustDesk Server&#xff1a;打造属于自己的远程控制系统&#xff0c;替代 TeamViewer 和 ToDesk&#xff01; 向日葵、ToDesk再见&#xff01;自己动手&#xff0c;自建RustDesk远程服务器真香&#xff01; 通俗易懂&#xff1a;RustDesk Server的搭…...

机器学习——随机森林算法

随机森林算法是一种强大的树集成算法&#xff0c;比使用单个决策树效果要好得多。 以下是生成树集成的方法&#xff1a;假设有一个大小为m的训练集&#xff0c;然后对于b1到B&#xff0c;所以执行B次&#xff0c;可以使用有放回抽样来创建一个大小为m的训练集。所以如果有10个…...

【从零学习JVM|第二篇】字节码文件

前言&#xff1a; 通过了解字节码文件可以帮助我们更容易的理解JVM的工作原理&#xff0c;所以接下来&#xff0c;我们来介绍一下字节码文件。 目录 前言&#xff1a; 正确的打开字节码文件 字节码文件组成 1. 魔数&#xff08;Magic Number&#xff09; 2. 版本号&…...

Fractal Generative Models论文阅读笔记与代码分析

何恺明分型模型这篇文章在二月底上传到arXiv预出版网站到现在已经过了三个月&#xff0c;当时我也听说这篇文章时感觉是大有可为&#xff0c;但是几个月不知道忙啥了&#xff0c;可能错过很多机会&#xff0c;但是亡羊补牢嘛&#xff0c;而且截至目前&#xff0c;该文章应该也还…...

软件测试—学习Day11

今天学习下兼容性 1.App兼容性常见问题 以下是关于 App 兼容性问题的常见举例&#xff0c;涵盖界面展示、操作逻辑、性能差异三大维度&#xff0c;涉及不同系统、设备及网络环境的兼容性场景&#xff1a; 一、界面展示问题 界面展示兼容性问题主要由操作系统版本差异、屏幕…...

OGG-01635 OGG-15149 centos服务器远程抽取AIX oracle11.2.0.4版本

背景描述 有一套ogg远程抽取的环境&#xff0c;源端是AIX7.1环境的oracle 11.2.0.4版本的数据库&#xff0c;中间是OGG抽取服务器&#xff0c;目标端是centos 7.9环境的oracle 19c。 采用集成模式远程抽取源端数据正常&#xff0c;但是经典模式远程抽取源数据的时候抽取进程启…...

Kotlin REPL初探

文章目录 1. Kotlin REPL 简介2. 在命令行中玩Kotlin REPL2.1 下载Kotlin编译器压缩包2.2 安装配置Kotlin编译器2.3 启动Kotlin交互式环境2.4 在命令行玩Kotlin REPL 3. 在IDEA里玩Kotlin REPL3.1 打开Kotlin REPL窗口3.2 在Kotlin REPL窗口玩代码 4. Kotlin REPL 的优势 1. Ko…...

git引用概念(git reference,git ref)(简化对复杂SHA-1哈希值的管理)(分支引用、标签引用、HEAD引用、远程引用、特殊引用)

文章目录 **引用的本质**1. **引用是文件**2. **引用的简化作用** **引用的类型**1. **分支引用&#xff08;Branch References&#xff09;**2. **标签引用&#xff08;Tag References&#xff09;**3. **HEAD 引用**4. **远程引用&#xff08;Remote References&#xff09;*…...

Github 2025-06-07 Rust开源项目日报Top10

根据Github Trendings的统计,今日(2025-06-07统计)共有10个项目上榜。根据开发语言中项目的数量,汇总情况如下: 开发语言项目数量Rust项目10Dart项目1TypeScript项目1RustDesk: 用Rust编写的开源远程桌面软件 创建周期:1218 天开发语言:Rust, Dart协议类型:GNU Affero Ge…...

gorm 配置数据库

介绍 GORM 是 Go 语言中最流行的 ORM&#xff08;对象关系映射&#xff09;库之一&#xff0c;基于数据库操作的封装&#xff0c;提供类似 Django ORM / SQLAlchemy 的开发体验。 特性描述支持多种数据库MySQL、PostgreSQL、SQLite、SQL Server、ClickHouse 等自动迁移自动根…...

自动化立体仓库堆垛机控制系统STEP7 OB1功能块

1、堆垛机控制系统STEP7硬件组态如下图 CPU CPU 314C-2 PN/DP 6ES7 314-6EH04-0AB0 SM 338 POS-INPUT AO2x12Bit 6ES7 332-5HB01-0AB0 2、堆垛机控制系统STEP7内部变量 前进HMI M 0.0 BOOL 后退HMI M 0.1 BOOL 上升HMI M 0.2 B…...

MATLAB生成大规模无线通信网络拓扑(任意节点数量)

功能&#xff1a; 生成任意节点数量的网络拓扑&#xff0c;符合现实世界节点空间分布和连接规律 效果&#xff1a; 30节点&#xff1a; 100节点&#xff1a; 500节点&#xff1a; 程序&#xff1a; %创建时间&#xff1a;2025年6月8日 %zhouzhichao %自然生长出n节点的网络% …...

ubuntu 20.04挂载固态硬盘

我们有个工控机&#xff0c;其操作系统是ubuntu 20.04。可以接入一个固态硬盘。将固态硬盘插好后&#xff0c;就要进行挂载。在AI的指导下&#xff0c;过程并不顺利。记录如下&#xff1a; 1、检查硬盘是否被识别 安装好硬盘后&#xff0c;运行以下命令来检查Linux系统是否…...

【AI教我写网站-ECG datacenter】

阶段性总结&#xff1a;后端用户管理基础 在项目管理和协作中&#xff0c;清晰地阐述“为什么做”比“怎么做”更能凝聚共识和提供方向。我们不仅要理解技术实现&#xff0c;更要明白其背后的动机和意义。 让我们重新回顾并总结我们到目前为止的工作&#xff0c;这次会更侧重…...

2. Web网络基础 - 协议端口

深入解析协议端口与netstat命令&#xff1a;网络工程师的实战指南 在网络通信中&#xff0c;协议端口是服务访问的门户。本文将全面解析端口概念&#xff0c;并通过netstat命令实战演示如何监控网络连接状态。 一、协议端口核心知识解析 1. 端口号的本质与分类 端口范围类型说…...

PC与Windows远程连接与串流:方案简介(ZeroTier + Parsec、Moonlight + Sunshine、网易UU远程)

简介 在远程办公、云游戏、家用 NAS 串流、图形远程渲染等需求增长的背景下&#xff0c;越来越多用户开始寻找低延迟、高画质、跨网络可用的远程连接方案。今天这篇文章将深度分析三种目前在玩家圈和远程办公中都非常流行的组合方案&#xff1a; &#x1f7e2; ZeroTier Pars…...

SpringBoot+MySQL家政服务平台 设计开发

概述 基于SpringBootMySQL开发的家政服务平台完整项目&#xff0c;该系统实现了用户预约、服务管理、订单统计等核心功能&#xff0c;采用主流技术栈开发&#xff0c;代码规范且易于二次开发。 主要内容 系统功能架构 本系统采用前后端分离架构&#xff0c;前端提供用户交互…...

浏览器兼容-polyfill-本地服务-优化

babel和webpack结合 npx babel src --out-dir dist --presetsbabel/preset-env 这是把src下面的东西都用babel转化一下 webpack可以和babel结合使用&#xff0c;首先下载一个这东西&#xff1a; npm install babel-loader -D webpack配置&#xff1a; const path requir…...

c++ decltype关键字

decltype为类型推导关键字。 示例代码&#xff1a; // decltype也可用于函数模板编程: template<typename T, typename U> auto add(T t, U u) -> decltype(t u) {return t u; }// decltype推导函数返回类型 auto doubleNumFunc(int x) -> decltype(x * 2) {ret…...

分享今天做的力扣SQL题

其实做之前就打算分享的&#xff0c;但是做完又不想分享了。。。结果没几分钟&#xff0c;还是&#xff0c;写一下吧。我就当各位是监督我的。 说一下&#xff0c;这是第一天做SQL题&#xff0c;虽然我也是软件工程专业&#xff0c;但是学的本来就不好&#xff0c;又忘了个差不…...