TiCDC Canal-JSON 消息接收示例(Java 版)
1.引言
业务程序经常会通过各式各样的缓存来提升用户的访问速度。
由于存在缓存,在一些实时性要求较高的场景中,需要在数据变更的同时将数据缓存进行更新或删除。
如果数据本身由其他业务部门提供,就无法在写入的同时做缓存的一致性处理。
此时,可以通过其他业务部门暴露数据变更通知来感知到数据变化,从而保证数据的更新及时性。
TiCDC 是一款 TiDB 增量数据同步工具,通过拉取上游 TiKV 的数据变更日志,TiCDC 可以将数据解析为有序的行级变更数据输出到下游。
因此,可以通过 TiCDC 将数据变更通知暴露给业务程序,来让业务程序做及时的对应处理逻辑。
本文以一张用户表的数据变更为例,来展示 Java 服务端接收一条 TiCDC Canal-JSON 的消息变更,解析数据,并转发给对应的业务处理程序的流程。
之前写类似的程序时,网上搜索到的案例还是比较少的,本文仅抛砖引玉,欢迎各位大佬批评指正!
2. 代码思路
(1)通过 kafka 消息获取 CDC 消息
(2)解析 CDC 消息,判断其数据变更类型,执行对应的处理逻辑
3. 代码实现
3.1 代码结构
cdc-demo
└─ src└─ main└─ java└─ com.example.demo├─ constants│ └─ CdcConstants.java├─ dto│ ├─ CdcMessage.java│ └─ User.java├─ job│ ├─ CdcJob.java│ └─ UserCdcJob.java└─ service├─ impl│ └─ UserServiceImpl.java└─ CdcService.java
3.2 CDC 常量类
public class CdcConstants {
public enum MessageType {/*** 插入操作*/INSERT,
/*** 更新操作*/UPDATE,
/*** 删除操作*/DELETE;}
}
3.3 实体类
3.3.1 用户实体类
@Getter
@Setter
public class User {
/*** 用户id*/private Long id;
/*** 用户名*/private String name;
/*** 年龄*/private Integer age;
}
3.3.2 CDC 消息实体类
@Getter
@Setter
public class CdcMessage<T> {
/*** 数据集合*/private List<T> data;
/*** 数据库名称*/private String database;
/*** 是否为DDL语句isDdl*/private boolean isDdl;
/*** 表结构的类型字段(值为字段类型,如varchar)*/private T mysqlType;
/*** UPDATE类型下的旧数据(未变更字段无数据)*/private List<T> oldData;
/*** sql语句*/private String sql;
/*** 值为int类型*/private T sqlType;
/*** 数据表名*/private String table;
/*** 新增(INSERT)、更新(UPDATE)、删除(DELETE)、删除表(ERASE)等*/private String type;
}
3.4 任务类
3.4.1 CDC 任务基类
@Slf4j
public class CdcJob<T> {
protected CdcService<T> cdcService;
/*** 处理消息** @param record 消息记录* @param ack 消息处理标识*/public void handleMessage(ConsumerRecord<String, String> record, Acknowledgment ack) {String recordString = String.format("topic:%s,partition:%s,offset:%s,value:%s",record.topic(),record.partition(),record.offset(),record.value());log.info("数据更新开始处理,消息:" + recordString);try {boolean processResult = process(record);String processResultString = processResult ? "成功" : "无更新";log.info("数据更新处理结束,处理结果:" + processResultString);} catch (Exception e) {log.error("数据更新报错,", e);} finally {// 手动提交偏移量ack.acknowledge();}}
/*** 处理数据** @param record kafka消费记录* @return 处理结果*/public boolean process(ConsumerRecord<String, String> record) {String bizName = this.getClass().getSimpleName();// 服务为初始化报错if (null == cdcService) {throw new IllegalStateException("服务未初始化");}
// 解析消息CdcMessage<T> cdcMessage = JSON.parseObject(record.value(), new TypeReference<CdcMessage<T>>() {});
// 跳过DDLif (cdcMessage.isDdl()) {log.info(bizName, "DDL变更,无需处理");return false;}// 处理结果初始化boolean result = false;// 服务层处理数据List<T> dataList = cdcMessage.getData();if (CdcConstants.MessageType.INSERT.name().equals(cdcMessage.getType())) {result = cdcService.insert(dataList);} else if (CdcConstants.MessageType.UPDATE.name().equals(cdcMessage.getType())) {result = cdcService.update(cdcMessage.getOldData(), dataList);} else if (CdcConstants.MessageType.DELETE.name().equals(cdcMessage.getType())) {result = cdcService.delete(dataList);} else {log.warn(bizName, "不处理该消息,消息类型:" + cdcMessage.getType());}return result;}
}
3.4.2 用户表 CDC 消费任务类
@Component
public class UserCdcJob extends CdcJob<User> {
public UserCdcJob(UserServiceImpl userService) {this.cdcService = userService;}
/*** 消费CDC消息,并进行处理** @param record 消息记录* @param ack 消息处理标识*/@KafkaListener(id = "UserCdcJob", groupId = "${user-cdc.group}",topics = {"${user-cdc.topic}"}, containerFactory = "cdcKafkaListenerFactory")public void consumer(ConsumerRecord<String, String> record, Acknowledgment ack) {handleMessage(record, ack);}
}
3.5 服务类
3.5.1 CDC 消息处理服务接口
public interface CdcService<T> {
/*** 插入数据** @param data 数据* @return 插入结果*/boolean insert(List<T> data);
/*** 更新数据** @param oldData 更新前数据* @param newData 更新后数据* @return 更新结果*/boolean update(List<T> oldData, List<T> newData);
/*** 删除数据** @param data 数据* @return 删除数据*/boolean delete(List<T> data);
}
3.5.2 用户服务实现类
@Service
public class UserServiceImpl implements CdcService<User> {
@Overridepublic boolean insert(List<User> data) {// TODOreturn false;}
@Overridepublic boolean update(List<User> oldData, List<User> newData) {// TODOreturn false;}
@Overridepublic boolean delete(List<User> data) {// TODOreturn false;}
}
4.参考文档
TiCDC 简介:https://docs.pingcap.com/zh/tidb/stable/ticdc-overview
TiCDC Canal-JSON 协议:https://docs.pingcap.com/zh/tidb/stable/ticdc-canal-json
相关文章:
TiCDC Canal-JSON 消息接收示例(Java 版)
1.引言 业务程序经常会通过各式各样的缓存来提升用户的访问速度。 由于存在缓存,在一些实时性要求较高的场景中,需要在数据变更的同时将数据缓存进行更新或删除。 如果数据本身由其他业务部门提供,就无法在写入的同时做缓存的一致性处理。…...

SQLite、MySQL、PostgreSQL3个关系数据库之间的对比
引言 关系数据模型以行和列的表格形式组织数据,在数据库管理工具中占主导地位。今天还有其他数据模型,包括NoSQL和NewSQL,但是关系数据库管理系统(RDBMS)仍然占主导地位用于存储和管理全球数据。 本文比较了三种实现最…...

开源容灾备份软件,开源cdp备份软件
数据的安全性和完整性面临着硬件问题、黑客攻击、人为错误等各种威胁。在这种环境下,开源容灾备份软件应运而生,通过提供自动数据备份和恢复,有效地保证了公司的数据安全。 一、开源容灾备份软件的定义和作用 开源容灾备份软件是一种基于开源…...
Java合并区间
问题: 以数组 intervals 表示若干个区间的集合,其中单个区间为 intervals[i] [starti, endi] 。请你合并所有重叠的区间,并返回 一个不重叠的区间数组,该数组需恰好覆盖输入中的所有区间 。 示例: 示例 1ÿ…...
前端面试:【代码质量与工程实践】单元测试、集成测试和持续集成
在现代软件开发中,确保代码质量是至关重要的。单元测试、集成测试和持续集成是关键的工程实践,用于提高代码的可靠性和可维护性。本文将深入探讨这些概念,以及它们如何在软件开发中发挥作用。 1. 单元测试(Unit Testing࿰…...

2023/8/17总结
项目完善: 算法推荐 item-CF 算法推荐我主要写的是协同过滤算法,然后协同过滤算法分成俩种—— 基于用户的 user-CF 基于物品的 item-CF 因为害怕用户冷启动,和数据量的原因 我选择了 item-CF 主要思路是——根据用户的点赞列表&…...

REDIS 7 教程 数据类型-进阶篇
⑥ *位图 bitmap 1. 理论 由0和1 状态表现的二进制位的bit 数组。 说明:用String 类型作为底层数据结构实现的一种统计二值状态的数据类型 位图本质是数组,它是基于String 数据类型的按位操作。该数组由多个二进制位组成,每个二进制位都对应一个偏…...

图文并茂:Python Tkinter从入门到高级实战全解析
目录 介绍什么是Tkinter?准备工作第一个Tkinter程序界面布局事件处理补充知识点 文本输入框复选框和单选框列表框弹出对话框 综合案例:待办事项列表总结 介绍 欢迎来到本篇文章,我们将带您深入了解如何在Python中使用Tkinter库来创建图形用…...

npm和yarn的区别?
文章目录 前言npm和yarn的作用和特点npm和yarn的安装的机制npm安装机制yarn安装机制检测包解析包获取包链接包构建包 总结后言 前言 这一期给大家讲解npm和yarn的一些区别 npm和yarn的作用和特点 包管理:npm 和 yarn 可以用于安装、更新和删除 JavaScript 包。它们提…...
微服务项目容器编排docker-compose.yml、Dockerfile文件模板、相关配置文件、shell脚本
nacos Dockerfile(不需要特殊处理,使用docker conpose可以不写) # 基础镜像 FROM nacos/nacos-server # author MAINTAINER jianglifeng<jlifengfoxmail.com> RUN ln -sf /usr/share/zoneinfo/Asia/Shanghai /etc/localtime \ &&a…...

算法通过村第三关-数组黄金笔记|数组难解
文章目录 前言数组中出现超过一半的数字数组中只出现一次的数字颜色的分类问题(荷兰国旗问题)基于冒泡排序的双指针(快慢指针)基于快排的双指针(对撞指针) 总结 前言 提示:苦不来自外在环境中的人、事、物,…...
【2023】LeetCode HOT 100——矩阵
目录 1. 矩阵置零1.1 C++实现1.2 Python实现1.3 时空分析2. 螺旋矩阵2.1 C++实现2.2 Python实现2.3 时空分析3. 旋转图像3.1 C++实现3.2 Python实现3.3 时空分析4. 搜索二维矩阵 II4.1 C++实现4.2 Python实现4.3 时空分析1. 矩阵置零 🔗 原题链接:...

springboot源码方法
利用LinkedHashSet移除List重复的数据protected final <T> List<T> removeDuplicates(List<T> list) {return new ArrayList<>(new LinkedHashSet<>(list));} SpringFactoriesLoader#loadFactoryNames 加载配置文件...
基于java街球社区网站设计与实现
摘 要 本文主要讲述了基于SpringBootVue模式的街球社区网站的设计与实现。这里所谓的街球社区网站是通过类似于百度贴吧之类的网上论坛使得所有的街球爱好者有一个可以互相交流的平台,并使所有用户可以在社区进行教学视频的观看以及相关体育运动产品的选购,平台的盈利主要靠…...
定时产生不同频率方波
/*----------------------------------------------- 内容:通过定时产生不同频率方波 ------------------------------------------------*/ #include<reg52.h> //包含头文件,一般情况不需要改动,头文件包含特殊功能寄存器的定义 /*-…...

Java“牵手”天猫商品sku信息API接口数据,天猫API接口申请指南
天猫平台商品sku属性信息接口是开放平台提供的一种API接口,通过调用API接口,开发者可以获取天猫商品的标题、价格、库存、月销量、总销量、库存、详情描述、图片等详细信息 。 获取商品销量接口API是一种用于获取电商平台上商品sku属性数据的接口&#…...

【⑮MySQL | 视图】概述 | 创建 | 查看 | 更新 | 修改 | 删除
前言 ✨欢迎来到小K的MySQL专栏,本节将为大家带来MySQL视图概述 | 创建 | 查看 | 更新 | 修改 | 删除的分享✨ 目录 前言1.视图概述2.创建视图3.查看视图4.更新视图数据5.修改视图6.删除视图总结 1.视图概述 1.1 为什么使用视图? 视图一方面可以帮我们使…...

Linux驱动开发一、RK3568把hello编译到Linux内核中运行。‘rk_vendor_read’未定义的引用
1、在字符设备目录下建立hello目录 ~/Linux/rk356x_linux/kernel/drivers/char/hello 2、进入hello目录,新建hello.c、Makefile、Kconfig三个文件 3、Kconfig是打开make menuconfig配置界面是后的选项,这Kconfig是在字符设备下的。 config HELLOtrist…...
enable_shared_from_this
用途: enable_shared_from_this 是一个基类模板,用于解决在类成员函数中获取类对象的 shared_ptr 的需求。它提供了一种机制,使类能够安全地从成员函数内部获得指向自身的 shared_ptr。 解决对象生命周期管理问题:在某些情况下&…...

weak_ptr是怎么探知对象生死的
weak_ptr是C智能指针中的一种。它用于解决共享所有权的问题,并且可以避免因循环引用而导致的内存泄漏。 weak_ptr本身并不承担对象的所有权,它指向由shared_ptr管理的对象。与shared_ptr不同,weak_ptr并不会增加计数器来计算对象的引用次数。…...

C++初阶-list的底层
目录 1.std::list实现的所有代码 2.list的简单介绍 2.1实现list的类 2.2_list_iterator的实现 2.2.1_list_iterator实现的原因和好处 2.2.2_list_iterator实现 2.3_list_node的实现 2.3.1. 避免递归的模板依赖 2.3.2. 内存布局一致性 2.3.3. 类型安全的替代方案 2.3.…...
rknn优化教程(二)
文章目录 1. 前述2. 三方库的封装2.1 xrepo中的库2.2 xrepo之外的库2.2.1 opencv2.2.2 rknnrt2.2.3 spdlog 3. rknn_engine库 1. 前述 OK,开始写第二篇的内容了。这篇博客主要能写一下: 如何给一些三方库按照xmake方式进行封装,供调用如何按…...

.Net框架,除了EF还有很多很多......
文章目录 1. 引言2. Dapper2.1 概述与设计原理2.2 核心功能与代码示例基本查询多映射查询存储过程调用 2.3 性能优化原理2.4 适用场景 3. NHibernate3.1 概述与架构设计3.2 映射配置示例Fluent映射XML映射 3.3 查询示例HQL查询Criteria APILINQ提供程序 3.4 高级特性3.5 适用场…...
java调用dll出现unsatisfiedLinkError以及JNA和JNI的区别
UnsatisfiedLinkError 在对接硬件设备中,我们会遇到使用 java 调用 dll文件 的情况,此时大概率出现UnsatisfiedLinkError链接错误,原因可能有如下几种 类名错误包名错误方法名参数错误使用 JNI 协议调用,结果 dll 未实现 JNI 协…...
ffmpeg(四):滤镜命令
FFmpeg 的滤镜命令是用于音视频处理中的强大工具,可以完成剪裁、缩放、加水印、调色、合成、旋转、模糊、叠加字幕等复杂的操作。其核心语法格式一般如下: ffmpeg -i input.mp4 -vf "滤镜参数" output.mp4或者带音频滤镜: ffmpeg…...

从零开始打造 OpenSTLinux 6.6 Yocto 系统(基于STM32CubeMX)(九)
设备树移植 和uboot设备树修改的内容同步到kernel将设备树stm32mp157d-stm32mp157daa1-mx.dts复制到内核源码目录下 源码修改及编译 修改arch/arm/boot/dts/st/Makefile,新增设备树编译 stm32mp157f-ev1-m4-examples.dtb \stm32mp157d-stm32mp157daa1-mx.dtb修改…...

图表类系列各种样式PPT模版分享
图标图表系列PPT模版,柱状图PPT模版,线状图PPT模版,折线图PPT模版,饼状图PPT模版,雷达图PPT模版,树状图PPT模版 图表类系列各种样式PPT模版分享:图表系列PPT模板https://pan.quark.cn/s/20d40aa…...
Typeerror: cannot read properties of undefined (reading ‘XXX‘)
最近需要在离线机器上运行软件,所以得把软件用docker打包起来,大部分功能都没问题,出了一个奇怪的事情。同样的代码,在本机上用vscode可以运行起来,但是打包之后在docker里出现了问题。使用的是dialog组件,…...

USB Over IP专用硬件的5个特点
USB over IP技术通过将USB协议数据封装在标准TCP/IP网络数据包中,从根本上改变了USB连接。这允许客户端通过局域网或广域网远程访问和控制物理连接到服务器的USB设备(如专用硬件设备),从而消除了直接物理连接的需要。USB over IP的…...
Spring Boot + MyBatis 集成支付宝支付流程
Spring Boot MyBatis 集成支付宝支付流程 核心流程 商户系统生成订单调用支付宝创建预支付订单用户跳转支付宝完成支付支付宝异步通知支付结果商户处理支付结果更新订单状态支付宝同步跳转回商户页面 代码实现示例(电脑网站支付) 1. 添加依赖 <!…...