TiCDC Canal-JSON 消息接收示例(Java 版)
1.引言
业务程序经常会通过各式各样的缓存来提升用户的访问速度。
由于存在缓存,在一些实时性要求较高的场景中,需要在数据变更的同时将数据缓存进行更新或删除。
如果数据本身由其他业务部门提供,就无法在写入的同时做缓存的一致性处理。
此时,可以通过其他业务部门暴露数据变更通知来感知到数据变化,从而保证数据的更新及时性。
TiCDC 是一款 TiDB 增量数据同步工具,通过拉取上游 TiKV 的数据变更日志,TiCDC 可以将数据解析为有序的行级变更数据输出到下游。
因此,可以通过 TiCDC 将数据变更通知暴露给业务程序,来让业务程序做及时的对应处理逻辑。
本文以一张用户表的数据变更为例,来展示 Java 服务端接收一条 TiCDC Canal-JSON 的消息变更,解析数据,并转发给对应的业务处理程序的流程。
之前写类似的程序时,网上搜索到的案例还是比较少的,本文仅抛砖引玉,欢迎各位大佬批评指正!
2. 代码思路
(1)通过 kafka 消息获取 CDC 消息
(2)解析 CDC 消息,判断其数据变更类型,执行对应的处理逻辑
3. 代码实现
3.1 代码结构
cdc-demo
└─ src└─ main└─ java└─ com.example.demo├─ constants│ └─ CdcConstants.java├─ dto│ ├─ CdcMessage.java│ └─ User.java├─ job│ ├─ CdcJob.java│ └─ UserCdcJob.java└─ service├─ impl│ └─ UserServiceImpl.java└─ CdcService.java
3.2 CDC 常量类
public class CdcConstants {
public enum MessageType {/*** 插入操作*/INSERT,
/*** 更新操作*/UPDATE,
/*** 删除操作*/DELETE;}
}
3.3 实体类
3.3.1 用户实体类
@Getter
@Setter
public class User {
/*** 用户id*/private Long id;
/*** 用户名*/private String name;
/*** 年龄*/private Integer age;
}
3.3.2 CDC 消息实体类
@Getter
@Setter
public class CdcMessage<T> {
/*** 数据集合*/private List<T> data;
/*** 数据库名称*/private String database;
/*** 是否为DDL语句isDdl*/private boolean isDdl;
/*** 表结构的类型字段(值为字段类型,如varchar)*/private T mysqlType;
/*** UPDATE类型下的旧数据(未变更字段无数据)*/private List<T> oldData;
/*** sql语句*/private String sql;
/*** 值为int类型*/private T sqlType;
/*** 数据表名*/private String table;
/*** 新增(INSERT)、更新(UPDATE)、删除(DELETE)、删除表(ERASE)等*/private String type;
}
3.4 任务类
3.4.1 CDC 任务基类
@Slf4j
public class CdcJob<T> {
protected CdcService<T> cdcService;
/*** 处理消息** @param record 消息记录* @param ack 消息处理标识*/public void handleMessage(ConsumerRecord<String, String> record, Acknowledgment ack) {String recordString = String.format("topic:%s,partition:%s,offset:%s,value:%s",record.topic(),record.partition(),record.offset(),record.value());log.info("数据更新开始处理,消息:" + recordString);try {boolean processResult = process(record);String processResultString = processResult ? "成功" : "无更新";log.info("数据更新处理结束,处理结果:" + processResultString);} catch (Exception e) {log.error("数据更新报错,", e);} finally {// 手动提交偏移量ack.acknowledge();}}
/*** 处理数据** @param record kafka消费记录* @return 处理结果*/public boolean process(ConsumerRecord<String, String> record) {String bizName = this.getClass().getSimpleName();// 服务为初始化报错if (null == cdcService) {throw new IllegalStateException("服务未初始化");}
// 解析消息CdcMessage<T> cdcMessage = JSON.parseObject(record.value(), new TypeReference<CdcMessage<T>>() {});
// 跳过DDLif (cdcMessage.isDdl()) {log.info(bizName, "DDL变更,无需处理");return false;}// 处理结果初始化boolean result = false;// 服务层处理数据List<T> dataList = cdcMessage.getData();if (CdcConstants.MessageType.INSERT.name().equals(cdcMessage.getType())) {result = cdcService.insert(dataList);} else if (CdcConstants.MessageType.UPDATE.name().equals(cdcMessage.getType())) {result = cdcService.update(cdcMessage.getOldData(), dataList);} else if (CdcConstants.MessageType.DELETE.name().equals(cdcMessage.getType())) {result = cdcService.delete(dataList);} else {log.warn(bizName, "不处理该消息,消息类型:" + cdcMessage.getType());}return result;}
}
3.4.2 用户表 CDC 消费任务类
@Component
public class UserCdcJob extends CdcJob<User> {
public UserCdcJob(UserServiceImpl userService) {this.cdcService = userService;}
/*** 消费CDC消息,并进行处理** @param record 消息记录* @param ack 消息处理标识*/@KafkaListener(id = "UserCdcJob", groupId = "${user-cdc.group}",topics = {"${user-cdc.topic}"}, containerFactory = "cdcKafkaListenerFactory")public void consumer(ConsumerRecord<String, String> record, Acknowledgment ack) {handleMessage(record, ack);}
}
3.5 服务类
3.5.1 CDC 消息处理服务接口
public interface CdcService<T> {
/*** 插入数据** @param data 数据* @return 插入结果*/boolean insert(List<T> data);
/*** 更新数据** @param oldData 更新前数据* @param newData 更新后数据* @return 更新结果*/boolean update(List<T> oldData, List<T> newData);
/*** 删除数据** @param data 数据* @return 删除数据*/boolean delete(List<T> data);
}
3.5.2 用户服务实现类
@Service
public class UserServiceImpl implements CdcService<User> {
@Overridepublic boolean insert(List<User> data) {// TODOreturn false;}
@Overridepublic boolean update(List<User> oldData, List<User> newData) {// TODOreturn false;}
@Overridepublic boolean delete(List<User> data) {// TODOreturn false;}
}
4.参考文档
TiCDC 简介:https://docs.pingcap.com/zh/tidb/stable/ticdc-overview
TiCDC Canal-JSON 协议:https://docs.pingcap.com/zh/tidb/stable/ticdc-canal-json
相关文章:
TiCDC Canal-JSON 消息接收示例(Java 版)
1.引言 业务程序经常会通过各式各样的缓存来提升用户的访问速度。 由于存在缓存,在一些实时性要求较高的场景中,需要在数据变更的同时将数据缓存进行更新或删除。 如果数据本身由其他业务部门提供,就无法在写入的同时做缓存的一致性处理。…...

SQLite、MySQL、PostgreSQL3个关系数据库之间的对比
引言 关系数据模型以行和列的表格形式组织数据,在数据库管理工具中占主导地位。今天还有其他数据模型,包括NoSQL和NewSQL,但是关系数据库管理系统(RDBMS)仍然占主导地位用于存储和管理全球数据。 本文比较了三种实现最…...

开源容灾备份软件,开源cdp备份软件
数据的安全性和完整性面临着硬件问题、黑客攻击、人为错误等各种威胁。在这种环境下,开源容灾备份软件应运而生,通过提供自动数据备份和恢复,有效地保证了公司的数据安全。 一、开源容灾备份软件的定义和作用 开源容灾备份软件是一种基于开源…...
Java合并区间
问题: 以数组 intervals 表示若干个区间的集合,其中单个区间为 intervals[i] [starti, endi] 。请你合并所有重叠的区间,并返回 一个不重叠的区间数组,该数组需恰好覆盖输入中的所有区间 。 示例: 示例 1ÿ…...
前端面试:【代码质量与工程实践】单元测试、集成测试和持续集成
在现代软件开发中,确保代码质量是至关重要的。单元测试、集成测试和持续集成是关键的工程实践,用于提高代码的可靠性和可维护性。本文将深入探讨这些概念,以及它们如何在软件开发中发挥作用。 1. 单元测试(Unit Testing࿰…...

2023/8/17总结
项目完善: 算法推荐 item-CF 算法推荐我主要写的是协同过滤算法,然后协同过滤算法分成俩种—— 基于用户的 user-CF 基于物品的 item-CF 因为害怕用户冷启动,和数据量的原因 我选择了 item-CF 主要思路是——根据用户的点赞列表&…...

REDIS 7 教程 数据类型-进阶篇
⑥ *位图 bitmap 1. 理论 由0和1 状态表现的二进制位的bit 数组。 说明:用String 类型作为底层数据结构实现的一种统计二值状态的数据类型 位图本质是数组,它是基于String 数据类型的按位操作。该数组由多个二进制位组成,每个二进制位都对应一个偏…...

图文并茂:Python Tkinter从入门到高级实战全解析
目录 介绍什么是Tkinter?准备工作第一个Tkinter程序界面布局事件处理补充知识点 文本输入框复选框和单选框列表框弹出对话框 综合案例:待办事项列表总结 介绍 欢迎来到本篇文章,我们将带您深入了解如何在Python中使用Tkinter库来创建图形用…...

npm和yarn的区别?
文章目录 前言npm和yarn的作用和特点npm和yarn的安装的机制npm安装机制yarn安装机制检测包解析包获取包链接包构建包 总结后言 前言 这一期给大家讲解npm和yarn的一些区别 npm和yarn的作用和特点 包管理:npm 和 yarn 可以用于安装、更新和删除 JavaScript 包。它们提…...
微服务项目容器编排docker-compose.yml、Dockerfile文件模板、相关配置文件、shell脚本
nacos Dockerfile(不需要特殊处理,使用docker conpose可以不写) # 基础镜像 FROM nacos/nacos-server # author MAINTAINER jianglifeng<jlifengfoxmail.com> RUN ln -sf /usr/share/zoneinfo/Asia/Shanghai /etc/localtime \ &&a…...

算法通过村第三关-数组黄金笔记|数组难解
文章目录 前言数组中出现超过一半的数字数组中只出现一次的数字颜色的分类问题(荷兰国旗问题)基于冒泡排序的双指针(快慢指针)基于快排的双指针(对撞指针) 总结 前言 提示:苦不来自外在环境中的人、事、物,…...
【2023】LeetCode HOT 100——矩阵
目录 1. 矩阵置零1.1 C++实现1.2 Python实现1.3 时空分析2. 螺旋矩阵2.1 C++实现2.2 Python实现2.3 时空分析3. 旋转图像3.1 C++实现3.2 Python实现3.3 时空分析4. 搜索二维矩阵 II4.1 C++实现4.2 Python实现4.3 时空分析1. 矩阵置零 🔗 原题链接:...

springboot源码方法
利用LinkedHashSet移除List重复的数据protected final <T> List<T> removeDuplicates(List<T> list) {return new ArrayList<>(new LinkedHashSet<>(list));} SpringFactoriesLoader#loadFactoryNames 加载配置文件...
基于java街球社区网站设计与实现
摘 要 本文主要讲述了基于SpringBootVue模式的街球社区网站的设计与实现。这里所谓的街球社区网站是通过类似于百度贴吧之类的网上论坛使得所有的街球爱好者有一个可以互相交流的平台,并使所有用户可以在社区进行教学视频的观看以及相关体育运动产品的选购,平台的盈利主要靠…...
定时产生不同频率方波
/*----------------------------------------------- 内容:通过定时产生不同频率方波 ------------------------------------------------*/ #include<reg52.h> //包含头文件,一般情况不需要改动,头文件包含特殊功能寄存器的定义 /*-…...

Java“牵手”天猫商品sku信息API接口数据,天猫API接口申请指南
天猫平台商品sku属性信息接口是开放平台提供的一种API接口,通过调用API接口,开发者可以获取天猫商品的标题、价格、库存、月销量、总销量、库存、详情描述、图片等详细信息 。 获取商品销量接口API是一种用于获取电商平台上商品sku属性数据的接口&#…...

【⑮MySQL | 视图】概述 | 创建 | 查看 | 更新 | 修改 | 删除
前言 ✨欢迎来到小K的MySQL专栏,本节将为大家带来MySQL视图概述 | 创建 | 查看 | 更新 | 修改 | 删除的分享✨ 目录 前言1.视图概述2.创建视图3.查看视图4.更新视图数据5.修改视图6.删除视图总结 1.视图概述 1.1 为什么使用视图? 视图一方面可以帮我们使…...

Linux驱动开发一、RK3568把hello编译到Linux内核中运行。‘rk_vendor_read’未定义的引用
1、在字符设备目录下建立hello目录 ~/Linux/rk356x_linux/kernel/drivers/char/hello 2、进入hello目录,新建hello.c、Makefile、Kconfig三个文件 3、Kconfig是打开make menuconfig配置界面是后的选项,这Kconfig是在字符设备下的。 config HELLOtrist…...
enable_shared_from_this
用途: enable_shared_from_this 是一个基类模板,用于解决在类成员函数中获取类对象的 shared_ptr 的需求。它提供了一种机制,使类能够安全地从成员函数内部获得指向自身的 shared_ptr。 解决对象生命周期管理问题:在某些情况下&…...

weak_ptr是怎么探知对象生死的
weak_ptr是C智能指针中的一种。它用于解决共享所有权的问题,并且可以避免因循环引用而导致的内存泄漏。 weak_ptr本身并不承担对象的所有权,它指向由shared_ptr管理的对象。与shared_ptr不同,weak_ptr并不会增加计数器来计算对象的引用次数。…...

日语AI面试高效通关秘籍:专业解读与青柚面试智能助攻
在如今就业市场竞争日益激烈的背景下,越来越多的求职者将目光投向了日本及中日双语岗位。但是,一场日语面试往往让许多人感到步履维艰。你是否也曾因为面试官抛出的“刁钻问题”而心生畏惧?面对生疏的日语交流环境,即便提前恶补了…...
golang循环变量捕获问题
在 Go 语言中,当在循环中启动协程(goroutine)时,如果在协程闭包中直接引用循环变量,可能会遇到一个常见的陷阱 - 循环变量捕获问题。让我详细解释一下: 问题背景 看这个代码片段: fo…...
Qwen3-Embedding-0.6B深度解析:多语言语义检索的轻量级利器
第一章 引言:语义表示的新时代挑战与Qwen3的破局之路 1.1 文本嵌入的核心价值与技术演进 在人工智能领域,文本嵌入技术如同连接自然语言与机器理解的“神经突触”——它将人类语言转化为计算机可计算的语义向量,支撑着搜索引擎、推荐系统、…...

微服务商城-商品微服务
数据表 CREATE TABLE product (id bigint(20) UNSIGNED NOT NULL AUTO_INCREMENT COMMENT 商品id,cateid smallint(6) UNSIGNED NOT NULL DEFAULT 0 COMMENT 类别Id,name varchar(100) NOT NULL DEFAULT COMMENT 商品名称,subtitle varchar(200) NOT NULL DEFAULT COMMENT 商…...

让回归模型不再被异常值“带跑偏“,MSE和Cauchy损失函数在噪声数据环境下的实战对比
在机器学习的回归分析中,损失函数的选择对模型性能具有决定性影响。均方误差(MSE)作为经典的损失函数,在处理干净数据时表现优异,但在面对包含异常值的噪声数据时,其对大误差的二次惩罚机制往往导致模型参数…...

搭建DNS域名解析服务器(正向解析资源文件)
正向解析资源文件 1)准备工作 服务端及客户端都关闭安全软件 [rootlocalhost ~]# systemctl stop firewalld [rootlocalhost ~]# setenforce 0 2)服务端安装软件:bind 1.配置yum源 [rootlocalhost ~]# cat /etc/yum.repos.d/base.repo [Base…...

实战三:开发网页端界面完成黑白视频转为彩色视频
一、需求描述 设计一个简单的视频上色应用,用户可以通过网页界面上传黑白视频,系统会自动将其转换为彩色视频。整个过程对用户来说非常简单直观,不需要了解技术细节。 效果图 二、实现思路 总体思路: 用户通过Gradio界面上…...
LLaMA-Factory 微调 Qwen2-VL 进行人脸情感识别(二)
在上一篇文章中,我们详细介绍了如何使用LLaMA-Factory框架对Qwen2-VL大模型进行微调,以实现人脸情感识别的功能。本篇文章将聚焦于微调完成后,如何调用这个模型进行人脸情感识别的具体代码实现,包括详细的步骤和注释。 模型调用步骤 环境准备:确保安装了必要的Python库。…...

鸿蒙Navigation路由导航-基本使用介绍
1. Navigation介绍 Navigation组件是路由导航的根视图容器,一般作为Page页面的根容器使用,其内部默认包含了标题栏、内容区和工具栏,其中内容区默认首页显示导航内容(Navigation的子组件)或非首页显示(Nav…...
Vue 实例的数据对象详解
Vue 实例的数据对象详解 在 Vue 中,数据对象是响应式系统的核心,也是组件状态的载体。理解数据对象的原理和使用方式是成为 Vue 专家的关键一步。我将从多个维度深入剖析 Vue 实例的数据对象。 一、数据对象的定义方式 1. Options API 中的定义 在 Options API 中,使用 …...