Flask框架配置celery-[1]:flask工厂模式集成使用celery,可在异步任务中使用flask应用上下文,即拿即用,无需更多配置
一、概述
1、celery框架和flask框架在运行时,是在不同的进程中,资源是独占的。
2、celery异步任务如果想使用flask中的功能,如orm,是需要在flask应用上下文管理器中执行orm操作的
3、使用celery是需要使用到中间件的,简单点就使用redis做中间件
注意:
在flask工厂模式集成celery异步框架,在celery的异步任务中能够获取到flask的应用上下文管理器,也就是说在celery异步任务中你可以去调用flask项目中功能,如orm操作等。
使用本文配置,可以无需修改flask创建app应用的程序,直接将celery相关包创建,运行就可以使用,且能够在异步任务使用flask的功能。
二、项目结构
依赖环境:
celery==4.4.7
eventlet==0.33.3
Flask==2.1.3
Flask-Caching==1.10.1
Flask-Cors==3.0.10
Flask-Migrate==2.7.0
Flask-RESTful==0.3.9
Flask-SocketIO==5.1.1
Flask-SQLAlchemy==2.5.1
PyMySQL==1.0.2
redis==3.5.3
SQLAlchemy==1.4.0
Werkzeug==2.0.2
目录结构:
flask-project
|--apps
|-- user
|-- models
|--views.py
|--urls.py
|--__init__.py
|--ext
|--__init__.py
|--config.py
|--celery_task
|--__init__.py
|--async_task.py
|--celery.py
|--celeryconfig.py
|--check_task.py
|--scheduler_task.py
app.py
三、flask工厂模式下各模块功能
1、apps/user/models.py : 写了一个user表
2、apps/user/views.py:写了测试调用celery异步任务的接口
3、apps/user/urls.py: 注册路由的
4、ext/__init__.py:cache、db、cors的拓展
5、ext/config.py : cache和cors使用到的配置
6、apps/__init__.py: 一个函数create_app,生成flask应用对象
7、app.py: 启动flask应用对象的模块
本文重点不在flask工厂模式,默认看官都懂如何创建flaks工厂模式的项目了。
在视图中在执行异步任务,并获取异步任务的id:
from celery_task.async_task import send_email_task,cache_user_task
#用户资源:get\put\delete, 对单个进行操作
class UserOneResource(ResourceBase):def put(self,id):#测试异步发邮件email = request.args.get('email')code = request.args.get('code')res = send_email_task.delay(email,code)print(res.id)return NewResponse(msg='put',data={'task_id':res.id})def patch(self,id):#测试异步操作flask的orm和cachep = request.args.get('p')if p=='set':res = cache_user_task.delay()print(res,type(res))return NewResponse(msg='patch',data={'task_id':res.id})else:from ext import cachedata = cache.get('all-user-data')return NewResponse(msg='patch',data=data)
res = 异步函数.delay(函数需要的参数)
task_id = res.id
注意:task_id 可以知道对应的任务的完成情况,获取任务的返回值等。
四、celery项目的配置
1、celery的配置
将celery的配置都放到一个py文件中,方便后期的维护和使用
celeryconfig.py
from celery.schedules import crontab
from datetime import timedelta
'''
参数解析:
accept_content:允许的内容类型/序列化程序的白名单,如果收到不在此列表中的消息,则该消息将被丢弃并出现错误,默认只为json;
task_serializer:标识要使用的默认序列化方法的字符串,默认值为json;
result_serializer:结果序列化格式,默认值为json;
timezone:配置Celery以使用自定义时区;
enable_utc:启用消息中的日期和时间,将转换为使用 UTC 时区,与timezone连用,当设置为 false 时,将使用系统本地时区。
result_expires: 异步任务结果存活时长
beat_schedule:设置定时任务
'''
#手动注册celery的异步任务:将所有celery异步任务所在的模块找到,写成字符串
task_module = ['celery_task.async_task', # 写任务模块导入路径,该模块主要写异步任务的方法'celery_task.scheduler_task', # 写任务模块导入路径,该模块主要写定时任务的方法
]#celery的配置
config = {"broker_url" :'redis://127.0.0.1:6379/0', #'redis://:123456@127.0.0.1:6379/1' 有密码时,123456是密码"result_backend" : 'redis://127.0.0.1:6379/1',"task_serializer" : 'json',"result_serializer" : 'json',"accept_content" : ['json'],"timezone" : 'Asia/Shanghai',"enable_utc" : False,"result_expires" : 1*60*60,"beat_schedule" : { #定时任务配置# 名字随意命名'add-func-30-seconds': {# 执行add_task下的addy函数'task': 'celery_task.scheduler_task.add_func', # 任务函数的导入路径,from celery_task.scheduler_task import add_func# 每10秒执行一次'schedule': timedelta(seconds=30),# add函数传递的参数'args': (10, 21)},# 名字随意起'add-func-5-minutes': {'task': 'celery_task.scheduler_task.add_func', # 任务函数的导入路径,from celery_task.scheduler_task import add_func# crontab不传的参数默认就是每的意思,比如这里是每年每月每日每天每小时的5分执行该任务'schedule': crontab(minute='5'), # 之前时间点执行,每小时的第5分钟执行任务, 改成小时,分钟,秒 就是每天的哪个小时哪分钟哪秒钟执行'args': (19, 22) # 定时任务需要的参数},# 缓存用户数据到cache中'cache-user-func': {'task': 'celery_task.scheduler_task.cache_user_func',# 导入任务函数:from celery_task.scheduler_task import cache_user_func'schedule': timedelta(minutes=1), # 每1分钟执行一次,将用户消息缓存到cache中}}
}
2、创建celery对象
celery.py
from celery import Celery,Task
from .celeryconfig import config,task_module
import sys
import os
'1、把flask项目路径添加到系统环境变量中'
project_path = os.path.dirname(os.path.dirname(__file__))
sys.path.append(project_path)'''
2、创建celery应用对象'task'可以任务是该celery对象名字,用于区分celery对象broker是指定消息中间件backend是指定任务结果存储位置include是手动指定异步任务所在的模块的位置
'''
#创建celery异步对象
celery = Celery('task', broker=config.get('broker_url'), backend=config.get('result_backend'), include=task_module)
#导入一些基本配置
celery.conf.update(**config)'3、给celery所有任务添加flask的应用上下文,在celery异步任务中就可以调用flask中的对象了'
class ContextTask(celery.Task):def __call__(self, *args, **kwargs):from apps import create_appapp = create_app()with app.app_context():return self.run(*args, **kwargs)
celery.Task = ContextTask
注意:
1、第一步很关键,设置到python项目运行时,加载环境变量的问题。这一步是将flask项目的根目录加载环境变量中,这样第3步才能从apps中导入create_app函数。
2、第二步是创建celery通用的方法了,没什么好说的。
3、第三步很关键,涉及到celery异步任务能否在flask应用上下文管理器运行,从而可以调用flask中的功能,例如orm操作,cache操作.。(在执行任务时,先套上flask的应用上下文管理器)
3、异步任务模块
将所有异步任务相关的函数都集中到一个模块中,方便维护和使用。
async_task.py
# 导入celery对象app
from celery_task.celery import celery
from ext import cache
import time'''
1、没有返回值的,@app.task(ignore_result=True)
2、有返回值的任务,@app.task 默认就是(ignore_result=False)
'''# 没有返回值,禁用掉结果后端
@celery.task
def send_email_task(receiver_email,code): # 此时可以直接传邮箱,还能减少一次数据库的IO操作''':param email: 接收消息的邮箱,用户的邮箱:return:'''# 模拟邮件发送验证码time.sleep(5)return {'result':'邮件已经发送',receiver_email:'2356'}@celery.task
def cache_user_task():#orm查询数据,放到cache中from apps.user.models import UserModeluser = UserModel.query.all()lis = []for u in user:id = u.idname = u.namedic = {'id':id,'name':name}lis.append(dic)print(dic)cache.set('all-user-data',lis)return {'code':200,'msg':'查询数据成功'}
4、定时任务模块
将所有定时任务相关的函数都集中到一个模块中,方便维护和使用。
schedulser_task.py
from celery_task.celery import celery
import time# 有返回值,返回值可以从结果后端中获取
@celery.task
def add_func(a, b):print('执行了加法函数',a+b)return a + b# 不需要返回值,禁用掉结果后端
@celery.task(ignore_result=True)
def cache_user_func():print('all')
5、检测任务id获取任务状态和返回值
check_task.py:
from celery.result import AsyncResult
from celery_task.celery import celery'''验证任务的执行状态的'''def check_task_status(task_id):'''任务的执行状态:PENDING :等待执行STARTED :开始执行RETRY :重新尝试执行SUCCESS :执行成功FAILURE :执行失败:param task_id::return:'''result = AsyncResult(id=task_id, app=celery)dic = {'type': result.status,'msg': '','data': None,'code': 400}if result.status == 'PENDING':dic['msg'] = '任务等待中'elif result.status == 'STARTED':dic['msg'] = '任务开始执行'elif result.status == 'RETRY':dic['msg'] = '任务重新尝试执行'elif result.status == 'FAILURE':dic['msg'] = '任务执行失败了'elif result.status == 'SUCCESS':result = result.get()dic['msg'] = '任务执行成功'dic['data'] = resultdic['code'] = 200# result.forget() # 将结果删除# async.revoke(terminate=True) # 无论现在是什么时候,都要终止# async.revoke(terminate=False) # 如果任务还没有开始执行呢,那么就可以终止。return dic
在视图函数中调用该方法,通过task_id ,返回任务的运行结果。
五、测试
1、运行项目
flask项目(在项目根目录下执行):
flask run --host 0.0.0.0 --port 5000
celery项目(在项目根目录下执行):
启动celery进程:
windows系统:
celery -A celery_task.celery worker -l info -P eventlet
linux系统:
celery -A celery_task.celery worker -l info
启动定时任务(先启动celery进程在启动定时任务):
celery -A celery_task.celery beat -l info
2、运行结果
1、执行异步任务中,将orm数据存到cache中

2、执行定时任务了

六、注意事项
1、在系统中要先安装好redis和mysql,并都启动了
2、在测试异步操作orm时,会使用到flask的cache存数据,注意flask的cache不能配置内存模式,不然celery进程存到cache中的数据,flask进程中取不到的。
3、当前的配置下,celery的目录必须是在flask根目录下
七、拓展-改变celery_task的位置
如果你想将celery_task包移动到apps包下,此时你需要修改什么?
1、apps/celery_task/celery.py:将flask项目根目录加载到系统环境变量中的路径有变
'1、把flask项目路径添加到系统环境变量中'
project_path = os.path.dirname(os.path.dirname(os.path.dirname(__file__)))
2、apps/celery_task/celeryconfig.py: 注册异步任务的模块,定时任务的模块的位置变化
'1、加上apps.'
task_module = ['apps.celery_task.async_task', # 写任务模块导入路径,该模块主要写异步任务的方法'apps.celery_task.scheduler_task', # 写任务模块导入路径,该模块主要写定时任务的方法
]'2、task参数对应的字符串,加上apps.'
config = {"broker_url" :'redis://127.0.0.1:6379/0', #'redis://:123456@127.0.0.1:6379/1' 有密码时,123456是密码"result_backend" : 'redis://127.0.0.1:6379/1',"task_serializer" : 'json',"result_serializer" : 'json',"accept_content" : ['json'],"timezone" : 'Asia/Shanghai',"enable_utc" : False,"result_expires" : 1*60*60,"beat_schedule" : { #定时任务配置# 名字随意命名'add-func-30-seconds': {# 执行add_task下的addy函数'task': 'apps.celery_task.scheduler_task.add_func', # 任务函数的导入路径,from celery_task.scheduler_task import add_func# 每10秒执行一次'schedule': timedelta(seconds=30),# add函数传递的参数'args': (10, 21)},# 名字随意起'add-func-5-minutes': {'task': 'apps.celery_task.scheduler_task.add_func', # 任务函数的导入路径,from celery_task.scheduler_task import add_func# crontab不传的参数默认就是每的意思,比如这里是每年每月每日每天每小时的5分执行该任务'schedule': crontab(minute='5'), # 之前时间点执行,每小时的第5分钟执行任务, 改成小时,分钟,秒 就是每天的哪个小时哪分钟哪秒钟执行'args': (19, 22) # 定时任务需要的参数},# 缓存用户数据到cache中'cache-user-func': {'task': 'apps.celery_task.scheduler_task.cache_user_func',# 导入任务函数:from celery_task.scheduler_task import cache_user_func'schedule': timedelta(minutes=1), # 每1分钟执行一次,将用户消息缓存到cache中}}
}
3、在视图函数导入异步任务的路径也变了
#异步任务
from apps.celery_task.async_task import send_email_task,cache_user_task
4、启动celery和定时任务的命令变量【在项目根目录下执行命令】
启动celery:
windows启动命令: celery -A apps.celery_task.celery worker -l info -P eventlet
linux启动命令: celery -A apps.celery_task.celery worker -l info
启动定时任务:
celery -A apps.celery_task beat -l info
相关文章:
Flask框架配置celery-[1]:flask工厂模式集成使用celery,可在异步任务中使用flask应用上下文,即拿即用,无需更多配置
一、概述 1、celery框架和flask框架在运行时,是在不同的进程中,资源是独占的。 2、celery异步任务如果想使用flask中的功能,如orm,是需要在flask应用上下文管理器中执行orm操作的 3、使用celery是需要使用到中间件的࿰…...
合并二叉树
题目链接 合并二叉树 题目描述 注意点 如果两个节点重叠,那么将这两个节点的值相加作为合并后节点的新值;否则,不为 null 的节点将直接作为新二叉树的节点 解答思路 先序遍历二叉树,将重叠部分节点值相加作为新节点的值&…...
Sanic——Python函数变成API的神器
今天给大家介绍一个超好用的框架,迅速将Python函数变成API,它就是最近越来越火的异步Web框架Sanic。 1. Sanic简介 Sanic 是 Python3.7 Web 服务器和 Web 框架,旨在提高性能。它允许使用 Python3.5 中添加的async/await语法,这使…...
Windows连接不上VMware,ping不通的问题
文章目录 防火墙问题Windows和虚拟机下的ip不一致导致的问题VMware Network Adapter (适配器)丢失的问题参考文档 防火墙问题 防火墙默认不会拦截ping命令,除非你个人设置了Linux防火墙Centos7的常用命令关闭防火墙 systemctl stop firewalld #停止Windows和虚拟机…...
24、Flink 的table api与sql之Catalogs(java api操作数据库、表)-2
Flink 系列文章 1、Flink 部署、概念介绍、source、transformation、sink使用示例、四大基石介绍和示例等系列综合文章链接 13、Flink 的table api与sql的基本概念、通用api介绍及入门示例 14、Flink 的table api与sql之数据类型: 内置数据类型以及它们的属性 15、Flink 的ta…...
【MySQL】深入了解索引的底层逻辑结构
文章目录 主键排序一. InnoDB的索引结构1. 单个page2. 多个page 二. 为什么选择B树三. 聚簇索引和非聚簇索引结束语 主键排序 我们创建一个user表,并乱序插入数据 mysql> create table if not exists user(-> id int primary key,-> age int not null,-&…...
Android之SpannableString使用
文章目录 前言一、效果图二、实现代码总结 前言 在开发中,往往有些需求是我们不愿意遇到的,但是也不得不处理的事情,比如一段文案,需要文案中某些文字变颜色或者点击跳转,所以简单写了几句代码实现,没什么…...
【Python】Python求均值、中值和众数
Python求均值、中值和众数 我们将讨论如何使用描述性统计数据进行数据分析,包括: 均值——一组值的平均值; 中值——当所有值按顺序排列时的中间值; 众数——最常出现的值。 以上这些都是集中趋势度量,每种都会产生一个值来表示一组值中的“…...
NPM 常用命令(十二)
目录 1、npm unpublish 1.1 使用语法 1.2 描述 2、npm unstar 2.1 使用语法 3、npm update 3.1 使用语法 3.2 描述 3.3 示例 插入符号依赖 波浪号依赖 低于 1.0.0 的插入符号依赖 子依赖 更新全局安装的包 4、npm version 4.1 使用语法 5、npm view 5.1 使用语…...
数据在内存中的存储(2)
文章目录 3. 浮点型在内存中的存储3.1 一个例子3.2 浮点数存储规则 3. 浮点型在内存中的存储 常见的浮点数: 3.14159 1E10 ------ 1.0 * 10^10 浮点数家族包括: float、double、long double 类型 浮点数表示的范围:float.h中定义 3.1 一个例…...
软件工程与计算总结(十三)详细设计中的模块化与信息隐藏
一.模块化与信息隐藏思想 1.设计质量 好的设计要着重满足以下3方面:可管理性、灵活性、可理解性好的设计需要侧重于间接性和可观察性——简洁性使得系统模块易于管理(理解和分解)、开发(修改与调试)和复用。实践者都…...
RF学习——器件的非线性失真分析
在大信号激励下的射频系统 在电路中,如果激励信号的幅度不可忽视,那么就会产生非线性失真。如二极管,晶体管等电路元件的特性在大信号激励下回变得非线性,输入和输出的形状不同,产生失真。 在功率放大器PA中,随着传输给负载的功率增大而迅速增大,传递功率的规格要始终考…...
SUB-1G SOC芯片DP4306F 32 位 ARM Cortex-M0+内核替代CMT2380F32
DP4306F是一款高性能低功耗的单片集成收发机,集成MO核MCU,工作频率可覆盖200MHiz^ 1000MHz。 支持230/408/433/470/868/915频段。该芯片集成了射频接收器、射频发射器、频率综合器、GFSK调制器、GFSK解调器等功能模块。通过SPI接口可以对输出功率、频道选…...
接收请求地址下载并输出文件流实现
代码: import httpxfrom datetime import datetime from io import BytesIO from fastapi.responses import StreamingResponse@router.get("/download", tags=["下载"]) async...
【iOS】——用单例类封装网络请求
文章目录 一、JSONModel1.JSONModel的简单介绍2.JSONModel的使用 二、单例类和Block传值 一、JSONModel 1.JSONModel的简单介绍 JSONModel一个第三方库,这个库用来将网络请求到的JSON格式的数据转化成Foundation框架下的Model类的属性,这样我们就可以直…...
再学Blazor——概述
简介 Blazor 是一种 .NET 前端 Web 框架,同时支持服务器端呈现和客户端交互性。 使用 C# 语言创建丰富的交互式 UI共享前后端应用逻辑可以生成混合桌面和移动应用受益于 .NET 的性能、可靠性和安全性需要有 HTML、CSS、JS 相关基础(开发 UI 框架的话&a…...
Ceph运维笔记
Ceph运维笔记 一、基本操作 ceph osd tree //查看所有osd情况 其中里面的weight就是CRUSH算法要使用的weight,越大代表之后PG选择该osd的概率就越大 ceph -s //查看整体ceph情况 health_ok才是正常的 ceph osd out osd.1 //将osd.1踢出集群 ceph osd i…...
RTSP协议
1 前言 RTSP协议作为音视频实时监控一个非常重要的协议,具有非常广泛的应用。RTSP由RFC 2326规范化,它允许客户端通过请求不同的媒体资源来控制流媒体服务器。RTSP是一种应用层协议,通常基于TCP连接,用于建立和控制媒体会话。这使…...
Maven系列第6篇:生命周期和插件详解?
maven系列目标:从入门开始开始掌握一个高级开发所需要的maven技能。 这是maven系列第6篇。 整个maven系列的内容前后是有依赖的,如果之前没有接触过maven,建议从第一篇看起,本文尾部有maven完整系列的连接。 前面我们使用maven…...
【通义千问】大模型Qwen GitHub开源工程学习笔记(4)-- 模型的量化与离线部署
摘要: 量化方案基于AutoGPTQ,提供了Int4量化模型,其中包括Qwen-7B-Chat和Qwen-14B-Chat。更新承诺在模型评估效果几乎没有损失的情况下,降低存储要求并提高推理速度。量化是指将模型权重和激活的精度降低以节省存储空间并提高推理速度的过程。AutoGPTQ是一种专有量化工具。…...
树莓派超全系列教程文档--(61)树莓派摄像头高级使用方法
树莓派摄像头高级使用方法 配置通过调谐文件来调整相机行为 使用多个摄像头安装 libcam 和 rpicam-apps依赖关系开发包 文章来源: http://raspberry.dns8844.cn/documentation 原文网址 配置 大多数用例自动工作,无需更改相机配置。但是,一…...
黑马Mybatis
Mybatis 表现层:页面展示 业务层:逻辑处理 持久层:持久数据化保存 在这里插入图片描述 Mybatis快速入门 TensorRT-LLM | 模型导出(v0.20.0rc3)
0. 概述 上一节 对安装和使用有个基本介绍。根据这个 issue 的描述,后续 TensorRT-LLM 团队可能更专注于更新和维护 pytorch backend。但 tensorrt backend 作为先前一直开发的工作,其中包含了大量可以学习的地方。本文主要看看它导出模型的部分&#x…...
【磁盘】每天掌握一个Linux命令 - iostat
目录 【磁盘】每天掌握一个Linux命令 - iostat工具概述安装方式核心功能基础用法进阶操作实战案例面试题场景生产场景 注意事项 【磁盘】每天掌握一个Linux命令 - iostat 工具概述 iostat(I/O Statistics)是Linux系统下用于监视系统输入输出设备和CPU使…...
[ICLR 2022]How Much Can CLIP Benefit Vision-and-Language Tasks?
论文网址:pdf 英文是纯手打的!论文原文的summarizing and paraphrasing。可能会出现难以避免的拼写错误和语法错误,若有发现欢迎评论指正!文章偏向于笔记,谨慎食用 目录 1. 心得 2. 论文逐段精读 2.1. Abstract 2…...
Cloudflare 从 Nginx 到 Pingora:性能、效率与安全的全面升级
在互联网的快速发展中,高性能、高效率和高安全性的网络服务成为了各大互联网基础设施提供商的核心追求。Cloudflare 作为全球领先的互联网安全和基础设施公司,近期做出了一个重大技术决策:弃用长期使用的 Nginx,转而采用其内部开发…...
令牌桶 滑动窗口->限流 分布式信号量->限并发的原理 lua脚本分析介绍
文章目录 前言限流限制并发的实际理解限流令牌桶代码实现结果分析令牌桶lua的模拟实现原理总结: 滑动窗口代码实现结果分析lua脚本原理解析 限并发分布式信号量代码实现结果分析lua脚本实现原理 双注解去实现限流 并发结果分析: 实际业务去理解体会统一注…...
【HTML-16】深入理解HTML中的块元素与行内元素
HTML元素根据其显示特性可以分为两大类:块元素(Block-level Elements)和行内元素(Inline Elements)。理解这两者的区别对于构建良好的网页布局至关重要。本文将全面解析这两种元素的特性、区别以及实际应用场景。 1. 块元素(Block-level Elements) 1.1 基本特性 …...
自然语言处理——循环神经网络
自然语言处理——循环神经网络 循环神经网络应用到基于机器学习的自然语言处理任务序列到类别同步的序列到序列模式异步的序列到序列模式 参数学习和长程依赖问题基于门控的循环神经网络门控循环单元(GRU)长短期记忆神经网络(LSTM)…...
Map相关知识
数据结构 二叉树 二叉树,顾名思义,每个节点最多有两个“叉”,也就是两个子节点,分别是左子 节点和右子节点。不过,二叉树并不要求每个节点都有两个子节点,有的节点只 有左子节点,有的节点只有…...
