Linux上用C++和GCC开发程序实现两个不同PostgreSQL实例下单个数据库中多个Schema稳定高效的数据迁移到其它PostgreSQL实例
设计一个在Linux上运行的GCC C++程序,同时连接三个不同的PostgreSQL实例,其中两个实例中分别有两个数据库中多个Schema的表结构分别与第三实例中两个数据库中多个Schema个结构完全相同,同时复制两个实例中两个数据库中多个Schema里的所有表的数据到第三个实例中两个数据库中多个Schema里,使用以下快速高效的方法,加入异常处理,支持每隔固定时间重试一定次数,每张表的复制运行状态和记录条数,开始结束时间戳,运行时间,以及每个批次的运行状态,开始结束时间戳,运行时间,总记录条数都要到运行日志文件里,日志文件全部放在logs目录下,每天一个.log扩展名的文本文件,文件名带日期,所有运行参数写在json配置文件中。
根据需求,我设计了以下解决方案。由于代码较长,这里分为多个部分说明:
- 配置文件(config.json):
{"instances": {"source1": {"host": "192.168.1.100","port": 5432,"user": "user1","password": "pass1"},"source2": {"host": "192.168.1.101","port": 5432,"user": "user2","password": "pass2"},"target": {"host": "192.168.1.102","port": 5432,"user": "target_user","password": "target_pass"}},"database_mappings": [{"source_instance": "source1","source_db": "db1","schemas": ["schema1", "schema2"],"target_db": "target_db1"},{"source_instance": "source2","source_db": "db2","schemas": ["schema3", "schema4"],"target_db": "target_db2"}],"retry": {"max_attempts": 3,"interval_seconds": 60},"log_directory": "logs"
}
- 数据库操作类(database.h):
#include <pqxx/pqxx>
#include <string>
#include <memory>class DatabaseConnector {
public:DatabaseConnector(const std::string& host, int port, const std::string& dbname, const std::string& user,const std::string& password){std::string conn_str = fmt::format("host={} port={} dbname={} user={} password={}",host, port, dbname, user, password);conn_ = std::make_unique<pqxx::connection>(conn_str);}pqxx::result query(const std::string& sql) {pqxx::work txn(*conn_);auto result = txn.exec(sql);txn.commit();return result;}// COPY命令高效传输数据void copy_table(const std::string& source_table, const std::string& target_table,std::shared_ptr<DatabaseConnector> target_conn) {pqxx::work source_txn(*conn_);std::stringstream buffer;source_txn.conn().copy_to(source_table, buffer);source_txn.commit();pqxx::work target_txn(*target_conn->conn_);target_txn.conn().copy_from(buffer, target_table);target_txn.commit();}private:std::unique_ptr<pqxx::connection> conn_;
};
- 日志管理类(logger.h):
#include <fstream>
#include <chrono>
#include <iomanip>
#include <filesystem>class Logger {
public:Logger(const std::string& log_dir) {namespace fs = std::filesystem;fs::create_directories(log_dir);auto now = std::chrono::system_clock::now();auto today = std::chrono::floor<std::chrono::days>(now);log_file_ = log_dir + "/" + format_time(today, "%Y-%m-%d") + ".log";}void log(const std::string& message) {std::lock_guard<std::mutex> lock(mtx_);std::ofstream file(log_file_, std::ios::app);file << "[" << format_time(std::chrono::system_clock::now()) << "] " << message << "\n";}private:std::string format_time(auto time_point, const char* fmt = "%T") {auto in_time_t = std::chrono::system_clock::to_time_t(time_point);std::stringstream ss;ss << std::put_time(std::localtime(&in_time_t), fmt);return ss.str();}std::string log_file_;std::mutex mtx_;
};
- 主程序逻辑(main.cpp):
#include <nlohmann/json.hpp>
#include <vector>
#include <thread>struct Config {// 解析配置的代码
};void replicate_data(std::shared_ptr<DatabaseConnector> source,std::shared_ptr<DatabaseConnector> target,const std::string& schema,Logger& logger) {// 获取所有表结构auto tables = source->query(fmt::format("SELECT table_name FROM information_schema.tables ""WHERE table_schema = '{}' AND table_type = 'BASE TABLE'", schema));for (const auto& row : tables) {std::string table_name = row[0].as<std::string>();std::string qualified_name = schema + "." + table_name;// 记录开始信息logger.log(fmt::format("START Table: {}", qualified_name));try {// 使用移动数据的高效方法source->copy_table(qualified_name, qualified_name, target);// 记录成功信息logger.log(fmt::format("SUCCESS Table: {}", qualified_name));} catch (const std::exception& e) {// 异常处理和重试逻辑logger.log(fmt::format("ERROR Table: {} - {}", qualified_name, e.what()));// 重试逻辑实现}}
}int main() {// 加载配置Config config = load_config("config.json");// 初始化连接池auto source1 = create_connection(config.instances["source1"]);// 类似初始化其他连接// 创建日志器Logger logger(config.log_directory);// 处理每个数据库映射for (const auto& mapping : config.database_mappings) {// 获取源和目标连接auto source = get_connection(mapping.source_instance);auto target = get_connection("target");// 处理每个schemafor (const auto& schema : mapping.schemas) {replicate_data(source, target, schema, logger);}}return 0;
}
实现要点说明:
- 性能优化:
- 使用PostgreSQL的COPY命令进行批量数据传输
- 采用连接池管理数据库连接
- 多线程处理不同schema的复制任务(需要添加线程池)
- 错误处理:
- 三级重试机制(配置可调)
- 事务回滚保证数据一致性
- 详细的错误日志记录
- 日志管理:
- 每天自动滚动日志文件
- 记录毫秒级时间戳
- 包含完整的状态信息
- 线程安全的日志写入
- 扩展性:
- 模块化设计方便扩展新的数据库实例
- 配置驱动,无需重新编译即可修改参数
- 支持Schema级别的并行复制
编译和运行:
# 编译命令
g++ -std=c++17 -o pg_replicator main.cpp database.cpp logger.cpp -lpqxx -lpq -lfmt# 运行命令
./pg_replicator --config config.json
建议的扩展改进方向:
- 增加以下监控功能:
- 实时进度显示
- Prometheus指标导出
- 电子邮件/短信报警
- 性能优化:
- 并行复制不同表
- 流水线传输
- 压缩传输
- 数据一致性保障:
- 校验和验证
- 断点续传
- 双向同步检测
- 安全性增强:
- 加密连接支持
- 凭据安全存储
- 审计日志
这个设计在保证功能完整性的同时,充分考虑了大规模数据复制的性能需求,并通过模块化设计保证了可维护性。异常处理和日志系统为生产环境运行提供了必要保障。
相关文章:
Linux上用C++和GCC开发程序实现两个不同PostgreSQL实例下单个数据库中多个Schema稳定高效的数据迁移到其它PostgreSQL实例
设计一个在Linux上运行的GCC C程序,同时连接三个不同的PostgreSQL实例,其中两个实例中分别有两个数据库中多个Schema的表结构分别与第三实例中两个数据库中多个Schema个结构完全相同,同时复制两个实例中两个数据库中多个Schema里的所有表的数…...
Linux下的网络通信编程
在不同主机之间,进行进程间的通信。 1解决主机之间硬件的互通 2.解决主机之间软件的互通. 3.IP地址:来区分不同的主机(软件地址) 4.MAC地址:硬件地址 5.端口号:区分同一主机上的不同应用进程 网络协议…...
Windows在多网络下指定上网接口
Windows在多网络下指定上网接口 一、说明 设备情况:win11,同时连接了有线网和WLAN,有线网连接着NAS必须保持连接。需求:有些情况时,有线网无网络而WLAN有网,但系统仍走着有线导致无法上网。 二、方法 过…...
网络安全员证书
软考网络安全员证书:信息安全领域的黄金标准 随着信息技术的飞速发展,网络安全问题日益凸显,网络安全员的需求也日益增加。软考网络安全员证书作为信息安全领域的黄金标准,对于网络安全从业者来说具有重要意义。本文将详细介绍…...
CMU15445(2023fall) Project #4 - Concurrency Control踩坑历程
把树木磨成月亮最亮时的样子, 就能让它更快地滚下山坡, 有时会比骑马还快。 完整代码见: SnowLegend-star/CMU15445-2023fall: Having Conquered the Loftiest Peak, We Stand But a Step Away from Victory in This Stage. With unwavering…...
医疗AR眼镜:FPC如何赋能科技医疗的未来之眼?【新立电子】
随着科技的飞速发展,增强现实(AR)技术在医疗领域的应用逐渐成为焦点。医疗AR眼镜作为一种前沿的智能设备,正在为医疗行业带来深刻的变革。它不仅能够提升医生的工作效率,还能改善患者的就医体验,成为医疗科…...
Python从0到100(八十九):Resnet、LSTM、Shufflenet、CNN四种网络分析及对比
前言: 零基础学Python:Python从0到100最新最全教程。 想做这件事情很久了,这次我更新了自己所写过的所有博客,汇集成了Python从0到100,共一百节课,帮助大家一个月时间里从零基础到学习Python基础语法、Pyth…...
服务器迁移记录【腾讯云-->阿里云】
准备工作 压缩/root /usr/local/nginx /data三个目录到zip,并下载到本地。 zip root.zip /root zip nginx.zip /usr/local/nginx zip data.zip /datasz root.zip sz nginx.zip sz data.zip连接mysql数据库,导出数据库结构与数据到dzs_mysql.sql 安装l…...
序列化选型:字节流抑或字符串
序列化既可以将对象转换为字节流,也可以转换为字符串,具体取决于使用的序列化方式和场景。 转换为字节流 常见工具及原理:在许多编程语言中,都有将对象序列化为字节流的机制。例如 Python 中的 pickle 模块、Java 中的对象序列化…...
面向实时性的超轻量级动态感知视觉SLAM系统
一、重构后的技术架构设计(基于ROS1 ORB-SLAM2增强) #mermaid-svg-JEJte8kZd7qlnq3E {font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;fill:#333;}#mermaid-svg-JEJte8kZd7qlnq3E .error-icon{fill:#552222;}#mermaid-svg-JEJte8kZd7qlnq3E .…...
4-3自定义加载器,并添加功能
一、自定义类加载器的实现步骤 继承ClassLoader类 自定义类加载器需继承java.lang.ClassLoader,并选择性地重写以下方法: findClass(String name):核心方法,用于根据类名查找并加载类的字节码。需从自定义路径(…...
Python Scrapy爬虫面试题及参考答案
目录 简述 Scrapy 框架的基本工作流程,并说明各组件的作用 Scrapy 中的 Spider、CrawlSpider 和 Rule 的作用及区别? 如何通过 Scrapy Shell 快速调试页面解析逻辑? Scrapy 的 start_requests 方法与 start_urls 的关系是什么? 解释 Scrapy 的 Request 和 Response 对象…...
Swan 表达式 - 选择表达式
ANSYS Swan 表达式支持选择(selection)表达式 case, if/then/else。选择表达式根据特定的条件选择不同的分支流。 if/then/else 表达式 if/then/else 表达式的文法如下 if expr then expr else expr 其中,首个expr 的布尔表达式,若其为 true, 则返回 …...
微信小程序:完善购物车功能,购物车主页面展示,详细页面展示效果
一、效果图 1、主页面 根据物品信息进行菜单分类,点击单项购物车图标添加至购物车,记录总购物车数量 2、购物车详情页 根据主页面选择的项,根据后台查询展示到页面,可进行多选,数量加减等 二、代码 1、主页面 页…...
javaweb将上传的图片保存在项目文件webapp下的upload文件夹下
前端HTML表单 (upload.html) 首先,创建一个HTML页面,允许用户选择并上传图片。 <!DOCTYPE html> <html lang"zh-CN"> <head><meta charset"UTF-8"><title>图片上传</title> </head> <…...
LabVIEW 无法播放 AVI 视频的编解码器解决方案
用户在 LabVIEW 中使用示例程序 Read AVI File.vi(路径: 📌 C:\Program Files (x86)\National Instruments\LabVIEW 2019\examples\Vision\Files\Read AVI File.vi)时发现: ✅ LabVIEW 自带的 AVI 视频可正常播放 这是…...
composer 错误汇总
文章目录 1: 安装EasyWeChat 报错2: composer install 报错, laravel/framework[v11.9.0, ..., v11.44.0] require fruitcake/php-cors ^1.33: 卸载Pulse 报错, Class "Laravel\Pulse\Pulse" not found4: 卸载Telescope报错 1: 安装EasyWeChat 报错 解决: composer …...
MySQL锁分类
一、按锁的粒度划分 全局锁 定义:锁定整个数据库实例,阻止所有写操作,确保数据备份一致性。加锁方式:通过FLUSH TABLES WITH READ LOCK实现,释放需执行UNLOCK TABLES。应用场景:适用于全库逻辑备份…...
DeepSeek 助力 Vue3 开发:打造丝滑的悬浮按钮(Floating Action Button)
前言:哈喽,大家好,今天给大家分享一篇文章!并提供具体代码帮助大家深入理解,彻底掌握!创作不易,如果能帮助到大家或者给大家一些灵感和启发,欢迎收藏关注哦 💕 目录 Deep…...
认知动力学视角下的生命优化系统:多模态机器学习框架的哲学重构
认知动力学视角下的生命优化系统:多模态机器学习框架的哲学重构 一、信息熵与生命系统的耗散结构 在热力学第二定律框架下,生命系统可视为负熵流的耗散结构: d S d i S d e S dS d_iS d_eS dSdiSdeS 其中 d i S d_iS diS为内部熵…...
Flask RESTful 示例
目录 1. 环境准备2. 安装依赖3. 修改main.py4. 运行应用5. API使用示例获取所有任务获取单个任务创建新任务更新任务删除任务 中文乱码问题: 下面创建一个简单的Flask RESTful API示例。首先,我们需要创建环境,安装必要的依赖,然后…...
CVPR 2025 MIMO: 支持视觉指代和像素grounding 的医学视觉语言模型
CVPR 2025 | MIMO:支持视觉指代和像素对齐的医学视觉语言模型 论文信息 标题:MIMO: A medical vision language model with visual referring multimodal input and pixel grounding multimodal output作者:Yanyuan Chen, Dexuan Xu, Yu Hu…...
python/java环境配置
环境变量放一起 python: 1.首先下载Python Python下载地址:Download Python | Python.org downloads ---windows -- 64 2.安装Python 下面两个,然后自定义,全选 可以把前4个选上 3.环境配置 1)搜高级系统设置 2…...
2021-03-15 iview一些问题
1.iview 在使用tree组件时,发现没有set类的方法,只有get,那么要改变tree值,只能遍历treeData,递归修改treeData的checked,发现无法更改,原因在于check模式下,子元素的勾选状态跟父节…...
Springboot社区养老保险系统小程序
一、前言 随着我国经济迅速发展,人们对手机的需求越来越大,各种手机软件也都在被广泛应用,但是对于手机进行数据信息管理,对于手机的各种软件也是备受用户的喜爱,社区养老保险系统小程序被用户普遍使用,为方…...
【无标题】路径问题的革命性重构:基于二维拓扑收缩色动力学模型的零点隧穿理论
路径问题的革命性重构:基于二维拓扑收缩色动力学模型的零点隧穿理论 一、传统路径模型的根本缺陷 在经典正方形路径问题中(图1): mermaid graph LR A((A)) --- B((B)) B --- C((C)) C --- D((D)) D --- A A -.- C[无直接路径] B -…...
【Linux】Linux 系统默认的目录及作用说明
博主介绍:✌全网粉丝23W,CSDN博客专家、Java领域优质创作者,掘金/华为云/阿里云/InfoQ等平台优质作者、专注于Java技术领域✌ 技术范围:SpringBoot、SpringCloud、Vue、SSM、HTML、Nodejs、Python、MySQL、PostgreSQL、大数据、物…...
【JavaSE】多线程基础学习笔记
多线程基础 -线程相关概念 程序(Program) 是为完成特定任务、用某种语言编写的一组指令的集合简单的说:就是我们写的代码 进程 进程是指运行中的程序,比如我们使用QQ,就启动了一个进程,操作系统就会为该进程分配内存…...
怎么让Comfyui导出的图像不包含工作流信息,
为了数据安全,让Comfyui导出的图像不包含工作流信息,导出的图像就不会拖到comfyui中加载出来工作流。 ComfyUI的目录下node.py 直接移除 pnginfo(推荐) 在 save_images 方法中,删除或注释掉所有与 metadata …...
毫米波雷达基础理论(3D+4D)
3D、4D毫米波雷达基础知识及厂商选型 PreView : https://mp.weixin.qq.com/s/bQkju4r6med7I3TBGJI_bQ 1. FMCW毫米波雷达基础知识 主要参考博文: 一文入门汽车毫米波雷达基本原理 :https://mp.weixin.qq.com/s/_EN7A5lKcz2Eh8dLnjE19w 毫米波雷达基础…...
