不同数据库进行同步和增量数据(SQL server 与MySQL数据库为例)
场景
最近在做的一个项目需要将远程服务器的SQL server数据库中表的数据传输到本机的MySQL数据库中,并且远程的SQL server数据库表的数据会实时进行更新,并且差不多是一分钟内传输18条数据,例如现在是2023-12-4 15:09,在15:08这个时间内有18条数据需要首先进SQL server数据库,再更新到MySQL数据库中,这种场景如果每分钟都能将18条数据放入SQL server数据库的话就非常简单了,但是在15:08的时候,这18条数据可能只来11条,剩下的7条可能在15:09或后面的时间陆续过来。我开始的想法是通过最后更新的时间的时间戳来查询新来的数据然后更新到MySQL中,但是由于在最终的时间内还会来前面时间的数据,这样会导致前面时间的数据丢失,所以我想了另外一方法。
- 首先使用python写一个程序来同步SQL sever的历史数据到MySQL数据库中
- 在SQL server中创建一个中间表。
- 在SQL server中要传输的表中创建一个触发器,当这个表更新数据则触发将更新的数据放入到中间表中
- 在python脚本中写一个循环来定期检查中间表,我的SQL server表中由两个主键定义一条数据,所以中间表也是由两个字段定义一条数据,由于入库历史数据的数据量非常大,有几十万条,在这个入库历史数据的时间段内更新了很多条数据,所以可能中间表的数据与入库到MySQL中的字段有重复,所以我需要先验证中间表中的数据MySQL是否存在。
- 存在则删除中间表中这条数据
- 不存在则插入MySQL后删除这条数据
- 最后完成了入库程序,经过验证没有数据丢失
1.历史数据入库
历史数据入库我使用的python写的,首先定义两个数据库的信息
# 使用示例
sql_server_conn_params = {'driver': '{SQL Server}','server': 'ip','database': '数据库名','uid': 'jzyg','pwd': ''
}mysql_conn_params = {'host': 'localhost','user': 'root','password': '123456','database': '数据库名','charset': 'utf8mb4'
}
定义查询语句
querySolar = 'SELECT dtime,stationID,staionName,electric,tiltSolar,levelSolar,scatterSolar,directSolar,tiltSolar_day,levelSolar_day,scatterSolar_day,directSolar_day,sunShine_day FROM realData_Solar'
定义入库历史数据函数
def transfer_wind_data(self):# 连接到 SQL Serverwith pyodbc.connect(**self.sql_server_conn_params) as sql_server_conn:with sql_server_conn.cursor() as sql_server_cursor:# 修改查询,仅选择上次同步后的数据modified_query = self.queryWind + " WHERE dtime >= ? ORDER BY dtime"sql_server_cursor.execute(modified_query, (self.wind_last_dtime,))rows = sql_server_cursor.fetchall()if not rows:return # 没有新数据# 连接到 MySQLwith pymysql.connect(**self.mysql_conn_params) as mysql_conn:with mysql_conn.cursor() as mysql_cursor:data_list = []for row in rows:observe_time = row[0].strftime("%Y-%m-%d %H:%M:00")fsz_id = row[1].replace('"', "").strip()station_name = row[2]farmName = station_name.split("-")[0]# 根据电站名查询电站号sql = "SELECT FARMID FROM com_farm WHERE FARMNAME LIKE %s"mysql_cursor.execute(sql, ('%' + farmName + '%',))result = mysql_cursor.fetchone()farm_id = Noneif result is not None:farm_id = result# 处理查询结果为空的情况if farm_id is not None:farm_id = farm_id[0]staion_name = row[2]wind_direction_instant = row[3]wind_speed_instant = row[4]wind_speed_two_min = row[5]wind_speed_ten_min = row[6]data = (observe_time,fsz_id,staion_name,farm_id,wind_direction_instant,wind_speed_instant,wind_speed_two_min,wind_speed_ten_min)data_list.append(data)self.wind_last_dtime = row[0].strftime("%Y-%m-%d %H:%M:%S")if data_list:result = mysql_cursor.executemany('INSERT INTO wind_monitor''(observe_time,fsz_id,station_name,farm_id,wind_direction_instant,wind_speed_instant,wind_speed_two_min,wind_speed_ten_min) ''VALUES (%s,%s,%s,%s,%s,%s,%s,%s)', data_list) # 根据你的表结构修改mysql_conn.commit()print(f'wind_monitor表插入了{result}行数据.' if result else '没有新数据插入。')
2.创建中间表和触发器
创建中间表
CREATE TABLE intermediateData_Wind AS SELECT * FROM realData_Wind WHERE 1=0;
创建触发器
CREATE TRIGGER CopyToIntermediateTable
ON realData_Wind
AFTER INSERT
AS
BEGIN-- 插入操作INSERT INTO intermediateData_Wind (dtime, stationID, staionName, windDirectionInstant, windSpeedInstant, windSpeed2min, windSpeed10min)SELECT dtime, stationID, staionName, windDirectionInstant, windSpeedInstant, windSpeed2min, windSpeed10minFROM inserted;
END;
3.创建轮询中间表代码
def transfer_insert_intermediateData_Wind(self):# 连接到 SQL Serverwith pyodbc.connect(**self.sql_server_conn_params) as sql_server_conn:with sql_server_conn.cursor() as sql_server_cursor:# 查询中间表中所有数据sql_server_cursor.execute(self.queryIntermediateData)intermediate_rows = sql_server_cursor.fetchall()# 用于跟踪删除和插入的数量deleted_count = 0inserted_count = 0# 连接到 MySQLwith pymysql.connect(**self.mysql_conn_params) as mysql_conn:with mysql_conn.cursor() as mysql_cursor:for row in intermediate_rows:observe_time = row[0].strftime("%Y-%m-%d %H:%M:00")fsz_id = row[1].replace('"', "").strip()# 检查wind_monitor表中是否存在相同数据check_query = "SELECT COUNT(*) FROM wind_monitor WHERE observe_time = %s AND fsz_id = %s"mysql_cursor.execute(check_query, (observe_time, fsz_id))count = mysql_cursor.fetchone()[0]dtime = row[0].strftime("%Y-%m-%d %H:%M:00.000")if count > 0:# 数据存在,从中间表删除delete_query = "DELETE FROM intermediateData_Wind WHERE dtime = ? AND stationID = ?"sql_server_cursor.execute(delete_query, (dtime, row[1]))sql_server_conn.commit()deleted_count += 1else:# 数据不存在,插入到wind_monitor并从中间表删除station_name = row[2]farmName = station_name.split("-")[0]# 根据电站名查询电站号farm_query = "SELECT FARMID FROM com_farm WHERE FARMNAME LIKE %s"mysql_cursor.execute(farm_query, ('%' + farmName + '%',))farm_result = mysql_cursor.fetchone()farm_id = farm_result[0] if farm_result else Nonewind_direction_instant = row[3]wind_speed_instant = row[4]wind_speed_two_min = row[5]wind_speed_ten_min = row[6]insert_data = (observe_time, fsz_id, station_name, farm_id, wind_direction_instant,wind_speed_instant, wind_speed_two_min, wind_speed_ten_min)insert_query = 'INSERT INTO wind_monitor (observe_time, fsz_id, station_name, farm_id, wind_direction_instant, wind_speed_instant, wind_speed_two_min, wind_speed_ten_min) VALUES (%s, %s, %s, %s, %s, %s, %s, %s)'mysql_cursor.execute(insert_query, insert_data)mysql_conn.commit()inserted_count += 1delete_query = "DELETE FROM intermediateData_Wind WHERE dtime = ? AND stationID = ?"sql_server_cursor.execute(delete_query, (dtime, row[1]))sql_server_conn.commit()# 打印删除和插入的数据统计print(f"从intermediateData_Wind表中删除了{deleted_count}条数据.")print(f"向wind_monitor表中插入了{inserted_count}条数据.")
4.总体代码
import threading
import time
import pyodbc
import pymysql
class DataTransfer:def __init__(self, sql_server_conn_params, mysql_conn_params, queryWind,queryIntermediateData, interval=1):self.sql_server_conn_params = sql_server_conn_paramsself.mysql_conn_params = mysql_conn_paramsself.queryWind = queryWindself.queryIntermediateData = queryIntermediateDataself.interval = intervalself.wind_last_dtime = '1970-01-01 00:00:00' # 初始时间def clear_mysql_tables(self):"""清空 MySQL 中的指定表格数据"""try:with pymysql.connect(**self.mysql_conn_params) as mysql_conn:with mysql_conn.cursor() as cursor:# 清空 wind_monitor 表cursor.execute("TRUNCATE TABLE wind_monitor")mysql_conn.commit()print("已清空 wind_monitor 表的数据。")except Exception as e:print(f"清空表格时发生错误: {e}")def transfer_data(self):self.transfer_wind_data()while True:try:self.transfer_insert_intermediateData_Wind()except Exception as e:print(f"发生错误: {e}")# 等待一定时间再次传输数据time.sleep(self.interval)def transfer_insert_intermediateData_Wind(self):# 连接到 SQL Serverwith pyodbc.connect(**self.sql_server_conn_params) as sql_server_conn:with sql_server_conn.cursor() as sql_server_cursor:# 查询中间表中所有数据sql_server_cursor.execute(self.queryIntermediateData)intermediate_rows = sql_server_cursor.fetchall()# 用于跟踪删除和插入的数量deleted_count = 0inserted_count = 0# 连接到 MySQLwith pymysql.connect(**self.mysql_conn_params) as mysql_conn:with mysql_conn.cursor() as mysql_cursor:for row in intermediate_rows:observe_time = row[0].strftime("%Y-%m-%d %H:%M:00")fsz_id = row[1].replace('"', "").strip()# 检查wind_monitor表中是否存在相同数据check_query = "SELECT COUNT(*) FROM wind_monitor WHERE observe_time = %s AND fsz_id = %s"mysql_cursor.execute(check_query, (observe_time, fsz_id))count = mysql_cursor.fetchone()[0]dtime = row[0].strftime("%Y-%m-%d %H:%M:00.000")if count > 0:# 数据存在,从中间表删除delete_query = "DELETE FROM intermediateData_Wind WHERE dtime = ? AND stationID = ?"sql_server_cursor.execute(delete_query, (dtime, row[1]))sql_server_conn.commit()deleted_count += 1else:# 数据不存在,插入到wind_monitor并从中间表删除station_name = row[2]farmName = station_name.split("-")[0]# 根据电站名查询电站号farm_query = "SELECT FARMID FROM com_farm WHERE FARMNAME LIKE %s"mysql_cursor.execute(farm_query, ('%' + farmName + '%',))farm_result = mysql_cursor.fetchone()farm_id = farm_result[0] if farm_result else Nonewind_direction_instant = row[3]wind_speed_instant = row[4]wind_speed_two_min = row[5]wind_speed_ten_min = row[6]insert_data = (observe_time, fsz_id, station_name, farm_id, wind_direction_instant,wind_speed_instant, wind_speed_two_min, wind_speed_ten_min)insert_query = 'INSERT INTO wind_monitor (observe_time, fsz_id, station_name, farm_id, wind_direction_instant, wind_speed_instant, wind_speed_two_min, wind_speed_ten_min) VALUES (%s, %s, %s, %s, %s, %s, %s, %s)'mysql_cursor.execute(insert_query, insert_data)mysql_conn.commit()inserted_count += 1delete_query = "DELETE FROM intermediateData_Wind WHERE dtime = ? AND stationID = ?"sql_server_cursor.execute(delete_query, (dtime, row[1]))sql_server_conn.commit()# 打印删除和插入的数据统计print(f"从intermediateData_Wind表中删除了{deleted_count}条数据.")print(f"向wind_monitor表中插入了{inserted_count}条数据.")def transfer_wind_data(self):# 连接到 SQL Serverwith pyodbc.connect(**self.sql_server_conn_params) as sql_server_conn:with sql_server_conn.cursor() as sql_server_cursor:# 修改查询,仅选择上次同步后的数据modified_query = self.queryWind + " WHERE dtime >= ? ORDER BY dtime"sql_server_cursor.execute(modified_query, (self.wind_last_dtime,))rows = sql_server_cursor.fetchall()if not rows:return # 没有新数据# 连接到 MySQLwith pymysql.connect(**self.mysql_conn_params) as mysql_conn:with mysql_conn.cursor() as mysql_cursor:data_list = []for row in rows:observe_time = row[0].strftime("%Y-%m-%d %H:%M:00")fsz_id = row[1].replace('"', "").strip()station_name = row[2]farmName = station_name.split("-")[0]# 根据电站名查询电站号sql = "SELECT FARMID FROM com_farm WHERE FARMNAME LIKE %s"mysql_cursor.execute(sql, ('%' + farmName + '%',))result = mysql_cursor.fetchone()farm_id = Noneif result is not None:farm_id = result# 处理查询结果为空的情况if farm_id is not None:farm_id = farm_id[0]staion_name = row[2]wind_direction_instant = row[3]wind_speed_instant = row[4]wind_speed_two_min = row[5]wind_speed_ten_min = row[6]data = (observe_time,fsz_id,staion_name,farm_id,wind_direction_instant,wind_speed_instant,wind_speed_two_min,wind_speed_ten_min)data_list.append(data)self.wind_last_dtime = row[0].strftime("%Y-%m-%d %H:%M:%S")if data_list:result = mysql_cursor.executemany('INSERT INTO wind_monitor''(observe_time,fsz_id,station_name,farm_id,wind_direction_instant,wind_speed_instant,wind_speed_two_min,wind_speed_ten_min) ''VALUES (%s,%s,%s,%s,%s,%s,%s,%s)', data_list) # 根据你的表结构修改mysql_conn.commit()print(f'wind_monitor表插入了{result}行数据.' if result else '没有新数据插入。')def start(self):# 在启动线程前先清空表格self.clear_mysql_tables()thread = threading.Thread(target=self.transfer_data)thread.start()sql_server_conn_params = {'driver': '{SQL Server}','server': '','database': '','uid': '','pwd': ''
}mysql_conn_params = {'host': 'localhost','user': 'root','password': '123456','database': '','charset': 'utf8mb4'
}
queryIntermediateData = "SELECT dtime,stationID,staionName,windDirectionInstant,windSpeedInstant,windSpeed2min,windSpeed10min FROM intermediateData_Wind"
queryWind = 'SELECT dtime,stationID,staionName,windDirectionInstant,windSpeedInstant,windSpeed2min,windSpeed10min FROM realData_Wind'
data_transfer = DataTransfer(sql_server_conn_params, mysql_conn_params, queryWind,queryIntermediateData)
data_transfer.start()
相关文章:
不同数据库进行同步和增量数据(SQL server 与MySQL数据库为例)
场景 最近在做的一个项目需要将远程服务器的SQL server数据库中表的数据传输到本机的MySQL数据库中,并且远程的SQL server数据库表的数据会实时进行更新,并且差不多是一分钟内传输18条数据,例如现在是2023-12-4 15:09,在15:08这个…...

国内的几款强大的AI智能—AI语言模型
R5Ai智能助手是一款由百度研发的文心一言,它支持gpt4 / gpt-3.5 / claude,也支持AI绘画,每天提供十次免费使用机会,无需魔法。该智能助手具有以下优点:会画画,没有使用次数限制,可以在界面上找到…...
linux下恶意软件的七种反分析技术
7 类主流的 Linux 恶意软件反分析/检测躲避技术 反调试(Anti-Debug): 软件调试是恶意软件分析的常⽤⼿段之⼀,但恶意软件可以通过识别调试器特征,实现⾃⾝恶意⾏为的隐藏,或导致调试失败,从⽽规避分析与检测…...
Spring Security OAuth2 认证服务器自定义异常处理
目录 前言WebResponseExceptionTranslator自定义异常处理1、自定义我们响应实体类2、定义响应结果枚举类3、自定义异常转换类4、配置自定义异常转换器5、测试 前言 Spring Security OAuth2 认证失败的格式如下 {"error": "unsupported_grant_type","…...
selenium环境安装
一、下载安装python 下载python安装python设置python环境变量安装selenium (1)下载python 您可以从Python官方网站(https://www.python.org/downloads/)下载Python。在页面上,您将看到不同版本的Python供您选择。根…...

(C++)和为s的两个数字--双指针算法
个人主页:Lei宝啊 愿所有美好如期而遇 和为S的两个数字_牛客题霸_牛客网输入一个升序数组 array 和一个数字S,在数组中查找两个数,使得他们的和正好是S,如果。题目来自【牛客题霸】https://www.nowcoder.com/practice/390da4f7a…...
鸿蒙(HarmonyOS)应用开发——构建页面(题目答案)
判断题 1.在Column容器中的子组件默认是按照从上到下的垂直方向布局的,其主轴的方向是垂直方向,在Row容器中的组件默认是按照从左到右的水平方向布局的,其主轴的方向是水平方向。 正确(True) 2.List容器可以沿水平方向排列,也可…...

Python基础快速过一遍
文章目录 一、变量及基本概念1、变量2、变量类型3、变量格式化输出4、type()函数5、input()函数6、类型转换函数7、注释 二、Python运算/字符1、算数运算2、比较运算3、逻辑运算4、赋值运算符5、转义字符6、成员运算符 三、判断/循环语句1、if判断语句2、while循环语句3、for循…...

等保测评报价相差很大,里面有什么门道
等保测评报价的差异主要源于以下几点: 服务质量评估标准不同:不同的测评机构在测评过程中所提供的服务范围、深度、细节等方面可能存在差异,因此导致报价有所不同。一些机构可能提供全面且细致的测评服务,致力于提供高质量的等保测…...

MATLAB的rvctools工具箱熟悉运动学【机械臂机器人示例】
1、rvctools下载安装 rvctools下载地址:rvctools下载 截图如下,点击红色箭头指示的“Download Shared Folder” 即可下载 下载之后进行解压,解压到D:\MATLAB\toolbox这个工具箱目录,这个安装路径根据自己的情况来选择,…...

如何精准操作无人机自动停机坪?
无人机自动停机坪通过自主导航和避障功能,实现了无人机的自主降落和起飞,在无人机技术领域起到了至关重要的作用。停机坪不仅仅是无人机的起降平台,还具备自动换电或充电等功能,为无人机的自动化提供了关键支持。为更有效地操作无…...

【蓝桥杯】带分数
带分数 题目要求用一个ab/c的形式得到一个值,而且只能在1~9里面不重复的组合。 可以对1~9进行全排列,然后不断划分区间。 #include<iostream> #include<vector> using namespace std; int st[15]; int num[15]; int res; int n;int calc(i…...
软件工程 课堂测验 选择填空
系统流程图用图形符号表示系统中各个元素,表达了系统中各个元素之间的 信息流动 喷泉模型是一种以用户需求为动力,以 对象 为驱动的模型。 软件生存周期中最长的是 维护 阶段。 变换流的DFD由三部分组成,不属于其中一部分的是 事务中心 软…...

计算机网络的分类
目录 一、按照传输介质进行分类 1、有线网络 2、无线网络 二、按照使用者进行分类 1、公用网 (public network) 2、专用网(private network) 三、按照网络规模和作用范围进行分类 1、PAN 个人局域网 2、LAN 局域网 3、MAN 城域网 4、 WAN 广域网 5、Internet 因特…...

百度收录批量查询工具,免费SEO优化排名工具
拥有一个在搜索引擎中得到良好收录的网站对于个人和企业都至关重要。而百度,作为中国最大的搜索引擎,其收录情况直接影响着网站的曝光度和流量。 百度搜索引擎是中文用户获取信息的重要途径之一。而在这个竞争激烈的网络环境中,了解自己网站…...

select选择框里填充图片,下拉选项带图片
遇到一个需求,选择下拉框选取图标,填充到框里 1、效果展示 2、代码 <el-form-item label"工种图标" class"Form_icon Form_label"><el-select ref"select" :value"formLabelAlign.icon" placeholder&…...

轨道交通数字孪生可视化平台,助力城市交通运营智慧化
随着经济和科技的快速发展,轨道交通运营管理在日常操作者面临各种挑战。数字孪生技术被认为是未来轨道交通运营管理的重要手段之一。它可以提高轨道交通的运营效率和安全性,助力城市交通运营智慧化。以城市轨道交通运维管理业务需求为导向,从数据感知、融…...

【每日OJ —— 101. 对称二叉树】
每日OJ —— 101. 对称二叉树 1.题目:101. 对称二叉树2.解法2.1.算法讲解2.2.代码实现2.3.提交通过展示 1.题目:101. 对称二叉树 2.解法 2.1.算法讲解 1.该题是判断二叉树是否对称,关键在于,左子树等于右子树,而所给的…...

善网商城上线洁柔产品 公益人专享爱心价官方正品
近日,中国善网慈善商城(以下简称善网商城)系统经升级后重新上线。目前善网商城线上销售的中顺洁柔旗下慈善产品已顺利获得中顺洁柔纸业股份有限公司授权,双方就合作事宜达成共识,并于近日签订线上经营授权书。 &#x…...

禁止谷歌浏览器自动更新
禁止谷歌浏览器自动更新 在使用Python包selenium的时候浏览器版版本发生变化后产生很多问题如: 1、直接版本不对应无法运行 2、版本不一致导致debug启动浏览器超级慢 这里是已谷歌浏览器为代表的。 禁止自动更新的方法如下: 1、WinR调出运行&#x…...

多模态2025:技术路线“神仙打架”,视频生成冲上云霄
文|魏琳华 编|王一粟 一场大会,聚集了中国多模态大模型的“半壁江山”。 智源大会2025为期两天的论坛中,汇集了学界、创业公司和大厂等三方的热门选手,关于多模态的集中讨论达到了前所未有的热度。其中,…...

(转)什么是DockerCompose?它有什么作用?
一、什么是DockerCompose? DockerCompose可以基于Compose文件帮我们快速的部署分布式应用,而无需手动一个个创建和运行容器。 Compose文件是一个文本文件,通过指令定义集群中的每个容器如何运行。 DockerCompose就是把DockerFile转换成指令去运行。 …...

深度学习习题2
1.如果增加神经网络的宽度,精确度会增加到一个特定阈值后,便开始降低。造成这一现象的可能原因是什么? A、即使增加卷积核的数量,只有少部分的核会被用作预测 B、当卷积核数量增加时,神经网络的预测能力会降低 C、当卷…...
Linux离线(zip方式)安装docker
目录 基础信息操作系统信息docker信息 安装实例安装步骤示例 遇到的问题问题1:修改默认工作路径启动失败问题2 找不到对应组 基础信息 操作系统信息 OS版本:CentOS 7 64位 内核版本:3.10.0 相关命令: uname -rcat /etc/os-rele…...

力扣热题100 k个一组反转链表题解
题目: 代码: func reverseKGroup(head *ListNode, k int) *ListNode {cur : headfor i : 0; i < k; i {if cur nil {return head}cur cur.Next}newHead : reverse(head, cur)head.Next reverseKGroup(cur, k)return newHead }func reverse(start, end *ListNode) *ListN…...
省略号和可变参数模板
本文主要介绍如何展开可变参数的参数包 1.C语言的va_list展开可变参数 #include <iostream> #include <cstdarg>void printNumbers(int count, ...) {// 声明va_list类型的变量va_list args;// 使用va_start将可变参数写入变量argsva_start(args, count);for (in…...

在 Spring Boot 中使用 JSP
jsp? 好多年没用了。重新整一下 还费了点时间,记录一下。 项目结构: pom: <?xml version"1.0" encoding"UTF-8"?> <project xmlns"http://maven.apache.org/POM/4.0.0" xmlns:xsi"http://ww…...

Vue ③-生命周期 || 脚手架
生命周期 思考:什么时候可以发送初始化渲染请求?(越早越好) 什么时候可以开始操作dom?(至少dom得渲染出来) Vue生命周期: 一个Vue实例从 创建 到 销毁 的整个过程。 生命周期四个…...

数据结构:递归的种类(Types of Recursion)
目录 尾递归(Tail Recursion) 什么是 Loop(循环)? 复杂度分析 头递归(Head Recursion) 树形递归(Tree Recursion) 线性递归(Linear Recursion)…...

云安全与网络安全:核心区别与协同作用解析
在数字化转型的浪潮中,云安全与网络安全作为信息安全的两大支柱,常被混淆但本质不同。本文将从概念、责任分工、技术手段、威胁类型等维度深入解析两者的差异,并探讨它们的协同作用。 一、核心区别 定义与范围 网络安全:聚焦于保…...