Flask使用线程异步执行耗时任务
1 问题说明
1.1 任务简述
在开发Flask应用中一定会遇到执行耗时任务,但是Flask是轻量级的同步框架,即在单个请求时服务会阻被塞,直到任务完成(注意:当前请求被阻塞不会影响到其他请求)。
解决异步问题有两种思路,一种是借助外部工具实现异步,例如:消息队列(RabbitMQ)、 异步任务队列(Celery+Redis);另一种借助Python中的进程、线程或协程解决异步。我的是小项目选择因此选择了第二种方法。
经过测试,我在Flask中使用协程(gevent)会被阻塞;使用进程(multiprocessing)不会被阻塞,操作数据库出了问题(有可能是我没操作正确的问题);最后选择使用线程(threading)。
注意:使用线程会出现线程安全问题,
1.2 注意的问题
(1)线程安全
"""
!!! 注意
1 由于flask-SQLAlchemy对SQLAlchemy进行了封装,所以是线程安全的,可以在线程中直接使用;
2 原生SQLAlchemy中# 2.1 直接使用Session是线程不安全的,不推荐使用self.session = Session(self.engine)# 2.2 直接使用scoped_session是线程安全的,推荐使用# 获取sessionmakersession_factory = sessionmaker(engine)# scoped_session是线程安全的# 注意,此session不能使用Query对象session = scoped_session(session_factory)!!!
"""
(2)使用数据库
# 注意:高版本的Flask-SQLAlchemy(我的版本Flask-SQLAlchemy==3.1.1,SQLAlchemy==2.0.16)必须使用”with app.app_context()“,本质原因是Flask关联的SQLAlchemy版本太高
# 否则无法插入数据库并报错,错误内容如下(低版本不会出现此问题):
"""
This typically means that you attempted to use functionality that needed
the current application. To solve this, set up an application context
with app.app_context(). See the documentation for more information.
"""
2 工程布局
项目布局如下:

3 源代码
(1)main.py
from blueprint import init_blueprint
from config_app import app
from config_db import init_mysql_db# 初始化MySQL
init_mysql_db()init_blueprint()if __name__ == '__main__':app.run(host='0.0.0.0', debug=True)
(2)config_app.py
from flask import Flask
from flask_cors import CORSdef create_app():flask_app = Flask(__name__)CORS(flask_app, supports_credentials=True)return flask_appapp = create_app()
(3)config_db.py
from flask_sqlalchemy import SQLAlchemy
from flask_marshmallow import Marshmallow# 添加pymysql驱动,连接MySQL数据库
import pymysqlfrom config_app import apppymysql.install_as_MySQLdb()"""
!!! 注意
1 由于flask-SQLAlchemy对SQLAlchemy进行了封装,所以是线程安全的,可以在线程中直接使用;
2 原生SQLAlchemy中# 2.1 直接使用Session是线程不安全的,不推荐使用self.session = Session(self.engine)# 2.2 直接使用scoped_session是线程安全的,推荐使用# 获取sessionmakersession_factory = sessionmaker(engine)# scoped_session是线程安全的# 注意,此session不能使用Query对象session = scoped_session(session_factory)!!!
"""# 创建MySQL单实例
mysql_db = SQLAlchemy()# 创建Schema
mysql_schema = Marshmallow()# 创建数据库
"""
create database async default character set utf8mb4 collate utf8mb4_unicode_ci;
"""class MysqlConf:acc = "root"pwd = "123456"host = "192.168.108.200"port = 3306db = "async"mysql_conf = MysqlConf()# 初始化MySQL数据库
def init_mysql_db():# 配置MySQL数据库urldb_url = "mysql://" + mysql_conf.acc + ":" + mysql_conf.pwd + "@" + mysql_conf.host + ":" + str(mysql_conf.port) + "/" + mysql_conf.dbapp.config["SQLALCHEMY_DATABASE_URI"] = db_url# 关闭sqlalchemy自动跟踪数据库app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False# 显示底层执行的SQL语句app.config['SQLALCHEMY_ECHO'] = True# 解决‘No application found. Either work inside a view function or push an application context.’app.app_context().push()# 初始化appmysql_db.init_app(app)# 初始化schemamysql_schema.init_app(app)# 初始化table
def init_table():# 删除表mysql_db.drop_all()# 创建表mysql_db.create_all()
(4)dao.py
import datetimefrom config_db import mysql_db as db# Create table of bm_record
class User(db.Model):# Record table__tablename__ = "as_user"id = db.Column("us_id", db.Integer, nullable=False, primary_key=True, autoincrement=True)name = db.Column("us_name", db.String(100))age = db.Column("us_age", db.Integer)create_time = db.Column("us_create_time", db.DateTime, default=datetime.datetime.now)# 插入数据
def insert_record_dao(user: User):# Add datadb.session.add(user)# Commit datadb.session.commit()
(5)blueprint.py
# 构建蓝本
import threading
import time
from concurrent.futures import ThreadPoolExecutorfrom flask import Blueprint, jsonifyfrom config_app import app
from config_db import init_table
from dao import insert_record_dao, Useruser = Blueprint("user", __name__)# 注册蓝本
def init_blueprint():app.register_blueprint(user, url_prefix='/user')@user.route("/initdb")
def init_db():init_table()return jsonify("success")@user.route("/add")
def add_user():user: User = User()user.name = "zhangsan"user.age = 12time.sleep(10)insert_record_dao(user)return jsonify(user.id)executor = ThreadPoolExecutor(3)
@user.route("/thread_pool")
def run_task_thread_pool():executor.submit(_run_thread_pool)return jsonify("success")# 执行线程
def _run_thread_pool():print("Run thread pool")time.sleep(2)user: User = User()user.name = "thread pool"user.age = 12# 注意:必须使用”with app.app_context()“,否则无法插入数据库,并且不会报错with app.app_context():insert_record_dao(user)print("End thread pool")pass@user.route("/thread")
def run_task_thread():task = threading.Thread(target=_run_thread, name='thread-01')task.start()return jsonify("success")# 执行线程
def _run_thread():print("Run thread")time.sleep(2)user: User = User()user.name = "thread"user.age = 12# 注意:高版本的Flask-SQLAlchemy(我的版本Flask-SQLAlchemy==3.1.1,SQLAlchemy==2.0.16)必须使用”with app.app_context()“,本质原因是Flask关联的SQLAlchemy版本太高# 否则无法插入数据库并报错,错误内容如下(低版本不会出现此问题):"""This typically means that you attempted to use functionality that neededthe current application. To solve this, set up an application contextwith app.app_context(). See the documentation for more information."""with app.app_context():insert_record_dao(user)print("End thread")pass
4 请求截图
(1)初始化数据库

(2)添加用户

(3)使用线程池

(4)使用线程

相关文章:
Flask使用线程异步执行耗时任务
1 问题说明 1.1 任务简述 在开发Flask应用中一定会遇到执行耗时任务,但是Flask是轻量级的同步框架,即在单个请求时服务会阻被塞,直到任务完成(注意:当前请求被阻塞不会影响到其他请求)。 解决异步问题有…...
zabbix监控nginx
zabbix是什么 web界面提供的一种可视化的监控服务软件 以分布式的方式系统监控以及网络监控,硬件监控等等开源的软件 zabbix的架构 1、c/s模式 客户端和服务端,zabbix server服务端 zabbix agent 客户端 2、通过B/S B是浏览器 S服务端,通…...
【CVE-2023-49103】ownCloud graphapi信息泄露漏洞(2023年11月发布)
漏洞简介 ownCloud owncloud/graphapi 0.2.x在0.2.1之前和0.3.x在0.3.1之前存在漏洞。graphapi应用程序依赖于提供URL的第三方GetPhpInfo.php库。当访问此URL时,会显示PHP环境的配置详细信息(phpinfo)。此信息包括Web服务器的所有环境变量&a…...
可视化数据库管理客户端:Adminer
简介:Adminer(前身为phpMinAdmin)是一个用PHP编写的功能齐全的数据库管理工具。与phpMyAdmin相反,它由一个可以部署到目标服务器的文件组成。Adminer可用于MySQL、PostgreSQL、SQLite、MS SQL、Oracle、Firebird、SimpleDB、Elast…...
Python----字典练习
相关链接:Python---字典的增、删、改、查操作_python中字典的增删改查-CSDN博客 Python---字典---dict-CSDN博客 Python---引用变量与可变、非可变类型-CSDN博客 重点: 字典中的 key (就是键)可以是很多数据类型(…...
CentOS 部署 WBO 在线协作白板
1)WBO 白板工具介绍 1.1)WBO 白板简介 WBO 是一个自由和开源的在线协作白板。它允许多个用户同时在一个虚拟的大型白板上画图。该白板对所有线上用户实时更新,并且状态始终保持。它可以用于许多不同的目的,包括艺术、娱乐、设计和…...
qt-C++笔记之QStringList
qt-C笔记之QStringList —— 杭州 2023-12-03 文章目录 qt-C笔记之QStringList1.1.《Qt官方文档》第一部分翻译:继承自QList\<QString\>-初始化-添加字符串1.2.迭代字符串1.3.join()和split()1.4.filter()1.5.lastIndexOf()1.6.indexOf()1.7.replaceInString…...
ply前端
ply 是 eBPF 的 front-end 前端工具之一,专为 embedded Linux systems 开发,采用 C 语言编写,只需 libc 和内核支持 BPF 就可以运行,不需要外部 kernel 模块,不需要 LLVM,不需要 python。 ply 由瑞典工程师…...
U盘不仅能在电脑上使用,在手机上也可使用,包括安卓和苹果手机,但苹果的较特殊
许多最好的安卓手机都使用USB-C端口在电脑上充电和来回传输文件,但如果你需要给老板发电子邮件的文件放在闪存驱动器或全尺寸SD卡上呢? 幸运的是,使用廉价的适配器电缆,你可以将USB加密狗或读卡器直接连接到手机上。你甚至可以直接使用USB-C闪存驱动器,以实现更轻松的过程…...
面试数据库八股文十问十答第二期
面试数据库八股文十问十答第二期 作者:程序员小白条,个人博客 相信看了本文后,对你的面试是有一定帮助的! ⭐点赞⭐收藏⭐不迷路!⭐ 1.MySQL的主从复制 MySQL的主从复制是什么?MySQL主从复制是一种常见的…...
【LeetCode】每日一题 2023_12_2 拼车(模拟/差分)
文章目录 刷题前唠嗑题目:拼车题目描述代码与解题思路学习大佬题解 刷题前唠嗑 LeetCode?启动!!! 题目:拼车 题目链接:1094. 拼车 题目描述 代码与解题思路 func carPooling(trips [][]int…...
网络和Linux网络_7(传输层)UDP和TCP协议(端口号+确认应答+超时重传+三次握手四次挥手)
目录 1. 重看端口号 1.1 端口号的概念 1.2 端口号的划分 2. 重看UDP协议 2.1 UDP协议格式 2.2 UDP的特点 3. 重看TCP协议 3.1 TCP协议格式 3.2 TCP的解包分用 3.3 TCP的可靠性及机制 3.3.1 确认应答ACK机制 3.3.2 超时重传机制 3.3.3 连接管理机制(三次…...
KALI LINUX安全审核
预计更新 第一章 入门 1.1 什么是Kali Linux? 1.2 安装Kali Linux 1.3 Kali Linux桌面环境介绍 1.4 基本命令和工具 第二章 信息收集 1.1 网络扫描 1.2 端口扫描 1.3 漏洞扫描 1.4 社交工程学 第三章 攻击和渗透测试 1.1 密码破解 1.2 暴力破解 1.3 漏洞利用 1.4 …...
2023-12-03-解决libxkbcommon库编译完后图像界面不能使用键盘
layout: post # 使用的布局(不需要改) title: Ubuntu修复 # 标题 subtitle: 解决libxkbcommon库编译完图形界面不能使用键盘 #副标题 date: 2023-12-03 # 时间 author: BY ThreeStones1029 # 作者 header-img: img/about_bg.jpg #这篇文章标题背景图片 c…...
vue el-table表格中每行上传文件(上传简历)操作
1、HTML中 <el-table :data"formInfo.userListDto" border stripe max-height"400"><el-table-column type"index" label"序号" width"50"> </el-table-column><el-table-column prop"realName&q…...
Python批量图像处理--图片重命名、图片旋转
图像批量重命名: 使用batch_rename_images函数实现对多个文件夹下面的图片进行重命名操作 先检查文件名的后缀,使用了.endswith()方法来判断文件名是否以.jpg、.png或.JPG结尾,判断是否为图片文件 然后构造新的文件路径new_filepath&#…...
第五天 用Python批量处理Excel文件,实现自动化办公
用Python批量处理Excel文件,实现自动化办公 一、具体需求 有以下N个表,每个表的结构一样,如下: 需要把所有表数据汇总,把每个人的得分、积分分别加起来,然后按总积分排名,总积分一致时ÿ…...
mybatis整合(手动添加jar包方式)
操作步骤 创建数据库 建立user表 放入数据 1、创建javaweb工程并添加Jar包 用到的jar包 junit 用于测试 mybatis框架:mybatis-3.5.9.jar mysql数据库:mysql-connector-java-8.0.28.jar 2、添加MyBatis核心配置文件 <?xml version"1.0"…...
leetcode - 矩阵区域和
1314. 矩阵区域和 - 力扣(LeetCode) 给你一个 m x n 的矩阵 mat 和一个整数 k ,请你返回一个矩阵 answer ,其中每个 answer[i][j] 是所有满足下述条件的元素 mat[r][c] 的和: i - k < r < i k, j - k < c …...
头歌JUnit单元测试相关实验进阶
JUnit是一个由 Erich Gamma 和 Kent Beck 编写的一个回归测试框架(regression testing framework),主要供 Java 开发人员编写单元测试。Junit在极限编程和重构中被极力推荐使用,因为它可以大大地提高开发的效率。 Junit的特性&…...
7.4.分块查找
一.分块查找的算法思想: 1.实例: 以上述图片的顺序表为例, 该顺序表的数据元素从整体来看是乱序的,但如果把这些数据元素分成一块一块的小区间, 第一个区间[0,1]索引上的数据元素都是小于等于10的, 第二…...
2025年能源电力系统与流体力学国际会议 (EPSFD 2025)
2025年能源电力系统与流体力学国际会议(EPSFD 2025)将于本年度在美丽的杭州盛大召开。作为全球能源、电力系统以及流体力学领域的顶级盛会,EPSFD 2025旨在为来自世界各地的科学家、工程师和研究人员提供一个展示最新研究成果、分享实践经验及…...
基于Docker Compose部署Java微服务项目
一. 创建根项目 根项目(父项目)主要用于依赖管理 一些需要注意的点: 打包方式需要为 pom<modules>里需要注册子模块不要引入maven的打包插件,否则打包时会出问题 <?xml version"1.0" encoding"UTF-8…...
WordPress插件:AI多语言写作与智能配图、免费AI模型、SEO文章生成
厌倦手动写WordPress文章?AI自动生成,效率提升10倍! 支持多语言、自动配图、定时发布,让内容创作更轻松! AI内容生成 → 不想每天写文章?AI一键生成高质量内容!多语言支持 → 跨境电商必备&am…...
vue3+vite项目中使用.env文件环境变量方法
vue3vite项目中使用.env文件环境变量方法 .env文件作用命名规则常用的配置项示例使用方法注意事项在vite.config.js文件中读取环境变量方法 .env文件作用 .env 文件用于定义环境变量,这些变量可以在项目中通过 import.meta.env 进行访问。Vite 会自动加载这些环境变…...
Aspose.PDF 限制绕过方案:Java 字节码技术实战分享(仅供学习)
Aspose.PDF 限制绕过方案:Java 字节码技术实战分享(仅供学习) 一、Aspose.PDF 简介二、说明(⚠️仅供学习与研究使用)三、技术流程总览四、准备工作1. 下载 Jar 包2. Maven 项目依赖配置 五、字节码修改实现代码&#…...
SQL慢可能是触发了ring buffer
简介 最近在进行 postgresql 性能排查的时候,发现 PG 在某一个时间并行执行的 SQL 变得特别慢。最后通过监控监观察到并行发起得时间 buffers_alloc 就急速上升,且低水位伴随在整个慢 SQL,一直是 buferIO 的等待事件,此时也没有其他会话的争抢。SQL 虽然不是高效 SQL ,但…...
纯 Java 项目(非 SpringBoot)集成 Mybatis-Plus 和 Mybatis-Plus-Join
纯 Java 项目(非 SpringBoot)集成 Mybatis-Plus 和 Mybatis-Plus-Join 1、依赖1.1、依赖版本1.2、pom.xml 2、代码2.1、SqlSession 构造器2.2、MybatisPlus代码生成器2.3、获取 config.yml 配置2.3.1、config.yml2.3.2、项目配置类 2.4、ftl 模板2.4.1、…...
9-Oracle 23 ai Vector Search 特性 知识准备
很多小伙伴是不是参加了 免费认证课程(限时至2025/5/15) Oracle AI Vector Search 1Z0-184-25考试,都顺利拿到certified了没。 各行各业的AI 大模型的到来,传统的数据库中的SQL还能不能打,结构化和非结构的话数据如何和…...
SQL Server 触发器调用存储过程实现发送 HTTP 请求
文章目录 需求分析解决第 1 步:前置条件,启用 OLE 自动化方式 1:使用 SQL 实现启用 OLE 自动化方式 2:Sql Server 2005启动OLE自动化方式 3:Sql Server 2008启动OLE自动化第 2 步:创建存储过程第 3 步:创建触发器扩展 - 如何调试?第 1 步:登录 SQL Server 2008第 2 步…...
