Python 异步协程:从 async/await 到 asyncio 再到 async with
在 Python 3.8 以后的版本中,异步编程变得越来越重要。本文将系统介绍 Python 标准库中的异步编程工具,带领大家掌握 async
/await
语法和 asyncio
的使用。
从一个简单的场景开始
假设我们在处理一些耗时的 I/O 操作,比如读取多个文件或处理多个数据。为了模拟这种场景,我们先用 time.sleep()
来代表耗时操作:
import time
import randomdef process_item(item):# 模拟耗时操作print(f"处理中:{item}")process_time = random.uniform(0.5, 2.0)time.sleep(process_time)return f"处理完成:{item},耗时 {process_time:.2f} 秒"def process_all_items():items = ["任务A", "任务B", "任务C", "任务D"]results = []for item in items:result = process_item(item)results.append(result)return resultsif __name__ == "__main__":start = time.time()results = process_all_items()end = time.time()print("\n".join(results))print(f"总耗时:{end - start:.2f} 秒")
处理中:任务A
处理中:任务B
处理中:任务C
处理中:任务D
处理完成:任务A,耗时 1.97 秒
处理完成:任务B,耗时 1.28 秒
处理完成:任务C,耗时 0.66 秒
处理完成:任务D,耗时 1.80 秒
总耗时:5.72 秒
这段代码的问题很明显:每个任务都必须等待前一个任务完成才能开始。如果有4个任务,每个任务平均耗时1秒,那么总耗时就接近4秒。
认识 async/await
Python 引入了 async
/await
语法来支持异步编程。当我们在函数定义前加上 async
关键字时,这个函数就变成了一个"协程"(coroutine)。而 await
关键字则用于等待一个协程完成。让我们改写上面的代码:
import asyncio
import random
import timeasync def process_item(item):print(f"处理中:{item}")# async 定义的函数变成了协程process_time = random.uniform(0.5, 2.0)# time.sleep() 换成 asyncio.sleep()await asyncio.sleep(process_time) # await 等待异步操作完成return f"处理完成:{item},耗时 {process_time:.2f} 秒"async def process_all_items():items = ["任务A", "任务B", "任务C", "任务D"]# 创建任务列表tasks = [asyncio.create_task(process_item(item))for item in items]print("开始处理")results = await asyncio.gather(*tasks)return resultsasync def main():start = time.time()results = await process_all_items()end = time.time()print("\n".join(results))print(f"总耗时:{end - start:.2f} 秒")if __name__ == "__main__":asyncio.run(main())
开始处理
处理中:任务A
处理中:任务B
处理中:任务C
处理中:任务D
处理完成:任务A,耗时 1.97 秒
处理完成:任务B,耗时 0.80 秒
处理完成:任务C,耗时 0.83 秒
处理完成:任务D,耗时 1.46 秒
总耗时:1.97 秒
让我们详细解释这段代码的执行过程:
- 当函数被
async
关键字修饰后,调用该函数不会直接执行函数体,而是返回一个协程对象 - await 关键字只能在
async
函数内使用,它表示"等待这个操作完成后再继续" asyncio.create_task()
将协程包装成一个任务,该任务会被事件循环调度执行asyncio.gather()
并发运行多个任务,并等待它们全部完成asyncio.run()
创建事件循环,运行main()
协程,直到它完成
使用 asyncio.wait_for 添加超时控制
在实际应用中,我们往往需要为异步操作设置超时时间:
import asyncio
import random
import timeasync def process_item(item):process_time = random.uniform(0.5, 2.0)try:# 设置1秒超时await asyncio.wait_for(asyncio.sleep(process_time),timeout=1.0)return f"处理完成:{item},耗时 {process_time:.2f} 秒"except asyncio.TimeoutError:return f"处理超时:{item}"async def main():items = ["任务A", "任务B", "任务C", "任务D"]tasks = [asyncio.create_task(process_item(item))for item in items]start = time.time()results = await asyncio.gather(*tasks, return_exceptions=True)end = time.time()print("\n".join(results))print(f"总耗时:{end - start:.2f} 秒")if __name__ == "__main__":asyncio.run(main())
处理超时:任务A
处理完成:任务B,耗时 0.94 秒
处理超时:任务C
处理完成:任务D,耗时 0.78 秒
总耗时:1.00 秒
使用异步上下文管理器
Python 中的 with
语句可以用于资源管理,类似地,异步编程中我们可以使用 async with
。一个类要支持异步上下文管理,需要实现 __aenter__
和 __aexit__
方法:
import asyncio
import randomclass AsyncResource:async def __aenter__(self):# 异步初始化资源print("正在初始化资源...")await asyncio.sleep(0.1)return selfasync def __aexit__(self, exc_type, exc_val, exc_tb):# 异步清理资源print("正在清理资源...")await asyncio.sleep(0.1)async def process(self, item):# 异步处理任务print(f"正在处理任务:{item}")process_time = random.uniform(0.5, 2.0)await asyncio.sleep(process_time)return f"处理完成:{item},耗时 {process_time:.2f} 秒"async def main():items = ["任务A", "任务B", "任务C"]async with AsyncResource() as resource:tasks = [asyncio.create_task(resource.process(item))for item in items]results = await asyncio.gather(*tasks)print("\n".join(results))if __name__ == "__main__":asyncio.run(main())
正在初始化资源...
正在处理任务:任务A
正在处理任务:任务B
正在处理任务:任务C
正在清理资源...
处理完成:任务A,耗时 1.31 秒
处理完成:任务B,耗时 0.77 秒
处理完成:任务C,耗时 0.84 秒
使用事件循环执行阻塞操作 run_in_executor
在异步编程中,我们可能会遇到一些无法避免的阻塞操作(比如调用传统的同步API)。这时,asyncio.get_running_loop()
和 run_in_executor
就显得特别重要:
import asyncio
import time
import requests # 一个同步的HTTP客户端库async def blocking_operation():# 获取当前事件循环loop = asyncio.get_running_loop()# 在线程池中执行阻塞操作result = await loop.run_in_executor(None, # 使用默认的线程池执行器requests.get, # 要执行的阻塞函数'http://httpbin.org/delay/1' # 函数参数)return result.status_codeasync def non_blocking_operation():await asyncio.sleep(1)return "非阻塞操作完成"async def main():# 同时执行阻塞和非阻塞操作tasks = [asyncio.create_task(blocking_operation()),asyncio.create_task(non_blocking_operation())]start = time.time()results = await asyncio.gather(*tasks)end = time.time()print(f"操作结果:{results}")print(f"总耗时:{end - start:.2f} 秒")if __name__ == "__main__":asyncio.run(main())
输出:
操作结果:[200, '非阻塞操作完成']
总耗时:1.99 秒
这个例子展示了如何在异步程序中优雅地处理同步操作。如果不使用 run_in_executor
,阻塞操作会阻塞整个事件循环,导致其他任务无法执行:
requests.get()
是同步操作,会阻塞当前线程- 事件循环运行在主线程上
- 如果直接在协程中调用
requests.get()
,整个事件循环都会被阻塞 - 其他任务无法在这期间执行
run_in_executor
会将阻塞操作放到另一个线程中执行- 主线程的事件循环可以继续处理其他任务
- 当线程池中的操作完成时,结果会被返回给事件循环
最佳实践是:
- 尽量使用原生支持异步的库(如
aiohttp
) - 如果必须使用同步库,就用
run_in_executor
- 对于 CPU 密集型任务也可以用
run_in_executor
放到进程池中执行
任务取消:优雅地终止异步操作
有时我们需要取消正在执行的异步任务,比如用户中断操作或超时处理:
import asyncio
import randomasync def long_operation(name):try:print(f"{name} 开始执行")while True: # 模拟一个持续运行的操作await asyncio.sleep(0.5)print(f"{name} 正在执行...")except asyncio.CancelledError:print(f"{name} 被取消了")raise # 重要:继续传播取消信号async def main():# 创建三个任务task1 = asyncio.create_task(long_operation("任务1"))task2 = asyncio.create_task(long_operation("任务2"))task3 = asyncio.create_task(long_operation("任务3"))# 等待1秒后取消task1await asyncio.sleep(1)task1.cancel()# 等待2秒后取消其余任务await asyncio.sleep(1)task2.cancel()task3.cancel()try:# 等待所有任务完成或被取消await asyncio.gather(task1, task2, task3, return_exceptions=True)except asyncio.CancelledError:print("某个任务被取消了")if __name__ == "__main__":asyncio.run(main())
输出:
任务1 开始执行
任务2 开始执行
任务3 开始执行
任务1 正在执行...
任务2 正在执行...
任务3 正在执行...
任务1 被取消了
任务2 正在执行...
任务3 正在执行...
任务2 正在执行...
任务3 正在执行...
任务2 被取消了
任务3 被取消了
这个例子展示了如何正确处理任务取消:
- 任务可以在执行过程中被取消
- 被取消的任务会抛出
CancelledError
- 我们应该适当处理取消信号,确保资源被正确清理
深入理解协程:为什么需要 async/await?
协程(Coroutine)是一种特殊的函数,它可以在执行过程中暂停,并在之后从暂停的地方继续执行。当我们使用 async
定义一个函数时,我们实际上是在定义一个协程:
import asyncio# 这是一个普通函数
def normal_function():return "Hello"# 这是一个协程
async def coroutine_function():await asyncio.sleep(1)return "Hello"# 让我们看看它们的区别
print(normal_function) # <function normal_function at 0x1052cc040>
print(coroutine_function) # <function coroutine_function at 0x1054b9790># 调用它们的结果不同
print(normal_function()) # 直接返回: "Hello"
print(coroutine_function()) # RuntimeWarning: coroutine 'coroutine_function' was never awaited
# <coroutine object coroutine_function at 0x105962e40>
await 如何与事件循环协作
协程(Coroutine)的核心在于它可以在执行过程中主动交出控制权,让其他代码有机会执行。让我们通过一个详细的例子来理解这个过程:
import asyncioasync def task1():print("任务1:开始")print("任务1:准备休眠")await asyncio.sleep(2) # 关键点1:交出控制权print("任务1:休眠结束")async def task2():print("任务2:开始")print("任务2:准备休眠")await asyncio.sleep(1) # 关键点2:交出控制权print("任务2:休眠结束")async def main():# 同时执行两个任务await asyncio.gather(task1(), task2())asyncio.run(main())
这段代码的输出会是:
任务1:开始
任务1:准备休眠
任务2:开始
任务2:准备休眠
任务2:休眠结束 # 1秒后
任务1:休眠结束 # 2秒后
让我们详细解释执行过程:
-
当程序遇到
await asyncio.sleep(2)
时:- 这个
sleep
操作被注册到事件循环中 - Python 记录当前的执行位置
- task1 主动交出控制权
- 重要:task1 并没有停止运行,而是被暂停了,等待之后恢复
- 这个
-
事件循环接管控制权后:
- 寻找其他可以执行的协程(这里是 task2)
- 开始执行 task2,直到遇到
await asyncio.sleep(1)
- task2 也交出控制权,被暂停
-
事件循环继续工作:
- 管理一个计时器,追踪这两个 sleep 操作
- 1秒后,发现 task2 的 sleep 时间到了
- 恢复 task2 的执行,打印"任务2:休眠结束"
- 2秒到时,恢复 task1 的执行,打印"任务1:休眠结束"
这就像是一个指挥家(事件循环)在指挥一个管弦乐队(多个协程):
- 当某个乐器(协程)需要休息时,它举手示意(await)
- 指挥家看到后,立即指挥其他乐器演奏
- 当休息时间到了,指挥家会示意这个乐器继续演奏
代码验证:
import asyncio
import timeasync def report_time(name, sleep_time):print(f"{time.strftime('%H:%M:%S')} - {name}开始")await asyncio.sleep(sleep_time)print(f"{time.strftime('%H:%M:%S')} - {name}结束")async def main():# 同时执行多个任务await asyncio.gather(report_time("任务A", 2),report_time("任务B", 1),report_time("任务C", 3))asyncio.run(main())
输出:
00:19:26 - 任务A开始
00:19:26 - 任务B开始
00:19:26 - 任务C开始
00:19:27 - 任务B结束
00:19:28 - 任务A结束
00:19:29 - 任务C结束
这种机制的优势在于:
- 单线程执行,没有线程切换开销
- 协程主动交出控制权,而不是被操作系统强制切换
- 比起回调地狱,代码更清晰易读
- 错误处理更直观,可以使用普通的
try
/except
理解了这个机制,我们就能更好地使用异步编程:
- 在
await
的时候,其他协程有机会执行 - 耗时操作应该是真正的异步操作(比如
asyncio.sleep
) - 不要在协程中使用阻塞操作,那样会卡住整个事件循环
小结
Python 的异步编程主要依赖以下概念:
async
/await
语法:定义和等待协程asyncio
模块:提供事件循环和任务调度Task
对象:表示待执行的工作单元- 异步上下文管理器:管理异步资源
使用异步编程的关键点:
- I/O 密集型任务最适合使用异步编程
- 所有耗时操作都应该是真正的异步操作
- 注意处理超时和异常情况
- 合理使用
asyncio.gather()
和 asyncio.wait_for()
异步编程不是万能的,但在处理 I/O 密集型任务时确实能带来显著的性能提升。合理使用这些工具,能让我们的程序更高效、更优雅。
相关文章:

Python 异步协程:从 async/await 到 asyncio 再到 async with
在 Python 3.8 以后的版本中,异步编程变得越来越重要。本文将系统介绍 Python 标准库中的异步编程工具,带领大家掌握 async/await 语法和 asyncio 的使用。 从一个简单的场景开始 假设我们在处理一些耗时的 I/O 操作,比如读取多个文件或处理…...

云原生周刊:利用 eBPF 增强 K8s
开源项目推荐 Slurm-operator Slurm-operator 是一个高效可扩展的框架,用于在 K8s 环境中部署和运行 Slurm 工作负载。 它结合了 Slurm 的可靠性和 Kubernetes 的灵活性,支持快速部署 Slurm 集群、动态扩展 HPC 工作负载,并提供高度灵活的定…...

【pycharm】远程服务器之后如何打开终端
【pycharm】远程服务器之后如何打开终端 在pycharm中,我们通过远程连接服务器,此时如果我们需要在终端运行的话,并不能直接在本地终端运行,而是需要连接到服务器终端才能运行命令 设置如下: 输入服务器的ip、端口、…...

从零创建一个 Django 项目
1. 准备环境 在开始之前,确保你的开发环境满足以下要求: 安装了 Python (推荐 3.8 或更高版本)。安装 pip 包管理工具。如果要使用 MySQL 或 PostgreSQL,确保对应的数据库已安装。 创建虚拟环境 在项目目录中创建并激活虚拟环境ÿ…...

无人零售 4G 工业无线路由器赋能自助贩卖机高效运营
工业4G路由器为运营商赋予 “千里眼”,实现对贩卖机销售、库存、设备状态的远程精准监控,便于及时补货与维护;凭借强大的数据实时传输,助力深度洞察销售趋势、优化库存、挖掘商机;还能远程升级、保障交易安全、快速处理…...

使用VSCode Debugger 调试 React项目
一般我们调试代码时,用的最多的应该就是console.log方式了,还有的是使用Chrome DevTools 通过在对应的 sourcemap代码位置打断点进行调试,除了上面两种方式外还有一种更好用的调试方式: VSCode Debugger。 VSCode Debugger可以直…...

[创业之路-199]:《华为战略管理法-DSTE实战体系》- 3 - 价值转移理论与利润区理论
目录 一、价值转移理论 1.1. 什么是价值? 1.2. 什么价值创造 (1)、定义 (2)、影响价值创造的因素 (3)、价值创造的三个过程 (4)、价值创造的实践 (5&…...

AWTK-WEB 快速入门(2) - JS 应用程序
AWTK 可以使用相同的技术栈开发各种平台的应用程序。有时我们需要使用 Web 界面与设备进行交互,本文介绍一下如何使用 JS 语言开发 AWTK-WEB 应用程序。 用 AWTK Designer 新建一个应用程序 先安装 AWTK Designer: https://awtk.zlg.cn/web/index.html…...

dolphinscheduler服务注册中心源码解析(三)RPC提供者服务整合注册中心注册服务实现源码
RPC提供者服务整合注册中心注册服务实现源码 1.概述2.源码解读思路3.实现2.1.应用服务的RPC服务接口定义2.1.1.MasterServer应用中提供的RPC接口服务2.1.2.WorkerServer应用中提供的RPC接口服务2.2.应用服务的RPC服务接口实现2.2.1.MasterServer应用中提供的RPC接口服务实现类2…...

电脑不小心删除了msvcr120.dll文件怎么办?“缺失msvcr120.dll文件”要怎么解决?
一、文件丢失与损坏的常见原因及解决办法 1. 不小心删除系统文件 常见情况:有时在清理电脑垃圾文件时,可能会不小心删除一些重要的系统文件,如msvcr120.dll等。解决办法: 恢复文件:如果刚删除不久,可以尝…...

js 深度克隆
深度克隆(Deep Clone)是指复制一个对象或数组及其所有嵌套结构的副本,使得克隆后的对象与原对象完全独立。JavaScript 提供了一些方法实现深度克隆,但每种方法有其优缺点。 1. 常用方法 1.1 使用 JSON.parse 和 JSON.stringify …...

深度学习之超分辨率算法——FRCNN
– 对之前SRCNN算法的改进 输出层采用转置卷积层放大尺寸,这样可以直接将低分辨率图片输入模型中,解决了输入尺度问题。改变特征维数,使用更小的卷积核和使用更多的映射层。卷积核更小,加入了更多的激活层。共享其中的映射层&…...

软件测试之压力测试【详解】
压力测试 压力测试是一种软件测试,用于验证软件应用程序的稳定性和可靠性。压力测试的目标是在极其沉重的负载条件下测量软件的健壮性和错误处理能力,并确保软件在危急情况下不会崩溃。它甚至可以测试超出正常工作点的测试,并评估软件在极端…...

电脑出现 0x0000007f 蓝屏问题怎么办,参考以下方法尝试解决
电脑蓝屏是让许多用户头疼的问题,其中出现 “0x0000007f” 错误代码更是较为常见且棘手。了解其背后成因并掌握修复方法,能帮我们快速恢复电脑正常运行。 一、可能的硬件原因 内存问题 内存条长时间使用可能出现物理损坏,如金手指氧化、芯片…...

分布式系统架构:限流设计模式
1.为什么要限流? 任何一个系统的运算、存储、网络资源都不是无限的,当系统资源不足以支撑外部超过预期的突发流量时,就应该要有取舍,建立面对超额流量自我保护的机制,而这个机制就是微服务中常说的“限流” 2.四种限流…...

G口带宽服务器与1G独享带宽服务器:深度剖析其差异
在数据洪流涌动的数字化时代,服务器作为数据处理的核心,其性能表现直接关系到业务的流畅度和用户体验的优劣。随着技术的飞速发展,G口带宽服务器与1G独享带宽服务器已成为众多企业的优选方案。然而,这两者之间究竟有何细微差别&am…...

Flamingo:少样本多模态大模型
Flamingo:少样本多模态大模型 论文大纲理解1. 确认目标2. 分析过程(目标-手段分析)3. 实现步骤4. 效果展示5. 金手指 解法拆解全流程核心模式提问Flamingo为什么选择使用"固定数量的64个视觉tokens"这个特定数字?这个数字的选择背…...

推荐一款免费且好用的 国产 NAS 系统 ——FnOS
一、系统基础信息 开发基础:基于最新的Linux内核(Debian发行版)深度开发,兼容主流x86硬件(ARM还没适配),自由组装NAS,灵活扩展外部存储。 使用情况:官方支持功能较多&am…...

2025系统架构师(一考就过):案例题之一:嵌入式架构、大数据架构、ISA
一、嵌入式系统架构 软件脆弱性是软件中存在的弱点(或缺陷),利用它可以危害系统安全策略,导致信息丢失、系统价值和可用性降低。嵌入式系统软件架构通常采用分层架构,它可以将问题分解为一系列相对独立的子问题,局部化在每一层中…...

开机存活脚本
vim datastadard_alive.sh #!/bin/bashPORT18086 # 替换为你想要检查的端口号 dt$(date %Y-%m-%d)# 使用netstat检查端口是否存在 if netstat -tuln | grep -q ":$PORT"; thenecho "$dt Port $PORT is in use" > /opt/datastadard/logs/alive.log# 如…...

车载网关性能 --- GW ECU报文(message)处理机制的技术解析
我是穿拖鞋的汉子,魔都中坚持长期主义的汽车电子工程师。 老规矩,分享一段喜欢的文字,避免自己成为高知识低文化的工程师: 所谓鸡汤,要么蛊惑你认命,要么怂恿你拼命,但都是回避问题的根源,以现象替代逻辑,以情绪代替思考,把消极接受现实的懦弱,伪装成乐观面对不幸的…...

CosyVoice安装过程详解
CosyVoice安装过程详解 安装过程参考官方文档 前情提要 环境:Windows子系统WSL下安装的Ubunt22.4python环境管理:MiniConda3git 1. Clone代码 $ git clone --recursive https://github.com/FunAudioLLM/CosyVoice.git # 若是submodule下载失败&…...

传统网络架构与SDN架构对比
传统网络采用分布式控制,每台设备独立控制且管理耗时耗力,扩展困难,按 OSI 模型分层,成本高、业务部署慢、安全性欠佳且开放性不足。而 SDN 架构将控制平面集中到控制器,数据转发由交换机负责,可统一管理提…...

如何打造用户友好的维护页面:6个创意提升WordPress网站体验
在网站运营中,无论是个人博主还是大型企业网站的管理员,难免会遇到需要维护的情况。无论是服务器迁移、插件更新,还是突发的技术故障,都可能导致网站短暂无法访问。这时,设计维护页面能很好的缓解用户的不满࿰…...

【hackmyvm】Zday靶机wp
HMVrbash绕过no_root_squash静态编译fogproject 1. 基本信息^toc 这里写目录标题 1. 基本信息^toc2. 信息收集2.1. 端口扫描2.2. 目录扫描 3. fog project Rce3.1. ssh绕过限制 4. NFS no_root_squash5. bash运行不了怎么办 靶机链接 https://hackmyvm.eu/machines/machine.ph…...

redis使用注意哪些事项
1. 数据类型选择: • Redis支持多种数据类型,如字符串(String)、哈希(Hash)、列表(List)、集合(Set)、有序集合(Sorted Set)等。在选择…...

步进电机位置速度双环控制实现
步进电机位置速度双环控制实现 野火stm32电机教学 提高部分-第11讲 步进电机位置速度双环控制实现(1)_哔哩哔哩_bilibili PID模型 位置环作为外环,速度环作为内环。设定目标位置和实际转轴位置的位置偏差,经过位置PID获得位置期望,然后讲位置期望(位置变化反映了转轴的速…...

优化程序中的数据:从数组到代数
前言 我们往往都希望优化我们的程序,使之达到一个更好的效果,程序优化的一个重点就是速度,加快速度的一个好办法就是使用并行技术,但是,并行时我们要考虑必须串行执行的任务,也就是有依赖关系的任务&#…...

【电商搜索】CRM: 具有可控条件的检索模型
【电商搜索】CRM: 具有可控条件的检索模型 目录 文章目录 【电商搜索】CRM: 具有可控条件的检索模型目录文章信息摘要研究背景问题与挑战如何解决核心创新点算法模型实验效果(包含重要数据与结论)相关工作后续优化方向 后记 https://arxiv.org/pdf/2412.…...

使用 ffmpeg 拼接合并视频文件
按顺序拼接多个视频文件 1、创建文件清单 创建一个文本文件 filelist.txt,列出所有要合并的视频文件。 格式如下: file path/to/video1.mp4 file path/to/video2.mp4 file path/to/video3.mp42、合并文件 下载FFmpeg,然后使用FFmpeg进行…...