当前位置: 首页 > news >正文

python异步切片下载文件(内置redis获取任务 mongo更新任务状态等)

异步切片下载二进制文件并上传桶删除本地文件

import json
import os
import asyncio
from urllib import parseimport aiohttp
import aioredis
from motor.motor_asyncio import AsyncIOMotorClient
from retrying import retry
from minio import Minio
from minio.error import S3Error
import loguruclass Async_download:def __init__(self, sema_number=20, redis_address='redis://ip:6379/db', redis_password='passwd'):self.__sema_number = sema_numberself.__sema = Noneself.redis_address = redis_addressself.redis_password = redis_passwordself.minio_config = {"endpoint": "ip:port","access_key": "******","secret_key": "******","secure": False}self.mongo_config = {"uri": "mongodb://{0}:{1}@ip:27017".format(parse.quote_plus("user"),parse.quote_plus("passwd")),"db": "******","collection": "******",}@classmethoddef _mkdir(cls, file_path, full_path):if os.path.exists(file_path):return Truetry:os.makedirs(full_path)except Exception as e:passreturn Falsedef __init_check(self, file_path: str):full_path, file_name = file_path.rsplit('/', 1)file_size = os.path.getsize(file_path) if self._mkdir(file_path, full_path) else 0return file_name, file_size@classmethoddef __sync_save_local(cls, r_headers, results, file_path):done, padding = resultsfor d in done:for index, value in d.result().items():r_headers[index] = valuewith open(file_path, 'ab') as f:for _, value in r_headers.items():f.write(value)return True@classmethoddef __generate_headers(cls, headers, file_size, first_byte):r_headers = {}index = 0if first_byte > 51200000:byte = 2048000  # 2M 为一片else:byte = 1024000  # 1M 为一片while True:file_size_two = file_size + byteif file_size_two >= first_byte:r_headers[index] = {"Range": f"bytes={file_size}-{first_byte}"}breakr_headers[index] = {"Range": f"bytes={file_size}-{file_size_two - 1}"}index += 1file_size = file_size_twofor key in r_headers:r_headers[key].update(headers)return r_headers@retry(stop_max_attempt_number=3)async def __download_one(self, session, method, url, r_headers, **kwargs):index, headers = r_headersasync with self.__sema:async with session.request(method, url, headers=headers, **kwargs) as response:binary = await response.content.read()return {index: binary}async def __async_section_download(self, session, method, url, r_headers, **kwargs):tasks = [asyncio.create_task(self.__download_one(session, method, url, (key, r_headers[key]), **kwargs)) for key inr_headers]return await asyncio.wait(tasks)@classmethodasync def __get_content_length(cls, session, method, url, headers, **kwargs):async with session.request(method, url, headers=headers, **kwargs) as response:return response.headers.get('Content-Length') or response.headers.get('content-length') or 0@classmethodasync def __sync_download(cls, session, method, url, headers, file_path, **kwargs):async with session.request(method, url, headers=headers, **kwargs) as response:with open(file_path, 'wb') as f:binary = await response.content.read()f.write(binary)async def __async_download_main(self, method, url, headers, file_path, **kwargs):file_name, file_size = self.__init_check(file_path)self.__sema = asyncio.Semaphore(self.__sema_number)async with aiohttp.ClientSession() as session:content_length = await self.__get_content_length(session, method, url, headers, **kwargs)if content_length and content_length.isdigit():content_length = int(content_length)if file_size >= content_length:await self.__upload_to_minio(file_path, file_name)  # Upload to MinIOawait self.__update_mongo_status(file_name, True)  # Update MongoDB statusos.remove(file_path)  # Delete local filereturn True, file_pathr_headers = self.__generate_headers(headers, file_size, content_length)results = await self.__async_section_download(session, method, url, r_headers, **kwargs)self.__sync_save_local(r_headers, results, file_path)else:await self.__sync_download(session, method, url, headers, file_path, **kwargs)if os.path.getsize(file_path) >= int(content_length):await self.__upload_to_minio(file_path, file_name)  # Upload to MinIOawait self.__update_mongo_status(file_name, True)  # Update MongoDB statusos.remove(file_path)  # Delete local filereturn True, file_pathreturn False, file_pathasync def __get_task_from_redis(self):async with aioredis.from_url(self.redis_address, password=self.redis_password) as redis:task = await redis.lpop('file_file_all')return taskasync def __process_redis_tasks(self):while True:task_info = await self.__get_task_from_redis()if task_info is None:breaktask = json.loads(task_info)try:method = 'get'url = task["file_link"]file_path = './{}'.format(task["file_name"])headers = {'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/96.0.4664.93 Safari/537.36'}if url.startswith('http') or url.startswith('https'):try:ddd = url.split('http')if len(ddd) != 0:url = 'http' + ddd[-1]except:continue# Perform download for the current taskawait self.__async_download_main(method, url, headers, file_path)except Exception as e:loguru.logger.error("Error processing Redis task:", e)async def __upload_to_minio(self, file_path, object_name):"""上传minio"""try:minioClient = Minio(**self.minio_config)check_bucket = minioClient.bucket_exists("******")if not check_bucket:minioClient.make_bucket("******")loguru.logger.info("start upload file to MinIO")minioClient.fput_object(bucket_name="******", object_name=object_name, file_path=file_path)loguru.logger.info("file {0} is successfully uploaded to MinIO".format(object_name))except FileNotFoundError as err:loguru.logger.info('*' * 10)loguru.logger.error('MinIO upload failed: ' + str(err))except S3Error as err:loguru.logger.error("MinIO upload failed:", err)async def __update_mongo_status(self, file_name, status):"""更新mongo采集状态"""try:mongo_uri = self.mongo_config["uri"]db_name = self.mongo_config["db"]collection_name = self.mongo_config["collection"]client = AsyncIOMotorClient(mongo_uri)db = client.get_database(db_name)collection = db.get_collection(collection_name)await collection.update_one({"file_name": file_name}, {"$set": {"status": status}})except Exception as e:loguru.logger.error("MongoDB update failed:", e)async def start(self):await self.__process_redis_tasks()loguru.logger.add("download_file_output.log", rotation="500 MB", level="DEBUG")
if __name__ == '__main__':as_dw = Async_download(20)asyncio.run(as_dw.start())

部分代码来源于y小白的笔记

相关文章:

python异步切片下载文件(内置redis获取任务 mongo更新任务状态等)

异步切片下载二进制文件并上传桶删除本地文件 import json import os import asyncio from urllib import parseimport aiohttp import aioredis from motor.motor_asyncio import AsyncIOMotorClient from retrying import retry from minio import Minio from minio.error im…...

《吐血整理》进阶系列教程-拿捏Fiddler抓包教程(10)-Fiddler如何设置捕获Firefox浏览器的Https会话

1.简介 经过上一篇对Fiddler的配置后,绝大多数的Https的会话,我们可以成功捕获抓取到,但是有些版本的Firefox浏览器仍然是捕获不到其的Https会话,需要我们更进一步的配置才能捕获到会话进行抓包。 2.宏哥环境 1.宏哥的环境是Wi…...

阿里云云原生弹性方案:用弹性解决集群资源利用率难题

作者:赫曦 随着上云的认知更加普遍,我们发现除了以往占大部分的互联网类型的客户,一些传统的企业,一些制造类的和工业型企业客户也都开始使用云原生的方式去做 IT 架构的转型,提高集群资源使用率也成为企业上云的一致…...

Spring-BeanPostProcessor PostConstruct init InitializingBean 执行顺序

执行顺序探究 新建一个对象用于测试 Component public class Student implements InitializingBean {private String name;private int age;public String getName() {return name;}public void setName(String name) {this.name name;}public int getAge() {return age;}pu…...

【算法】递归

递归 递归初始递归:数列求和递归的应用:任意进制转换递归深度限制递归可视化:分形树递归可视化:谢尔宾斯基Sierpinski三角形递归的应用:汉诺塔递归的应用:探索迷宫 分治策略和递归优化问题兑换最少个数硬币…...

DC-1靶机刷题记录

靶机下载地址: 链接:https://pan.baidu.com/s/1GX7qOamdNx01622EYUBSow?pwd9nyo 提取码:9nyo 参考答案: https://c3ting.com/archives/kai-qi-vulnhnbshua-tiDC-1.pdf【【基础向】超详解vulnhub靶场DC-1】 https://www.bilibi…...

rust跟我学七:获取外网IP地址

图为RUST吉祥物 大家好,我是get_local_info作者带剑书生,这里用一篇文章讲解get_local_info是怎么获取到本机的外网IP地址。 首先,先要了解get_local_info是什么? get_local_info是一个获取linux系统信息的rust三方库,并提供一些常用功能,目前版本0.2.4。详细介绍地址:[…...

华为:交换机忘记console密码重置

一、背景 许多旧项目经过长时间使用后,因为没有特定的管理运维人员,初始对接人也将初始账号密码等重要信息丢失,现需要进后台查看配置或更改网络配置,需重置密码 二、重置密码,不重置设备方法 1、使用console插入交…...

2024年甘肃省职业院校技能大赛信息安全管理与评估 样题三 模块一

竞赛需要完成三个阶段的任务,分别完成三个模块,总分共计 1000分。三个模块内容和分值分别是: 1.第一阶段:模块一 网络平台搭建与设备安全防护(180 分钟,300 分)。 2.第二阶段:模块二…...

Go 中 slice 的 In 功能实现探索

文章目录 遍历二分查找map key性能总结 之前在知乎看到一个问题:为什么 Golang 没有像 Python 中 in 一样的功能?于是,搜了下这个问题,发现还是有不少人有这样的疑问。 补充:本文写于 2019 年。GO 现在已经支持泛型&am…...

pyDAL一个python的ORM(终) pyDAL的一些性能优化

一、大批量插入数据 对于 大量数据插入时,虽然pyDAL也手册中有个方法:bulk_insert(),但是手册也说了,虽然方法上是一次可以多条数据,如果后端数据库是关系型数据库,他转换为SQL时它是一条一条的插入的&…...

springboot log4j配置xml实例说明

提供样本配置代码 xml <?xml version"1.0" encoding"UTF-8"?> <!--日志级别以及优先级排序: OFF > FATAL > ERROR > WARN > INFO > DEBUG > TRACE > ALL --> <!-- status log4j2内部日志级别 --> <configurat…...

VsCode重新安装需要配机的ESLint和 Prettier - Code formatter 配置

新电脑安装完Vscode后&#xff0c;需要装几个插件&#xff0c;这里记录下&#xff1a; {"diffEditor.ignoreTrimWhitespace": false,"files.autoSave": "afterDelay","editor.codeActionsOnSave": {"source.fixAll.eslint"…...

录屏功能怎么打开?简单操作,一学就会!

录屏功能在当今互联网时代变得越来越重要&#xff0c;无论是游戏录制、在线课程录制还是屏幕操作演示&#xff0c;录屏功能都为我们提供了便捷的解决方案。可是您知道录屏功能怎么打开吗&#xff1f;接下来&#xff0c;让我们一起探索如何在电脑上开启录屏功能&#xff0c;记录…...

小程序显示兼容处理,home键处理

定义&#xff1a; env(safe-area-inset-bottom)和env(safe-area-inset-top)是CSS中的变量&#xff0c;用于获取设备底部和顶部安全区域的大小 示例&#xff1a; padding-bottom: calc(env(safe-area-inset-bottom) 12px); /* 兼容iOS> 11.2 */安全间距类型&#xff1a; …...

【java八股文】之JVM基础篇

【java八股文】之JVM基础篇-CSDN博客 【java八股文】之MYSQL基础篇-CSDN博客 【java八股文】之Redis基础篇-CSDN博客 【java八股文】之Spring系列篇-CSDN博客 【java八股文】之分布式系列篇-CSDN博客 【java八股文】之多线程篇-CSDN博客 【java八股文】之JVM基础篇-CSDN博…...

2024美赛数学建模思路 - 案例:异常检测

文章目录 赛题思路一、简介 -- 关于异常检测异常检测监督学习 二、异常检测算法2. 箱线图分析3. 基于距离/密度4. 基于划分思想 建模资料 赛题思路 &#xff08;赛题出来以后第一时间在CSDN分享&#xff09; https://blog.csdn.net/dc_sinor?typeblog 一、简介 – 关于异常…...

【EI会议征稿通知】2024年通信技术与软件工程国际学术会议 (CTSE 2024)

2024年通信技术与软件工程国际学术会议 (CTSE 2024) 2024 International Conference on Communication Technology and Software Engineering (CTSE 2024) 2024年通信技术与软件工程国际学术会议 (CTSE 2024)将于2024年03月15-17日在中国长沙举行。会议专注于通信技术与软件工…...

Js面试之作用域与闭包

Js面试之作用域与闭包 作用域词法作用域动态作用域 闭包闭包使用场景封装私有变量模块化开发保持变量状态异步操作 注意事项 最近在整理一些前端面试中经常被问到的问题&#xff0c;分为vue相关、react相关、js相关、react相关等等专题&#xff0c;可持续关注后续内容&#xff…...

Go 爬虫之 colly 从入门到不放弃指南

文章目录 概要介绍如何学习官方文档如何安装快速开始如何配置调试分布式代理层面执行层面存储层面存储多收集器配置优化持久化存储启用异步加快任务执行禁止或限制 KeepAlive 连接扩展总结如果想用 GO 实现爬虫能力,该如何做呢?抽时间研究了 Go 的一款爬虫框架 colly。 概要…...

手游刚开服就被攻击怎么办?如何防御DDoS?

开服初期是手游最脆弱的阶段&#xff0c;极易成为DDoS攻击的目标。一旦遭遇攻击&#xff0c;可能导致服务器瘫痪、玩家流失&#xff0c;甚至造成巨大经济损失。本文为开发者提供一套简洁有效的应急与防御方案&#xff0c;帮助快速应对并构建长期防护体系。 一、遭遇攻击的紧急应…...

Prompt Tuning、P-Tuning、Prefix Tuning的区别

一、Prompt Tuning、P-Tuning、Prefix Tuning的区别 1. Prompt Tuning(提示调优) 核心思想:固定预训练模型参数,仅学习额外的连续提示向量(通常是嵌入层的一部分)。实现方式:在输入文本前添加可训练的连续向量(软提示),模型只更新这些提示参数。优势:参数量少(仅提…...

.Net框架,除了EF还有很多很多......

文章目录 1. 引言2. Dapper2.1 概述与设计原理2.2 核心功能与代码示例基本查询多映射查询存储过程调用 2.3 性能优化原理2.4 适用场景 3. NHibernate3.1 概述与架构设计3.2 映射配置示例Fluent映射XML映射 3.3 查询示例HQL查询Criteria APILINQ提供程序 3.4 高级特性3.5 适用场…...

练习(含atoi的模拟实现,自定义类型等练习)

一、结构体大小的计算及位段 &#xff08;结构体大小计算及位段 详解请看&#xff1a;自定义类型&#xff1a;结构体进阶-CSDN博客&#xff09; 1.在32位系统环境&#xff0c;编译选项为4字节对齐&#xff0c;那么sizeof(A)和sizeof(B)是多少&#xff1f; #pragma pack(4)st…...

《从零掌握MIPI CSI-2: 协议精解与FPGA摄像头开发实战》-- CSI-2 协议详细解析 (一)

CSI-2 协议详细解析 (一&#xff09; 1. CSI-2层定义&#xff08;CSI-2 Layer Definitions&#xff09; 分层结构 &#xff1a;CSI-2协议分为6层&#xff1a; 物理层&#xff08;PHY Layer&#xff09; &#xff1a; 定义电气特性、时钟机制和传输介质&#xff08;导线&#…...

vscode(仍待补充)

写于2025 6.9 主包将加入vscode这个更权威的圈子 vscode的基本使用 侧边栏 vscode还能连接ssh&#xff1f; debug时使用的launch文件 1.task.json {"tasks": [{"type": "cppbuild","label": "C/C: gcc.exe 生成活动文件"…...

UE5 学习系列(三)创建和移动物体

这篇博客是该系列的第三篇&#xff0c;是在之前两篇博客的基础上展开&#xff0c;主要介绍如何在操作界面中创建和拖动物体&#xff0c;这篇博客跟随的视频链接如下&#xff1a; B 站视频&#xff1a;s03-创建和移动物体 如果你不打算开之前的博客并且对UE5 比较熟的话按照以…...

电脑插入多块移动硬盘后经常出现卡顿和蓝屏

当电脑在插入多块移动硬盘后频繁出现卡顿和蓝屏问题时&#xff0c;可能涉及硬件资源冲突、驱动兼容性、供电不足或系统设置等多方面原因。以下是逐步排查和解决方案&#xff1a; 1. 检查电源供电问题 问题原因&#xff1a;多块移动硬盘同时运行可能导致USB接口供电不足&#x…...

质量体系的重要

质量体系是为确保产品、服务或过程质量满足规定要求&#xff0c;由相互关联的要素构成的有机整体。其核心内容可归纳为以下五个方面&#xff1a; &#x1f3db;️ 一、组织架构与职责 质量体系明确组织内各部门、岗位的职责与权限&#xff0c;形成层级清晰的管理网络&#xf…...

华为OD机试-食堂供餐-二分法

import java.util.Arrays; import java.util.Scanner;public class DemoTest3 {public static void main(String[] args) {Scanner in new Scanner(System.in);// 注意 hasNext 和 hasNextLine 的区别while (in.hasNextLine()) { // 注意 while 处理多个 caseint a in.nextIn…...