Debezium 同步 MySQL 实时数据并解决数据重复消费问题
我们使用 Debezium 实时同步一个 MySQL 的数据到另一个 MySQL,代码网上基本都有,都是在引入 debezium-api,debezium-embedded 后写 Java 代码,做好了基本配置后启动程序,Debezium 会自动读取 MySQL 的实时 binlog,然后触发相应的事件让我们处理,我们就把事件里的数据读取出来,插入到目标库即可。我们的 MySQL 的版本是 5.7 。
但我们在其中发现了一个很奇怪的问题,目标库存在多个相同的 sql ,我们以为是 Debezium 重复消费了 binlog 里的事件,就记录下每个事件的 position 并判重,但 sql 还是重复了,我们一开始觉得 MySQL 写的 binlog 肯定没问题,一个事务对应一个事件。之后我们使用 binlog2sql 这个 python 工具读取了已归档的 binlog 文件,发现里面没有重复的 sql ,这说明 MySQL binlog 还是没有问题的,问题在 Debezium,但 Debezium 作为一个成熟的 cdc 工具应该也不会有什么大的问题,可能是 Debezium 的配置问题,但检查了 Debezium 的所有配置后还是没发现有什么问题,配置改了后重新运行结果还是一样。 后面我们怀疑可能和 gtid 有关,我们发现 “Insert into xxx values (xxx) ” 会产生一个 binlog 事件,因为一个事务会产生一个 binlog 事件,但 “Insert into xxx values (xxx),(xxx),(xxx)...” 会产生多个事件,但这些事件的 gtid 还是同一个,事件里的 query 属性还是同一个,事件的 query 属性即原始 sql ,这就破案了,我们一直消费每个事件的query,但可能多个事件里的 query 属性是一样的,因为它们的 gtid 属性相同,它们属于同一个全局事务。后面我们使用 gtid 过滤相同属性就解决了数据重复问题。至于为什么一个批量插入会产生一个多个事件,并且多个事件的 gtid 是同一个,我们猜测 MySQL 的 binlog 就是这样写日志的,修改一行数据就产生一个事件,要是批量修改就产生多个事件,但这些批量事件同属于一个全局事务。
怎么过滤重复 gtid 问题?因为 gtid 是递增的,相同的 gtid 都会一起出现,所以可以使用自动老元素的 Map,或是设置键过期的 redis,或是 带有 gtid 属性的数据库表,并设置它是唯一索引,或是插入数据之前先检查数据库里是否有本事件的gtid,有就跳过,没有就插入,并把这个过程加锁保证原子性。
核心代码:
// 启动
DebeziumEngine<ChangeEvent<String, String>> engine = DebeziumEngine.create(Json.class).using(config) .notifying(DataSync::handleChangeEvent).build();ExecutorService executor = Executors.newSingleThreadExecutor();executor.execute(engine);
private static void handleChangeEvent(ChangeEvent<String, String> event) {JSONObject valueJson = JSON.parseObject(event.value());if (valueJson != null) {JSONObject payload = valueJson.getJSONObject("payload");JSONObject source = payload.getJSONObject("after");// 原始sqlString query = source.getString("data_definition");// 对 sql 字符串进行美化query = query.replaceAll("[\\n\\r\\t\\s]+", " ");String database = source.getString("database");String table = source.getString("table_name");String gtid = source.getString("gtid");synchronized (lock) {// 查询数据库该 gtid 的数量long cnt = queryGtid(gtid);if (cnt == 0) {// 如果数据库不存在该 sql 就插入save(query, database, table, gtid);} else {System.out.println(gtid + " 有重复");}} }}
相关文章:
Debezium 同步 MySQL 实时数据并解决数据重复消费问题
我们使用 Debezium 实时同步一个 MySQL 的数据到另一个 MySQL,代码网上基本都有,都是在引入 debezium-api,debezium-embedded 后写 Java 代码,做好了基本配置后启动程序,Debezium 会自动读取 MySQL 的实时 binlog&…...
【图像处理】1、使用OpenCV库图像轮廓的检测和绘制
OpenCV (Open Source Computer Vision Library) 是一个用于计算机视觉和图像处理的开源库。它提供了数百种用于图像和视频分析的算法,并被广泛应用于研究和商业领域。OpenCV 支持多种编程语言,包括 C、Python、Java 等,具有跨平台的特性&…...
【AI编译器】triton学习:矩阵乘优化
Matrix Multiplication 主要内容: 块级矩阵乘法 多维指针算术 重新编排程序以提升L2缓存命 自动性能调整 Motivations 矩阵乘法是当今高性能计算系统的一个关键组件,在大多数情况下被用于构建硬件。由于该操作特别复杂,因此通常由软件提…...
动静分离网络
动静分离网络的主要目的是分别处理视频帧中的静止区域和运动区域,以便对不同区域采用不同的去噪策略。这里提供一个实现思路,通过两个分支网络分别处理静止区域和运动区域,然后将两者的输出融合起来。 实现步骤 帧差图生成:计算…...
Python商务数据分析知识专栏(三)——Python数据分析的应用①Matplotlib数据可视化基础
Python商务数据分析知识专栏(三)——Python数据分析的应用①Matplotlib数据可视化基础 Matplotlib数据可视化基础1.掌握绘图基本语法与常用绘图2.分析特征间关系3.分析特征内部数据分布与分散情况 Matplotlib数据可视化基础 1.掌握绘图基本语法与常用绘…...
DataV大屏组件库
DataV官方文档 DataV组件库基于Vue (React版 (opens new window)) ,主要用于构建大屏(全屏)数据展示页面即数据可视化,具有多种类型组件可供使用: 源码下载...
paraview跨节点并行渲染
参考: https://cloud.tencent.com/developer/ask/sof/101483588 ParaView 支持使用其内置的网络拓扑来进行跨节点的并行渲染。以下是一个简单的步骤来设置和运行跨节点的并行渲染: 确保你的计算环境支持多节点计算,比如通过SSH、MPI或其他集…...
Java中相等比较详解
本文对Java中的相等判断进行详细解释,包括,equals和compareTo等。 一、 运算符 1. 用途 基本数据类型:用于比较两个基本数据类型的值是否相等。 引用类型:用于比较两个对象引用是否指向同一个对象。 2. 示例 // 基本数据类型比…...
HBuilder X 小白日记01
1.创建项目 2.右击项目,可创建html文件 3.保存CtrlS,运行一下 我们写的内容,一般是写在body里面 注释的快捷键:Ctrl/ h标签 <h1> 定义重要等级最高的(最大)的标题。<h6> 定义最小的标题。 H标签起侧重、强调的作用…...
使用Protocol Buffers优化数据传输
使用Protocol Buffers优化数据传输 大家好,我是免费搭建查券返利机器人省钱赚佣金就用微赚淘客系统3.0的小编,也是冬天不穿秋裤,天冷也要风度的程序猿! 什么是Protocol Buffers? Protocol Buffers(简称P…...
如何把mkv转成mp4?介绍一下将mkv转成MP4的几种方法
如何把mkv转成mp4?如果你有一个MKV格式的视频文件,但是需要将其转换为MP4格式以便更广泛地在各种设备和平台上播放和共享,你可以通过进行简单的文件格式转换来实现。转换MKV到MP4格式可以提供更好的兼容性,并确保你的视频文件能够…...
PHP语言学习02
好久不见,学如逆水行舟,不进则退,真是这样。。。突然感觉自己有点废。。。 <?php phpinfo(); ?> 新生第一个代码。 要想看到运行结果,打开浏览器(127.0.0.1/start/demo01.php) 其中,…...
PX2资料及问题记录
PX2的一些资料 官方论坛:https://devtalk.nvidia.com/default/board/182/drive-px2/ 官方网站:https://www.nvidia.com/en-us/self-driving-cars/ap2x/ 开发网站:https://developer.nvidia.com/drive/downloads docker docker run --devic…...
Jenkins容器的部署
本文主要是记录如何在Centos7上安装docker,以及在docker里面配置tomcat、mysql、jenkins等环境。 一、安装docker 1.1 准备工作 centos7、VMware17Pro 1.2 通过yum在线安装dokcer yum -y install docker1.3 启动docker服务 systemctl start docker.service1.4 查看docke…...
QT 自绘树形控件
资源来自:https://gitee.com/qt-open-source-collection/NavListView/blob/master/navlistview.h 1、解决的问题:一处编译报错;空白区域绘制背景;点击页面崩溃 2、源码: #ifndef NAVLISTVIEW_H #define NAVLISTVIEW_H/*** 作者:feiyangqingyun(QQ:517216493) 2016-10-1…...
axios之CancelToken取消请求
从 v0.22.0 开始,Axios 支持以 fetch API 方式—— AbortController 取消请求 此 API 从 v0.22.0 开始已被弃用,不应在新项目中使用 官网链接 1. 背景 最近项目中遇到一个场景,当连续触发一个请求时,如果是同一个接口…...
Unity | API鉴权用到的函数汇总
目录 一、HMAC-SHA1 二、UriEncode 三、Date 四、Content-MD5 五、参数操作 六、阿里云API鉴权 一、HMAC-SHA1 使用 RFC 2104 中定义的 HMAC-SHA1 方法生成带有密钥的哈希值: private static string CalculateSignature(string secret, string data){byte[] k…...
【python】socket通信代码解析
目录 一、socket通信原理 1.1 服务器端 1.2 客户端 二、socket通信主要应用场景 2.1 简单的服务器和客户端通信 2.2 并发服务器 2.3 UDP通信 2.4 文件传输 2.5 HTTP服务器 2.6 邮件发送与接收 2.7 FTP客户端 2.8 P2P文件共享 2.9 网络游戏 三、python中Socket编…...
FastGPT 手动部署错误:MongooseServerSelectionError: getaddrinfo EAI_AGAIN mongo
在运行 FastGPT 时,mongodb 报如下错误: MongooseServerSelectionError: getaddrinfo EAI_AGAIN mongo 这是因为 mongo 没有解析出来,在 hosts 文件中添加如下信息: 127.0.0.1 mongo 重新运行 FastGPT 即可。 参考链接ÿ…...
用英文介绍芝加哥(1):Making Modern Chicago Part 1 Building a Boomtown
Making Modern Chicago | Part 1: Building a Boomtown Link: https://www.youtube.com/watch?vpNdX0Dm-J8Y&listPLmSQiOQJmbZ7TU39cyx7gizM9i8nOuZXy&index4 Summary Summary of Chicago’s History and Development Urban Planning and Growth Chicago, often r…...
浅谈 React Hooks
React Hooks 是 React 16.8 引入的一组 API,用于在函数组件中使用 state 和其他 React 特性(例如生命周期方法、context 等)。Hooks 通过简洁的函数接口,解决了状态与 UI 的高度解耦,通过函数式编程范式实现更灵活 Rea…...
Chapter03-Authentication vulnerabilities
文章目录 1. 身份验证简介1.1 What is authentication1.2 difference between authentication and authorization1.3 身份验证机制失效的原因1.4 身份验证机制失效的影响 2. 基于登录功能的漏洞2.1 密码爆破2.2 用户名枚举2.3 有缺陷的暴力破解防护2.3.1 如果用户登录尝试失败次…...
服务器硬防的应用场景都有哪些?
服务器硬防是指一种通过硬件设备层面的安全措施来防御服务器系统受到网络攻击的方式,避免服务器受到各种恶意攻击和网络威胁,那么,服务器硬防通常都会应用在哪些场景当中呢? 硬防服务器中一般会配备入侵检测系统和预防系统&#x…...
第25节 Node.js 断言测试
Node.js的assert模块主要用于编写程序的单元测试时使用,通过断言可以提早发现和排查出错误。 稳定性: 5 - 锁定 这个模块可用于应用的单元测试,通过 require(assert) 可以使用这个模块。 assert.fail(actual, expected, message, operator) 使用参数…...
什么是EULA和DPA
文章目录 EULA(End User License Agreement)DPA(Data Protection Agreement)一、定义与背景二、核心内容三、法律效力与责任四、实际应用与意义 EULA(End User License Agreement) 定义: EULA即…...
深入解析C++中的extern关键字:跨文件共享变量与函数的终极指南
🚀 C extern 关键字深度解析:跨文件编程的终极指南 📅 更新时间:2025年6月5日 🏷️ 标签:C | extern关键字 | 多文件编程 | 链接与声明 | 现代C 文章目录 前言🔥一、extern 是什么?&…...
【JavaWeb】Docker项目部署
引言 之前学习了Linux操作系统的常见命令,在Linux上安装软件,以及如何在Linux上部署一个单体项目,大多数同学都会有相同的感受,那就是麻烦。 核心体现在三点: 命令太多了,记不住 软件安装包名字复杂&…...
Mysql中select查询语句的执行过程
目录 1、介绍 1.1、组件介绍 1.2、Sql执行顺序 2、执行流程 2.1. 连接与认证 2.2. 查询缓存 2.3. 语法解析(Parser) 2.4、执行sql 1. 预处理(Preprocessor) 2. 查询优化器(Optimizer) 3. 执行器…...
Go 并发编程基础:通道(Channel)的使用
在 Go 中,Channel 是 Goroutine 之间通信的核心机制。它提供了一个线程安全的通信方式,用于在多个 Goroutine 之间传递数据,从而实现高效的并发编程。 本章将介绍 Channel 的基本概念、用法、缓冲、关闭机制以及 select 的使用。 一、Channel…...
Linux 中如何提取压缩文件 ?
Linux 是一种流行的开源操作系统,它提供了许多工具来管理、压缩和解压缩文件。压缩文件有助于节省存储空间,使数据传输更快。本指南将向您展示如何在 Linux 中提取不同类型的压缩文件。 1. Unpacking ZIP Files ZIP 文件是非常常见的,要在 …...
