Python 操作数据库:读取 Clickhouse 数据存入csv文件
import pandas as pd
from clickhouse_driver import Client
import timeit
import logging
import threading
from threading import Lock
from queue import Queue
from typing import List, Dict, Set
from contextlib import contextmanager
import os
import time# 配置参数
CONFIG = {'DB': {'host': 'xxx','database': 'xxx','user': 'xxxx','password': 'xxxx'},'BATCH_SIZE': 5000,'TOTAL_RECORDS': 1000000,'NUM_THREADS': 5,'OUTPUT_FILE': 'yyxs_ck2excel_v4.csv','MAX_RETRIES': 3, # 最大重试次数'RETRY_DELAY': 5, # 重试延迟(秒)'CONNECTION_TIMEOUT': 60 # 连接超时时间(秒)
}# 设置日志记录
logging.basicConfig(level=logging.INFO,format='%(asctime)s.%(msecs)d - %(name)s - %(levelname)s - %(message)s',datefmt='%Y-%m-%d %H:%M:%S'
)
logger = logging.getLogger(__name__)class DatabaseManager:_thread_local = threading.local()@classmethod@contextmanagerdef get_connection(cls):"""线程安全的数据库连接管理器"""retry_count = 0while retry_count < CONFIG['MAX_RETRIES']:try: if not hasattr(cls._thread_local, "client"):cls._thread_local.client = Client(**CONFIG['DB'],connect_timeout=CONFIG['CONNECTION_TIMEOUT'])logger.info(f"Created new database connection for thread {threading.current_thread().name}")yield cls._thread_local.clientbreakexcept Exception as e:retry_count += 1logger.error(f"Database connection error (attempt {retry_count}): {str(e)}")if hasattr(cls._thread_local, "client"):cls._thread_local.client.disconnect()delattr(cls._thread_local, "client")if retry_count < CONFIG['MAX_RETRIES']:time.sleep(CONFIG['RETRY_DELAY'])else:raise@classmethoddef close_all_connections(cls):"""关闭当前线程的数据库连接"""if hasattr(cls._thread_local, "client"):cls._thread_local.client.disconnect()delattr(cls._thread_local, "client")logger.info(f"Closed database connection for thread {threading.current_thread().name}")class DataProcessor:def __init__(self):self.columns = ["a", "b", "c", "d"]self.query = '''SELECTa,b,c,dFROMtable_nameORDER BYa,b,c,d '''self.file_lock = Lock() # 添加文件写入锁self.total_rows = 0 # 添加行数统计self.processed_batches = set() # 记录已成功处理的批次self.failed_batches = set() # 记录失败的批次def fetch_data_batch(self, batch_size: int, start: int) -> List[tuple]:"""获取一批数据,带重试机制"""retry_count = 0while retry_count < CONFIG['MAX_RETRIES']:try:with DatabaseManager.get_connection() as client:query_with_limit = f"{self.query} LIMIT {batch_size} OFFSET {start}"result = client.execute(query_with_limit)logger.info(f"Fetched {len(result)} records starting from {start}.")return resultexcept Exception as e:retry_count += 1logger.error(f"Error fetching batch starting at {start} (attempt {retry_count}): {str(e)}")if retry_count < CONFIG['MAX_RETRIES']:time.sleep(CONFIG['RETRY_DELAY'])else:raisedef save_to_csv(self, df: pd.DataFrame, file_name: str, batch_start: int):"""保存数据到CSV文件"""try:with self.file_lock: # 使用锁保护文件写入file_exists = os.path.exists(file_name) and os.path.getsize(file_name) > 0df.to_csv(file_name, mode='a', header= not file_exists,index=False)self.total_rows += len(df)self.processed_batches.add(batch_start)logger.info(f"Appended {len(df)} records to {file_name}. Total rows: {self.total_rows}")except Exception as e:logger.error(f"Error saving to CSV: {str(e)}")raisedef process_batch(self, start: int, batch_size: int, output_file: str):"""处理单个批次的数据"""try:if start in self.processed_batches:logger.info(f"Batch {start} already processed, skipping.")return Trueresult_batch = self.fetch_data_batch(batch_size, start)df_batch = pd.DataFrame(result_batch, columns=self.columns)self.save_to_csv(df_batch, output_file, start)return Trueexcept Exception as e:logger.error(f"Error processing batch starting at {start}: {str(e)}")self.failed_batches.add(start)return Falsedef main_v1():try:processor = DataProcessor()output_file = CONFIG['OUTPUT_FILE']# 清空或创建输出文件with open(output_file, 'w', encoding='utf-8') as f:passqueue = Queue()retry_queue = Queue() # 用于重试失败的批次threads = []def worker():while True:try:start = queue.get()if start is None:breaksuccess = processor.process_batch(start, CONFIG['BATCH_SIZE'], output_file)if not success:retry_queue.put(start)queue.task_done()except Exception as e:logger.error(f"Worker thread error: {str(e)}")finally:queue.task_done()# 启动工作线程for _ in range(CONFIG['NUM_THREADS']):t = threading.Thread(target=worker)t.daemon = Truet.start()threads.append(t)# 添加任务到队列for start in range(0, CONFIG['TOTAL_RECORDS'], CONFIG['BATCH_SIZE']):queue.put(start)# 等待主要处理完成queue.join()# 处理失败的批次while not retry_queue.empty():start = retry_queue.get()logger.info(f"Retrying failed batch starting at {start}")if processor.process_batch(start, CONFIG['BATCH_SIZE'], output_file):logger.info(f"Successfully retried batch {start}")else:logger.error(f"Failed to process batch {start} after retries")# 停止所有线程for _ in threads:queue.put(None)for t in threads:t.join()# 最终验证logger.info(f"Processing completed. Total rows: {processor.total_rows}")logger.info(f"Processed batches: {len(processor.processed_batches)}")logger.info(f"Failed batches: {len(processor.failed_batches)}")if processor.failed_batches:logger.warning(f"Failed batches: {sorted(processor.failed_batches)}")except Exception as e:logger.error(f"Main process error: {str(e)}")raisefinally:DatabaseManager.close_all_connections()def main():try:processor = DataProcessor()output_file = CONFIG['OUTPUT_FILE']# 清空或创建输出文件with open(output_file, 'w', encoding='utf-8') as f:passqueue = Queue()retry_queue = Queue()threads = []def worker():while True:try:start = queue.get()if start is None: # 退出信号queue.task_done()breaktry:success = processor.process_batch(start, CONFIG['BATCH_SIZE'], output_file)if not success:retry_queue.put(start)except Exception as e:logger.error(f"Error processing batch at offset {start}: {str(e)}")retry_queue.put(start)finally:queue.task_done() # 只在这里调用一次except Exception as e:logger.error(f"Worker thread error: {str(e)}")# 不要在这里调用 queue.task_done()# 启动工作线程for _ in range(CONFIG['NUM_THREADS']):t = threading.Thread(target=worker)t.daemon = Truet.start()threads.append(t)# 添加任务到队列total_batches = (CONFIG['TOTAL_RECORDS'] + CONFIG['BATCH_SIZE'] - 1) // CONFIG['BATCH_SIZE']for start in range(0, CONFIG['TOTAL_RECORDS'], CONFIG['BATCH_SIZE']):queue.put(start)# 等待主要处理完成queue.join()# 处理失败的批次retry_count = 0max_retries = 3while not retry_queue.empty() and retry_count < max_retries:retry_count += 1retry_size = retry_queue.qsize()logger.info(f"Retrying {retry_size} failed batches (attempt {retry_count})")# 将失败的批次重新放入主队列for _ in range(retry_size):start = retry_queue.get()queue.put(start)# 等待重试完成queue.join()# 停止所有线程for _ in threads:queue.put(None)for t in threads:t.join()# 最终验证logger.info(f"Processing completed. Total rows: {processor.total_rows}")logger.info(f"Expected batches: {total_batches}")logger.info(f"Processed batches: {len(processor.processed_batches)}")logger.info(f"Failed batches: {len(processor.failed_batches)}")if processor.failed_batches:logger.warning(f"Failed batches: {sorted(processor.failed_batches)}")# 验证数据完整性try:df_final = pd.read_csv(output_file)actual_rows = len(df_final)logger.info(f"Final CSV file contains {actual_rows} rows")if actual_rows != processor.total_rows:logger.warning(f"Row count mismatch: CSV has {actual_rows} rows, but processed {processor.total_rows} rows")# 检查是否有重复的表头duplicate_headers = df_final[df_final.iloc[:, 0] == df_final.columns[0]]if not duplicate_headers.empty:logger.warning(f"Found {len(duplicate_headers)} duplicate headers at rows: {duplicate_headers.index.tolist()}")# 清理重复表头df_final = df_final[df_final.iloc[:, 0] != df_final.columns[0]]df_final.to_csv(output_file, index=False)logger.info(f"Cleaned CSV file now contains {len(df_final)} rows")except Exception as e:logger.error(f"Error validating final CSV file: {str(e)}")except Exception as e:logger.error(f"Main process error: {str(e)}")raisefinally:DatabaseManager.close_all_connections()if __name__ == "__main__":start_time = timeit.default_timer()try:main()elapsed_time = timeit.default_timer() - start_timelogger.info(f"数据提取和存储完成,耗时: {elapsed_time:.2f} 秒")except Exception as e:logger.error(f"程序执行失败: {str(e)}")raise
主要类
- DatabaseManager
管理数据库连接的线程安全类
使用 threading.local() 确保每个线程有自己的连接
包含重试机制和连接管理功能
- DataProcessor
处理数据的核心类
定义了数据列和查询语句
处理数据批次的获取和保存
跟踪处理状态和失败的批次
2. 工作流程
- 初始化
创建空的输出文件
初始化线程池和任务队列
- 数据处理
将总数据量分成多个批次
多个工作线程并行处理数据批次
每个批次:
- 从数据库获取数据
- 转换为 DataFrame
- 保存到 CSV 文件
- 错误处理
失败的批次会进入重试队列
最多重试 3 次
记录所有失败的批次
- 数据验证
检查最终 CSV 文件的行数
检查和清理重复的表头
验证数据完整性
3. 特点
- 线程安全
使用线程本地存储管理数据库连接
文件写入使用锁保护
- 容错机制
数据库连接重试
批次处理重试
详细的日志记录
- 性能优化
批量处理数据
多线程并行处理
使用队列管理任务
- 监控和日志
详细的日志记录
处理进度跟踪
执行时间统计
这个程序适合处理大量数据的导出任务,具有良好的容错性和可靠性。
相关文章:
Python 操作数据库:读取 Clickhouse 数据存入csv文件
import pandas as pd from clickhouse_driver import Client import timeit import logging import threading from threading import Lock from queue import Queue from typing import List, Dict, Set from contextlib import contextmanager import os import time# 配置参…...
如何找到系统中bert-base-uncased默认安装位置
问题: 服务器中无法连接huggingface,故需要自己将模型文件上传 ubuntu 可以按照这个链接下载 Bert下载和使用(以bert-base-uncased为例) - 会自愈的哈士奇 - 博客园 里面提供了giehub里面的链接 GitHub - google-research/be…...
在启动 Spring Boot 项目时,报找不到 slf4j 的错误
而且 tomcat 的启动信息不知道为什么输出出来了 问 AI 得到的解决方案: 将 pom.xml 中的如下配置替换成这样,排除这个插件 <dependency><groupId>org.springframework.boot</groupId><artifactId>spring - boot - starter - …...
android-12-source-code--write-file-function
find /app4/lineage19_oneplus6/system/ -name "*.cpp" -type f | while read -r k ; do ( grep -i write $k | grep -i file && echo $k ;) ; done获得android::base::WriteStringToFile, 进一步修改 find /app4/lineage19_oneplus6/system/ -name &qu…...
SQL(2)
一.时间盲注 有回显时用Union带出数据,只显示是否时可用布尔盲注得出数据,那如果没有任何输出时? 比如无论查询什么,都显示success,同一个回应,无法直接从服务器注入出任何数据,但是我们可以利…...
【IC每日一题:AMBA总线--APB协议时序及Verilog实现】
AMBA总线--APB协议时序及Verilog实现 1 APB3协议1.1 APB3时序1.1.1 APB写操作1.1.2 APB读操作 2 代码2.1 apb_master2.2 apb_slave 【博客首发于微信公众号《漫谈芯片与编程》,欢迎专注一下,多谢大家】 AMBA总线是用于连接微控制器和外围设备的总线协议&…...
抢先看!为什么很多公司会强行给员工电脑加屏幕水印?千字长文来解答
2024年度热议:为什么很多公司会强行给员工电脑加屏幕水印? 有人说:概是为了让员工时刻铭记,工作就像这水印,无处不在,想逃也逃不掉! “玩归玩,闹归闹”。 本文将对此进行详尽解答&…...
【AI技术】PaddleSpeech部署方案
【AI技术】PaddleSpeech部署方案 技术介绍优点缺点 部署基础环境的搭建分步详解国内镜像源切换所需环境1 g所需环境2 vim所需环境3 cuda所需环境4 cudnn所需环境5 ssl源码拉取PaddleSpeech环境安装 部署文件分享DockerHub 技术介绍 PaddleSpeech是飞浆平台的一款TTS框架。 优…...
可灵开始“独闯”,全面拥抱AI的快手能否尝到“甜头”?
现任谷歌CEO桑达尔皮查伊曾说到,“人工智能是我们人类正在从事的最为深刻的研究方向之一,甚至要比火与电还更加深刻。” 正如,Sora诞生时,在官方表述中被称为“世界模拟器”,它理解真实的规则,并在此基础上…...
qt QtConcurrent 详解
1、概述 QtConcurrent是Qt框架中用于简化多线程编程的一个模块,它提供了高层次的API来实现并行计算,而不需要开发者直接管理线程的创建、调度和销毁。QtConcurrent主要通过QFuture和QThreadPool来进行并发任务的执行,能够自动利用系统的所有…...
基于构件的软件开发、软件维护、区块链技术及湖仓一体架构的应用
目录 试题一 论基于构件的软件开发方法及其应用 试题二 论软件维护方法及其应用 试题三 论区块链技术及应用 试题四 论湖仓一体架构及其应用 相关推荐 试题一 论基于构件的软件开发方法及其应用 基于构件的软件开发(Component-Based Software Development,CBSD…...
【在Typora中绘制用户旅程图和甘特图】
在 Typora 中可以使用 Mermaid 绘制用户旅程图(User Journey Map),但由于 Mermaid 并不直接支持用户旅程图,我们可以通过一些图表的变通方式(比如流程图或甘特图)来表示用户旅程图的结构。用户旅程图通常展…...
【Vue3】知识汇总,附详细定义和源码详解,后续出微信小程序项目(2)
快速跳转: 我的个人博客主页👉:Reuuse博客 新开专栏👉:Vue3专栏 参考文献👉:uniapp官网 ❀ 感谢支持!☀ 前情提要 🔺因为最近学习的vue语言,发现有很多细节…...
uniapp中使用全局样式文件引入的三种方式
如果你想在 uni-app 中全局引入 SCSS 文件(例如 global.scss),可以通过以下步骤进行配置: 方法一:在 main.js 中引入 在 main.js 中引入全局样式: 你可以在 src/main.js 文件中直接引入 SCSS 文件ÿ…...
计算机网络易混淆知识点串记
文章目录 计算机网络易混淆知识点串记各层PDU首部: 计算机网络易混淆知识点串记 各层PDU首部: PUD首部长度 (B:字节)首部单位数据链路–帧帧首:14B帧尾部:4B——IPV420~60字节4B [通过4位二进制表示]IPV6固定首部40字节[可拓展]4BTCP20~60字节4BUDP8B字节...
Java代码审计-模板注入漏洞
一、模板引擎 在Java开发当中,为了将前端和后端进行分离,降低项目代码的耦合性,使代码更加易于维护和管理。除去以上的原因,模板引擎还能实现动态和静态数据的分离。 二、主流模板引擎 在Java中,主流的模板引擎有:Fre…...
如何在Linux中使用Cron定时执行SQL任务
文章目录 前言一、方案分析二、使用步骤1.准备脚本2.crontab脚本执行 踩坑 前言 演示数据需要每天更新监控数据,不想手动执行,想到以下解决方案 navicat 创建定时任务java服务定时执行linux crontab 定时执行sql脚本 一、方案分析 我选择了第三个方案…...
数据集划分
1、 sklearn玩具数据集介绍 数据量小,数据在sklearn库的本地,只要安装了sklearn,不用上网就可以获取 2 sklearn现实世界数据集介绍 数据量大,数据只能通过网络获取(科学上网) 3 sklearn加载玩具数据集 示…...
带你读懂什么是AI Agent智能体
一、智能体的定义与特性 定义:智能体是一个使用大语言模型(LLM)来决定应用程序控制流的系统。然而,智能体的定义并不唯一,不同人有不同的看法。Langchain的创始人Harrison Chase从技术角度给出了定义,但更…...
react动态路由
在React应用中,动态路由(Dynamic Routing)通常指的是根据应用的状态或用户的交互来动态地显示或隐藏路由(页面或组件)。这可以通过多种方法实现,包括使用React Router库,它提供了强大的路由管理…...
超短脉冲激光自聚焦效应
前言与目录 强激光引起自聚焦效应机理 超短脉冲激光在脆性材料内部加工时引起的自聚焦效应,这是一种非线性光学现象,主要涉及光学克尔效应和材料的非线性光学特性。 自聚焦效应可以产生局部的强光场,对材料产生非线性响应,可能…...
盘古信息PCB行业解决方案:以全域场景重构,激活智造新未来
一、破局:PCB行业的时代之问 在数字经济蓬勃发展的浪潮中,PCB(印制电路板)作为 “电子产品之母”,其重要性愈发凸显。随着 5G、人工智能等新兴技术的加速渗透,PCB行业面临着前所未有的挑战与机遇。产品迭代…...
SpringBoot+uniapp 的 Champion 俱乐部微信小程序设计与实现,论文初版实现
摘要 本论文旨在设计并实现基于 SpringBoot 和 uniapp 的 Champion 俱乐部微信小程序,以满足俱乐部线上活动推广、会员管理、社交互动等需求。通过 SpringBoot 搭建后端服务,提供稳定高效的数据处理与业务逻辑支持;利用 uniapp 实现跨平台前…...
第 86 场周赛:矩阵中的幻方、钥匙和房间、将数组拆分成斐波那契序列、猜猜这个单词
Q1、[中等] 矩阵中的幻方 1、题目描述 3 x 3 的幻方是一个填充有 从 1 到 9 的不同数字的 3 x 3 矩阵,其中每行,每列以及两条对角线上的各数之和都相等。 给定一个由整数组成的row x col 的 grid,其中有多少个 3 3 的 “幻方” 子矩阵&am…...
Java多线程实现之Thread类深度解析
Java多线程实现之Thread类深度解析 一、多线程基础概念1.1 什么是线程1.2 多线程的优势1.3 Java多线程模型 二、Thread类的基本结构与构造函数2.1 Thread类的继承关系2.2 构造函数 三、创建和启动线程3.1 继承Thread类创建线程3.2 实现Runnable接口创建线程 四、Thread类的核心…...
短视频矩阵系统文案创作功能开发实践,定制化开发
在短视频行业迅猛发展的当下,企业和个人创作者为了扩大影响力、提升传播效果,纷纷采用短视频矩阵运营策略,同时管理多个平台、多个账号的内容发布。然而,频繁的文案创作需求让运营者疲于应对,如何高效产出高质量文案成…...
保姆级教程:在无网络无显卡的Windows电脑的vscode本地部署deepseek
文章目录 1 前言2 部署流程2.1 准备工作2.2 Ollama2.2.1 使用有网络的电脑下载Ollama2.2.2 安装Ollama(有网络的电脑)2.2.3 安装Ollama(无网络的电脑)2.2.4 安装验证2.2.5 修改大模型安装位置2.2.6 下载Deepseek模型 2.3 将deepse…...
打手机检测算法AI智能分析网关V4守护公共/工业/医疗等多场景安全应用
一、方案背景 在现代生产与生活场景中,如工厂高危作业区、医院手术室、公共场景等,人员违规打手机的行为潜藏着巨大风险。传统依靠人工巡查的监管方式,存在效率低、覆盖面不足、判断主观性强等问题,难以满足对人员打手机行为精…...
MySQL 部分重点知识篇
一、数据库对象 1. 主键 定义 :主键是用于唯一标识表中每一行记录的字段或字段组合。它具有唯一性和非空性特点。 作用 :确保数据的完整性,便于数据的查询和管理。 示例 :在学生信息表中,学号可以作为主键ÿ…...
苹果AI眼镜:从“工具”到“社交姿态”的范式革命——重新定义AI交互入口的未来机会
在2025年的AI硬件浪潮中,苹果AI眼镜(Apple Glasses)正在引发一场关于“人机交互形态”的深度思考。它并非简单地替代AirPods或Apple Watch,而是开辟了一个全新的、日常可接受的AI入口。其核心价值不在于功能的堆叠,而在于如何通过形态设计打破社交壁垒,成为用户“全天佩戴…...
