当前位置: 首页 > news >正文

Python----Python高级(并发编程:协程Coroutines,事件循环,Task对象,协程间通信,协程同步,将协程分布到线程池/进程池中)

一、协程

1.1、协程

协程,Coroutines,也叫作纤程(Fiber)

协程,全称是“协同程序”,用来实现任务协作。是一种在线程中,比线程更加轻量级的存在,由程序员自己写程序来管理。

当出现IO阻塞时,CPU一直等待IO返回,处于空转状态。这时候用协程,可以执行其他任务。当IO返回结果后,再回来处理数据。充分利用了IO等待的时间,提高了效率。

进程与线程是操作系统管理和调度的基本单位,而协程则是由程序员 实现的一种轻量级的、用户空间级别的多任务机制,通常不由操作系统直接提供支持。

1.2、协程的核心(控制流的让出和恢复)

1.每个协程有自己的执行栈,可以保存自己的执行现场

2.可以由用户程序按需创建协程(比如:遇到io操作)

3.协程“主动让出(yield)”执行权时候,会保存执行现场(保存中断时的寄存器上下文和栈),然后切换到其他协程

4.协程恢复执行(resume)时,根据之前保存的执行现场恢复到中断前的状态,继续执行,这样就通过协程实现了轻量的由用户态调度的多任务模型

1.3、协程的特点

1. 占用资源少:协程通常只需要少量的栈空间,这是因为它们采用协作式的多任务 处理机制,可以在固定的栈空间内通过状态保存和恢复来实现任务的切换,相比 多线程和多进程,协程占用的系统资源更少。

2. 切换开销小:协程的切换是在用户态进行的,不需要进行系统调用,也不涉及内 核态的上下文切换,因此其切换开销非常小,远远低于线程间的上下文切换。

3. 可暂停和可恢复的函数:协程允许函数在执行过程中主动暂停(通常是遇到I/O操 作或其他耗时操作时),并将控制权交还给调度器,以便其他协程可以运行。在 I/O操作或其他耗时操作完成后,该协程可以从暂停的地方继续执行,而不会阻塞 整个线程。这种特性使得协程非常适合于处理I/O密集型任务,可以在等待I/O操 作完成时释放CPU,从而提高程序的并发性能和资源利用率。

1.4、协程的优点

1.由于自身带有上下文和栈,无需线程上下文切换的开销,属于程序级别的切换,操作系统完全感知不到,因而更加轻量级;

2.无需原子操作的锁定及同步的开销;

3.方便切换控制流,简化编程模型

4.单线程内就可以实现并发的效果,最大限度地利用cpu,且可扩展性高,成本低(注:一个CPU支持上万的协程都不是问题。所以很适合用于高并发处理)

asyncio协程是写爬虫比较好的方式。比多线程和多进程都好. 开辟新的线程和进程是非常耗时的。

1.5、协程的缺点

1.无法利用多核资源:协程的本质是个单线程,它不能同时将 单个CPU 的多个核用上,协程需要和进程配合才能运行在多CPU上。

1.6、与进程和线程的比较

多进程是在操作系统层面实现的并行执行方式,每个进程拥有独立的内存空间和 系统资源,进程间通过进程间通信(IPC)机制(如管道、消息队列、共享内存 等)进行交互,这增加了通信的复杂性。多进程可以充分利用多核处理器的性 能,实现真正的并行计算。由于进程间的隔离性,系统的安全性和稳定性也得到 了提高。然而,进程间通信和同步的开销相对较高,且每个进程的创建和销毁通 常伴随着较大的资源开销。

多线程是在一个进程内部实现的并发执行方式,多个线程共享该进程的内存空间 和资源,这使得线程间通信和数据共享相对容易。但是,这也引入了线程安全问题,需要通过同步机制(如互斥锁、信号量、条件变量等)来避免数据冲突。多 线程的优点在于能够实现并发执行,但线程间的上下文切换开销相比进程较小, 但相比协程则较大,且需要谨慎处理线程安全问题。 

协程是一种轻量级的线程,与多进程和多线程相比,它具有占用资源少、切换开 销小、可以实现高效异步执行等优点。协程通过非阻塞I/O操作来等待数据,当数 据就绪时自动恢复执行,从而提高了程序的执行效率和响应速度。然而,协程也 有其局限性,它只能在单个线程内执行,因此它对于CPU密集型任务来说并没有 什么好处。 

二、实现协程的方法 

在python中,实现协程的方法有以下几种:

        1. 使用async/await关键字:python3.5及以后出现,到目前为止这是目前最主流的 实现协程的方法。

        2. 使用yield关键字:使用yield关键字及其send方法可以实现协程的效果。

        3. 使用asyncio.coroutine:在python3.4发布之后可以使用该装饰器与yield from 配合来实现协程,不过在python3.8弃用。

        4. 使用第三方库:通过其他的第三方库也可以实现协程,如greenlet。

def consumer():print("消费者准备接收数据。")while True:# 接收生产者发送的数据data = (yield)print("消费者接收到了数据:", data)
def producer(consumer_generator):# 启动生成器,使它准备好接收数据next(consumer_generator)for i in range(5):print("生产者发送数据:", i)# 发送数据给消费者consumer_generator.send(i)# 终止生成器consumer_generator.close()
if __name__ == '__main__':# 创建消费者生成器consumer_coroutine = consumer()# 创建生产者producer(consumer_coroutine)
'''
消费者准备接收数据。
生产者发送数据: 0
消费者接收到了数据: 0
生产者发送数据: 1
消费者接收到了数据: 1
生产者发送数据: 2
消费者接收到了数据: 2
生产者发送数据: 3
消费者接收到了数据: 3
生产者发送数据: 4
消费者接收到了数据: 4
'''

2.1、async

async关键字是Python异步编程的核心组成部分,用于定义协程函数。协程函数与 普通函数不同,它们在调用时不会执行函数里面的代码,而是会返回一个协程对象。

# 定义一个协程函数
async def func():print('123')# 直接调用协程函数会发出警告,并且函数内部的功能也不会执行
func()

想要运行函数体里面的代码,需要进行两个方面的准备:

        1. 获取事件循环。

        2. 将协程对象封装为Task对象并提交到事件循环中。 

2.2、await

        在Python中, await关键字用于挂起(暂停)异步函数的执行,直到被等待的协程 (coroutine)完成。这是异步编程中的一个关键概念,因为它允许程序在等待结果 的同时执行其他任务。

2.2.1、await的基本用法

1. 只能在异步函数内部使用: await关键字只能在一个使用了 异步函数内部使用。它不能在普通的同步函数中使用。

2. 等待协程: async def定义的 await后面通常跟一个协程对象(一个异步函数的调用)。当执行到 await时,当前协程会暂停执行,等待右侧的协程完成。

2.2.2、await的工作原理

1. 挂起与恢复: 当执行到 await时,当前协程会挂起,并让出控制权给事件循环 (event loop)。事件循环可以在这段时间内运行其他协程或处理其他事件。一 旦await后面的协程完成,事件循环会恢复执行原来的协程, 果就是协程的返回值。

2. 非阻塞: 尽管 await表达式的结 await看起来像是同步代码中的阻塞操作,但实际上它是非阻塞 的。这是因为事件循环负责协程之间的切换,从而实现并发。

import asyncio
import time
# 定义一个异步函数say_after,它接受延迟时间和要打印的消息作为参数
async def say_after(delay, what):
# 使用await关键字挂起当前协程,直到指定的延迟时间结束后再继续执行await asyncio.sleep(delay)# 打印消息print(what)
# 定义主异步函数main
async def main():# 记录开始时间print(f"started at {time.strftime('%X')}")# 调用say_after函数,等待1秒后打印'hello'await say_after(1, 'hello')# 调用say_after函数,等待2秒后打印'world'# 注意:这里的执行不是并行的,而是顺序的,因为两个await语句是顺序执行的await say_after(2, 'world')# 记录结束时间print(f"finished at {time.strftime('%X')}")
# 调用asyncio.run()来启动主协程
# 这将创建一个新的事件循环并运行main()直到完成
asyncio.run(main())
'''
started at xx:xx:xx
hello
world
finished at xx:xx:xx
'''

三、事件循环

        事件循环是一种处理程序执行、事件和消息分发的机制。它不断地等待事件的发生, 当事件发生时,事件循环会将其分发给相应的处理程序进行处理。事件循环的核心是 一个循环,它会不断地检查是否有事件需要处理,如果有,就调用相应的回调函数来 处理这些事件。

其工作流程为:

1. 启动:创建并启动事件循环。

2. 注册事件:将各种事件(如网络套接字、文件描述符、定时器等)注册到事件循 环中。

3. 事件循环:进入一个循环,等待事件的发生,并处理这些事件。

4. 执行任务:当事件发生时,事件循环会调用相关的处理函数或恢复相应的协程。 

5. 关闭:当所有任务完成后,关闭事件循环。

事件循环的创建随着Python版本的不同而不同,在Python3.7版本之前,事件循环需 要先使用 asyncio.get_event_loop()来获取循环,然后使用 run_until_complete()来执行任务。在Python3.7及以后的版本,直接使用 asyncio.run()来直接执行任务。

import asyncio
# 定义一个异步函数func1
async def func1():print('start func1')  # 打印信息,表示func1开始执行await asyncio.sleep(2)  # 模拟异步I/O操作,协程在这里挂起2秒钟print('end func1')  # 打印信息,表示func1执行结束
# 定义另一个异步函数func2
async def func2():print('start func2')  # 打印信息,表示func2开始执行await asyncio.sleep(2)  # 模拟异步I/O操作,协程在这里挂起2秒钟print('end func2')  # 打印信息,表示func2执行结束
asyncio.run(func1())
asyncio.run(func2())
'''
start func1
end func1
start func2
end func2
'''

四、Task对象

        Task对象是 asyncio库中的一个实现,它用来在事件循环中安排协程的执行。一个 Task是对协程的一个封装,简单来说,协程本身并不会自动运行,当一个协程被封 装为一个Task对象并提交到事件循环中时,它才会在事件循环中被安排执行。当协程 执行完毕后, Task会提供协程的返回值或异常,并且相比协程对象, 更加丰富的方法供我们使用。

        将协程对象封装为 Task对象拥有 Task是通过asyncio库中的函数进行的,但随着Python版本的不 同,其所用函数也不同。

        在python3.7之前,Task的创建使用的是 asyncio.ensure_future()函数,通过该 函数将使用 async定义的协程函数所返回的协程对象提交到事件循环中。在 python3.7之后,创建Task对象的方法变得更加直接和明确,可以使用 asyncio.create_task()函数来创建,且python3.8版本之后,添加了name参数可 以为任务指定名称。这个函数接受一个协程对象作为参数,并返回一个新的Task对 象。

import asyncio
# 定义一个异步函数func1
async def func1():print('start func1')  # 打印信息,表示func1开始执行await asyncio.sleep(2)  # 模拟异步I/O操作,协程在这里挂起2秒钟print('end func1')  # 打印信息,表示func1执行结束
# 定义另一个异步函数func2
async def func2():print('start func2')  # 打印信息,表示func2开始执行await asyncio.sleep(2)  # 模拟异步I/O操作,协程在这里挂起2秒钟print('end func2')  # 打印信息,表示func2执行结束
# 定义主异步函数main,它将作为程序的入口点
async def main():# 创建任务列表,使用asyncio.create_task来创建任务tasks = [asyncio.create_task(func1()),  # 创建并调度func1作为异步任务asyncio.create_task(func2())# 创建并调度func2作为异步任务]# 使用asyncio.wait等待所有任务完成# asyncio.wait接收一个任务列表,并等待这些任务完成done, pending = await asyncio.wait(tasks)
# 使用 asyncio.run() 来运行主函数
# asyncio.run()是Python 3.7引入的,它会创建一个新的事件循环,运行传入的协程,并在协程完成后关闭事件循环
# 等同于asyncio.get_event_loop().run_until_complete(main())
asyncio.run(main())
'''
start func1
start func2
end func1
end func2
'''
import asyncio
# 定义一个异步函数func1
async def func1():print('start func1')  # 打印信息,表示func1开始执行await asyncio.sleep(2)  # 模拟异步I/O操作,协程在这里挂起2秒钟print('end func1')  # 打印信息,表示func1执行结束
# 定义另一个异步函数func2
async def func2():print('start func2')  # 打印信息,表示func2开始执行await asyncio.sleep(2)  # 模拟异步I/O操作,协程在这里挂起2秒钟print('end func2')  # 打印信息,表示func2执行结束
# 定义主异步函数main,它将作为程序的入口点
async def main():# 直接调用asyncio.gather不需要将协程对象先手动封装为Task对象# 该函数会负责将它们作为任务调度到事件循环中# asyncio.gather会返回一个包含所有结果的列表await asyncio.gather(func1(), func2())
# 使用 asyncio.run() 来运行主函数
# asyncio.run()会创建一个新的事件循环,运行传入的协程,并在协程完成后关闭事件循环
asyncio.run(main())
'''
start func1
start func2
end func1
end func2
'''

五、协程间通信

        与线程相似,协程之间的通信也只有消息队列一种,且拥有不同种类的消息队列。 在python中,协程所使用的消息队列在 asyncio.Queue库下,其中共存在三种类型 的队列,分别为标准的先进先出队列 Queue、先进后出队列 LifoQueue和优先级队 列PriorityQueue。

5.1、Queue 

先进先出的原则

asyncio.Queue(maxsize=0)

maxsize:队列的最大尺寸,如果maxsize小于等于零,则队列尺寸是无限的。如果 是大于0的整数,则当队列达到maxsize时, await put()将阻塞至某个元素被 get()取出。

类方法 :

●        Queue.qsize():返回队列中当前有几条消息。

●        Queue.empty():如果队列为空,返回True,否则返回 False。

●        Queue.full():如果队列已满(达到最大尺寸),返回 True,否则返回 False。

●        Queue.put(item, block=True, timeout=None):将 item 放入队列。如果 block 是True是 None(默认),则在必要时阻塞至有空闲的插槽, 如果timeout 是正数,将最多阻塞 timeout 秒,如果在这段时间内没有可 用的插槽,将引发 queue.Full 异常。 

●        Queue.put_nowait(item):相当于 Queue.put(item, block=False)。如果队列已满,立即引发 queue.Full 异常。

●        Queue.get(b1ock=True,timeout=None):从队列中移除并返回一个元素。如果 block是 True 且 timeout是 None(默认),则在必要时阻塞至队列中有项目可用。如果 timeout 是正数,将最多阻塞 timeout 秒,如果在这段时间内没有项目可用,将引发 queue.Empty 异常。

●        Queue.get_nowait():相当于 Queue.get(block=False)。如果队列为空立即引发 queue.Empty 异常。

 ●       Queue.task_done():指示之前入队的一个任务已经完成。由队列的消费者线程使用。每个Queue. get()调用之后,需要调用 Queue.task_done()告诉队列该任务处理完成。
●        Queue.join():阻塞直到队列中的所有元素都被处理完。当元素添加到队列的 时候,未完成任务的计数就会增加,每当消费协程调用 task_done()表示这个元 素已经处理完毕,那么未完成计数就会减少。当未完成计数降到零的时候, join()阻塞被接触。

import asyncio
# 生产者协程,负责生成一系列数字并将它们放入队列中
async def producer(queue, n):for i in range(1, n + 1):  # 循环从1到n,生成数字print(f'生产者生产了: {i}')  # 打印当前生产的数字await queue.put(i)  # 将数字放入队列,如果队列已满,则阻塞直到有空位await asyncio.sleep(1)  # 模拟生产耗时,等待1秒钟print('生产者完成生产。')  # 所有数字生产完毕,打印完成消息await queue.put(None)  # 放入一个None作为结束信号,通知消费者没有更多数字
# 消费者协程,负责从队列中取出数字并打印它们
async def consumer(queue):while True:  # 无限循环,直到接收到结束信号item = await queue.get()  # 从队列中取出一个元素,如果队列为空,则阻塞直到有元素if item is None:  # 检查是否接收到结束信号queue.task_done()  # 通知队列,当前任务已经完成break  # 如果是结束信号,退出循环print(f'消费者消费了: {item}')  # 打印当前消费的数字queue.task_done()  # 通知队列,当前任务已经完成print('消费者完成消费。')  # 打印完成消费的消息
# 主协程,负责启动生产者和消费者协程,并等待它们完成
async def main():queue = asyncio.Queue(5)  # 创建一个队列实例,用于生产者和消费者之间的通信# 创建生产者和消费者协程producer_coro = producer(queue, 5)  # 生产者协程,生产1到5的数字consumer_coro = consumer(queue)  # 消费者协程# 使用asyncio.gather等待生产者和消费者协程完成# gather允许同时运行多个协程,并在它们都完成时返回结果await asyncio.gather(producer_coro, consumer_coro)
# 运行主协程,启动事件循环
asyncio.run(main())
'''
生产者生产了: 1
消费者消费了: 1
生产者生产了: 2
消费者消费了: 2
生产者生产了: 3
消费者消费了: 3
生产者生产了: 4
消费者消费了: 4
生产者生产了: 5
消费者消费了: 5
生产者完成生产。
消费者完成消费。
'''

5.2、LifoQueue 

后进先出

asyncio.LifoQueue(maxsize=0)

maxsize:队列的最大尺寸。如果设置为小于或等于0的数,则队列的尺寸是无 限的。 

常用方法: 

●        LifoQueue.put(item, block=True, timeout=None):将 如果 block 是 True 且 timeout 是 item 放入队列。 None(默认),则在必要时阻塞至有空闲 的插槽。如果 timeout 是正数,将最多阻塞 timeout 秒,如果在这段时间内没 有可用的插槽,将引发完全异常。 
●        LifoQueue.put_nowait(item):相当于LifoQueue.put(item,b1ock=False)。如果队列已满,立即引发完全异常。
●        LifoQueue.get(block=True,timeout=None):从队列中移除并返回一个元素。如果 block是True且timeout是 None(默认),则在必要时阻塞至队列中有项目可用。如果 timeout 是正数,将最多阻塞timeout秒,如果在这段时间内没有项目可用,将引发完全异常。 
●        LifoQueue.get_nowait():相当于 LifoQueue.get(block=False)。如果队列为空,立即引发完全异常。
●        LifoQueue.qsize():返回队列中的项目数量
●        LifoQueue.empty():如果队列为空,返回 True,否则返回 False。
●        LifoQueue.full():如果队列已满(达到最大尺寸),返回 True,否则返回False 。

●        LifoQueue.task_done(): 指示之前入队的一个任务已经完成,即 get出来的元素相关操作已经完成。由队列中的 get()端掌控,每次 get用于一个任务时,任务最后要调用 task_done()告诉队列,任务已经完成。

●        LifoQueue.join():阻塞直到队列中的所有元素都被处理完。当元素添加到队 列的时候,未完成任务的计数就会增加,每当消费协程调用 task_done()表示这 个元素已经处理完毕,那么未完成计数就会减少。当未完成计数降到零的时候, join()阻塞被接触。

import asyncio
# 生产者协程,负责生成一系列数字并将它们放入队列中
async def producer(queue, n):for i in range(1, n + 1):  # 循环从1到n,生成数字print(f'生产者生产了: {i}')  # 打印当前生产的数字await queue.put(i)  # 将数字放入队列,如果队列已满,则阻塞直到有空位await asyncio.sleep(1)  # 模拟生产耗时,等待1秒钟print('生产者完成生产。')  # 所有数字生产完毕,打印完成消息await queue.put(None)  # 放入一个None作为结束信号,通知消费者没有更多数字
# 消费者协程,负责从队列中取出数字并打印它们
async def consumer(queue):await asyncio.sleep(5)while True:  # 无限循环,直到接收到结束信号item = await queue.get()  # 从队列中取出一个元素,如果队列为空,则阻塞直到有元素if item is None:  # 检查是否接收到结束信号queue.task_done()  # 通知队列,当前任务已经完成break  # 如果是结束信号,退出循环print(f'消费者消费了: {item}')  # 打印当前消费的数字queue.task_done()  # 通知队列,当前任务已经完成print('消费者完成消费。')  # 打印完成消费的消息
# 主协程,负责启动生产者和消费者协程,并等待它们完成
async def main():queue = asyncio.LifoQueue(10)  # 创建一个队列实例,用于生产者和消费者之间的通信# 创建生产者和消费者协程producer_coro = producer(queue, 5)  # 生产者协程,生产1到5的数字consumer_coro = consumer(queue)  # 消费者协程# 使用asyncio.gather等待生产者和消费者协程完成# gather允许同时运行多个协程,并在它们都完成时返回结果await asyncio.gather(producer_coro, consumer_coro)# 等待队列中的所有项目都被处理完毕await queue.join()print('所有任务都已处理完毕。')
# 运行主协程,启动事件循环
asyncio.run(main())
'''
生产者生产了: 1
生产者生产了: 2
生产者生产了: 3
生产者生产了: 4
生产者生产了: 5
消费者消费了: 5
消费者消费了: 4
消费者消费了: 3
消费者消费了: 2
消费者消费了: 1
生产者完成生产。
消费者完成消费。
所有任务都已处理完毕。
'''

5.3、PriorityQueue

实现优先级队列

asyncio.PriorityQueue(maxsize=0) 

maxsize:队列的最大尺寸。如果设置为小于或等于0的数,则队列的尺寸是无 限的。

常用方法: 

●        PriorityQueue.put((priority, item), block=True, timeout=None):将 item 放入队列,并为其指定一个优先级 timeout 是 priority。如果 block 是 True 且 None(默认),则在必要时阻塞至有空闲的插槽。如果 timeout 是正数,将最多阻塞 timeout 秒,如果在这段时间内没有可用的插槽,将引发 完全异常。
●        PriorityQueue.put_nowait((item, priority):相当于 PriorityQueue.put((item, priority), block=False)。如果队列已满,立 即引发完全异常。
●        PriorityQueue.get(block=True, timeout=None):从队列中移除并返回一 个元素。如果 block 是 True 且 timeout 是 None(默认),则在必要时阻塞 至队列中有项目可用。如果 timeout 是正数,将最多阻塞 timeout 秒,如果在 这段时间内没有项目可用,将引发完全异常。
●        PriorityQueue.get_nowait():相当于 PriorityQueue.get(block=False)。如果队列为空,立即引发完全异常。
●        PriorityQueue.qsize():返回队列中的项目数量。
●        PriorityQueue.empty():如果队列为空,返回True,否则返回False。
●        PriorityQueue.full():如果队列已满(达到最大尺寸),返回True,否则返回 False。 
●        PriorityQueue.task_done():指示之前入队的一个任务已经完成,即 来的元素相关操作已经完成。由队列中的 get()端掌控,每次get用于一个任务 时,任务最后要调用 get出 task_done()告诉队列,任务已经完成。 

import asyncio
# 生产者协程,负责生成一系列数字(这里实际上是字典键值对)并将它们放入队列中
async def producer(queue, n):fx = {4: 'd', 5: 'e', 2: 'b', 3: 'c', 1: 'a'}  # 定义一个字典,包含数字和字母的映射fx_tuples = [(key, value) for key, value in fx.items()]  # 将字典转换为元组列表for i in range(0, n):  # 循环从0到n-1,但由于字典是无序的,这里的i仅用作索引限制# 注意:如果n大于fx_tuples的长度,将会引发IndexErrorprint(f'生产者生产了: {fx_tuples[i]}')  # 打印当前生产的元组await queue.put(fx_tuples[i])  # 将元组放入队列,如果队列已满,则阻塞直到有空位await asyncio.sleep(1)  # 模拟生产耗时,等待1秒钟print('生产者完成生产。')  # 所有指定的元组生产完毕,打印完成消息await queue.put(None)  # 放入一个None作为结束信号,通知消费者没有更多元组
# 消费者协程,负责从队列中取出元组并打印它们
async def consumer(queue):await asyncio.sleep(5)  # 消费者在开始消费前等待5秒(模拟其他任务或延迟)while True:  # 无限循环,直到接收到结束信号item = await queue.get()  # 从队列中取出一个元素,如果队列为空,则阻塞直到有元素if item is None:  # 检查是否接收到结束信号queue.task_done()  # 通知队列,当前任务(即处理None作为结束信号的任务)已经完成break  # 如果是结束信号,退出循环print(f'消费者消费了: {item}')  # 打印当前消费的元组queue.task_done()  # 通知队列,当前任务(即处理一个元组的任务)已经完成print('消费者完成消费。')  # 打印完成消费的消息
# 主协程,负责启动生产者和消费者协程,并等待它们完成
async def main():queue = asyncio.PriorityQueue(10)  # 创建一个优先队列实例,用于生产者和消费者之间的通信,容量为10# 创建生产者和消费者协程producer_coro = producer(queue, 5)  # 生产者协程,尝试生产前5个字典中的键值对(注意字典无序)consumer_coro = consumer(queue)  # 消费者协程# 使用asyncio.gather等待生产者和消费者协程完成# gather允许同时运行多个协程,并在它们都完成时返回结果(这里不关心具体返回值)await asyncio.gather(producer_coro, consumer_coro)# 等待队列中的所有项目都被处理完毕(即等待所有task_done()被调用)await queue.join()print('所有任务都已处理完毕。')
# 运行主协程,启动事件循环
asyncio.run(main())
'''
生产者生产了: (4, 'd')
生产者生产了: (5, 'e')
生产者生产了: (2, 'b')
生产者生产了: (3, 'c')
生产者生产了: (1, 'a')
消费者消费了: (1, 'a')
消费者消费了: (2, 'b')
消费者消费了: (3, 'c')
消费者消费了: (4, 'd')
消费者消费了: (5, 'e')
生产者完成生产。
消费者完成消费。
所有任务都已处理完毕。
'''

六、协程同步

与进程、线程类似,协程也有同步机制,包括Lock、Semaphore、Event、 Condition。

6.1、Lock

在协程中,可以使用Lock来确保同一时间只有一个协程可以访问某个资源。

asyncio.Lock() 

其方法为:

        acquire():获取锁。此方法会等待直至锁为unlocked,然后将其设为locked并 返回True。当有一个以上的协程在 acquire()中被阻塞则会等待解锁,最终只 有一个协程会被执行。锁的获取是公平的,被执行的协程将是第一个开始等待锁 的协程。

        release():释放锁。当锁为locked时,将其设为unlocked并返回。如果锁为 unlocked,则会抛出异常。

        locked():如果锁为locked则返回 True。 

为了避免死锁,建议使用async with 来管理Lock。 

import asyncio  # 导入asyncio模块,提供异步编程的原语和工具
async def worker(lock, id):  # 定义一个协程函数,接收一个锁和一个标识符while True:  # 无限循环,模拟持续工作的协程async with lock:  # 使用async with语句获取锁,确保同一时间只有一个协程执行这部分代码print(f"Worker {id} is working")  # 打印当前协程正在工作的消息await asyncio.sleep(1)  # 模拟I/O操作,挂起协程1秒钟
async def main():  # 定义主协程函数,用于启动和管理其他协程lock = asyncio.Lock()  # 创建一个锁,用于同步协程,防止它们同时执行某些代码块# 创建两个协程,并将它们传递给asyncio.gather()函数# asyncio.gather()用于并发运行多个协程,并等待它们全部完成await asyncio.gather(worker(lock, 1), worker(lock, 2))  # 并发运行两个worker协程
asyncio.run(main())  # 运行主协程,启动事件循环并执行主协程

6.2、Semaphore

        在协程中,可以使用Semaphore来控制对资源的访问数量。Semaphore会管理一个 内部计数器,该计数器会随每次 acquire调用递减并随每次 release调用递增,计 数器的值永远不会降到零以下。当 acquire发现其值为零时,它将保持阻塞直到有某 个任务调用了 release。

asyncio.Semaphore(value=1) 

value:value参数用来为内部计数器赋初始值,默认为1。如果给定的值小于0则会 抛出异常。 

其方法为: 

        acquire():获取一个信号量。如果内部计数器的值大于零,则将其减一并立即 返回True。如果其值为零,则会等待直到 release被调用。

        release():释放一个信号量,将内部计数器的值加一。可以唤醒一个正在等待 获取信号量对象的任务。 

        locked():如果信号量对象无法被立即获取则返回True

建议使用async with来管理Semaphore。 

import asyncio
async def car(semaphore, car_id):"""模拟车辆进入停车场并离开的过程"""print(f"Car {car_id} 正在等待停车位")async with semaphore:  # 获取信号量,相当于获取停车位print(f"Car {car_id} 进入停车场了.")await asyncio.sleep(2)  # 模拟车辆在停车场内停留的时间print(f"Car {car_id} 离开停车场了")
# 信号量在退出async with块时自动释放,相当于车辆离开停车场
async def main():# 假设停车场只有3个停车位parking_spaces = asyncio.Semaphore(3)# 创建5个车辆协程cars = [car(parking_spaces, i) for i in range(1, 6)]# 并发运行所有车辆协程await asyncio.gather(*cars)
asyncio.run(main())
'''
Car 1 正在等待停车位
Car 1 进入停车场了.
Car 2 正在等待停车位
Car 2 进入停车场了.
Car 3 正在等待停车位
Car 3 进入停车场了.
Car 4 正在等待停车位
Car 5 正在等待停车位
Car 1 离开停车场了
Car 2 离开停车场了
Car 3 离开停车场了
Car 4 进入停车场了.
Car 5 进入停车场了.
Car 4 离开停车场了
Car 5 离开停车场了
'''

6.3、Event

        在python中使用Event允许一个协程通知一个或多个协程某个事件已经发生。Event 对象会管理一个内部标志,可通过 set方法将其设为True并通过 设为False。 clear方法将其重 wait方法会阻塞直至该标志被设为True。该标志初始时会被设为 False。

asyncio.Event() 

其方法为: 

        wait():协程等待直至事件被设置。如果事件已被设置,则立即返回True,否则将阻塞直至另一个任务调用set()

        set():设置事件。所有等待事件被设置的任务将被立即唤醒。 

        clear():清空(取消设置)事件。通过wait()进行等待的任务现在将会阻塞直至set()方法被再次调用。

        is_set():如果事件已被设置则返回 True。

import asyncio
import random
async def producer(event, data):"""生产者协程,它在准备好数据后设置事件"""print(f"Producer is preparing data: {data}")time = random.uniform(0.5, 2)print(time)await asyncio.sleep(time)  # 模拟数据准备时间print(f"Producer has prepared data: {data}")event.set()  # 设置事件,表示数据已经准备好了print("Producer has notified the consumer.")
async def consumer(event):"""消费者协程,它在事件被设置后开始消费数据"""print("Consumer is waiting for data.")await event.wait()  # 等待事件被设置print("Consumer has received the notification and is consuming data.")# 模拟数据处理await asyncio.sleep(random.uniform(0.5, 2))print("Consumer has finished consuming data.")
async def main():# 创建一个事件对象event = asyncio.Event()# 创建生产者和消费者协程producer_coro = producer(event, "data1")consumer_coro = consumer(event)# 并发运行生产者和消费者协程await asyncio.gather(producer_coro, consumer_coro)
asyncio.run(main())
'''
Producer is preparing data: data1
1.4196680751620212
Consumer is waiting for data.
Producer has prepared data: data1
Producer has notified the consumer.
Consumer has received the notification and is consuming data.
Consumer has finished consuming data.
'''

6.4、Condition

         在python中允许协程等待某些条件成立,然后被通知恢复执行。在本质上, Condition 对象合并了 Event和 Lock 的功能。 多个 Condition 对象有可能共享一个 Lock,这允许关注于共享资源的特定状态的不同任务实现对共享资源的协同独占访 问。

asyncio.Condition(lock=None) 

        lock:lock参数必须为自己创建的 Lock对象或None,在后一种情况下会自动创建 一个新的Lock对象。

其方法为: 

        acquire():获取下层的锁。此方法会等待直至下层的锁为 unlocked,将其设为 locked 并返回 True。

        notify(n=1):唤醒最多 n 个正在等待此条件的任务(默认为 1 个)。如果没 有任务正在等待则此方法为空操作。锁必须在此方法被调用前被获取并在随后被 快速释放。 如果通过一个 unlocked 锁调用则会引发异常。

        locked():如果下层的锁已被获取则返回 True。

        notify_all():唤醒所有正在等待此条件的任务。此方法的行为类似于 notify,但会唤醒所有正在等待的任务。锁必须在此方法被调用前被获取并在 随后被快速释放。 如果通过一个 unlocked 锁调用则会引发异常。

        release():释放下层的锁。在未锁定的锁调用时,会引发异常。

        wait():等待直至收到通知。当此方法被调用时如果调用方任务未获得锁,则 会引发异常。这个方法会释放下层的锁,然后保持阻塞直到被 notify()或 notify_all()调用所唤醒。 一旦被唤醒,Condition 会重新获取它的锁并且此 方法将返回 True。

        wait_for(predicate):等待直到目标值变为 true。目标必须为一个可调用对 象,其结果将被解读为一个布尔值。 

建议使用async with来管理Condition。 

import asyncio
# 生产者函数,负责通知所有等待的消费者
async def producer(condition):while True:  # 无限循环,模拟持续的生产活动await condition.acquire()  # 获取条件变量的锁condition.notify_all()  # 通知所有等待的消费者condition.release()  # 释放条件变量的锁await asyncio.sleep(1)  # 暂停一秒,模拟生产活动的时间间隔
# 消费者函数,负责等待生产者的通知
async def consumer(condition, number):while True:  # 无限循环,模拟持续的消费活动await condition.acquire()  # 获取条件变量的锁print(f'{number}正在等待condition')  # 打印消费者正在等待的通知await condition.wait()  # 等待生产者的通知print(f'{number}已释放condition')  # 打印消费者收到通知后的消息condition.release()  # 释放条件变量的锁
# 主函数,负责启动生产者和消费者任务
async def main():condition = asyncio.Condition()  # 创建一个条件变量# 创建任务列表,包括一个生产者和多个消费者tasks = [asyncio.create_task(producer(condition)),  # 创建生产者任务asyncio.create_task(consumer(condition, 1)),  # 创建消费者任务,编号为1asyncio.create_task(consumer(condition, 2)),  # 创建消费者任务,编号为2asyncio.create_task(consumer(condition, 3)),  # 创建消费者任务,编号为3asyncio.create_task(consumer(condition, 4)),  # 创建消费者任务,编号为4asyncio.create_task(consumer(condition, 5)),  # 创建消费者任务,编号为5]# 等待所有任务完成,由于生产者是无限循环,这里实际上会无限等待await asyncio.wait(tasks)
# 运行主函数,启动事件循环
asyncio.run(main())

七、将协程分布到线程池/进程池中

        一般情况下,程序的异步开发要么使用协程,要么使用进程池或线程池,但是也会碰 到有一些情况需要既使用协程又使用进程池或线程池,而进程池、线程池 submit后 返回的 Future和协程的 Future又不是一回事,不能直接使用await,因此就需要进 行一个对象的转换。

        在Python中,可以通过 asyncio.wrap_future()来将一个 concurrent.futures.Future转化为asyncio.Future,这样就可以去使用协程的相关内容了。

import asyncio
import concurrent.futures
import time
# 这是一个普通函数
def func1():time.sleep(5)print('in func1')
# 这是一个普通函数
def func2():time.sleep(3)print('in func2')
async def main():
# 创建一个进程池with concurrent.futures.ProcessPoolExecutor() as pool:# 使用进程池提交任务future1 = pool.submit(func1)future2 = pool.submit(func2)# 将 concurrent.futures.Future 转换为 asyncio.Futureasync_future1 = asyncio.wrap_future(future1)async_future2 = asyncio.wrap_future(future2)# 使用 asyncio 的 await 等待结果result = await asyncio.gather(async_future1,async_future2)print(f"The result is {result}")
# 注意:进程就需要放到主模块中去执行
if __name__ == '__main__':asyncio.run(main())
'''
in func2
in func1
The result is [None, None]
'''

 使用 loop.run_in_executor()直接转换

        使用 asyncio.get_running_loop()时,如果当前没有正在运行的事件循环,就抛 出异常。而上面的 asyncio.get_event_loop()则是在当前没有正在运行的事件循 环的基础上,会创建一个新的事件循环。相对来说, asyncio.get_running_loop()更适合在协程或异步函数内部使用, asyncio.get_event_loop()适用于更广泛的情况。 

loop.run_in_executor(executor, func, *args): 

        executor:一个执行器对象,通常是 concurrent.futures.ThreadPoolExecutor 或 concurrent.futures.ProcessPoolExecutor 的实例。它管理同步函数的执 行,如果不指定就默认创建一个线程池。

        func:要执行的同步函数。

        *args:传递给 func的位置参数。 

import asyncio
import time# 示例同步函数,模拟耗时操作
def slow_function1():# 打印信息,表示函数开始执行print("Function 1 is running")# 模拟耗时操作,线程睡眠2秒time.sleep(2)# 打印信息,表示函数执行完毕print("Function 1 is done")# 返回函数执行结束的信息return 'func1 end'def slow_function2():# 打印信息,表示函数开始执行print("Function 2 is running")# 模拟耗时操作,线程睡眠2秒time.sleep(2)# 打印信息,表示函数执行完毕print("Function 2 is done")# 返回函数执行结束的信息return 'func2 end'async def main():# 获取当前正在运行的事件循环loop = asyncio.get_running_loop()print('before run')# 使用线程池执行器并发运行两个同步函数# run_in_executor的第一个参数为None,表示使用默认的线程池执行器task1 = loop.run_in_executor(None, slow_function1)task2 = loop.run_in_executor(None, slow_function2)print('after run')# 等待两个函数执行完成,并获取它们的返回值result1 = await task1result2 = await task2print('after await')# 打印两个函数的执行结果print(f"Result of function 1: {result1}")print(f"Result of function 2: {result2}")if __name__ == '__main__':# 记录程序开始执行的时间start = time.time()# 运行主函数asyncio.run(main())# asyncio.get_running_loop()# 打印程序执行的总时间print('total_time', time.time() - start)
'''
before run
Function 1 is running
Function 2 is running
after run
Function 1 is done
Function 2 is done
after await
Result of function 1: func1 end
Result of function 2: func2 end
total_time 2.0069408416748047
'''

八、思维导图

相关文章:

Python----Python高级(并发编程:协程Coroutines,事件循环,Task对象,协程间通信,协程同步,将协程分布到线程池/进程池中)

一、协程 1.1、协程 协程,Coroutines,也叫作纤程(Fiber) 协程,全称是“协同程序”,用来实现任务协作。是一种在线程中,比线程更加轻量级的存在,由程序员自己写程序来管理。 当出现IO阻塞时,…...

什么是可观测性?

现代服务架构常常谈及三个性: 弹性,韧性,可观测性。今天且按下其他两性不表,着重聊一聊可观测性。本文就几个主题对可观测性展开讨论: 可观测性是什么可观测性是必须的吗企业的可观测性落地 可观测性理念 可观测性是…...

3. 【.NET Aspire 从入门到实战】--理论入门与环境搭建--环境搭建

构建现代云原生应用程序时,开发环境的搭建至关重要。NET Aspire 作为一款专为云原生应用设计的开发框架,提供了一整套工具、模板和集成包,旨在简化分布式系统的构建和管理。开始项目初始化之前,确保开发环境的正确配置是成功的第一…...

kubeadm构建k8s源码阅读环境

目标 前面看了minikube的源码了解到其本质是调用了kubeadm来启动k8s集群,并没有达到最初看代码的目的。 所以继续看看kubeadm的代码,看看能否用来方便地构建源码调试环境。 k8s源码编译 kubeadm源码在k8s源码库中,所以要先克隆k8s源码。之…...

【Flink快速入门-1.Flink 简介与环境配置】

Flink 简介与环境配置 实验介绍 在学习一门新的技术之前,我们首先要了解它的历史渊源,也就是说它为什么会出现,它能够解决什么业务痛点。所以本节我们的学习目的是了解 Flink 的背景,并运行第一个 Flink 程序,对它有…...

硬盘修复后,文件隐身之谜

在数字时代,硬盘作为数据存储的重要载体,承载着无数珍贵的信息与回忆。然而,当硬盘遭遇故障并经过修复后,有时我们会遇到这样一个棘手问题:硬盘修复后,文件却神秘地“隐身”,无法正常显示。这一…...

如何处理网络连接错误导致的fetch失败?

处理由于网络连接错误导致的 fetch 失败通常涉及捕获网络错误并提供适当的用户反馈。以下是如何在 Vue 3 中实现这一点的步骤和示例。 一、更新 useFetch 函数 在 useFetch 函数中,需要捕获网络错误,并设置相应的错误信息。网络错误通常会抛出一个 TypeError,可以根据这个…...

Qt之设置QToolBar上的按钮样式

通常给QAction设置icon后,菜单栏的菜单项和工具栏(QToolBar)上对应的按钮会同时显示该icon。工具栏还可以使用setToolButtonStyle函数设置按钮样式,其参数为枚举值: enum ToolButtonStyle {ToolButtonIconOnly,ToolButtonTextOnly,ToolButtonTextBesideIcon,ToolButtonTe…...

责任链模式(Chain Responsibility)

一、定义:属于行为型设计模式,包含传递的数据、创建处理的抽象和实现、创建链条、将数据传递给顶端节点; 二、UML图 三、实现 1、需要传递处理的数据类 import java.util.Date;/*** 需要处理的数据信息*/ public class RequestData {priva…...

docker安装 mongodb

1、拉取镜像 docker run -dit --name mongo \ -p 17017:27017 \ -e MONGO_INITDB_ROOT_USERNAMEadmin \ -e MONGO_INITDB_ROOT_PASSWORD2018 \ --restartalways \ mongo2、进入容器 docker exec -it mongo bash 3、进入mongo ./bin/mongosh -u admin -p 2018 --authenticat…...

RabbitMQ 从入门到精通:从工作模式到集群部署实战(五)

#作者:闫乾苓 系列前几篇: 《RabbitMQ 从入门到精通:从工作模式到集群部署实战(一)》:link 《RabbitMQ 从入门到精通:从工作模式到集群部署实战(二)》: lin…...

salesforce SF CLI 数据运维经验分享

SF CLI data默认使用bulk api v2, 数据操作效率有了极大的提高。 Bulk api v2的优点: 执行结果可以很直观的从Bulk Data Load Jobs中看到。相较于bulk api v1,只能看到job执行in progress,或者closed的状态,有了很大的改善。执行…...

5.2Internet及其作用

5.2.1Internet概述 Internet称为互联网,又称英特网,始于1969年的美国ARPANET(阿帕网),是全球性的网络。 互连网指的是两个或多个不同类型的网络通过路由器等网络设备连接起来,形成一个更大的网络结构。互连…...

【蓝桥杯—单片机】第十一届省赛真题代码题解题笔记 | 省赛 | 真题 | 代码题 | 刷题 | 笔记

第十一届省赛真题代码部分 前言赛题代码思路笔记竞赛板配置内部振荡器频率设定键盘工作模式跳线扩展方式跳线 建立模板明确设计要求和初始状态显示功能部分数据界面第一部分第二部分第三部分调试时发现的问题 参数设置界面第一部分第二部分和第四部分第三部分和第五部分 按键功…...

数据分析:企业数字化转型的金钥匙

引言:数字化浪潮下的数据金矿 在数字化浪潮席卷全球的背景下,有研究表明,只有不到30%的企业能够充分利用手中掌握的数据,这是否让人深思?数据已然成为企业最为宝贵的资产之一。然而,企业是否真正准备好从数…...

网络工程师 (23)OSI模型层次结构

前言 OSI(Open System Interconnect)模型,即开放式系统互联模型,是一个完整的、完善的宏观模型,它将计算机网络体系结构划分为7层。 OSI七层模型 1. 物理层(Physical Layer) 功能:负…...

DeepSeek添加知识库

1、下载dify 项目地址:https://github.com/langgenius/dify 2、通过docker安装 端口报错 修改端口 .env文件下所有80端口替换成了其它端口 执行正常了 查看 docker容器 <...

2、k8s的cni网络插件和基本操作命令

kube-prxoy属于节点组件&#xff0c;网络代理&#xff0c;实现服务的自动发现和负载均衡。 k8s的内部网络模式 1、pod内的容器于容器之间的通信。 2、一个节点上的pod之间的通信&#xff0c;docker0网桥直接通信。 3、不同节点上的pod之间的通信&#xff1a; 通过物理网卡的…...

Next.js简介:现代 Web 开发的强大框架(ChatGPT-4o回答)

prompt: 你是一位专业的技术博客撰稿人&#xff0c;你将写一篇关于介绍next.js这个开发框架的技术博文&#xff0c;语言是中文&#xff0c;风格专业严谨&#xff0c;用词自然、引人入胜且饶有趣味 在现代 Web 开发的世界中&#xff0c;选择合适的框架可以显著提升开发效率和应用…...

【DeepSeek:国产大模型的崛起与ChatGPT的全面对比】

DeepSeek&#xff1a;国产大模型的崛起与ChatGPT的全面对比 目录 引言DeepSeek的技术架构 2.1 混合专家&#xff08;MoE&#xff09;架构2.2 动态路由机制2.3 训练数据与成本 ChatGPT的技术架构 3.1 Transformer架构3.2 训练数据与成本 性能对比 4.1 推理能力4.2 语言处理4.3…...

input 超出maxlength限制后,输入框变红

一、前言 最近收到产品的一个需求&#xff1a;输入框限制了maxlength“11”&#xff0c;需要在输入第12位时&#xff0c;输入框变红&#xff1b;当然&#xff0c;第12位是不能真正输入到输入框中的。 二、实现难点 其实&#xff0c;单纯的要监听 字母和数字以及字符 还是比较容…...

Docker 构建镜像并搭建私人镜像仓库教程

构建镜像教程 步骤 1&#xff1a;安装 Docker #在安装 Docker 之前&#xff0c;建议先更新系统软件包。 sudo yum update -y # 移除旧的Docker版本和Podman、runc软件包及其相关依赖。 yum remove -y docker docker-client docker-client-latest docker-ce-cli docker-commo…...

doris:MySQL Dump

Doris 在 0.15 之后的版本已经支持通过 mysqldump 工具导出数据或者表结构 使用示例​ 导出​ 导出 test 数据库中的 table1 表&#xff1a;mysqldump -h127.0.0.1 -P9030 -uroot --no-tablespaces --databases test --tables table1 导出 test 数据库中的 table1 表结构&am…...

OpenBMC:通过qemu-system-arm运行编译好的image

OpenBMC&#xff1a;编译_openbmc meson.build file-CSDN博客 讲述了如何编译生成openbmc的image 完成编译后可以通过qemu-system-arm进行模拟加载&#xff0c;以便在没有BMC硬件的情况下进行调试 1.下载qemu-system-arm 在openbmc的上级目录上执行 wget https://jenkins.op…...

STM32的HAL库开发---通用定时器(TIMER)---定时器脉冲计数

一、脉冲计数实验原理 1、 外部时钟模式1&#xff1a;核心为蓝色部分的时基单元&#xff0c;时基单元的时钟源可以来自四种&#xff0c;分别是内部时钟PCLK、外部时钟模式1&#xff0c;外部时钟模式2、内部定时器触发&#xff08;级联&#xff09;。而脉冲计数就是使用外部时钟…...

动态规划LeetCode-121.买卖股票的最佳时机1

给定一个数组 prices &#xff0c;它的第 i 个元素 prices[i] 表示一支给定股票第 i 天的价格。 你只能选择 某一天 买入这只股票&#xff0c;并选择在 未来的某一个不同的日子 卖出该股票。设计一个算法来计算你所能获取的最大利润。 返回你可以从这笔交易中获取的最大利润。…...

网安三剑客:DNS、CDN、VPN

DNS&#xff08;网络地址转换系统&#xff09;的技术原理与安全应用 1. 网络地址转换系统的基本原理 DNS通过解析用户的访问URL&#xff08;超链接&#xff09;&#xff0c;将其映射到服务器上存储的信息。具体来说&#xff1a; 解析URL&#xff1a;DNS从URL中提取出 hostna…...

Linux在x86环境下制作ARM镜像包

在x86环境下制作ARM镜像包&#xff08;如qemu.docker&#xff09;&#xff0c;可以通过QEMU和Docker的结合来实现。以下是详细的步骤&#xff1a; 安装QEMU-user-static QEMU-user-static是一个静态编译的QEMU二进制文件&#xff0c;用于在非目标架构上运行目标架构的二进制文…...

Vue3+codemirror6实现公式(规则)编辑器

实现截图 实现/带实现功能 插入标签 插入公式 提示补全 公式验证 公式计算 需要的依赖 "codemirror/autocomplete": "^6.18.4","codemirror/lang-javascript": "^6.2.2","codemirror/state": "^6.5.2","cod…...

Lua中文语言编程源码-第十一节,其它小改动汉化过程

__tostring 汉化过程 liolib.c metameth[] {"__转换为字符串", f_tostring}, lauxlib.c luaL_callmeta(L, idx, "__转换为字符串") lua.c luaL_callmeta(L, 1, "__转换为字符串") __len 汉化过程 ltm.c luaT_eventname[] ltablib.c c…...