Python 操作数据库:读取 Clickhouse 数据存入csv文件
import pandas as pd
from clickhouse_driver import Client
import timeit
import logging
import threading
from threading import Lock
from queue import Queue
from typing import List, Dict, Set
from contextlib import contextmanager
import os
import time# 配置参数
CONFIG = {'DB': {'host': 'xxx','database': 'xxx','user': 'xxxx','password': 'xxxx'},'BATCH_SIZE': 5000,'TOTAL_RECORDS': 1000000,'NUM_THREADS': 5,'OUTPUT_FILE': 'yyxs_ck2excel_v4.csv','MAX_RETRIES': 3, # 最大重试次数'RETRY_DELAY': 5, # 重试延迟(秒)'CONNECTION_TIMEOUT': 60 # 连接超时时间(秒)
}# 设置日志记录
logging.basicConfig(level=logging.INFO,format='%(asctime)s.%(msecs)d - %(name)s - %(levelname)s - %(message)s',datefmt='%Y-%m-%d %H:%M:%S'
)
logger = logging.getLogger(__name__)class DatabaseManager:_thread_local = threading.local()@classmethod@contextmanagerdef get_connection(cls):"""线程安全的数据库连接管理器"""retry_count = 0while retry_count < CONFIG['MAX_RETRIES']:try: if not hasattr(cls._thread_local, "client"):cls._thread_local.client = Client(**CONFIG['DB'],connect_timeout=CONFIG['CONNECTION_TIMEOUT'])logger.info(f"Created new database connection for thread {threading.current_thread().name}")yield cls._thread_local.clientbreakexcept Exception as e:retry_count += 1logger.error(f"Database connection error (attempt {retry_count}): {str(e)}")if hasattr(cls._thread_local, "client"):cls._thread_local.client.disconnect()delattr(cls._thread_local, "client")if retry_count < CONFIG['MAX_RETRIES']:time.sleep(CONFIG['RETRY_DELAY'])else:raise@classmethoddef close_all_connections(cls):"""关闭当前线程的数据库连接"""if hasattr(cls._thread_local, "client"):cls._thread_local.client.disconnect()delattr(cls._thread_local, "client")logger.info(f"Closed database connection for thread {threading.current_thread().name}")class DataProcessor:def __init__(self):self.columns = ["a", "b", "c", "d"]self.query = '''SELECTa,b,c,dFROMtable_nameORDER BYa,b,c,d '''self.file_lock = Lock() # 添加文件写入锁self.total_rows = 0 # 添加行数统计self.processed_batches = set() # 记录已成功处理的批次self.failed_batches = set() # 记录失败的批次def fetch_data_batch(self, batch_size: int, start: int) -> List[tuple]:"""获取一批数据,带重试机制"""retry_count = 0while retry_count < CONFIG['MAX_RETRIES']:try:with DatabaseManager.get_connection() as client:query_with_limit = f"{self.query} LIMIT {batch_size} OFFSET {start}"result = client.execute(query_with_limit)logger.info(f"Fetched {len(result)} records starting from {start}.")return resultexcept Exception as e:retry_count += 1logger.error(f"Error fetching batch starting at {start} (attempt {retry_count}): {str(e)}")if retry_count < CONFIG['MAX_RETRIES']:time.sleep(CONFIG['RETRY_DELAY'])else:raisedef save_to_csv(self, df: pd.DataFrame, file_name: str, batch_start: int):"""保存数据到CSV文件"""try:with self.file_lock: # 使用锁保护文件写入file_exists = os.path.exists(file_name) and os.path.getsize(file_name) > 0df.to_csv(file_name, mode='a', header= not file_exists,index=False)self.total_rows += len(df)self.processed_batches.add(batch_start)logger.info(f"Appended {len(df)} records to {file_name}. Total rows: {self.total_rows}")except Exception as e:logger.error(f"Error saving to CSV: {str(e)}")raisedef process_batch(self, start: int, batch_size: int, output_file: str):"""处理单个批次的数据"""try:if start in self.processed_batches:logger.info(f"Batch {start} already processed, skipping.")return Trueresult_batch = self.fetch_data_batch(batch_size, start)df_batch = pd.DataFrame(result_batch, columns=self.columns)self.save_to_csv(df_batch, output_file, start)return Trueexcept Exception as e:logger.error(f"Error processing batch starting at {start}: {str(e)}")self.failed_batches.add(start)return Falsedef main_v1():try:processor = DataProcessor()output_file = CONFIG['OUTPUT_FILE']# 清空或创建输出文件with open(output_file, 'w', encoding='utf-8') as f:passqueue = Queue()retry_queue = Queue() # 用于重试失败的批次threads = []def worker():while True:try:start = queue.get()if start is None:breaksuccess = processor.process_batch(start, CONFIG['BATCH_SIZE'], output_file)if not success:retry_queue.put(start)queue.task_done()except Exception as e:logger.error(f"Worker thread error: {str(e)}")finally:queue.task_done()# 启动工作线程for _ in range(CONFIG['NUM_THREADS']):t = threading.Thread(target=worker)t.daemon = Truet.start()threads.append(t)# 添加任务到队列for start in range(0, CONFIG['TOTAL_RECORDS'], CONFIG['BATCH_SIZE']):queue.put(start)# 等待主要处理完成queue.join()# 处理失败的批次while not retry_queue.empty():start = retry_queue.get()logger.info(f"Retrying failed batch starting at {start}")if processor.process_batch(start, CONFIG['BATCH_SIZE'], output_file):logger.info(f"Successfully retried batch {start}")else:logger.error(f"Failed to process batch {start} after retries")# 停止所有线程for _ in threads:queue.put(None)for t in threads:t.join()# 最终验证logger.info(f"Processing completed. Total rows: {processor.total_rows}")logger.info(f"Processed batches: {len(processor.processed_batches)}")logger.info(f"Failed batches: {len(processor.failed_batches)}")if processor.failed_batches:logger.warning(f"Failed batches: {sorted(processor.failed_batches)}")except Exception as e:logger.error(f"Main process error: {str(e)}")raisefinally:DatabaseManager.close_all_connections()def main():try:processor = DataProcessor()output_file = CONFIG['OUTPUT_FILE']# 清空或创建输出文件with open(output_file, 'w', encoding='utf-8') as f:passqueue = Queue()retry_queue = Queue()threads = []def worker():while True:try:start = queue.get()if start is None: # 退出信号queue.task_done()breaktry:success = processor.process_batch(start, CONFIG['BATCH_SIZE'], output_file)if not success:retry_queue.put(start)except Exception as e:logger.error(f"Error processing batch at offset {start}: {str(e)}")retry_queue.put(start)finally:queue.task_done() # 只在这里调用一次except Exception as e:logger.error(f"Worker thread error: {str(e)}")# 不要在这里调用 queue.task_done()# 启动工作线程for _ in range(CONFIG['NUM_THREADS']):t = threading.Thread(target=worker)t.daemon = Truet.start()threads.append(t)# 添加任务到队列total_batches = (CONFIG['TOTAL_RECORDS'] + CONFIG['BATCH_SIZE'] - 1) // CONFIG['BATCH_SIZE']for start in range(0, CONFIG['TOTAL_RECORDS'], CONFIG['BATCH_SIZE']):queue.put(start)# 等待主要处理完成queue.join()# 处理失败的批次retry_count = 0max_retries = 3while not retry_queue.empty() and retry_count < max_retries:retry_count += 1retry_size = retry_queue.qsize()logger.info(f"Retrying {retry_size} failed batches (attempt {retry_count})")# 将失败的批次重新放入主队列for _ in range(retry_size):start = retry_queue.get()queue.put(start)# 等待重试完成queue.join()# 停止所有线程for _ in threads:queue.put(None)for t in threads:t.join()# 最终验证logger.info(f"Processing completed. Total rows: {processor.total_rows}")logger.info(f"Expected batches: {total_batches}")logger.info(f"Processed batches: {len(processor.processed_batches)}")logger.info(f"Failed batches: {len(processor.failed_batches)}")if processor.failed_batches:logger.warning(f"Failed batches: {sorted(processor.failed_batches)}")# 验证数据完整性try:df_final = pd.read_csv(output_file)actual_rows = len(df_final)logger.info(f"Final CSV file contains {actual_rows} rows")if actual_rows != processor.total_rows:logger.warning(f"Row count mismatch: CSV has {actual_rows} rows, but processed {processor.total_rows} rows")# 检查是否有重复的表头duplicate_headers = df_final[df_final.iloc[:, 0] == df_final.columns[0]]if not duplicate_headers.empty:logger.warning(f"Found {len(duplicate_headers)} duplicate headers at rows: {duplicate_headers.index.tolist()}")# 清理重复表头df_final = df_final[df_final.iloc[:, 0] != df_final.columns[0]]df_final.to_csv(output_file, index=False)logger.info(f"Cleaned CSV file now contains {len(df_final)} rows")except Exception as e:logger.error(f"Error validating final CSV file: {str(e)}")except Exception as e:logger.error(f"Main process error: {str(e)}")raisefinally:DatabaseManager.close_all_connections()if __name__ == "__main__":start_time = timeit.default_timer()try:main()elapsed_time = timeit.default_timer() - start_timelogger.info(f"数据提取和存储完成,耗时: {elapsed_time:.2f} 秒")except Exception as e:logger.error(f"程序执行失败: {str(e)}")raise
主要类
- DatabaseManager
管理数据库连接的线程安全类
使用 threading.local() 确保每个线程有自己的连接
包含重试机制和连接管理功能
- DataProcessor
处理数据的核心类
定义了数据列和查询语句
处理数据批次的获取和保存
跟踪处理状态和失败的批次
2. 工作流程
- 初始化
创建空的输出文件
初始化线程池和任务队列
- 数据处理
将总数据量分成多个批次
多个工作线程并行处理数据批次
每个批次:
- 从数据库获取数据
- 转换为 DataFrame
- 保存到 CSV 文件
- 错误处理
失败的批次会进入重试队列
最多重试 3 次
记录所有失败的批次
- 数据验证
检查最终 CSV 文件的行数
检查和清理重复的表头
验证数据完整性
3. 特点
- 线程安全
使用线程本地存储管理数据库连接
文件写入使用锁保护
- 容错机制
数据库连接重试
批次处理重试
详细的日志记录
- 性能优化
批量处理数据
多线程并行处理
使用队列管理任务
- 监控和日志
详细的日志记录
处理进度跟踪
执行时间统计
这个程序适合处理大量数据的导出任务,具有良好的容错性和可靠性。
相关文章:
Python 操作数据库:读取 Clickhouse 数据存入csv文件
import pandas as pd from clickhouse_driver import Client import timeit import logging import threading from threading import Lock from queue import Queue from typing import List, Dict, Set from contextlib import contextmanager import os import time# 配置参…...
如何找到系统中bert-base-uncased默认安装位置
问题: 服务器中无法连接huggingface,故需要自己将模型文件上传 ubuntu 可以按照这个链接下载 Bert下载和使用(以bert-base-uncased为例) - 会自愈的哈士奇 - 博客园 里面提供了giehub里面的链接 GitHub - google-research/be…...
在启动 Spring Boot 项目时,报找不到 slf4j 的错误
而且 tomcat 的启动信息不知道为什么输出出来了 问 AI 得到的解决方案: 将 pom.xml 中的如下配置替换成这样,排除这个插件 <dependency><groupId>org.springframework.boot</groupId><artifactId>spring - boot - starter - …...
android-12-source-code--write-file-function
find /app4/lineage19_oneplus6/system/ -name "*.cpp" -type f | while read -r k ; do ( grep -i write $k | grep -i file && echo $k ;) ; done获得android::base::WriteStringToFile, 进一步修改 find /app4/lineage19_oneplus6/system/ -name &qu…...
SQL(2)
一.时间盲注 有回显时用Union带出数据,只显示是否时可用布尔盲注得出数据,那如果没有任何输出时? 比如无论查询什么,都显示success,同一个回应,无法直接从服务器注入出任何数据,但是我们可以利…...
【IC每日一题:AMBA总线--APB协议时序及Verilog实现】
AMBA总线--APB协议时序及Verilog实现 1 APB3协议1.1 APB3时序1.1.1 APB写操作1.1.2 APB读操作 2 代码2.1 apb_master2.2 apb_slave 【博客首发于微信公众号《漫谈芯片与编程》,欢迎专注一下,多谢大家】 AMBA总线是用于连接微控制器和外围设备的总线协议&…...
抢先看!为什么很多公司会强行给员工电脑加屏幕水印?千字长文来解答
2024年度热议:为什么很多公司会强行给员工电脑加屏幕水印? 有人说:概是为了让员工时刻铭记,工作就像这水印,无处不在,想逃也逃不掉! “玩归玩,闹归闹”。 本文将对此进行详尽解答&…...
【AI技术】PaddleSpeech部署方案
【AI技术】PaddleSpeech部署方案 技术介绍优点缺点 部署基础环境的搭建分步详解国内镜像源切换所需环境1 g所需环境2 vim所需环境3 cuda所需环境4 cudnn所需环境5 ssl源码拉取PaddleSpeech环境安装 部署文件分享DockerHub 技术介绍 PaddleSpeech是飞浆平台的一款TTS框架。 优…...
可灵开始“独闯”,全面拥抱AI的快手能否尝到“甜头”?
现任谷歌CEO桑达尔皮查伊曾说到,“人工智能是我们人类正在从事的最为深刻的研究方向之一,甚至要比火与电还更加深刻。” 正如,Sora诞生时,在官方表述中被称为“世界模拟器”,它理解真实的规则,并在此基础上…...
qt QtConcurrent 详解
1、概述 QtConcurrent是Qt框架中用于简化多线程编程的一个模块,它提供了高层次的API来实现并行计算,而不需要开发者直接管理线程的创建、调度和销毁。QtConcurrent主要通过QFuture和QThreadPool来进行并发任务的执行,能够自动利用系统的所有…...
基于构件的软件开发、软件维护、区块链技术及湖仓一体架构的应用
目录 试题一 论基于构件的软件开发方法及其应用 试题二 论软件维护方法及其应用 试题三 论区块链技术及应用 试题四 论湖仓一体架构及其应用 相关推荐 试题一 论基于构件的软件开发方法及其应用 基于构件的软件开发(Component-Based Software Development,CBSD…...
【在Typora中绘制用户旅程图和甘特图】
在 Typora 中可以使用 Mermaid 绘制用户旅程图(User Journey Map),但由于 Mermaid 并不直接支持用户旅程图,我们可以通过一些图表的变通方式(比如流程图或甘特图)来表示用户旅程图的结构。用户旅程图通常展…...
【Vue3】知识汇总,附详细定义和源码详解,后续出微信小程序项目(2)
快速跳转: 我的个人博客主页👉:Reuuse博客 新开专栏👉:Vue3专栏 参考文献👉:uniapp官网 ❀ 感谢支持!☀ 前情提要 🔺因为最近学习的vue语言,发现有很多细节…...
uniapp中使用全局样式文件引入的三种方式
如果你想在 uni-app 中全局引入 SCSS 文件(例如 global.scss),可以通过以下步骤进行配置: 方法一:在 main.js 中引入 在 main.js 中引入全局样式: 你可以在 src/main.js 文件中直接引入 SCSS 文件ÿ…...
计算机网络易混淆知识点串记
文章目录 计算机网络易混淆知识点串记各层PDU首部: 计算机网络易混淆知识点串记 各层PDU首部: PUD首部长度 (B:字节)首部单位数据链路–帧帧首:14B帧尾部:4B——IPV420~60字节4B [通过4位二进制表示]IPV6固定首部40字节[可拓展]4BTCP20~60字节4BUDP8B字节...
Java代码审计-模板注入漏洞
一、模板引擎 在Java开发当中,为了将前端和后端进行分离,降低项目代码的耦合性,使代码更加易于维护和管理。除去以上的原因,模板引擎还能实现动态和静态数据的分离。 二、主流模板引擎 在Java中,主流的模板引擎有:Fre…...
如何在Linux中使用Cron定时执行SQL任务
文章目录 前言一、方案分析二、使用步骤1.准备脚本2.crontab脚本执行 踩坑 前言 演示数据需要每天更新监控数据,不想手动执行,想到以下解决方案 navicat 创建定时任务java服务定时执行linux crontab 定时执行sql脚本 一、方案分析 我选择了第三个方案…...
数据集划分
1、 sklearn玩具数据集介绍 数据量小,数据在sklearn库的本地,只要安装了sklearn,不用上网就可以获取 2 sklearn现实世界数据集介绍 数据量大,数据只能通过网络获取(科学上网) 3 sklearn加载玩具数据集 示…...
带你读懂什么是AI Agent智能体
一、智能体的定义与特性 定义:智能体是一个使用大语言模型(LLM)来决定应用程序控制流的系统。然而,智能体的定义并不唯一,不同人有不同的看法。Langchain的创始人Harrison Chase从技术角度给出了定义,但更…...
react动态路由
在React应用中,动态路由(Dynamic Routing)通常指的是根据应用的状态或用户的交互来动态地显示或隐藏路由(页面或组件)。这可以通过多种方法实现,包括使用React Router库,它提供了强大的路由管理…...
Liquibase,数据库无关的版本控制工具!
在现代软件开发中,数据库的版本控制往往比代码版本控制更具挑战性。不同的开发环境、测试环境、生产环境可能使用不同的数据库产品(如开发用H2、测试用MySQL、生产用PostgreSQL),而传统的SQL脚本往往包含特定数据库的方言…...
别再乱选格式了!LVGL图片转换工具(lv_img_conv)保姆级使用指南,从BMP到C数组一次搞定
LVGL图像转换实战指南:从格式选择到批量处理的完整解决方案 在嵌入式UI开发中,图像资源处理往往是第一个技术门槛。许多开发者在使用LVGL时,80%的初期问题都集中在图像转换环节——为什么转换后的图片显示异常?如何平衡内存占用和…...
Windows右键菜单终极清理指南:3步让你的右键菜单重获新生
Windows右键菜单终极清理指南:3步让你的右键菜单重获新生 【免费下载链接】ContextMenuManager 🖱️ 纯粹的Windows右键菜单管理程序 项目地址: https://gitcode.com/gh_mirrors/co/ContextMenuManager 还在为每次右键点击文件时弹出的杂乱菜单而…...
Obsidian插件翻译终极指南:5分钟让所有插件说你的母语
Obsidian插件翻译终极指南:5分钟让所有插件说你的母语 【免费下载链接】obsidian-i18n 项目地址: https://gitcode.com/gh_mirrors/ob/obsidian-i18n 你是否曾经因为喜欢的Obsidian插件只有英文界面而感到困扰?或者因为语言障碍而无法充分发挥插…...
别再手动另存为了!用Python脚本5分钟搞定上百个Excel文件的格式转换(附完整代码)
别再手动另存为了!用Python脚本5分钟搞定上百个Excel文件的格式转换(附完整代码) 你是否曾经面对过这样的场景:电脑里堆积着上百个老旧的.xls格式Excel文件,每次需要使用时都得手动一个个"另存为"xlsx格式&a…...
智能客服VS语音转写:不同场景下语音识别评估指标的选择指南
智能客服与语音转写:业务场景驱动的语音识别评估指标决策框架 当企业考虑部署语音识别系统时,技术团队常会抛出一堆专业术语:WER 15%、CER 8%、SER 22%...但对产品经理和解决方案架构师而言,这些数字背后意味着什么?选…...
QML与QWidget混合开发:实现高效UI集成的实战指南
1. 为什么需要QML与QWidget混合开发 在Qt开发中,QML和QWidget是两种完全不同的UI构建方式。QML凭借其声明式语法和强大的动画效果,在现代UI开发中越来越受欢迎。但现实情况是,很多成熟的功能模块都是基于QWidget开发的,比如一些第…...
SecGPT-14B提示工程:提升OpenClaw安全报告可读性的秘诀
SecGPT-14B提示工程:提升OpenClaw安全报告可读性的秘诀 1. 当安全报告遇上OpenClaw:我的真实痛点 上周五凌晨2点,我被OpenClaw的告警邮件惊醒——它发现我的个人服务器存在一个高危漏洞。但当我打开那份自动生成的安全报告时,眼…...
GIMP Resynthesizer完整教程:掌握纹理合成与图像修复的核心技术
GIMP Resynthesizer完整教程:掌握纹理合成与图像修复的核心技术 【免费下载链接】resynthesizer Suite of gimp plugins for texture synthesis 项目地址: https://gitcode.com/gh_mirrors/re/resynthesizer 当你面对一张需要修复的老照片,或者需…...
Fay-UE5数字人系统架构深度解析:基于Unreal Engine 5的实时交互虚拟人技术实现
Fay-UE5数字人系统架构深度解析:基于Unreal Engine 5的实时交互虚拟人技术实现 【免费下载链接】fay-ue5 项目地址: https://gitcode.com/gh_mirrors/fa/fay-ue5 Fay-UE5是一个基于Unreal Engine 5引擎构建的高性能数字人解决方案,通过深度集成F…...
