TiCDC Canal-JSON 消息接收示例(Java 版)
1.引言
业务程序经常会通过各式各样的缓存来提升用户的访问速度。
由于存在缓存,在一些实时性要求较高的场景中,需要在数据变更的同时将数据缓存进行更新或删除。
如果数据本身由其他业务部门提供,就无法在写入的同时做缓存的一致性处理。
此时,可以通过其他业务部门暴露数据变更通知来感知到数据变化,从而保证数据的更新及时性。
TiCDC 是一款 TiDB 增量数据同步工具,通过拉取上游 TiKV 的数据变更日志,TiCDC 可以将数据解析为有序的行级变更数据输出到下游。
因此,可以通过 TiCDC 将数据变更通知暴露给业务程序,来让业务程序做及时的对应处理逻辑。
本文以一张用户表的数据变更为例,来展示 Java 服务端接收一条 TiCDC Canal-JSON 的消息变更,解析数据,并转发给对应的业务处理程序的流程。
之前写类似的程序时,网上搜索到的案例还是比较少的,本文仅抛砖引玉,欢迎各位大佬批评指正!
2. 代码思路
(1)通过 kafka 消息获取 CDC 消息
(2)解析 CDC 消息,判断其数据变更类型,执行对应的处理逻辑
3. 代码实现
3.1 代码结构
cdc-demo
└─ src└─ main└─ java└─ com.example.demo├─ constants│ └─ CdcConstants.java├─ dto│ ├─ CdcMessage.java│ └─ User.java├─ job│ ├─ CdcJob.java│ └─ UserCdcJob.java└─ service├─ impl│ └─ UserServiceImpl.java└─ CdcService.java
3.2 CDC 常量类
public class CdcConstants {
public enum MessageType {/*** 插入操作*/INSERT,
/*** 更新操作*/UPDATE,
/*** 删除操作*/DELETE;}
}
3.3 实体类
3.3.1 用户实体类
@Getter
@Setter
public class User {
/*** 用户id*/private Long id;
/*** 用户名*/private String name;
/*** 年龄*/private Integer age;
}
3.3.2 CDC 消息实体类
@Getter
@Setter
public class CdcMessage<T> {
/*** 数据集合*/private List<T> data;
/*** 数据库名称*/private String database;
/*** 是否为DDL语句isDdl*/private boolean isDdl;
/*** 表结构的类型字段(值为字段类型,如varchar)*/private T mysqlType;
/*** UPDATE类型下的旧数据(未变更字段无数据)*/private List<T> oldData;
/*** sql语句*/private String sql;
/*** 值为int类型*/private T sqlType;
/*** 数据表名*/private String table;
/*** 新增(INSERT)、更新(UPDATE)、删除(DELETE)、删除表(ERASE)等*/private String type;
}
3.4 任务类
3.4.1 CDC 任务基类
@Slf4j
public class CdcJob<T> {
protected CdcService<T> cdcService;
/*** 处理消息** @param record 消息记录* @param ack 消息处理标识*/public void handleMessage(ConsumerRecord<String, String> record, Acknowledgment ack) {String recordString = String.format("topic:%s,partition:%s,offset:%s,value:%s",record.topic(),record.partition(),record.offset(),record.value());log.info("数据更新开始处理,消息:" + recordString);try {boolean processResult = process(record);String processResultString = processResult ? "成功" : "无更新";log.info("数据更新处理结束,处理结果:" + processResultString);} catch (Exception e) {log.error("数据更新报错,", e);} finally {// 手动提交偏移量ack.acknowledge();}}
/*** 处理数据** @param record kafka消费记录* @return 处理结果*/public boolean process(ConsumerRecord<String, String> record) {String bizName = this.getClass().getSimpleName();// 服务为初始化报错if (null == cdcService) {throw new IllegalStateException("服务未初始化");}
// 解析消息CdcMessage<T> cdcMessage = JSON.parseObject(record.value(), new TypeReference<CdcMessage<T>>() {});
// 跳过DDLif (cdcMessage.isDdl()) {log.info(bizName, "DDL变更,无需处理");return false;}// 处理结果初始化boolean result = false;// 服务层处理数据List<T> dataList = cdcMessage.getData();if (CdcConstants.MessageType.INSERT.name().equals(cdcMessage.getType())) {result = cdcService.insert(dataList);} else if (CdcConstants.MessageType.UPDATE.name().equals(cdcMessage.getType())) {result = cdcService.update(cdcMessage.getOldData(), dataList);} else if (CdcConstants.MessageType.DELETE.name().equals(cdcMessage.getType())) {result = cdcService.delete(dataList);} else {log.warn(bizName, "不处理该消息,消息类型:" + cdcMessage.getType());}return result;}
}
3.4.2 用户表 CDC 消费任务类
@Component
public class UserCdcJob extends CdcJob<User> {
public UserCdcJob(UserServiceImpl userService) {this.cdcService = userService;}
/*** 消费CDC消息,并进行处理** @param record 消息记录* @param ack 消息处理标识*/@KafkaListener(id = "UserCdcJob", groupId = "${user-cdc.group}",topics = {"${user-cdc.topic}"}, containerFactory = "cdcKafkaListenerFactory")public void consumer(ConsumerRecord<String, String> record, Acknowledgment ack) {handleMessage(record, ack);}
}
3.5 服务类
3.5.1 CDC 消息处理服务接口
public interface CdcService<T> {
/*** 插入数据** @param data 数据* @return 插入结果*/boolean insert(List<T> data);
/*** 更新数据** @param oldData 更新前数据* @param newData 更新后数据* @return 更新结果*/boolean update(List<T> oldData, List<T> newData);
/*** 删除数据** @param data 数据* @return 删除数据*/boolean delete(List<T> data);
}
3.5.2 用户服务实现类
@Service
public class UserServiceImpl implements CdcService<User> {
@Overridepublic boolean insert(List<User> data) {// TODOreturn false;}
@Overridepublic boolean update(List<User> oldData, List<User> newData) {// TODOreturn false;}
@Overridepublic boolean delete(List<User> data) {// TODOreturn false;}
}
4.参考文档
TiCDC 简介:https://docs.pingcap.com/zh/tidb/stable/ticdc-overview
TiCDC Canal-JSON 协议:https://docs.pingcap.com/zh/tidb/stable/ticdc-canal-json
相关文章:
TiCDC Canal-JSON 消息接收示例(Java 版)
1.引言 业务程序经常会通过各式各样的缓存来提升用户的访问速度。 由于存在缓存,在一些实时性要求较高的场景中,需要在数据变更的同时将数据缓存进行更新或删除。 如果数据本身由其他业务部门提供,就无法在写入的同时做缓存的一致性处理。…...
SQLite、MySQL、PostgreSQL3个关系数据库之间的对比
引言 关系数据模型以行和列的表格形式组织数据,在数据库管理工具中占主导地位。今天还有其他数据模型,包括NoSQL和NewSQL,但是关系数据库管理系统(RDBMS)仍然占主导地位用于存储和管理全球数据。 本文比较了三种实现最…...
开源容灾备份软件,开源cdp备份软件
数据的安全性和完整性面临着硬件问题、黑客攻击、人为错误等各种威胁。在这种环境下,开源容灾备份软件应运而生,通过提供自动数据备份和恢复,有效地保证了公司的数据安全。 一、开源容灾备份软件的定义和作用 开源容灾备份软件是一种基于开源…...
Java合并区间
问题: 以数组 intervals 表示若干个区间的集合,其中单个区间为 intervals[i] [starti, endi] 。请你合并所有重叠的区间,并返回 一个不重叠的区间数组,该数组需恰好覆盖输入中的所有区间 。 示例: 示例 1ÿ…...
前端面试:【代码质量与工程实践】单元测试、集成测试和持续集成
在现代软件开发中,确保代码质量是至关重要的。单元测试、集成测试和持续集成是关键的工程实践,用于提高代码的可靠性和可维护性。本文将深入探讨这些概念,以及它们如何在软件开发中发挥作用。 1. 单元测试(Unit Testing࿰…...
2023/8/17总结
项目完善: 算法推荐 item-CF 算法推荐我主要写的是协同过滤算法,然后协同过滤算法分成俩种—— 基于用户的 user-CF 基于物品的 item-CF 因为害怕用户冷启动,和数据量的原因 我选择了 item-CF 主要思路是——根据用户的点赞列表&…...
REDIS 7 教程 数据类型-进阶篇
⑥ *位图 bitmap 1. 理论 由0和1 状态表现的二进制位的bit 数组。 说明:用String 类型作为底层数据结构实现的一种统计二值状态的数据类型 位图本质是数组,它是基于String 数据类型的按位操作。该数组由多个二进制位组成,每个二进制位都对应一个偏…...
图文并茂:Python Tkinter从入门到高级实战全解析
目录 介绍什么是Tkinter?准备工作第一个Tkinter程序界面布局事件处理补充知识点 文本输入框复选框和单选框列表框弹出对话框 综合案例:待办事项列表总结 介绍 欢迎来到本篇文章,我们将带您深入了解如何在Python中使用Tkinter库来创建图形用…...
npm和yarn的区别?
文章目录 前言npm和yarn的作用和特点npm和yarn的安装的机制npm安装机制yarn安装机制检测包解析包获取包链接包构建包 总结后言 前言 这一期给大家讲解npm和yarn的一些区别 npm和yarn的作用和特点 包管理:npm 和 yarn 可以用于安装、更新和删除 JavaScript 包。它们提…...
微服务项目容器编排docker-compose.yml、Dockerfile文件模板、相关配置文件、shell脚本
nacos Dockerfile(不需要特殊处理,使用docker conpose可以不写) # 基础镜像 FROM nacos/nacos-server # author MAINTAINER jianglifeng<jlifengfoxmail.com> RUN ln -sf /usr/share/zoneinfo/Asia/Shanghai /etc/localtime \ &&a…...
算法通过村第三关-数组黄金笔记|数组难解
文章目录 前言数组中出现超过一半的数字数组中只出现一次的数字颜色的分类问题(荷兰国旗问题)基于冒泡排序的双指针(快慢指针)基于快排的双指针(对撞指针) 总结 前言 提示:苦不来自外在环境中的人、事、物,…...
【2023】LeetCode HOT 100——矩阵
目录 1. 矩阵置零1.1 C++实现1.2 Python实现1.3 时空分析2. 螺旋矩阵2.1 C++实现2.2 Python实现2.3 时空分析3. 旋转图像3.1 C++实现3.2 Python实现3.3 时空分析4. 搜索二维矩阵 II4.1 C++实现4.2 Python实现4.3 时空分析1. 矩阵置零 🔗 原题链接:...
springboot源码方法
利用LinkedHashSet移除List重复的数据protected final <T> List<T> removeDuplicates(List<T> list) {return new ArrayList<>(new LinkedHashSet<>(list));} SpringFactoriesLoader#loadFactoryNames 加载配置文件...
基于java街球社区网站设计与实现
摘 要 本文主要讲述了基于SpringBootVue模式的街球社区网站的设计与实现。这里所谓的街球社区网站是通过类似于百度贴吧之类的网上论坛使得所有的街球爱好者有一个可以互相交流的平台,并使所有用户可以在社区进行教学视频的观看以及相关体育运动产品的选购,平台的盈利主要靠…...
定时产生不同频率方波
/*----------------------------------------------- 内容:通过定时产生不同频率方波 ------------------------------------------------*/ #include<reg52.h> //包含头文件,一般情况不需要改动,头文件包含特殊功能寄存器的定义 /*-…...
Java“牵手”天猫商品sku信息API接口数据,天猫API接口申请指南
天猫平台商品sku属性信息接口是开放平台提供的一种API接口,通过调用API接口,开发者可以获取天猫商品的标题、价格、库存、月销量、总销量、库存、详情描述、图片等详细信息 。 获取商品销量接口API是一种用于获取电商平台上商品sku属性数据的接口&#…...
【⑮MySQL | 视图】概述 | 创建 | 查看 | 更新 | 修改 | 删除
前言 ✨欢迎来到小K的MySQL专栏,本节将为大家带来MySQL视图概述 | 创建 | 查看 | 更新 | 修改 | 删除的分享✨ 目录 前言1.视图概述2.创建视图3.查看视图4.更新视图数据5.修改视图6.删除视图总结 1.视图概述 1.1 为什么使用视图? 视图一方面可以帮我们使…...
Linux驱动开发一、RK3568把hello编译到Linux内核中运行。‘rk_vendor_read’未定义的引用
1、在字符设备目录下建立hello目录 ~/Linux/rk356x_linux/kernel/drivers/char/hello 2、进入hello目录,新建hello.c、Makefile、Kconfig三个文件 3、Kconfig是打开make menuconfig配置界面是后的选项,这Kconfig是在字符设备下的。 config HELLOtrist…...
enable_shared_from_this
用途: enable_shared_from_this 是一个基类模板,用于解决在类成员函数中获取类对象的 shared_ptr 的需求。它提供了一种机制,使类能够安全地从成员函数内部获得指向自身的 shared_ptr。 解决对象生命周期管理问题:在某些情况下&…...
weak_ptr是怎么探知对象生死的
weak_ptr是C智能指针中的一种。它用于解决共享所有权的问题,并且可以避免因循环引用而导致的内存泄漏。 weak_ptr本身并不承担对象的所有权,它指向由shared_ptr管理的对象。与shared_ptr不同,weak_ptr并不会增加计数器来计算对象的引用次数。…...
python/java环境配置
环境变量放一起 python: 1.首先下载Python Python下载地址:Download Python | Python.org downloads ---windows -- 64 2.安装Python 下面两个,然后自定义,全选 可以把前4个选上 3.环境配置 1)搜高级系统设置 2…...
论文浅尝 | 基于判别指令微调生成式大语言模型的知识图谱补全方法(ISWC2024)
笔记整理:刘治强,浙江大学硕士生,研究方向为知识图谱表示学习,大语言模型 论文链接:http://arxiv.org/abs/2407.16127 发表会议:ISWC 2024 1. 动机 传统的知识图谱补全(KGC)模型通过…...
大模型多显卡多服务器并行计算方法与实践指南
一、分布式训练概述 大规模语言模型的训练通常需要分布式计算技术,以解决单机资源不足的问题。分布式训练主要分为两种模式: 数据并行:将数据分片到不同设备,每个设备拥有完整的模型副本 模型并行:将模型分割到不同设备,每个设备处理部分模型计算 现代大模型训练通常结合…...
Mobile ALOHA全身模仿学习
一、题目 Mobile ALOHA:通过低成本全身远程操作学习双手移动操作 传统模仿学习(Imitation Learning)缺点:聚焦与桌面操作,缺乏通用任务所需的移动性和灵活性 本论文优点:(1)在ALOHA…...
C++_哈希表
本篇文章是对C学习的哈希表部分的学习分享 相信一定会对你有所帮助~ 那咱们废话不多说,直接开始吧! 一、基础概念 1. 哈希核心思想: 哈希函数的作用:通过此函数建立一个Key与存储位置之间的映射关系。理想目标:实现…...
前端开发者常用网站
Can I use网站:一个查询网页技术兼容性的网站 一个查询网页技术兼容性的网站Can I use:Can I use... Support tables for HTML5, CSS3, etc (查询浏览器对HTML5的支持情况) 权威网站:MDN JavaScript权威网站:JavaScript | MDN...
OCR MLLM Evaluation
为什么需要评测体系?——背景与矛盾 能干的事: 看清楚发票、身份证上的字(准确率>90%),速度飞快(眨眼间完成)。干不了的事: 碰到复杂表格(合并单元…...
算法—栈系列
一:删除字符串中的所有相邻重复项 class Solution { public:string removeDuplicates(string s) {stack<char> st;for(int i 0; i < s.size(); i){char target s[i];if(!st.empty() && target st.top())st.pop();elsest.push(s[i]);}string ret…...
基于Uniapp的HarmonyOS 5.0体育应用开发攻略
一、技术架构设计 1.混合开发框架选型 (1)使用Uniapp 3.8版本支持ArkTS编译 (2)通过uni-harmony插件调用原生能力 (3)分层架构设计: graph TDA[UI层] -->|Vue语法| B(Uniapp框架)B --&g…...
Easy Excel
Easy Excel 一、依赖引入二、基本使用1. 定义实体类(导入/导出共用)2. 写 Excel3. 读 Excel 三、常用注解说明(完整列表)四、进阶:自定义转换器(Converter) 其它自定义转换器没生效 Easy Excel在…...
