当前位置: 首页 > article >正文

Python使用clickhouse-local和MySQL表函数实现从MySQL到ClickHouse数据同步

下面是一个使用clickhouse-local和MySQL表函数实现从MySQL到ClickHouse数据同步的Python解决方案,包含全量同步、增量同步和测试用例。

此解决方案提供了生产级数据同步所需的核心功能,可根据具体场景扩展更多高级特性如:数据转换、字段映射、类型转换等。

设计思路

  1. 全量同步:首次运行时将MySQL表完整导入ClickHouse
  2. 增量同步:基于增量字段(如自增ID或时间戳)同步新增数据
  3. 状态管理:使用JSON文件记录同步位置
  4. 错误处理:完善的日志和异常处理机制
import subprocess
import json
import os
import logging
from configparser import ConfigParser# 配置日志
logging.basicConfig(level=logging.INFO,format='%(asctime)s - %(levelname)s - %(message)s',handlers=[logging.StreamHandler()]
)
logger = logging.getLogger(__name__)class MySQLToClickHouseSync:def __init__(self, config_path='config.ini'):self.config = self._load_config(config_path)self.state_file = self.config['state_file']self.last_state = self._load_state()def _load_config(self, path):"""加载配置文件"""config = ConfigParser()config.read(path)return {'mysql': dict(config['mysql']),'clickhouse': dict(config['clickhouse']),'state_file': config['general']['state_file']}def _load_state(self):"""加载同步状态"""try:if os.path.exists(self.state_file):with open(self.state_file, 'r') as f:return json.load(f)return {'last_id': 0, 'last_timestamp': '1970-01-01 00:00:00'}except Exception as e:logger.error(f"加载状态失败: {e}")return {'last_id': 0, 'last_timestamp': '1970-01-01 00:00:00'}def _save_state(self, state):"""保存同步状态"""try:with open(self.state_file, 'w') as f:json.dump(state, f)logger.info(f"状态已保存: {state}")except Exception as e:logger.error(f"保存状态失败: {e}")def run_clickhouse_command(self, query):"""执行clickhouse-local命令"""cmd = ['clickhouse-local','--query', query]try:result = subprocess.run(cmd,stdout=subprocess.PIPE,stderr=subprocess.PIPE,text=True,check=True)logger.debug(f"命令执行成功: {cmd}\n输出: {result.stdout}")return Trueexcept subprocess.CalledProcessError as e:logger.error(f"命令执行失败: {cmd}\n错误: {e.stderr}")return Falsedef full_sync(self):"""全量数据同步"""mysql = self.config['mysql']ch = self.config['clickhouse']query = f"""CREATE TABLE {ch['table']} ENGINE = MergeTree ORDER BY id ASSELECT *FROM mysql('{mysql['host']}:{mysql['port']}', '{mysql['database']}', '{mysql['table']}', '{mysql['user']}', '{mysql['password']}')"""logger.info("开始全量同步...")if self.run_clickhouse_command(query):# 获取最新ID作为增量起点max_id_query = f"""SELECT max(id) FROM mysql('{mysql['host']}:{mysql['port']}', '{mysql['database']}', '{mysql['table']}', '{mysql['user']}', '{mysql['password']}')"""cmd = ['clickhouse-local', '--query', max_id_query]result = subprocess.run(cmd, capture_output=True, text=True)if result.returncode == 0:new_state = {'last_id': int(result.stdout.strip())}self._save_state(new_state)self.last_state = new_statelogger.info("全量同步完成")return Truereturn Falsedef incremental_sync(self):"""增量数据同步"""mysql = self.config['mysql']ch = self.config['clickhouse']last_id = self.last_state.get('last_id', 0)query = f"""INSERT INTO {ch['table']}SELECT *FROM mysql('{mysql['host']}:{mysql['port']}', '{mysql['database']}', '{mysql['table']}', '{mysql['user']}', '{mysql['password']}')WHERE id > {last_id}"""logger.info(f"开始增量同步, 最后ID: {last_id}")if self.run_clickhouse_command(query):# 获取新增的最大IDnew_max_query = f"""SELECT max(id) FROM mysql('{mysql['host']}:{mysql['port']}', '{mysql['database']}', '{mysql['table']}', '{mysql['user']}', '{mysql['password']}')WHERE id > {last_id}"""cmd = ['clickhouse-local', '--query', new_max_query]result = subprocess.run(cmd, capture_output=True, text=True)if result.returncode == 0 and result.stdout.strip():new_id = int(result.stdout.strip())if new_id > last_id:self._save_state({'last_id': new_id})self.last_state = {'last_id': new_id}logger.info(f"增量同步完成, 新最后ID: {new_id}")else:logger.info("没有新数据需要同步")return Truereturn False# 配置文件示例 (config.ini)
"""
[general]
state_file = sync_state.json[mysql]
host = 127.0.0.1
port = 3306
database = test_db
table = source_table
user = root
password = mysqlpass[clickhouse]
table = default.target_table
"""if __name__ == "__main__":sync = MySQLToClickHouseSync()# 首次运行全量同步if not sync.last_state.get('last_id'):sync.full_sync()# 后续增量同步sync.incremental_sync()

测试用例

import unittest
import sqlite3
from unittest.mock import patch, MagicMock
import tempfile
import os
import jsonclass TestMySQLToClickHouseSync(unittest.TestCase):def setUp(self):self.config = {'state_file': 'test_state.json','mysql': {'host': '127.0.0.1','port': '3306','database': 'test_db','table': 'source_table','user': 'root','password': 'pass'},'clickhouse': {'table': 'target_table'}}# 创建临时状态文件self.state_file = tempfile.NamedTemporaryFile(delete=False)self.config['state_file'] = self.state_file.namedef tearDown(self):os.unlink(self.state_file.name)def test_full_sync(self):"""测试全量同步"""with patch.object(MySQLToClickHouseSync, '_load_config', return_value=self.config), \patch.object(MySQLToClickHouseSync, '_load_state', return_value={'last_id': 0}), \patch('subprocess.run') as mock_run:# 模拟clickhouse-local成功执行mock_run.return_value = MagicMock(returncode=0, stdout="100")sync = MySQLToClickHouseSync()result = sync.full_sync()# 验证命令执行self.assertTrue(mock_run.called)self.assertTrue(result)# 验证状态更新with open(self.state_file.name) as f:state = json.load(f)self.assertEqual(state['last_id'], 100)def test_incremental_sync(self):"""测试增量同步"""# 初始状态with open(self.state_file.name, 'w') as f:json.dump({'last_id': 50}, f)with patch.object(MySQLToClickHouseSync, '_load_config', return_value=self.config), \patch('subprocess.run') as mock_run:# 模拟获取新最大ID为75mock_run.side_effect = [MagicMock(returncode=0),  # INSERT执行MagicMock(returncode=0, stdout="75")  # SELECT max(id)]sync = MySQLToClickHouseSync()result = sync.incremental_sync()# 验证命令执行self.assertEqual(mock_run.call_count, 2)self.assertTrue(result)# 验证状态更新with open(self.state_file.name) as f:state = json.load(f)self.assertEqual(state['last_id'], 75)def test_no_new_data(self):"""测试无新数据的情况"""with open(self.state_file.name, 'w') as f:json.dump({'last_id': 100}, f)with patch.object(MySQLToClickHouseSync, '_load_config', return_value=self.config), \patch('subprocess.run') as mock_run:# 模拟返回空结果mock_run.side_effect = [MagicMock(returncode=0),MagicMock(returncode=0, stdout="")]sync = MySQLToClickHouseSync()result = sync.incremental_sync()self.assertTrue(result)# 状态应保持不变self.assertEqual(sync.last_state['last_id'], 100)def test_command_failure(self):"""测试命令执行失败"""with patch.object(MySQLToClickHouseSync, '_load_config', return_value=self.config), \patch('subprocess.run') as mock_run:mock_run.side_effect = subprocess.CalledProcessError(1, "cmd", output="", stderr="Error")sync = MySQLToClickHouseSync()result = sync.full_sync()self.assertFalse(result)if __name__ == '__main__':unittest.main()

使用说明

  1. 安装依赖:
pip install configparser
  1. 准备配置文件 (config.ini):
[general]
state_file = sync_state.json[mysql]
host = 127.0.0.1
port = 3306
database = your_db
table = source_table
user = root
password = your_mysql_password[clickhouse]
table = default.target_table
  1. 创建ClickHouse表 (自动创建):
-- 首次运行时会自动创建表
-- 表结构自动从MySQL继承
  1. 运行同步:
# 首次运行(全量同步)
python sync.py# 后续运行(增量同步)
python sync.py

关键特性

  1. 高效同步

    • 使用clickhouse-local直接管道传输,无需中间存储
    • 批量数据加载,避免逐行插入
  2. 增量同步机制

    • 基于自增ID的增量检测
    • 支持时间戳字段(需修改WHERE条件)
  3. 状态管理

    • JSON文件记录最后同步位置
    • 支持异常恢复
  4. 错误处理

    • 详细日志记录
    • 子进程错误捕获
    • 状态文件异常处理
  5. 配置驱动

    • 所有参数通过配置文件管理
    • 敏感信息与代码分离

性能优化建议

  1. 大表分批次同步
# 在全量同步中增加分页逻辑
BATCH_SIZE = 100000
for offset in range(0, total_count, BATCH_SIZE):query = f"SELECT * FROM ... LIMIT {BATCH_SIZE} OFFSET {offset}"
  1. 使用时间戳增量
# 修改增量查询条件
WHERE update_time > '{last_timestamp}'
  1. 并行处理
# 使用ThreadPoolExecutor并行处理不同数据分区
from concurrent.futures import ThreadPoolExecutor
  1. 压缩传输
# 在命令中添加压缩选项
clickhouse-local --query "..." | gzip | clickhouse-client --query "INSERT ..."

相关文章:

Python使用clickhouse-local和MySQL表函数实现从MySQL到ClickHouse数据同步

下面是一个使用clickhouse-local和MySQL表函数实现从MySQL到ClickHouse数据同步的Python解决方案,包含全量同步、增量同步和测试用例。 此解决方案提供了生产级数据同步所需的核心功能,可根据具体场景扩展更多高级特性如:数据转换、字段映射…...

解锁Java线程池:性能优化的关键

一、引言 在 Java 并发编程的世界里,线程池是一个至关重要的概念。简单来说,线程池就是一个可以复用线程的 “池子”,它维护着一组线程,这些线程可以被重复使用来执行多个任务,而不是为每个任务都创建一个新的线程。​…...

如何自定义一个 Spring Boot Starter?

导语: 在后端 Java 面试中,Spring Boot 是绕不开的重点,而“如何自定义一个 Starter”作为进阶开发能力的体现,常被面试官用于考察候选人的工程架构思维与 Spring Boot 底层掌握程度。本文将带你深入理解自定义 Starter 的实现逻辑…...

Linux文件系统详解:从入门到精通

无论是开发高性能应用还是进行系统级编程,文件系统都是我们必须掌握的基础知识。今天,我将带大家深入浅出地了解Linux文件系统的核心概念和工作原理。 一、Linux文件系统概述 Linux文件系统是操作系统中负责管理持久存储设备上数据的子系统。它不仅仅是…...

Electron Fiddle使用笔记

文章目录 下载界面示意图保存和打开项目save 和 save as forge project 其他文档打包报错 RequestError: read ECONNRESET 想要打包前端程序,奈何本地环境总是报错,意外发现可以通过electron fiddle直接调试代码。 下载 百度网盘地址: 首次…...

【PhysUnits】16.1 完善Var 结构体及其运算(variable.rs)

一、源码 这段代码定义了一个泛型结构体 Var,并为它实现了各种数学运算。 /** 变量结构体 Var* 该结构体泛型参数 T 需满足 Numeric 约束*/use core::ops::{Neg, Add, Sub, Mul}; use crate::constant::Integer; /// 定义 Numeric trait,约束 T 必须实…...

企业培训学习考试系统源码 ThinkPHP框架+Uniapp支持多终端适配部署

在数字化转型浪潮下,企业对高效培训与精准考核的需求日益迫切。一套功能完备、多终端适配且易于定制的培训学习考试系统,成为企业提升员工能力、检验培训成果的关键工具。本文给大家分享一款基于 ThinkPHP 框架与 Uniapp 开发的企业培训学习考试系统&…...

C++ if语句完全指南:从基础到工程实践

一、选择结构在程序设计中的核心地位 程序流程控制如同城市交通网络,if语句则是这个网络中的决策枢纽。根据ISO C标准,选择结构占典型项目代码量的32%-47%,其正确使用直接影响程序的: 逻辑正确性 执行效率 可维护性 安全边界 …...

SpringBoot手动实现流式输出方案整理以及SSE规范输出详解

背景: 最近做流式输出时,一直使用python实现的,应需求方的要求,需要通过java应用做一次封装并在java侧完成系统鉴权、模型鉴权等功能后才能真正去调用智能体应用,基于此调研java实现流式输出的几种方式,并…...

深入解析I²C总线接口:从基础到应用

IC总线概述与基本概念 一句话概述:本章节将介绍IC总线的历史、定义及其在嵌入式系统中的作用,帮助读者建立对IC的基本理解。 IC(Inter-Integrated Circuit)总线是一种广泛应用于嵌入式系统中的串行通信协议,最初由飞利…...

Sklearn 机器学习 缺失值处理 检测数据每列的缺失值

💖亲爱的技术爱好者们,热烈欢迎来到 Kant2048 的博客!我是 Thomas Kant,很开心能在代码与灵感交织的数字世界里和大家相遇~💖 ✨ 在这个技术浪潮奔涌的时代,我们既是探索者,也是分享者。我始终相信,每一行代码都是通往创新的钥匙,而分享则能让这把钥匙照亮更多人的…...

Unity基于GraphView的可视化关卡编辑器开发指南

一、GraphView技术基础与应用场景 1. GraphView核心组件 组件功能描述关卡编辑应用GraphView画布容器关卡拓扑结构编辑区Node基础节点房间/敌人/道具等关卡元素Edge节点连接线路径/依赖关系Port连接端口入口/出口标记Blackboard属性面板元素参数配置Minimap缩略图导航大型关卡…...

STL解析——list的使用

目录 1.简介 2.构造函数 3.迭代器 3.1封装 3.2迭代器分类 4.排序性能 4.1链式与数组 4.2缓存读取 1.简介 STL容器中提供的list容器也是一种顺序容器,底层实现方式是带头双向链表,这种实现方式能比单链表更高效的访问数据。 下面围绕部分重要接口…...

华为大规模——重塑生产力

华为大模型通过以下几个方面重塑生产力: 提供强大算力支持 华为致力于构建领先的昇腾人工智能算力平台,推出高性能昇腾AI集群,支持月级长期稳定训练,可靠性业界领先。同时打造开放的昇腾计算平台,兼容主流算子、框…...

【Go面试陷阱】对未初始化的chan进行读写为何会卡死?

Go面试陷阱:对未初始化的chan进行读写为何会卡死?深入解析nil channel的诡异行为 在Go的世界里,var ch chan int 看似人畜无害,实则暗藏杀机。它不会报错,不会panic,却能让你的程序悄无声息地"卡死&qu…...

SpringBoot自动化部署实战技术文章大纲

技术背景与目标 介绍SpringBoot在现代开发中的重要性自动化部署的价值:提升效率、减少人为错误、实现CI/CD适用场景:中小型Web应用、微服务架构 自动化部署核心方案 基于Docker的容器化部署 SpringBoot应用打包为Docker镜像使用Docker Compose编排多容…...

软件项目管理(3) 软件项目任务分解

一、相关概念 1.任务分解的方法和步骤 (1)方法 模板参照方法:参照有标准或半标准的任分解结构图类比方法:任务分解结构图经常被重复使用,具有相似性自顶向下方法:一般->特殊,演绎推理从大…...

MQTTX连接阿里云的物联网配置

本文的目标是通过MQTTX的客户端,连接到阿里云的物联网的平台,发送温度信息,在阿里云的平台中显示出来。阿里云免费注册,免费有一个MQTT的服务器。有数量限制,但是对于测试来讲,已经足够。 1、注册阿里云的物…...

20250606-C#知识:匿名函数、Lambda表达式与闭包

C#知识&#xff1a;匿名方法、Lambda表达式与闭包 闭包乍一听感觉很复杂&#xff0c;其实一点也不简单 1、匿名方法 没有方法名的方法一般用于委托和事件 Func<int, int, int> myAction delegate(int a, int b) { return a b; }; Console.WriteLine( myAction(1, 2)…...

数字证书_CA_详解

目录 一、数字证书简介 二、 CA&#xff08;证书颁发机构&#xff09; (一) 证书链&#xff08;信任链&#xff09; 1. 根证书 2. 中间证书 3. 网站证书 (二) 抓包软件的证书链与信任机制 1. 抓包通信流程 2. 证书链伪造与信任验证流程 (三) 关于移动设备的CA 一、数…...

衡量嵌入向量的相似性的方法

衡量嵌入向量的相似性的方法 一、常见相似性计算方法对比 方法核心原理公式优点缺点适用场景余弦相似度计算向量夹角的余弦值,衡量方向相似性,与向量长度无关。$\text{cos}\theta = \frac{\mathbf{a} \cdot \mathbf{b}}{\mathbf{a}\mathbf{b}欧氏距离计算向量空间中的直线距离…...

Python爬虫实战:Yelp餐厅数据采集完整教程

前言 在数据分析和商业智能领域&#xff0c;餐厅和商户信息的采集是一个常见需求。Yelp作为全球知名的本地商户评论平台&#xff0c;包含了大量有价值的商户信息。本文将详细介绍如何使用Python开发一个高效的Yelp数据爬虫&#xff0c;实现商户信息的批量采集。 技术栈介绍 …...

微服务常用日志追踪方案:Sleuth + Zipkin + ELK

在微服务架构中&#xff0c;一个用户请求往往需要经过多个服务的协同处理。为了有效追踪请求的完整调用链路&#xff0c;需要一套完整的日志追踪方案。Sleuth Zipkin ELK 组合提供了完整的解决方案 Sleuth&#xff1a;生成和传播追踪IDZipkin&#xff1a;收集、存储和可视化…...

API是什么意思?如何实现开放API?

目录 一、API 是什么 &#xff08;一&#xff09;API 的定义 &#xff08;二&#xff09;API 的作用 二、API 的类型 &#xff08;一&#xff09;Web API 1. RESTful API 2. SOAP API &#xff08;二&#xff09;操作系统 API &#xff08;三&#xff09;数据库 API …...

12.6Swing控件4 JSplitPane JTabbedPane

JSplitPane JSplitPane 是 Java Swing 中用于创建分隔面板的组件&#xff0c;支持两个可调整大小组件的容器。它允许用户通过拖动分隔条来调整两个组件的相对大小&#xff0c;适合用于需要动态调整视图比例的场景。 常用方法&#xff1a; setLeftComponent(Component comp)&a…...

Python训练第四十六天

DAY 46 通道注意力(SE注意力) 知识点回顾&#xff1a; 不同CNN层的特征图&#xff1a;不同通道的特征图什么是注意力&#xff1a;注意力家族&#xff0c;类似于动物园&#xff0c;都是不同的模块&#xff0c;好不好试了才知道。通道注意力&#xff1a;模型的定义和插入的位置通…...

C++编程——关于比较器的使用

注&#xff1a; 简单记录一下C里比较器的构建&#xff0c;常用于自定义 sort() 函数和优先队列的改写优先级。 简单构建比较器&#xff1a; sort() 函数&#xff1a; vector<int> arr;//(a, b) -> true : a < b //升序排列 bool compare(int a, int b) {retur…...

第2天:认识LSTM

&#x1f368; 本文为&#x1f517;365天深度学习训练营 中的学习记录博客&#x1f356; 原作者&#xff1a;K同学啊 目标 具体实现 &#xff08;一&#xff09;环境 语言环境&#xff1a;Python 3.10 编 译 器: PyCharm 框 架: pytorch &#xff08;二&#xff09;具体步骤…...

自动化提示生成框架(AutoPrompt)

自动化提示生成框架(AutoPrompt) 一、核心创新点 自动化提示生成框架(AutoPrompt) 创新本质:提出基于梯度引导搜索的自动化提示生成方法,替代人工设计模板的传统模式。技术路径: 将提示视为可训练的离散token序列,通过优化提示向量(prompt embedding)搜索语义空间。利…...

两轮自平衡机器人建模、LQR控制与仿真分析

以下是一个针对两轮自平衡机器人(平衡车) 的完整建模、控制设计与仿真分析报告,包含详细的理论推导、控制算法实现及Python仿真代码。 两轮自平衡机器人建模、LQR控制与仿真分析 1. 引言 两轮自平衡机器人是一种典型的欠驱动、非线性、不稳定系统,其动力学特性与倒立摆高度…...