Linux上用C++和GCC开发程序实现两个不同PostgreSQL实例下单个数据库中多个Schema稳定高效的数据迁移到其它PostgreSQL实例
设计一个在Linux上运行的GCC C++程序,同时连接三个不同的PostgreSQL实例,其中两个实例中分别有两个数据库中多个Schema的表结构分别与第三实例中两个数据库中多个Schema个结构完全相同,同时复制两个实例中两个数据库中多个Schema里的所有表的数据到第三个实例中两个数据库中多个Schema里,使用以下快速高效的方法,加入异常处理,支持每隔固定时间重试一定次数,每张表的复制运行状态和记录条数,开始结束时间戳,运行时间,以及每个批次的运行状态,开始结束时间戳,运行时间,总记录条数都要到运行日志文件里,日志文件全部放在logs目录下,每天一个.log扩展名的文本文件,文件名带日期,所有运行参数写在json配置文件中。
根据需求,我设计了以下解决方案。由于代码较长,这里分为多个部分说明:
- 配置文件(config.json):
{"instances": {"source1": {"host": "192.168.1.100","port": 5432,"user": "user1","password": "pass1"},"source2": {"host": "192.168.1.101","port": 5432,"user": "user2","password": "pass2"},"target": {"host": "192.168.1.102","port": 5432,"user": "target_user","password": "target_pass"}},"database_mappings": [{"source_instance": "source1","source_db": "db1","schemas": ["schema1", "schema2"],"target_db": "target_db1"},{"source_instance": "source2","source_db": "db2","schemas": ["schema3", "schema4"],"target_db": "target_db2"}],"retry": {"max_attempts": 3,"interval_seconds": 60},"log_directory": "logs"
}
- 数据库操作类(database.h):
#include <pqxx/pqxx>
#include <string>
#include <memory>class DatabaseConnector {
public:DatabaseConnector(const std::string& host, int port, const std::string& dbname, const std::string& user,const std::string& password){std::string conn_str = fmt::format("host={} port={} dbname={} user={} password={}",host, port, dbname, user, password);conn_ = std::make_unique<pqxx::connection>(conn_str);}pqxx::result query(const std::string& sql) {pqxx::work txn(*conn_);auto result = txn.exec(sql);txn.commit();return result;}// COPY命令高效传输数据void copy_table(const std::string& source_table, const std::string& target_table,std::shared_ptr<DatabaseConnector> target_conn) {pqxx::work source_txn(*conn_);std::stringstream buffer;source_txn.conn().copy_to(source_table, buffer);source_txn.commit();pqxx::work target_txn(*target_conn->conn_);target_txn.conn().copy_from(buffer, target_table);target_txn.commit();}private:std::unique_ptr<pqxx::connection> conn_;
};
- 日志管理类(logger.h):
#include <fstream>
#include <chrono>
#include <iomanip>
#include <filesystem>class Logger {
public:Logger(const std::string& log_dir) {namespace fs = std::filesystem;fs::create_directories(log_dir);auto now = std::chrono::system_clock::now();auto today = std::chrono::floor<std::chrono::days>(now);log_file_ = log_dir + "/" + format_time(today, "%Y-%m-%d") + ".log";}void log(const std::string& message) {std::lock_guard<std::mutex> lock(mtx_);std::ofstream file(log_file_, std::ios::app);file << "[" << format_time(std::chrono::system_clock::now()) << "] " << message << "\n";}private:std::string format_time(auto time_point, const char* fmt = "%T") {auto in_time_t = std::chrono::system_clock::to_time_t(time_point);std::stringstream ss;ss << std::put_time(std::localtime(&in_time_t), fmt);return ss.str();}std::string log_file_;std::mutex mtx_;
};
- 主程序逻辑(main.cpp):
#include <nlohmann/json.hpp>
#include <vector>
#include <thread>struct Config {// 解析配置的代码
};void replicate_data(std::shared_ptr<DatabaseConnector> source,std::shared_ptr<DatabaseConnector> target,const std::string& schema,Logger& logger) {// 获取所有表结构auto tables = source->query(fmt::format("SELECT table_name FROM information_schema.tables ""WHERE table_schema = '{}' AND table_type = 'BASE TABLE'", schema));for (const auto& row : tables) {std::string table_name = row[0].as<std::string>();std::string qualified_name = schema + "." + table_name;// 记录开始信息logger.log(fmt::format("START Table: {}", qualified_name));try {// 使用移动数据的高效方法source->copy_table(qualified_name, qualified_name, target);// 记录成功信息logger.log(fmt::format("SUCCESS Table: {}", qualified_name));} catch (const std::exception& e) {// 异常处理和重试逻辑logger.log(fmt::format("ERROR Table: {} - {}", qualified_name, e.what()));// 重试逻辑实现}}
}int main() {// 加载配置Config config = load_config("config.json");// 初始化连接池auto source1 = create_connection(config.instances["source1"]);// 类似初始化其他连接// 创建日志器Logger logger(config.log_directory);// 处理每个数据库映射for (const auto& mapping : config.database_mappings) {// 获取源和目标连接auto source = get_connection(mapping.source_instance);auto target = get_connection("target");// 处理每个schemafor (const auto& schema : mapping.schemas) {replicate_data(source, target, schema, logger);}}return 0;
}
实现要点说明:
- 性能优化:
- 使用PostgreSQL的COPY命令进行批量数据传输
- 采用连接池管理数据库连接
- 多线程处理不同schema的复制任务(需要添加线程池)
- 错误处理:
- 三级重试机制(配置可调)
- 事务回滚保证数据一致性
- 详细的错误日志记录
- 日志管理:
- 每天自动滚动日志文件
- 记录毫秒级时间戳
- 包含完整的状态信息
- 线程安全的日志写入
- 扩展性:
- 模块化设计方便扩展新的数据库实例
- 配置驱动,无需重新编译即可修改参数
- 支持Schema级别的并行复制
编译和运行:
# 编译命令
g++ -std=c++17 -o pg_replicator main.cpp database.cpp logger.cpp -lpqxx -lpq -lfmt# 运行命令
./pg_replicator --config config.json
建议的扩展改进方向:
- 增加以下监控功能:
- 实时进度显示
- Prometheus指标导出
- 电子邮件/短信报警
- 性能优化:
- 并行复制不同表
- 流水线传输
- 压缩传输
- 数据一致性保障:
- 校验和验证
- 断点续传
- 双向同步检测
- 安全性增强:
- 加密连接支持
- 凭据安全存储
- 审计日志
这个设计在保证功能完整性的同时,充分考虑了大规模数据复制的性能需求,并通过模块化设计保证了可维护性。异常处理和日志系统为生产环境运行提供了必要保障。
相关文章:
Linux上用C++和GCC开发程序实现两个不同PostgreSQL实例下单个数据库中多个Schema稳定高效的数据迁移到其它PostgreSQL实例
设计一个在Linux上运行的GCC C程序,同时连接三个不同的PostgreSQL实例,其中两个实例中分别有两个数据库中多个Schema的表结构分别与第三实例中两个数据库中多个Schema个结构完全相同,同时复制两个实例中两个数据库中多个Schema里的所有表的数…...

Linux下的网络通信编程
在不同主机之间,进行进程间的通信。 1解决主机之间硬件的互通 2.解决主机之间软件的互通. 3.IP地址:来区分不同的主机(软件地址) 4.MAC地址:硬件地址 5.端口号:区分同一主机上的不同应用进程 网络协议…...

Windows在多网络下指定上网接口
Windows在多网络下指定上网接口 一、说明 设备情况:win11,同时连接了有线网和WLAN,有线网连接着NAS必须保持连接。需求:有些情况时,有线网无网络而WLAN有网,但系统仍走着有线导致无法上网。 二、方法 过…...
网络安全员证书
软考网络安全员证书:信息安全领域的黄金标准 随着信息技术的飞速发展,网络安全问题日益凸显,网络安全员的需求也日益增加。软考网络安全员证书作为信息安全领域的黄金标准,对于网络安全从业者来说具有重要意义。本文将详细介绍…...

CMU15445(2023fall) Project #4 - Concurrency Control踩坑历程
把树木磨成月亮最亮时的样子, 就能让它更快地滚下山坡, 有时会比骑马还快。 完整代码见: SnowLegend-star/CMU15445-2023fall: Having Conquered the Loftiest Peak, We Stand But a Step Away from Victory in This Stage. With unwavering…...

医疗AR眼镜:FPC如何赋能科技医疗的未来之眼?【新立电子】
随着科技的飞速发展,增强现实(AR)技术在医疗领域的应用逐渐成为焦点。医疗AR眼镜作为一种前沿的智能设备,正在为医疗行业带来深刻的变革。它不仅能够提升医生的工作效率,还能改善患者的就医体验,成为医疗科…...

Python从0到100(八十九):Resnet、LSTM、Shufflenet、CNN四种网络分析及对比
前言: 零基础学Python:Python从0到100最新最全教程。 想做这件事情很久了,这次我更新了自己所写过的所有博客,汇集成了Python从0到100,共一百节课,帮助大家一个月时间里从零基础到学习Python基础语法、Pyth…...
服务器迁移记录【腾讯云-->阿里云】
准备工作 压缩/root /usr/local/nginx /data三个目录到zip,并下载到本地。 zip root.zip /root zip nginx.zip /usr/local/nginx zip data.zip /datasz root.zip sz nginx.zip sz data.zip连接mysql数据库,导出数据库结构与数据到dzs_mysql.sql 安装l…...

序列化选型:字节流抑或字符串
序列化既可以将对象转换为字节流,也可以转换为字符串,具体取决于使用的序列化方式和场景。 转换为字节流 常见工具及原理:在许多编程语言中,都有将对象序列化为字节流的机制。例如 Python 中的 pickle 模块、Java 中的对象序列化…...

面向实时性的超轻量级动态感知视觉SLAM系统
一、重构后的技术架构设计(基于ROS1 ORB-SLAM2增强) #mermaid-svg-JEJte8kZd7qlnq3E {font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;fill:#333;}#mermaid-svg-JEJte8kZd7qlnq3E .error-icon{fill:#552222;}#mermaid-svg-JEJte8kZd7qlnq3E .…...
4-3自定义加载器,并添加功能
一、自定义类加载器的实现步骤 继承ClassLoader类 自定义类加载器需继承java.lang.ClassLoader,并选择性地重写以下方法: findClass(String name):核心方法,用于根据类名查找并加载类的字节码。需从自定义路径(…...
Python Scrapy爬虫面试题及参考答案
目录 简述 Scrapy 框架的基本工作流程,并说明各组件的作用 Scrapy 中的 Spider、CrawlSpider 和 Rule 的作用及区别? 如何通过 Scrapy Shell 快速调试页面解析逻辑? Scrapy 的 start_requests 方法与 start_urls 的关系是什么? 解释 Scrapy 的 Request 和 Response 对象…...
Swan 表达式 - 选择表达式
ANSYS Swan 表达式支持选择(selection)表达式 case, if/then/else。选择表达式根据特定的条件选择不同的分支流。 if/then/else 表达式 if/then/else 表达式的文法如下 if expr then expr else expr 其中,首个expr 的布尔表达式,若其为 true, 则返回 …...

微信小程序:完善购物车功能,购物车主页面展示,详细页面展示效果
一、效果图 1、主页面 根据物品信息进行菜单分类,点击单项购物车图标添加至购物车,记录总购物车数量 2、购物车详情页 根据主页面选择的项,根据后台查询展示到页面,可进行多选,数量加减等 二、代码 1、主页面 页…...
javaweb将上传的图片保存在项目文件webapp下的upload文件夹下
前端HTML表单 (upload.html) 首先,创建一个HTML页面,允许用户选择并上传图片。 <!DOCTYPE html> <html lang"zh-CN"> <head><meta charset"UTF-8"><title>图片上传</title> </head> <…...

LabVIEW 无法播放 AVI 视频的编解码器解决方案
用户在 LabVIEW 中使用示例程序 Read AVI File.vi(路径: 📌 C:\Program Files (x86)\National Instruments\LabVIEW 2019\examples\Vision\Files\Read AVI File.vi)时发现: ✅ LabVIEW 自带的 AVI 视频可正常播放 这是…...

composer 错误汇总
文章目录 1: 安装EasyWeChat 报错2: composer install 报错, laravel/framework[v11.9.0, ..., v11.44.0] require fruitcake/php-cors ^1.33: 卸载Pulse 报错, Class "Laravel\Pulse\Pulse" not found4: 卸载Telescope报错 1: 安装EasyWeChat 报错 解决: composer …...
MySQL锁分类
一、按锁的粒度划分 全局锁 定义:锁定整个数据库实例,阻止所有写操作,确保数据备份一致性。加锁方式:通过FLUSH TABLES WITH READ LOCK实现,释放需执行UNLOCK TABLES。应用场景:适用于全库逻辑备份…...

DeepSeek 助力 Vue3 开发:打造丝滑的悬浮按钮(Floating Action Button)
前言:哈喽,大家好,今天给大家分享一篇文章!并提供具体代码帮助大家深入理解,彻底掌握!创作不易,如果能帮助到大家或者给大家一些灵感和启发,欢迎收藏关注哦 💕 目录 Deep…...
认知动力学视角下的生命优化系统:多模态机器学习框架的哲学重构
认知动力学视角下的生命优化系统:多模态机器学习框架的哲学重构 一、信息熵与生命系统的耗散结构 在热力学第二定律框架下,生命系统可视为负熵流的耗散结构: d S d i S d e S dS d_iS d_eS dSdiSdeS 其中 d i S d_iS diS为内部熵…...
RestClient
什么是RestClient RestClient 是 Elasticsearch 官方提供的 Java 低级 REST 客户端,它允许HTTP与Elasticsearch 集群通信,而无需处理 JSON 序列化/反序列化等底层细节。它是 Elasticsearch Java API 客户端的基础。 RestClient 主要特点 轻量级ÿ…...
Python爬虫实战:研究MechanicalSoup库相关技术
一、MechanicalSoup 库概述 1.1 库简介 MechanicalSoup 是一个 Python 库,专为自动化交互网站而设计。它结合了 requests 的 HTTP 请求能力和 BeautifulSoup 的 HTML 解析能力,提供了直观的 API,让我们可以像人类用户一样浏览网页、填写表单和提交请求。 1.2 主要功能特点…...
web vue 项目 Docker化部署
Web 项目 Docker 化部署详细教程 目录 Web 项目 Docker 化部署概述Dockerfile 详解 构建阶段生产阶段 构建和运行 Docker 镜像 1. Web 项目 Docker 化部署概述 Docker 化部署的主要步骤分为以下几个阶段: 构建阶段(Build Stage):…...

GruntJS-前端自动化任务运行器从入门到实战
Grunt 完全指南:从入门到实战 一、Grunt 是什么? Grunt是一个基于 Node.js 的前端自动化任务运行器,主要用于自动化执行项目开发中重复性高的任务,例如文件压缩、代码编译、语法检查、单元测试、文件合并等。通过配置简洁的任务…...

Python Ovito统计金刚石结构数量
大家好,我是小马老师。 本文介绍python ovito方法统计金刚石结构的方法。 Ovito Identify diamond structure命令可以识别和统计金刚石结构,但是无法直接输出结构的变化情况。 本文使用python调用ovito包的方法,可以持续统计各步的金刚石结构,具体代码如下: from ovito…...

Selenium常用函数介绍
目录 一,元素定位 1.1 cssSeector 1.2 xpath 二,操作测试对象 三,窗口 3.1 案例 3.2 窗口切换 3.3 窗口大小 3.4 屏幕截图 3.5 关闭窗口 四,弹窗 五,等待 六,导航 七,文件上传 …...

群晖NAS如何在虚拟机创建飞牛NAS
套件中心下载安装Virtual Machine Manager 创建虚拟机 配置虚拟机 飞牛官网下载 https://iso.liveupdate.fnnas.com/x86_64/trim/fnos-0.9.2-863.iso 群晖NAS如何在虚拟机创建飞牛NAS - 个人信息分享...
es6+和css3新增的特性有哪些
一:ECMAScript 新特性(ES6) ES6 (2015) - 革命性更新 1,记住的方法,从一个方法里面用到了哪些技术 1,let /const块级作用域声明2,**默认参数**:函数参数可以设置默认值。3&#x…...

Java数组Arrays操作全攻略
Arrays类的概述 Java中的Arrays类位于java.util包中,提供了一系列静态方法用于操作数组(如排序、搜索、填充、比较等)。这些方法适用于基本类型数组和对象数组。 常用成员方法及代码示例 排序(sort) 对数组进行升序…...

数据结构:泰勒展开式:霍纳法则(Horner‘s Rule)
目录 🔍 若用递归计算每一项,会发生什么? Horners Rule(霍纳法则) 第一步:我们从最原始的泰勒公式出发 第二步:从形式上重新观察展开式 🌟 第三步:引出霍纳法则&…...