Flask框架配置celery-[1]:flask工厂模式集成使用celery,可在异步任务中使用flask应用上下文,即拿即用,无需更多配置
一、概述
1、celery框架和flask框架在运行时,是在不同的进程中,资源是独占的。
2、celery异步任务如果想使用flask中的功能,如orm,是需要在flask应用上下文管理器中执行orm操作的
3、使用celery是需要使用到中间件的,简单点就使用redis做中间件
注意:
在flask工厂模式集成celery异步框架,在celery的异步任务中能够获取到flask的应用上下文管理器,也就是说在celery异步任务中你可以去调用flask项目中功能,如orm操作等。
使用本文配置,可以无需修改flask创建app应用的程序,直接将celery相关包创建,运行就可以使用,且能够在异步任务使用flask的功能。
二、项目结构
依赖环境:
celery==4.4.7
eventlet==0.33.3
Flask==2.1.3
Flask-Caching==1.10.1
Flask-Cors==3.0.10
Flask-Migrate==2.7.0
Flask-RESTful==0.3.9
Flask-SocketIO==5.1.1
Flask-SQLAlchemy==2.5.1
PyMySQL==1.0.2
redis==3.5.3
SQLAlchemy==1.4.0
Werkzeug==2.0.2目录结构:
flask-project
|--apps
|-- user
|-- models
|--views.py
|--urls.py
|--__init__.py
|--ext
|--__init__.py
|--config.py
|--celery_task
|--__init__.py
|--async_task.py
|--celery.py
|--celeryconfig.py
|--check_task.py
|--scheduler_task.py
app.py
三、flask工厂模式下各模块功能
1、apps/user/models.py : 写了一个user表
2、apps/user/views.py:写了测试调用celery异步任务的接口
3、apps/user/urls.py: 注册路由的
4、ext/__init__.py:cache、db、cors的拓展
5、ext/config.py : cache和cors使用到的配置
6、apps/__init__.py: 一个函数create_app,生成flask应用对象
7、app.py: 启动flask应用对象的模块
本文重点不在flask工厂模式,默认看官都懂如何创建flaks工厂模式的项目了。
在视图中在执行异步任务,并获取异步任务的id:
from celery_task.async_task import send_email_task,cache_user_task
#用户资源:get\put\delete, 对单个进行操作
class UserOneResource(ResourceBase):def put(self,id):#测试异步发邮件email = request.args.get('email')code = request.args.get('code')res = send_email_task.delay(email,code)print(res.id)return NewResponse(msg='put',data={'task_id':res.id})def patch(self,id):#测试异步操作flask的orm和cachep = request.args.get('p')if p=='set':res = cache_user_task.delay()print(res,type(res))return NewResponse(msg='patch',data={'task_id':res.id})else:from ext import cachedata = cache.get('all-user-data')return NewResponse(msg='patch',data=data)res = 异步函数.delay(函数需要的参数)
task_id = res.id
注意:task_id 可以知道对应的任务的完成情况,获取任务的返回值等。
四、celery项目的配置
1、celery的配置
将celery的配置都放到一个py文件中,方便后期的维护和使用
celeryconfig.py
from celery.schedules import crontab
from datetime import timedelta
'''
参数解析:
accept_content:允许的内容类型/序列化程序的白名单,如果收到不在此列表中的消息,则该消息将被丢弃并出现错误,默认只为json;
task_serializer:标识要使用的默认序列化方法的字符串,默认值为json;
result_serializer:结果序列化格式,默认值为json;
timezone:配置Celery以使用自定义时区;
enable_utc:启用消息中的日期和时间,将转换为使用 UTC 时区,与timezone连用,当设置为 false 时,将使用系统本地时区。
result_expires: 异步任务结果存活时长
beat_schedule:设置定时任务
'''
#手动注册celery的异步任务:将所有celery异步任务所在的模块找到,写成字符串
task_module = ['celery_task.async_task',  # 写任务模块导入路径,该模块主要写异步任务的方法'celery_task.scheduler_task',  # 写任务模块导入路径,该模块主要写定时任务的方法
]#celery的配置
config = {"broker_url" :'redis://127.0.0.1:6379/0',   #'redis://:123456@127.0.0.1:6379/1' 有密码时,123456是密码"result_backend" : 'redis://127.0.0.1:6379/1',"task_serializer" : 'json',"result_serializer" : 'json',"accept_content" : ['json'],"timezone" : 'Asia/Shanghai',"enable_utc" : False,"result_expires" : 1*60*60,"beat_schedule" : { #定时任务配置# 名字随意命名'add-func-30-seconds': {# 执行add_task下的addy函数'task': 'celery_task.scheduler_task.add_func',  # 任务函数的导入路径,from celery_task.scheduler_task import add_func# 每10秒执行一次'schedule': timedelta(seconds=30),# add函数传递的参数'args': (10, 21)},# 名字随意起'add-func-5-minutes': {'task': 'celery_task.scheduler_task.add_func',  # 任务函数的导入路径,from celery_task.scheduler_task import add_func# crontab不传的参数默认就是每的意思,比如这里是每年每月每日每天每小时的5分执行该任务'schedule': crontab(minute='5'),  # 之前时间点执行,每小时的第5分钟执行任务, 改成小时,分钟,秒 就是每天的哪个小时哪分钟哪秒钟执行'args': (19, 22)  # 定时任务需要的参数},# 缓存用户数据到cache中'cache-user-func': {'task': 'celery_task.scheduler_task.cache_user_func',# 导入任务函数:from celery_task.scheduler_task import cache_user_func'schedule': timedelta(minutes=1),  # 每1分钟执行一次,将用户消息缓存到cache中}}
}
2、创建celery对象
celery.py
from celery import Celery,Task
from .celeryconfig import config,task_module
import sys
import os
'1、把flask项目路径添加到系统环境变量中'
project_path = os.path.dirname(os.path.dirname(__file__))
sys.path.append(project_path)'''
2、创建celery应用对象'task'可以任务是该celery对象名字,用于区分celery对象broker是指定消息中间件backend是指定任务结果存储位置include是手动指定异步任务所在的模块的位置
'''
#创建celery异步对象
celery = Celery('task', broker=config.get('broker_url'), backend=config.get('result_backend'), include=task_module)
#导入一些基本配置
celery.conf.update(**config)'3、给celery所有任务添加flask的应用上下文,在celery异步任务中就可以调用flask中的对象了'
class ContextTask(celery.Task):def __call__(self, *args, **kwargs):from apps import create_appapp = create_app()with app.app_context():return self.run(*args, **kwargs)
celery.Task = ContextTask
注意:
1、第一步很关键,设置到python项目运行时,加载环境变量的问题。这一步是将flask项目的根目录加载环境变量中,这样第3步才能从apps中导入create_app函数。
2、第二步是创建celery通用的方法了,没什么好说的。
3、第三步很关键,涉及到celery异步任务能否在flask应用上下文管理器运行,从而可以调用flask中的功能,例如orm操作,cache操作.。(在执行任务时,先套上flask的应用上下文管理器)
3、异步任务模块
将所有异步任务相关的函数都集中到一个模块中,方便维护和使用。
async_task.py
# 导入celery对象app
from celery_task.celery import celery
from ext import cache
import time'''
1、没有返回值的,@app.task(ignore_result=True)
2、有返回值的任务,@app.task 默认就是(ignore_result=False)
'''# 没有返回值,禁用掉结果后端
@celery.task
def send_email_task(receiver_email,code):  # 此时可以直接传邮箱,还能减少一次数据库的IO操作''':param email: 接收消息的邮箱,用户的邮箱:return:'''# 模拟邮件发送验证码time.sleep(5)return {'result':'邮件已经发送',receiver_email:'2356'}@celery.task
def cache_user_task():#orm查询数据,放到cache中from apps.user.models import UserModeluser = UserModel.query.all()lis = []for u in user:id = u.idname = u.namedic = {'id':id,'name':name}lis.append(dic)print(dic)cache.set('all-user-data',lis)return {'code':200,'msg':'查询数据成功'}4、定时任务模块
将所有定时任务相关的函数都集中到一个模块中,方便维护和使用。
schedulser_task.py
from celery_task.celery import celery
import time# 有返回值,返回值可以从结果后端中获取
@celery.task
def add_func(a, b):print('执行了加法函数',a+b)return a + b# 不需要返回值,禁用掉结果后端
@celery.task(ignore_result=True)
def cache_user_func():print('all')5、检测任务id获取任务状态和返回值
check_task.py:
from celery.result import AsyncResult
from celery_task.celery import celery'''验证任务的执行状态的'''def check_task_status(task_id):'''任务的执行状态:PENDING :等待执行STARTED :开始执行RETRY   :重新尝试执行SUCCESS :执行成功FAILURE :执行失败:param task_id::return:'''result = AsyncResult(id=task_id, app=celery)dic = {'type': result.status,'msg': '','data': None,'code': 400}if result.status == 'PENDING':dic['msg'] = '任务等待中'elif result.status == 'STARTED':dic['msg'] = '任务开始执行'elif result.status == 'RETRY':dic['msg'] = '任务重新尝试执行'elif result.status == 'FAILURE':dic['msg'] = '任务执行失败了'elif result.status == 'SUCCESS':result = result.get()dic['msg'] = '任务执行成功'dic['data'] = resultdic['code'] = 200# result.forget() # 将结果删除# async.revoke(terminate=True)  # 无论现在是什么时候,都要终止# async.revoke(terminate=False) # 如果任务还没有开始执行呢,那么就可以终止。return dic在视图函数中调用该方法,通过task_id ,返回任务的运行结果。
五、测试
1、运行项目
flask项目(在项目根目录下执行):
flask run --host 0.0.0.0 --port 5000
celery项目(在项目根目录下执行):
启动celery进程:
windows系统:
celery -A celery_task.celery worker -l info -P eventlet
linux系统:
celery -A celery_task.celery worker -l info
启动定时任务(先启动celery进程在启动定时任务):
celery -A celery_task.celery beat -l info
2、运行结果
1、执行异步任务中,将orm数据存到cache中

2、执行定时任务了

六、注意事项
1、在系统中要先安装好redis和mysql,并都启动了
2、在测试异步操作orm时,会使用到flask的cache存数据,注意flask的cache不能配置内存模式,不然celery进程存到cache中的数据,flask进程中取不到的。
3、当前的配置下,celery的目录必须是在flask根目录下
七、拓展-改变celery_task的位置
如果你想将celery_task包移动到apps包下,此时你需要修改什么?
1、apps/celery_task/celery.py:将flask项目根目录加载到系统环境变量中的路径有变
'1、把flask项目路径添加到系统环境变量中'
project_path = os.path.dirname(os.path.dirname(os.path.dirname(__file__)))2、apps/celery_task/celeryconfig.py: 注册异步任务的模块,定时任务的模块的位置变化
'1、加上apps.'
task_module = ['apps.celery_task.async_task',  # 写任务模块导入路径,该模块主要写异步任务的方法'apps.celery_task.scheduler_task',  # 写任务模块导入路径,该模块主要写定时任务的方法
]'2、task参数对应的字符串,加上apps.'
config = {"broker_url" :'redis://127.0.0.1:6379/0',   #'redis://:123456@127.0.0.1:6379/1' 有密码时,123456是密码"result_backend" : 'redis://127.0.0.1:6379/1',"task_serializer" : 'json',"result_serializer" : 'json',"accept_content" : ['json'],"timezone" : 'Asia/Shanghai',"enable_utc" : False,"result_expires" : 1*60*60,"beat_schedule" : { #定时任务配置# 名字随意命名'add-func-30-seconds': {# 执行add_task下的addy函数'task': 'apps.celery_task.scheduler_task.add_func',  # 任务函数的导入路径,from celery_task.scheduler_task import add_func# 每10秒执行一次'schedule': timedelta(seconds=30),# add函数传递的参数'args': (10, 21)},# 名字随意起'add-func-5-minutes': {'task': 'apps.celery_task.scheduler_task.add_func',  # 任务函数的导入路径,from celery_task.scheduler_task import add_func# crontab不传的参数默认就是每的意思,比如这里是每年每月每日每天每小时的5分执行该任务'schedule': crontab(minute='5'),  # 之前时间点执行,每小时的第5分钟执行任务, 改成小时,分钟,秒 就是每天的哪个小时哪分钟哪秒钟执行'args': (19, 22)  # 定时任务需要的参数},# 缓存用户数据到cache中'cache-user-func': {'task': 'apps.celery_task.scheduler_task.cache_user_func',# 导入任务函数:from celery_task.scheduler_task import cache_user_func'schedule': timedelta(minutes=1),  # 每1分钟执行一次,将用户消息缓存到cache中}}
}3、在视图函数导入异步任务的路径也变了
#异步任务
from apps.celery_task.async_task import send_email_task,cache_user_task4、启动celery和定时任务的命令变量【在项目根目录下执行命令】
启动celery:
windows启动命令: celery -A apps.celery_task.celery worker -l info -P eventlet
linux启动命令: celery -A apps.celery_task.celery worker -l info
启动定时任务:
celery -A apps.celery_task beat -l info
相关文章:
 
Flask框架配置celery-[1]:flask工厂模式集成使用celery,可在异步任务中使用flask应用上下文,即拿即用,无需更多配置
一、概述 1、celery框架和flask框架在运行时,是在不同的进程中,资源是独占的。 2、celery异步任务如果想使用flask中的功能,如orm,是需要在flask应用上下文管理器中执行orm操作的 3、使用celery是需要使用到中间件的࿰…...
 
合并二叉树
题目链接 合并二叉树 题目描述 注意点 如果两个节点重叠,那么将这两个节点的值相加作为合并后节点的新值;否则,不为 null 的节点将直接作为新二叉树的节点 解答思路 先序遍历二叉树,将重叠部分节点值相加作为新节点的值&…...
 
Sanic——Python函数变成API的神器
今天给大家介绍一个超好用的框架,迅速将Python函数变成API,它就是最近越来越火的异步Web框架Sanic。 1. Sanic简介 Sanic 是 Python3.7 Web 服务器和 Web 框架,旨在提高性能。它允许使用 Python3.5 中添加的async/await语法,这使…...
 
Windows连接不上VMware,ping不通的问题
文章目录 防火墙问题Windows和虚拟机下的ip不一致导致的问题VMware Network Adapter (适配器)丢失的问题参考文档 防火墙问题 防火墙默认不会拦截ping命令,除非你个人设置了Linux防火墙Centos7的常用命令关闭防火墙 systemctl stop firewalld #停止Windows和虚拟机…...
24、Flink 的table api与sql之Catalogs(java api操作数据库、表)-2
Flink 系列文章 1、Flink 部署、概念介绍、source、transformation、sink使用示例、四大基石介绍和示例等系列综合文章链接 13、Flink 的table api与sql的基本概念、通用api介绍及入门示例 14、Flink 的table api与sql之数据类型: 内置数据类型以及它们的属性 15、Flink 的ta…...
 
【MySQL】深入了解索引的底层逻辑结构
文章目录 主键排序一. InnoDB的索引结构1. 单个page2. 多个page 二. 为什么选择B树三. 聚簇索引和非聚簇索引结束语 主键排序 我们创建一个user表,并乱序插入数据 mysql> create table if not exists user(-> id int primary key,-> age int not null,-&…...
 
Android之SpannableString使用
文章目录 前言一、效果图二、实现代码总结 前言 在开发中,往往有些需求是我们不愿意遇到的,但是也不得不处理的事情,比如一段文案,需要文案中某些文字变颜色或者点击跳转,所以简单写了几句代码实现,没什么…...
【Python】Python求均值、中值和众数
Python求均值、中值和众数 我们将讨论如何使用描述性统计数据进行数据分析,包括: 均值——一组值的平均值; 中值——当所有值按顺序排列时的中间值; 众数——最常出现的值。 以上这些都是集中趋势度量,每种都会产生一个值来表示一组值中的“…...
 
NPM 常用命令(十二)
目录 1、npm unpublish 1.1 使用语法 1.2 描述 2、npm unstar 2.1 使用语法 3、npm update 3.1 使用语法 3.2 描述 3.3 示例 插入符号依赖 波浪号依赖 低于 1.0.0 的插入符号依赖 子依赖 更新全局安装的包 4、npm version 4.1 使用语法 5、npm view 5.1 使用语…...
 
数据在内存中的存储(2)
文章目录 3. 浮点型在内存中的存储3.1 一个例子3.2 浮点数存储规则 3. 浮点型在内存中的存储 常见的浮点数: 3.14159 1E10 ------ 1.0 * 10^10 浮点数家族包括: float、double、long double 类型 浮点数表示的范围:float.h中定义 3.1 一个例…...
 
软件工程与计算总结(十三)详细设计中的模块化与信息隐藏
一.模块化与信息隐藏思想 1.设计质量 好的设计要着重满足以下3方面:可管理性、灵活性、可理解性好的设计需要侧重于间接性和可观察性——简洁性使得系统模块易于管理(理解和分解)、开发(修改与调试)和复用。实践者都…...
RF学习——器件的非线性失真分析
在大信号激励下的射频系统 在电路中,如果激励信号的幅度不可忽视,那么就会产生非线性失真。如二极管,晶体管等电路元件的特性在大信号激励下回变得非线性,输入和输出的形状不同,产生失真。 在功率放大器PA中,随着传输给负载的功率增大而迅速增大,传递功率的规格要始终考…...
 
SUB-1G SOC芯片DP4306F 32 位 ARM Cortex-M0+内核替代CMT2380F32
DP4306F是一款高性能低功耗的单片集成收发机,集成MO核MCU,工作频率可覆盖200MHiz^ 1000MHz。 支持230/408/433/470/868/915频段。该芯片集成了射频接收器、射频发射器、频率综合器、GFSK调制器、GFSK解调器等功能模块。通过SPI接口可以对输出功率、频道选…...
接收请求地址下载并输出文件流实现
代码: import httpxfrom datetime import datetime from io import BytesIO from fastapi.responses import StreamingResponse@router.get("/download", tags=["下载"]) async...
 
【iOS】——用单例类封装网络请求
文章目录 一、JSONModel1.JSONModel的简单介绍2.JSONModel的使用 二、单例类和Block传值 一、JSONModel 1.JSONModel的简单介绍 JSONModel一个第三方库,这个库用来将网络请求到的JSON格式的数据转化成Foundation框架下的Model类的属性,这样我们就可以直…...
再学Blazor——概述
简介 Blazor 是一种 .NET 前端 Web 框架,同时支持服务器端呈现和客户端交互性。 使用 C# 语言创建丰富的交互式 UI共享前后端应用逻辑可以生成混合桌面和移动应用受益于 .NET 的性能、可靠性和安全性需要有 HTML、CSS、JS 相关基础(开发 UI 框架的话&a…...
 
Ceph运维笔记
Ceph运维笔记 一、基本操作 ceph osd tree //查看所有osd情况 其中里面的weight就是CRUSH算法要使用的weight,越大代表之后PG选择该osd的概率就越大 ceph -s //查看整体ceph情况 health_ok才是正常的 ceph osd out osd.1 //将osd.1踢出集群 ceph osd i…...
 
RTSP协议
1 前言 RTSP协议作为音视频实时监控一个非常重要的协议,具有非常广泛的应用。RTSP由RFC 2326规范化,它允许客户端通过请求不同的媒体资源来控制流媒体服务器。RTSP是一种应用层协议,通常基于TCP连接,用于建立和控制媒体会话。这使…...
 
Maven系列第6篇:生命周期和插件详解?
maven系列目标:从入门开始开始掌握一个高级开发所需要的maven技能。 这是maven系列第6篇。 整个maven系列的内容前后是有依赖的,如果之前没有接触过maven,建议从第一篇看起,本文尾部有maven完整系列的连接。 前面我们使用maven…...
 
【通义千问】大模型Qwen GitHub开源工程学习笔记(4)-- 模型的量化与离线部署
摘要: 量化方案基于AutoGPTQ,提供了Int4量化模型,其中包括Qwen-7B-Chat和Qwen-14B-Chat。更新承诺在模型评估效果几乎没有损失的情况下,降低存储要求并提高推理速度。量化是指将模型权重和激活的精度降低以节省存储空间并提高推理速度的过程。AutoGPTQ是一种专有量化工具。…...
基础测试工具使用经验
背景 vtune,perf, nsight system等基础测试工具,都是用过的,但是没有记录,都逐渐忘了。所以写这篇博客总结记录一下,只要以后发现新的用法,就记得来编辑补充一下 perf 比较基础的用法: 先改这…...
VTK如何让部分单位不可见
最近遇到一个需求,需要让一个vtkDataSet中的部分单元不可见,查阅了一些资料大概有以下几种方式 1.通过颜色映射表来进行,是最正规的做法 vtkNew<vtkLookupTable> lut; //值为0不显示,主要是最后一个参数,透明度…...
 
第 86 场周赛:矩阵中的幻方、钥匙和房间、将数组拆分成斐波那契序列、猜猜这个单词
Q1、[中等] 矩阵中的幻方 1、题目描述 3 x 3 的幻方是一个填充有 从 1 到 9 的不同数字的 3 x 3 矩阵,其中每行,每列以及两条对角线上的各数之和都相等。 给定一个由整数组成的row x col 的 grid,其中有多少个 3 3 的 “幻方” 子矩阵&am…...
 
Spring数据访问模块设计
前面我们已经完成了IoC和web模块的设计,聪明的码友立马就知道了,该到数据访问模块了,要不就这俩玩个6啊,查库势在必行,至此,它来了。 一、核心设计理念 1、痛点在哪 应用离不开数据(数据库、No…...
 
项目部署到Linux上时遇到的错误(Redis,MySQL,无法正确连接,地址占用问题)
Redis无法正确连接 在运行jar包时出现了这样的错误 查询得知问题核心在于Redis连接失败,具体原因是客户端发送了密码认证请求,但Redis服务器未设置密码 1.为Redis设置密码(匹配客户端配置) 步骤: 1).修…...
 
无人机侦测与反制技术的进展与应用
国家电网无人机侦测与反制技术的进展与应用 引言 随着无人机(无人驾驶飞行器,UAV)技术的快速发展,其在商业、娱乐和军事领域的广泛应用带来了新的安全挑战。特别是对于关键基础设施如电力系统,无人机的“黑飞”&…...
【Nginx】使用 Nginx+Lua 实现基于 IP 的访问频率限制
使用 NginxLua 实现基于 IP 的访问频率限制 在高并发场景下,限制某个 IP 的访问频率是非常重要的,可以有效防止恶意攻击或错误配置导致的服务宕机。以下是一个详细的实现方案,使用 Nginx 和 Lua 脚本结合 Redis 来实现基于 IP 的访问频率限制…...
 
mac 安装homebrew (nvm 及git)
mac 安装nvm 及git 万恶之源 mac 安装这些东西离不开Xcode。及homebrew 一、先说安装git步骤 通用: 方法一:使用 Homebrew 安装 Git(推荐) 步骤如下:打开终端(Terminal.app) 1.安装 Homebrew…...
MySQL 8.0 事务全面讲解
以下是一个结合两次回答的 MySQL 8.0 事务全面讲解,涵盖了事务的核心概念、操作示例、失败回滚、隔离级别、事务性 DDL 和 XA 事务等内容,并修正了查看隔离级别的命令。 MySQL 8.0 事务全面讲解 一、事务的核心概念(ACID) 事务是…...
比较数据迁移后MySQL数据库和OceanBase数据仓库中的表
设计一个MySQL数据库和OceanBase数据仓库的表数据比较的详细程序流程,两张表是相同的结构,都有整型主键id字段,需要每次从数据库分批取得2000条数据,用于比较,比较操作的同时可以再取2000条数据,等上一次比较完成之后,开始比较,直到比较完所有的数据。比较操作需要比较…...
