当前位置: 首页 > news >正文

Python 操作数据库:读取 Clickhouse 数据存入csv文件

import pandas as pd
from clickhouse_driver import Client
import timeit
import logging
import threading
from threading import Lock
from queue import Queue
from typing import List, Dict, Set
from contextlib import contextmanager
import os
import time# 配置参数
CONFIG = {'DB': {'host': 'xxx','database': 'xxx','user': 'xxxx','password': 'xxxx'},'BATCH_SIZE': 5000,'TOTAL_RECORDS': 1000000,'NUM_THREADS': 5,'OUTPUT_FILE': 'yyxs_ck2excel_v4.csv','MAX_RETRIES': 3,           # 最大重试次数'RETRY_DELAY': 5,           # 重试延迟(秒)'CONNECTION_TIMEOUT': 60    # 连接超时时间(秒)
}# 设置日志记录
logging.basicConfig(level=logging.INFO,format='%(asctime)s.%(msecs)d - %(name)s - %(levelname)s - %(message)s',datefmt='%Y-%m-%d %H:%M:%S'
)
logger = logging.getLogger(__name__)class DatabaseManager:_thread_local = threading.local()@classmethod@contextmanagerdef get_connection(cls):"""线程安全的数据库连接管理器"""retry_count = 0while retry_count < CONFIG['MAX_RETRIES']:try:    if not hasattr(cls._thread_local, "client"):cls._thread_local.client = Client(**CONFIG['DB'],connect_timeout=CONFIG['CONNECTION_TIMEOUT'])logger.info(f"Created new database connection for thread {threading.current_thread().name}")yield cls._thread_local.clientbreakexcept Exception as e:retry_count += 1logger.error(f"Database connection error (attempt {retry_count}): {str(e)}")if hasattr(cls._thread_local, "client"):cls._thread_local.client.disconnect()delattr(cls._thread_local, "client")if retry_count < CONFIG['MAX_RETRIES']:time.sleep(CONFIG['RETRY_DELAY'])else:raise@classmethoddef close_all_connections(cls):"""关闭当前线程的数据库连接"""if hasattr(cls._thread_local, "client"):cls._thread_local.client.disconnect()delattr(cls._thread_local, "client")logger.info(f"Closed database connection for thread {threading.current_thread().name}")class DataProcessor:def __init__(self):self.columns = ["a", "b", "c", "d"]self.query = '''SELECTa,b,c,dFROMtable_nameORDER BYa,b,c,d '''self.file_lock = Lock()  # 添加文件写入锁self.total_rows = 0      # 添加行数统计self.processed_batches = set()  # 记录已成功处理的批次self.failed_batches = set()     # 记录失败的批次def fetch_data_batch(self, batch_size: int, start: int) -> List[tuple]:"""获取一批数据,带重试机制"""retry_count = 0while retry_count < CONFIG['MAX_RETRIES']:try:with DatabaseManager.get_connection() as client:query_with_limit = f"{self.query} LIMIT {batch_size} OFFSET {start}"result = client.execute(query_with_limit)logger.info(f"Fetched {len(result)} records starting from {start}.")return resultexcept Exception as e:retry_count += 1logger.error(f"Error fetching batch starting at {start} (attempt {retry_count}): {str(e)}")if retry_count < CONFIG['MAX_RETRIES']:time.sleep(CONFIG['RETRY_DELAY'])else:raisedef save_to_csv(self, df: pd.DataFrame, file_name: str, batch_start: int):"""保存数据到CSV文件"""try:with self.file_lock:  # 使用锁保护文件写入file_exists = os.path.exists(file_name) and os.path.getsize(file_name) > 0df.to_csv(file_name, mode='a', header= not file_exists,index=False)self.total_rows += len(df)self.processed_batches.add(batch_start)logger.info(f"Appended {len(df)} records to {file_name}. Total rows: {self.total_rows}")except Exception as e:logger.error(f"Error saving to CSV: {str(e)}")raisedef process_batch(self, start: int, batch_size: int, output_file: str):"""处理单个批次的数据"""try:if start in self.processed_batches:logger.info(f"Batch {start} already processed, skipping.")return Trueresult_batch = self.fetch_data_batch(batch_size, start)df_batch = pd.DataFrame(result_batch, columns=self.columns)self.save_to_csv(df_batch, output_file, start)return Trueexcept Exception as e:logger.error(f"Error processing batch starting at {start}: {str(e)}")self.failed_batches.add(start)return Falsedef main_v1():try:processor = DataProcessor()output_file = CONFIG['OUTPUT_FILE']# 清空或创建输出文件with open(output_file, 'w', encoding='utf-8') as f:passqueue = Queue()retry_queue = Queue()  # 用于重试失败的批次threads = []def worker():while True:try:start = queue.get()if start is None:breaksuccess = processor.process_batch(start, CONFIG['BATCH_SIZE'], output_file)if not success:retry_queue.put(start)queue.task_done()except Exception as e:logger.error(f"Worker thread error: {str(e)}")finally:queue.task_done()# 启动工作线程for _ in range(CONFIG['NUM_THREADS']):t = threading.Thread(target=worker)t.daemon = Truet.start()threads.append(t)# 添加任务到队列for start in range(0, CONFIG['TOTAL_RECORDS'], CONFIG['BATCH_SIZE']):queue.put(start)# 等待主要处理完成queue.join()# 处理失败的批次while not retry_queue.empty():start = retry_queue.get()logger.info(f"Retrying failed batch starting at {start}")if processor.process_batch(start, CONFIG['BATCH_SIZE'], output_file):logger.info(f"Successfully retried batch {start}")else:logger.error(f"Failed to process batch {start} after retries")# 停止所有线程for _ in threads:queue.put(None)for t in threads:t.join()# 最终验证logger.info(f"Processing completed. Total rows: {processor.total_rows}")logger.info(f"Processed batches: {len(processor.processed_batches)}")logger.info(f"Failed batches: {len(processor.failed_batches)}")if processor.failed_batches:logger.warning(f"Failed batches: {sorted(processor.failed_batches)}")except Exception as e:logger.error(f"Main process error: {str(e)}")raisefinally:DatabaseManager.close_all_connections()def main():try:processor = DataProcessor()output_file = CONFIG['OUTPUT_FILE']# 清空或创建输出文件with open(output_file, 'w', encoding='utf-8') as f:passqueue = Queue()retry_queue = Queue()threads = []def worker():while True:try:start = queue.get()if start is None:  # 退出信号queue.task_done()breaktry:success = processor.process_batch(start, CONFIG['BATCH_SIZE'], output_file)if not success:retry_queue.put(start)except Exception as e:logger.error(f"Error processing batch at offset {start}: {str(e)}")retry_queue.put(start)finally:queue.task_done()  # 只在这里调用一次except Exception as e:logger.error(f"Worker thread error: {str(e)}")# 不要在这里调用 queue.task_done()# 启动工作线程for _ in range(CONFIG['NUM_THREADS']):t = threading.Thread(target=worker)t.daemon = Truet.start()threads.append(t)# 添加任务到队列total_batches = (CONFIG['TOTAL_RECORDS'] + CONFIG['BATCH_SIZE'] - 1) // CONFIG['BATCH_SIZE']for start in range(0, CONFIG['TOTAL_RECORDS'], CONFIG['BATCH_SIZE']):queue.put(start)# 等待主要处理完成queue.join()# 处理失败的批次retry_count = 0max_retries = 3while not retry_queue.empty() and retry_count < max_retries:retry_count += 1retry_size = retry_queue.qsize()logger.info(f"Retrying {retry_size} failed batches (attempt {retry_count})")# 将失败的批次重新放入主队列for _ in range(retry_size):start = retry_queue.get()queue.put(start)# 等待重试完成queue.join()# 停止所有线程for _ in threads:queue.put(None)for t in threads:t.join()# 最终验证logger.info(f"Processing completed. Total rows: {processor.total_rows}")logger.info(f"Expected batches: {total_batches}")logger.info(f"Processed batches: {len(processor.processed_batches)}")logger.info(f"Failed batches: {len(processor.failed_batches)}")if processor.failed_batches:logger.warning(f"Failed batches: {sorted(processor.failed_batches)}")# 验证数据完整性try:df_final = pd.read_csv(output_file)actual_rows = len(df_final)logger.info(f"Final CSV file contains {actual_rows} rows")if actual_rows != processor.total_rows:logger.warning(f"Row count mismatch: CSV has {actual_rows} rows, but processed {processor.total_rows} rows")# 检查是否有重复的表头duplicate_headers = df_final[df_final.iloc[:, 0] == df_final.columns[0]]if not duplicate_headers.empty:logger.warning(f"Found {len(duplicate_headers)} duplicate headers at rows: {duplicate_headers.index.tolist()}")# 清理重复表头df_final = df_final[df_final.iloc[:, 0] != df_final.columns[0]]df_final.to_csv(output_file, index=False)logger.info(f"Cleaned CSV file now contains {len(df_final)} rows")except Exception as e:logger.error(f"Error validating final CSV file: {str(e)}")except Exception as e:logger.error(f"Main process error: {str(e)}")raisefinally:DatabaseManager.close_all_connections()if __name__ == "__main__":start_time = timeit.default_timer()try:main()elapsed_time = timeit.default_timer() - start_timelogger.info(f"数据提取和存储完成,耗时: {elapsed_time:.2f} 秒")except Exception as e:logger.error(f"程序执行失败: {str(e)}")raise
主要类
  • DatabaseManager

管理数据库连接的线程安全类

使用 threading.local() 确保每个线程有自己的连接

包含重试机制和连接管理功能

  • DataProcessor

处理数据的核心类

定义了数据列和查询语句

处理数据批次的获取和保存

跟踪处理状态和失败的批次

2. 工作流程

  • 初始化

创建空的输出文件

初始化线程池和任务队列

  • 数据处理

将总数据量分成多个批次

多个工作线程并行处理数据批次

每个批次:

  • 从数据库获取数据
  • 转换为 DataFrame
  • 保存到 CSV 文件
  • 错误处理

失败的批次会进入重试队列

最多重试 3 次

记录所有失败的批次

  • 数据验证

检查最终 CSV 文件的行数

检查和清理重复的表头

验证数据完整性

3. 特点

  • 线程安全

使用线程本地存储管理数据库连接

文件写入使用锁保护

  • 容错机制

数据库连接重试

批次处理重试

详细的日志记录

  • 性能优化

批量处理数据

多线程并行处理

使用队列管理任务

  • 监控和日志

详细的日志记录

处理进度跟踪

执行时间统计

这个程序适合处理大量数据的导出任务,具有良好的容错性和可靠性。

相关文章:

Python 操作数据库:读取 Clickhouse 数据存入csv文件

import pandas as pd from clickhouse_driver import Client import timeit import logging import threading from threading import Lock from queue import Queue from typing import List, Dict, Set from contextlib import contextmanager import os import time# 配置参…...

如何找到系统中bert-base-uncased默认安装位置

问题&#xff1a; 服务器中无法连接huggingface&#xff0c;故需要自己将模型文件上传 ubuntu 可以按照这个链接下载 Bert下载和使用&#xff08;以bert-base-uncased为例&#xff09; - 会自愈的哈士奇 - 博客园 里面提供了giehub里面的链接 GitHub - google-research/be…...

在启动 Spring Boot 项目时,报找不到 slf4j 的错误

而且 tomcat 的启动信息不知道为什么输出出来了 问 AI 得到的解决方案&#xff1a; 将 pom.xml 中的如下配置替换成这样&#xff0c;排除这个插件 <dependency><groupId>org.springframework.boot</groupId><artifactId>spring - boot - starter - …...

android-12-source-code--write-file-function

find /app4/lineage19_oneplus6/system/ -name "*.cpp" -type f | while read -r k ; do ( grep -i write $k | grep -i file && echo $k ;) ; done获得android::base::WriteStringToFile, 进一步修改 find /app4/lineage19_oneplus6/system/ -name &qu…...

SQL(2)

一.时间盲注 有回显时用Union带出数据&#xff0c;只显示是否时可用布尔盲注得出数据&#xff0c;那如果没有任何输出时&#xff1f; 比如无论查询什么&#xff0c;都显示success&#xff0c;同一个回应&#xff0c;无法直接从服务器注入出任何数据&#xff0c;但是我们可以利…...

【IC每日一题:AMBA总线--APB协议时序及Verilog实现】

AMBA总线--APB协议时序及Verilog实现 1 APB3协议1.1 APB3时序1.1.1 APB写操作1.1.2 APB读操作 2 代码2.1 apb_master2.2 apb_slave 【博客首发于微信公众号《漫谈芯片与编程》&#xff0c;欢迎专注一下&#xff0c;多谢大家】 AMBA总线是用于连接微控制器和外围设备的总线协议&…...

抢先看!为什么很多公司会强行给员工电脑加屏幕水印?千字长文来解答

2024年度热议&#xff1a;为什么很多公司会强行给员工电脑加屏幕水印&#xff1f; 有人说&#xff1a;概是为了让员工时刻铭记&#xff0c;工作就像这水印&#xff0c;无处不在&#xff0c;想逃也逃不掉&#xff01; “玩归玩&#xff0c;闹归闹”。 本文将对此进行详尽解答&…...

【AI技术】PaddleSpeech部署方案

【AI技术】PaddleSpeech部署方案 技术介绍优点缺点 部署基础环境的搭建分步详解国内镜像源切换所需环境1 g所需环境2 vim所需环境3 cuda所需环境4 cudnn所需环境5 ssl源码拉取PaddleSpeech环境安装 部署文件分享DockerHub 技术介绍 PaddleSpeech是飞浆平台的一款TTS框架。 优…...

可灵开始“独闯”,全面拥抱AI的快手能否尝到“甜头”?

现任谷歌CEO桑达尔皮查伊曾说到&#xff0c;“人工智能是我们人类正在从事的最为深刻的研究方向之一&#xff0c;甚至要比火与电还更加深刻。” 正如&#xff0c;Sora诞生时&#xff0c;在官方表述中被称为“世界模拟器”&#xff0c;它理解真实的规则&#xff0c;并在此基础上…...

qt QtConcurrent 详解

1、概述 QtConcurrent是Qt框架中用于简化多线程编程的一个模块&#xff0c;它提供了高层次的API来实现并行计算&#xff0c;而不需要开发者直接管理线程的创建、调度和销毁。QtConcurrent主要通过QFuture和QThreadPool来进行并发任务的执行&#xff0c;能够自动利用系统的所有…...

基于构件的软件开发、软件维护、区块链技术及湖仓一体架构的应用

目录 试题一 论基于构件的软件开发方法及其应用 试题二 论软件维护方法及其应用 试题三 论区块链技术及应用 试题四 论湖仓一体架构及其应用 相关推荐 试题一 论基于构件的软件开发方法及其应用 基于构件的软件开发(Component-Based Software Development&#xff0c;CBSD…...

【在Typora中绘制用户旅程图和甘特图】

在 Typora 中可以使用 Mermaid 绘制用户旅程图&#xff08;User Journey Map&#xff09;&#xff0c;但由于 Mermaid 并不直接支持用户旅程图&#xff0c;我们可以通过一些图表的变通方式&#xff08;比如流程图或甘特图&#xff09;来表示用户旅程图的结构。用户旅程图通常展…...

【Vue3】知识汇总,附详细定义和源码详解,后续出微信小程序项目(2)

快速跳转&#xff1a; 我的个人博客主页&#x1f449;&#xff1a;Reuuse博客 新开专栏&#x1f449;&#xff1a;Vue3专栏 参考文献&#x1f449;&#xff1a;uniapp官网 ❀ 感谢支持&#xff01;☀ 前情提要 &#x1f53a;因为最近学习的vue语言&#xff0c;发现有很多细节…...

uniapp中使用全局样式文件引入的三种方式

如果你想在 uni-app 中全局引入 SCSS 文件&#xff08;例如 global.scss&#xff09;&#xff0c;可以通过以下步骤进行配置&#xff1a; 方法一&#xff1a;在 main.js 中引入 在 main.js 中引入全局样式&#xff1a; 你可以在 src/main.js 文件中直接引入 SCSS 文件&#xff…...

计算机网络易混淆知识点串记

文章目录 计算机网络易混淆知识点串记各层PDU首部: 计算机网络易混淆知识点串记 各层PDU首部: PUD首部长度 (B:字节)首部单位数据链路–帧帧首:14B帧尾部:4B——IPV420~60字节4B [通过4位二进制表示]IPV6固定首部40字节[可拓展]4BTCP20~60字节4BUDP8B字节...

Java代码审计-模板注入漏洞

一、模板引擎 在Java开发当中&#xff0c;为了将前端和后端进行分离&#xff0c;降低项目代码的耦合性&#xff0c;使代码更加易于维护和管理。除去以上的原因&#xff0c;模板引擎还能实现动态和静态数据的分离。 二、主流模板引擎 在Java中&#xff0c;主流的模板引擎有:Fre…...

如何在Linux中使用Cron定时执行SQL任务

文章目录 前言一、方案分析二、使用步骤1.准备脚本2.crontab脚本执行 踩坑 前言 演示数据需要每天更新监控数据&#xff0c;不想手动执行&#xff0c;想到以下解决方案 navicat 创建定时任务java服务定时执行linux crontab 定时执行sql脚本 一、方案分析 我选择了第三个方案…...

数据集划分

1、 sklearn玩具数据集介绍 数据量小&#xff0c;数据在sklearn库的本地&#xff0c;只要安装了sklearn&#xff0c;不用上网就可以获取 2 sklearn现实世界数据集介绍 数据量大&#xff0c;数据只能通过网络获取&#xff08;科学上网&#xff09; 3 sklearn加载玩具数据集 示…...

带你读懂什么是AI Agent智能体

一、智能体的定义与特性 定义&#xff1a;智能体是一个使用大语言模型&#xff08;LLM&#xff09;来决定应用程序控制流的系统。然而&#xff0c;智能体的定义并不唯一&#xff0c;不同人有不同的看法。Langchain的创始人Harrison Chase从技术角度给出了定义&#xff0c;但更…...

react动态路由

在React应用中&#xff0c;动态路由&#xff08;Dynamic Routing&#xff09;通常指的是根据应用的状态或用户的交互来动态地显示或隐藏路由&#xff08;页面或组件&#xff09;。这可以通过多种方法实现&#xff0c;包括使用React Router库&#xff0c;它提供了强大的路由管理…...

golang循环变量捕获问题​​

在 Go 语言中&#xff0c;当在循环中启动协程&#xff08;goroutine&#xff09;时&#xff0c;如果在协程闭包中直接引用循环变量&#xff0c;可能会遇到一个常见的陷阱 - ​​循环变量捕获问题​​。让我详细解释一下&#xff1a; 问题背景 看这个代码片段&#xff1a; fo…...

DeepSeek 赋能智慧能源:微电网优化调度的智能革新路径

目录 一、智慧能源微电网优化调度概述1.1 智慧能源微电网概念1.2 优化调度的重要性1.3 目前面临的挑战 二、DeepSeek 技术探秘2.1 DeepSeek 技术原理2.2 DeepSeek 独特优势2.3 DeepSeek 在 AI 领域地位 三、DeepSeek 在微电网优化调度中的应用剖析3.1 数据处理与分析3.2 预测与…...

【WiFi帧结构】

文章目录 帧结构MAC头部管理帧 帧结构 Wi-Fi的帧分为三部分组成&#xff1a;MAC头部frame bodyFCS&#xff0c;其中MAC是固定格式的&#xff0c;frame body是可变长度。 MAC头部有frame control&#xff0c;duration&#xff0c;address1&#xff0c;address2&#xff0c;addre…...

可靠性+灵活性:电力载波技术在楼宇自控中的核心价值

可靠性灵活性&#xff1a;电力载波技术在楼宇自控中的核心价值 在智能楼宇的自动化控制中&#xff0c;电力载波技术&#xff08;PLC&#xff09;凭借其独特的优势&#xff0c;正成为构建高效、稳定、灵活系统的核心解决方案。它利用现有电力线路传输数据&#xff0c;无需额外布…...

UDP(Echoserver)

网络命令 Ping 命令 检测网络是否连通 使用方法: ping -c 次数 网址ping -c 3 www.baidu.comnetstat 命令 netstat 是一个用来查看网络状态的重要工具. 语法&#xff1a;netstat [选项] 功能&#xff1a;查看网络状态 常用选项&#xff1a; n 拒绝显示别名&#…...

剑指offer20_链表中环的入口节点

链表中环的入口节点 给定一个链表&#xff0c;若其中包含环&#xff0c;则输出环的入口节点。 若其中不包含环&#xff0c;则输出null。 数据范围 节点 val 值取值范围 [ 1 , 1000 ] [1,1000] [1,1000]。 节点 val 值各不相同。 链表长度 [ 0 , 500 ] [0,500] [0,500]。 …...

相机从app启动流程

一、流程框架图 二、具体流程分析 1、得到cameralist和对应的静态信息 目录如下: 重点代码分析: 启动相机前,先要通过getCameraIdList获取camera的个数以及id,然后可以通过getCameraCharacteristics获取对应id camera的capabilities(静态信息)进行一些openCamera前的…...

Java线上CPU飙高问题排查全指南

一、引言 在Java应用的线上运行环境中&#xff0c;CPU飙高是一个常见且棘手的性能问题。当系统出现CPU飙高时&#xff0c;通常会导致应用响应缓慢&#xff0c;甚至服务不可用&#xff0c;严重影响用户体验和业务运行。因此&#xff0c;掌握一套科学有效的CPU飙高问题排查方法&…...

在Ubuntu24上采用Wine打开SourceInsight

1. 安装wine sudo apt install wine 2. 安装32位库支持,SourceInsight是32位程序 sudo dpkg --add-architecture i386 sudo apt update sudo apt install wine32:i386 3. 验证安装 wine --version 4. 安装必要的字体和库(解决显示问题) sudo apt install fonts-wqy…...

【Redis】笔记|第8节|大厂高并发缓存架构实战与优化

缓存架构 代码结构 代码详情 功能点&#xff1a; 多级缓存&#xff0c;先查本地缓存&#xff0c;再查Redis&#xff0c;最后才查数据库热点数据重建逻辑使用分布式锁&#xff0c;二次查询更新缓存采用读写锁提升性能采用Redis的发布订阅机制通知所有实例更新本地缓存适用读多…...