当前位置: 首页 > news >正文

SpringBoot整合Flink CDC实时同步postgresql变更数据,基于WAL日志

SpringBoot整合Flink CDC实时同步postgresql变更数据,基于WAL日志

    • 一、前言
    • 二、技术介绍(Flink CDC)
      • 1、Flink CDC
      • 2、Postgres CDC
    • 三、准备工作
    • 四、代码示例
    • 五、总结

一、前言

在工作中经常会遇到要实时获取数据库(postgresql、mysql等)的变更数据,主要体现数据的实时性;mysql数据库有canal工具实现很简单,但是基于postgresql数据库获取实时数据就比较复杂,之前已经写过一篇获取postgresql数据库实时数据的文章,如下:

【技术实现】java实时同步postgresql变更数据,基于WAL日志

但是,之前的实现方式比较繁琐,不利于维护,所有本文整合Flink CDC通过一个比较简单的方式实现;

二、技术介绍(Flink CDC)

1、Flink CDC

Flink CDC(Change Data Capture)是一个基于Apache Flink构建的开源数据变更捕获(CDC)框架。其核心功能是从各种关系型数据库(如MySQL、PostgreSQL、Oracle等)中捕获数据变更(如增删改操作),并将这些变更以流的形式提供给Flink等流处理引擎进行处理;
1)CDC(Change Data Capture):数据变更捕获的简称,用于监测并捕获数据库的变动,然后将这些变更按照发生顺序捕获,并写入到目标存储系统(如数据仓库、数据湖、消息队列等)。
2)Flink CDC:基于Flink的CDC实现,将CDC技术与Flink流处理引擎相结合,实现数据的实时捕获、处理和传输。

2、Postgres CDC

1)Postgres CDC(Change Data Capture)连接器是用于从PostgreSQL数据库捕获数据变更(如增删改操作)并将其以流的形式提供给数据处理引擎(如Flink)的组件;
2)PostgreSQL版本:Postgres CDC连接器通常支持PostgreSQL的多个版本,具体版本可能因连接器版本不同而有所差异。常见的支持版本包括9.6、10、11、12、13、14等;

三、准备工作

1、安装postgresql数据库,并创建库和测试使用的表,这里不再列举详细步骤;
在这里插入图片描述
2、修改postgresql数据库配置,通过wal日志监听变更数据

修改postgresql.conf文件,重启服务
wal_level=logical

3、springboot关键maven依赖

<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-base</artifactId><version>1.19.0</version>
</dependency>
<dependency><groupId>com.ververica</groupId><artifactId>flink-connector-postgres-cdc</artifactId><version>3.0.1</version>
</dependency>

注:其它依赖不在列举,可以通过获取源码查看

四、代码示例

InitAction02.java

package com.sk.proxytest.init;import com.ververica.cdc.connectors.base.source.jdbc.JdbcIncrementalSource;
import com.ververica.cdc.connectors.postgres.source.PostgresSourceBuilder;
import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.springframework.context.annotation.Configuration;
import javax.annotation.PostConstruct;@Configuration
public class InitAction02 {@PostConstructpublic void run() throws Exception {DebeziumDeserializationSchema<String> deserializer =new JsonDebeziumDeserializationSchema();JdbcIncrementalSource<String> postgresIncrementalSource =PostgresSourceBuilder.PostgresIncrementalSource.<String>builder().hostname("127.0.0.1").port(5432).database("postgres").schemaList("public").tableList("public.student").username("postgres").password("password").slotName("flink").decodingPluginName("pgoutput") // use pgoutput for PostgreSQL 10+.deserializer(deserializer).includeSchemaChanges(true) // output the schema changes as well.splitSize(2) // the split size of each snapshot split.build();StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.enableCheckpointing(3000);env.fromSource(postgresIncrementalSource,WatermarkStrategy.noWatermarks(),"PostgresParallelSource").setParallelism(2).addSink(new CustomSink());//.print();env.execute("Output Postgres Snapshot");}}

CustomSink.java

package com.sk.proxytest.init;import lombok.extern.log4j.Log4j2;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;@Log4j2
public class CustomSink extends RichSinkFunction<String> {@Overridepublic void invoke(String value, Context context) throws Exception {log.info("============数据发生变化:{}", value);}
}

执行结果:

1)新增数据
在这里插入图片描述

2)变更数据输出

2024-07-31T00:00:15,761 INFO  [debezium-reader-0] io.debezium.util.Threads$3: Creating thread debezium-postgresconnector-postgres_cdc_source-keep-alive
2024-07-31T00:00:15,761 INFO  [debezium-reader-0] io.debezium.connector.postgresql.PostgresStreamingChangeEventSource: Processing messages
2024-07-31T00:00:15,762 INFO  [debezium-reader-0] io.debezium.connector.postgresql.connection.WalPositionLocator: Message with LSN 'LSN{0/3588018}' arrived, switching off the filtering
2024-07-31T00:00:16,678 INFO  [Sink: Unnamed (1/4)#0] com.sk.proxytest.init.CustomSink: ============数据发生变化:{"before":null,"after":{"id":8,"name":"8","age":8,"remark":"8"},"source":{"version":"1.9.7.Final","connector":"postgresql","name":"postgres_cdc_source","ts_ms":1722355215252,"snapshot":"false","db":"postgres","sequence":"[null,\"56131608\"]","schema":"public","table":"student","txId":932,"lsn":56131608,"xmin":null},"op":"c","ts_ms":1722355216336,"transaction":null}

五、总结

Postgres CDC 连接器是一个 Flink Source 连接器,它将首先读取数据库快照,然后继续读取二进制日志,即使发生故障,也会进行一次处理;

Postgres CDC 连接器

👇🏻 👇🏻 👇🏻注:文章源代码关注下面公众号获取👇🏻 👇🏻 👇🏻

相关文章:

SpringBoot整合Flink CDC实时同步postgresql变更数据,基于WAL日志

SpringBoot整合Flink CDC实时同步postgresql变更数据&#xff0c;基于WAL日志 一、前言二、技术介绍&#xff08;Flink CDC&#xff09;1、Flink CDC2、Postgres CDC 三、准备工作四、代码示例五、总结 一、前言 在工作中经常会遇到要实时获取数据库&#xff08;postgresql、m…...

ThinkPHP事件的使用

技术说明 1.ThinkPHP版本&#xff1a;支持6.0、8.0 2.使用场景&#xff1a;用户登陆后日志记录、通知消息发送等主流程、次流程分离等场景 3.说明&#xff1a;网上很多帖子说的不明不白的&#xff0c;建议大家自己手动尝试总结一下 4.事件手动绑定的时候&#xff0c;一定要…...

【Nuxt】服务端渲染 SSR

SSR 概述 服务器端渲染全称是&#xff1a;Server Side Render&#xff0c;在服务器端渲染页面&#xff0c;并将渲染好HTML返回给浏览器呈现。 SSR应用的页面是在服务端渲染的&#xff0c;用户每请求一个SSR页面都会先在服务端进行渲染&#xff0c;然后将渲染好的页面&#xf…...

Spring Boot整合WebSocket

说明&#xff1a;本文介绍如何在Spirng Boot中整合WebSocket&#xff0c;WebSocket介绍&#xff0c;参考下面这篇文章&#xff1a; WebSocket 原始方式 原始方式&#xff0c;指的是使用Spring Boot自己整合的方式&#xff0c;导入的是下面这个依赖 <dependency><g…...

《LeetCode热题100》---<5.③普通数组篇五道>

本篇博客讲解LeetCode热题100道普通数组篇中的五道题 第五道&#xff1a;缺失的第一个正数&#xff08;困难&#xff09; 第五道&#xff1a;缺失的第一个正数&#xff08;困难&#xff09; 方法一&#xff1a;将数组视为哈希表 class Solution {public int firstMissingPosi…...

Cocos Creator文档学习记录

Cocos Creator文档学习记录 一、什么是Cocos Creator 官方文档链接&#xff1a;Hello World | Cocos Creator 百度百科&#xff1a;Cocos Creator_百度百科 Cocos Creator包括开发和调试、商业化 SDK 的集成、多平台发布、测试、上线这一整套工作流程&#xff0c;可多次的迭…...

插入数据优化 ---大批量数据插入建议使用load

一.insert优化 1.批量插入 2.手动提交事务 3.主键顺序插入 二.大批量插入数据 如果一次性需要插入大批量数据,使用insert语句插入性能较低,此时可以使用MySQL数据库提供的load指令进行插入。操作如下 1.客户端连接服务端时,加入参数 --local-infine mysql --local-infine…...

【Linux】一篇总结!什么是重定向?输出重定向的作用是什么?什么又是追加重定向?

欢迎来到 CILMY23 的博客 &#x1f3c6;本篇主题为&#xff1a;一篇总结&#xff01;什么是重定向&#xff1f;输出重定向的作用是什么&#xff1f;什么又是追加重定向&#xff1f; &#x1f3c6;个人主页&#xff1a;CILMY23-CSDN博客 &#x1f3c6;系列专栏&#xff1a;Py…...

svn软件总成全内容

SVN软件总成 概述&#xff1a;本文为经验型文档 目录 D:\安装包\svn软件总成 的目录D:\安装包\svn软件总成\svn-base添加 的目录D:\安装包\svn软件总成\tools 的目录D:\安装包\svn软件总成\tools\sqlite-tools-win32-x86-3360000 的目录D:\安装包\svn软件总成\安装包-----bt lo…...

[激光原理与应用-118]:电源系统的接地详解:小信号的噪声干扰优化,从良好外壳接地开始

目录 一、电路的基本原理&#xff1a;电流回路 1、电流回路的基本概念 2、电流回路的特性 3、电流回路的类型 4、电流回路的应用 五、电流回路的注意事项 二、交流设备的接地 1.1 概述 1、交流工作接地的定义 2、交流工作接地的作用 3、交流工作接地的规范要求 4、…...

回测本身就是一种过度拟合?

这也许是一个絮絮叨叨的专题&#xff0c;跟大伙儿唠一唠量化相关的小问题&#xff0c;有感而发写到哪算哪&#xff0c;这是第一期&#xff0c;先唠个10块钱的~ 前段时间在某乎上看到这样一个问题『您怎么理解回测本身就是一种过度拟合&#xff1f;』 个人看来&#xff0c;回测本…...

什么是Arduino?

Arduino是一款便捷灵活、方便上手的开源电子原型平台&#xff0c;由欧洲的一个开发团队于2005年冬季开发。以下是关于Arduino的详细介绍&#xff1a; 一、基本概述 定义&#xff1a;Arduino是一个基于开放源代码的软硬件平台&#xff0c;它让电子设计更加简单快捷。通过Arduin…...

【机器学习基础】Scikit-learn主要用法

【作者主页】Francek Chen 【专栏介绍】 ⌈ ⌈ ⌈Python机器学习 ⌋ ⌋ ⌋ 机器学习是一门人工智能的分支学科&#xff0c;通过算法和模型让计算机从数据中学习&#xff0c;进行模型训练和优化&#xff0c;做出预测、分类和决策支持。Python成为机器学习的首选语言&#xff0c;…...

python-素数回文数的个数(赛氪OJ)

[题目描述] 求 11 到 n 之间&#xff08;包括 n&#xff09;&#xff0c;既是素数又是回文数的整数有多少个。输入&#xff1a; 一个大于 11 小于 10000 的整数 n。输出&#xff1a; 11 到 n 之间的素数回文数个数。样例输入1 23 样例输出1 1 提示&#xff1a; 回文数指左右对…...

OCC 网格化(二)-网格划分算法

目录 一、概述 二、详解 1. 线性偏转 (Linear Deflection) 2. 角偏转 (Angular Deflection) 三、示例 3.1 示例1 3.2 示例2 一、概述 在 Open CASCADE Technology (OCC) 中默认的网格划分算法BRepMesh_IncrementalMesh有两个主要的选项来定义三角剖分—线性和角偏转。 …...

pyecharts模块

PyEcharts 一个基于ECharts库的Python封装库&#xff0c;它使得开发者可以方便地在Python环境中创建交互式的图表&#xff0c;包括折线图、柱状图、饼图、地图等多种可视化效果。 优点&#xff1a; 易用性&#xff1a;PyEcharts提供了简单易懂的API&#xff0c;通过链式调用…...

深⼊理解指针(3)

1. 字符指针变量 2. 数组指针变量 3. ⼆维数组传参的本质 4. 函数指针变量 5. 函数指针数组 6. 转移表 1. 字符指针变量 在指针的类型中我们知道有⼀种指针类型为字符指针 ⼀般使⽤: char* 这两种方式都是把字符串中的首字符的地址赋值给pc。 在这串代码中 str1内容的地…...

黑马头条vue2.0项目实战(四)——首页—文章列表

目录 1. 头部导航栏 1.1 页面布局 1.2 样式调整中遇到的问题 2. 频道列表 2.1 页面布局 2.2 样式调整 2.3 展示频道列表 3. 文章列表 3.1 思路分析 3.2 使用 List 列表组件 3.3 加载文章列表数据 3.4 下拉刷新 3.5 设置上下padding固定头部和频道列表 3.6 记住列…...

UE5.4内容示例(4)UI_UMG - 学习笔记

https://www.unrealengine.com/marketplace/zh-CN/product/content-examples 《内容示例》是学习UE5的基础示例&#xff0c;可以用此熟悉一遍UE5的功能 UI示例 UI_UMG &#xff1a;基本UMGUI_CommonUI &#xff1a;UMG多层应用UI_SlatePostBuffer UI &#xff1a;FX的示例&…...

C#实现数据采集系统-配置文件化

系统优化-配置 配置信息ip端口,还有点位信息,什么的都是直接在代码里直接写死,添加点位,修改配置,比较麻烦,每次修改都需要重新生成打包。 所以将这些配置都改成配置文件,这样只需要修改配置文件,程序无须修改,即可更新。 配置代码: 如果我们有100个采集,一个个去…...

生成xcframework

打包 XCFramework 的方法 XCFramework 是苹果推出的一种多平台二进制分发格式&#xff0c;可以包含多个架构和平台的代码。打包 XCFramework 通常用于分发库或框架。 使用 Xcode 命令行工具打包 通过 xcodebuild 命令可以打包 XCFramework。确保项目已经配置好需要支持的平台…...

Java 语言特性(面试系列1)

一、面向对象编程 1. 封装&#xff08;Encapsulation&#xff09; 定义&#xff1a;将数据&#xff08;属性&#xff09;和操作数据的方法绑定在一起&#xff0c;通过访问控制符&#xff08;private、protected、public&#xff09;隐藏内部实现细节。示例&#xff1a; public …...

Unity3D中Gfx.WaitForPresent优化方案

前言 在Unity中&#xff0c;Gfx.WaitForPresent占用CPU过高通常表示主线程在等待GPU完成渲染&#xff08;即CPU被阻塞&#xff09;&#xff0c;这表明存在GPU瓶颈或垂直同步/帧率设置问题。以下是系统的优化方案&#xff1a; 对惹&#xff0c;这里有一个游戏开发交流小组&…...

React第五十七节 Router中RouterProvider使用详解及注意事项

前言 在 React Router v6.4 中&#xff0c;RouterProvider 是一个核心组件&#xff0c;用于提供基于数据路由&#xff08;data routers&#xff09;的新型路由方案。 它替代了传统的 <BrowserRouter>&#xff0c;支持更强大的数据加载和操作功能&#xff08;如 loader 和…...

五年级数学知识边界总结思考-下册

目录 一、背景二、过程1.观察物体小学五年级下册“观察物体”知识点详解&#xff1a;由来、作用与意义**一、知识点核心内容****二、知识点的由来&#xff1a;从生活实践到数学抽象****三、知识的作用&#xff1a;解决实际问题的工具****四、学习的意义&#xff1a;培养核心素养…...

反射获取方法和属性

Java反射获取方法 在Java中&#xff0c;反射&#xff08;Reflection&#xff09;是一种强大的机制&#xff0c;允许程序在运行时访问和操作类的内部属性和方法。通过反射&#xff0c;可以动态地创建对象、调用方法、改变属性值&#xff0c;这在很多Java框架中如Spring和Hiberna…...

深入解析C++中的extern关键字:跨文件共享变量与函数的终极指南

&#x1f680; C extern 关键字深度解析&#xff1a;跨文件编程的终极指南 &#x1f4c5; 更新时间&#xff1a;2025年6月5日 &#x1f3f7;️ 标签&#xff1a;C | extern关键字 | 多文件编程 | 链接与声明 | 现代C 文章目录 前言&#x1f525;一、extern 是什么&#xff1f;&…...

微软PowerBI考试 PL300-在 Power BI 中清理、转换和加载数据

微软PowerBI考试 PL300-在 Power BI 中清理、转换和加载数据 Power Query 具有大量专门帮助您清理和准备数据以供分析的功能。 您将了解如何简化复杂模型、更改数据类型、重命名对象和透视数据。 您还将了解如何分析列&#xff0c;以便知晓哪些列包含有价值的数据&#xff0c;…...

华硕a豆14 Air香氛版,美学与科技的馨香融合

在快节奏的现代生活中&#xff0c;我们渴望一个能激发创想、愉悦感官的工作与生活伙伴&#xff0c;它不仅是冰冷的科技工具&#xff0c;更能触动我们内心深处的细腻情感。正是在这样的期许下&#xff0c;华硕a豆14 Air香氛版翩然而至&#xff0c;它以一种前所未有的方式&#x…...

在Ubuntu24上采用Wine打开SourceInsight

1. 安装wine sudo apt install wine 2. 安装32位库支持,SourceInsight是32位程序 sudo dpkg --add-architecture i386 sudo apt update sudo apt install wine32:i386 3. 验证安装 wine --version 4. 安装必要的字体和库(解决显示问题) sudo apt install fonts-wqy…...