python异步切片下载文件(内置redis获取任务 mongo更新任务状态等)
异步切片下载二进制文件并上传桶删除本地文件
import json
import os
import asyncio
from urllib import parseimport aiohttp
import aioredis
from motor.motor_asyncio import AsyncIOMotorClient
from retrying import retry
from minio import Minio
from minio.error import S3Error
import loguruclass Async_download:def __init__(self, sema_number=20, redis_address='redis://ip:6379/db', redis_password='passwd'):self.__sema_number = sema_numberself.__sema = Noneself.redis_address = redis_addressself.redis_password = redis_passwordself.minio_config = {"endpoint": "ip:port","access_key": "******","secret_key": "******","secure": False}self.mongo_config = {"uri": "mongodb://{0}:{1}@ip:27017".format(parse.quote_plus("user"),parse.quote_plus("passwd")),"db": "******","collection": "******",}@classmethoddef _mkdir(cls, file_path, full_path):if os.path.exists(file_path):return Truetry:os.makedirs(full_path)except Exception as e:passreturn Falsedef __init_check(self, file_path: str):full_path, file_name = file_path.rsplit('/', 1)file_size = os.path.getsize(file_path) if self._mkdir(file_path, full_path) else 0return file_name, file_size@classmethoddef __sync_save_local(cls, r_headers, results, file_path):done, padding = resultsfor d in done:for index, value in d.result().items():r_headers[index] = valuewith open(file_path, 'ab') as f:for _, value in r_headers.items():f.write(value)return True@classmethoddef __generate_headers(cls, headers, file_size, first_byte):r_headers = {}index = 0if first_byte > 51200000:byte = 2048000 # 2M 为一片else:byte = 1024000 # 1M 为一片while True:file_size_two = file_size + byteif file_size_two >= first_byte:r_headers[index] = {"Range": f"bytes={file_size}-{first_byte}"}breakr_headers[index] = {"Range": f"bytes={file_size}-{file_size_two - 1}"}index += 1file_size = file_size_twofor key in r_headers:r_headers[key].update(headers)return r_headers@retry(stop_max_attempt_number=3)async def __download_one(self, session, method, url, r_headers, **kwargs):index, headers = r_headersasync with self.__sema:async with session.request(method, url, headers=headers, **kwargs) as response:binary = await response.content.read()return {index: binary}async def __async_section_download(self, session, method, url, r_headers, **kwargs):tasks = [asyncio.create_task(self.__download_one(session, method, url, (key, r_headers[key]), **kwargs)) for key inr_headers]return await asyncio.wait(tasks)@classmethodasync def __get_content_length(cls, session, method, url, headers, **kwargs):async with session.request(method, url, headers=headers, **kwargs) as response:return response.headers.get('Content-Length') or response.headers.get('content-length') or 0@classmethodasync def __sync_download(cls, session, method, url, headers, file_path, **kwargs):async with session.request(method, url, headers=headers, **kwargs) as response:with open(file_path, 'wb') as f:binary = await response.content.read()f.write(binary)async def __async_download_main(self, method, url, headers, file_path, **kwargs):file_name, file_size = self.__init_check(file_path)self.__sema = asyncio.Semaphore(self.__sema_number)async with aiohttp.ClientSession() as session:content_length = await self.__get_content_length(session, method, url, headers, **kwargs)if content_length and content_length.isdigit():content_length = int(content_length)if file_size >= content_length:await self.__upload_to_minio(file_path, file_name) # Upload to MinIOawait self.__update_mongo_status(file_name, True) # Update MongoDB statusos.remove(file_path) # Delete local filereturn True, file_pathr_headers = self.__generate_headers(headers, file_size, content_length)results = await self.__async_section_download(session, method, url, r_headers, **kwargs)self.__sync_save_local(r_headers, results, file_path)else:await self.__sync_download(session, method, url, headers, file_path, **kwargs)if os.path.getsize(file_path) >= int(content_length):await self.__upload_to_minio(file_path, file_name) # Upload to MinIOawait self.__update_mongo_status(file_name, True) # Update MongoDB statusos.remove(file_path) # Delete local filereturn True, file_pathreturn False, file_pathasync def __get_task_from_redis(self):async with aioredis.from_url(self.redis_address, password=self.redis_password) as redis:task = await redis.lpop('file_file_all')return taskasync def __process_redis_tasks(self):while True:task_info = await self.__get_task_from_redis()if task_info is None:breaktask = json.loads(task_info)try:method = 'get'url = task["file_link"]file_path = './{}'.format(task["file_name"])headers = {'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/96.0.4664.93 Safari/537.36'}if url.startswith('http') or url.startswith('https'):try:ddd = url.split('http')if len(ddd) != 0:url = 'http' + ddd[-1]except:continue# Perform download for the current taskawait self.__async_download_main(method, url, headers, file_path)except Exception as e:loguru.logger.error("Error processing Redis task:", e)async def __upload_to_minio(self, file_path, object_name):"""上传minio"""try:minioClient = Minio(**self.minio_config)check_bucket = minioClient.bucket_exists("******")if not check_bucket:minioClient.make_bucket("******")loguru.logger.info("start upload file to MinIO")minioClient.fput_object(bucket_name="******", object_name=object_name, file_path=file_path)loguru.logger.info("file {0} is successfully uploaded to MinIO".format(object_name))except FileNotFoundError as err:loguru.logger.info('*' * 10)loguru.logger.error('MinIO upload failed: ' + str(err))except S3Error as err:loguru.logger.error("MinIO upload failed:", err)async def __update_mongo_status(self, file_name, status):"""更新mongo采集状态"""try:mongo_uri = self.mongo_config["uri"]db_name = self.mongo_config["db"]collection_name = self.mongo_config["collection"]client = AsyncIOMotorClient(mongo_uri)db = client.get_database(db_name)collection = db.get_collection(collection_name)await collection.update_one({"file_name": file_name}, {"$set": {"status": status}})except Exception as e:loguru.logger.error("MongoDB update failed:", e)async def start(self):await self.__process_redis_tasks()loguru.logger.add("download_file_output.log", rotation="500 MB", level="DEBUG")
if __name__ == '__main__':as_dw = Async_download(20)asyncio.run(as_dw.start())
部分代码来源于y小白的笔记
相关文章:

python异步切片下载文件(内置redis获取任务 mongo更新任务状态等)
异步切片下载二进制文件并上传桶删除本地文件 import json import os import asyncio from urllib import parseimport aiohttp import aioredis from motor.motor_asyncio import AsyncIOMotorClient from retrying import retry from minio import Minio from minio.error im…...

《吐血整理》进阶系列教程-拿捏Fiddler抓包教程(10)-Fiddler如何设置捕获Firefox浏览器的Https会话
1.简介 经过上一篇对Fiddler的配置后,绝大多数的Https的会话,我们可以成功捕获抓取到,但是有些版本的Firefox浏览器仍然是捕获不到其的Https会话,需要我们更进一步的配置才能捕获到会话进行抓包。 2.宏哥环境 1.宏哥的环境是Wi…...

阿里云云原生弹性方案:用弹性解决集群资源利用率难题
作者:赫曦 随着上云的认知更加普遍,我们发现除了以往占大部分的互联网类型的客户,一些传统的企业,一些制造类的和工业型企业客户也都开始使用云原生的方式去做 IT 架构的转型,提高集群资源使用率也成为企业上云的一致…...

Spring-BeanPostProcessor PostConstruct init InitializingBean 执行顺序
执行顺序探究 新建一个对象用于测试 Component public class Student implements InitializingBean {private String name;private int age;public String getName() {return name;}public void setName(String name) {this.name name;}public int getAge() {return age;}pu…...

【算法】递归
递归 递归初始递归:数列求和递归的应用:任意进制转换递归深度限制递归可视化:分形树递归可视化:谢尔宾斯基Sierpinski三角形递归的应用:汉诺塔递归的应用:探索迷宫 分治策略和递归优化问题兑换最少个数硬币…...

DC-1靶机刷题记录
靶机下载地址: 链接:https://pan.baidu.com/s/1GX7qOamdNx01622EYUBSow?pwd9nyo 提取码:9nyo 参考答案: https://c3ting.com/archives/kai-qi-vulnhnbshua-tiDC-1.pdf【【基础向】超详解vulnhub靶场DC-1】 https://www.bilibi…...

rust跟我学七:获取外网IP地址
图为RUST吉祥物 大家好,我是get_local_info作者带剑书生,这里用一篇文章讲解get_local_info是怎么获取到本机的外网IP地址。 首先,先要了解get_local_info是什么? get_local_info是一个获取linux系统信息的rust三方库,并提供一些常用功能,目前版本0.2.4。详细介绍地址:[…...

华为:交换机忘记console密码重置
一、背景 许多旧项目经过长时间使用后,因为没有特定的管理运维人员,初始对接人也将初始账号密码等重要信息丢失,现需要进后台查看配置或更改网络配置,需重置密码 二、重置密码,不重置设备方法 1、使用console插入交…...

2024年甘肃省职业院校技能大赛信息安全管理与评估 样题三 模块一
竞赛需要完成三个阶段的任务,分别完成三个模块,总分共计 1000分。三个模块内容和分值分别是: 1.第一阶段:模块一 网络平台搭建与设备安全防护(180 分钟,300 分)。 2.第二阶段:模块二…...

Go 中 slice 的 In 功能实现探索
文章目录 遍历二分查找map key性能总结 之前在知乎看到一个问题:为什么 Golang 没有像 Python 中 in 一样的功能?于是,搜了下这个问题,发现还是有不少人有这样的疑问。 补充:本文写于 2019 年。GO 现在已经支持泛型&am…...

pyDAL一个python的ORM(终) pyDAL的一些性能优化
一、大批量插入数据 对于 大量数据插入时,虽然pyDAL也手册中有个方法:bulk_insert(),但是手册也说了,虽然方法上是一次可以多条数据,如果后端数据库是关系型数据库,他转换为SQL时它是一条一条的插入的&…...

springboot log4j配置xml实例说明
提供样本配置代码 xml <?xml version"1.0" encoding"UTF-8"?> <!--日志级别以及优先级排序: OFF > FATAL > ERROR > WARN > INFO > DEBUG > TRACE > ALL --> <!-- status log4j2内部日志级别 --> <configurat…...

VsCode重新安装需要配机的ESLint和 Prettier - Code formatter 配置
新电脑安装完Vscode后,需要装几个插件,这里记录下: {"diffEditor.ignoreTrimWhitespace": false,"files.autoSave": "afterDelay","editor.codeActionsOnSave": {"source.fixAll.eslint"…...

录屏功能怎么打开?简单操作,一学就会!
录屏功能在当今互联网时代变得越来越重要,无论是游戏录制、在线课程录制还是屏幕操作演示,录屏功能都为我们提供了便捷的解决方案。可是您知道录屏功能怎么打开吗?接下来,让我们一起探索如何在电脑上开启录屏功能,记录…...

小程序显示兼容处理,home键处理
定义: env(safe-area-inset-bottom)和env(safe-area-inset-top)是CSS中的变量,用于获取设备底部和顶部安全区域的大小 示例: padding-bottom: calc(env(safe-area-inset-bottom) 12px); /* 兼容iOS> 11.2 */安全间距类型: …...

【java八股文】之JVM基础篇
【java八股文】之JVM基础篇-CSDN博客 【java八股文】之MYSQL基础篇-CSDN博客 【java八股文】之Redis基础篇-CSDN博客 【java八股文】之Spring系列篇-CSDN博客 【java八股文】之分布式系列篇-CSDN博客 【java八股文】之多线程篇-CSDN博客 【java八股文】之JVM基础篇-CSDN博…...

2024美赛数学建模思路 - 案例:异常检测
文章目录 赛题思路一、简介 -- 关于异常检测异常检测监督学习 二、异常检测算法2. 箱线图分析3. 基于距离/密度4. 基于划分思想 建模资料 赛题思路 (赛题出来以后第一时间在CSDN分享) https://blog.csdn.net/dc_sinor?typeblog 一、简介 – 关于异常…...

【EI会议征稿通知】2024年通信技术与软件工程国际学术会议 (CTSE 2024)
2024年通信技术与软件工程国际学术会议 (CTSE 2024) 2024 International Conference on Communication Technology and Software Engineering (CTSE 2024) 2024年通信技术与软件工程国际学术会议 (CTSE 2024)将于2024年03月15-17日在中国长沙举行。会议专注于通信技术与软件工…...

Js面试之作用域与闭包
Js面试之作用域与闭包 作用域词法作用域动态作用域 闭包闭包使用场景封装私有变量模块化开发保持变量状态异步操作 注意事项 最近在整理一些前端面试中经常被问到的问题,分为vue相关、react相关、js相关、react相关等等专题,可持续关注后续内容ÿ…...

Go 爬虫之 colly 从入门到不放弃指南
文章目录 概要介绍如何学习官方文档如何安装快速开始如何配置调试分布式代理层面执行层面存储层面存储多收集器配置优化持久化存储启用异步加快任务执行禁止或限制 KeepAlive 连接扩展总结如果想用 GO 实现爬虫能力,该如何做呢?抽时间研究了 Go 的一款爬虫框架 colly。 概要…...

Ceph分布式存储(1)
目录 一.ceph分布式存储 Ceph架构(自上往下) OSD的存储引擎: Ceph的存储过程: 二. 基于 ceph-deploy 部署 Ceph 集群 20-40节点上添加3块硬盘,一个网卡: 10节点为admin,20-40为node&…...

制造业工厂为什么要实施MES系统呢?
MES是生产管理系统,生产管理是通过对生产系统的战略计划、组织、指挥、实施、协调、控制等活动,实现系统的物质变换、产品生产、价值提升的过程。在企业的价值链中,生产经营是企业核心能力的重要组成部分。 实施MES系统的原因 MES系统是中国比…...

Python 一行命令部署http、ftp服务
Python 一行命令部署http服务 文章目录 Python 一行命令部署http服务具体操作命令如下浏览器返回下载Python 一行命令部署FTP服务 具体操作命令如下 这个比nginx相对来说更加简单,可以用于部署特殊场景时如银行等部署时,各种权限控制,内网之间…...

DBA技术栈(三):MySQL 性能影响因素
文章目录 前言一、影响MySQL性能的因素1.1 商业上的需求1.2 应用架构规划1.3 查询语句使用方式1.4 Schema的设计1.5 硬件环境 总结 前言 大部分人都一致认为一个数据库应用系统(这里的数据库应用系统概指所有使用数据库的系统)的性能瓶颈最容易出现在数…...

SpringCloud GateWay 在全局过滤器中注入OpenFeign网关后无法启动
目录 一、问题 二、原因 1、修改配置 2、添加Lazy注解在client上面 3、启动成功 一、问题 当在gateway的全局过滤器GlobalFilter中注入OpenFeign接口的时候会一直卡在路由中,但是不会进一步,导致启动未成功也未报错失败 2024-01-18 22:06:59.299 I…...

web前端项目-贪吃蛇小游戏【附源码】
web前端项目-贪吃蛇小游戏 【贪吃蛇】是一款经典的小游戏,采用HTML、CSS和JavaScript技术进行开发,玩家通过控制一条蛇在地图上移动,蛇的目的是吃掉地图上的食物,并且让自己变得更长。游戏的核心玩法是控制蛇的移动方向和长度&am…...

ICCV2023 | PTUnifier+:通过Soft Prompts(软提示)统一医学视觉语言预训练
论文标题:Towards Unifying Medical Vision-and-Language Pre-training via Soft Prompts 代码:https://github.com/zhjohnchan/ptunifier Fusion-encoder type和Dual-encoder type。前者在多模态任务中具有优势,因为模态之间有充分的相互…...

代码随想录 Leetcode459. 重复的子字符串(KMP算法)
题目: 代码(首刷看解析 KMP算法 2024年1月18日): class Solution { public:void getNext(string& s,vector<int>& next) {int j 0;next[0] j;for (int i 1; i < s.size(); i) {while (j > 0 && s…...

Rust之构建命令行程序(三):重构改进模块化和错误处理
开发环境 Windows 10Rust 1.74.1 VS Code 1.85.1 项目工程 这次创建了新的工程minigrep. 重构改进模块化和错误处理 为了改进我们的程序,我们将修复与程序结构及其处理潜在错误的方式有关的四个问题。首先,我们的main函数现在执行两项任务:解析参数和…...

广和通AI解决方案“智”赋室外机器人迈向新天地!
大模型趋势下,行业机器人将具备更完善的交互与自主能力,逐步迈向AI 2.0时代,成为人工智能技术全面爆发的重要基础。随着行业智能化,更多机器人应用将从“室内”走向“室外”,承担更多高风险、高智能工作。复杂的室外环…...