python爬虫之线程与多进程知识点记录
一、线程
1、概念
-
线程
在一个进程的内部,要同时干多件事,就需要同时运行多个“子任务”,我们把进程内的这些“子任务”叫做线程
是操作系统能够进行运算调度的最小单位。它被包含在进程之中,是进程中的实际运作单位。一条线程指的是进程中一个单一顺序的控制流,一个进程中可以并发多个线程,每条线程并行执行不同的任务。在Unix System V及SunOS中也被称为轻量进程(lightweight processes),但轻量进程更多指内核线程(kernel thread),而把用户线程(user thread)称为线程
线程通常叫做轻型的进程。线程是共享内存空间的并发执行的多任务,每一个线程都共享一个进程的资源
线程是最小的执行单元,而进程由至少一个线程组成。如何调度进程和线程,完全由操作系统决定,程序自己不能决定什么时候执行,执行多长时间
-
多线程
是指从软件或者硬件上实现多个线程并发执行的技术。具有多线程能力的计算机因有硬件支持而能够在同一时间执行多于一个线程,进而提升整体处理性能。具有这种能力的系统包括对称多处理机、多核心处理器以及芯片级多处理(Chip-level multithreading)或同时多线程(Simultaneous multithreading)处理器。
-
主线程:
任何进程都会有一个默认的主线程 如果主线程死掉 子线也程也死掉 所以 子线程依赖于主线程
-
GIL
其他语言,CPU 是多核是支持多个线程同时执行。但在 Python 中,无论是单核还是多核,同时只能由一个线程在执行。其根源是 GIL 的存在。
GIL 的全称是 Global Interpreter Lock(全局解释器锁),来源是 Python 设计之初的考虑,为了数据安全所做的决定。某个线程想要执行,必须先拿到 GIL,我们可以把 GIL 看作是“通行证”,并且在一个 Python 进程中,GIL 只有一个。拿不到通行证的线程,就不允许进入 CPU 执行。
并且由于 GIL 锁存在,Python 里一个进程永远只能同时执行一个线程(拿到 GIL 的线程才能执行),这就是为什么在多核CPU上,Python 的多线程效率并不高的根本原因。
-
模块
_thread模块:低级模块
threading模块:高级模块,对_thread进行了封装
2、使用_thread 模块 去创建线程
-
导入模块
import _thread
-
开启线程
_thread.start_new_thread(函数名,参数)
-
注意:
-
参数必须为元组类型
-
如果主线程执行完毕 子线程就会死掉
-
如果线程不需要传参数的时候 也必须传递一个空元组占位
-
-
实例
import win32api import _thread #引入线程的模块 比较老的模块 新的 threading def run(i):win32api.MessageBox(0,"您的{}号大宝贝上线了".format(i),"来自凤姐以及陆源凯的问候",2) for i in range(5):_thread.start_new_thread(run,(i,)) #发起多个线程 传参的情况 参数为元组# _thread.start_new_thread(run,()) #发起多个线程 不传参 页需要俩个参数 第二个为空元组 print('会先执行我') #如果主线程 不死 那么 所有的次线程 就都会正常执行 while True:pass提高效率
import _thread import time def run():for i in range(10):print(i,'------------')time.sleep(1) """ for i in range(5): #50秒run() """ for i in range(5):_thread.start_new_thread(run,()) #发起五个线程去执行 时间大大缩短 for i in range(10): #循环10秒 计算 线程执行完毕所需要的时间 类似与一个劫停time.sleep(1) print('xxxx')
3、threading创建线程
-
导入模块
import threading
-
threading创建线程的方式
myThread = threading.Thread(target=函数名[,args=(参数,),name="你指定的线程名称"])
参数
-
target:指定线程执行的函数
-
name:指定当前线程的名称
-
args:传递个子线程的参数 ,(元组形式)
-
-
开启线程
myThread.start()
-
线程等待
myThread.join()
-
返回当前线程对象
-
threading.current_thread()
-
threading.currentThread()
-
-
获取当前线程的名称
-
threading.current_thread().name
-
threading.currentThread().getName()
-
-
设置线程名
setName()
Thread(target=fun).setName('name') -
返回主线程对象
threading.main_thread()
-
获取当前活着的所有线程总数,包括主线程main
threading.active_count() 或 threading.activeCount()
-
判断线程是不是活的,即线程是否已经结束
-
Thread.is_alive()
-
Thread.isAlive()
-
-
线程守护
设置子线程是否随主线程一起结束
有一个布尔值的参数,默认为False,该方法设置子线程是否随主线程一起结束 True一起结束
Thread.setDaemon(True)
还有个要特别注意的:必须在start() 方法调用之前设置
if __name__ == '__main__':t = Thread(target=fun, args=(1,))t.setDaemon(True)t.start()print('over') -
获取当前所有的线程名称
threading.enumerate() # 返回当前包含所有线程的列表
4、启动线程实现多任务
import time
import threading
def run1():# 获取线程名字print("启动%s子线程……"%(threading.current_thread().name))for i in range(5):print("lucky is a good man")time.sleep(1)
def run2(name, word):print("启动%s子线程……" % (threading.current_thread().name))for i in range(5):print("%s is a %s man"%(name, word))time.sleep(1)
if __name__ == "__main__":t1 = time.clock()# 主进程中默认有一个线程,称为主线程(父线程)# 主线程一般作为调度而存在,不具体实现业务逻辑
# 创建子线程# name参数可以设置线程的名称,如果不设置按顺序设置为Thread-nth1 = threading.Thread(target=run1, name="th1")th2 = threading.Thread(target=run2, args=("lucky", "nice"))
#启动th1.start()th2.start()
#等待子线程结束th1.join()th2.join()
t2 = time.clock()print("耗时:%.2f"%(t2-t1))
5、线程间共享数据
概述
多线程和多进程最大的不同在于,多进程中,同一个变量,各自有一份拷贝存在每个进程中,互不影响。而多线程中,所有变量都由所有线程共享。所以,任何一个变量都可以被任意一个线程修改,因此,线程之间共享数
据最大的危险在于多个线程同时修改一个变量,容易把内容改乱了。
import time
import threading
money = 0
def run1():global moneymoney = 1print("run1-----------", money)print("启动%s子线程……"%(threading.current_thread().name))for i in range(5):print("lucky is a good man")time.sleep(1)
def run2(name, word):print("run2-----------", money)print("启动%s子线程……" % (threading.current_thread().name))for i in range(5):print("%s is a %s man"%(name, word))time.sleep(1)
if __name__ == "__main__":t1 = time.clock()
th1 = threading.Thread(target=run1, name="th1")th2 = threading.Thread(target=run2, args=("lucky", "nice"))
th1.start()th2.start()th1.join()th2.join()
t2 = time.clock()print("耗时:%.2f"%(t2-t1))print("main-----------", money)
6、Lock线程锁(多线程内存错乱问题)
-
概述
Lock锁是线程模块中的一个类,有两个主要方法:acquire()和release() 当调用acquire()方法时,它锁定锁的执行并阻塞锁的执行,直到其他线程调用release()方法将其设置为解锁状态。锁帮助我们有效地访问程序中的共享资源,以防止数据损坏,它遵循互斥,因为一次只能有一个线程访问特定的资源。
-
作用
避免线程冲突
-
锁:确保了这段代码只能由一个线程从头到尾的完整执行阻止了多线程的并发执行,包含锁的某段代码实际上只能以单线程模式执行,所以效率大大的降低了 由于可以存在多个锁,不同线程持有不同的锁,并试图获取其他的锁, 可能造成死锁,导致多个线程只能挂起,只能靠操作系统强行终止
-
注意:
-
当前线程锁定以后 后面的线程会等待(线程等待/线程阻塞)
-
需要release 解锁以后才正常
-
不能重复锁定
-
-
内存错乱实例
import threading import time i = 1 def fun1():global itime.sleep(3)for x in range(1000000):i += xi -= xprint('fun1-----', i) def fun2():global itime.sleep(3)for x in range(1000000):i += xi -= xprint('fun2----', i)
t1 = threading.Thread(target=fun1) t2 = threading.Thread(target=fun2) t1.start() t2.start() t1.join() t2.join() print('mian----',i)
问题:两个线程对同一数据同时进行读写,可能造成数据值的不对,我们必须保证一个线程在修改money时其他的线程一定不能修改,线程锁解决数据混乱问题 + 线程锁Lock使用方法 ```pythonimport threading# 创建一个锁lock = threading.Lock()lock.acquire() #进行锁定 锁定成功返回Truelock.release() #进行解锁
-
Lock锁的使用:
import threading#创建一个lock对象 lock = threading.Lock()#初始化共享资源 abce = 0def sumOne():global abce#锁定共享资源lock.acquire()abce = abce + 1#释放共享资源lock.release()def sumTwo():global abce#锁定共享资源lock.acquire()abce = abce + 2#释放共享资源lock.release()#调用函数sumOne() sumTwo() print(abce)
在上面的程序中,lock是一个锁对象,全局变量abce是一个共享资源,sumOne()和sumTwo()函数扮作两个线程,在sumOne()函数中共享资源abce首先被锁定,然后增加了1,然后abce被释放。sumTwo()函数执行类似操作。 两个函数sumOne()和sumTwo()不能同时访问共享资源abce,一次只能一个访问共享资源。
-
解决资源混乱
import threading Lock = threading.Lock() i = 1 def fun1():global iif Lock.acquire(): # 判断是否上锁 锁定成功for x in range(1000000):i += xi -= xLock.release()print('fun1-----', i) def fun2():global iif Lock.acquire(): # 判断是否上锁 锁定成功for x in range(1000000):i += xi -= xLock.release()print('fun2----', i)
t1 = threading.Thread(target=fun1) t2 = threading.Thread(target=fun2) t1.start() t2.start() t1.join() t2.join() print('mian----',i)
+ 线程锁的简写(不需要手动解锁)
with lock: 代码段
实例(将上面上锁的代码段更改为) ```pythondef run2():for i in range(1000000):#简写,功能与上面一致with lock:...
结果一样
7、Timer定时执行
-
概述
Timer是Thread的子类,可以指定时间间隔后在执行某个操作
-
使用
import threading def go():print("走我了") # t = threading.Timer(秒数,函数名) t = threading.Timer(3,go) t.start() print('我是主线程的代码')
8、线程池ThreadPoolExecutor
-
模块
concurrent.futures
-
导入 Executor[ɪɡˈzekjətər]
from concurrent.futures import ThreadPoolExecutor
-
方法
-
submit(fun[, args]) 传入放入线程池的函数以及传参
-
map(fun[, iterable_args]) 统一管理
区别:
-
submit与map参数不同 submit每次都需要提交一个目标函数和对应参数 map只需要提交一次目标函数 目标函数的参数 放在一个可迭代对象(列表、字典...)里就可以
-
-
使用
from concurrent.futures import ThreadPoolExecutor import time # import threadpool #线程池 统一管理 线程 def go(str):print("hello",str)time.sleep(2)
name_list = ["lucky","卢yuan凯","姚青","刘佳俊","何必喆"] pool = ThreadPoolExecutor(5) #控制线程的并发数
+ 线程池运行的方式 + 方式一 ```python# 逐一传参扔进线程池for i in name_list:pool.submit(go, i)
简写 ```python all_task = [pool.submit(go, i) for i in name_list] ``` + 方式二 ```python # 统一放入进程池使用 pool.map(go, name_list) # 多个参数 # pool.map(go, name_list1, name_list2...) ``` **map(fn, *iterables, timeout=None)** fn: 第一个参数 fn 是需要线程执行的函数; iterables:第二个参数接受一个可迭代对象; timeout: 第三个参数 timeout 跟 wait() 的 timeout 一样,但由于 map 是返回线程执行的结果,如果 timeout小于线程执行时间会抛异常 TimeoutError。 **注意:**使用 map 方法,无需提前使用 submit 方法,map 方法与 python 高阶函数 map 的含义相同,都是将序列中的每个元素都执行同一个函数。
-
获取返回值
-
方式一
import random from concurrent.futures import ThreadPoolExecutor, as_completed import time # import threadpool #线程池 统一管理 线程 def go(str):print("hello", str)time.sleep(random.randint(1, 4))return str
-
name_list = ["lucky","卢yuan凯","姚青","刘佳俊","何必喆"]
pool = ThreadPoolExecutor(5) #控制线程的并发数
all_task = [pool.submit(go, i) for i in name_list]
# 统一放入进程池使用
for future in as_completed(all_task):print("finish the task")obj_data = future.result()print("obj_data is ", obj_data)
```
**as_completed**
当子线程中的任务执行完后,使用 result() 获取返回结果
该方法是一个生成器,在没有任务完成的时候,会一直阻塞,除非设置了 timeout。 当有某个任务完成的时候,会yield这个任务,就能执行for循环下面的语句,然后继续阻塞住,循环到所有任务结束,同时,先完成的任务会先返回给主线程
-
方式二
for result in pool.map(go, name_list):print("task:{}".format(result)) -
wait 等待线程执行完毕 在继续向下执行
from concurrent.futures import ThreadPoolExecutor, wait import time # 参数times用来模拟下载的时间 def down_video(times):time.sleep(times)print("down video {}s finished".format(times))return times
executor = ThreadPoolExecutor(max_workers=2)
通过submit函数提交执行的函数到线程池中,submit函数立即返回,不阻塞
task1 = executor.submit(down_video, (3)) task2 = executor.submit(down_video, (1))
done方法用于判定某个任务是否完成
print("任务1是否已经完成:", task1.done())
time.sleep(4)
print(wait([task1, task2])) print('wait') print("任务1是否已经完成:", task1.done()) print("任务1是否已经完成:", task2.done())
result方法可以获取task的执行结果
print(task1.result())
+ **线程池与线程对比** 线程池是在程序运行开始,创建好的n个线程,并且这n个线程挂起等待任务的到来。而多线程是在任务到来得时候进行创建,然后执行任务。线程池中的线程执行完之后不会回收线程,会继续将线程放在等待队列中;多线程程序在每次任务完成之后会回收该线程。由于线程池中线程是创建好的,所以在效率上相对于多线程会高很多。线程池也在高并发的情况下有着较好的性能;不容易挂掉。多线程在创建线程数较多的情况下,很容易挂掉。 ### 9、队列模块queue + 导入队列模块 import queue + 概述 queue是python标准库中的线程安全的队列(FIFO)实现,提供了一个适用于多线程编程的先进先出的数据结构,即队列,用来在生产者和消费者线程之间的信息传递 + 基本FIFO队列 queue.Queue(maxsize=0) FIFO即First in First Out,先进先出。Queue提供了一个基本的FIFO容器,使用方法很简单,**maxsize是个整数**,指明了队列中能存放的数据个数的上限。一旦达到上限,插入会导致阻塞,直到队列中的数据被消费掉。如果**maxsize小于或者等于0**,队列大小没有限制。 举个栗子: ```pythonimport queue q = queue.Queue() for i in range(5):q.put(i) while not q.empty():print q.get()
-
一些常用方法
-
task_done()
意味着之前入队的一个任务已经完成。由队列的消费者线程调用。每一个get()调用得到一个任务,接下来的task_done()调用告诉队列该任务已经处理完毕。
如果当前一个join()正在阻塞,它将在队列中的所有任务都处理完时恢复执行(即每一个由put()调用入队的任务都有一个对应的task_done()调用)。
-
join()
阻塞调用线程,直到队列中的所有任务被处理掉。
只要有数据被加入队列,未完成的任务数就会增加。当消费者线程调用task_done()(意味着有消费者取得任务并完成任务),未完成的任务数就会减少。当未完成的任务数降到0,join()解除阻塞。
-
put(item[, block[, timeout]])
将item放入队列中。
-
如果可选的参数block为True且timeout为空对象(默认的情况,阻塞调用,无超时)。
-
如果timeout是个正整数,阻塞调用进程最多timeout秒,如果一直无空空间可用,抛出Full异常(带超时的阻塞调用)。
-
如果block为False,如果有空闲空间可用将数据放入队列,否则立即抛出Full异常
-
其非阻塞版本为
put_nowait等同于put(item, False)
-
-
get([block[, timeout]])
从队列中移除并返回一个数据。block跟timeout参数同
put方法其非阻塞方法为
get_nowait()相当与get(False) -
empty()
如果队列为空,返回True,反之返回False
-
10、案例
中国历年电影票房 | 中国票房 | 中国电影票房排行榜
我们抓取从1994年到2021年的电影票房.
import requests
from lxml import etree
from concurrent.futures import ThreadPoolExecutor
def get_page_source(url):resp = requests.get(url)resp.encoding = 'utf-8'return resp.text
def parse_html(html):try:tree = etree.HTML(html)trs = tree.xpath("//table/tbody/tr")[1:]result = []for tr in trs:year = tr.xpath("./td[2]//text()")year = year[0] if year else ""name = tr.xpath("./td[3]//text()")name = name[0] if name else ""money = tr.xpath("./td[4]//text()")money = money[0] if money else ""d = (year, name, money)if any(d):result.append(d)return resultexcept Exception as e:print(e) # 调bug专用
def download_one(url, f):page_source = get_page_source(url)data = parse_html(page_source)for item in data:f.write(",".join(item))f.write("\n")
def main():f = open("movie.csv", mode="w", encoding='utf-8')lst = [str(i) for i in range(1994, 2022)]with ThreadPoolExecutor(10) as t:# 方案一# for year in lst:# url = f"http://www.boxofficecn.com/boxoffice{year}"# # download_one(url, f)# t.submit(download_one, url, f)
# 方案二t.map(download_one, (f"http://www.boxofficecn.com/boxoffice{year}" for year in lst), (f for i in range(len(lst))))
if __name__ == '__main__':main()
二、进程VS线程
-
多任务的实现原理
首先,要实现多任务,通常我们会设计Master-Worker模式,Master负责分配任务,Worker负责执行任务,因此,多任务环境下,通常是一个Master,多个Worker。
如果用多进程实现Master-Worker,主进程就是Master,其他进程就是Worker。
如果用多线程实现Master-Worker,主线程就是Master,其他线程就是Worker。
-
多进程
主进程就是Master,其他进程就是Worker
-
优点
稳定性高:多进程模式最大的优点就是稳定性高,因为一个子进程崩溃了,不会影响主进程和其他子进程。(当然主进程挂了所有进程就全挂了,但是Master进程只负责分配任务,挂掉的概率低)著名的Apache最早就是采用多进程模式。
-
缺点
创建进程的代价大:在Unix/Linux系统下,用fork调用还行,在Windows下创建进程开销巨大
操作系统能同时运行的进程数也是有限的:在内存和CPU的限制下,如果有几千个进程同时运行,操作系统连调度都会成问题
-
-
多线程
主线程就是Master,其他线程就是Worker
-
优点
多线程模式通常比多进程快一点,但是也快不到哪去
在Windows下,多线程的效率比多进程要高
-
缺点
任何一个线程挂掉都可能直接造成整个进程崩溃:所有线程共享进程的内存。在Windows上,如果一个线程执行的代码出了问题,你经常可以看到这样的提示:“该程序执行了非法操作,即将关闭”,其实往往是某个线程出了问题,但是操作系统会强制结束整个进程
-
-
计算密集型 vs IO密集型
-
计算密集型(多进程适合计算密集型任务)
要进行大量的计算,消耗CPU资源,比如计算圆周率、对视频进行高清解码等等,全靠CPU的运算能力。这种计算密集型任务虽然也可以用多任务完成,但是任务越多,花在任务切换的时间就越多,CPU执行任务的效率就越低,所以,要最高效地利用CPU,计算密集型任务同时进行的数量应当等于CPU的核心数
-
IO密集型 (线程适合IO密集型任务)
涉及到网络、磁盘IO的任务都是IO密集型任务,这类任务的特点是CPU消耗很少,任务的大部分时间都在等待IO操作完成(因为IO的速度远远低于CPU和内存的速度)。对于IO密集型任务,任务越多,CPU效率越高,但也有一个限度。常见的大部分任务都是IO密集型任务,比如Web应用
-
-
GIL
多线程存在GIL锁,同一时刻只能有一条线程执行;在多进程中,每一个进程都有独立的GIL,不会发生GIL冲突;但在这个例子中,爬虫属于IO密集型,多进程适用于CPU计算密集型,所以用时较长,速度慢于多线程并发。
高效编程
一、多任务原理
-
概念
现代操作系统比如Mac OS X,UNIX,Linux,Windows等,都是支持“多任务”的操作系统
-
什么叫多任务?
就是操作系统可以同时运行多个任务
-
单核CPU实现多任务原理
操作系统轮流让各个任务交替执行,QQ执行2us(微秒),切换到微信,在执行2us,再切换到陌陌,执行2us……。表面是看,每个任务反复执行下去,但是CPU调度执行速度太快了,导致我们感觉就像所有任务都在同时执行一样

-
多核CPU实现多任务原理
真正的秉性执行多任务只能在多核CPU上实现,但是由于任务数量远远多于CPU的核心数量,所以,操作系统也会自动把很多任务轮流调度到每个核心上执行

-
并发与并行
-
并发
CPU调度执行速度太快了,看上去一起执行,任务数多于CPU核心数
-
并行
真正一起执行,任务数小于等于CPU核心数
-
并发是逻辑上的同时发生,并行更多是侧重于物理上的同时发生。
-
-
实现多任务的方式
-
多进程模式
启动多个进程,每个进程虽然只有一个线程,但是多个进程可以一起执行多个任务
-
多线程模式
启动一个进程,在一个进程的内部启动多个线程,这样多个线程也可以一起执行多个任务
-
多进程+多线程
启动多个进程,每个进程再启动多个线程
-
协程
-
多进程+协程
-
二、进程
1、概念
-
什么是进程?
是计算机中的程序关于某数据集合上的一次运行活动,是系统进行资源分配和调度的基本单位,是操作系统结构的基础。
-
对于操作系统
一个任务就是一个进程。比方说打开浏览器就是启动一个浏览器的进程,在打开一个记事本就启动一个记事本进程,如果打开两个记事本就启动两个记事本进程
2、使用进程
-
单进程现象
需要等待代码执行完后再执行下一段代码
import time def run1():while 1:print("lucky is a good man")time.sleep(1) def run2():while 1:print("lucky is a nice man")time.sleep(1) if __name__ == "__main__":run1()# 不会执行run2()函数,只有上面的run1()结束才能执行run2()run2() -
启动进程实现多任务
-
multiprocessing模块
跨平台的多进程模块,提供了一个Process类用来示例化一个进程对象
-
Process类
作用:创建进程(子进程)
-
__name__
这是 Windows 上多进程的实现问题。在 Windows 上,子进程会自动 import 启动它的这个文件,而在 import 的时候是会执行这些语句的。如果你这么写的话就会无限递归创建子进程报错。所以必须把创建子进程的部分用那个 if 判断保护起来,import 的时候
__name__不是__main__,就不会递归运行了。参数 说明 target 指定进程执行的任务 args 给进程函数传递的参数,是一个元组 注意:此时进程被创建,但是不会启动进程执行
-
启动进程实现多任务
from multiprocessing import Process
创建子进程
P = Process(target=run,args=("nice",),name='当前进程名称')
-
target指定 子进程运行的函数
-
args 指定传递的参数 , 是元组类型
-
启动进程:Process对象.start()
获取进程信息
-
os.getpid() 获取当前进程id号
-
os.getppid() 获取当前进程的父进程id号
-
multiprocessing.current_process().name 获取当前进程名称
父子进程的先后顺序
-
默认 父进程的结束不能影响子进程 让父进程等待子进程结束再执行父进程
-
p.join() 阻塞当前进程,直到调用join方法的那个进程执行完,再继续执行当前进程。
-
全局变量在过个进程中不能共享
注意: 在子线程中修改全局变量时对父进程中的全局变量没有影响
-
-
示例代码
import time from multiprocessing import Process def run1(name):while 1:print("%s is a good man"%name)time.sleep(1) def run2():while 1:print("lucky is a nice man")time.sleep(1) if __name__ == "__main__":# 程序启动时的进程称为主进程(父进程)# 创建进程并启动p = Process(target=run1, args=("lucky",))p.start() # 主进程执行run2()函数run2()
-
-
主进程负责调度
主进程主要做的是调度相关的工作,一般不负责具体业务逻辑
import time from multiprocessing import Process def run1():for i in range(7):print("lucky is a good man")time.sleep(1) def run2(name, word):for i in range(5):print("%s is a %s man"%(name, word))time.sleep(1) if __name__ == "__main__":t1 = time.time() # 创建两个进程分别执行run1、run2p1 = Process(target=run1)p2 = Process(target=run2, args=("lucky", "cool")) # 启动两个进程p1.start()p2.start() # 查看耗时t2 = time.time()print("耗时:%.2f"%(t2-t1)) -
父子进程的先后顺序
主进程的结束不能影响子进程,所以可以等待子进程的结束再结束主进程,等待子进程结束,才能继续运行主进程
p.join() 阻塞当前进程,直到调用join方法的那个进程执行完,再继续执行当前进程。
import time from multiprocessing import Process def run1():for i in range(7):print("lucky is a good man")time.sleep(1) def run2(name, word):for i in range(5):print("%s is a %s man"%(name, word))time.sleep(1) if __name__ == "__main__":t1 = time.time() p1 = Process(target=run1)p2 = Process(target=run2, args=("lucky", "cool")) p1.start()p2.start() # 主进程的结束不能影响子进程,所以可以等待子进程的结束再结束主进程# 等待子进程结束,才能继续运行主进程p1.join()p2.join() t2 = time.time()print("耗时:%.2f"%(t2-t1))
3、全局变量在多个子进程中不能共享
原因:
在创建子进程时对全局变量做了一个备份,父进程中num变量与子线程中的num不是一个变量
from multiprocessing import Process
#全局变量在进程中 不能共享
num = 10
def run():print("我是子进程的开始")global numnum+=1print(num)print("我是子进程的结束")
if __name__=="__main__":p = Process(target=run)p.start()p.join()
print(num)
尝试列表是否能共享
from multiprocessing import Process
#全局变量在进程中 不能共享
mylist = []
def run():print("我是子进程的开始")global mylistmylist.append(1)mylist.append(2)mylist.append(3)print("我是子进程的结束")
if __name__=="__main__":p = Process(target=run)p.start()p.join()
print(mylist)
4、启动大量子进程
-
获取CPU核心数
print('CPU number:' + str(multiprocessing.cpu_count()))
-
导入
from multiprocesssing import Pool
-
开启并发数
pp = Pool([参数]) #开启并发数 默认是你的核心数
-
创建子进程,并放入进程池管理
apply_async为非阻塞模式(并发执行)
pp.apply_async(run,args=(i,)) #args参数 可以为元组 或者是列表[]
-
关闭进程池
pp.close()关闭进程池
-
join()
在调用join之前必须先调用close,调用close之后就不能再继续添加新的进程了
pp.join()
进程池对象调用join,会等待进程池中所有的子进程结束完毕再去执行父进程
-
实例
# Pool类:进程池类 from multiprocessing import Pool import time import random import multiprocessing def run(index):print('CPU number:' + str(multiprocessing.cpu_count()))print("子进程 %d 启动"%(index))t1 = time.time()time.sleep(random.random()* 5+2)t2 = time.time()print("子进程 %d 结束,耗时:%.2f" % (index, t2-t1)) if __name__ == "__main__":print("启动主进程……") # 创建进程池对象# 由于pool的默认值为CPU的核心数,假设有4核心,至少需要5个子进程才能看到效果# Pool()中的值表示可以同时执行进程的数量pool = Pool(2)for i in range(1, 7):# 创建子进程,并将子进程放到进程池中统一管理pool.apply_async(run, args=(i,)) # 等待子进程结束# 关闭进程池:在关闭后就不能再向进程池中添加进程了# 进程池对象在调用join之前必须先关闭进程池pool.close()#pool对象调用join,主进程会等待进程池中的所有子进程结束才会继续执行主进程pool.join() print("结束主进程……")get方法:获取进程的返回值
from multiprocessing import Lock, Pool import time def function(index):print('Start process: ', index)time.sleep(2)print('End process', index)return index
if name == 'main': pool = Pool(processes=3) for i in range(4): result = pool.apply_async(function, (i,)) print(result.get()) #获取每个 子进程的返回值 print("Started processes") pool.close() pool.join() print("Subprocess done.")
注意:这样来获取每个进程的返回值 那么就会变成单进程 ### 5、map方法 + 概述 如果你现在有一堆数据要处理,每一项都需要经过一个方法来处理,那么map非常适合 比如现在你有一个数组,包含了所有的URL,而现在已经有了一个方法用来抓取每个URL内容并解析,那么可以直接在map的第一个参数传入方法名,第二个参数传入URL数组。 + 概述
```pythonfrom multiprocessing import Poolimport requestsfrom requests.exceptions import ConnectionErrordef scrape(url):try:print(requests.get(url))except ConnectionError:print('Error Occured ', url)finally:print('URL', url, ' Scraped')if __name__ == '__main__':pool = Pool(processes=3)urls = ['https://www.baidu.com','http://www.meituan.com/','http://blog.csdn.net/','http://xxxyxxx.net']pool.map(scrape, urls)
在这里初始化一个Pool,指定进程数为3,如果不指定,那么会自动根据CPU内核来分配进程数。
然后有一个链接列表,map函数可以遍历每个URL,然后对其分别执行scrape方法。
6、单进程与多进程复制文件对比
-
单进程复制文件
import time def copy_file(path, toPath):with open(path, "rb") as fp1:with open(toPath, "wb") as fp2:while 1:info = fp1.read(1024)if not info:breakelse:fp2.write(info)fp2.flush() if __name__ == "__main__":t1 = time.time() for i in range(1, 5):path = r"/Users/lucky/Desktop/file/%d.mp4"%itoPath = r"/Users/lucky/Desktop/file2/%d.mp4"%icopy_file(path, toPath) t2 = time.time()print("单进程耗时:%.2f"%(t2-t1)) -
多进程复制文件
import time from multiprocessing import Pool import os def copy_file(path, toPath):with open(path, "rb") as fp1:with open(toPath, "wb") as fp2:while 1:info = fp1.read(1024)if not info:breakelse:fp2.write(info)fp2.flush() if __name__ == "__main__":t1 = time.time()path = r"/Users/xialigang/Desktop/视频"dstPath = r"/Users/xialigang/Desktop/1视频"fileList = os.listdir(path)pool = Pool() for i in fileList:newPath1 = os.path.join(path, i)newPath2 = os.path.join(dstPath, i)pool.apply_async(copy_file, args=(newPath1, newPath2)) pool.close()pool.join() t2 = time.time()print("耗时:%.2f"%(t2-t1))
7、进程间通信
-
队列共享
-
导入
from multiprocessing import Queue
-
使用
que = Queue() #创建队列
que.put(数据) #压入数据
que.get() #获取数据
-
队列常用函数
Queue.empty() 如果队列为空,返回True, 反之False
Queue.full() 如果队列满了,返回True,反之False
Queue.get([block[, timeout]]) 获取队列,timeout等待时间
Queue.get_nowait() 相当Queue.get(False)
Queue.put(item) 阻塞式写入队列,timeout等待时间
Queue.put_nowait(item) 相当Queue.put(item, False)
-
特点:先进先出
-
注意:
get方法有两个参数,blocked和timeout,意思为阻塞和超时时间。默认blocked是true,即阻塞式。
当一个队列为空的时候如果再用get取则会阻塞,所以这时候就需要吧blocked设置为false,即非阻塞式,实际上它就会调用get_nowait()方法,此时还需要设置一个超时时间,在这么长的时间内还没有取到队列元素,那就抛出Queue.Empty异常。
当一个队列为满的时候如果再用put放则会阻塞,所以这时候就需要吧blocked设置为false,即非阻塞式,实际上它就会调用put_nowait()方法,此时还需要设置一个超时时间,在这么长的时间内还没有放进去元素,那就抛出Queue.Full异常。
另外队列中常用的方法
-
队列的大小
Queue.qsize() 返回队列的大小 ,不过在 Mac OS 上没法运行。
实例
import multiprocessing queque = multiprocessing.Queue() #创建 队列 #如果在子进程 和主进程 之间 都压入了数据 那么在主进程 和 子进程 获取的就是 对方的数据 def fun(myque):# print(id(myque)) #获取当前的队列的存储地址 依然是拷贝了一份myque.put(['a','b','c']) #在子进程里面压入数据# print("子进程获取",myque.get())#获取队列里面的值 if __name__=='__main__':# print(id(queque))queque.put([1,2,3,4,5]) #将列表压入队列 如果主进程也压入了数据 那么在主进程取的就是在主进程压入的数据 而不是子进程的p = multiprocessing.Process(target=fun,args=(queque,))p.start()p.join()print("主进程获取",queque.get())#在主进程进行获取print("主进程获取",queque.get())#在主进程进行获取# print("主进程获取",queque.get(block=True, timeout=1))#在主进程进行获取 -
-
字典共享
-
导入
import multiprocess
-
概述
Manager是一个进程间高级通信的方法 支持Python的字典和列表的数据类型
-
创建字典
myDict = multiprocess.Manager().dict()
实例
import multiprocessing
-
def fun(mydict): # print(mylist) mydict['x'] = 'x' mydict['y'] = 'y' mydict['z'] = 'z'
if name=='main': # Manager是一种较为高级的多进程通信方式,它能支持Python支持的的任何数据结构。 mydict = multiprocessing.Manager().dict() p = multiprocessing.Process(target=fun,args=(mydict,)) p.start() p.join() print(mydict)
- 列表共享
+ 导入
import multiprocess
+ 创建列表
myDict = multiprocess.Manager().list()
实例(字典与列表共享)
```pythonimport multiprocessing
def fun(List):# print(mylist)List.append('x')List.append('y')List.append('z')
if __name__=='__main__':# Manager是一种较为高级的多进程通信方式,它能支持Python支持的的任何数据结构。List = multiprocessing.Manager().list()p = multiprocessing.Process(target=fun,args=(List,))p.start()p.join()print(List)
-
注意
进程名.terminate() 强行终止子进程
-
deamon
在这里介绍一个属性,叫做deamon。每个进程程都可以单独设置它的属性,如果设置为True,当父进程结束后,子进程会自动被终止。
进程.daemon = True
设置在start()方法之前
import multiprocessing import time def fun():time.sleep(100) if __name__=='__main__':p = multiprocessing.Process(target=fun)p.daemon = Truep.start()print('over') -
进程名.terminate() 强行终止子进程
import multiprocessing import time def fun():time.sleep(100) if __name__=='__main__':p = multiprocessing.Process(target=fun)p.start()p.terminate()p.join()print('over')
8、进程实现生产者消费者
生产者消费者模型描述:
生产者是指生产数据的任务,消费者是指消费数据的任务。
当生产者的生产能力远大于消费者的消费能力,生产者就需要等消费者消费完才能继续生产新的数据,同理,如果消费者的消费能力远大于生产者的生产能力,消费者就需要等生产者生产完数据才能继续消费,这种等待会造成效率的低下,为了解决这种问题就引入了生产者消费者模型。
生产者/消费者问题可以描述为:两个或者更多的进程(线程)共享同一个缓冲区,其中一个或多个进程(线程)作为“生产者”会不断地向缓冲区中添加数据,另一个或者多个进程(线程)作为“消费者”从缓冲区中取走数据。
-
代码
from multiprocessing import Process from multiprocessing import Queue import time def product(q):print("启动生产子进程……")for data in ["good", "nice", "cool", "handsome"]:time.sleep(2)print("生产出:%s"%data)# 将生产的数据写入队列q.put(data)print("结束生产子进程……") def t(q):print("启动消费子进程……")while 1:print("等待生产者生产数据")# 获取生产者生产的数据,如果队列中没有数据会阻塞,等待队列中有数据再获取value = q.get()print("消费者消费了%s数据"%(value))print("结束消费子进程……") if __name__ == "__main__":q = Queue() p1 = Process(target=product, args=(q,))p2 = Process(target=customer, args=(q,)) p1.start()p2.start() p1.join()# p2子进程里面是死循环,无法等待它的结束# p2.join()# 强制结束子进程p2.terminate() print("主进程结束")
9、案例(抓取斗图)
from multiprocessing import Process,Queue
from concurrent.futures import ThreadPoolExecutor
from lxml import etree
import time
import requests
headers = {"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/99.0.4844.84 Safari/537.36"
}
def get_img_src(url, q):"""进程1: 负责提取页面中所有的img的下载地址将图片的下载地址通过队列. 传输给另一个进程进行下载"""
resp = requests.get(url, headers=headers)tree = etree.HTML(resp.text)srcs = tree.xpath("//li[@class='list-group-item']//img[@referrerpolicy='no-referrer']/@data-original")for src in srcs:q.put(src.strip())resp.close()
def download_img(q):"""进程2: 将图片的下载地址从队列中提取出来. 进行下载."""with ThreadPoolExecutor(20) as t:while 1:try:s = q.get(timeout=20)t.submit(donwload_one, s)except Exception as e:print(e)break
def donwload_one(s):# 单纯的下载功能resp = requests.get(s, headers=headers)file_name = s.split("/")[-1]# 请提前创建好img文件夹with open(f"img/{file_name}", mode="wb") as f:f.write(resp.content)print("一张图片下载完毕", file_name)resp.close()
if __name__ == '__main__':t1 = time.time()q = Queue() # 两个进程必须使用同一个队列. 否则数据传输不了p_list = []for i in range(1, 11):url = f"https://www.pkdoutu.com/photo/list/?page={i}"p = Process(target=get_img_src, args=(url, q))p_list.append(p)for p in p_list:p.start()p2 = Process(target=download_img, args=(q,))p2.start()for p in p_list:p.join()p2.join()print((time.time()-t1)/60)
# 0.49572664896647134
相关文章:
python爬虫之线程与多进程知识点记录
一、线程 1、概念 线程 在一个进程的内部,要同时干多件事,就需要同时运行多个“子任务”,我们把进程内的这些“子任务”叫做线程 是操作系统能够进行运算调度的最小单位。它被包含在进程之中,是进程中的实际运作单位。一条线程指…...
基于Java (spring-boot)的停车场管理系统
一、项目介绍 基于Java (spring-boot)的停车场管理系统、预订车位系统、停车缴费系统功能: 登录、注册、后台首页、用户信息管理、车辆信息管理、新增车辆、车位费用设置、停泊车辆查询、车辆进出管理、登录日志查询、个人中心、预定停车位、缴费信息。 适用人群&…...
微软Office 2019 批量授权版
软件介绍 微软办公软件套件Microsoft Office 2019 专业增强版2024年1月批量许可版更新推送!Office2019正式版2018年10月份推出,主要为多人跨平台办公与团队协作打造。Office2019整合对过去三年在Office365里所有功能,包括对Word、Excel、Pow…...
ChatGLM2-6B 大语言模型本地搭建
ChatGLM模型介绍: ChatGLM2-6B 是清华 NLP 团队于不久前发布的中英双语对话模型,它具备了强大的问答和对话功能。拥有最大32K上下文,并且在授权后可免费商用! ChatGLM2-6B的6B代表了训练参数量为60亿,同时运用了模型…...
WindowsServer安装mysql最新版
安装 下载相应mysql安装包: MySQL :: Download MySQL Installer 选择不登陆下载 双击运行下载好的mysql-installer-community-*.*.*.msi 进入类型选择页面,本人需要mysql云服务就选择了server only server only(服务器)&#x…...
gin切片表单验证
在Gin中对切片进行表单验证的步骤与对其他类型的字段进行验证类似。以下是一些基本步骤,我们可以根据具体的需求进行调整: 定义结构体: 创建一个结构体,用于存储表单数据。确保结构体中的字段类型与你预期的表单数据类型一致。 使…...
openssl3.2 - 官方demo学习 - certs
文章目录 openssl3.2 - 官方demo学习 - certs概述笔记官方的实验流程mkcerts.sh - 整理ocsprun.sh - 整理ocspquery.sh - 整理从mkcerts.sh整理出来的27个.bata1_create_certificate_directly.cmda2_Intermediate_CA_request_first.cmda3_Sign_request_CA_extensions.cmda4_Ser…...
Datawhale 大模型基础理论 Day1 引言
开源链接如下:https://github.com/datawhalechina/so-large-lm/blob/main/docs/content/ch01.md 语言模型的概念:即能够赋予每个有意义的词(token)以一定的概率的一个函数的集合。 语言模型可以被用来评估输入的质量,…...
HarmonyOS应用开发学习笔记 UIAbility组件与UI的数据同步 EventHub、globalThis
1、 HarmoryOS Ability页面的生命周期 2、 Component自定义组件 3、HarmonyOS 应用开发学习笔记 ets组件生命周期 4、HarmonyOS 应用开发学习笔记 ets组件样式定义 Styles装饰器:定义组件重用样式 Extend装饰器:定义扩展组件样式 5、HarmonyOS 应用开发…...
leetcode每日一题44
130. 被围绕的区域 图论 dfs/bfs dfs代码框架 void dfs(参数) {if (终止条件) {存放结果;return;}for (选择:本节点所连接的其他节点) {处理节点;dfs(图,选择的节点); // 递归回溯,撤销处理结果} }思路:本题要求找到被x围绕的陆…...
idea写sql语句快捷键提醒,mapper注解开发,mybatis
第一步:注入SQL语言 1.显示上下文操作(没有这个选项的话就选中sql然后直接alt回车快捷键)2.注入语言或引用 3.mysql 第二步:配置MySQL数据库连接 1.首先点击侧边的数据库,再点击上面的加号 2.点击数据源ÿ…...
002 Golang-channel-practice
第二题: 创建一个生产器和接收器,再建立一个无缓冲的channel。生产器负责把数据放进管道里,接收器负责把管道里面的数据打印出来。这里我们开5个协程把数据打印出来。 直接上代码! package mainimport ("fmt" )func …...
MFC为对话框资源添加类
VC6新建一个对话框类型的工程; 建立之后资源中默认有2个对话框,一个是主对话框,About这个是默认建立的关于版权信息的; 然后主对话框有对应的.h和.cpp文件;可以在其中进行编程; 默认建立的有一个 关于 对话框; 在资源中新插入一个对话框,IDD_DIALOG1是对话框ID; 新加…...
SpringBoot新手入门完整教程和项目示例
文章目录 SpringBoot新手入门完整教程和项目示例1、SpringBoot简介2、Spring Boot的核心功能?(优点)3、SpringBoot与SpringMVC 的区别?4、构建SpringBoot项目4.1、在官网自动生成下载spring boot项目4.2、手动使用maven创建Spring…...
PHP留言板实现
完整教程PHP留言板 登陆界面 一个初学者的留言板(登录和注册)_php留言板登录注册-CSDN博客 留言板功能介绍 百度网盘 请输入提取码 进入百度网盘后,输入提取码:knxt,即可下载项目素材和游客访问页面的模板文件。 &…...
ssm+vue的物流配送人员车辆调度管理系统的设计与实现(有报告)。Javaee项目,ssm vue前后端分离项项目。
演示视频: ssmvue的物流配送人员车辆调度管理系统的设计与实现(有报告)。Javaee项目,ssm vue前后端分离项目。 项目介绍: 采用M(model)V(view)C(controller&…...
day1·算法-双指针
今天是第一天,GUNDOM带你学算法,跟上我的节奏吗,一起闪击蓝桥杯! 正文展开,今天先上点小菜供大家想用,如有错误或者建议直接放评论区,我会一个一个仔细查看的哦。 双方指针问题一般是在数组中…...
在vue中,切换页面之后如何关闭定时器
在vue中,使用了element-ui的框架,点击左侧切换内部页面。 有些页面使用了定时器,在其换到其他页面的时候,希望能够关闭这些定期请求和复杂操作。 那么,切换页面之后如何关闭定时器?vue的创建流程中没找到能…...
观测云产品更新 | 日志、场景仪表板、监控器等
观测云更新 用户访问监测 (RUM ) 公网 Dataway 支持 ip 转换成地理位置信息。 日志 > 查看器详情页 1、新增 BPF 网络日志采集及日志详情页,支持 Json 格式转化; 2、上述 1 中的日志详情页中新增可读的展示模式,…...
【JupyterLab】在 conda 虚拟环境中 JupyterLab 的安装与使用
【JupyterLab】在 conda 虚拟环境中 JupyterLab 的安装与使用 1 JupyterLab 介绍2 安装2.1 Jupyter Kernel 与 conda 虚拟环境 3 使用3.1 安装中文语言包(Optional)3.2 启动3.3 常用快捷键3.3.1 命令模式下 3.4 远程访问个人计算机3.4.1 局域网下 1 JupyterLab 介绍 官方文档: …...
中南大学无人机智能体的全面评估!BEDI:用于评估无人机上具身智能体的综合性基准测试
作者:Mingning Guo, Mengwei Wu, Jiarun He, Shaoxian Li, Haifeng Li, Chao Tao单位:中南大学地球科学与信息物理学院论文标题:BEDI: A Comprehensive Benchmark for Evaluating Embodied Agents on UAVs论文链接:https://arxiv.…...
java调用dll出现unsatisfiedLinkError以及JNA和JNI的区别
UnsatisfiedLinkError 在对接硬件设备中,我们会遇到使用 java 调用 dll文件 的情况,此时大概率出现UnsatisfiedLinkError链接错误,原因可能有如下几种 类名错误包名错误方法名参数错误使用 JNI 协议调用,结果 dll 未实现 JNI 协…...
UDP(Echoserver)
网络命令 Ping 命令 检测网络是否连通 使用方法: ping -c 次数 网址ping -c 3 www.baidu.comnetstat 命令 netstat 是一个用来查看网络状态的重要工具. 语法:netstat [选项] 功能:查看网络状态 常用选项: n 拒绝显示别名&#…...
vue3 字体颜色设置的多种方式
在Vue 3中设置字体颜色可以通过多种方式实现,这取决于你是想在组件内部直接设置,还是在CSS/SCSS/LESS等样式文件中定义。以下是几种常见的方法: 1. 内联样式 你可以直接在模板中使用style绑定来设置字体颜色。 <template><div :s…...
Mac软件卸载指南,简单易懂!
刚和Adobe分手,它却总在Library里给你写"回忆录"?卸载的Final Cut Pro像电子幽灵般阴魂不散?总是会有残留文件,别慌!这份Mac软件卸载指南,将用最硬核的方式教你"数字分手术"࿰…...
浅谈不同二分算法的查找情况
二分算法原理比较简单,但是实际的算法模板却有很多,这一切都源于二分查找问题中的复杂情况和二分算法的边界处理,以下是博主对一些二分算法查找的情况分析。 需要说明的是,以下二分算法都是基于有序序列为升序有序的情况…...
Angular微前端架构:Module Federation + ngx-build-plus (Webpack)
以下是一个完整的 Angular 微前端示例,其中使用的是 Module Federation 和 npx-build-plus 实现了主应用(Shell)与子应用(Remote)的集成。 🛠️ 项目结构 angular-mf/ ├── shell-app/ # 主应用&…...
C++:多态机制详解
目录 一. 多态的概念 1.静态多态(编译时多态) 二.动态多态的定义及实现 1.多态的构成条件 2.虚函数 3.虚函数的重写/覆盖 4.虚函数重写的一些其他问题 1).协变 2).析构函数的重写 5.override 和 final关键字 1&#…...
A2A JS SDK 完整教程:快速入门指南
目录 什么是 A2A JS SDK?A2A JS 安装与设置A2A JS 核心概念创建你的第一个 A2A JS 代理A2A JS 服务端开发A2A JS 客户端使用A2A JS 高级特性A2A JS 最佳实践A2A JS 故障排除 什么是 A2A JS SDK? A2A JS SDK 是一个专为 JavaScript/TypeScript 开发者设计的强大库ÿ…...
PHP 8.5 即将发布:管道操作符、强力调试
前不久,PHP宣布了即将在 2025 年 11 月 20 日 正式发布的 PHP 8.5!作为 PHP 语言的又一次重要迭代,PHP 8.5 承诺带来一系列旨在提升代码可读性、健壮性以及开发者效率的改进。而更令人兴奋的是,借助强大的本地开发环境 ServBay&am…...
