Python 操作数据库:读取 Clickhouse 数据存入csv文件
import pandas as pd
from clickhouse_driver import Client
import timeit
import logging
import threading
from threading import Lock
from queue import Queue
from typing import List, Dict, Set
from contextlib import contextmanager
import os
import time# 配置参数
CONFIG = {'DB': {'host': 'xxx','database': 'xxx','user': 'xxxx','password': 'xxxx'},'BATCH_SIZE': 5000,'TOTAL_RECORDS': 1000000,'NUM_THREADS': 5,'OUTPUT_FILE': 'yyxs_ck2excel_v4.csv','MAX_RETRIES': 3, # 最大重试次数'RETRY_DELAY': 5, # 重试延迟(秒)'CONNECTION_TIMEOUT': 60 # 连接超时时间(秒)
}# 设置日志记录
logging.basicConfig(level=logging.INFO,format='%(asctime)s.%(msecs)d - %(name)s - %(levelname)s - %(message)s',datefmt='%Y-%m-%d %H:%M:%S'
)
logger = logging.getLogger(__name__)class DatabaseManager:_thread_local = threading.local()@classmethod@contextmanagerdef get_connection(cls):"""线程安全的数据库连接管理器"""retry_count = 0while retry_count < CONFIG['MAX_RETRIES']:try: if not hasattr(cls._thread_local, "client"):cls._thread_local.client = Client(**CONFIG['DB'],connect_timeout=CONFIG['CONNECTION_TIMEOUT'])logger.info(f"Created new database connection for thread {threading.current_thread().name}")yield cls._thread_local.clientbreakexcept Exception as e:retry_count += 1logger.error(f"Database connection error (attempt {retry_count}): {str(e)}")if hasattr(cls._thread_local, "client"):cls._thread_local.client.disconnect()delattr(cls._thread_local, "client")if retry_count < CONFIG['MAX_RETRIES']:time.sleep(CONFIG['RETRY_DELAY'])else:raise@classmethoddef close_all_connections(cls):"""关闭当前线程的数据库连接"""if hasattr(cls._thread_local, "client"):cls._thread_local.client.disconnect()delattr(cls._thread_local, "client")logger.info(f"Closed database connection for thread {threading.current_thread().name}")class DataProcessor:def __init__(self):self.columns = ["a", "b", "c", "d"]self.query = '''SELECTa,b,c,dFROMtable_nameORDER BYa,b,c,d '''self.file_lock = Lock() # 添加文件写入锁self.total_rows = 0 # 添加行数统计self.processed_batches = set() # 记录已成功处理的批次self.failed_batches = set() # 记录失败的批次def fetch_data_batch(self, batch_size: int, start: int) -> List[tuple]:"""获取一批数据,带重试机制"""retry_count = 0while retry_count < CONFIG['MAX_RETRIES']:try:with DatabaseManager.get_connection() as client:query_with_limit = f"{self.query} LIMIT {batch_size} OFFSET {start}"result = client.execute(query_with_limit)logger.info(f"Fetched {len(result)} records starting from {start}.")return resultexcept Exception as e:retry_count += 1logger.error(f"Error fetching batch starting at {start} (attempt {retry_count}): {str(e)}")if retry_count < CONFIG['MAX_RETRIES']:time.sleep(CONFIG['RETRY_DELAY'])else:raisedef save_to_csv(self, df: pd.DataFrame, file_name: str, batch_start: int):"""保存数据到CSV文件"""try:with self.file_lock: # 使用锁保护文件写入file_exists = os.path.exists(file_name) and os.path.getsize(file_name) > 0df.to_csv(file_name, mode='a', header= not file_exists,index=False)self.total_rows += len(df)self.processed_batches.add(batch_start)logger.info(f"Appended {len(df)} records to {file_name}. Total rows: {self.total_rows}")except Exception as e:logger.error(f"Error saving to CSV: {str(e)}")raisedef process_batch(self, start: int, batch_size: int, output_file: str):"""处理单个批次的数据"""try:if start in self.processed_batches:logger.info(f"Batch {start} already processed, skipping.")return Trueresult_batch = self.fetch_data_batch(batch_size, start)df_batch = pd.DataFrame(result_batch, columns=self.columns)self.save_to_csv(df_batch, output_file, start)return Trueexcept Exception as e:logger.error(f"Error processing batch starting at {start}: {str(e)}")self.failed_batches.add(start)return Falsedef main_v1():try:processor = DataProcessor()output_file = CONFIG['OUTPUT_FILE']# 清空或创建输出文件with open(output_file, 'w', encoding='utf-8') as f:passqueue = Queue()retry_queue = Queue() # 用于重试失败的批次threads = []def worker():while True:try:start = queue.get()if start is None:breaksuccess = processor.process_batch(start, CONFIG['BATCH_SIZE'], output_file)if not success:retry_queue.put(start)queue.task_done()except Exception as e:logger.error(f"Worker thread error: {str(e)}")finally:queue.task_done()# 启动工作线程for _ in range(CONFIG['NUM_THREADS']):t = threading.Thread(target=worker)t.daemon = Truet.start()threads.append(t)# 添加任务到队列for start in range(0, CONFIG['TOTAL_RECORDS'], CONFIG['BATCH_SIZE']):queue.put(start)# 等待主要处理完成queue.join()# 处理失败的批次while not retry_queue.empty():start = retry_queue.get()logger.info(f"Retrying failed batch starting at {start}")if processor.process_batch(start, CONFIG['BATCH_SIZE'], output_file):logger.info(f"Successfully retried batch {start}")else:logger.error(f"Failed to process batch {start} after retries")# 停止所有线程for _ in threads:queue.put(None)for t in threads:t.join()# 最终验证logger.info(f"Processing completed. Total rows: {processor.total_rows}")logger.info(f"Processed batches: {len(processor.processed_batches)}")logger.info(f"Failed batches: {len(processor.failed_batches)}")if processor.failed_batches:logger.warning(f"Failed batches: {sorted(processor.failed_batches)}")except Exception as e:logger.error(f"Main process error: {str(e)}")raisefinally:DatabaseManager.close_all_connections()def main():try:processor = DataProcessor()output_file = CONFIG['OUTPUT_FILE']# 清空或创建输出文件with open(output_file, 'w', encoding='utf-8') as f:passqueue = Queue()retry_queue = Queue()threads = []def worker():while True:try:start = queue.get()if start is None: # 退出信号queue.task_done()breaktry:success = processor.process_batch(start, CONFIG['BATCH_SIZE'], output_file)if not success:retry_queue.put(start)except Exception as e:logger.error(f"Error processing batch at offset {start}: {str(e)}")retry_queue.put(start)finally:queue.task_done() # 只在这里调用一次except Exception as e:logger.error(f"Worker thread error: {str(e)}")# 不要在这里调用 queue.task_done()# 启动工作线程for _ in range(CONFIG['NUM_THREADS']):t = threading.Thread(target=worker)t.daemon = Truet.start()threads.append(t)# 添加任务到队列total_batches = (CONFIG['TOTAL_RECORDS'] + CONFIG['BATCH_SIZE'] - 1) // CONFIG['BATCH_SIZE']for start in range(0, CONFIG['TOTAL_RECORDS'], CONFIG['BATCH_SIZE']):queue.put(start)# 等待主要处理完成queue.join()# 处理失败的批次retry_count = 0max_retries = 3while not retry_queue.empty() and retry_count < max_retries:retry_count += 1retry_size = retry_queue.qsize()logger.info(f"Retrying {retry_size} failed batches (attempt {retry_count})")# 将失败的批次重新放入主队列for _ in range(retry_size):start = retry_queue.get()queue.put(start)# 等待重试完成queue.join()# 停止所有线程for _ in threads:queue.put(None)for t in threads:t.join()# 最终验证logger.info(f"Processing completed. Total rows: {processor.total_rows}")logger.info(f"Expected batches: {total_batches}")logger.info(f"Processed batches: {len(processor.processed_batches)}")logger.info(f"Failed batches: {len(processor.failed_batches)}")if processor.failed_batches:logger.warning(f"Failed batches: {sorted(processor.failed_batches)}")# 验证数据完整性try:df_final = pd.read_csv(output_file)actual_rows = len(df_final)logger.info(f"Final CSV file contains {actual_rows} rows")if actual_rows != processor.total_rows:logger.warning(f"Row count mismatch: CSV has {actual_rows} rows, but processed {processor.total_rows} rows")# 检查是否有重复的表头duplicate_headers = df_final[df_final.iloc[:, 0] == df_final.columns[0]]if not duplicate_headers.empty:logger.warning(f"Found {len(duplicate_headers)} duplicate headers at rows: {duplicate_headers.index.tolist()}")# 清理重复表头df_final = df_final[df_final.iloc[:, 0] != df_final.columns[0]]df_final.to_csv(output_file, index=False)logger.info(f"Cleaned CSV file now contains {len(df_final)} rows")except Exception as e:logger.error(f"Error validating final CSV file: {str(e)}")except Exception as e:logger.error(f"Main process error: {str(e)}")raisefinally:DatabaseManager.close_all_connections()if __name__ == "__main__":start_time = timeit.default_timer()try:main()elapsed_time = timeit.default_timer() - start_timelogger.info(f"数据提取和存储完成,耗时: {elapsed_time:.2f} 秒")except Exception as e:logger.error(f"程序执行失败: {str(e)}")raise
主要类
- DatabaseManager
管理数据库连接的线程安全类
使用 threading.local() 确保每个线程有自己的连接
包含重试机制和连接管理功能
- DataProcessor
处理数据的核心类
定义了数据列和查询语句
处理数据批次的获取和保存
跟踪处理状态和失败的批次
2. 工作流程
- 初始化
创建空的输出文件
初始化线程池和任务队列
- 数据处理
将总数据量分成多个批次
多个工作线程并行处理数据批次
每个批次:
- 从数据库获取数据
- 转换为 DataFrame
- 保存到 CSV 文件
- 错误处理
失败的批次会进入重试队列
最多重试 3 次
记录所有失败的批次
- 数据验证
检查最终 CSV 文件的行数
检查和清理重复的表头
验证数据完整性
3. 特点
- 线程安全
使用线程本地存储管理数据库连接
文件写入使用锁保护
- 容错机制
数据库连接重试
批次处理重试
详细的日志记录
- 性能优化
批量处理数据
多线程并行处理
使用队列管理任务
- 监控和日志
详细的日志记录
处理进度跟踪
执行时间统计
这个程序适合处理大量数据的导出任务,具有良好的容错性和可靠性。
相关文章:
Python 操作数据库:读取 Clickhouse 数据存入csv文件
import pandas as pd from clickhouse_driver import Client import timeit import logging import threading from threading import Lock from queue import Queue from typing import List, Dict, Set from contextlib import contextmanager import os import time# 配置参…...
如何找到系统中bert-base-uncased默认安装位置
问题: 服务器中无法连接huggingface,故需要自己将模型文件上传 ubuntu 可以按照这个链接下载 Bert下载和使用(以bert-base-uncased为例) - 会自愈的哈士奇 - 博客园 里面提供了giehub里面的链接 GitHub - google-research/be…...
在启动 Spring Boot 项目时,报找不到 slf4j 的错误
而且 tomcat 的启动信息不知道为什么输出出来了 问 AI 得到的解决方案: 将 pom.xml 中的如下配置替换成这样,排除这个插件 <dependency><groupId>org.springframework.boot</groupId><artifactId>spring - boot - starter - …...
android-12-source-code--write-file-function
find /app4/lineage19_oneplus6/system/ -name "*.cpp" -type f | while read -r k ; do ( grep -i write $k | grep -i file && echo $k ;) ; done获得android::base::WriteStringToFile, 进一步修改 find /app4/lineage19_oneplus6/system/ -name &qu…...
SQL(2)
一.时间盲注 有回显时用Union带出数据,只显示是否时可用布尔盲注得出数据,那如果没有任何输出时? 比如无论查询什么,都显示success,同一个回应,无法直接从服务器注入出任何数据,但是我们可以利…...
【IC每日一题:AMBA总线--APB协议时序及Verilog实现】
AMBA总线--APB协议时序及Verilog实现 1 APB3协议1.1 APB3时序1.1.1 APB写操作1.1.2 APB读操作 2 代码2.1 apb_master2.2 apb_slave 【博客首发于微信公众号《漫谈芯片与编程》,欢迎专注一下,多谢大家】 AMBA总线是用于连接微控制器和外围设备的总线协议&…...
抢先看!为什么很多公司会强行给员工电脑加屏幕水印?千字长文来解答
2024年度热议:为什么很多公司会强行给员工电脑加屏幕水印? 有人说:概是为了让员工时刻铭记,工作就像这水印,无处不在,想逃也逃不掉! “玩归玩,闹归闹”。 本文将对此进行详尽解答&…...
【AI技术】PaddleSpeech部署方案
【AI技术】PaddleSpeech部署方案 技术介绍优点缺点 部署基础环境的搭建分步详解国内镜像源切换所需环境1 g所需环境2 vim所需环境3 cuda所需环境4 cudnn所需环境5 ssl源码拉取PaddleSpeech环境安装 部署文件分享DockerHub 技术介绍 PaddleSpeech是飞浆平台的一款TTS框架。 优…...
可灵开始“独闯”,全面拥抱AI的快手能否尝到“甜头”?
现任谷歌CEO桑达尔皮查伊曾说到,“人工智能是我们人类正在从事的最为深刻的研究方向之一,甚至要比火与电还更加深刻。” 正如,Sora诞生时,在官方表述中被称为“世界模拟器”,它理解真实的规则,并在此基础上…...
qt QtConcurrent 详解
1、概述 QtConcurrent是Qt框架中用于简化多线程编程的一个模块,它提供了高层次的API来实现并行计算,而不需要开发者直接管理线程的创建、调度和销毁。QtConcurrent主要通过QFuture和QThreadPool来进行并发任务的执行,能够自动利用系统的所有…...
基于构件的软件开发、软件维护、区块链技术及湖仓一体架构的应用
目录 试题一 论基于构件的软件开发方法及其应用 试题二 论软件维护方法及其应用 试题三 论区块链技术及应用 试题四 论湖仓一体架构及其应用 相关推荐 试题一 论基于构件的软件开发方法及其应用 基于构件的软件开发(Component-Based Software Development,CBSD…...
【在Typora中绘制用户旅程图和甘特图】
在 Typora 中可以使用 Mermaid 绘制用户旅程图(User Journey Map),但由于 Mermaid 并不直接支持用户旅程图,我们可以通过一些图表的变通方式(比如流程图或甘特图)来表示用户旅程图的结构。用户旅程图通常展…...
【Vue3】知识汇总,附详细定义和源码详解,后续出微信小程序项目(2)
快速跳转: 我的个人博客主页👉:Reuuse博客 新开专栏👉:Vue3专栏 参考文献👉:uniapp官网 ❀ 感谢支持!☀ 前情提要 🔺因为最近学习的vue语言,发现有很多细节…...
uniapp中使用全局样式文件引入的三种方式
如果你想在 uni-app 中全局引入 SCSS 文件(例如 global.scss),可以通过以下步骤进行配置: 方法一:在 main.js 中引入 在 main.js 中引入全局样式: 你可以在 src/main.js 文件中直接引入 SCSS 文件ÿ…...
计算机网络易混淆知识点串记
文章目录 计算机网络易混淆知识点串记各层PDU首部: 计算机网络易混淆知识点串记 各层PDU首部: PUD首部长度 (B:字节)首部单位数据链路–帧帧首:14B帧尾部:4B——IPV420~60字节4B [通过4位二进制表示]IPV6固定首部40字节[可拓展]4BTCP20~60字节4BUDP8B字节...
Java代码审计-模板注入漏洞
一、模板引擎 在Java开发当中,为了将前端和后端进行分离,降低项目代码的耦合性,使代码更加易于维护和管理。除去以上的原因,模板引擎还能实现动态和静态数据的分离。 二、主流模板引擎 在Java中,主流的模板引擎有:Fre…...
如何在Linux中使用Cron定时执行SQL任务
文章目录 前言一、方案分析二、使用步骤1.准备脚本2.crontab脚本执行 踩坑 前言 演示数据需要每天更新监控数据,不想手动执行,想到以下解决方案 navicat 创建定时任务java服务定时执行linux crontab 定时执行sql脚本 一、方案分析 我选择了第三个方案…...
数据集划分
1、 sklearn玩具数据集介绍 数据量小,数据在sklearn库的本地,只要安装了sklearn,不用上网就可以获取 2 sklearn现实世界数据集介绍 数据量大,数据只能通过网络获取(科学上网) 3 sklearn加载玩具数据集 示…...
带你读懂什么是AI Agent智能体
一、智能体的定义与特性 定义:智能体是一个使用大语言模型(LLM)来决定应用程序控制流的系统。然而,智能体的定义并不唯一,不同人有不同的看法。Langchain的创始人Harrison Chase从技术角度给出了定义,但更…...
react动态路由
在React应用中,动态路由(Dynamic Routing)通常指的是根据应用的状态或用户的交互来动态地显示或隐藏路由(页面或组件)。这可以通过多种方法实现,包括使用React Router库,它提供了强大的路由管理…...
conda相比python好处
Conda 作为 Python 的环境和包管理工具,相比原生 Python 生态(如 pip 虚拟环境)有许多独特优势,尤其在多项目管理、依赖处理和跨平台兼容性等方面表现更优。以下是 Conda 的核心好处: 一、一站式环境管理:…...
《Qt C++ 与 OpenCV:解锁视频播放程序设计的奥秘》
引言:探索视频播放程序设计之旅 在当今数字化时代,多媒体应用已渗透到我们生活的方方面面,从日常的视频娱乐到专业的视频监控、视频会议系统,视频播放程序作为多媒体应用的核心组成部分,扮演着至关重要的角色。无论是在个人电脑、移动设备还是智能电视等平台上,用户都期望…...
Java 8 Stream API 入门到实践详解
一、告别 for 循环! 传统痛点: Java 8 之前,集合操作离不开冗长的 for 循环和匿名类。例如,过滤列表中的偶数: List<Integer> list Arrays.asList(1, 2, 3, 4, 5); List<Integer> evens new ArrayList…...
屋顶变身“发电站” ,中天合创屋面分布式光伏发电项目顺利并网!
5月28日,中天合创屋面分布式光伏发电项目顺利并网发电,该项目位于内蒙古自治区鄂尔多斯市乌审旗,项目利用中天合创聚乙烯、聚丙烯仓库屋面作为场地建设光伏电站,总装机容量为9.96MWp。 项目投运后,每年可节约标煤3670…...
Qt Http Server模块功能及架构
Qt Http Server 是 Qt 6.0 中引入的一个新模块,它提供了一个轻量级的 HTTP 服务器实现,主要用于构建基于 HTTP 的应用程序和服务。 功能介绍: 主要功能 HTTP服务器功能: 支持 HTTP/1.1 协议 简单的请求/响应处理模型 支持 GET…...
HTML前端开发:JavaScript 常用事件详解
作为前端开发的核心,JavaScript 事件是用户与网页交互的基础。以下是常见事件的详细说明和用法示例: 1. onclick - 点击事件 当元素被单击时触发(左键点击) button.onclick function() {alert("按钮被点击了!&…...
css3笔记 (1) 自用
outline: none 用于移除元素获得焦点时默认的轮廓线 broder:0 用于移除边框 font-size:0 用于设置字体不显示 list-style: none 消除<li> 标签默认样式 margin: xx auto 版心居中 width:100% 通栏 vertical-align 作用于行内元素 / 表格单元格ÿ…...
学习STC51单片机32(芯片为STC89C52RCRC)OLED显示屏2
每日一言 今天的每一份坚持,都是在为未来积攒底气。 案例:OLED显示一个A 这边观察到一个点,怎么雪花了就是都是乱七八糟的占满了屏幕。。 解释 : 如果代码里信号切换太快(比如 SDA 刚变,SCL 立刻变&#…...
Springboot社区养老保险系统小程序
一、前言 随着我国经济迅速发展,人们对手机的需求越来越大,各种手机软件也都在被广泛应用,但是对于手机进行数据信息管理,对于手机的各种软件也是备受用户的喜爱,社区养老保险系统小程序被用户普遍使用,为方…...
重启Eureka集群中的节点,对已经注册的服务有什么影响
先看答案,如果正确地操作,重启Eureka集群中的节点,对已经注册的服务影响非常小,甚至可以做到无感知。 但如果操作不当,可能会引发短暂的服务发现问题。 下面我们从Eureka的核心工作原理来详细分析这个问题。 Eureka的…...
