Linux上用C++和GCC开发程序实现两个不同PostgreSQL实例下单个数据库中多个Schema稳定高效的数据迁移到其它PostgreSQL实例
设计一个在Linux上运行的GCC C++程序,同时连接三个不同的PostgreSQL实例,其中两个实例中分别有两个数据库中多个Schema的表结构分别与第三实例中两个数据库中多个Schema个结构完全相同,同时复制两个实例中两个数据库中多个Schema里的所有表的数据到第三个实例中两个数据库中多个Schema里,使用以下快速高效的方法,加入异常处理,支持每隔固定时间重试一定次数,每张表的复制运行状态和记录条数,开始结束时间戳,运行时间,以及每个批次的运行状态,开始结束时间戳,运行时间,总记录条数都要到运行日志文件里,日志文件全部放在logs目录下,每天一个.log扩展名的文本文件,文件名带日期,所有运行参数写在json配置文件中。
根据需求,我设计了以下解决方案。由于代码较长,这里分为多个部分说明:
- 配置文件(config.json):
{"instances": {"source1": {"host": "192.168.1.100","port": 5432,"user": "user1","password": "pass1"},"source2": {"host": "192.168.1.101","port": 5432,"user": "user2","password": "pass2"},"target": {"host": "192.168.1.102","port": 5432,"user": "target_user","password": "target_pass"}},"database_mappings": [{"source_instance": "source1","source_db": "db1","schemas": ["schema1", "schema2"],"target_db": "target_db1"},{"source_instance": "source2","source_db": "db2","schemas": ["schema3", "schema4"],"target_db": "target_db2"}],"retry": {"max_attempts": 3,"interval_seconds": 60},"log_directory": "logs"
}
- 数据库操作类(database.h):
#include <pqxx/pqxx>
#include <string>
#include <memory>class DatabaseConnector {
public:DatabaseConnector(const std::string& host, int port, const std::string& dbname, const std::string& user,const std::string& password){std::string conn_str = fmt::format("host={} port={} dbname={} user={} password={}",host, port, dbname, user, password);conn_ = std::make_unique<pqxx::connection>(conn_str);}pqxx::result query(const std::string& sql) {pqxx::work txn(*conn_);auto result = txn.exec(sql);txn.commit();return result;}// COPY命令高效传输数据void copy_table(const std::string& source_table, const std::string& target_table,std::shared_ptr<DatabaseConnector> target_conn) {pqxx::work source_txn(*conn_);std::stringstream buffer;source_txn.conn().copy_to(source_table, buffer);source_txn.commit();pqxx::work target_txn(*target_conn->conn_);target_txn.conn().copy_from(buffer, target_table);target_txn.commit();}private:std::unique_ptr<pqxx::connection> conn_;
};
- 日志管理类(logger.h):
#include <fstream>
#include <chrono>
#include <iomanip>
#include <filesystem>class Logger {
public:Logger(const std::string& log_dir) {namespace fs = std::filesystem;fs::create_directories(log_dir);auto now = std::chrono::system_clock::now();auto today = std::chrono::floor<std::chrono::days>(now);log_file_ = log_dir + "/" + format_time(today, "%Y-%m-%d") + ".log";}void log(const std::string& message) {std::lock_guard<std::mutex> lock(mtx_);std::ofstream file(log_file_, std::ios::app);file << "[" << format_time(std::chrono::system_clock::now()) << "] " << message << "\n";}private:std::string format_time(auto time_point, const char* fmt = "%T") {auto in_time_t = std::chrono::system_clock::to_time_t(time_point);std::stringstream ss;ss << std::put_time(std::localtime(&in_time_t), fmt);return ss.str();}std::string log_file_;std::mutex mtx_;
};
- 主程序逻辑(main.cpp):
#include <nlohmann/json.hpp>
#include <vector>
#include <thread>struct Config {// 解析配置的代码
};void replicate_data(std::shared_ptr<DatabaseConnector> source,std::shared_ptr<DatabaseConnector> target,const std::string& schema,Logger& logger) {// 获取所有表结构auto tables = source->query(fmt::format("SELECT table_name FROM information_schema.tables ""WHERE table_schema = '{}' AND table_type = 'BASE TABLE'", schema));for (const auto& row : tables) {std::string table_name = row[0].as<std::string>();std::string qualified_name = schema + "." + table_name;// 记录开始信息logger.log(fmt::format("START Table: {}", qualified_name));try {// 使用移动数据的高效方法source->copy_table(qualified_name, qualified_name, target);// 记录成功信息logger.log(fmt::format("SUCCESS Table: {}", qualified_name));} catch (const std::exception& e) {// 异常处理和重试逻辑logger.log(fmt::format("ERROR Table: {} - {}", qualified_name, e.what()));// 重试逻辑实现}}
}int main() {// 加载配置Config config = load_config("config.json");// 初始化连接池auto source1 = create_connection(config.instances["source1"]);// 类似初始化其他连接// 创建日志器Logger logger(config.log_directory);// 处理每个数据库映射for (const auto& mapping : config.database_mappings) {// 获取源和目标连接auto source = get_connection(mapping.source_instance);auto target = get_connection("target");// 处理每个schemafor (const auto& schema : mapping.schemas) {replicate_data(source, target, schema, logger);}}return 0;
}
实现要点说明:
- 性能优化:
- 使用PostgreSQL的COPY命令进行批量数据传输
- 采用连接池管理数据库连接
- 多线程处理不同schema的复制任务(需要添加线程池)
- 错误处理:
- 三级重试机制(配置可调)
- 事务回滚保证数据一致性
- 详细的错误日志记录
- 日志管理:
- 每天自动滚动日志文件
- 记录毫秒级时间戳
- 包含完整的状态信息
- 线程安全的日志写入
- 扩展性:
- 模块化设计方便扩展新的数据库实例
- 配置驱动,无需重新编译即可修改参数
- 支持Schema级别的并行复制
编译和运行:
# 编译命令
g++ -std=c++17 -o pg_replicator main.cpp database.cpp logger.cpp -lpqxx -lpq -lfmt# 运行命令
./pg_replicator --config config.json
建议的扩展改进方向:
- 增加以下监控功能:
- 实时进度显示
- Prometheus指标导出
- 电子邮件/短信报警
- 性能优化:
- 并行复制不同表
- 流水线传输
- 压缩传输
- 数据一致性保障:
- 校验和验证
- 断点续传
- 双向同步检测
- 安全性增强:
- 加密连接支持
- 凭据安全存储
- 审计日志
这个设计在保证功能完整性的同时,充分考虑了大规模数据复制的性能需求,并通过模块化设计保证了可维护性。异常处理和日志系统为生产环境运行提供了必要保障。
相关文章:
Linux上用C++和GCC开发程序实现两个不同PostgreSQL实例下单个数据库中多个Schema稳定高效的数据迁移到其它PostgreSQL实例
设计一个在Linux上运行的GCC C程序,同时连接三个不同的PostgreSQL实例,其中两个实例中分别有两个数据库中多个Schema的表结构分别与第三实例中两个数据库中多个Schema个结构完全相同,同时复制两个实例中两个数据库中多个Schema里的所有表的数…...
Linux下的网络通信编程
在不同主机之间,进行进程间的通信。 1解决主机之间硬件的互通 2.解决主机之间软件的互通. 3.IP地址:来区分不同的主机(软件地址) 4.MAC地址:硬件地址 5.端口号:区分同一主机上的不同应用进程 网络协议…...
Windows在多网络下指定上网接口
Windows在多网络下指定上网接口 一、说明 设备情况:win11,同时连接了有线网和WLAN,有线网连接着NAS必须保持连接。需求:有些情况时,有线网无网络而WLAN有网,但系统仍走着有线导致无法上网。 二、方法 过…...
网络安全员证书
软考网络安全员证书:信息安全领域的黄金标准 随着信息技术的飞速发展,网络安全问题日益凸显,网络安全员的需求也日益增加。软考网络安全员证书作为信息安全领域的黄金标准,对于网络安全从业者来说具有重要意义。本文将详细介绍…...
CMU15445(2023fall) Project #4 - Concurrency Control踩坑历程
把树木磨成月亮最亮时的样子, 就能让它更快地滚下山坡, 有时会比骑马还快。 完整代码见: SnowLegend-star/CMU15445-2023fall: Having Conquered the Loftiest Peak, We Stand But a Step Away from Victory in This Stage. With unwavering…...
医疗AR眼镜:FPC如何赋能科技医疗的未来之眼?【新立电子】
随着科技的飞速发展,增强现实(AR)技术在医疗领域的应用逐渐成为焦点。医疗AR眼镜作为一种前沿的智能设备,正在为医疗行业带来深刻的变革。它不仅能够提升医生的工作效率,还能改善患者的就医体验,成为医疗科…...
Python从0到100(八十九):Resnet、LSTM、Shufflenet、CNN四种网络分析及对比
前言: 零基础学Python:Python从0到100最新最全教程。 想做这件事情很久了,这次我更新了自己所写过的所有博客,汇集成了Python从0到100,共一百节课,帮助大家一个月时间里从零基础到学习Python基础语法、Pyth…...
服务器迁移记录【腾讯云-->阿里云】
准备工作 压缩/root /usr/local/nginx /data三个目录到zip,并下载到本地。 zip root.zip /root zip nginx.zip /usr/local/nginx zip data.zip /datasz root.zip sz nginx.zip sz data.zip连接mysql数据库,导出数据库结构与数据到dzs_mysql.sql 安装l…...
序列化选型:字节流抑或字符串
序列化既可以将对象转换为字节流,也可以转换为字符串,具体取决于使用的序列化方式和场景。 转换为字节流 常见工具及原理:在许多编程语言中,都有将对象序列化为字节流的机制。例如 Python 中的 pickle 模块、Java 中的对象序列化…...
面向实时性的超轻量级动态感知视觉SLAM系统
一、重构后的技术架构设计(基于ROS1 ORB-SLAM2增强) #mermaid-svg-JEJte8kZd7qlnq3E {font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;fill:#333;}#mermaid-svg-JEJte8kZd7qlnq3E .error-icon{fill:#552222;}#mermaid-svg-JEJte8kZd7qlnq3E .…...
4-3自定义加载器,并添加功能
一、自定义类加载器的实现步骤 继承ClassLoader类 自定义类加载器需继承java.lang.ClassLoader,并选择性地重写以下方法: findClass(String name):核心方法,用于根据类名查找并加载类的字节码。需从自定义路径(…...
Python Scrapy爬虫面试题及参考答案
目录 简述 Scrapy 框架的基本工作流程,并说明各组件的作用 Scrapy 中的 Spider、CrawlSpider 和 Rule 的作用及区别? 如何通过 Scrapy Shell 快速调试页面解析逻辑? Scrapy 的 start_requests 方法与 start_urls 的关系是什么? 解释 Scrapy 的 Request 和 Response 对象…...
Swan 表达式 - 选择表达式
ANSYS Swan 表达式支持选择(selection)表达式 case, if/then/else。选择表达式根据特定的条件选择不同的分支流。 if/then/else 表达式 if/then/else 表达式的文法如下 if expr then expr else expr 其中,首个expr 的布尔表达式,若其为 true, 则返回 …...
微信小程序:完善购物车功能,购物车主页面展示,详细页面展示效果
一、效果图 1、主页面 根据物品信息进行菜单分类,点击单项购物车图标添加至购物车,记录总购物车数量 2、购物车详情页 根据主页面选择的项,根据后台查询展示到页面,可进行多选,数量加减等 二、代码 1、主页面 页…...
javaweb将上传的图片保存在项目文件webapp下的upload文件夹下
前端HTML表单 (upload.html) 首先,创建一个HTML页面,允许用户选择并上传图片。 <!DOCTYPE html> <html lang"zh-CN"> <head><meta charset"UTF-8"><title>图片上传</title> </head> <…...
LabVIEW 无法播放 AVI 视频的编解码器解决方案
用户在 LabVIEW 中使用示例程序 Read AVI File.vi(路径: 📌 C:\Program Files (x86)\National Instruments\LabVIEW 2019\examples\Vision\Files\Read AVI File.vi)时发现: ✅ LabVIEW 自带的 AVI 视频可正常播放 这是…...
composer 错误汇总
文章目录 1: 安装EasyWeChat 报错2: composer install 报错, laravel/framework[v11.9.0, ..., v11.44.0] require fruitcake/php-cors ^1.33: 卸载Pulse 报错, Class "Laravel\Pulse\Pulse" not found4: 卸载Telescope报错 1: 安装EasyWeChat 报错 解决: composer …...
MySQL锁分类
一、按锁的粒度划分 全局锁 定义:锁定整个数据库实例,阻止所有写操作,确保数据备份一致性。加锁方式:通过FLUSH TABLES WITH READ LOCK实现,释放需执行UNLOCK TABLES。应用场景:适用于全库逻辑备份…...
DeepSeek 助力 Vue3 开发:打造丝滑的悬浮按钮(Floating Action Button)
前言:哈喽,大家好,今天给大家分享一篇文章!并提供具体代码帮助大家深入理解,彻底掌握!创作不易,如果能帮助到大家或者给大家一些灵感和启发,欢迎收藏关注哦 💕 目录 Deep…...
认知动力学视角下的生命优化系统:多模态机器学习框架的哲学重构
认知动力学视角下的生命优化系统:多模态机器学习框架的哲学重构 一、信息熵与生命系统的耗散结构 在热力学第二定律框架下,生命系统可视为负熵流的耗散结构: d S d i S d e S dS d_iS d_eS dSdiSdeS 其中 d i S d_iS diS为内部熵…...
Docker 运行 Kafka 带 SASL 认证教程
Docker 运行 Kafka 带 SASL 认证教程 Docker 运行 Kafka 带 SASL 认证教程一、说明二、环境准备三、编写 Docker Compose 和 jaas文件docker-compose.yml代码说明:server_jaas.conf 四、启动服务五、验证服务六、连接kafka服务七、总结 Docker 运行 Kafka 带 SASL 认…...
连锁超市冷库节能解决方案:如何实现超市降本增效
在连锁超市冷库运营中,高能耗、设备损耗快、人工管理低效等问题长期困扰企业。御控冷库节能解决方案通过智能控制化霜、按需化霜、实时监控、故障诊断、自动预警、远程控制开关六大核心技术,实现年省电费15%-60%,且不改动原有装备、安装快捷、…...
大语言模型如何处理长文本?常用文本分割技术详解
为什么需要文本分割? 引言:为什么需要文本分割?一、基础文本分割方法1. 按段落分割(Paragraph Splitting)2. 按句子分割(Sentence Splitting)二、高级文本分割策略3. 重叠分割(Sliding Window)4. 递归分割(Recursive Splitting)三、生产级工具推荐5. 使用LangChain的…...
基于当前项目通过npm包形式暴露公共组件
1.package.sjon文件配置 其中xh-flowable就是暴露出去的npm包名 2.创建tpyes文件夹,并新增内容 3.创建package文件夹...
【电力电子】基于STM32F103C8T6单片机双极性SPWM逆变(硬件篇)
本项目是基于 STM32F103C8T6 微控制器的 SPWM(正弦脉宽调制)电源模块,能够生成可调频率和幅值的正弦波交流电源输出。该项目适用于逆变器、UPS电源、变频器等应用场景。 供电电源 输入电压采集 上图为本设计的电源电路,图中 D1 为二极管, 其目的是防止正负极电源反接, …...
解读《网络安全法》最新修订,把握网络安全新趋势
《网络安全法》自2017年施行以来,在维护网络空间安全方面发挥了重要作用。但随着网络环境的日益复杂,网络攻击、数据泄露等事件频发,现行法律已难以完全适应新的风险挑战。 2025年3月28日,国家网信办会同相关部门起草了《网络安全…...
NPOI操作EXCEL文件 ——CAD C# 二次开发
缺点:dll.版本容易加载错误。CAD加载插件时,没有加载所有类库。插件运行过程中用到某个类库,会从CAD的安装目录找,找不到就报错了。 【方案2】让CAD在加载过程中把类库加载到内存 【方案3】是发现缺少了哪个库,就用插件程序加载进…...
Kafka主题运维全指南:从基础配置到故障处理
#作者:张桐瑞 文章目录 主题日常管理1. 修改主题分区。2. 修改主题级别参数。3. 变更副本数。4. 修改主题限速。5.主题分区迁移。6. 常见主题错误处理常见错误1:主题删除失败。常见错误2:__consumer_offsets占用太多的磁盘。 主题日常管理 …...
LLaMA-Factory 微调 Qwen2-VL 进行人脸情感识别(二)
在上一篇文章中,我们详细介绍了如何使用LLaMA-Factory框架对Qwen2-VL大模型进行微调,以实现人脸情感识别的功能。本篇文章将聚焦于微调完成后,如何调用这个模型进行人脸情感识别的具体代码实现,包括详细的步骤和注释。 模型调用步骤 环境准备:确保安装了必要的Python库。…...
Unity VR/MR开发-VR开发与传统3D开发的差异
视频讲解链接:【XR马斯维】VR/MR开发与传统3D开发的差异【UnityVR/MR开发教程--入门】_哔哩哔哩_bilibili...
