当前位置: 首页 > news >正文

爬虫中使用多进程、多线程的混合方式遇到的数据丢失问题

多进程爬虫

项目场景:

网络爬虫项目,主要实现多进程、多线程方式快速缓存网页资源到MongoDB,并解析网页数据,将信息写入到csv文件中。


问题描述

在单独使用多线程的过程中,是没有问题的,比如这个爬虫示例是爬取豆瓣电影排行榜TOP250,解析到csv中数据还是250条,在实现多进程的方式中,主要是通过MongoDB来实现一个队列的效果,多条进程从数据库中取出待解析的链接进行解析,在实现的过程中,发现解析数据是没有问题的,打印到控制台的数据是没有丢失数据的情况,但是在最终写出的csv文件中,数据只有一小部分。
在尝试了国内所有能用的AI之后无果,AI只能对逻辑问题判断,而对一些Runtime问题还是差点意思,好在CSDN有大佬,将问题发布到问答区后,大佬一句话就点醒了我,在此表示感谢。
在这里插入图片描述

from concurrent.futures import ThreadPoolExecutor
from datetime import datetime,timedelta
from multiprocessing.dummy import Pool
import os
import random
import re
import threading
import time
import urllib.parse
import urllib.request
import urllib3
from urllib.parse import urlparse, urlsplit
from urllib.parse import urljoin
import urllib.robotparser
from lxml import html as lhtml
import csv
import pickle
import zlib
from bson.binary import Binary
from pymongo import MongoClient
from zipfile import ZipFile
from io import StringIO
# 多线程爬虫
# 封装MongoDB缓存类
class MongoCache:def __init__(self,client=None,expires=timedelta(days=30)):if client == None:self.client = MongoClient('localhost',27017)else:self.client = clientself.db = self.client['cache']self.webpage = self.db['webcrawler']self.expires = expiresself.webpage.create_index('timestamp',expireAfterSeconds=expires.total_seconds())def __getitem__(self,url):'''根据url从磁盘提取缓存'''record = self.webpage.find_one({'_id':url})if record:return pickle.loads(zlib.decompress(record['result']))else:raise KeyError(url + "不存在")def __setitem__(self,url,result):'''将数据存入磁盘缓存中'''record = {'result':Binary(zlib.compress(pickle.dumps(result))),'timestamp':datetime.now()}self.webpage.update_one({'_id':url},{'$set':record},upsert=True)
# 将下载功能封装成一个类
class Downloader:def __init__(self,delay=5,user_agent='Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:128.0) Gecko/20100101 Firefox/128.0',proxies=None,request_max=3,cache=None):self.throttle = Throttle(delay=delay)self.user_agent = user_agentself.proxies = proxiesself.request_max = request_maxself.cache = cache# 定义连接管理池self.http = urllib3.PoolManager()def __call__(self,url):result = Noneif self.cache:try:result = self.cache[url]except KeyError:passelse:if self.request_max > 0 and 500 <= result['code'] < 600:result = Noneif result is None:self.throttle.wait(url)proxy = random.choice(self.proxies) if self.proxies else Noneheaders = {'User-agent':self.user_agent}result = self.download(url,headers,self.request_max,proxy)if self.cache:self.cache[url] = resultreturn result['html']def download(self,url,headers,request_max,proxy=None):print("正在下载, URL==>{}".format(url))# 发起GET请求request = urllib.request.Request(url,headers=headers)response = self.http.request('GET', url,headers=headers)# 如果使用代理的话if proxy:opener = urllib.request.build_opener()proxy_params = {urlparse.urlparse(url).scheme:proxy}opener.add_handler(urllib.request.ProxyHandler(proxy_params))try:response = opener.open(request)if response.status == 200:html = response.dataelse:print("遇到了错误,状态码是:{}".format(response.status))if request_max >= 0:self.download(url,headers,request_max-1,proxy)except Exception as e:print("下载遇到了错误,错误代码是==>{}".format(e))html = Noneif request_max >=3:html = self.download(url,headers,request_max-1,proxy)finally:response.release_conn()return {'html':html,'code':response.status}else:# 如果没有选择代理,那就正常请求try:if response.status == 200:html_file = response.data  # 或者 response.data.decode('utf-8') 如果需要字符串# 在这里处理 htmlfile,比如保存到文件或进行解析等return {'html':html_file,'code':response.status}else:print("遇到了错误,状态码是:{}".format(response.status))if request_max >= 0:self.download(url,headers,request_max-1)except urllib3.exceptions.HTTPError as e:print("遇到了错误,错误代码是==>{}".format(e))except Exception as ex:print("遇到了错误,错误代码是==>{}".format(ex))finally:response.release_conn()
# 定义一个scrape_callback类,用于存储解析到的数据
class Scrape_callback:def __init__(self):self.writer = csv.writer(open('D:/Crawl_Results/downloaded_data.csv','w', encoding='utf-8',newline='',errors='replace'))self.fields = ('中文名','外文名','评分','上映时间','国家','导演','时长','类型')self.writer.writerow(self.fields)def __call__(self,html):if not self.writer:raise RuntimeError("CSV writer is not initialized. Call open_writer() first.")html_string = html.decode("utf-8")root = lhtml.fromstring(html_string)result_list = []try:# 解析电影标题title_content = root.cssselect("div#content")[0]span_title = title_content.cssselect('span[property="v:itemreviewed"]')[0]title_text = span_title.text_content().split(" ",1)for name in title_text:result_list.append(name)if len(title_text) == 1:result_list.append('--')# 解析电影评分rate_span = root.cssselect('strong[property="v:average"]')[0]rate_text = rate_span.text_content()result_list.append(rate_text)# 解析上映国家及日期date_span = root.cssselect('span[property="v:initialReleaseDate"]')[0]date_text = date_span.text_content()parenthesis_index = date_text.find('(')if parenthesis_index != -1:# 提取日期部分(括号前的所有字符)date = date_text[:parenthesis_index]# 提取国家部分(括号内及之后的字符,再去除括号)country = date_text[parenthesis_index + 1:-1]else:# 如果没有找到括号,则只有日期部分date = date_textcountry = "--"result_list.append(date)result_list.append(country)# 解析导演direct_by_a = root.cssselect('a[rel="v:directedBy"]')[0]direct_by_text = direct_by_a.text_content()result_list.append(direct_by_text)# 解析片长runtime_span = root.cssselect('span[property="v:runtime"]')[0]runtime_text = runtime_span.text_content()result_list.append(runtime_text)gener_text=''# 解析类型gener_spans = root.cssselect('span[property="v:genre"]')for gener_span in gener_spans:gener_text += gener_span.text_content() + '|'gener_text = gener_text.rstrip('|')result_list.append(gener_text)print("{}|{}|{}|{}|{}".format(title_text,rate_text,date,country,direct_by_text,gener_text))self.writer.writerow(result_list)result_list.clear()  # 清空列表以备下次使用,而不是重新创建except IndexError:print("未找到指定的元素")except Exception as e:print(f"处理过程中发生错误: {e}")def do_write(self,result_list):if not self.writer == None:self.writer.writerow(result_list)else:print("打开文件失败")def close_writer(self):# 如果writer是外部创建的,则不应在此关闭文件# 但在当前上下文中,文件是在这个类中打开的,所以应该在这里关闭if self.writer:#self.writer.writerow([])  # 写入空行作为结束标记(可选)# 注意:在with块外不需要手动关闭文件,它会自动处理self.writer = None  # 清除writer引用,帮助垃圾回收
# 定义一个类,用于控制延时
class Throttle:'''用于控制爬虫访问统一域名资源时的延时'''# 初始化函数def __init__(self,delay):self.delay = delayself.domains = {}# 控制延时def wait(self,url):# 解析url,获取域名domain = urlparse(url).netloc# 获取上一次访问的时间last_accessed = self.domains.get(domain)# 如果设置到延时并且已经访问过了if self.delay > 0 and last_accessed is not None:# 计算从上次访问到当前时间过去的秒数与规定的延迟时长的差值sleep_secs = self.delay - (datetime.now() - last_accessed).seconds# 判断距离上次访问的时间间隔是否达到了延迟要求if sleep_secs > 0:print("正在休眠,将等待{}秒后再次连接".format(sleep_secs))# 如果时间还没有达到,就调用time.sleep,进行休眠time.sleep(sleep_secs)# 更新本次访问的时间self.domains[domain] = datetime.now()
# 爬取网页的函数
def threaded_crawler(delay,request_max,seed_url,link_regex,max_deepth=5,max_threads=6,scrape_callback=Scrape_callback(),cache=MongoCache(),proxies=None):# 定义一个User_agent列表user_agent_list = ['BadCrawler','GoodCrawler']# 解析网站的robots.txtrp = urllib.robotparser.RobotFileParser()rp.set_url(f"{seed_url}/robots.txt")rp.read()# 定义一个用户当前设置的user_agentcurrent_user_agent = 'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:128.0) Gecko/20100101 Firefox/128.0'# 只有当默认的火狐这个User-agent被禁,再从user_agent_list中找看还有合适的没if not rp.can_fetch(current_user_agent,seed_url):# 从列表中找一个网站允许的user_agentfor user_agent in user_agent_list:if rp.can_fetch(user_agent,seed_url):current_user_agent = user_agentbreakelse:print("该网站的robots.txt禁止我们访问")# 从提供的种子url生成一个待解析的url列表crawl_url_queue = [seed_url]# 定义一个字典,记录链接和深度,用于判断链接是否已经下载,避免在不同页面中反复横跳have_crawl_url_queue = {seed_url:0}downloader = Downloader(delay=delay,user_agent=current_user_agent,cache=cache,request_max=request_max,proxies=proxies)for item_count in range(1,10):current_url = "{}?start={}".format(seed_url,item_count*25)crawl_url_queue.append(current_url)have_crawl_url_queue[current_url] = 0def process_queue():current_thread = threading.current_thread()thread_name = current_thread.namewhile crawl_url_queue:try:# 只要列表中有值,则弹出一个url用于解析url = crawl_url_queue.pop()print("线程{}==>正在处理:{}".format(thread_name,url))except IndexError as index_error:breakelse:# 读取当前要解析url的深度,如果深度超过最大值,则停止deepth = have_crawl_url_queue[url]if deepth <= max_deepth:# 执行下载html = downloader(url)if not html == None:# 如果有传入提取数据的回调函数,则调用它if scrape_callback:scrape_callback(html)# 从下载到的html网页中递归的获取链接links_from_html = get_links(html)if not links_from_html == None:for link in links_from_html:link = urljoin(seed_url,link)# 判断找到的链接是否符合我们想要的正则表达式if re.match(link_regex,link):# 如果符合,再判断是否已经下载过了,如果没有下载过,就把它加到待解析的url列表和已下载集合中if link not in have_crawl_url_queue:have_crawl_url_queue[link] = deepth + 1crawl_url_queue.append(link)threads = []while threads or crawl_url_queue:for thread in threads:if not thread.is_alive():threads.remove(thread)while len(threads) < max_threads and crawl_url_queue:thread = threading.Thread(target=process_queue)thread.setDaemon(True)thread.start()threads.append(thread)# 从下载到的html中继续解析连接
def get_links(html):webpage_regex = re.compile('<a[^>]+href=["\'](.*?)["\']',re.IGNORECASE)if not html == None:html_string = html.decode("utf-8")return webpage_regex.findall(html_string)else:return None# 测试
seed_url="https://movie.douban.com/top250"
link_regex="^https://(?!music\\.douban\\.com/subject/)movie\\.douban\\.com/subject/(\\d+)/$"
threaded_crawler(5,5,seed_url,link_regex,5)

原因分析:

当多个进程或线程试图同时写入同一个CSV文件时,因为文件I/O操作不是线程安全的,特别是在没有适当锁定机制的情况下,在这个脚本中,虽然使用了锁,但是锁只是锁定了线程间的竞争,多个进程在写入的时候,实际上是存在文件覆盖的情况的;为了解决这个问题,我们可以采用“分而治之”的策略:让每个进程将其结果写入一个独立的CSV文件,然后再合并这些文件。


解决方案:

在调用Scrape_callback()类时,为其传入进程的ID,让每一条进程单独处理一个csv文件,这样就不存在文件覆盖的问题,在解析完所有的文件后,再将这些csv文件合并为一个文件输出。

from datetime import datetime,timedelta
import multiprocessing
import os
import random
import re
import threading
import time
import urllib.parse
import urllib.request
import urllib3
from urllib.parse import urlparse, urlsplit
from urllib.parse import urljoin
import urllib.robotparser
from lxml import html as lhtml
import csv
import pickle
import zlib
from bson.binary import Binary
from pymongo import MongoClient,errors
from zipfile import ZipFile
from io import StringIO
# 多进程
# 封装MongoDB进程队列
class MongoQueue:OUTSTANDING,PROCESSING,COMPLETE = range(3)def __init__(self,client=None,timeout=300):if client == None:self.client = MongoClient('localhost',27017)else:self.client = clientself.db = self.client['cache']self.webpage = self.db['crawler_queue']self.timeout = timeoutself.lock = threading.Lock()def __bool__(self):record = self.webpage.find_one({'status':{'$ne':self.COMPLETE}})if record:return Trueelse:return Falsedef push(self,url):with self.lock:try:self.webpage.insert_one({'_id':url,'status':self.OUTSTANDING,'timestamp':datetime.now()})except errors.DuplicateKeyError as e:self.repair()passdef pop(self):with self.lock:record = self.webpage.find_one_and_update(filter = {'status':self.OUTSTANDING},update={'$set':{'status':self.PROCESSING,'timestamp':datetime.now()}})if record:return record['_id']else:self.repair()raise KeyError()def complete(self,url):#self.webpage.update_one({'_id':url},{'$set':{'status':self.COMPLETE}})self.webpage.delete_one({'_id':url})def repair(self):record = self.webpage.find_one_and_update(filter={'timestamp':{'$lt':datetime.now() - timedelta(seconds=self.timeout)},'status':{'$ne':self.OUTSTANDING}},update={'$set':{'status':self.OUTSTANDING}})if record:print("Released:{}".format(record['_id']))def clear(self):self.webpage.delete_many({'status':{'$ne':self.OUTSTANDING}})# 封装磁盘缓存类
class DiskCache:def __init__(self,max_length,cache_dir='D:\\Crawl_Results\\cache',expires=timedelta(days=30)):self.cache_dir = cache_dirself.max_length = max_lengthself.expires = expiresdef url_to_path(self,url):'''从传入的url中创建文件路径'''components = urlsplit(url)path = components.pathif not path:path = '/index.html'elif path.endswith('/'):path += 'index.html'filename = components.netloc + path + components.queryfilename = re.sub('[^/0-9a-zA-Z\\-.,;]','_',filename)filename =  '/'.join(segment[:255] for segment in filename.split('/'))return os.path.join(self.cache_dir,filename)def __getitem__(self,url):'''根据url从磁盘提取缓存'''path = self.url_to_path(url)if os.path.exists(path):with open(path,'rb') as fp:#result,timestamp = pickle.loads(zlib.decompress(fp.read()))result = fp.read()# if self.has_expired(timestamp):#     raise KeyError(url + '缓存资源已过期')# return resultelse:raise KeyError(url + "不存在")def __setitem__(self,url,result):'''将数据存入磁盘缓存中'''path = self.url_to_path(url)folder = os.path.dirname(path)# 时间戳# timestamp = datetime.now()# data = pickle.dumps((result,timestamp))if not os.path.exists(folder):os.makedirs(folder)print("保存到了{}".format(folder))with open(path,'wb') as fp:#fp.write(zlib.compress(data))fp.write(result)def has_expired(self, timestamp):'''判断缓存是否过期'''return datetime.now() > timestamp + self.expires# 封装MongoDB缓存类
class MongoCache:def __init__(self,client=None,expires=timedelta(days=30)):if client == None:self.client = MongoClient('localhost',27017)else:self.client = clientself.db = self.client['cache']self.webpage = self.db['webcrawler']self.expires = expiresself.webpage.create_index('timestamp',expireAfterSeconds=expires.total_seconds())def __getitem__(self,url):'''根据url从磁盘提取缓存'''record = self.webpage.find_one({'_id':url})if record:return pickle.loads(zlib.decompress(record['result']))else:raise KeyError(url + "不存在")def __setitem__(self,url,result):'''将数据存入磁盘缓存中'''record = {'result':Binary(zlib.compress(pickle.dumps(result))),'timestamp':datetime.now()}self.webpage.update_one({'_id':url},{'$set':record},upsert=True)# 将下载功能封装成一个类
class Downloader:def __init__(self,delay=5,user_agent='Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:128.0) Gecko/20100101 Firefox/128.0',proxies=None,request_max=3,cache=None):self.throttle = Throttle(delay=delay)self.user_agent = user_agentself.proxies = proxiesself.request_max = request_maxself.cache = cache# 定义连接管理池self.http = urllib3.PoolManager()def __call__(self,url):result = Noneif self.cache:try:result = self.cache[url]except KeyError:passelse:if self.request_max > 0 and 500 <= result['code'] < 600:result = Noneif result is None:self.throttle.wait(url)proxy = random.choice(self.proxies) if self.proxies else Noneheaders = {'User-agent':self.user_agent}result = self.download(url,headers,self.request_max,proxy)if self.cache:self.cache[url] = resultreturn result['html']def download(self,url,headers,request_max,proxy=None):print("正在下载, URL==>{}".format(url))# 发起GET请求request = urllib.request.Request(url,headers=headers)response = self.http.request('GET', url,headers=headers)# 如果使用代理的话if proxy:opener = urllib.request.build_opener()proxy_params = {urlparse.urlparse(url).scheme:proxy}opener.add_handler(urllib.request.ProxyHandler(proxy_params))try:response = opener.open(request)if response.status == 200:html = response.dataelse:print("遇到了错误,状态码是:{}".format(response.status))if request_max >= 0:self.download(url,headers,request_max-1,proxy)except Exception as e:print("下载遇到了错误,错误代码是==>{}".format(e))html = Noneif request_max >=3:html = self.download(url,headers,request_max-1,proxy)finally:response.release_conn()return {'html':html,'code':response.status}else:# 如果没有选择代理,那就正常请求try:if response.status == 200:html_file = response.data  # 或者 response.data.decode('utf-8') 如果需要字符串# 在这里处理 htmlfile,比如保存到文件或进行解析等return {'html':html_file,'code':response.status}else:print("遇到了错误,状态码是:{}".format(response.status))if request_max >= 0:self.download(url,headers,request_max-1)except urllib3.exceptions.HTTPError as e:print("遇到了错误,错误代码是==>{}".format(e))except Exception as ex:print("遇到了错误,错误代码是==>{}".format(ex))finally:response.release_conn()# 定义一个scrape_callback类,用于存储解析到的数据
class Scrape_callback:def __init__(self,process_id):self.writer = csv.writer(open(f'D:/Crawl_Results/downloaded_data_{process_id}.csv','w', encoding='utf-8',newline='',errors='replace'))self.fields = ('中文名','外文名','评分','上映时间','国家','导演','时长','类型')self.writer.writerow(self.fields)self.process_id = process_idself.lock = threading.Lock()def __call__(self,html):with self.lock:if not self.writer:raise RuntimeError("CSV writer is not initialized. Call open_writer() first.")html_string = html.decode("utf-8")root = lhtml.fromstring(html_string)result_list = []try:# 解析电影标题title_content = root.cssselect("div#content")[0]span_title = title_content.cssselect('span[property="v:itemreviewed"]')[0]title_text = span_title.text_content().split(" ",1)for name in title_text:result_list.append(name)if len(title_text) == 1:result_list.append('--')# 解析电影评分rate_span = root.cssselect('strong[property="v:average"]')[0]rate_text = rate_span.text_content()result_list.append(rate_text)# 解析上映国家及日期date_span = root.cssselect('span[property="v:initialReleaseDate"]')[0]date_text = date_span.text_content()parenthesis_index = date_text.find('(')if parenthesis_index != -1:# 提取日期部分(括号前的所有字符)date = date_text[:parenthesis_index]# 提取国家部分(括号内及之后的字符,再去除括号)country = date_text[parenthesis_index + 1:-1]else:# 如果没有找到括号,则只有日期部分date = date_textcountry = "--"result_list.append(date)result_list.append(country)# 解析导演direct_by_a = root.cssselect('a[rel="v:directedBy"]')[0]direct_by_text = direct_by_a.text_content()result_list.append(direct_by_text)# 解析片长runtime_span = root.cssselect('span[property="v:runtime"]')[0]runtime_text = runtime_span.text_content()result_list.append(runtime_text)gener_text=''# 解析类型gener_spans = root.cssselect('span[property="v:genre"]')for gener_span in gener_spans:gener_text += gener_span.text_content() + '|'gener_text = gener_text.rstrip('|')result_list.append(gener_text)print("{}|{}|{}|{}|{}".format(title_text,rate_text,date,country,direct_by_text,gener_text))self.writer.writerow(result_list)result_list.clear()  # 清空列表以备下次使用,而不是重新创建except IndexError:print("未找到指定的元素")except Exception as e:print(f"处理过程中发生错误: {e}")def do_write(self,result_list):if not self.writer == None:self.writer.writerow(result_list)else:print("打开文件失败")def close_writer(self):# 如果writer是外部创建的,则不应在此关闭文件# 但在当前上下文中,文件是在这个类中打开的,所以应该在这里关闭if self.writer:#self.writer.writerow([])  # 写入空行作为结束标记(可选)# 注意:在with块外不需要手动关闭文件,它会自动处理self.writer = None  # 清除writer引用,帮助垃圾回收# 定义一个类,用于控制延时
class Throttle:'''用于控制爬虫访问统一域名资源时的延时'''# 初始化函数def __init__(self,delay):self.delay = delayself.domains = {}# 控制延时def wait(self,url):# 解析url,获取域名domain = urlparse(url).netloc# 获取上一次访问的时间last_accessed = self.domains.get(domain)# 如果设置到延时并且已经访问过了if self.delay > 0 and last_accessed is not None:# 计算从上次访问到当前时间过去的秒数与规定的延迟时长的差值sleep_secs = self.delay - (datetime.now() - last_accessed).seconds# 判断距离上次访问的时间间隔是否达到了延迟要求if sleep_secs > 0:print("正在休眠,将等待{}秒后再次连接".format(sleep_secs))# 如果时间还没有达到,就调用time.sleep,进行休眠time.sleep(sleep_secs)# 更新本次访问的时间self.domains[domain] = datetime.now()# 爬取网页的函数
def threaded_crawler(seed_url,link_regex,process_id,max_threads=3,crawl_queue = MongoQueue()):# 创建用于文件解析的类scrape_callback = Scrape_callback(process_id)# 定义一个User_agent列表user_agent_list = ['BadCrawler','GoodCrawler']# 解析网站的robots.txtrp = urllib.robotparser.RobotFileParser()rp.set_url(f"{seed_url}/robots.txt")rp.read()# 定义一个用户当前设置的user_agentcurrent_user_agent = 'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:128.0) Gecko/20100101 Firefox/128.0'# 只有当默认的火狐这个User-agent被禁,再从user_agent_list中找看还有合适的没if not rp.can_fetch(current_user_agent,seed_url):# 从列表中找一个网站允许的user_agentfor user_agent in user_agent_list:if rp.can_fetch(user_agent,seed_url):current_user_agent = user_agentbreakelse:print("该网站的robots.txt禁止我们访问")# 创建队列并把种子url添加进去crawl_queue.push(seed_url)downloader = Downloader(delay=5,user_agent=current_user_agent,cache=MongoCache(),request_max=5,proxies=None)for item_count in range(1,10):current_url = "{}?start={}".format(seed_url,item_count*25)crawl_queue.push(current_url)def process_queue():current_thread = threading.current_thread()thread_name = current_thread.namewhile crawl_queue:try:#print("当前有带解析的链接共{}条".format(len(crawl_queue)))# 只要列表中有值,则弹出一个url用于解析url = crawl_queue.pop()except IndexError as index_error:print("出错了")breakexcept KeyError as keyerror:passelse:# 执行下载html = downloader(url)if not html == None:# 如果有传入提取数据的回调函数,则调用它scrape_callback(html)# 从下载到的html网页中递归的获取链接links_from_html = get_links(html)if not links_from_html == None:for link in links_from_html:link = urljoin(seed_url,link)# 判断找到的链接是否符合我们想要的正则表达式if re.match(link_regex,link):crawl_queue.push(link)# 修改url的状态crawl_queue.complete(url)threads = []while threads or crawl_queue:for thread in threads:if not thread.is_alive():threads.remove(thread)while len(threads) < max_threads and crawl_queue:thread = threading.Thread(target=process_queue)thread.setDaemon(True)thread.start()threads.append(thread)scrape_callback.close_writer()crawl_queue.clear()# 多进程函数
def process_link_crawler(args,**kwargs):# 解包参数以获取seed_url和link_regexseed_url, link_regex = argsnum_cpus = multiprocessing.cpu_count()processes = []csv_files = []use_cpu = num_cpus//4for i in range(use_cpu):process_id = f"pid_{os.getpid()}_{i}"  # 生成唯一的进程ID标识符csv_files.append(f'D:/Crawl_Results/downloaded_data_{process_id}.csv')p = multiprocessing.Process(target=threaded_crawler, args=(seed_url, link_regex, process_id))p.start()processes.append(p)for p in processes:p.join()# 解析完毕,开始合并文件# 合并CSV文件merge_csv_files(csv_files, 'D:/Crawl_Results/merged_data.csv')# 用于合并csv的
def merge_csv_files(csv_files, output_file):with open(output_file, 'w', encoding='utf-8', newline='') as outfile:writer = csv.writer(outfile)for csv_file in csv_files:with open(csv_file, 'r', encoding='utf-8', errors='replace') as infile:reader = csv.reader(infile)next(reader)  # 跳过标题行,因为它已经在第一个文件中写入了for row in reader:writer.writerow(row)print("合并成功!")# 清理单独的CSV文件(可选)for csv_file in csv_files:os.remove(csv_file)# 从下载到的html中继续解析连接
def get_links(html):webpage_regex = re.compile('<a[^>]+href=["\'](.*?)["\']',re.IGNORECASE)if not html == None:html_string = html.decode("utf-8")return webpage_regex.findall(html_string)else:return Nonedef main():# 测试seed_url2="https://movie.douban.com/annual/2023/"seed_url="https://movie.douban.com/top250"link_regex="^https://(?!music\\.douban\\.com/subject/)movie\\.douban\\.com/subject/(\\d+)/$"process_link_crawler((seed_url,link_regex))if __name__ == '__main__':main()

相关文章:

爬虫中使用多进程、多线程的混合方式遇到的数据丢失问题

项目场景&#xff1a; 网络爬虫项目&#xff0c;主要实现多进程、多线程方式快速缓存网页资源到MongoDB&#xff0c;并解析网页数据&#xff0c;将信息写入到csv文件中。 问题描述 在单独使用多线程的过程中&#xff0c;是没有问题的&#xff0c;比如这个爬虫示例是爬取豆瓣电…...

多云应用安全平台RegData利用MongoDB简化数据控制和合规流程

在高度规范化市场中&#xff0c;为了保障数据安全&#xff0c;企业可能需要部署一系列繁琐且成本高昂的IT基础设施系统。随着各项数据安全保护措施的出台&#xff0c;企业需要遵守的法规数量越多&#xff0c;尤其是跨越多个地域的企业&#xff0c;其IT基础设施就会越复杂。如今…...

VUE实现TAB切换不同页面

VUE实现TAB切换不同页面 实现效果 资源准备 ReceiveOrderList, TodoListMulti, SignList 这三个页面就是需要切换的页面 首页代码 <template><div><el-tabs v-model"activeTab" type"card" tab-click"handleTabClick"><…...

C++ 80行 极简扫雷

一共5346个字符&#xff0c;MinGW编译通过&#xff08;强烈不建议写这种代码&#xff01;&#xff01;&#xff01;&#xff09; 压行规则&#xff1a;一行不超过80个字符 代码&#xff1a; #include<windows.h> #include<stdio.h> #include<time.h> #def…...

常见VPS服务器附加组件一览

网络主机行业竞争非常激烈&#xff0c;因此主机服务提供商竭尽全力为客户提供完整的解决方案&#xff0c;其中包含构建和管理在线项目所需的一切。但客户通常有特定需求&#xff0c;因此需要不同的附加组件。在管理自己的网络服务器时尤其如此。 今天&#xff0c;我们将介绍您…...

Electron 使用Electron-build 进行打包

看完下面两篇就可以完成&#xff01; 基于vue3vite的web项目改为Electron桌面应用&#xff08;一&#xff09;_vue3转electron-CSDN博客 将web项目打包成electron桌面端教程&#xff08;二&#xff09;vue3vitets_vue3 打包桌面端-CSDN博客 打包报错 1. 首先确定依赖包 npm …...

Springboot+Websocket+Security+Vue 实现弹幕推送功能

后端部分 (Spring Boot) 1. 创建一个 Spring Boot 项目 创建一个新的 Spring Boot 项目并添加以下依赖&#xff1a; <dependencies><!-- Spring Boot Starter Web --><dependency><groupId>org.springframework.boot</groupId><artifactId…...

LangChain之网络爬虫

网络爬虫 概述 网络爬虫是LangChain中的一项关键功能&#xff0c;允许用户自动从互联网上收集信息。这项功能对于研究和数据收集尤其有价值&#xff0c;因为它可以大幅减少手动搜索和信息整理的工作量。 从网络收集内容有几个主要组件&#xff1a; Search搜索&#xff1a;使用…...

VueRouter 相关信息

VueRouter 是Vue.js官方路由插件&#xff0c;与Vue.js深度集成&#xff0c;用于构建单页面应用。构建的单页面是基于路由和组件&#xff0c;路由设定访问路径&#xff0c;将路径与组件进行映射。VueRouter有两中模式 &#xff1a;hash 和 history &#xff0c;默认是hash模式。…...

[环境配置]Pycharm:Failed to start [PowerShell.exe]

解决方法&#xff0c;点Local旁边的 号&#xff0c;点击Command Prompt&#xff0c;即可在Pycharm中呼出控制台。 如果要修改Command Prompt的启动时访问的cmd.exe的路径&#xff0c;可以去Settings→Tools→Terminal中&#xff0c;修改Shell Path实现&#xff0c;改为cmd.exe…...

搜狗爬虫(www.sogou.com)IP及UA,真实采集数据

一、数据来源&#xff1a; 1、这批搜狗爬虫&#xff08;www.sogou.com&#xff09;IP来源于尚贤达猎头网站采集数据&#xff1b; ​ 2、数据采集时间段&#xff1a;2023年10月-2024年7月&#xff1b; 3、判断标准&#xff1a;主要根据用户代理是否包含“www.sogou.com”和IP核实…...

北京青蓝智慧科技ITSS服务经理:长安链ChainBridge“链桥”问世 加速国家级区块链网络互联互通

8月5日&#xff0c;据国家区块链技术创新中心消息&#xff0c;我国首个完全自主控制的区块链软硬件技术系统——长安链&#xff0c;正式推出了全场景技术平台ChainBridge“链桥”。 此平台能够支持所有异构和同构的区块链进行协作&#xff0c;满足跨领域、跨地域、跨行业及跨层…...

音视频入门基础:WAV专题(5)——FFmpeg源码中解码WAV Header的实现

音视频入门基础&#xff1a;WAV专题系列文章&#xff1a; 音视频入门基础&#xff1a;WAV专题&#xff08;1&#xff09;——使用FFmpeg命令生成WAV音频文件 音视频入门基础&#xff1a;WAV专题&#xff08;2&#xff09;——WAV格式简介 音视频入门基础&#xff1a;WAV专题…...

爬虫:csv存储:写入和读取

目录 csv写入 csv读取 csv写入 import csv# data [ # (tf, 20, 180), # (dl, 20, 170), # (hc, 18, 190) # ] # header (姓名,年龄,身高) # # # csv写入数据会默认写一行隔一行 newline就是让它不要有空行 # with open(text.csv,w,encodingutf8,newline) as f:…...

Opencv-绘制几何图形

1. 绘制圆形 1.1 circle()函数原型 void cv::circle(InputOutputArray img, Point center, int radius, const Scalar & color, int thickness 1, int lineType LINE_8, int shift 0 ) img&#xff1a;需要绘制圆形的图像。 center&#xff1a;圆形的圆心位置坐标。 …...

ElasticSearch安装与集群部署

ElasticSearch安装与集群部署 很多小伙伴第一次接触ElasticSearch的时候是一脸愁容,这个东西他怎么用啊,不知道从哪里安装,那我们今天就着重从哪里下载?怎么下载?怎么安装?来研究一下吧! windows下载安装ElasticSearch 下载地址&#xff1a;https://www.elastic.co/cn/do…...

盘点12款企业常用源代码加密软件,源代码防泄密很重要!

在当今的商业环境中&#xff0c;源代码作为企业的核心资产之一&#xff0c;其安全性不容忽视。源代码的泄露可能导致企业丧失竞争优势、面临法律诉讼甚至经济损失。因此&#xff0c;选择合适的源代码加密软件成为企业保护知识产权和核心技术的关键步骤。 1. 安秉源代码加密软件…...

文件上传和下载

要想实现文件上传和下载&#xff0c;其实只需要下述代码即可&#xff1a; 文件上传和下载 import cn.hutool.core.io.FileUtil; import cn.hutool.core.util.StrUtil; import com.example.common.Result; import org.springframework.web.bind.annotation.*; import org.sprin…...

机械学习—零基础学习日志(高数22——泰勒公式理解深化)

核心思想&#xff1a;函数逼近 在泰勒的年代&#xff0c;如果想算出e的0.001次方&#xff0c;这是很难计算的。那为了能计算这样的数字&#xff0c;可以尝试逼近的思想。 但是函数又不能所有地方都相等&#xff0c;那退而求其次&#xff0c;只要在一个极小的范围&#xff0c;…...

Java | Leetcode Java题解之第318题最大单词长度乘积

题目&#xff1a; 题解&#xff1a; class Solution {public int maxProduct(String[] words) {Map<Integer, Integer> map new HashMap<Integer, Integer>();int length words.length;for (int i 0; i < length; i) {int mask 0;String word words[i];in…...

【Linux】C语言执行shell指令

在C语言中执行Shell指令 在C语言中&#xff0c;有几种方法可以执行Shell指令&#xff1a; 1. 使用system()函数 这是最简单的方法&#xff0c;包含在stdlib.h头文件中&#xff1a; #include <stdlib.h>int main() {system("ls -l"); // 执行ls -l命令retu…...

SCAU期末笔记 - 数据分析与数据挖掘题库解析

这门怎么题库答案不全啊日 来简单学一下子来 一、选择题&#xff08;可多选&#xff09; 将原始数据进行集成、变换、维度规约、数值规约是在以下哪个步骤的任务?(C) A. 频繁模式挖掘 B.分类和预测 C.数据预处理 D.数据流挖掘 A. 频繁模式挖掘&#xff1a;专注于发现数据中…...

高等数学(下)题型笔记(八)空间解析几何与向量代数

目录 0 前言 1 向量的点乘 1.1 基本公式 1.2 例题 2 向量的叉乘 2.1 基础知识 2.2 例题 3 空间平面方程 3.1 基础知识 3.2 例题 4 空间直线方程 4.1 基础知识 4.2 例题 5 旋转曲面及其方程 5.1 基础知识 5.2 例题 6 空间曲面的法线与切平面 6.1 基础知识 6.2…...

Matlab | matlab常用命令总结

常用命令 一、 基础操作与环境二、 矩阵与数组操作(核心)三、 绘图与可视化四、 编程与控制流五、 符号计算 (Symbolic Math Toolbox)六、 文件与数据 I/O七、 常用函数类别重要提示这是一份 MATLAB 常用命令和功能的总结,涵盖了基础操作、矩阵运算、绘图、编程和文件处理等…...

MySQL 8.0 OCP 英文题库解析(十三)

Oracle 为庆祝 MySQL 30 周年&#xff0c;截止到 2025.07.31 之前。所有人均可以免费考取原价245美元的MySQL OCP 认证。 从今天开始&#xff0c;将英文题库免费公布出来&#xff0c;并进行解析&#xff0c;帮助大家在一个月之内轻松通过OCP认证。 本期公布试题111~120 试题1…...

智能仓储的未来:自动化、AI与数据分析如何重塑物流中心

当仓库学会“思考”&#xff0c;物流的终极形态正在诞生 想象这样的场景&#xff1a; 凌晨3点&#xff0c;某物流中心灯火通明却空无一人。AGV机器人集群根据实时订单动态规划路径&#xff1b;AI视觉系统在0.1秒内扫描包裹信息&#xff1b;数字孪生平台正模拟次日峰值流量压力…...

Java多线程实现之Thread类深度解析

Java多线程实现之Thread类深度解析 一、多线程基础概念1.1 什么是线程1.2 多线程的优势1.3 Java多线程模型 二、Thread类的基本结构与构造函数2.1 Thread类的继承关系2.2 构造函数 三、创建和启动线程3.1 继承Thread类创建线程3.2 实现Runnable接口创建线程 四、Thread类的核心…...

如何在网页里填写 PDF 表格?

有时候&#xff0c;你可能希望用户能在你的网站上填写 PDF 表单。然而&#xff0c;这件事并不简单&#xff0c;因为 PDF 并不是一种原生的网页格式。虽然浏览器可以显示 PDF 文件&#xff0c;但原生并不支持编辑或填写它们。更糟的是&#xff0c;如果你想收集表单数据&#xff…...

html css js网页制作成品——HTML+CSS榴莲商城网页设计(4页)附源码

目录 一、&#x1f468;‍&#x1f393;网站题目 二、✍️网站描述 三、&#x1f4da;网站介绍 四、&#x1f310;网站效果 五、&#x1fa93; 代码实现 &#x1f9f1;HTML 六、&#x1f947; 如何让学习不再盲目 七、&#x1f381;更多干货 一、&#x1f468;‍&#x1f…...

【生成模型】视频生成论文调研

工作清单 上游应用方向&#xff1a;控制、速度、时长、高动态、多主体驱动 类型工作基础模型WAN / WAN-VACE / HunyuanVideo控制条件轨迹控制ATI~镜头控制ReCamMaster~多主体驱动Phantom~音频驱动Let Them Talk: Audio-Driven Multi-Person Conversational Video Generation速…...