python异步切片下载文件(内置redis获取任务 mongo更新任务状态等)
异步切片下载二进制文件并上传桶删除本地文件
import json
import os
import asyncio
from urllib import parseimport aiohttp
import aioredis
from motor.motor_asyncio import AsyncIOMotorClient
from retrying import retry
from minio import Minio
from minio.error import S3Error
import loguruclass Async_download:def __init__(self, sema_number=20, redis_address='redis://ip:6379/db', redis_password='passwd'):self.__sema_number = sema_numberself.__sema = Noneself.redis_address = redis_addressself.redis_password = redis_passwordself.minio_config = {"endpoint": "ip:port","access_key": "******","secret_key": "******","secure": False}self.mongo_config = {"uri": "mongodb://{0}:{1}@ip:27017".format(parse.quote_plus("user"),parse.quote_plus("passwd")),"db": "******","collection": "******",}@classmethoddef _mkdir(cls, file_path, full_path):if os.path.exists(file_path):return Truetry:os.makedirs(full_path)except Exception as e:passreturn Falsedef __init_check(self, file_path: str):full_path, file_name = file_path.rsplit('/', 1)file_size = os.path.getsize(file_path) if self._mkdir(file_path, full_path) else 0return file_name, file_size@classmethoddef __sync_save_local(cls, r_headers, results, file_path):done, padding = resultsfor d in done:for index, value in d.result().items():r_headers[index] = valuewith open(file_path, 'ab') as f:for _, value in r_headers.items():f.write(value)return True@classmethoddef __generate_headers(cls, headers, file_size, first_byte):r_headers = {}index = 0if first_byte > 51200000:byte = 2048000 # 2M 为一片else:byte = 1024000 # 1M 为一片while True:file_size_two = file_size + byteif file_size_two >= first_byte:r_headers[index] = {"Range": f"bytes={file_size}-{first_byte}"}breakr_headers[index] = {"Range": f"bytes={file_size}-{file_size_two - 1}"}index += 1file_size = file_size_twofor key in r_headers:r_headers[key].update(headers)return r_headers@retry(stop_max_attempt_number=3)async def __download_one(self, session, method, url, r_headers, **kwargs):index, headers = r_headersasync with self.__sema:async with session.request(method, url, headers=headers, **kwargs) as response:binary = await response.content.read()return {index: binary}async def __async_section_download(self, session, method, url, r_headers, **kwargs):tasks = [asyncio.create_task(self.__download_one(session, method, url, (key, r_headers[key]), **kwargs)) for key inr_headers]return await asyncio.wait(tasks)@classmethodasync def __get_content_length(cls, session, method, url, headers, **kwargs):async with session.request(method, url, headers=headers, **kwargs) as response:return response.headers.get('Content-Length') or response.headers.get('content-length') or 0@classmethodasync def __sync_download(cls, session, method, url, headers, file_path, **kwargs):async with session.request(method, url, headers=headers, **kwargs) as response:with open(file_path, 'wb') as f:binary = await response.content.read()f.write(binary)async def __async_download_main(self, method, url, headers, file_path, **kwargs):file_name, file_size = self.__init_check(file_path)self.__sema = asyncio.Semaphore(self.__sema_number)async with aiohttp.ClientSession() as session:content_length = await self.__get_content_length(session, method, url, headers, **kwargs)if content_length and content_length.isdigit():content_length = int(content_length)if file_size >= content_length:await self.__upload_to_minio(file_path, file_name) # Upload to MinIOawait self.__update_mongo_status(file_name, True) # Update MongoDB statusos.remove(file_path) # Delete local filereturn True, file_pathr_headers = self.__generate_headers(headers, file_size, content_length)results = await self.__async_section_download(session, method, url, r_headers, **kwargs)self.__sync_save_local(r_headers, results, file_path)else:await self.__sync_download(session, method, url, headers, file_path, **kwargs)if os.path.getsize(file_path) >= int(content_length):await self.__upload_to_minio(file_path, file_name) # Upload to MinIOawait self.__update_mongo_status(file_name, True) # Update MongoDB statusos.remove(file_path) # Delete local filereturn True, file_pathreturn False, file_pathasync def __get_task_from_redis(self):async with aioredis.from_url(self.redis_address, password=self.redis_password) as redis:task = await redis.lpop('file_file_all')return taskasync def __process_redis_tasks(self):while True:task_info = await self.__get_task_from_redis()if task_info is None:breaktask = json.loads(task_info)try:method = 'get'url = task["file_link"]file_path = './{}'.format(task["file_name"])headers = {'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/96.0.4664.93 Safari/537.36'}if url.startswith('http') or url.startswith('https'):try:ddd = url.split('http')if len(ddd) != 0:url = 'http' + ddd[-1]except:continue# Perform download for the current taskawait self.__async_download_main(method, url, headers, file_path)except Exception as e:loguru.logger.error("Error processing Redis task:", e)async def __upload_to_minio(self, file_path, object_name):"""上传minio"""try:minioClient = Minio(**self.minio_config)check_bucket = minioClient.bucket_exists("******")if not check_bucket:minioClient.make_bucket("******")loguru.logger.info("start upload file to MinIO")minioClient.fput_object(bucket_name="******", object_name=object_name, file_path=file_path)loguru.logger.info("file {0} is successfully uploaded to MinIO".format(object_name))except FileNotFoundError as err:loguru.logger.info('*' * 10)loguru.logger.error('MinIO upload failed: ' + str(err))except S3Error as err:loguru.logger.error("MinIO upload failed:", err)async def __update_mongo_status(self, file_name, status):"""更新mongo采集状态"""try:mongo_uri = self.mongo_config["uri"]db_name = self.mongo_config["db"]collection_name = self.mongo_config["collection"]client = AsyncIOMotorClient(mongo_uri)db = client.get_database(db_name)collection = db.get_collection(collection_name)await collection.update_one({"file_name": file_name}, {"$set": {"status": status}})except Exception as e:loguru.logger.error("MongoDB update failed:", e)async def start(self):await self.__process_redis_tasks()loguru.logger.add("download_file_output.log", rotation="500 MB", level="DEBUG")
if __name__ == '__main__':as_dw = Async_download(20)asyncio.run(as_dw.start())
部分代码来源于y小白的笔记
相关文章:
python异步切片下载文件(内置redis获取任务 mongo更新任务状态等)
异步切片下载二进制文件并上传桶删除本地文件 import json import os import asyncio from urllib import parseimport aiohttp import aioredis from motor.motor_asyncio import AsyncIOMotorClient from retrying import retry from minio import Minio from minio.error im…...
《吐血整理》进阶系列教程-拿捏Fiddler抓包教程(10)-Fiddler如何设置捕获Firefox浏览器的Https会话
1.简介 经过上一篇对Fiddler的配置后,绝大多数的Https的会话,我们可以成功捕获抓取到,但是有些版本的Firefox浏览器仍然是捕获不到其的Https会话,需要我们更进一步的配置才能捕获到会话进行抓包。 2.宏哥环境 1.宏哥的环境是Wi…...
阿里云云原生弹性方案:用弹性解决集群资源利用率难题
作者:赫曦 随着上云的认知更加普遍,我们发现除了以往占大部分的互联网类型的客户,一些传统的企业,一些制造类的和工业型企业客户也都开始使用云原生的方式去做 IT 架构的转型,提高集群资源使用率也成为企业上云的一致…...
Spring-BeanPostProcessor PostConstruct init InitializingBean 执行顺序
执行顺序探究 新建一个对象用于测试 Component public class Student implements InitializingBean {private String name;private int age;public String getName() {return name;}public void setName(String name) {this.name name;}public int getAge() {return age;}pu…...
【算法】递归
递归 递归初始递归:数列求和递归的应用:任意进制转换递归深度限制递归可视化:分形树递归可视化:谢尔宾斯基Sierpinski三角形递归的应用:汉诺塔递归的应用:探索迷宫 分治策略和递归优化问题兑换最少个数硬币…...
DC-1靶机刷题记录
靶机下载地址: 链接:https://pan.baidu.com/s/1GX7qOamdNx01622EYUBSow?pwd9nyo 提取码:9nyo 参考答案: https://c3ting.com/archives/kai-qi-vulnhnbshua-tiDC-1.pdf【【基础向】超详解vulnhub靶场DC-1】 https://www.bilibi…...
rust跟我学七:获取外网IP地址
图为RUST吉祥物 大家好,我是get_local_info作者带剑书生,这里用一篇文章讲解get_local_info是怎么获取到本机的外网IP地址。 首先,先要了解get_local_info是什么? get_local_info是一个获取linux系统信息的rust三方库,并提供一些常用功能,目前版本0.2.4。详细介绍地址:[…...
华为:交换机忘记console密码重置
一、背景 许多旧项目经过长时间使用后,因为没有特定的管理运维人员,初始对接人也将初始账号密码等重要信息丢失,现需要进后台查看配置或更改网络配置,需重置密码 二、重置密码,不重置设备方法 1、使用console插入交…...
2024年甘肃省职业院校技能大赛信息安全管理与评估 样题三 模块一
竞赛需要完成三个阶段的任务,分别完成三个模块,总分共计 1000分。三个模块内容和分值分别是: 1.第一阶段:模块一 网络平台搭建与设备安全防护(180 分钟,300 分)。 2.第二阶段:模块二…...
Go 中 slice 的 In 功能实现探索
文章目录 遍历二分查找map key性能总结 之前在知乎看到一个问题:为什么 Golang 没有像 Python 中 in 一样的功能?于是,搜了下这个问题,发现还是有不少人有这样的疑问。 补充:本文写于 2019 年。GO 现在已经支持泛型&am…...
pyDAL一个python的ORM(终) pyDAL的一些性能优化
一、大批量插入数据 对于 大量数据插入时,虽然pyDAL也手册中有个方法:bulk_insert(),但是手册也说了,虽然方法上是一次可以多条数据,如果后端数据库是关系型数据库,他转换为SQL时它是一条一条的插入的&…...
springboot log4j配置xml实例说明
提供样本配置代码 xml <?xml version"1.0" encoding"UTF-8"?> <!--日志级别以及优先级排序: OFF > FATAL > ERROR > WARN > INFO > DEBUG > TRACE > ALL --> <!-- status log4j2内部日志级别 --> <configurat…...
VsCode重新安装需要配机的ESLint和 Prettier - Code formatter 配置
新电脑安装完Vscode后,需要装几个插件,这里记录下: {"diffEditor.ignoreTrimWhitespace": false,"files.autoSave": "afterDelay","editor.codeActionsOnSave": {"source.fixAll.eslint"…...
录屏功能怎么打开?简单操作,一学就会!
录屏功能在当今互联网时代变得越来越重要,无论是游戏录制、在线课程录制还是屏幕操作演示,录屏功能都为我们提供了便捷的解决方案。可是您知道录屏功能怎么打开吗?接下来,让我们一起探索如何在电脑上开启录屏功能,记录…...
小程序显示兼容处理,home键处理
定义: env(safe-area-inset-bottom)和env(safe-area-inset-top)是CSS中的变量,用于获取设备底部和顶部安全区域的大小 示例: padding-bottom: calc(env(safe-area-inset-bottom) 12px); /* 兼容iOS> 11.2 */安全间距类型: …...
【java八股文】之JVM基础篇
【java八股文】之JVM基础篇-CSDN博客 【java八股文】之MYSQL基础篇-CSDN博客 【java八股文】之Redis基础篇-CSDN博客 【java八股文】之Spring系列篇-CSDN博客 【java八股文】之分布式系列篇-CSDN博客 【java八股文】之多线程篇-CSDN博客 【java八股文】之JVM基础篇-CSDN博…...
2024美赛数学建模思路 - 案例:异常检测
文章目录 赛题思路一、简介 -- 关于异常检测异常检测监督学习 二、异常检测算法2. 箱线图分析3. 基于距离/密度4. 基于划分思想 建模资料 赛题思路 (赛题出来以后第一时间在CSDN分享) https://blog.csdn.net/dc_sinor?typeblog 一、简介 – 关于异常…...
【EI会议征稿通知】2024年通信技术与软件工程国际学术会议 (CTSE 2024)
2024年通信技术与软件工程国际学术会议 (CTSE 2024) 2024 International Conference on Communication Technology and Software Engineering (CTSE 2024) 2024年通信技术与软件工程国际学术会议 (CTSE 2024)将于2024年03月15-17日在中国长沙举行。会议专注于通信技术与软件工…...
Js面试之作用域与闭包
Js面试之作用域与闭包 作用域词法作用域动态作用域 闭包闭包使用场景封装私有变量模块化开发保持变量状态异步操作 注意事项 最近在整理一些前端面试中经常被问到的问题,分为vue相关、react相关、js相关、react相关等等专题,可持续关注后续内容ÿ…...
Go 爬虫之 colly 从入门到不放弃指南
文章目录 概要介绍如何学习官方文档如何安装快速开始如何配置调试分布式代理层面执行层面存储层面存储多收集器配置优化持久化存储启用异步加快任务执行禁止或限制 KeepAlive 连接扩展总结如果想用 GO 实现爬虫能力,该如何做呢?抽时间研究了 Go 的一款爬虫框架 colly。 概要…...
Golang 面试经典题:map 的 key 可以是什么类型?哪些不可以?
Golang 面试经典题:map 的 key 可以是什么类型?哪些不可以? 在 Golang 的面试中,map 类型的使用是一个常见的考点,其中对 key 类型的合法性 是一道常被提及的基础却很容易被忽视的问题。本文将带你深入理解 Golang 中…...
深入理解JavaScript设计模式之单例模式
目录 什么是单例模式为什么需要单例模式常见应用场景包括 单例模式实现透明单例模式实现不透明单例模式用代理实现单例模式javaScript中的单例模式使用命名空间使用闭包封装私有变量 惰性单例通用的惰性单例 结语 什么是单例模式 单例模式(Singleton Pattern&#…...
spring:实例工厂方法获取bean
spring处理使用静态工厂方法获取bean实例,也可以通过实例工厂方法获取bean实例。 实例工厂方法步骤如下: 定义实例工厂类(Java代码),定义实例工厂(xml),定义调用实例工厂ÿ…...
Android15默认授权浮窗权限
我们经常有那种需求,客户需要定制的apk集成在ROM中,并且默认授予其【显示在其他应用的上层】权限,也就是我们常说的浮窗权限,那么我们就可以通过以下方法在wms、ams等系统服务的systemReady()方法中调用即可实现预置应用默认授权浮…...
全面解析各类VPN技术:GRE、IPsec、L2TP、SSL与MPLS VPN对比
目录 引言 VPN技术概述 GRE VPN 3.1 GRE封装结构 3.2 GRE的应用场景 GRE over IPsec 4.1 GRE over IPsec封装结构 4.2 为什么使用GRE over IPsec? IPsec VPN 5.1 IPsec传输模式(Transport Mode) 5.2 IPsec隧道模式(Tunne…...
LLaMA-Factory 微调 Qwen2-VL 进行人脸情感识别(二)
在上一篇文章中,我们详细介绍了如何使用LLaMA-Factory框架对Qwen2-VL大模型进行微调,以实现人脸情感识别的功能。本篇文章将聚焦于微调完成后,如何调用这个模型进行人脸情感识别的具体代码实现,包括详细的步骤和注释。 模型调用步骤 环境准备:确保安装了必要的Python库。…...
Python训练营-Day26-函数专题1:函数定义与参数
题目1:计算圆的面积 任务: 编写一个名为 calculate_circle_area 的函数,该函数接收圆的半径 radius 作为参数,并返回圆的面积。圆的面积 π * radius (可以使用 math.pi 作为 π 的值)要求:函数接收一个位置参数 radi…...
Matlab实现任意伪彩色图像可视化显示
Matlab实现任意伪彩色图像可视化显示 1、灰度原始图像2、RGB彩色原始图像 在科研研究中,如何展示好看的实验结果图像非常重要!!! 1、灰度原始图像 灰度图像每个像素点只有一个数值,代表该点的亮度(或…...
vue3 手动封装城市三级联动
要做的功能 示意图是这样的,因为后端给的数据结构 不足以使用ant-design组件 的联动查询组件 所以只能自己分装 组件 当然 这个数据后端给的不一样的情况下 可能组件内对应的 逻辑方式就不一样 毕竟是 三个 数组 省份 城市 区域 我直接粘贴组件代码了 <temp…...
JS设计模式(5): 发布订阅模式
解锁JavaScript发布订阅模式:让代码沟通更优雅 在JavaScript的世界里,我们常常会遇到这样的场景:多个模块之间需要相互通信,但是又不想让它们产生过于紧密的耦合。这时候,发布订阅模式就像一位优雅的信使,…...
