当前位置: 首页 > news >正文

SpringBoot整合Flink CDC实时同步postgresql变更数据,基于WAL日志

SpringBoot整合Flink CDC实时同步postgresql变更数据,基于WAL日志

    • 一、前言
    • 二、技术介绍(Flink CDC)
      • 1、Flink CDC
      • 2、Postgres CDC
    • 三、准备工作
    • 四、代码示例
    • 五、总结

一、前言

在工作中经常会遇到要实时获取数据库(postgresql、mysql等)的变更数据,主要体现数据的实时性;mysql数据库有canal工具实现很简单,但是基于postgresql数据库获取实时数据就比较复杂,之前已经写过一篇获取postgresql数据库实时数据的文章,如下:

【技术实现】java实时同步postgresql变更数据,基于WAL日志

但是,之前的实现方式比较繁琐,不利于维护,所有本文整合Flink CDC通过一个比较简单的方式实现;

二、技术介绍(Flink CDC)

1、Flink CDC

Flink CDC(Change Data Capture)是一个基于Apache Flink构建的开源数据变更捕获(CDC)框架。其核心功能是从各种关系型数据库(如MySQL、PostgreSQL、Oracle等)中捕获数据变更(如增删改操作),并将这些变更以流的形式提供给Flink等流处理引擎进行处理;
1)CDC(Change Data Capture):数据变更捕获的简称,用于监测并捕获数据库的变动,然后将这些变更按照发生顺序捕获,并写入到目标存储系统(如数据仓库、数据湖、消息队列等)。
2)Flink CDC:基于Flink的CDC实现,将CDC技术与Flink流处理引擎相结合,实现数据的实时捕获、处理和传输。

2、Postgres CDC

1)Postgres CDC(Change Data Capture)连接器是用于从PostgreSQL数据库捕获数据变更(如增删改操作)并将其以流的形式提供给数据处理引擎(如Flink)的组件;
2)PostgreSQL版本:Postgres CDC连接器通常支持PostgreSQL的多个版本,具体版本可能因连接器版本不同而有所差异。常见的支持版本包括9.6、10、11、12、13、14等;

三、准备工作

1、安装postgresql数据库,并创建库和测试使用的表,这里不再列举详细步骤;
在这里插入图片描述
2、修改postgresql数据库配置,通过wal日志监听变更数据

修改postgresql.conf文件,重启服务
wal_level=logical

3、springboot关键maven依赖

<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-base</artifactId><version>1.19.0</version>
</dependency>
<dependency><groupId>com.ververica</groupId><artifactId>flink-connector-postgres-cdc</artifactId><version>3.0.1</version>
</dependency>

注:其它依赖不在列举,可以通过获取源码查看

四、代码示例

InitAction02.java

package com.sk.proxytest.init;import com.ververica.cdc.connectors.base.source.jdbc.JdbcIncrementalSource;
import com.ververica.cdc.connectors.postgres.source.PostgresSourceBuilder;
import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.springframework.context.annotation.Configuration;
import javax.annotation.PostConstruct;@Configuration
public class InitAction02 {@PostConstructpublic void run() throws Exception {DebeziumDeserializationSchema<String> deserializer =new JsonDebeziumDeserializationSchema();JdbcIncrementalSource<String> postgresIncrementalSource =PostgresSourceBuilder.PostgresIncrementalSource.<String>builder().hostname("127.0.0.1").port(5432).database("postgres").schemaList("public").tableList("public.student").username("postgres").password("password").slotName("flink").decodingPluginName("pgoutput") // use pgoutput for PostgreSQL 10+.deserializer(deserializer).includeSchemaChanges(true) // output the schema changes as well.splitSize(2) // the split size of each snapshot split.build();StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.enableCheckpointing(3000);env.fromSource(postgresIncrementalSource,WatermarkStrategy.noWatermarks(),"PostgresParallelSource").setParallelism(2).addSink(new CustomSink());//.print();env.execute("Output Postgres Snapshot");}}

CustomSink.java

package com.sk.proxytest.init;import lombok.extern.log4j.Log4j2;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;@Log4j2
public class CustomSink extends RichSinkFunction<String> {@Overridepublic void invoke(String value, Context context) throws Exception {log.info("============数据发生变化:{}", value);}
}

执行结果:

1)新增数据
在这里插入图片描述

2)变更数据输出

2024-07-31T00:00:15,761 INFO  [debezium-reader-0] io.debezium.util.Threads$3: Creating thread debezium-postgresconnector-postgres_cdc_source-keep-alive
2024-07-31T00:00:15,761 INFO  [debezium-reader-0] io.debezium.connector.postgresql.PostgresStreamingChangeEventSource: Processing messages
2024-07-31T00:00:15,762 INFO  [debezium-reader-0] io.debezium.connector.postgresql.connection.WalPositionLocator: Message with LSN 'LSN{0/3588018}' arrived, switching off the filtering
2024-07-31T00:00:16,678 INFO  [Sink: Unnamed (1/4)#0] com.sk.proxytest.init.CustomSink: ============数据发生变化:{"before":null,"after":{"id":8,"name":"8","age":8,"remark":"8"},"source":{"version":"1.9.7.Final","connector":"postgresql","name":"postgres_cdc_source","ts_ms":1722355215252,"snapshot":"false","db":"postgres","sequence":"[null,\"56131608\"]","schema":"public","table":"student","txId":932,"lsn":56131608,"xmin":null},"op":"c","ts_ms":1722355216336,"transaction":null}

五、总结

Postgres CDC 连接器是一个 Flink Source 连接器,它将首先读取数据库快照,然后继续读取二进制日志,即使发生故障,也会进行一次处理;

Postgres CDC 连接器

👇🏻 👇🏻 👇🏻注:文章源代码关注下面公众号获取👇🏻 👇🏻 👇🏻

相关文章:

SpringBoot整合Flink CDC实时同步postgresql变更数据,基于WAL日志

SpringBoot整合Flink CDC实时同步postgresql变更数据&#xff0c;基于WAL日志 一、前言二、技术介绍&#xff08;Flink CDC&#xff09;1、Flink CDC2、Postgres CDC 三、准备工作四、代码示例五、总结 一、前言 在工作中经常会遇到要实时获取数据库&#xff08;postgresql、m…...

ThinkPHP事件的使用

技术说明 1.ThinkPHP版本&#xff1a;支持6.0、8.0 2.使用场景&#xff1a;用户登陆后日志记录、通知消息发送等主流程、次流程分离等场景 3.说明&#xff1a;网上很多帖子说的不明不白的&#xff0c;建议大家自己手动尝试总结一下 4.事件手动绑定的时候&#xff0c;一定要…...

【Nuxt】服务端渲染 SSR

SSR 概述 服务器端渲染全称是&#xff1a;Server Side Render&#xff0c;在服务器端渲染页面&#xff0c;并将渲染好HTML返回给浏览器呈现。 SSR应用的页面是在服务端渲染的&#xff0c;用户每请求一个SSR页面都会先在服务端进行渲染&#xff0c;然后将渲染好的页面&#xf…...

Spring Boot整合WebSocket

说明&#xff1a;本文介绍如何在Spirng Boot中整合WebSocket&#xff0c;WebSocket介绍&#xff0c;参考下面这篇文章&#xff1a; WebSocket 原始方式 原始方式&#xff0c;指的是使用Spring Boot自己整合的方式&#xff0c;导入的是下面这个依赖 <dependency><g…...

《LeetCode热题100》---<5.③普通数组篇五道>

本篇博客讲解LeetCode热题100道普通数组篇中的五道题 第五道&#xff1a;缺失的第一个正数&#xff08;困难&#xff09; 第五道&#xff1a;缺失的第一个正数&#xff08;困难&#xff09; 方法一&#xff1a;将数组视为哈希表 class Solution {public int firstMissingPosi…...

Cocos Creator文档学习记录

Cocos Creator文档学习记录 一、什么是Cocos Creator 官方文档链接&#xff1a;Hello World | Cocos Creator 百度百科&#xff1a;Cocos Creator_百度百科 Cocos Creator包括开发和调试、商业化 SDK 的集成、多平台发布、测试、上线这一整套工作流程&#xff0c;可多次的迭…...

插入数据优化 ---大批量数据插入建议使用load

一.insert优化 1.批量插入 2.手动提交事务 3.主键顺序插入 二.大批量插入数据 如果一次性需要插入大批量数据,使用insert语句插入性能较低,此时可以使用MySQL数据库提供的load指令进行插入。操作如下 1.客户端连接服务端时,加入参数 --local-infine mysql --local-infine…...

【Linux】一篇总结!什么是重定向?输出重定向的作用是什么?什么又是追加重定向?

欢迎来到 CILMY23 的博客 &#x1f3c6;本篇主题为&#xff1a;一篇总结&#xff01;什么是重定向&#xff1f;输出重定向的作用是什么&#xff1f;什么又是追加重定向&#xff1f; &#x1f3c6;个人主页&#xff1a;CILMY23-CSDN博客 &#x1f3c6;系列专栏&#xff1a;Py…...

svn软件总成全内容

SVN软件总成 概述&#xff1a;本文为经验型文档 目录 D:\安装包\svn软件总成 的目录D:\安装包\svn软件总成\svn-base添加 的目录D:\安装包\svn软件总成\tools 的目录D:\安装包\svn软件总成\tools\sqlite-tools-win32-x86-3360000 的目录D:\安装包\svn软件总成\安装包-----bt lo…...

[激光原理与应用-118]:电源系统的接地详解:小信号的噪声干扰优化,从良好外壳接地开始

目录 一、电路的基本原理&#xff1a;电流回路 1、电流回路的基本概念 2、电流回路的特性 3、电流回路的类型 4、电流回路的应用 五、电流回路的注意事项 二、交流设备的接地 1.1 概述 1、交流工作接地的定义 2、交流工作接地的作用 3、交流工作接地的规范要求 4、…...

回测本身就是一种过度拟合?

这也许是一个絮絮叨叨的专题&#xff0c;跟大伙儿唠一唠量化相关的小问题&#xff0c;有感而发写到哪算哪&#xff0c;这是第一期&#xff0c;先唠个10块钱的~ 前段时间在某乎上看到这样一个问题『您怎么理解回测本身就是一种过度拟合&#xff1f;』 个人看来&#xff0c;回测本…...

什么是Arduino?

Arduino是一款便捷灵活、方便上手的开源电子原型平台&#xff0c;由欧洲的一个开发团队于2005年冬季开发。以下是关于Arduino的详细介绍&#xff1a; 一、基本概述 定义&#xff1a;Arduino是一个基于开放源代码的软硬件平台&#xff0c;它让电子设计更加简单快捷。通过Arduin…...

【机器学习基础】Scikit-learn主要用法

【作者主页】Francek Chen 【专栏介绍】 ⌈ ⌈ ⌈Python机器学习 ⌋ ⌋ ⌋ 机器学习是一门人工智能的分支学科&#xff0c;通过算法和模型让计算机从数据中学习&#xff0c;进行模型训练和优化&#xff0c;做出预测、分类和决策支持。Python成为机器学习的首选语言&#xff0c;…...

python-素数回文数的个数(赛氪OJ)

[题目描述] 求 11 到 n 之间&#xff08;包括 n&#xff09;&#xff0c;既是素数又是回文数的整数有多少个。输入&#xff1a; 一个大于 11 小于 10000 的整数 n。输出&#xff1a; 11 到 n 之间的素数回文数个数。样例输入1 23 样例输出1 1 提示&#xff1a; 回文数指左右对…...

OCC 网格化(二)-网格划分算法

目录 一、概述 二、详解 1. 线性偏转 (Linear Deflection) 2. 角偏转 (Angular Deflection) 三、示例 3.1 示例1 3.2 示例2 一、概述 在 Open CASCADE Technology (OCC) 中默认的网格划分算法BRepMesh_IncrementalMesh有两个主要的选项来定义三角剖分—线性和角偏转。 …...

pyecharts模块

PyEcharts 一个基于ECharts库的Python封装库&#xff0c;它使得开发者可以方便地在Python环境中创建交互式的图表&#xff0c;包括折线图、柱状图、饼图、地图等多种可视化效果。 优点&#xff1a; 易用性&#xff1a;PyEcharts提供了简单易懂的API&#xff0c;通过链式调用…...

深⼊理解指针(3)

1. 字符指针变量 2. 数组指针变量 3. ⼆维数组传参的本质 4. 函数指针变量 5. 函数指针数组 6. 转移表 1. 字符指针变量 在指针的类型中我们知道有⼀种指针类型为字符指针 ⼀般使⽤: char* 这两种方式都是把字符串中的首字符的地址赋值给pc。 在这串代码中 str1内容的地…...

黑马头条vue2.0项目实战(四)——首页—文章列表

目录 1. 头部导航栏 1.1 页面布局 1.2 样式调整中遇到的问题 2. 频道列表 2.1 页面布局 2.2 样式调整 2.3 展示频道列表 3. 文章列表 3.1 思路分析 3.2 使用 List 列表组件 3.3 加载文章列表数据 3.4 下拉刷新 3.5 设置上下padding固定头部和频道列表 3.6 记住列…...

UE5.4内容示例(4)UI_UMG - 学习笔记

https://www.unrealengine.com/marketplace/zh-CN/product/content-examples 《内容示例》是学习UE5的基础示例&#xff0c;可以用此熟悉一遍UE5的功能 UI示例 UI_UMG &#xff1a;基本UMGUI_CommonUI &#xff1a;UMG多层应用UI_SlatePostBuffer UI &#xff1a;FX的示例&…...

C#实现数据采集系统-配置文件化

系统优化-配置 配置信息ip端口,还有点位信息,什么的都是直接在代码里直接写死,添加点位,修改配置,比较麻烦,每次修改都需要重新生成打包。 所以将这些配置都改成配置文件,这样只需要修改配置文件,程序无须修改,即可更新。 配置代码: 如果我们有100个采集,一个个去…...

Vue记事本应用实现教程

文章目录 1. 项目介绍2. 开发环境准备3. 设计应用界面4. 创建Vue实例和数据模型5. 实现记事本功能5.1 添加新记事项5.2 删除记事项5.3 清空所有记事 6. 添加样式7. 功能扩展&#xff1a;显示创建时间8. 功能扩展&#xff1a;记事项搜索9. 完整代码10. Vue知识点解析10.1 数据绑…...

ES6从入门到精通:前言

ES6简介 ES6&#xff08;ECMAScript 2015&#xff09;是JavaScript语言的重大更新&#xff0c;引入了许多新特性&#xff0c;包括语法糖、新数据类型、模块化支持等&#xff0c;显著提升了开发效率和代码可维护性。 核心知识点概览 变量声明 let 和 const 取代 var&#xf…...

React hook之useRef

React useRef 详解 useRef 是 React 提供的一个 Hook&#xff0c;用于在函数组件中创建可变的引用对象。它在 React 开发中有多种重要用途&#xff0c;下面我将全面详细地介绍它的特性和用法。 基本概念 1. 创建 ref const refContainer useRef(initialValue);initialValu…...

Psychopy音频的使用

Psychopy音频的使用 本文主要解决以下问题&#xff1a; 指定音频引擎与设备&#xff1b;播放音频文件 本文所使用的环境&#xff1a; Python3.10 numpy2.2.6 psychopy2025.1.1 psychtoolbox3.0.19.14 一、音频配置 Psychopy文档链接为Sound - for audio playback — Psy…...

WordPress插件:AI多语言写作与智能配图、免费AI模型、SEO文章生成

厌倦手动写WordPress文章&#xff1f;AI自动生成&#xff0c;效率提升10倍&#xff01; 支持多语言、自动配图、定时发布&#xff0c;让内容创作更轻松&#xff01; AI内容生成 → 不想每天写文章&#xff1f;AI一键生成高质量内容&#xff01;多语言支持 → 跨境电商必备&am…...

【HTTP三个基础问题】

面试官您好&#xff01;HTTP是超文本传输协议&#xff0c;是互联网上客户端和服务器之间传输超文本数据&#xff08;比如文字、图片、音频、视频等&#xff09;的核心协议&#xff0c;当前互联网应用最广泛的版本是HTTP1.1&#xff0c;它基于经典的C/S模型&#xff0c;也就是客…...

【论文阅读28】-CNN-BiLSTM-Attention-(2024)

本文把滑坡位移序列拆开、筛优质因子&#xff0c;再用 CNN-BiLSTM-Attention 来动态预测每个子序列&#xff0c;最后重构出总位移&#xff0c;预测效果超越传统模型。 文章目录 1 引言2 方法2.1 位移时间序列加性模型2.2 变分模态分解 (VMD) 具体步骤2.3.1 样本熵&#xff08;S…...

OpenLayers 分屏对比(地图联动)

注&#xff1a;当前使用的是 ol 5.3.0 版本&#xff0c;天地图使用的key请到天地图官网申请&#xff0c;并替换为自己的key 地图分屏对比在WebGIS开发中是很常见的功能&#xff0c;和卷帘图层不一样的是&#xff0c;分屏对比是在各个地图中添加相同或者不同的图层进行对比查看。…...

【Java学习笔记】BigInteger 和 BigDecimal 类

BigInteger 和 BigDecimal 类 二者共有的常见方法 方法功能add加subtract减multiply乘divide除 注意点&#xff1a;传参类型必须是类对象 一、BigInteger 1. 作用&#xff1a;适合保存比较大的整型数 2. 使用说明 创建BigInteger对象 传入字符串 3. 代码示例 import j…...

LangChain知识库管理后端接口:数据库操作详解—— 构建本地知识库系统的基础《二》

这段 Python 代码是一个完整的 知识库数据库操作模块&#xff0c;用于对本地知识库系统中的知识库进行增删改查&#xff08;CRUD&#xff09;操作。它基于 SQLAlchemy ORM 框架 和一个自定义的装饰器 with_session 实现数据库会话管理。 &#x1f4d8; 一、整体功能概述 该模块…...