SpringBoot整合Flink CDC实时同步postgresql变更数据,基于WAL日志
SpringBoot整合Flink CDC实时同步postgresql变更数据,基于WAL日志
- 一、前言
- 二、技术介绍(Flink CDC)
- 1、Flink CDC
- 2、Postgres CDC
- 三、准备工作
- 四、代码示例
- 五、总结
一、前言
在工作中经常会遇到要实时获取数据库(postgresql、mysql等)的变更数据,主要体现数据的实时性;mysql数据库有canal工具实现很简单,但是基于postgresql数据库获取实时数据就比较复杂,之前已经写过一篇获取postgresql数据库实时数据的文章,如下:
【技术实现】java实时同步postgresql变更数据,基于WAL日志
但是,之前的实现方式比较繁琐,不利于维护,所有本文整合Flink CDC通过一个比较简单的方式实现;
二、技术介绍(Flink CDC)
1、Flink CDC
Flink CDC(Change Data Capture)是一个基于Apache Flink构建的开源数据变更捕获(CDC)框架。其核心功能是从各种关系型数据库(如MySQL、PostgreSQL、Oracle等)中捕获数据变更(如增删改操作),并将这些变更以流的形式提供给Flink等流处理引擎进行处理;
1)CDC(Change Data Capture):数据变更捕获的简称,用于监测并捕获数据库的变动,然后将这些变更按照发生顺序捕获,并写入到目标存储系统(如数据仓库、数据湖、消息队列等)。
2)Flink CDC:基于Flink的CDC实现,将CDC技术与Flink流处理引擎相结合,实现数据的实时捕获、处理和传输。
2、Postgres CDC
1)Postgres CDC(Change Data Capture)连接器是用于从PostgreSQL数据库捕获数据变更(如增删改操作)并将其以流的形式提供给数据处理引擎(如Flink)的组件;
2)PostgreSQL版本:Postgres CDC连接器通常支持PostgreSQL的多个版本,具体版本可能因连接器版本不同而有所差异。常见的支持版本包括9.6、10、11、12、13、14等;
三、准备工作
1、安装postgresql数据库,并创建库和测试使用的表,这里不再列举详细步骤;

2、修改postgresql数据库配置,通过wal日志监听变更数据
修改postgresql.conf文件,重启服务
wal_level=logical
3、springboot关键maven依赖
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-base</artifactId><version>1.19.0</version>
</dependency>
<dependency><groupId>com.ververica</groupId><artifactId>flink-connector-postgres-cdc</artifactId><version>3.0.1</version>
</dependency>
注:其它依赖不在列举,可以通过获取源码查看
四、代码示例
InitAction02.java
package com.sk.proxytest.init;import com.ververica.cdc.connectors.base.source.jdbc.JdbcIncrementalSource;
import com.ververica.cdc.connectors.postgres.source.PostgresSourceBuilder;
import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.springframework.context.annotation.Configuration;
import javax.annotation.PostConstruct;@Configuration
public class InitAction02 {@PostConstructpublic void run() throws Exception {DebeziumDeserializationSchema<String> deserializer =new JsonDebeziumDeserializationSchema();JdbcIncrementalSource<String> postgresIncrementalSource =PostgresSourceBuilder.PostgresIncrementalSource.<String>builder().hostname("127.0.0.1").port(5432).database("postgres").schemaList("public").tableList("public.student").username("postgres").password("password").slotName("flink").decodingPluginName("pgoutput") // use pgoutput for PostgreSQL 10+.deserializer(deserializer).includeSchemaChanges(true) // output the schema changes as well.splitSize(2) // the split size of each snapshot split.build();StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.enableCheckpointing(3000);env.fromSource(postgresIncrementalSource,WatermarkStrategy.noWatermarks(),"PostgresParallelSource").setParallelism(2).addSink(new CustomSink());//.print();env.execute("Output Postgres Snapshot");}}
CustomSink.java
package com.sk.proxytest.init;import lombok.extern.log4j.Log4j2;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;@Log4j2
public class CustomSink extends RichSinkFunction<String> {@Overridepublic void invoke(String value, Context context) throws Exception {log.info("============数据发生变化:{}", value);}
}
执行结果:
1)新增数据

2)变更数据输出
2024-07-31T00:00:15,761 INFO [debezium-reader-0] io.debezium.util.Threads$3: Creating thread debezium-postgresconnector-postgres_cdc_source-keep-alive
2024-07-31T00:00:15,761 INFO [debezium-reader-0] io.debezium.connector.postgresql.PostgresStreamingChangeEventSource: Processing messages
2024-07-31T00:00:15,762 INFO [debezium-reader-0] io.debezium.connector.postgresql.connection.WalPositionLocator: Message with LSN 'LSN{0/3588018}' arrived, switching off the filtering
2024-07-31T00:00:16,678 INFO [Sink: Unnamed (1/4)#0] com.sk.proxytest.init.CustomSink: ============数据发生变化:{"before":null,"after":{"id":8,"name":"8","age":8,"remark":"8"},"source":{"version":"1.9.7.Final","connector":"postgresql","name":"postgres_cdc_source","ts_ms":1722355215252,"snapshot":"false","db":"postgres","sequence":"[null,\"56131608\"]","schema":"public","table":"student","txId":932,"lsn":56131608,"xmin":null},"op":"c","ts_ms":1722355216336,"transaction":null}
五、总结
Postgres CDC 连接器是一个 Flink Source 连接器,它将首先读取数据库快照,然后继续读取二进制日志,即使发生故障,也会进行一次处理;
Postgres CDC 连接器
👇🏻 👇🏻 👇🏻注:文章源代码关注下面公众号获取👇🏻 👇🏻 👇🏻
相关文章:
SpringBoot整合Flink CDC实时同步postgresql变更数据,基于WAL日志
SpringBoot整合Flink CDC实时同步postgresql变更数据,基于WAL日志 一、前言二、技术介绍(Flink CDC)1、Flink CDC2、Postgres CDC 三、准备工作四、代码示例五、总结 一、前言 在工作中经常会遇到要实时获取数据库(postgresql、m…...
ThinkPHP事件的使用
技术说明 1.ThinkPHP版本:支持6.0、8.0 2.使用场景:用户登陆后日志记录、通知消息发送等主流程、次流程分离等场景 3.说明:网上很多帖子说的不明不白的,建议大家自己手动尝试总结一下 4.事件手动绑定的时候,一定要…...
【Nuxt】服务端渲染 SSR
SSR 概述 服务器端渲染全称是:Server Side Render,在服务器端渲染页面,并将渲染好HTML返回给浏览器呈现。 SSR应用的页面是在服务端渲染的,用户每请求一个SSR页面都会先在服务端进行渲染,然后将渲染好的页面…...
Spring Boot整合WebSocket
说明:本文介绍如何在Spirng Boot中整合WebSocket,WebSocket介绍,参考下面这篇文章: WebSocket 原始方式 原始方式,指的是使用Spring Boot自己整合的方式,导入的是下面这个依赖 <dependency><g…...
《LeetCode热题100》---<5.③普通数组篇五道>
本篇博客讲解LeetCode热题100道普通数组篇中的五道题 第五道:缺失的第一个正数(困难) 第五道:缺失的第一个正数(困难) 方法一:将数组视为哈希表 class Solution {public int firstMissingPosi…...
Cocos Creator文档学习记录
Cocos Creator文档学习记录 一、什么是Cocos Creator 官方文档链接:Hello World | Cocos Creator 百度百科:Cocos Creator_百度百科 Cocos Creator包括开发和调试、商业化 SDK 的集成、多平台发布、测试、上线这一整套工作流程,可多次的迭…...
插入数据优化 ---大批量数据插入建议使用load
一.insert优化 1.批量插入 2.手动提交事务 3.主键顺序插入 二.大批量插入数据 如果一次性需要插入大批量数据,使用insert语句插入性能较低,此时可以使用MySQL数据库提供的load指令进行插入。操作如下 1.客户端连接服务端时,加入参数 --local-infine mysql --local-infine…...
【Linux】一篇总结!什么是重定向?输出重定向的作用是什么?什么又是追加重定向?
欢迎来到 CILMY23 的博客 🏆本篇主题为:一篇总结!什么是重定向?输出重定向的作用是什么?什么又是追加重定向? 🏆个人主页:CILMY23-CSDN博客 🏆系列专栏:Py…...
svn软件总成全内容
SVN软件总成 概述:本文为经验型文档 目录 D:\安装包\svn软件总成 的目录D:\安装包\svn软件总成\svn-base添加 的目录D:\安装包\svn软件总成\tools 的目录D:\安装包\svn软件总成\tools\sqlite-tools-win32-x86-3360000 的目录D:\安装包\svn软件总成\安装包-----bt lo…...
[激光原理与应用-118]:电源系统的接地详解:小信号的噪声干扰优化,从良好外壳接地开始
目录 一、电路的基本原理:电流回路 1、电流回路的基本概念 2、电流回路的特性 3、电流回路的类型 4、电流回路的应用 五、电流回路的注意事项 二、交流设备的接地 1.1 概述 1、交流工作接地的定义 2、交流工作接地的作用 3、交流工作接地的规范要求 4、…...
回测本身就是一种过度拟合?
这也许是一个絮絮叨叨的专题,跟大伙儿唠一唠量化相关的小问题,有感而发写到哪算哪,这是第一期,先唠个10块钱的~ 前段时间在某乎上看到这样一个问题『您怎么理解回测本身就是一种过度拟合?』 个人看来,回测本…...
什么是Arduino?
Arduino是一款便捷灵活、方便上手的开源电子原型平台,由欧洲的一个开发团队于2005年冬季开发。以下是关于Arduino的详细介绍: 一、基本概述 定义:Arduino是一个基于开放源代码的软硬件平台,它让电子设计更加简单快捷。通过Arduin…...
【机器学习基础】Scikit-learn主要用法
【作者主页】Francek Chen 【专栏介绍】 ⌈ ⌈ ⌈Python机器学习 ⌋ ⌋ ⌋ 机器学习是一门人工智能的分支学科,通过算法和模型让计算机从数据中学习,进行模型训练和优化,做出预测、分类和决策支持。Python成为机器学习的首选语言,…...
python-素数回文数的个数(赛氪OJ)
[题目描述] 求 11 到 n 之间(包括 n),既是素数又是回文数的整数有多少个。输入: 一个大于 11 小于 10000 的整数 n。输出: 11 到 n 之间的素数回文数个数。样例输入1 23 样例输出1 1 提示: 回文数指左右对…...
OCC 网格化(二)-网格划分算法
目录 一、概述 二、详解 1. 线性偏转 (Linear Deflection) 2. 角偏转 (Angular Deflection) 三、示例 3.1 示例1 3.2 示例2 一、概述 在 Open CASCADE Technology (OCC) 中默认的网格划分算法BRepMesh_IncrementalMesh有两个主要的选项来定义三角剖分—线性和角偏转。 …...
pyecharts模块
PyEcharts 一个基于ECharts库的Python封装库,它使得开发者可以方便地在Python环境中创建交互式的图表,包括折线图、柱状图、饼图、地图等多种可视化效果。 优点: 易用性:PyEcharts提供了简单易懂的API,通过链式调用…...
深⼊理解指针(3)
1. 字符指针变量 2. 数组指针变量 3. ⼆维数组传参的本质 4. 函数指针变量 5. 函数指针数组 6. 转移表 1. 字符指针变量 在指针的类型中我们知道有⼀种指针类型为字符指针 ⼀般使⽤: char* 这两种方式都是把字符串中的首字符的地址赋值给pc。 在这串代码中 str1内容的地…...
黑马头条vue2.0项目实战(四)——首页—文章列表
目录 1. 头部导航栏 1.1 页面布局 1.2 样式调整中遇到的问题 2. 频道列表 2.1 页面布局 2.2 样式调整 2.3 展示频道列表 3. 文章列表 3.1 思路分析 3.2 使用 List 列表组件 3.3 加载文章列表数据 3.4 下拉刷新 3.5 设置上下padding固定头部和频道列表 3.6 记住列…...
UE5.4内容示例(4)UI_UMG - 学习笔记
https://www.unrealengine.com/marketplace/zh-CN/product/content-examples 《内容示例》是学习UE5的基础示例,可以用此熟悉一遍UE5的功能 UI示例 UI_UMG :基本UMGUI_CommonUI :UMG多层应用UI_SlatePostBuffer UI :FX的示例&…...
C#实现数据采集系统-配置文件化
系统优化-配置 配置信息ip端口,还有点位信息,什么的都是直接在代码里直接写死,添加点位,修改配置,比较麻烦,每次修改都需要重新生成打包。 所以将这些配置都改成配置文件,这样只需要修改配置文件,程序无须修改,即可更新。 配置代码: 如果我们有100个采集,一个个去…...
IGP(Interior Gateway Protocol,内部网关协议)
IGP(Interior Gateway Protocol,内部网关协议) 是一种用于在一个自治系统(AS)内部传递路由信息的路由协议,主要用于在一个组织或机构的内部网络中决定数据包的最佳路径。与用于自治系统之间通信的 EGP&…...
SpringBoot+uniapp 的 Champion 俱乐部微信小程序设计与实现,论文初版实现
摘要 本论文旨在设计并实现基于 SpringBoot 和 uniapp 的 Champion 俱乐部微信小程序,以满足俱乐部线上活动推广、会员管理、社交互动等需求。通过 SpringBoot 搭建后端服务,提供稳定高效的数据处理与业务逻辑支持;利用 uniapp 实现跨平台前…...
蓝桥杯3498 01串的熵
问题描述 对于一个长度为 23333333的 01 串, 如果其信息熵为 11625907.5798, 且 0 出现次数比 1 少, 那么这个 01 串中 0 出现了多少次? #include<iostream> #include<cmath> using namespace std;int n 23333333;int main() {//枚举 0 出现的次数//因…...
AGain DB和倍数增益的关系
我在设置一款索尼CMOS芯片时,Again增益0db变化为6DB,画面的变化只有2倍DN的增益,比如10变为20。 这与dB和线性增益的关系以及传感器处理流程有关。以下是具体原因分析: 1. dB与线性增益的换算关系 6dB对应的理论线性增益应为&…...
Linux nano命令的基本使用
参考资料 GNU nanoを使いこなすnano基础 目录 一. 简介二. 文件打开2.1 普通方式打开文件2.2 只读方式打开文件 三. 文件查看3.1 打开文件时,显示行号3.2 翻页查看 四. 文件编辑4.1 Ctrl K 复制 和 Ctrl U 粘贴4.2 Alt/Esc U 撤回 五. 文件保存与退出5.1 Ctrl …...
Razor编程中@Html的方法使用大全
文章目录 1. 基础HTML辅助方法1.1 Html.ActionLink()1.2 Html.RouteLink()1.3 Html.Display() / Html.DisplayFor()1.4 Html.Editor() / Html.EditorFor()1.5 Html.Label() / Html.LabelFor()1.6 Html.TextBox() / Html.TextBoxFor() 2. 表单相关辅助方法2.1 Html.BeginForm() …...
BLEU评分:机器翻译质量评估的黄金标准
BLEU评分:机器翻译质量评估的黄金标准 1. 引言 在自然语言处理(NLP)领域,衡量一个机器翻译模型的性能至关重要。BLEU (Bilingual Evaluation Understudy) 作为一种自动化评估指标,自2002年由IBM的Kishore Papineni等人提出以来,…...
PHP 8.5 即将发布:管道操作符、强力调试
前不久,PHP宣布了即将在 2025 年 11 月 20 日 正式发布的 PHP 8.5!作为 PHP 语言的又一次重要迭代,PHP 8.5 承诺带来一系列旨在提升代码可读性、健壮性以及开发者效率的改进。而更令人兴奋的是,借助强大的本地开发环境 ServBay&am…...
算术操作符与类型转换:从基础到精通
目录 前言:从基础到实践——探索运算符与类型转换的奥秘 算术操作符超级详解 算术操作符:、-、*、/、% 赋值操作符:和复合赋值 单⽬操作符:、--、、- 前言:从基础到实践——探索运算符与类型转换的奥秘 在先前的文…...
【若依】框架项目部署笔记
参考【SpringBoot】【Vue】项目部署_no main manifest attribute, in springboot-0.0.1-sn-CSDN博客 多一个redis安装 准备工作: 压缩包下载:http://download.redis.io/releases 1. 上传压缩包,并进入压缩包所在目录,解压到目标…...
