FlinkCDC数据实时同步Mysql到ES
考大家一个问题,如果想要把数据库的数据同步到别的地方,比如es,mongodb,大家会采用哪些方案呢? :::

-
定时扫描同步?
-
实时日志同步?
定时同步是一个很好的方案,比较简单,但是如果对实时要求比较高的话,定时同步就有点不合适了。今天给大家介绍一种实时同步方案,就是是使用flinkcdc 来读取数据库日志,并且写入到elasticsearch中。
1.什么是flinkcdc?
Flink CDC(Change Data Capture)是指通过 Apache Flink 实现的一种数据变化捕获技术。CDC 可以实时捕获数据库中的数据变化,如插入、更新、删除操作,并将这些变化数据流式地传输到其他系统或存储中。通过 Flink CDC,用户可以实时监控数据库中的数据变化,并将这些变化数据用于实时分析、ETL(Extract, Transform, Load)等应用场景。Flink CDC 通常用于构建实时数据管道,帮助用户实现实时数据同步和分析。


2.flinkcdc发展趋势?

目前在github 上大概有5k 的star,也有越来越多的人使用。

3.flinkcdc有什么优势?
说到实时同步,canal 是比较常用的方案
canal,译意为水道/管道/沟渠,主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费。 这句介绍有几个关键字:增量日志,增量数据订阅和消费。

canal的把自己伪装成MySQL slave,模拟MySQL slave的交互协议向MySQL Mater发送 dump协议,MySQL mater收到canal发送过来的dump请求,开始推送binary log给canal,然后canal解析binary log,再发送到存储目的地,比如MySQL,Kafka,Elastic Search等等。
那么 flinkcdc 和canal 对比,有什么不同呢?

这是网上的一个对比。可以看到 flinkcdc 和canal 一样,也是通过读取数据库日志的方式做到实时同步的,这个和很多实时同步的工具原理相同,比如 ogg debezium 都是这样做的,flinkcdc 的优势是基于flink 这个强大的实时计算引擎,可以做到集群部署,高可用等等,并且社区活跃,支持的平台多,像 mysql oracle mongodb 主流数据库都是支持的。而canal只支持mysql。
还有一个优势,flinkcdc 是基于java实现的,背靠大数据这个大平台,解决方案也是比较多的。源码阅读修改起来也是比较方便的。
4.一个例子
光说不练假把式,简单的写一个把mysql 数据实时同步到es的例子,使用flinksql的方式,只需要简单的几行sql

依赖
flink-1.15.0
flink-sql-connector-elasticsearch7-1.15.0.jar
flink-sql-connector-mysql-cdc-2.2.1.jar
mysql 5.7
es 7.9.3
安装好flink 之后,把 flink-sql-connector-elasticsearch7-1.15.0.jar flink-sql-connector-mysql-cdc-2.2.1.jar 上传到 flink lib 目录 启动flink
./start-cluster.sh
打开flink sql 窗口
./start-cluster.sh
创建和mysql 关联的表
CREATE TABLE products (id INT,name STRING,description STRING,PRIMARY KEY (id) NOT ENFORCED) WITH ('connector' = 'mysql-cdc','hostname' = 'localhost','port' = '3306','username' = 'root','password' = '123456','database-name' = 'mydb','table-name' = 'products');
CREATE TABLE orders (order_id INT,order_date TIMESTAMP(0),customer_name STRING,price DECIMAL(10, 5),product_id INT,order_status BOOLEAN,PRIMARY KEY (order_id) NOT ENFORCED) WITH ('connector' = 'mysql-cdc','hostname' = 'localhost','port' = '3306','username' = 'root','password' = '123456','database-name' = 'mydb','table-name' = 'orders');
创建和es 同步的表
CREATE TABLE enriched_orders (order_id INT,order_date TIMESTAMP(0),customer_name STRING,price DECIMAL(10, 5),product_id INT,order_status BOOLEAN,product_name STRING,product_description STRING,PRIMARY KEY (order_id) NOT ENFORCED) WITH ('connector' = 'elasticsearch-7','hosts' = 'http://192.168.91.134:9200','index' = 'enriched_orders');
创建读取mysql写入es任务
INSERT INTO enriched_ordersSELECT o.*, p.name, p.descriptionFROM orders AS oLEFT JOIN products AS p ON o.product_id = p.id;
执行这个任务后,mysql 的数据就能实时同步至es了
当然数据源也是支持很多种,比如 oracle mongodb sqlserver 写入端也支持 es kafka hive 等等,看大家需要。想我们的业务场景,是先将mysql 数据同步到kafka,然后再消费kafka 消息,把数据写入到es, hive,starrocks 等等。并且使用了checkpoint 做为断点恢复的保障。
5.最后
附上一些涉及的到网址,方便大家查阅
flinkcdc 官网
flinkcdc github
flink 官网
flink 文档
相关文章:
FlinkCDC数据实时同步Mysql到ES
考大家一个问题,如果想要把数据库的数据同步到别的地方,比如es,mongodb,大家会采用哪些方案呢? ::: 定时扫描同步? 实时日志同步? 定时同步是一个很好的方案,比较简单,但是如果对实时要求比较高的话,定…...
【Feign】 基于 Feign 远程调用、 自定义配置、性能优化、实现 Feign 最佳实践
🐌个人主页: 🐌 叶落闲庭 💨我的专栏:💨 SpringCloud MybatisPlus JVM 石可破也,而不可夺坚;丹可磨也,而不可夺赤。 Feign 一、 基于 Feign 远程调用1.1 RestTemplate方式…...
小迪安全笔记(3)——基础入门3、基础入门4
文章目录 一、抓包&封包&协议&APP&小程序&PC应用&web应用二、30余种加密编码进制&web&数据库&系统&代理 一、抓包&封包&协议&APP&小程序&PC应用&web应用 APP&小程序&PC抓包HTTP/S数据——Charles、F…...
SOME/IP 协议介绍(六)接口设计的兼容性规则
接口设计的兼容性规则(信息性) 对于所有序列化格式而言,向较新的服务接口的迁移有一定的限制。使用一组兼容性规则,SOME / IP允许服务接口的演进。可以以非破坏性的方式进行以下添加和增强: • 向服务中添加新方法 …...
吴恩达《机器学习》8-5->8-6:特征与直观理解I、样本与值观理解II
8.5、特征与直观理解I 一、神经网络的学习特性 神经网络通过学习可以得出自身的一系列特征。相对于普通的逻辑回归,在使用原始特征 x1,x2,...,xn 时受到一定的限制。虽然可以使用一些二项式项来组合这些特征,但仍然受到原始特征的限制。在神经网…...
『亚马逊云科技产品测评』活动征文|借助AWS EC2搭建服务器群组运维系统Zabbix+spug
授权声明:本篇文章授权活动官方亚马逊云科技文章转发、改写权,包括不限于在 Developer Centre, 知乎,自媒体平台,第三方开发者媒体等亚马逊云科技官方渠道。 本文基于以下软硬件工具: aws ec2 frp-0.52.3 zabbix 6…...
文件转换,简简单单,pdf转word,不要去找收费的了,自己学了之后免费转,之后就复制粘贴就ok了
先上一个链接pdf转word文件转换 接口层 PostMapping("pdfToWord")public String pdfToWord(RequestParam("file") MultipartFile file) throws IOException {String fileName FileExchangeUtil.pdfToWord(file.getInputStream(),file.getName());return…...
Jmeter——循环控制器中实现Counter计数器的次数重置
近期在使用Jmeter编写个辅助测试的脚本,用到了多个Loop Controller和Counter。 当时想的思路就是三个可变的数量值,使用循环实现;但第三个可变值的数量次数,是基于第二次循环中得到的结果才能确认最终次数,每次的结果…...
[创业之路-85]:IT创业成功老板的品质、创业失败老板的特征、成功领导者的品质、失败管理者的特征
目录 前言: 一、创业成功老板的品质 二、创业失败老板的特征 三、成功领导者的品质 四、失败管理者的特征 前言: 大多数创业或职场共事,都是基于某种人际关系或所谓的感情,这是大数的职场众生相,也是人情社会的中…...
警惕.360勒索病毒,您需要知道的预防和恢复方法。
引言: 网络威胁的演变无常,.360勒索病毒作为一种新兴的勒索软件,以其狡猾性备受关注。本文将深入介绍.360勒索病毒的特点,提供解决方案以恢复被其加密的数据,并分享一系列强化网络安全的预防措施。如果您在面对被勒索…...
人力资源小程序
人力资源管理对于企业的运营至关重要,而如今随着科技的发展,制作一个人力资源小程序已经变得非常简单和便捷。在本文中,我们将为您介绍如何通过乔拓云网制作一个人力资源小程序,只需五个简单的步骤。 第一步:注册登录乔…...
【多线程 - 10、线程同步3 ThreadLocal】
一、ThreadLocal 1、介绍 可以实现资源对象的线程隔离;可以实现了线程内的资源共享 如果使用 ThreadLocal 管理变量,则每一个使用该变量的线程都获得该变量的副本, 副本之间相互独立,这样每一个线程都可以随意修改自己的变量副本…...
【Flink 问题集】The generic type parameters of ‘Collector‘ are missing
错误展示: Exception in thread "main" org.apache.flink.api.common.functions.InvalidTypesException: The return type of function main(CollectionDemo.java:33) could not be determined automatically, due to type erasure. You can give type in…...
数据分析—将txt文件转为csv文件;将csv文件转为xls文件
txt文件转为csv文件转化代码: import csv# 输入txt文件路径 txt_file rC:\Users\ZARD\Desktop\daily-min-temperatures.txt# 输出csv文件路径 csv_file rC:\Users\ZARD\Desktop\daily-min-temperatures.csv# 打开txt文件以读取数据 with open(txt_file, r) as tx…...
【算法】二分查找-20231120
这里写目录标题 一、75. 颜色分类二、80. 删除有序数组中的重复项 II三、125. 验证回文串四、189. 轮转数组 一、75. 颜色分类 提示 中等 给定一个包含红色、白色和蓝色、共 n 个元素的数组 nums ,原地对它们进行排序,使得相同颜色的元素相邻ÿ…...
WPF实现将鼠标悬浮在按钮上时弹出菜单
在WPF 中,要实现当鼠标悬停在按钮上时显示菜单,并能够灵活设置菜单的位置(如按钮的上方或下方),你可以使用 Popup 控件来创建自定义的弹出菜单。以下是如何通过 Popup 控件来实现这种功能的步骤: 1. 在 XA…...
车载以太网-传输层-UDP
文章目录 UDP协议UDP报文格式UDP报文示例UDP协议测试UDP协议 UDP(User Datagram Protocol)是一种无连接的传输层协议,它不保证数据传输的可靠性,但是具有传输速度快的优点。UDP协议主要用于那些对数据传输速度要求较高,但对数据传输的可靠性要求不高的应用场景,如音视频…...
uniapp如何上传文件,使用API是什么
在uniapp中上传文件的方法有很多,其中一种常用的方法是使用wx.uploadFile() API。该API可以上传本地文件或网络文件,并支持设置请求头、请求参数等选项。 具体使用方法如下: 1.引入API: import { uploadFile } from /util/requ…...
【狂神说Java】Docker概述 | Docker安装 | Docker的常用命令
✅作者简介:CSDN内容合伙人、信息安全专业在校大学生🏆 🔥系列专栏 :【狂神说Java】 📃新人博主 :欢迎点赞收藏关注,会回访! 💬舞台再大,你不上台,…...
Git精讲
Git基本操作 创建Git本地仓库 git initgit clone 配置Git git config [--global] user.name "Your Name" git config [--global] user.email "emailexample.com"–global是一个可选项。如果使用了该选项,表示这台机器上所有的Git仓库都会使…...
设计岗位替代风险评估程序,分析岗位可替代性,给出创新能力补强提升方向。
一、实际应用场景描述在数字化转型加速背景下,企业和个人普遍关心以下问题:- HR 在做岗位规划时需要评估 自动化风险- 员工希望了解自己的岗位是否容易被 AI / 脚本替代- 创业者需要判断某类服务是否值得人力长期投入- 学生在做职业规划时需要参考岗位演…...
信道解码算法对比:OSD为何在短中长码中优于神经网络与Transformer解码器
1. 项目概述在通信系统的信道编码领域,前向纠错(FEC)技术是保障数据传输可靠性的核心。其基本原理是通过在发送端添加冗余信息,使接收端能够在存在噪声的信道中检测并纠正错误。随着机器学习技术的发展,基于神经网络的…...
DeepSeek-R1量化部署实战指南(含TensorRT+AWQ+GGUF三引擎对比评测)
更多请点击: https://intelliparadigm.com 第一章:DeepSeek-R1量化部署方案概览 DeepSeek-R1 是一款高性能开源大语言模型,其量化部署旨在平衡推理精度、显存占用与吞吐效率。本章聚焦于面向生产环境的轻量化落地路径,涵盖权重量…...
别再重启了!Win11开机卡死/间歇性卡顿的终极排查与修复指南(附免费工具)
Win11系统卡顿终极自救指南:从根源排查到永久修复 每次开机都像在玩俄罗斯轮盘赌——今天Win11会卡死吗?鼠标指针变成沙漏图标的那一刻,血压瞬间飙升的场景想必每个Win11用户都经历过。不同于网上那些"重启试试"的敷衍建议…...
为 OpenClaw 配置 Taotoken 以实现稳定可靠的 Agent 工作流
🚀 告别海外账号与网络限制!稳定直连全球优质大模型,限时半价接入中。 👉 点击领取海量免费额度 为 OpenClaw 配置 Taotoken 以实现稳定可靠的 Agent 工作流 对于使用 OpenClaw 框架构建智能体(Agent)的开…...
工业无网智能诊断:可执行二维码与QRind语言深度解析
1. 项目概述:当二维码“活”起来,工业现场的无网智能诊断在工业现场,尤其是那些网络信号不稳定甚至完全隔绝的区域——比如大型石化厂的深处、地下矿井的作业面,或是某些对网络安全有严格管控的精密制造车间,我们常常面…...
如何用Xournal++实现跨平台手写笔记:免费开源PDF批注工具完全指南 [特殊字符]
如何用Xournal实现跨平台手写笔记:免费开源PDF批注工具完全指南 🚀 【免费下载链接】xournalpp Xournal is a handwriting notetaking software with PDF annotation support. Written in C with GTK3, supporting Linux (e.g. Ubuntu, Debian, Arch, SU…...
VisualGGPK2:流放之路游戏资源编辑的终极指南,轻松打造个性化游戏体验
VisualGGPK2:流放之路游戏资源编辑的终极指南,轻松打造个性化游戏体验 【免费下载链接】VisualGGPK2 Library for Content.ggpk of PathOfExile (Rewrite of libggpk) 项目地址: https://gitcode.com/gh_mirrors/vi/VisualGGPK2 VisualGGPK2是一款…...
小型语言模型在乳业智能决策中的技术突破与应用
1. 小型语言模型在乳业智能决策中的技术突破在乳制品行业数字化转型浪潮中,我们面临着一个核心矛盾:大型语言模型(LLM)虽然能力强大,但高昂的云计算成本和数据隐私风险让大多数牧场望而却步。而小型语言模型࿰…...
如何用DeepL Chrome翻译插件打破语言障碍:从安装到精通的完整指南
如何用DeepL Chrome翻译插件打破语言障碍:从安装到精通的完整指南 【免费下载链接】deepl-chrome-extension A DeepL Translator Chrome extension 项目地址: https://gitcode.com/gh_mirrors/de/deepl-chrome-extension 你是否经常遇到需要阅读外文网页却苦…...
