Python - 爬虫;Scrapy框架之插件Extensions(四)
阅读本文前先参考
https://blog.csdn.net/MinggeQingchun/article/details/145904572
在 Scrapy 中,扩展(Extensions)是一种插件,允许你添加额外的功能到你的爬虫项目中。这些扩展可以在项目的不同阶段执行,比如启动、关闭、处理请求、处理响应等。
Extensions官网文档:https://docs.scrapy.org/en/latest/topics/extensions.html
Signals官网文档:https://docs.scrapy.org/en/latest/topics/signals.html
在 Scrapy 中,扩展只是一个普通的 Python 类,不需要继承任何特定的基类或实现特定接口。扩展通常提供一个类方法 from_crawler(cls, crawler),Scrapy 启动时通过它实例化扩展;扩展拿到 Crawler 对象后,即可访问 settings、stats、signals 等核心组件,并通过信号(signals)挂接到爬虫的生命周期。
一、创建和使用扩展
1、定义扩展
首先,定义一个扩展类。它不需要继承任何基类,只需提供 from_crawler 类方法并在其中连接需要的信号。例如,创建一个简单的扩展,记录每个被调度请求的 URL,并在爬虫关闭时输出请求总数:
from scrapy import signals


class UrlLogExtension:
    def __init__(self, stats):
        self.stats = stats

    @classmethod
    def from_crawler(cls, crawler):
        # 从 crawler 中获取统计收集器对象
        stats = crawler.stats
        ext = cls(stats)
        crawler.signals.connect(ext.spider_opened, signal=signals.spider_opened)
        crawler.signals.connect(ext.spider_closed, signal=signals.spider_closed)
        # 监听请求被调度的信号,用于记录 URL 并计数
        crawler.signals.connect(ext.request_scheduled, signal=signals.request_scheduled)
        return ext

    def spider_opened(self, spider):
        self.stats.set_value('url_count', 0)

    def request_scheduled(self, request, spider):
        spider.logger.debug("Scheduled URL: %s", request.url)
        self.stats.inc_value('url_count')

    def spider_closed(self, spider):
        url_count = self.stats.get_value('url_count')
        print(f"Total URLs processed: {url_count}")
2、在 settings.py 中启用扩展
在 Scrapy 项目的 settings.py 文件中,将你的扩展添加到 EXTENSIONS 设置中:
EXTENSIONS = {
    'path.to.your.extension.UrlLogExtension': 500,  # 数值用于控制加载顺序;扩展之间一般互不依赖,顺序通常无关紧要
}
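其中 'path.to.your.extension.UrlLogExtension' 需要替换为扩展类的实际导入路径。例如(假设扩展保存在项目的 myproject/extensions.py 中,项目名与路径仅为示例):

# settings.py(myproject 为假设的项目名)
EXTENSIONS = {
    "myproject.extensions.UrlLogExtension": 500,
}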
3、编写信号处理逻辑(如果需要)
如果你的扩展需要响应特定的信号(如请求、响应、Item 等事件),可以在扩展类中定义相应的方法,并通过 crawler.signals.connect 方法连接到这些信号。例如,在上面的 UrlLogExtension 中,我们连接了 spider_opened、spider_closed 和 request_scheduled 信号。除了内置信号,也可以定义并发送自定义信号,见下面的示例。
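自定义信号本质上只是一个用作标识的 object()。下面是一个最小示意(其中的信号名 item_validated、扩展名 ValidationExtension 和模块名均为假设,仅作演示):

# myproject/custom_signals.py(模块名为假设,仅作演示)
item_validated = object()  # 自定义信号只是一个唯一的标识对象


class ValidationExtension:
    @classmethod
    def from_crawler(cls, crawler):
        ext = cls()
        # 监听自定义信号
        crawler.signals.connect(ext.on_item_validated, signal=item_validated)
        return ext

    def on_item_validated(self, item, spider):
        spider.logger.info("item validated: %r", item)


# 在 pipeline 或 spider 中发送该信号(crawler 对象同样可以通过 from_crawler 拿到):
#   crawler.signals.send_catch_log(item_validated, item=item, spider=spider)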
内置扩展
Scrapy 提供了9个内置的扩展:
- scrapy.extensions.corestats.CoreStats:Scrapy 核心数据统计
- scrapy.extensions.telnet.TelnetConsole:Scrapy 运行时开启一个 TCP 服务,可以用 telnet 连接查询 Scrapy 的实时运行状态
- scrapy.extensions.memusage.MemoryUsage:内存使用预警功能,不能在 Windows 上使用
- scrapy.extensions.memdebug.MemoryDebugger:开启 gc 垃圾回收,然后统计对应的信息
- scrapy.extensions.closespider.CloseSpider:按运行时长、page 个数、item 个数、错误次数等条件自动关闭爬虫
- scrapy.extensions.feedexport.FeedExporter:将抓取的数据导出到文件。支持多种序列化格式(如 JSON、CSV、XML 等)和存储后端(如本地文件系统、FTP、S3 等),使得用户可以根据需求将数据导出为所需的格式并保存到适当的存储介质中
- scrapy.extensions.logstats.LogStats:主要统计 page、item 的个数等信息,从而计算抓取频率
- scrapy.extensions.spiderstate.SpiderState:保存 Spider 运行状态(spider.state)
- scrapy.extensions.throttle.AutoThrottle:自适应调整下载延迟时间
这些扩展默认注册在 Scrapy 的 default_settings.py 文件中,例如:
D:\xx\项目\env\Lib\site-packages\scrapy\settings\default_settings.py
EXTENSIONS = {}

EXTENSIONS_BASE = {
    "scrapy.extensions.corestats.CoreStats": 0,
    "scrapy.extensions.telnet.TelnetConsole": 0,
    "scrapy.extensions.memusage.MemoryUsage": 0,
    "scrapy.extensions.memdebug.MemoryDebugger": 0,
    "scrapy.extensions.closespider.CloseSpider": 0,
    "scrapy.extensions.feedexport.FeedExporter": 0,
    "scrapy.extensions.logstats.LogStats": 0,
    "scrapy.extensions.spiderstate.SpiderState": 0,
    "scrapy.extensions.throttle.AutoThrottle": 0,
}
可以在 settings.py 中启用这些扩展,如:
EXTENSIONS = {
    'scrapy.extensions.logstats.LogStats': 500,       # 日志统计信息
    'scrapy.extensions.telnet.TelnetConsole': 500,    # Telnet 控制台
}
二、内置扩展源码分析
1、scrapy.extensions.corestats.CoreStats
"""
Extension for collecting core stats like items scraped and start/finish times
"""from __future__ import annotationsfrom datetime import datetime, timezone
from typing import TYPE_CHECKING, Anyfrom scrapy import Spider, signalsif TYPE_CHECKING:# typing.Self requires Python 3.11from typing_extensions import Selffrom scrapy.crawler import Crawlerfrom scrapy.statscollectors import StatsCollector[docs]class CoreStats:def __init__(self, stats: StatsCollector):self.stats: StatsCollector = statsself.start_time: datetime | None = None@classmethoddef from_crawler(cls, crawler: Crawler) -> Self:assert crawler.statso = cls(crawler.stats)crawler.signals.connect(o.spider_opened, signal=signals.spider_opened)crawler.signals.connect(o.spider_closed, signal=signals.spider_closed)crawler.signals.connect(o.item_scraped, signal=signals.item_scraped)crawler.signals.connect(o.item_dropped, signal=signals.item_dropped)crawler.signals.connect(o.response_received, signal=signals.response_received)return odef spider_opened(self, spider: Spider) -> None:self.start_time = datetime.now(tz=timezone.utc)self.stats.set_value("start_time", self.start_time, spider=spider)def spider_closed(self, spider: Spider, reason: str) -> None:assert self.start_time is not Nonefinish_time = datetime.now(tz=timezone.utc)elapsed_time = finish_time - self.start_timeelapsed_time_seconds = elapsed_time.total_seconds()self.stats.set_value("elapsed_time_seconds", elapsed_time_seconds, spider=spider)self.stats.set_value("finish_time", finish_time, spider=spider)self.stats.set_value("finish_reason", reason, spider=spider)def item_scraped(self, item: Any, spider: Spider) -> None:self.stats.inc_value("item_scraped_count", spider=spider)def response_received(self, spider: Spider) -> None:self.stats.inc_value("response_received_count", spider=spider)def item_dropped(self, item: Any, spider: Spider, exception: BaseException) -> None:reason = exception.__class__.__name__self.stats.inc_value("item_dropped_count", spider=spider)self.stats.inc_value(f"item_dropped_reasons_count/{reason}", spider=spider)
监听spider_opened、spider_closed、item_scraped、item_dropped、response_received信号,进行数据统计。
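CoreStats 统计的数据最终保存在 crawler.stats 中,其他扩展或组件可以直接读取。下面是一个最小示意(扩展名 StatsReport 为假设,仅作演示):

from scrapy import signals


class StatsReport:
    def __init__(self, stats):
        self.stats = stats

    @classmethod
    def from_crawler(cls, crawler):
        ext = cls(crawler.stats)
        crawler.signals.connect(ext.spider_closed, signal=signals.spider_closed)
        return ext

    def spider_closed(self, spider, reason):
        # 读取 CoreStats 写入的统计项
        print("finish_reason:", self.stats.get_value("finish_reason"))
        print("item_scraped_count:", self.stats.get_value("item_scraped_count", 0))
        print("elapsed_time_seconds:", self.stats.get_value("elapsed_time_seconds"))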
2、scrapy.extensions.telnet.TelnetConsole
"""
Scrapy Telnet Console extensionSee documentation in docs/topics/telnetconsole.rst
"""from __future__ import annotationsimport binascii
import logging
import os
import pprint
from typing import TYPE_CHECKING, Anyfrom twisted.internet import protocol
from twisted.internet.tcp import Portfrom scrapy import signals
from scrapy.exceptions import NotConfigured
from scrapy.utils.decorators import defers
from scrapy.utils.engine import print_engine_status
from scrapy.utils.reactor import listen_tcp
from scrapy.utils.trackref import print_live_refsif TYPE_CHECKING:from twisted.conch import telnet# typing.Self requires Python 3.11from typing_extensions import Selffrom scrapy.crawler import Crawlerlogger = logging.getLogger(__name__)# signal to update telnet variables
# args: telnet_vars
update_telnet_vars = object()[docs]class TelnetConsole(protocol.ServerFactory):def __init__(self, crawler: Crawler):if not crawler.settings.getbool("TELNETCONSOLE_ENABLED"):raise NotConfiguredself.crawler: Crawler = crawlerself.noisy: bool = Falseself.portrange: list[int] = [int(x) for x in crawler.settings.getlist("TELNETCONSOLE_PORT")]self.host: str = crawler.settings["TELNETCONSOLE_HOST"]self.username: str = crawler.settings["TELNETCONSOLE_USERNAME"]self.password: str = crawler.settings["TELNETCONSOLE_PASSWORD"]if not self.password:self.password = binascii.hexlify(os.urandom(8)).decode("utf8")logger.info("Telnet Password: %s", self.password)self.crawler.signals.connect(self.start_listening, signals.engine_started)self.crawler.signals.connect(self.stop_listening, signals.engine_stopped)@classmethoddef from_crawler(cls, crawler: Crawler) -> Self:return cls(crawler)def start_listening(self) -> None:self.port: Port = listen_tcp(self.portrange, self.host, self)h = self.port.getHost()logger.info("Telnet console listening on %(host)s:%(port)d",{"host": h.host, "port": h.port},extra={"crawler": self.crawler},)def stop_listening(self) -> None:self.port.stopListening()def protocol(self) -> telnet.TelnetTransport: # type: ignore[override]# these import twisted.internet.reactorfrom twisted.conch import manhole, telnetfrom twisted.conch.insults import insultsclass Portal:"""An implementation of IPortal"""@defersdef login(self_, credentials, mind, *interfaces):if not (credentials.username == self.username.encode("utf8")and credentials.checkPassword(self.password.encode("utf8"))):raise ValueError("Invalid credentials")protocol = telnet.TelnetBootstrapProtocol(insults.ServerProtocol, manhole.Manhole, self._get_telnet_vars())return (interfaces[0], protocol, lambda: None)return telnet.TelnetTransport(telnet.AuthenticatingTelnetProtocol, Portal())def _get_telnet_vars(self) -> dict[str, Any]:# Note: if you add entries here also update topics/telnetconsole.rstassert self.crawler.enginetelnet_vars: dict[str, Any] = {"engine": self.crawler.engine,"spider": self.crawler.engine.spider,"slot": self.crawler.engine.slot,"crawler": self.crawler,"extensions": self.crawler.extensions,"stats": self.crawler.stats,"settings": self.crawler.settings,"est": lambda: print_engine_status(self.crawler.engine),"p": pprint.pprint,"prefs": print_live_refs,"help": "This is Scrapy telnet console. For more info see: ""https://docs.scrapy.org/en/latest/topics/telnetconsole.html",}self.crawler.signals.send_catch_log(update_telnet_vars, telnet_vars=telnet_vars)return telnet_vars
通过telnet可以执行本地的变量有engine、spider、slot、crawler、extensions、stats、settings、est、p、prefs、help等。
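爬虫启动后,可以用 telnet 客户端连接该控制台(默认监听 127.0.0.1 的 6023 端口附近),在交互环境中执行 est()、prefs()、p(stats.get_stats()) 等命令查看引擎状态。相关设置可以在 settings.py 中调整,下面是一份示意(端口范围为 Scrapy 默认值,用户名、密码取值仅为示例):

# settings.py(示例取值,仅作演示)
TELNETCONSOLE_ENABLED = True
TELNETCONSOLE_PORT = [6023, 6073]      # 在该范围内选择一个可用端口
TELNETCONSOLE_HOST = "127.0.0.1"
TELNETCONSOLE_USERNAME = "scrapy"
TELNETCONSOLE_PASSWORD = "mypassword"  # 不设置时会随机生成并打印到日志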
3、scrapy.extensions.memusage.MemoryUsage 内存利用
"""
MemoryUsage extensionSee documentation in docs/topics/extensions.rst
"""from __future__ import annotationsimport logging
import socket
import sys
from importlib import import_module
from pprint import pformat
from typing import TYPE_CHECKINGfrom twisted.internet import taskfrom scrapy import signals
from scrapy.exceptions import NotConfigured
from scrapy.mail import MailSender
from scrapy.utils.engine import get_engine_statusif TYPE_CHECKING:# typing.Self requires Python 3.11from typing_extensions import Selffrom scrapy.crawler import Crawlerlogger = logging.getLogger(__name__)[docs]class MemoryUsage:def __init__(self, crawler: Crawler):if not crawler.settings.getbool("MEMUSAGE_ENABLED"):raise NotConfiguredtry:# stdlib's resource module is only available on unix platforms.self.resource = import_module("resource")except ImportError:raise NotConfiguredself.crawler: Crawler = crawlerself.warned: bool = Falseself.notify_mails: list[str] = crawler.settings.getlist("MEMUSAGE_NOTIFY_MAIL")self.limit: int = crawler.settings.getint("MEMUSAGE_LIMIT_MB") * 1024 * 1024self.warning: int = crawler.settings.getint("MEMUSAGE_WARNING_MB") * 1024 * 1024self.check_interval: float = crawler.settings.getfloat("MEMUSAGE_CHECK_INTERVAL_SECONDS")self.mail: MailSender = MailSender.from_crawler(crawler)crawler.signals.connect(self.engine_started, signal=signals.engine_started)crawler.signals.connect(self.engine_stopped, signal=signals.engine_stopped)@classmethoddef from_crawler(cls, crawler: Crawler) -> Self:return cls(crawler)def get_virtual_size(self) -> int:size: int = self.resource.getrusage(self.resource.RUSAGE_SELF).ru_maxrssif sys.platform != "darwin":# on macOS ru_maxrss is in bytes, on Linux it is in KBsize *= 1024return sizedef engine_started(self) -> None:assert self.crawler.statsself.crawler.stats.set_value("memusage/startup", self.get_virtual_size())self.tasks: list[task.LoopingCall] = []tsk = task.LoopingCall(self.update)self.tasks.append(tsk)tsk.start(self.check_interval, now=True)if self.limit:tsk = task.LoopingCall(self._check_limit)self.tasks.append(tsk)tsk.start(self.check_interval, now=True)if self.warning:tsk = task.LoopingCall(self._check_warning)self.tasks.append(tsk)tsk.start(self.check_interval, now=True)def engine_stopped(self) -> None:for tsk in self.tasks:if tsk.running:tsk.stop()def update(self) -> None:assert self.crawler.statsself.crawler.stats.max_value("memusage/max", self.get_virtual_size())def _check_limit(self) -> None:assert self.crawler.engineassert self.crawler.statspeak_mem_usage = self.get_virtual_size()if peak_mem_usage > self.limit:self.crawler.stats.set_value("memusage/limit_reached", 1)mem = self.limit / 1024 / 1024logger.error("Memory usage exceeded %(memusage)dMiB. 
Shutting down Scrapy...",{"memusage": mem},extra={"crawler": self.crawler},)if self.notify_mails:subj = (f"{self.crawler.settings['BOT_NAME']} terminated: "f"memory usage exceeded {mem}MiB at {socket.gethostname()}")self._send_report(self.notify_mails, subj)self.crawler.stats.set_value("memusage/limit_notified", 1)if self.crawler.engine.spider is not None:self.crawler.engine.close_spider(self.crawler.engine.spider, "memusage_exceeded")else:self.crawler.stop()else:logger.info("Peak memory usage is %(virtualsize)dMiB",{"virtualsize": peak_mem_usage / 1024 / 1024},)def _check_warning(self) -> None:if self.warned: # warn only oncereturnassert self.crawler.statsif self.get_virtual_size() > self.warning:self.crawler.stats.set_value("memusage/warning_reached", 1)mem = self.warning / 1024 / 1024logger.warning("Memory usage reached %(memusage)dMiB",{"memusage": mem},extra={"crawler": self.crawler},)if self.notify_mails:subj = (f"{self.crawler.settings['BOT_NAME']} warning: "f"memory usage reached {mem}MiB at {socket.gethostname()}")self._send_report(self.notify_mails, subj)self.crawler.stats.set_value("memusage/warning_notified", 1)self.warned = Truedef _send_report(self, rcpts: list[str], subject: str) -> None:"""send notification mail with some additional useful info"""assert self.crawler.engineassert self.crawler.statsstats = self.crawler.statss = f"Memory usage at engine startup : {stats.get_value('memusage/startup') / 1024 / 1024}M\r\n"s += f"Maximum memory usage : {stats.get_value('memusage/max') / 1024 / 1024}M\r\n"s += f"Current memory usage : {self.get_virtual_size() / 1024 / 1024}M\r\n"s += ("ENGINE STATUS ------------------------------------------------------- \r\n")s += "\r\n"s += pformat(get_engine_status(self.crawler.engine))s += "\r\n"self.mail.send(rcpts, subject, s)
该功能依赖标准库的 resource 模块,需要部署在 Linux/Unix 等平台上才能生效(Windows 不支持);可以配置内存预警监控、发送预警邮件等。
配置预警邮件参数:
MAIL_HOST = 'localhost'         # 邮件服务器
MAIL_PORT = 25                  # 邮件服务器端口
MAIL_FROM = 'scrapy@localhost'  # 发件人地址
MAIL_PASS = None                # 邮箱密码
MAIL_USER = None                # 邮箱用户名

配置预警监控的参数如下:

MEMUSAGE_CHECK_INTERVAL_SECONDS = 60.0  # 每 60s 检测一次
MEMUSAGE_ENABLED = True                 # 开启内存监控
MEMUSAGE_LIMIT_MB = 0                   # 内存使用上限(MB),超过后关闭爬虫;0 表示不限制
MEMUSAGE_NOTIFY_MAIL = []               # 预警邮件接收邮箱列表
MEMUSAGE_WARNING_MB = 0                 # 内存预警阈值(MB);0 表示不预警
当使用内存超过 limit 或 warning 阈值时,会发送对应的邮件提醒。
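下面是一份启用内存监控的 settings.py 配置示意(阈值、检测间隔与邮箱地址均为假设取值,仅作演示):

# settings.py(示例取值,仅作演示)
MEMUSAGE_ENABLED = True
MEMUSAGE_LIMIT_MB = 2048                  # 超过 2GB 时关闭爬虫
MEMUSAGE_WARNING_MB = 1536                # 达到 1.5GB 时发出警告
MEMUSAGE_CHECK_INTERVAL_SECONDS = 30.0
MEMUSAGE_NOTIFY_MAIL = ["ops@example.com"]

MAIL_HOST = "smtp.example.com"
MAIL_PORT = 25
MAIL_FROM = "scrapy@example.com"
MAIL_USER = "scrapy@example.com"
MAIL_PASS = "secret"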
4、scrapy.extensions.memdebug.MemoryDebugger
"""
MemoryDebugger extensionSee documentation in docs/topics/extensions.rst
"""from __future__ import annotationsimport gc
from typing import TYPE_CHECKINGfrom scrapy import Spider, signals
from scrapy.exceptions import NotConfigured
from scrapy.utils.trackref import live_refsif TYPE_CHECKING:# typing.Self requires Python 3.11from typing_extensions import Selffrom scrapy.crawler import Crawlerfrom scrapy.statscollectors import StatsCollector[docs]class MemoryDebugger:def __init__(self, stats: StatsCollector):self.stats: StatsCollector = stats@classmethoddef from_crawler(cls, crawler: Crawler) -> Self:if not crawler.settings.getbool("MEMDEBUG_ENABLED"):raise NotConfiguredassert crawler.statso = cls(crawler.stats)crawler.signals.connect(o.spider_closed, signal=signals.spider_closed)return odef spider_closed(self, spider: Spider, reason: str) -> None:gc.collect()self.stats.set_value("memdebug/gc_garbage_count", len(gc.garbage), spider=spider)for cls, wdict in live_refs.items():if not wdict:continueself.stats.set_value(f"memdebug/live_refs/{cls.__name__}", len(wdict), spider=spider)
参数
MEMDEBUG_ENABLED = False # enable memory debugging
MEMDEBUG_NOTIFY = [] # send memory debugging report by mail at engine shutdown
其中 MEMDEBUG_NOTIFY 目前在 Scrapy 源码中并未实际使用。
主要功能是在爬虫关闭时触发一次 gc 垃圾回收,然后统计垃圾对象数量和各类存活对象引用的数量。
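启用方式很简单,只需在 settings.py 中打开开关;爬虫结束后即可在最终的统计日志里看到 memdebug/ 开头的统计项(下面注释中的输出仅为示例,具体数值因项目而异):

# settings.py
MEMDEBUG_ENABLED = True

# 爬虫结束时统计日志中的相关条目(示例输出):
#   'memdebug/gc_garbage_count': 0,
#   'memdebug/live_refs/MySpider': 1,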
5、scrapy.extensions.closespider.CloseSpider
"""CloseSpider is an extension that forces spiders to be closed after certain
conditions are met.See documentation in docs/topics/extensions.rst
"""from __future__ import annotationsimport logging
from collections import defaultdict
from typing import TYPE_CHECKING, Anyfrom scrapy import Request, Spider, signals
from scrapy.exceptions import NotConfiguredif TYPE_CHECKING:from twisted.python.failure import Failure# typing.Self requires Python 3.11from typing_extensions import Selffrom scrapy.crawler import Crawlerfrom scrapy.http import Responselogger = logging.getLogger(__name__)[docs]class CloseSpider:def __init__(self, crawler: Crawler):self.crawler: Crawler = crawlerself.close_on: dict[str, Any] = {"timeout": crawler.settings.getfloat("CLOSESPIDER_TIMEOUT"),"itemcount": crawler.settings.getint("CLOSESPIDER_ITEMCOUNT"),"pagecount": crawler.settings.getint("CLOSESPIDER_PAGECOUNT"),"errorcount": crawler.settings.getint("CLOSESPIDER_ERRORCOUNT"),"timeout_no_item": crawler.settings.getint("CLOSESPIDER_TIMEOUT_NO_ITEM"),"pagecount_no_item": crawler.settings.getint("CLOSESPIDER_PAGECOUNT_NO_ITEM"),}if not any(self.close_on.values()):raise NotConfiguredself.counter: defaultdict[str, int] = defaultdict(int)if self.close_on.get("errorcount"):crawler.signals.connect(self.error_count, signal=signals.spider_error)if self.close_on.get("pagecount") or self.close_on.get("pagecount_no_item"):crawler.signals.connect(self.page_count, signal=signals.response_received)if self.close_on.get("timeout"):crawler.signals.connect(self.spider_opened, signal=signals.spider_opened)if self.close_on.get("itemcount") or self.close_on.get("pagecount_no_item"):crawler.signals.connect(self.item_scraped, signal=signals.item_scraped)if self.close_on.get("timeout_no_item"):self.timeout_no_item: int = self.close_on["timeout_no_item"]self.items_in_period: int = 0crawler.signals.connect(self.spider_opened_no_item, signal=signals.spider_opened)crawler.signals.connect(self.item_scraped_no_item, signal=signals.item_scraped)crawler.signals.connect(self.spider_closed, signal=signals.spider_closed)@classmethoddef from_crawler(cls, crawler: Crawler) -> Self:return cls(crawler)def error_count(self, failure: Failure, response: Response, spider: Spider) -> None:self.counter["errorcount"] += 1if self.counter["errorcount"] == self.close_on["errorcount"]:assert self.crawler.engineself.crawler.engine.close_spider(spider, "closespider_errorcount")def page_count(self, response: Response, request: Request, spider: Spider) -> None:self.counter["pagecount"] += 1self.counter["pagecount_since_last_item"] += 1if self.counter["pagecount"] == self.close_on["pagecount"]:assert self.crawler.engineself.crawler.engine.close_spider(spider, "closespider_pagecount")returnif self.close_on["pagecount_no_item"] and (self.counter["pagecount_since_last_item"]>= self.close_on["pagecount_no_item"]):assert self.crawler.engineself.crawler.engine.close_spider(spider, "closespider_pagecount_no_item")def spider_opened(self, spider: Spider) -> None:from twisted.internet import reactorassert self.crawler.engineself.task = reactor.callLater(self.close_on["timeout"],self.crawler.engine.close_spider,spider,reason="closespider_timeout",)def item_scraped(self, item: Any, spider: Spider) -> None:self.counter["itemcount"] += 1self.counter["pagecount_since_last_item"] = 0if self.counter["itemcount"] == self.close_on["itemcount"]:assert self.crawler.engineself.crawler.engine.close_spider(spider, "closespider_itemcount")def spider_closed(self, spider: Spider) -> None:task = getattr(self, "task", None)if task and task.active():task.cancel()task_no_item = getattr(self, "task_no_item", None)if task_no_item and task_no_item.running:task_no_item.stop()def spider_opened_no_item(self, spider: Spider) -> None:from twisted.internet import taskself.task_no_item = 
task.LoopingCall(self._count_items_produced, spider)self.task_no_item.start(self.timeout_no_item, now=False)logger.info(f"Spider will stop when no items are produced after "f"{self.timeout_no_item} seconds.")def item_scraped_no_item(self, item: Any, spider: Spider) -> None:self.items_in_period += 1def _count_items_produced(self, spider: Spider) -> None:if self.items_in_period >= 1:self.items_in_period = 0else:logger.info(f"Closing spider since no items were produced in the last "f"{self.timeout_no_item} seconds.")assert self.crawler.engineself.crawler.engine.close_spider(spider, "closespider_timeout_no_item")
参数
CLOSESPIDER_TIMEOUT = 0      # 爬虫运行时间(秒)超过该数值时关闭 Spider
CLOSESPIDER_PAGECOUNT = 0    # 抓取的 response 个数超过该数值时关闭 Spider
CLOSESPIDER_ITEMCOUNT = 0    # 抓取的 item 个数超过该数值时关闭 Spider
CLOSESPIDER_ERRORCOUNT = 0   # 爬虫产生的错误次数超过该数值时关闭 Spider
主要功能是按运行时长、page 个数、item 个数、错误次数等条件自动关闭爬虫,关闭原因会写入 finish_reason 统计项。
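一个典型的用法是在 settings.py 中为长时间运行的任务设置兜底条件(下面的数值仅为示例,应按实际任务调整):

# settings.py(示例取值,仅作演示)
CLOSESPIDER_TIMEOUT = 3600       # 运行 1 小时后自动关闭
CLOSESPIDER_ITEMCOUNT = 10000    # 抓取 1 万条 item 后关闭
CLOSESPIDER_PAGECOUNT = 0        # 0 表示不按页面数限制
CLOSESPIDER_ERRORCOUNT = 50      # 出现 50 次错误后关闭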
6、scrapy.extensions.logstats.LogStats
from __future__ import annotations

import logging
from typing import TYPE_CHECKING

from twisted.internet import task

from scrapy import Spider, signals
from scrapy.exceptions import NotConfigured

if TYPE_CHECKING:
    # typing.Self requires Python 3.11
    from typing_extensions import Self

    from scrapy.crawler import Crawler
    from scrapy.statscollectors import StatsCollector

logger = logging.getLogger(__name__)


class LogStats:
    """Log basic scraping stats periodically like:
    * RPM - Requests per Minute
    * IPM - Items per Minute
    """

    def __init__(self, stats: StatsCollector, interval: float = 60.0):
        self.stats: StatsCollector = stats
        self.interval: float = interval
        self.multiplier: float = 60.0 / self.interval
        self.task: task.LoopingCall | None = None

    @classmethod
    def from_crawler(cls, crawler: Crawler) -> Self:
        interval: float = crawler.settings.getfloat("LOGSTATS_INTERVAL")
        if not interval:
            raise NotConfigured
        assert crawler.stats
        o = cls(crawler.stats, interval)
        crawler.signals.connect(o.spider_opened, signal=signals.spider_opened)
        crawler.signals.connect(o.spider_closed, signal=signals.spider_closed)
        return o

    def spider_opened(self, spider: Spider) -> None:
        self.pagesprev: int = 0
        self.itemsprev: int = 0

        self.task = task.LoopingCall(self.log, spider)
        self.task.start(self.interval)

    def log(self, spider: Spider) -> None:
        self.calculate_stats()

        msg = (
            "Crawled %(pages)d pages (at %(pagerate)d pages/min), "
            "scraped %(items)d items (at %(itemrate)d items/min)"
        )
        log_args = {
            "pages": self.pages,
            "pagerate": self.prate,
            "items": self.items,
            "itemrate": self.irate,
        }
        logger.info(msg, log_args, extra={"spider": spider})

    def calculate_stats(self) -> None:
        self.items: int = self.stats.get_value("item_scraped_count", 0)
        self.pages: int = self.stats.get_value("response_received_count", 0)
        self.irate: float = (self.items - self.itemsprev) * self.multiplier
        self.prate: float = (self.pages - self.pagesprev) * self.multiplier
        self.pagesprev, self.itemsprev = self.pages, self.items

    def spider_closed(self, spider: Spider, reason: str) -> None:
        if self.task and self.task.running:
            self.task.stop()

        rpm_final, ipm_final = self.calculate_final_stats(spider)
        self.stats.set_value("responses_per_minute", rpm_final)
        self.stats.set_value("items_per_minute", ipm_final)

    def calculate_final_stats(self, spider: Spider) -> tuple[None, None] | tuple[float, float]:
        start_time = self.stats.get_value("start_time")
        finished_time = self.stats.get_value("finished_time")

        if not start_time or not finished_time:
            return None, None

        mins_elapsed = (finished_time - start_time).seconds / 60

        items = self.stats.get_value("item_scraped_count", 0)
        pages = self.stats.get_value("response_received_count", 0)

        return (pages / mins_elapsed), (items / mins_elapsed)
参数
LOGSTATS_INTERVAL = 60.0  # 每 60s 统计一次数据;当为 0 时,则不进行统计
主要统计page、item的个数等信息,从而计算频率。
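如果希望更频繁地观察抓取速率,可以调小该间隔(取值仅为示例):

# settings.py
LOGSTATS_INTERVAL = 30.0  # 每 30s 打印一次 "Crawled ... pages ... items" 日志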
7、scrapy.extensions.spiderstate.SpiderState
from __future__ import annotations

import pickle  # nosec
from pathlib import Path
from typing import TYPE_CHECKING

from scrapy import Spider, signals
from scrapy.exceptions import NotConfigured
from scrapy.utils.job import job_dir

if TYPE_CHECKING:
    # typing.Self requires Python 3.11
    from typing_extensions import Self

    from scrapy.crawler import Crawler


class SpiderState:
    """Store and load spider state during a scraping job"""

    def __init__(self, jobdir: str | None = None):
        self.jobdir: str | None = jobdir

    @classmethod
    def from_crawler(cls, crawler: Crawler) -> Self:
        jobdir = job_dir(crawler.settings)
        if not jobdir:
            raise NotConfigured

        obj = cls(jobdir)
        crawler.signals.connect(obj.spider_closed, signal=signals.spider_closed)
        crawler.signals.connect(obj.spider_opened, signal=signals.spider_opened)
        return obj

    def spider_closed(self, spider: Spider) -> None:
        if self.jobdir:
            with Path(self.statefn).open("wb") as f:
                assert hasattr(spider, "state")  # set in spider_opened
                pickle.dump(spider.state, f, protocol=4)

    def spider_opened(self, spider: Spider) -> None:
        if self.jobdir and Path(self.statefn).exists():
            with Path(self.statefn).open("rb") as f:
                spider.state = pickle.load(f)  # type: ignore[attr-defined]  # nosec
        else:
            spider.state = {}  # type: ignore[attr-defined]

    @property
    def statefn(self) -> str:
        assert self.jobdir
        return str(Path(self.jobdir, "spider.state"))
参数
JOBDIR='' # 项目spider state保存地址
配置JOBDIR时,会自动创建文件夹然后保存spider state到文件夹内。默认是不配置的。
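配合 JOBDIR 使用时,可以在爬虫中通过 self.state 字典保存断点信息,停止后再次用相同的 JOBDIR 启动即可恢复(下面的爬虫名、URL 和目录名仅为示例):

# myspider.py 中的用法示意
import scrapy


class MySpider(scrapy.Spider):
    name = "myspider"
    start_urls = ["https://example.com"]

    def parse(self, response):
        # self.state 由 SpiderState 扩展在 spider_opened 时注入,爬虫关闭时自动持久化
        self.state["pages_seen"] = self.state.get("pages_seen", 0) + 1
        yield {"url": response.url}

# 命令行运行方式:
#   scrapy crawl myspider -s JOBDIR=crawls/myspider-1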
8、scrapy.extensions.throttle.AutoThrottle
class AutoThrottle:
    def __init__(self, crawler):
        self.crawler = crawler
        if not crawler.settings.getbool('AUTOTHROTTLE_ENABLED'):
            raise NotConfigured
        self.debug = crawler.settings.getbool("AUTOTHROTTLE_DEBUG")
        self.target_concurrency = crawler.settings.getfloat("AUTOTHROTTLE_TARGET_CONCURRENCY")
        crawler.signals.connect(self._spider_opened, signal=signals.spider_opened)
        crawler.signals.connect(self._response_downloaded, signal=signals.response_downloaded)

    @classmethod
    def from_crawler(cls, crawler):
        return cls(crawler)

    def _spider_opened(self, spider):
        self.mindelay = self._min_delay(spider)
        self.maxdelay = self._max_delay(spider)
        spider.download_delay = self._start_delay(spider)

    def _min_delay(self, spider):
        s = self.crawler.settings
        return getattr(spider, 'download_delay', s.getfloat('DOWNLOAD_DELAY'))

    def _max_delay(self, spider):
        return self.crawler.settings.getfloat('AUTOTHROTTLE_MAX_DELAY')

    def _start_delay(self, spider):
        return max(self.mindelay, self.crawler.settings.getfloat('AUTOTHROTTLE_START_DELAY'))

    def _response_downloaded(self, response, request, spider):
        key, slot = self._get_slot(request, spider)
        latency = request.meta.get('download_latency')
        if latency is None or slot is None:
            return

        olddelay = slot.delay
        self._adjust_delay(slot, latency, response)
        if self.debug:
            diff = slot.delay - olddelay
            size = len(response.body)
            conc = len(slot.transferring)
            logger.info(
                "slot: %(slot)s | conc:%(concurrency)2d | "
                "delay:%(delay)5d ms (%(delaydiff)+d) | "
                "latency:%(latency)5d ms | size:%(size)6d bytes",
                {'slot': key, 'concurrency': conc,
                 'delay': slot.delay * 1000, 'delaydiff': diff * 1000,
                 'latency': latency * 1000, 'size': size},
                extra={'spider': spider}
            )

    def _get_slot(self, request, spider):
        key = request.meta.get('download_slot')
        return key, self.crawler.engine.downloader.slots.get(key)

    def _adjust_delay(self, slot, latency, response):
        """Define delay adjustment policy"""

        # If a server needs `latency` seconds to respond then
        # we should send a request each `latency/N` seconds
        # to have N requests processed in parallel
        target_delay = latency / self.target_concurrency

        # Adjust the delay to make it closer to target_delay
        new_delay = (slot.delay + target_delay) / 2.0

        # If target delay is bigger than old delay, then use it instead of mean.
        # It works better with problematic sites.
        new_delay = max(target_delay, new_delay)

        # Make sure self.mindelay <= new_delay <= self.max_delay
        new_delay = min(max(self.mindelay, new_delay), self.maxdelay)

        # Dont adjust delay if response status != 200 and new delay is smaller
        # than old one, as error pages (and redirections) are usually small and
        # so tend to reduce latency, thus provoking a positive feedback by
        # reducing delay instead of increase.
        if response.status != 200 and new_delay <= slot.delay:
            return

        slot.delay = new_delay
参数
AUTOTHROTTLE_ENABLED = False           # 是否开启自适应下载延迟
AUTOTHROTTLE_DEBUG = False             # 是否打印每次延迟调整的调试日志
AUTOTHROTTLE_MAX_DELAY = 60.0          # 最大延迟 60s
AUTOTHROTTLE_START_DELAY = 5.0         # 起始延迟 5s
AUTOTHROTTLE_TARGET_CONCURRENCY = 1.0  # 对每个远程站点期望维持的平均并发请求数

该功能默认不开启。
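启用自适应限速的典型配置如下(取值仅为示例,应根据目标站点的承受能力调整):

# settings.py(示例取值,仅作演示)
AUTOTHROTTLE_ENABLED = True
AUTOTHROTTLE_START_DELAY = 5.0
AUTOTHROTTLE_MAX_DELAY = 60.0
AUTOTHROTTLE_TARGET_CONCURRENCY = 2.0  # 平均向每个站点保持约 2 个并发请求
AUTOTHROTTLE_DEBUG = True              # 打印每次延迟调整的日志,便于调参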
参考链接
Scrapy 源码分析 4 extensions middlewares详解_scrapy.extensions.logstats-CSDN博客
UniApp结合机器学习打造智能图像分类应用:HarmonyOS实践指南 引言 在移动应用开发领域,图像分类是一个既经典又充满挑战的任务。随着机器学习技术的发展,我们现在可以在移动端实现高效的图像分类功能。本文将详细介绍如何使用UniApp结合Ten…...