SpringBoot整合Flink CDC实时同步postgresql变更数据,基于WAL日志
SpringBoot整合Flink CDC实时同步postgresql变更数据,基于WAL日志
- 一、前言
- 二、技术介绍(Flink CDC)
- 1、Flink CDC
- 2、Postgres CDC
- 三、准备工作
- 四、代码示例
- 五、总结
一、前言
在工作中经常会遇到要实时获取数据库(postgresql、mysql等)的变更数据,主要体现数据的实时性;mysql数据库有canal工具实现很简单,但是基于postgresql数据库获取实时数据就比较复杂,之前已经写过一篇获取postgresql数据库实时数据的文章,如下:
【技术实现】java实时同步postgresql变更数据,基于WAL日志
但是,之前的实现方式比较繁琐,不利于维护,所有本文整合Flink CDC通过一个比较简单的方式实现;
二、技术介绍(Flink CDC)
1、Flink CDC
Flink CDC(Change Data Capture)是一个基于Apache Flink构建的开源数据变更捕获(CDC)框架。其核心功能是从各种关系型数据库(如MySQL、PostgreSQL、Oracle等)中捕获数据变更(如增删改操作),并将这些变更以流的形式提供给Flink等流处理引擎进行处理;
1)CDC(Change Data Capture):数据变更捕获的简称,用于监测并捕获数据库的变动,然后将这些变更按照发生顺序捕获,并写入到目标存储系统(如数据仓库、数据湖、消息队列等)。
2)Flink CDC:基于Flink的CDC实现,将CDC技术与Flink流处理引擎相结合,实现数据的实时捕获、处理和传输。
2、Postgres CDC
1)Postgres CDC(Change Data Capture)连接器是用于从PostgreSQL数据库捕获数据变更(如增删改操作)并将其以流的形式提供给数据处理引擎(如Flink)的组件;
2)PostgreSQL版本:Postgres CDC连接器通常支持PostgreSQL的多个版本,具体版本可能因连接器版本不同而有所差异。常见的支持版本包括9.6、10、11、12、13、14等;
三、准备工作
1、安装postgresql数据库,并创建库和测试使用的表,这里不再列举详细步骤;

2、修改postgresql数据库配置,通过wal日志监听变更数据
修改postgresql.conf文件,重启服务
wal_level=logical
3、springboot关键maven依赖
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-base</artifactId><version>1.19.0</version>
</dependency>
<dependency><groupId>com.ververica</groupId><artifactId>flink-connector-postgres-cdc</artifactId><version>3.0.1</version>
</dependency>
注:其它依赖不在列举,可以通过获取源码查看
四、代码示例
InitAction02.java
package com.sk.proxytest.init;import com.ververica.cdc.connectors.base.source.jdbc.JdbcIncrementalSource;
import com.ververica.cdc.connectors.postgres.source.PostgresSourceBuilder;
import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.springframework.context.annotation.Configuration;
import javax.annotation.PostConstruct;@Configuration
public class InitAction02 {@PostConstructpublic void run() throws Exception {DebeziumDeserializationSchema<String> deserializer =new JsonDebeziumDeserializationSchema();JdbcIncrementalSource<String> postgresIncrementalSource =PostgresSourceBuilder.PostgresIncrementalSource.<String>builder().hostname("127.0.0.1").port(5432).database("postgres").schemaList("public").tableList("public.student").username("postgres").password("password").slotName("flink").decodingPluginName("pgoutput") // use pgoutput for PostgreSQL 10+.deserializer(deserializer).includeSchemaChanges(true) // output the schema changes as well.splitSize(2) // the split size of each snapshot split.build();StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.enableCheckpointing(3000);env.fromSource(postgresIncrementalSource,WatermarkStrategy.noWatermarks(),"PostgresParallelSource").setParallelism(2).addSink(new CustomSink());//.print();env.execute("Output Postgres Snapshot");}}
CustomSink.java
package com.sk.proxytest.init;import lombok.extern.log4j.Log4j2;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;@Log4j2
public class CustomSink extends RichSinkFunction<String> {@Overridepublic void invoke(String value, Context context) throws Exception {log.info("============数据发生变化:{}", value);}
}
执行结果:
1)新增数据

2)变更数据输出
2024-07-31T00:00:15,761 INFO [debezium-reader-0] io.debezium.util.Threads$3: Creating thread debezium-postgresconnector-postgres_cdc_source-keep-alive
2024-07-31T00:00:15,761 INFO [debezium-reader-0] io.debezium.connector.postgresql.PostgresStreamingChangeEventSource: Processing messages
2024-07-31T00:00:15,762 INFO [debezium-reader-0] io.debezium.connector.postgresql.connection.WalPositionLocator: Message with LSN 'LSN{0/3588018}' arrived, switching off the filtering
2024-07-31T00:00:16,678 INFO [Sink: Unnamed (1/4)#0] com.sk.proxytest.init.CustomSink: ============数据发生变化:{"before":null,"after":{"id":8,"name":"8","age":8,"remark":"8"},"source":{"version":"1.9.7.Final","connector":"postgresql","name":"postgres_cdc_source","ts_ms":1722355215252,"snapshot":"false","db":"postgres","sequence":"[null,\"56131608\"]","schema":"public","table":"student","txId":932,"lsn":56131608,"xmin":null},"op":"c","ts_ms":1722355216336,"transaction":null}
五、总结
Postgres CDC 连接器是一个 Flink Source 连接器,它将首先读取数据库快照,然后继续读取二进制日志,即使发生故障,也会进行一次处理;
Postgres CDC 连接器
👇🏻 👇🏻 👇🏻注:文章源代码关注下面公众号获取👇🏻 👇🏻 👇🏻
相关文章:
SpringBoot整合Flink CDC实时同步postgresql变更数据,基于WAL日志
SpringBoot整合Flink CDC实时同步postgresql变更数据,基于WAL日志 一、前言二、技术介绍(Flink CDC)1、Flink CDC2、Postgres CDC 三、准备工作四、代码示例五、总结 一、前言 在工作中经常会遇到要实时获取数据库(postgresql、m…...
ThinkPHP事件的使用
技术说明 1.ThinkPHP版本:支持6.0、8.0 2.使用场景:用户登陆后日志记录、通知消息发送等主流程、次流程分离等场景 3.说明:网上很多帖子说的不明不白的,建议大家自己手动尝试总结一下 4.事件手动绑定的时候,一定要…...
【Nuxt】服务端渲染 SSR
SSR 概述 服务器端渲染全称是:Server Side Render,在服务器端渲染页面,并将渲染好HTML返回给浏览器呈现。 SSR应用的页面是在服务端渲染的,用户每请求一个SSR页面都会先在服务端进行渲染,然后将渲染好的页面…...
Spring Boot整合WebSocket
说明:本文介绍如何在Spirng Boot中整合WebSocket,WebSocket介绍,参考下面这篇文章: WebSocket 原始方式 原始方式,指的是使用Spring Boot自己整合的方式,导入的是下面这个依赖 <dependency><g…...
《LeetCode热题100》---<5.③普通数组篇五道>
本篇博客讲解LeetCode热题100道普通数组篇中的五道题 第五道:缺失的第一个正数(困难) 第五道:缺失的第一个正数(困难) 方法一:将数组视为哈希表 class Solution {public int firstMissingPosi…...
Cocos Creator文档学习记录
Cocos Creator文档学习记录 一、什么是Cocos Creator 官方文档链接:Hello World | Cocos Creator 百度百科:Cocos Creator_百度百科 Cocos Creator包括开发和调试、商业化 SDK 的集成、多平台发布、测试、上线这一整套工作流程,可多次的迭…...
插入数据优化 ---大批量数据插入建议使用load
一.insert优化 1.批量插入 2.手动提交事务 3.主键顺序插入 二.大批量插入数据 如果一次性需要插入大批量数据,使用insert语句插入性能较低,此时可以使用MySQL数据库提供的load指令进行插入。操作如下 1.客户端连接服务端时,加入参数 --local-infine mysql --local-infine…...
【Linux】一篇总结!什么是重定向?输出重定向的作用是什么?什么又是追加重定向?
欢迎来到 CILMY23 的博客 🏆本篇主题为:一篇总结!什么是重定向?输出重定向的作用是什么?什么又是追加重定向? 🏆个人主页:CILMY23-CSDN博客 🏆系列专栏:Py…...
svn软件总成全内容
SVN软件总成 概述:本文为经验型文档 目录 D:\安装包\svn软件总成 的目录D:\安装包\svn软件总成\svn-base添加 的目录D:\安装包\svn软件总成\tools 的目录D:\安装包\svn软件总成\tools\sqlite-tools-win32-x86-3360000 的目录D:\安装包\svn软件总成\安装包-----bt lo…...
[激光原理与应用-118]:电源系统的接地详解:小信号的噪声干扰优化,从良好外壳接地开始
目录 一、电路的基本原理:电流回路 1、电流回路的基本概念 2、电流回路的特性 3、电流回路的类型 4、电流回路的应用 五、电流回路的注意事项 二、交流设备的接地 1.1 概述 1、交流工作接地的定义 2、交流工作接地的作用 3、交流工作接地的规范要求 4、…...
回测本身就是一种过度拟合?
这也许是一个絮絮叨叨的专题,跟大伙儿唠一唠量化相关的小问题,有感而发写到哪算哪,这是第一期,先唠个10块钱的~ 前段时间在某乎上看到这样一个问题『您怎么理解回测本身就是一种过度拟合?』 个人看来,回测本…...
什么是Arduino?
Arduino是一款便捷灵活、方便上手的开源电子原型平台,由欧洲的一个开发团队于2005年冬季开发。以下是关于Arduino的详细介绍: 一、基本概述 定义:Arduino是一个基于开放源代码的软硬件平台,它让电子设计更加简单快捷。通过Arduin…...
【机器学习基础】Scikit-learn主要用法
【作者主页】Francek Chen 【专栏介绍】 ⌈ ⌈ ⌈Python机器学习 ⌋ ⌋ ⌋ 机器学习是一门人工智能的分支学科,通过算法和模型让计算机从数据中学习,进行模型训练和优化,做出预测、分类和决策支持。Python成为机器学习的首选语言,…...
python-素数回文数的个数(赛氪OJ)
[题目描述] 求 11 到 n 之间(包括 n),既是素数又是回文数的整数有多少个。输入: 一个大于 11 小于 10000 的整数 n。输出: 11 到 n 之间的素数回文数个数。样例输入1 23 样例输出1 1 提示: 回文数指左右对…...
OCC 网格化(二)-网格划分算法
目录 一、概述 二、详解 1. 线性偏转 (Linear Deflection) 2. 角偏转 (Angular Deflection) 三、示例 3.1 示例1 3.2 示例2 一、概述 在 Open CASCADE Technology (OCC) 中默认的网格划分算法BRepMesh_IncrementalMesh有两个主要的选项来定义三角剖分—线性和角偏转。 …...
pyecharts模块
PyEcharts 一个基于ECharts库的Python封装库,它使得开发者可以方便地在Python环境中创建交互式的图表,包括折线图、柱状图、饼图、地图等多种可视化效果。 优点: 易用性:PyEcharts提供了简单易懂的API,通过链式调用…...
深⼊理解指针(3)
1. 字符指针变量 2. 数组指针变量 3. ⼆维数组传参的本质 4. 函数指针变量 5. 函数指针数组 6. 转移表 1. 字符指针变量 在指针的类型中我们知道有⼀种指针类型为字符指针 ⼀般使⽤: char* 这两种方式都是把字符串中的首字符的地址赋值给pc。 在这串代码中 str1内容的地…...
黑马头条vue2.0项目实战(四)——首页—文章列表
目录 1. 头部导航栏 1.1 页面布局 1.2 样式调整中遇到的问题 2. 频道列表 2.1 页面布局 2.2 样式调整 2.3 展示频道列表 3. 文章列表 3.1 思路分析 3.2 使用 List 列表组件 3.3 加载文章列表数据 3.4 下拉刷新 3.5 设置上下padding固定头部和频道列表 3.6 记住列…...
UE5.4内容示例(4)UI_UMG - 学习笔记
https://www.unrealengine.com/marketplace/zh-CN/product/content-examples 《内容示例》是学习UE5的基础示例,可以用此熟悉一遍UE5的功能 UI示例 UI_UMG :基本UMGUI_CommonUI :UMG多层应用UI_SlatePostBuffer UI :FX的示例&…...
C#实现数据采集系统-配置文件化
系统优化-配置 配置信息ip端口,还有点位信息,什么的都是直接在代码里直接写死,添加点位,修改配置,比较麻烦,每次修改都需要重新生成打包。 所以将这些配置都改成配置文件,这样只需要修改配置文件,程序无须修改,即可更新。 配置代码: 如果我们有100个采集,一个个去…...
centos 7 部署awstats 网站访问检测
一、基础环境准备(两种安装方式都要做) bash # 安装必要依赖 yum install -y httpd perl mod_perl perl-Time-HiRes perl-DateTime systemctl enable httpd # 设置 Apache 开机自启 systemctl start httpd # 启动 Apache二、安装 AWStats࿰…...
【SpringBoot】100、SpringBoot中使用自定义注解+AOP实现参数自动解密
在实际项目中,用户注册、登录、修改密码等操作,都涉及到参数传输安全问题。所以我们需要在前端对账户、密码等敏感信息加密传输,在后端接收到数据后能自动解密。 1、引入依赖 <dependency><groupId>org.springframework.boot</groupId><artifactId...
[ICLR 2022]How Much Can CLIP Benefit Vision-and-Language Tasks?
论文网址:pdf 英文是纯手打的!论文原文的summarizing and paraphrasing。可能会出现难以避免的拼写错误和语法错误,若有发现欢迎评论指正!文章偏向于笔记,谨慎食用 目录 1. 心得 2. 论文逐段精读 2.1. Abstract 2…...
OPENCV形态学基础之二腐蚀
一.腐蚀的原理 (图1) 数学表达式:dst(x,y) erode(src(x,y)) min(x,y)src(xx,yy) 腐蚀也是图像形态学的基本功能之一,腐蚀跟膨胀属于反向操作,膨胀是把图像图像变大,而腐蚀就是把图像变小。腐蚀后的图像变小变暗淡。 腐蚀…...
android13 app的触摸问题定位分析流程
一、知识点 一般来说,触摸问题都是app层面出问题,我们可以在ViewRootImpl.java添加log的方式定位;如果是touchableRegion的计算问题,就会相对比较麻烦了,需要通过adb shell dumpsys input > input.log指令,且通过打印堆栈的方式,逐步定位问题,并找到修改方案。 问题…...
Unity中的transform.up
2025年6月8日,周日下午 在Unity中,transform.up是Transform组件的一个属性,表示游戏对象在世界空间中的“上”方向(Y轴正方向),且会随对象旋转动态变化。以下是关键点解析: 基本定义 transfor…...
数据结构:递归的种类(Types of Recursion)
目录 尾递归(Tail Recursion) 什么是 Loop(循环)? 复杂度分析 头递归(Head Recursion) 树形递归(Tree Recursion) 线性递归(Linear Recursion)…...
【无标题】湖北理元理律师事务所:债务优化中的生活保障与法律平衡之道
文/法律实务观察组 在债务重组领域,专业机构的核心价值不仅在于减轻债务数字,更在于帮助债务人在履行义务的同时维持基本生活尊严。湖北理元理律师事务所的服务实践表明,合法债务优化需同步实现三重平衡: 法律刚性(债…...
用递归算法解锁「子集」问题 —— LeetCode 78题解析
文章目录 一、题目介绍二、递归思路详解:从决策树开始理解三、解法一:二叉决策树 DFS四、解法二:组合式回溯写法(推荐)五、解法对比 递归算法是编程中一种非常强大且常见的思想,它能够优雅地解决很多复杂的…...
MySQL体系架构解析(三):MySQL目录与启动配置全解析
MySQL中的目录和文件 bin目录 在 MySQL 的安装目录下有一个特别重要的 bin 目录,这个目录下存放着许多可执行文件。与其他系统的可执行文件类似,这些可执行文件都是与服务器和客户端程序相关的。 启动MySQL服务器程序 在 UNIX 系统中,用…...
