当前位置: 首页 > news >正文

Python 操作数据库:读取 Clickhouse 数据存入csv文件

import pandas as pd
from clickhouse_driver import Client
import timeit
import logging
import threading
from threading import Lock
from queue import Queue
from typing import List, Dict, Set
from contextlib import contextmanager
import os
import time# 配置参数
CONFIG = {'DB': {'host': 'xxx','database': 'xxx','user': 'xxxx','password': 'xxxx'},'BATCH_SIZE': 5000,'TOTAL_RECORDS': 1000000,'NUM_THREADS': 5,'OUTPUT_FILE': 'yyxs_ck2excel_v4.csv','MAX_RETRIES': 3,           # 最大重试次数'RETRY_DELAY': 5,           # 重试延迟(秒)'CONNECTION_TIMEOUT': 60    # 连接超时时间(秒)
}# 设置日志记录
logging.basicConfig(level=logging.INFO,format='%(asctime)s.%(msecs)d - %(name)s - %(levelname)s - %(message)s',datefmt='%Y-%m-%d %H:%M:%S'
)
logger = logging.getLogger(__name__)class DatabaseManager:_thread_local = threading.local()@classmethod@contextmanagerdef get_connection(cls):"""线程安全的数据库连接管理器"""retry_count = 0while retry_count < CONFIG['MAX_RETRIES']:try:    if not hasattr(cls._thread_local, "client"):cls._thread_local.client = Client(**CONFIG['DB'],connect_timeout=CONFIG['CONNECTION_TIMEOUT'])logger.info(f"Created new database connection for thread {threading.current_thread().name}")yield cls._thread_local.clientbreakexcept Exception as e:retry_count += 1logger.error(f"Database connection error (attempt {retry_count}): {str(e)}")if hasattr(cls._thread_local, "client"):cls._thread_local.client.disconnect()delattr(cls._thread_local, "client")if retry_count < CONFIG['MAX_RETRIES']:time.sleep(CONFIG['RETRY_DELAY'])else:raise@classmethoddef close_all_connections(cls):"""关闭当前线程的数据库连接"""if hasattr(cls._thread_local, "client"):cls._thread_local.client.disconnect()delattr(cls._thread_local, "client")logger.info(f"Closed database connection for thread {threading.current_thread().name}")class DataProcessor:def __init__(self):self.columns = ["a", "b", "c", "d"]self.query = '''SELECTa,b,c,dFROMtable_nameORDER BYa,b,c,d '''self.file_lock = Lock()  # 添加文件写入锁self.total_rows = 0      # 添加行数统计self.processed_batches = set()  # 记录已成功处理的批次self.failed_batches = set()     # 记录失败的批次def fetch_data_batch(self, batch_size: int, start: int) -> List[tuple]:"""获取一批数据,带重试机制"""retry_count = 0while retry_count < CONFIG['MAX_RETRIES']:try:with DatabaseManager.get_connection() as client:query_with_limit = f"{self.query} LIMIT {batch_size} OFFSET {start}"result = client.execute(query_with_limit)logger.info(f"Fetched {len(result)} records starting from {start}.")return resultexcept Exception as e:retry_count += 1logger.error(f"Error fetching batch starting at {start} (attempt {retry_count}): {str(e)}")if retry_count < CONFIG['MAX_RETRIES']:time.sleep(CONFIG['RETRY_DELAY'])else:raisedef save_to_csv(self, df: pd.DataFrame, file_name: str, batch_start: int):"""保存数据到CSV文件"""try:with self.file_lock:  # 使用锁保护文件写入file_exists = os.path.exists(file_name) and os.path.getsize(file_name) > 0df.to_csv(file_name, mode='a', header= not file_exists,index=False)self.total_rows += len(df)self.processed_batches.add(batch_start)logger.info(f"Appended {len(df)} records to {file_name}. Total rows: {self.total_rows}")except Exception as e:logger.error(f"Error saving to CSV: {str(e)}")raisedef process_batch(self, start: int, batch_size: int, output_file: str):"""处理单个批次的数据"""try:if start in self.processed_batches:logger.info(f"Batch {start} already processed, skipping.")return Trueresult_batch = self.fetch_data_batch(batch_size, start)df_batch = pd.DataFrame(result_batch, columns=self.columns)self.save_to_csv(df_batch, output_file, start)return Trueexcept Exception as e:logger.error(f"Error processing batch starting at {start}: {str(e)}")self.failed_batches.add(start)return Falsedef main_v1():try:processor = DataProcessor()output_file = CONFIG['OUTPUT_FILE']# 清空或创建输出文件with open(output_file, 'w', encoding='utf-8') as f:passqueue = Queue()retry_queue = Queue()  # 用于重试失败的批次threads = []def worker():while True:try:start = queue.get()if start is None:breaksuccess = processor.process_batch(start, CONFIG['BATCH_SIZE'], output_file)if not success:retry_queue.put(start)queue.task_done()except Exception as e:logger.error(f"Worker thread error: {str(e)}")finally:queue.task_done()# 启动工作线程for _ in range(CONFIG['NUM_THREADS']):t = threading.Thread(target=worker)t.daemon = Truet.start()threads.append(t)# 添加任务到队列for start in range(0, CONFIG['TOTAL_RECORDS'], CONFIG['BATCH_SIZE']):queue.put(start)# 等待主要处理完成queue.join()# 处理失败的批次while not retry_queue.empty():start = retry_queue.get()logger.info(f"Retrying failed batch starting at {start}")if processor.process_batch(start, CONFIG['BATCH_SIZE'], output_file):logger.info(f"Successfully retried batch {start}")else:logger.error(f"Failed to process batch {start} after retries")# 停止所有线程for _ in threads:queue.put(None)for t in threads:t.join()# 最终验证logger.info(f"Processing completed. Total rows: {processor.total_rows}")logger.info(f"Processed batches: {len(processor.processed_batches)}")logger.info(f"Failed batches: {len(processor.failed_batches)}")if processor.failed_batches:logger.warning(f"Failed batches: {sorted(processor.failed_batches)}")except Exception as e:logger.error(f"Main process error: {str(e)}")raisefinally:DatabaseManager.close_all_connections()def main():try:processor = DataProcessor()output_file = CONFIG['OUTPUT_FILE']# 清空或创建输出文件with open(output_file, 'w', encoding='utf-8') as f:passqueue = Queue()retry_queue = Queue()threads = []def worker():while True:try:start = queue.get()if start is None:  # 退出信号queue.task_done()breaktry:success = processor.process_batch(start, CONFIG['BATCH_SIZE'], output_file)if not success:retry_queue.put(start)except Exception as e:logger.error(f"Error processing batch at offset {start}: {str(e)}")retry_queue.put(start)finally:queue.task_done()  # 只在这里调用一次except Exception as e:logger.error(f"Worker thread error: {str(e)}")# 不要在这里调用 queue.task_done()# 启动工作线程for _ in range(CONFIG['NUM_THREADS']):t = threading.Thread(target=worker)t.daemon = Truet.start()threads.append(t)# 添加任务到队列total_batches = (CONFIG['TOTAL_RECORDS'] + CONFIG['BATCH_SIZE'] - 1) // CONFIG['BATCH_SIZE']for start in range(0, CONFIG['TOTAL_RECORDS'], CONFIG['BATCH_SIZE']):queue.put(start)# 等待主要处理完成queue.join()# 处理失败的批次retry_count = 0max_retries = 3while not retry_queue.empty() and retry_count < max_retries:retry_count += 1retry_size = retry_queue.qsize()logger.info(f"Retrying {retry_size} failed batches (attempt {retry_count})")# 将失败的批次重新放入主队列for _ in range(retry_size):start = retry_queue.get()queue.put(start)# 等待重试完成queue.join()# 停止所有线程for _ in threads:queue.put(None)for t in threads:t.join()# 最终验证logger.info(f"Processing completed. Total rows: {processor.total_rows}")logger.info(f"Expected batches: {total_batches}")logger.info(f"Processed batches: {len(processor.processed_batches)}")logger.info(f"Failed batches: {len(processor.failed_batches)}")if processor.failed_batches:logger.warning(f"Failed batches: {sorted(processor.failed_batches)}")# 验证数据完整性try:df_final = pd.read_csv(output_file)actual_rows = len(df_final)logger.info(f"Final CSV file contains {actual_rows} rows")if actual_rows != processor.total_rows:logger.warning(f"Row count mismatch: CSV has {actual_rows} rows, but processed {processor.total_rows} rows")# 检查是否有重复的表头duplicate_headers = df_final[df_final.iloc[:, 0] == df_final.columns[0]]if not duplicate_headers.empty:logger.warning(f"Found {len(duplicate_headers)} duplicate headers at rows: {duplicate_headers.index.tolist()}")# 清理重复表头df_final = df_final[df_final.iloc[:, 0] != df_final.columns[0]]df_final.to_csv(output_file, index=False)logger.info(f"Cleaned CSV file now contains {len(df_final)} rows")except Exception as e:logger.error(f"Error validating final CSV file: {str(e)}")except Exception as e:logger.error(f"Main process error: {str(e)}")raisefinally:DatabaseManager.close_all_connections()if __name__ == "__main__":start_time = timeit.default_timer()try:main()elapsed_time = timeit.default_timer() - start_timelogger.info(f"数据提取和存储完成,耗时: {elapsed_time:.2f} 秒")except Exception as e:logger.error(f"程序执行失败: {str(e)}")raise
主要类
  • DatabaseManager

管理数据库连接的线程安全类

使用 threading.local() 确保每个线程有自己的连接

包含重试机制和连接管理功能

  • DataProcessor

处理数据的核心类

定义了数据列和查询语句

处理数据批次的获取和保存

跟踪处理状态和失败的批次

2. 工作流程

  • 初始化

创建空的输出文件

初始化线程池和任务队列

  • 数据处理

将总数据量分成多个批次

多个工作线程并行处理数据批次

每个批次:

  • 从数据库获取数据
  • 转换为 DataFrame
  • 保存到 CSV 文件
  • 错误处理

失败的批次会进入重试队列

最多重试 3 次

记录所有失败的批次

  • 数据验证

检查最终 CSV 文件的行数

检查和清理重复的表头

验证数据完整性

3. 特点

  • 线程安全

使用线程本地存储管理数据库连接

文件写入使用锁保护

  • 容错机制

数据库连接重试

批次处理重试

详细的日志记录

  • 性能优化

批量处理数据

多线程并行处理

使用队列管理任务

  • 监控和日志

详细的日志记录

处理进度跟踪

执行时间统计

这个程序适合处理大量数据的导出任务,具有良好的容错性和可靠性。

相关文章:

Python 操作数据库:读取 Clickhouse 数据存入csv文件

import pandas as pd from clickhouse_driver import Client import timeit import logging import threading from threading import Lock from queue import Queue from typing import List, Dict, Set from contextlib import contextmanager import os import time# 配置参…...

如何找到系统中bert-base-uncased默认安装位置

问题&#xff1a; 服务器中无法连接huggingface&#xff0c;故需要自己将模型文件上传 ubuntu 可以按照这个链接下载 Bert下载和使用&#xff08;以bert-base-uncased为例&#xff09; - 会自愈的哈士奇 - 博客园 里面提供了giehub里面的链接 GitHub - google-research/be…...

在启动 Spring Boot 项目时,报找不到 slf4j 的错误

而且 tomcat 的启动信息不知道为什么输出出来了 问 AI 得到的解决方案&#xff1a; 将 pom.xml 中的如下配置替换成这样&#xff0c;排除这个插件 <dependency><groupId>org.springframework.boot</groupId><artifactId>spring - boot - starter - …...

android-12-source-code--write-file-function

find /app4/lineage19_oneplus6/system/ -name "*.cpp" -type f | while read -r k ; do ( grep -i write $k | grep -i file && echo $k ;) ; done获得android::base::WriteStringToFile, 进一步修改 find /app4/lineage19_oneplus6/system/ -name &qu…...

SQL(2)

一.时间盲注 有回显时用Union带出数据&#xff0c;只显示是否时可用布尔盲注得出数据&#xff0c;那如果没有任何输出时&#xff1f; 比如无论查询什么&#xff0c;都显示success&#xff0c;同一个回应&#xff0c;无法直接从服务器注入出任何数据&#xff0c;但是我们可以利…...

【IC每日一题:AMBA总线--APB协议时序及Verilog实现】

AMBA总线--APB协议时序及Verilog实现 1 APB3协议1.1 APB3时序1.1.1 APB写操作1.1.2 APB读操作 2 代码2.1 apb_master2.2 apb_slave 【博客首发于微信公众号《漫谈芯片与编程》&#xff0c;欢迎专注一下&#xff0c;多谢大家】 AMBA总线是用于连接微控制器和外围设备的总线协议&…...

抢先看!为什么很多公司会强行给员工电脑加屏幕水印?千字长文来解答

2024年度热议&#xff1a;为什么很多公司会强行给员工电脑加屏幕水印&#xff1f; 有人说&#xff1a;概是为了让员工时刻铭记&#xff0c;工作就像这水印&#xff0c;无处不在&#xff0c;想逃也逃不掉&#xff01; “玩归玩&#xff0c;闹归闹”。 本文将对此进行详尽解答&…...

【AI技术】PaddleSpeech部署方案

【AI技术】PaddleSpeech部署方案 技术介绍优点缺点 部署基础环境的搭建分步详解国内镜像源切换所需环境1 g所需环境2 vim所需环境3 cuda所需环境4 cudnn所需环境5 ssl源码拉取PaddleSpeech环境安装 部署文件分享DockerHub 技术介绍 PaddleSpeech是飞浆平台的一款TTS框架。 优…...

可灵开始“独闯”,全面拥抱AI的快手能否尝到“甜头”?

现任谷歌CEO桑达尔皮查伊曾说到&#xff0c;“人工智能是我们人类正在从事的最为深刻的研究方向之一&#xff0c;甚至要比火与电还更加深刻。” 正如&#xff0c;Sora诞生时&#xff0c;在官方表述中被称为“世界模拟器”&#xff0c;它理解真实的规则&#xff0c;并在此基础上…...

qt QtConcurrent 详解

1、概述 QtConcurrent是Qt框架中用于简化多线程编程的一个模块&#xff0c;它提供了高层次的API来实现并行计算&#xff0c;而不需要开发者直接管理线程的创建、调度和销毁。QtConcurrent主要通过QFuture和QThreadPool来进行并发任务的执行&#xff0c;能够自动利用系统的所有…...

基于构件的软件开发、软件维护、区块链技术及湖仓一体架构的应用

目录 试题一 论基于构件的软件开发方法及其应用 试题二 论软件维护方法及其应用 试题三 论区块链技术及应用 试题四 论湖仓一体架构及其应用 相关推荐 试题一 论基于构件的软件开发方法及其应用 基于构件的软件开发(Component-Based Software Development&#xff0c;CBSD…...

【在Typora中绘制用户旅程图和甘特图】

在 Typora 中可以使用 Mermaid 绘制用户旅程图&#xff08;User Journey Map&#xff09;&#xff0c;但由于 Mermaid 并不直接支持用户旅程图&#xff0c;我们可以通过一些图表的变通方式&#xff08;比如流程图或甘特图&#xff09;来表示用户旅程图的结构。用户旅程图通常展…...

【Vue3】知识汇总,附详细定义和源码详解,后续出微信小程序项目(2)

快速跳转&#xff1a; 我的个人博客主页&#x1f449;&#xff1a;Reuuse博客 新开专栏&#x1f449;&#xff1a;Vue3专栏 参考文献&#x1f449;&#xff1a;uniapp官网 ❀ 感谢支持&#xff01;☀ 前情提要 &#x1f53a;因为最近学习的vue语言&#xff0c;发现有很多细节…...

uniapp中使用全局样式文件引入的三种方式

如果你想在 uni-app 中全局引入 SCSS 文件&#xff08;例如 global.scss&#xff09;&#xff0c;可以通过以下步骤进行配置&#xff1a; 方法一&#xff1a;在 main.js 中引入 在 main.js 中引入全局样式&#xff1a; 你可以在 src/main.js 文件中直接引入 SCSS 文件&#xff…...

计算机网络易混淆知识点串记

文章目录 计算机网络易混淆知识点串记各层PDU首部: 计算机网络易混淆知识点串记 各层PDU首部: PUD首部长度 (B:字节)首部单位数据链路–帧帧首:14B帧尾部:4B——IPV420~60字节4B [通过4位二进制表示]IPV6固定首部40字节[可拓展]4BTCP20~60字节4BUDP8B字节...

Java代码审计-模板注入漏洞

一、模板引擎 在Java开发当中&#xff0c;为了将前端和后端进行分离&#xff0c;降低项目代码的耦合性&#xff0c;使代码更加易于维护和管理。除去以上的原因&#xff0c;模板引擎还能实现动态和静态数据的分离。 二、主流模板引擎 在Java中&#xff0c;主流的模板引擎有:Fre…...

如何在Linux中使用Cron定时执行SQL任务

文章目录 前言一、方案分析二、使用步骤1.准备脚本2.crontab脚本执行 踩坑 前言 演示数据需要每天更新监控数据&#xff0c;不想手动执行&#xff0c;想到以下解决方案 navicat 创建定时任务java服务定时执行linux crontab 定时执行sql脚本 一、方案分析 我选择了第三个方案…...

数据集划分

1、 sklearn玩具数据集介绍 数据量小&#xff0c;数据在sklearn库的本地&#xff0c;只要安装了sklearn&#xff0c;不用上网就可以获取 2 sklearn现实世界数据集介绍 数据量大&#xff0c;数据只能通过网络获取&#xff08;科学上网&#xff09; 3 sklearn加载玩具数据集 示…...

带你读懂什么是AI Agent智能体

一、智能体的定义与特性 定义&#xff1a;智能体是一个使用大语言模型&#xff08;LLM&#xff09;来决定应用程序控制流的系统。然而&#xff0c;智能体的定义并不唯一&#xff0c;不同人有不同的看法。Langchain的创始人Harrison Chase从技术角度给出了定义&#xff0c;但更…...

react动态路由

在React应用中&#xff0c;动态路由&#xff08;Dynamic Routing&#xff09;通常指的是根据应用的状态或用户的交互来动态地显示或隐藏路由&#xff08;页面或组件&#xff09;。这可以通过多种方法实现&#xff0c;包括使用React Router库&#xff0c;它提供了强大的路由管理…...

docker详细操作--未完待续

docker介绍 docker官网: Docker&#xff1a;加速容器应用程序开发 harbor官网&#xff1a;Harbor - Harbor 中文 使用docker加速器: Docker镜像极速下载服务 - 毫秒镜像 是什么 Docker 是一种开源的容器化平台&#xff0c;用于将应用程序及其依赖项&#xff08;如库、运行时环…...

DockerHub与私有镜像仓库在容器化中的应用与管理

哈喽&#xff0c;大家好&#xff0c;我是左手python&#xff01; Docker Hub的应用与管理 Docker Hub的基本概念与使用方法 Docker Hub是Docker官方提供的一个公共镜像仓库&#xff0c;用户可以在其中找到各种操作系统、软件和应用的镜像。开发者可以通过Docker Hub轻松获取所…...

HTML 列表、表格、表单

1 列表标签 作用&#xff1a;布局内容排列整齐的区域 列表分类&#xff1a;无序列表、有序列表、定义列表。 例如&#xff1a; 1.1 无序列表 标签&#xff1a;ul 嵌套 li&#xff0c;ul是无序列表&#xff0c;li是列表条目。 注意事项&#xff1a; ul 标签里面只能包裹 li…...

深入理解JavaScript设计模式之单例模式

目录 什么是单例模式为什么需要单例模式常见应用场景包括 单例模式实现透明单例模式实现不透明单例模式用代理实现单例模式javaScript中的单例模式使用命名空间使用闭包封装私有变量 惰性单例通用的惰性单例 结语 什么是单例模式 单例模式&#xff08;Singleton Pattern&#…...

SpringBoot+uniapp 的 Champion 俱乐部微信小程序设计与实现,论文初版实现

摘要 本论文旨在设计并实现基于 SpringBoot 和 uniapp 的 Champion 俱乐部微信小程序&#xff0c;以满足俱乐部线上活动推广、会员管理、社交互动等需求。通过 SpringBoot 搭建后端服务&#xff0c;提供稳定高效的数据处理与业务逻辑支持&#xff1b;利用 uniapp 实现跨平台前…...

OPenCV CUDA模块图像处理-----对图像执行 均值漂移滤波(Mean Shift Filtering)函数meanShiftFiltering()

操作系统&#xff1a;ubuntu22.04 OpenCV版本&#xff1a;OpenCV4.9 IDE:Visual Studio Code 编程语言&#xff1a;C11 算法描述 在 GPU 上对图像执行 均值漂移滤波&#xff08;Mean Shift Filtering&#xff09;&#xff0c;用于图像分割或平滑处理。 该函数将输入图像中的…...

C# 求圆面积的程序(Program to find area of a circle)

给定半径r&#xff0c;求圆的面积。圆的面积应精确到小数点后5位。 例子&#xff1a; 输入&#xff1a;r 5 输出&#xff1a;78.53982 解释&#xff1a;由于面积 PI * r * r 3.14159265358979323846 * 5 * 5 78.53982&#xff0c;因为我们只保留小数点后 5 位数字。 输…...

Linux离线(zip方式)安装docker

目录 基础信息操作系统信息docker信息 安装实例安装步骤示例 遇到的问题问题1&#xff1a;修改默认工作路径启动失败问题2 找不到对应组 基础信息 操作系统信息 OS版本&#xff1a;CentOS 7 64位 内核版本&#xff1a;3.10.0 相关命令&#xff1a; uname -rcat /etc/os-rele…...

【分享】推荐一些办公小工具

1、PDF 在线转换 https://smallpdf.com/cn/pdf-tools 推荐理由&#xff1a;大部分的转换软件需要收费&#xff0c;要么功能不齐全&#xff0c;而开会员又用不了几次浪费钱&#xff0c;借用别人的又不安全。 这个网站它不需要登录或下载安装。而且提供的免费功能就能满足日常…...

【Redis】笔记|第8节|大厂高并发缓存架构实战与优化

缓存架构 代码结构 代码详情 功能点&#xff1a; 多级缓存&#xff0c;先查本地缓存&#xff0c;再查Redis&#xff0c;最后才查数据库热点数据重建逻辑使用分布式锁&#xff0c;二次查询更新缓存采用读写锁提升性能采用Redis的发布订阅机制通知所有实例更新本地缓存适用读多…...