Flask + Celery 应用
目录
- Flask + Celery 应用
- 项目结构
- 1. 创建app.py
- 2. 创建tasks.py
- 3. 创建celery_worker.py
- 4. 创建templates目录和index.html
- 运行应用
- 测试文件
Flask + Celery 应用
对于Flask与Celery结合的例子,需要创建几个文件。首先安装必要的依赖:
pip install flask celery redis requests
项目结构
让我们创建以下文件结构:
flask_celery/
├── app.py # Flask应用
├── celery_worker.py # Celery worker
├── tasks.py # Celery任务定义
└── templates/ # 模板目录└── index.html # 主页模板
1. 创建app.py
from flask import Flask, request, render_template, jsonify
from celery_worker import celery
import tasks
import timeapp = Flask(__name__)@app.route('/')
def index():return render_template('index.html')@app.route('/process', methods=['POST'])
def process():# 获取表单数据data = request.form.get('data', '')# 启动异步任务task = tasks.process_data.delay(data)return jsonify({'task_id': task.id,'status': '任务已提交','message': f'正在处理数据: {data}'})@app.route('/status/<task_id>')
def task_status(task_id):# 获取任务状态task = tasks.process_data.AsyncResult(task_id)if task.state == 'PENDING':response = {'state': task.state,'status': '任务等待中...'}elif task.state == 'FAILURE':response = {'state': task.state,'status': '任务失败','error': str(task.info)}else:response = {'state': task.state,'status': '任务完成' if task.state == 'SUCCESS' else '任务处理中','result': task.result if task.state == 'SUCCESS' else None}return jsonify(response)if __name__ == '__main__':app.run(debug=True)
2. 创建tasks.py
from celery_worker import celery
import time@celery.task()
def process_data(data):# 模拟耗时操作time.sleep(5)# 处理数据(这里只是简单地转换为大写)result = data.upper()return {'original_data': data,'processed_data': result,'processing_time': '5秒'}
3. 创建celery_worker.py
from celery import Celery# 创建Celery实例
celery = Celery('tasks',broker='redis://localhost:6379/0',backend='redis://localhost:6379/0'
)# 配置Celery
celery.conf.update(task_serializer='json',accept_content=['json'],result_serializer='json',timezone='Asia/Shanghai',enable_utc=True,
)
4. 创建templates目录和index.html
首先创建templates目录:
mkdir templates
然后创建index.html文件:
<!DOCTYPE html>
<html>
<head><title>Flask + Celery 示例</title><style>body { font-family: Arial, sans-serif; margin: 40px; line-height: 1.6; }h1 { color: #4285f4; }.container { max-width: 800px; margin: 0 auto; }.form-group { margin-bottom: 15px; }input[type="text"] { padding: 8px; width: 300px; }button { padding: 8px 15px; background-color: #4285f4; color: white; border: none; cursor: pointer; }button:hover { background-color: #3b78e7; }#result { margin-top: 20px; padding: 15px; background-color: #f5f5f5; border-radius: 5px; display: none; }#status { margin-top: 10px; font-style: italic; }</style>
</head>
<body><div class="container"><h1>Flask + Celery 异步任务示例</h1><div class="form-group"><label for="data">输入要处理的数据:</label><br><input type="text" id="data" name="data" placeholder="输入一些文本..."><button onclick="submitTask()">提交任务</button></div><div id="status"></div><div id="result"><h3>任务结果:</h3><pre id="result-data"></pre></div></div><script>function submitTask() {const data = document.getElementById('data').value;if (!data) {alert('请输入数据!');return;}const statusDiv = document.getElementById('status');statusDiv.textContent = '提交任务中...';// 提交任务fetch('/process', {method: 'POST',headers: {'Content-Type': 'application/x-www-form-urlencoded',},body: `data=${encodeURIComponent(data)}`}).then(response => response.json()).then(data => {statusDiv.textContent = data.status;// 开始轮询任务状态pollTaskStatus(data.task_id);}).catch(error => {statusDiv.textContent = `错误: ${error}`;});}function pollTaskStatus(taskId) {const statusDiv = document.getElementById('status');const resultDiv = document.getElementById('result');const resultDataPre = document.getElementById('result-data');// 定期检查任务状态const interval = setInterval(() => {fetch(`/status/${taskId}`).then(response => response.json()).then(data => {statusDiv.textContent = `状态: ${data.status}`;if (data.state === 'SUCCESS') {clearInterval(interval);resultDiv.style.display = 'block';resultDataPre.textContent = JSON.stringify(data.result, null, 2);} else if (data.state === 'FAILURE') {clearInterval(interval);statusDiv.textContent = `错误: ${data.error}`;}}).catch(error => {clearInterval(interval);statusDiv.textContent = `轮询错误: ${error}`;});}, 1000);}</script>
</body>
</html>
运行应用
- 首先,确保Redis服务器正在运行(Celery需要它作为消息代理)可以用docker启动:
docker run --name redis-server -p 6379:6379 -d redis
- 启动Celery worker:
celery -A tasks worker --loglevel=info
- 在另一个终端中启动Flask应用:
python app.py
测试文件
创建一个新文件test_app.py
:
import requests
import time
import json# 应用服务器地址
BASE_URL = 'http://localhost:5000'def test_process_endpoint():"""测试/process端点"""print("测试1: 提交任务处理")# 准备测试数据test_data = "hello world"# 发送POST请求到/process端点response = requests.post(f"{BASE_URL}/process",data={"data": test_data})# 检查响应状态码if response.status_code == 200:print("✓ 状态码正确: 200")else:print(f"✗ 状态码错误: {response.status_code}")# 解析响应JSONresult = response.json()# 检查响应内容if 'task_id' in result:print(f"✓ 成功获取任务ID: {result['task_id']}")return result['task_id']else:print("✗ 未能获取任务ID")return Nonedef test_status_endpoint(task_id):"""测试/status/<task_id>端点"""print("\n测试2: 检查任务状态")if not task_id:print("✗ 无法测试状态端点: 缺少任务ID")return# 轮询任务状态,最多等待10秒max_attempts = 10for attempt in range(1, max_attempts + 1):print(f"\n轮询 {attempt}/{max_attempts}...")# 发送GET请求到/status/<task_id>端点response = requests.get(f"{BASE_URL}/status/{task_id}")# 检查响应状态码if response.status_code == 200:print("✓ 状态码正确: 200")else:print(f"✗ 状态码错误: {response.status_code}")continue# 解析响应JSONresult = response.json()print(f"当前状态: {result.get('state', '未知')}")# 如果任务完成,显示结果并退出循环if result.get('state') == 'SUCCESS':print("\n✓ 任务成功完成!")print("结果:")print(json.dumps(result.get('result', {}), indent=2, ensure_ascii=False))return True# 如果任务失败,显示错误并退出循环if result.get('state') == 'FAILURE':print(f"\n✗ 任务失败: {result.get('error', '未知错误')}")return False# 等待1秒后再次检查time.sleep(1)print("\n✗ 超时: 任务未在预期时间内完成")return Falsedef test_index_endpoint():"""测试首页端点"""print("\n测试3: 访问首页")# 发送GET请求到/端点response = requests.get(BASE_URL)# 检查响应状态码if response.status_code == 200:print("✓ 状态码正确: 200")print("✓ 成功访问首页")else:print(f"✗ 状态码错误: {response.status_code}")def run_all_tests():"""运行所有测试"""print("开始测试Flask+Celery应用...\n")# 测试1: 提交任务task_id = test_process_endpoint()# 测试2: 检查任务状态if task_id:test_status_endpoint(task_id)# 测试3: 访问首页test_index_endpoint()print("\n测试完成!")if __name__ == "__main__":run_all_tests()
然后,在另一个终端中运行测试:
python test_app.py
相关文章:

Flask + Celery 应用
目录 Flask Celery 应用项目结构1. 创建app.py2. 创建tasks.py3. 创建celery_worker.py4. 创建templates目录和index.html运行应用测试文件 Flask Celery 应用 对于Flask与Celery结合的例子,需要创建几个文件。首先安装必要的依赖: pip install flas…...

奥威BI+AI数据分析:企业数智化转型的加速器
在当今数据驱动的时代,企业对于数据分析的需求日益增长。奥威BIAI数据分析的组合,正成为众多企业数智化转型的加速器。 奥威BI以其强大的数据处理和可视化能力著称。它能够轻松接入多种数据源,实现数据的快速整合与清洗。通过内置的ETL工具&…...

python打卡day43
复习日 作业: kaggle找到一个图像数据集,用cnn网络进行训练并且用grad-cam做可视化 进阶:并拆分成多个文件 找了个街头食物图像分类的数据集Popular Street Foods(其实写代码的时候就开始后悔了),原因在于&…...
MySQL 如何判断某个表中是否存在某个字段
在MySQL中,判断某个表中是否存在某个字段,可以通过查询系统数据库 INFORMATION_SCHEMA.COLUMNS 实现。以下是详细步骤和示例: 方法:使用 INFORMATION_SCHEMA.COLUMNS 通过查询系统元数据表 COLUMNS,检查目标字段是否存…...

Linux --进程优先级
概念 什么是进程优先级,为什么需要进程优先级,怎么做到进程优先级这是本文需要解释清楚的。 优先级的本质其实就是排队,为了去争夺有限的资源,比如cpu的调度。cpu资源分配的先后性就是指进程的优先级。优先级高的进程有优先执行的…...

安装和配置 Nginx 和 Mysql —— 一步一步配置 Ubuntu Server 的 NodeJS 服务器详细实录6
前言 昨天更新了四篇博客,我们顺利的 安装了 ubuntu server 服务器,并且配置好了 ssh 免密登录服务器,安装好了 服务器常用软件安装, 配置好了 zsh 和 vim 以及 通过 NVM 安装好Nodejs,还有PNPM包管理工具 。 作为服务器的运行…...
Linux 测试本机与192.168.1.130 主机161/udp端口连通性
Linux 测试本机与 192.168.1.130 主机 161/UDP 端口连通性 161/UDP 端口是 SNMP(简单网络管理协议)的标准端口。以下是多种测试方法: 🛠️ 1. 使用 nmap 进行专业测试(推荐) sudo nmap -sU -p 161 -Pn 1…...
OpenCV 滑动条调整图像亮度
一、知识点 1、int createTrackbar(const String & trackbarname, const String & winname, int * value, int count, TrackbarCallback onChange 0, void * userdata 0); (1)、创建一个滑动条并将其附在指定窗口上。 (2)、参数说明: trackbarname: 创建的…...

图解gpt之注意力机制原理与应用
大家有没有注意到,当序列变长时,比如翻译一篇长文章,或者处理一个长句子,RNN这种编码器就有点力不从心了。它把整个序列信息压缩到一个固定大小的向量里,信息丢失严重,而且很难记住前面的细节,特…...
硬件学习笔记--65 MCU的RAM及FLash简介
MCU(微控制器单元)内部的 RAM 和 Flash 是最关键的两种存储器,它们直接影响MCU的性能、功耗和编程方式。以下是它们的详细讲解及作用: 1. RAM(随机存取存储器) 1.1 特性 1)易失性:…...

【Oracle】视图
个人主页:Guiat 归属专栏:Oracle 文章目录 1. 视图基础概述1.1 视图的概念与特点1.2 视图的工作原理1.3 视图的分类 2. 简单视图2.1 创建简单视图2.1.1 基本简单视图2.1.2 带计算列的简单视图 2.2 简单视图的DML操作2.2.1 通过视图进行INSERT操作2.2.2 通…...
数据库 MongoDB (NoSQL) 与 MySQL (SQL) 的写法对比
MongoDB (NoSQL) 与 MySQL (SQL) 的写法对比及优劣势分析 基本概念差异 MySQL/SQL:关系型数据库,使用结构化查询语言(SQL),数据以表格形式存储,有预定义的模式(schema)MongoDB/NoSQL:文档型数据库,无固定…...
基于粒子滤波的PSK信号解调实现
基于粒子滤波的PSK信号解调实现 一、引言 相移键控(PSK)是数字通信中广泛应用的调制技术。在非高斯噪声和动态相位偏移环境下,传统锁相环(PLL)性能受限。粒子滤波(Particle Filter)作为一种序列蒙特卡洛方法,能有效处理非线性/非高斯系统的状态估计问题。本文将详细阐…...

更强劲,更高效:智源研究院开源轻量级超长视频理解模型Video-XL-2
长视频理解是多模态大模型关键能力之一。尽管OpenAI GPT-4o、Google Gemini等私有模型已在该领域取得显著进展,当前的开源模型在效果、计算开销和运行效率等方面仍存在明显短板。近日,智源研究院联合上海交通大学等机构,正式发布新一代超长视…...

2025.6.3学习日记 Nginx 基本概念 配置 指令 文件
1.初始nginx Nginx(发音为 “engine x”)是一款高性能的开源 Web 服务器软件,同时也具备反向代理、负载均衡、邮件代理等功能。它由俄罗斯工程师 Igor Sysoev 开发,最初用于解决高并发场景下的性能问题,因其轻量级、高…...

【连接器专题】案例:产品测试顺序表解读与应用
在查看SD卡座连接器的规格书,一些测试报告时,你可能会看到如下一张产品测试顺序表。为什么会出现一张测试顺序表呢? 测试顺序表的使用其实定义测试环节的验证的“路线图”和“游戏规则”,本文就以我人个经验带领大家一起看懂这张表并理解其设计逻辑。 测试顺序表结构 测试…...

星动纪元的机器人大模型 VPP,泛化能力效果如何?与 VLA 技术的区别是什么?
点击上方关注 “终端研发部” 设为“星标”,和你一起掌握更多数据库知识 VPP 利用了大量互联网视频数据进行训练,直接学习人类动作,减轻了对于高质量机器人真机数据的依赖,且可在不同人形机器人本体之间自如切换,这有望…...

4000万日订单背后,饿了么再掀即时零售的“效率革命”
当即时零售转向价值深耕,赢面就是综合实力的强弱。 文|郭梦仪 编|王一粟 在硝烟弥漫的外卖行业“三国杀”中,饿了么与淘宝闪购的日订单量竟然突破了4000万单。 而距淘宝闪购正式上线,还不到一个月。 在大额福利优惠…...

入门AJAX——XMLHttpRequest(Get)
一、什么是 AJAX AJAX Asynchronous JavaScript And XML(异步的 JavaScript 和 XML)。 1、XML与异步JS XML: 是一种比较老的前后端数据传输格式(已经几乎被 JSON 代替)。它的格式与HTML类似,通过严格的闭合自定义标…...

5分钟申请edu邮箱【方案本周有效】
这篇文章主要展示的是成果。如果你是第1次看见我的内容,具体的步骤请翻看往期的两篇作品。先看更正补全,再看下一个。 建议你边看边操作。 【更正补全】edu教育申请通过方案 本周 edu教育邮箱注册可行方案 #edu邮箱 伟大无需多言 我已经验证了四个了…...

闲谈PMIC和SBC
今天不卷,简单写点。 在ECU设计里,供电芯片选型是逃不开的话题,所以聊聊PMIC或者SBC的各自特点,小小总结下。 PMIC,全称Power Management Intergrated Circuits,听名字就很专业:电源管理&…...

Java垃圾回收机制深度解析:从理论到实践的全方位指南
Java垃圾回收(GC)是Java虚拟机(JVM)的核心功能,它自动管理内存分配与回收,避免了C/C中常见的内存泄漏问题。本文将深入剖析Java垃圾回收的工作原理、算法实现、收集器类型及调优策略,助你全面掌握JVM内存管理的精髓。 一、垃圾回收基础概念 …...
Ubuntu系统 | 本地部署ollama+deepseek
1、Ollama介绍 Ollama是由Llama开发团队推出的开源项目,旨在为用户提供高效、灵活的本地化大型语言模型(LLM)运行环境。作为Llama系列模型的重要配套工具,Ollama解决了传统云服务对计算资源和网络连接的依赖问题,让用户能够在个人电脑或私有服务器上部署和运行如Llama 3等…...

论文阅读:CLIP:Learning Transferable Visual Models From Natural Language Supervision
从自然语言监督中学习可迁移的视觉模型 虽然有点data/gpu is all you need的味道,但是整体实验和谈论丰富度上还是很多的,也是一篇让我多次想放弃的文章,因为真的是非常长的原文和超级多的实验讨论,隔着屏幕感受到了实验的工作量之…...

在图像分析算法部署中应对流行趋势的变化|文献速递-深度学习医疗AI最新文献
Title 题目 Navigating prevalence shifts in image analysis algorithm deployment 在图像分析算法部署中应对流行趋势的变化 01 文献速递介绍 机器学习(ML)已开始革新成像研究与实践的诸多领域。然而,医学图像分析领域存在显著的转化鸿…...

CAMEL-AI开源自动化任务执行助手OWL一键整合包下载
OWL 是由 CAMEL-AI 团队开发的开源多智能体协作框架,旨在通过动态智能体交互实现复杂任务的自动化处理,在 GAIA 基准测试中以 69.09 分位列开源框架榜首,被誉为“Manus 的开源平替”。我基于当前最新版本制作了免安装一键启动整合包。 CAMEL-…...
Selenium 中 JavaScript 点击的优势及使用场景
*在 Selenium 自动化测试中,使用 JavaScript 执行点击操作(如driver.execute_script("arguments[0].click();", element))相比直接调用element.click()有以下几个主要优势: 1. 绕过元素不可点击的限制 问题场景&#x…...

Linux系统-基本指令(5)
文章目录 mv 指令cat 指令(查看小文件)知识点(简单阐述日志)more 和 less 指令(查看大文件)head 和 tail 指令(跟查看文件有关)知识点(管道)时间相关的指令&a…...
C++ set数据插入、set数据查找、set数据删除、set数据统计、set排序规则、代码练习1、2
set数据插入,代码见下 #include<iostream> #include<set> #include<vector>using namespace std;void printSet(const set<int>& s) {for (set<int>::const_iterator it s.begin(); it ! s.end(); it) {cout << *it <…...
[android]MT6835 Android 指令启动MT6631 wifi操作说明
问题说明 MT6835使用指令启动wifi 使用andorid指令启动 2.4G启动方式 cmd wifi start-softap ctltest wpa2 11111111 -b 2 5G启动指令 cmd wifi start-softap ctltest wpa2 11111111 -b 5 使用linux指令启动 指令启动wifi 新建br-lan brctl addbr br-lan 关闭wifi a…...