利用go-migrate实现MySQL和ClickHouse的数据库迁移
1. 背景
在使用gorm时 , 尽管已经有了自动建表和钩子函数 . 但是在面临希望了解到数据库的变更 , 和插入一些系统字段时 , 以及最关键的数据库迁移的工作 . gorm显得稍微有点不便 .
在了解到migrate这项技术后 , 就使用go-migrate开发了一个可以迁移MySQL和ClickHouse数据库的工具.
2. 实现
2.1 简单介绍
go-migrate在启动后 , 会在数据库中自动生成一张 "schema_migrations"表 , 这张表在mysql和clickhouse中的结构有一定区别.但是主要的字段是相同的.
clickhouse
"version": 表示版本号
"dirty": 表示执行成功或失败 0:成功 1:失败
2.2 具体实现
先新建一个目录 , 结构可以自己去梳理:
mysql文件夹中存放的是 mysql数据库相关变迁的sql语句
clickhouse文件夹存放的是clickhouse数据库相关变迁的sql语句
migrate.txt只是为了开发人员更好了解到当前执行到什么版本了
所有的sql文件前需要一个版本号,保证是唯一的.
.up: 表示的是需要执行的sql
如果希望自动回滚 , 可以在每一个版本的sql文件后 , 在新建一个sql文件. 且 .up 替换为 .down. 即可自动回滚.
# migrate.txt
1_waf_top_mysql_create_app_waf_table.up.sql
2_waf_top_mysql_create_server_waf_table.up.sql
3_waf_top_mysql_create_waf_allow_list_table.up.sql
4_waf_top_mysql_create_waf_buildin_rule_table.up.sql
5_waf_top_mysql_create_waf_rule_group_table.up.sql
6_waf_top_mysql_create_waf_server_allow_table.up.sql
7_waf_top_mysql_create_waf_servers_strategies_table.up.sql
8_waf_top_mysql_create_waf_strategy_table.up.sql
9_waf_top_mysql_create_waf_strategy_config_table.up.sql
10_waf_top_mysql_create_waf_user_rule_table.up.sql
11_waf_user_mysql_create_waf_user_info_table.up.sql
12_dash_borad_ck_create_sec_log_table.up.sql
13_waf_top_mysql_insert_buildin_rule_data.up.sql
14_waf_top_mysql_insert_waf_rule_group_data.up.sql
15_waf_top_mysql_alter_server_waf_desc.up.sql
前缀数字表示的就是版本号
package migrateimport ("context""database/sql""errors""fmt""path/filepath""time"_ "github.com/ClickHouse/clickhouse-go/v2""github.com/go-redis/redis/v8""github.com/golang-migrate/migrate/v4""github.com/golang-migrate/migrate/v4/database"chMigrate "github.com/golang-migrate/migrate/v4/database/clickhouse"mysqlMigrate "github.com/golang-migrate/migrate/v4/database/mysql"_ "github.com/golang-migrate/migrate/v4/source/file""github.com/google/uuid""github.com/sirupsen/logrus""wafconsole/utils/redislock"_ "github.com/go-sql-driver/mysql"
)// Config 迁移配置
type Config struct {AppName stringMySqlDSN stringClickHouseDSN stringRedisAddr stringRedisPassword stringRedisDB intMigrationDir string // 指向 migrations 父目录(包含 mysql 和 clickhouse 子目录)LockTimeout time.DurationTargetVersion uint
}// DatabaseMigrator 数据库迁移器
type DatabaseMigrator struct {mysqlDB *sql.DBclickhouseDB *sql.DBredisClient *redis.Clientconfig *ConfiglockID string
}// NewDatabaseMigrator 创建新实例
func NewDatabaseMigrator(cfg *Config) (*DatabaseMigrator, error) {// 初始化MySQL连接mysqlDB, err := sql.Open("mysql", cfg.MySqlDSN)if err != nil {return nil, fmt.Errorf("failed to connect to MySQL: %w", err)}// 验证MySQL连接if err = mysqlDB.Ping(); err != nil {return nil, fmt.Errorf("MySQL ping failed: %w", err)}// 初始化ClickHouse连接clickhouseDB, err := sql.Open("clickhouse", cfg.ClickHouseDSN)if err != nil {return nil, fmt.Errorf("failed to connect to ClickHouse: %w", err)}// 验证ClickHouse连接if err = clickhouseDB.Ping(); err != nil {return nil, fmt.Errorf("ClickHouse ping failed: %w", err)}// 初始化Redis客户端rdb := redis.NewClient(&redis.Options{Addr: cfg.RedisAddr,Password: cfg.RedisPassword,DB: cfg.RedisDB,})// 验证Redis连接if err = rdb.Ping(context.Background()).Err(); err != nil {return nil, fmt.Errorf("redis connection failed: %w", err)}return &DatabaseMigrator{mysqlDB: mysqlDB,clickhouseDB: clickhouseDB,redisClient: rdb,config: cfg,lockID: uuid.New().String(),}, nil
}// Run 执行全量迁移
func (m *DatabaseMigrator) Run(ctx context.Context) error {lockKey := "database_migration_lock"rdLock := redislock.NewRedisLock(m.redisClient, m.config.LockTimeout)if err := rdLock.AcquireLock(ctx, lockKey); err != nil {return fmt.Errorf("failed to acquire lock: %w", err)}defer func() {if err := rdLock.ReleaseLock(ctx, lockKey); err != nil {logrus.Errorf("Failed to release lock: %v", err)}}()if err := m.migrateMySQL(ctx); err != nil {return fmt.Errorf("MySQL migration failed: %w", err)}if err := m.migrateClickHouse(ctx); err != nil {return fmt.Errorf("ClickHouse migration failed: %w", err)}return nil
}// MySQL 迁移
func (m *DatabaseMigrator) migrateMySQL(ctx context.Context) error {driver, err := mysqlMigrate.WithInstance(m.mysqlDB, &mysqlMigrate.Config{})if err != nil {return fmt.Errorf("failed to create MySQL driver: %w", err)}return m.runMigration(ctx, driver, "mysql")
}// ClickHouse 迁移
func (m *DatabaseMigrator) migrateClickHouse(ctx context.Context) error {driver, err := chMigrate.WithInstance(m.clickhouseDB, &chMigrate.Config{})if err != nil {return fmt.Errorf("failed to create ClickHouse driver: %w", err)}return m.runMigration(ctx, driver, "clickhouse")
}// 通用迁移逻辑
func (m *DatabaseMigrator) runMigration(ctx context.Context,driver database.Driver,dbType string,
) error {// 获取原始路径migratePath := filepath.Join(m.config.MigrationDir, dbType)// 强制转换为 URL 兼容的斜杠格式migratePath = filepath.ToSlash(migratePath)// 构建 URLsourceURL := fmt.Sprintf("file://%s", migratePath)// 初始化迁移实例migrator, err := migrate.NewWithDatabaseInstance(sourceURL, dbType, driver)if err != nil {return fmt.Errorf("failed to initialize migrator: %w", err)}defer migrator.Close()// 执行迁移var migrationErr errorif m.config.TargetVersion > 0 {migrationErr = migrator.Migrate(m.config.TargetVersion)} else {migrationErr = migrator.Up()}// 处理迁移结果if migrationErr != nil && !errors.Is(migrationErr, migrate.ErrNoChange) {return fmt.Errorf("migration failed: %w", migrationErr)}logrus.Infof("%s migration completed successfully", dbType)return nil
}// Close 关闭资源(保持不变)
func (m *DatabaseMigrator) Close() error {var errs []errorif err := m.mysqlDB.Close(); err != nil {errs = append(errs, fmt.Errorf("MySQL close error: %w", err))}if err := m.clickhouseDB.Close(); err != nil {errs = append(errs, fmt.Errorf("ClickHouse close error: %w", err))}if err := m.redisClient.Close(); err != nil {errs = append(errs, fmt.Errorf("Redis close error: %w", err))}if len(errs) > 0 {return fmt.Errorf("errors occurred during shutdown: %v", errs)}return nil
}
2.3 优化方式
2.3.1 脏版本处理
1. 在执行过程中 , 可能会出现一些因sql语句错误而执行失败 . migrate实现了清洗脏版本的功能.加在通用迁移逻辑 初始化迁移实列后即可.
// 检查是否为脏版本version, dirty, err := migrator.Version()if err != nil && !errors.Is(err, migrate.ErrNilVersion) {return fmt.Errorf("failed to check version: %w", err)}if dirty {// 强制清除脏状态if err = migrator.Force(int(version)); err != nil {return fmt.Errorf("failed to force clean version: %w", err)}}
2. 或者手动修改 表中的版本号 , 修改到上一个版本(即sql文件最开始的数字) , 且状态改为0. 因为当migrate检测到为1执行失败后 , 就不在继续执行了.
2.3.2 分布式锁防止并发情况下 , 同时执行多个迁移操作
lockKey := "database_migration_lock"rdLock := redislock.NewRedisLock(m.redisClient, m.config.LockTimeout)if err := rdLock.AcquireLock(ctx, lockKey); err != nil {return fmt.Errorf("failed to acquire lock: %w", err)}defer func() {if err := rdLock.ReleaseLock(ctx, lockKey); err != nil {logrus.Errorf("Failed to release lock: %v", err)}}()
上面具体实现中已经包含了这段代码 , 是我自己封装的一个redis分布式锁的实现. 这段代码 , 如果不需要可以删除 , 如有需要 , 可以自己实现一个简单的redis分布式锁即可.
相关文章:

利用go-migrate实现MySQL和ClickHouse的数据库迁移
1. 背景 在使用gorm时 , 尽管已经有了自动建表和钩子函数 . 但是在面临希望了解到数据库的变更 , 和插入一些系统字段时 , 以及最关键的数据库迁移的工作 . gorm显得稍微有点不便 . 在了解到migrate这项技术后 , 就使用go-migrate开发了一个可以迁移MySQL和ClickHouse数据库的…...

计算机毕业设计SpringBoot+Vue.js企业客户管理系统(源码+LW文档+PPT+讲解+开题报告)
温馨提示:文末有 CSDN 平台官方提供的学长联系方式的名片! 温馨提示:文末有 CSDN 平台官方提供的学长联系方式的名片! 温馨提示:文末有 CSDN 平台官方提供的学长联系方式的名片! 作者简介:Java领…...
jmeter 如何做移动端的测试 特别是兼容性测试
JMeter本身主要是一款用于性能测试和功能测试的工具,虽然它并非专门为移动端测试设计,但可以通过一些方式来对移动端应用进行测试,以下从测试准备、测试过程及注意事项等方面为你详细介绍: 一、测试准备 (一)环境搭建 JMeter安装与配置:确保JMeter已经正确安装在测试机…...

深度学习技术全景图:从基础架构到工业落地的超级进化指南
🔍 目录导航 基础架构革命训练优化秘技未来战场前瞻 🧩 一、基础架构革命 1.1 前馈神经网络(FNN) ▍核心结构 import torch.nn as nnclass FNN(nn.Module):def __init__(self):super().__init__()self.fc1 nn.Linear(784, 25…...

vllm部署LLM(qwen2.5,llama,deepseek)
目录 环境 qwen2.5-1.5b-instruct 模型下载 vllm 安装 验证安装 vllm 启动 查看当前模型列表 OpenAI Completions API(文本生成) OpenAI Chat Completions API(chat 对话) vllm 进程查看,kill llama3 deep…...

基于SpringBoot的“古城景区管理系统”的设计与实现(源码+数据库+文档+PPT)
基于SpringBoot的“古城景区管理系统”的设计与实现(源码数据库文档PPT) 开发语言:Java 数据库:MySQL 技术:SpringBoot 工具:IDEA/Ecilpse、Navicat、Maven 系统展示 系统整体功能图 系统首页界面 系统注册界面 景…...
如何防止 Docker 注入了恶意脚本
根据您的描述,攻击者通过 CentOS 7 系统中的 Docker 注入了恶意脚本,导致自动启动名为 “masscan” 和 “x86botnigletjsw” 的进程。这些进程可能用于网络扫描或其他恶意活动。为了解决这一问题,建议您采取以下步骤: 1. 停止并删…...

使用python接入腾讯云DeepSeek
本文主要从提供SSE方式接入DeepSeek,并通过fastapi websocket对外提供接入方法。 参考文档: 腾讯云大模型:https://cloud.tencent.com/document/product/1759/109380 fastAPI官网:https://fastapi.tiangolo.com/ WebSocketManager…...

【MySQL】服务正在启动或停止中,请稍候片刻后再试一次【解决方案】
问题呈现 在使用MySQL的过程中我们可能会遇到以上的情况 解决方法 首先以管理员身份打开命令行窗口,注意是管理员身份,不然无权限访问。输入命令tasklist| findstr "mysql",用于查找mysql的残留进程。这个时候我们就会看到一个…...
测试工程师玩转DeepSeek之Prompt
以下是测试工程师使用DeepSeek的必知必会提示词指南,分为核心场景和高效技巧两大维度: 一、基础操作提示模板 1. 测试用例生成 "作为[金融系统/物联网设备/云服务]测试专家,请为[具体功能模块]设计测试用例,要求࿱…...

【PyTorch】2024保姆级安装教程-Python-(CPU+GPU详细完整版)-
一、准备工作 pytorch需要python3.6及以上的python版本 我是利用Anaconda来管理我的python。可自行安装Anaconda。 Anaconda官网 Free Download | Anaconda 具体Anaconda安装教程可参考 https://blog.csdn.net/weixin_43412762/article/details/129599741?fromshareblogdet…...

精选案例展 | 智己汽车—全栈可观测驱动智能化运营与成本优化
本案例为“观测先锋 2024 可观测平台创新应用案例大赛”精选案例,同时荣获IT168“2024技术卓越奖评选-年度创新解决方案”奖。 项目背景 近年来,中国汽车行业进入转型升级阶段,智能网联技术成为行业发展的核心。车联网、自动驾驶等技术的加速…...
MySQL 使用 `WHERE` 子句时 `COUNT(*)`、`COUNT(1)` 和 `COUNT(column)` 的区别解析
文章目录 1. COUNT() 函数的基本作用2. COUNT(*)、COUNT(1) 和 COUNT(column) 的详细对比2.1 COUNT(*) —— 统计所有符合条件的行2.2 COUNT(1) —— 统计所有符合条件的行2.3 COUNT(column) —— 统计某一列非 NULL 的记录数 3. 性能对比3.1 EXPLAIN 分析 4. 哪种方式更好&…...
Linux运维——网络管理
Linux网络管理 一、Linux网络应用要点二、命令常见用法2.1、curl2.1.1、发送GET请求2.1.2、发送POST请求2.1.3、设置请求头2.1.4、处理cookies2.1.5、处理重定向2.1.6、调试和详细信息2.1.7、使用代理2.1.8、文件上传2.1.9、其它常用选项2.1.10、综合示例 2.2、wget2.2.1、基本…...

STM32CUBEIDE FreeRTOS操作教程(十三):task api 任务访问函数
STM32CUBEIDE FreeRTOS操作教程(十三):task api 任务访问函数 STM32CUBE开发环境集成了STM32 HAL库进行FreeRTOS配置和开发的组件,不需要用户自己进行FreeRTOS的移植。这里介绍最简化的用户操作类应用教程。以STM32F401RCT6开发板…...

Jmeter+Jenkins接口压力测试持续集成
项目介绍 接口功能测试应用: http://www.weather.com.cn/data/cityinfo/<city_code>.html 测试功能:获取对应城市的天气预报 请求方法:Get 压测脚本开发工具:jmeter 源码脚本位置: https://github.com/shife…...

深入浅出ES6:现代JavaScript的基石
ES6(ECMAScript 2015)是JavaScript语言的一次重大更新,引入了许多新特性,使JavaScript更加强大、优雅和易于维护。这些特性已经成为现代JavaScript开发的基石,掌握它们对于任何JavaScript开发者都至关重要。本文将深入…...
实现使用RBF(径向基函数)神经网络模拟二阶电机数学模型中的非线性干扰,以及使用WNN(小波神经网络)预测模型中的非线性函数来抵消迟滞影响的功能
下面将详细介绍如何实现使用RBF(径向基函数)神经网络模拟二阶电机数学模型中的非线性干扰,以及使用WNN(小波神经网络)预测模型中的非线性函数来抵消迟滞影响的功能。我们将按照以下步骤进行: 步骤1&#x…...

潜水泵,高效排水,守护城市与农田|深圳鼎跃
洪水是常见的自然灾害,在春夏季节的我国降水多为丰富,容易造成城市内部的洪涝灾害。特别是低洼地区的积水,不仅容易造成城市交通的出行不便,还存在潜在的隐患,严重影响了人们正常生活。 潜水泵作为一种高效、可靠的排水…...

易基因:RNA甲基化修饰和R-loop的交叉调控:从分子机制到临床意义|深度综述
大家好,这里是专注表观组学十余年,领跑多组学科研服务的易基因。 R-loop(RNA-DNA杂合结构)是转录调控、DNA复制和修复等关键细胞过程的重要组成部分。但R-loop异常积累可能会破坏基因组完整性,从而导致多种疾病的发生…...

LBE-LEX系列工业语音播放器|预警播报器|喇叭蜂鸣器的上位机配置操作说明
LBE-LEX系列工业语音播放器|预警播报器|喇叭蜂鸣器专为工业环境精心打造,完美适配AGV和无人叉车。同时,集成以太网与语音合成技术,为各类高级系统(如MES、调度系统、库位管理、立库等)提供高效便捷的语音交互体验。 L…...

MODBUS TCP转CANopen 技术赋能高效协同作业
在现代工业自动化领域,MODBUS TCP和CANopen两种通讯协议因其稳定性和高效性被广泛应用于各种设备和系统中。而随着科技的不断进步,这两种通讯协议也正在被逐步融合,形成了一种新型的通讯方式——开疆智能MODBUS TCP转CANopen网关KJ-TCPC-CANP…...
大数据学习(132)-HIve数据分析
🍋🍋大数据学习🍋🍋 🔥系列专栏: 👑哲学语录: 用力所能及,改变世界。 💖如果觉得博主的文章还不错的话,请点赞👍收藏⭐️留言Ǵ…...

初学 pytest 记录
安装 pip install pytest用例可以是函数也可以是类中的方法 def test_func():print()class TestAdd: # def __init__(self): 在 pytest 中不可以使用__init__方法 # self.cc 12345 pytest.mark.api def test_str(self):res add(1, 2)assert res 12def test_int(self):r…...

基于Springboot+Vue的办公管理系统
角色: 管理员、员工 技术: 后端: SpringBoot, Vue2, MySQL, Mybatis-Plus 前端: Vue2, Element-UI, Axios, Echarts, Vue-Router 核心功能: 该办公管理系统是一个综合性的企业内部管理平台,旨在提升企业运营效率和员工管理水…...

springboot 日志类切面,接口成功记录日志,失败不记录
springboot 日志类切面,接口成功记录日志,失败不记录 自定义一个注解方法 import java.lang.annotation.ElementType; import java.lang.annotation.Retention; import java.lang.annotation.RetentionPolicy; import java.lang.annotation.Target;/***…...

Python训练营-Day26-函数专题1:函数定义与参数
题目1:计算圆的面积 任务: 编写一个名为 calculate_circle_area 的函数,该函数接收圆的半径 radius 作为参数,并返回圆的面积。圆的面积 π * radius (可以使用 math.pi 作为 π 的值)要求:函数接收一个位置参数 radi…...

GAN模式奔溃的探讨论文综述(一)
简介 简介:今天带来一篇关于GAN的,对于模式奔溃的一个探讨的一个问题,帮助大家更好的解决训练中遇到的一个难题。 论文题目:An in-depth review and analysis of mode collapse in GAN 期刊:Machine Learning 链接:...

如何把工业通信协议转换成http websocket
1.现状 工业通信协议多数工作在边缘设备上,比如:PLC、IOT盒子等。上层业务系统需要根据不同的工业协议做对应开发,当设备上用的是modbus从站时,采集设备数据需要开发modbus主站;当设备上用的是西门子PN协议时…...
HTML中各种标签的作用
一、HTML文件主要标签结构及说明 1. <!DOCTYPE html> 作用:声明文档类型,告知浏览器这是 HTML5 文档。 必须:是。 2. <html lang“zh”>. </html> 作用:包裹整个网页内容,lang"z…...