当前位置: 首页 > news >正文

TiCDC Canal-JSON 消息接收示例(Java 版)

1.引言

业务程序经常会通过各式各样的缓存来提升用户的访问速度。

由于存在缓存,在一些实时性要求较高的场景中,需要在数据变更的同时将数据缓存进行更新或删除。

如果数据本身由其他业务部门提供,就无法在写入的同时做缓存的一致性处理。

此时,可以通过其他业务部门暴露数据变更通知来感知到数据变化,从而保证数据的更新及时性。

TiCDC 是一款 TiDB 增量数据同步工具,通过拉取上游 TiKV 的数据变更日志,TiCDC 可以将数据解析为有序的行级变更数据输出到下游。

因此,可以通过 TiCDC 将数据变更通知暴露给业务程序,来让业务程序做及时的对应处理逻辑。

本文以一张用户表的数据变更为例,来展示 Java 服务端接收一条 TiCDC Canal-JSON 的消息变更,解析数据,并转发给对应的业务处理程序的流程。

之前写类似的程序时,网上搜索到的案例还是比较少的,本文仅抛砖引玉,欢迎各位大佬批评指正!

2. 代码思路

(1)通过 kafka 消息获取 CDC 消息

(2)解析 CDC 消息,判断其数据变更类型,执行对应的处理逻辑

3. 代码实现

3.1 代码结构

cdc-demo
└─ src└─ main└─ java└─ com.example.demo├─ constants│    └─ CdcConstants.java├─ dto│    ├─ CdcMessage.java│    └─ User.java├─ job│    ├─ CdcJob.java│    └─ UserCdcJob.java└─ service├─ impl│   └─ UserServiceImpl.java└─ CdcService.java

3.2 CDC 常量类

public class CdcConstants {
​public enum MessageType {/*** 插入操作*/INSERT,
​/*** 更新操作*/UPDATE,
​/*** 删除操作*/DELETE;}
}

3.3 实体类

3.3.1 用户实体类

@Getter
@Setter
public class User {
​/*** 用户id*/private Long id;
​/*** 用户名*/private String name;
​/*** 年龄*/private Integer age;
}

3.3.2 CDC 消息实体类

@Getter
@Setter
public class CdcMessage<T> {
​/*** 数据集合*/private List<T> data;
​/*** 数据库名称*/private String database;
​/*** 是否为DDL语句isDdl*/private boolean isDdl;
​/*** 表结构的类型字段(值为字段类型,如varchar)*/private T mysqlType;
​/*** UPDATE类型下的旧数据(未变更字段无数据)*/private List<T> oldData;
​/*** sql语句*/private String sql;
​/*** 值为int类型*/private T sqlType;
​/*** 数据表名*/private String table;
​/*** 新增(INSERT)、更新(UPDATE)、删除(DELETE)、删除表(ERASE)等*/private String type;
}

3.4 任务类

3.4.1 CDC 任务基类

@Slf4j
public class CdcJob<T> {
​protected CdcService<T> cdcService;
​/*** 处理消息** @param record 消息记录* @param ack    消息处理标识*/public void handleMessage(ConsumerRecord<String, String> record, Acknowledgment ack) {String recordString = String.format("topic:%s,partition:%s,offset:%s,value:%s",record.topic(),record.partition(),record.offset(),record.value());log.info("数据更新开始处理,消息:" + recordString);try {boolean processResult = process(record);String processResultString = processResult ? "成功" : "无更新";log.info("数据更新处理结束,处理结果:" + processResultString);} catch (Exception e) {log.error("数据更新报错,", e);} finally {// 手动提交偏移量ack.acknowledge();}}
​/*** 处理数据** @param record kafka消费记录* @return 处理结果*/public boolean process(ConsumerRecord<String, String> record) {String bizName = this.getClass().getSimpleName();// 服务为初始化报错if (null == cdcService) {throw new IllegalStateException("服务未初始化");}
​// 解析消息CdcMessage<T> cdcMessage = JSON.parseObject(record.value(), new TypeReference<CdcMessage<T>>() {});
​// 跳过DDLif (cdcMessage.isDdl()) {log.info(bizName, "DDL变更,无需处理");return false;}// 处理结果初始化boolean result = false;// 服务层处理数据List<T> dataList = cdcMessage.getData();if (CdcConstants.MessageType.INSERT.name().equals(cdcMessage.getType())) {result = cdcService.insert(dataList);} else if (CdcConstants.MessageType.UPDATE.name().equals(cdcMessage.getType())) {result = cdcService.update(cdcMessage.getOldData(), dataList);} else if (CdcConstants.MessageType.DELETE.name().equals(cdcMessage.getType())) {result = cdcService.delete(dataList);} else {log.warn(bizName, "不处理该消息,消息类型:" + cdcMessage.getType());}return result;}
}

3.4.2 用户表 CDC 消费任务类

@Component
public class UserCdcJob extends CdcJob<User> {
​public UserCdcJob(UserServiceImpl userService) {this.cdcService = userService;}
​/*** 消费CDC消息,并进行处理** @param record 消息记录* @param ack    消息处理标识*/@KafkaListener(id = "UserCdcJob", groupId = "${user-cdc.group}",topics = {"${user-cdc.topic}"}, containerFactory = "cdcKafkaListenerFactory")public void consumer(ConsumerRecord<String, String> record, Acknowledgment ack) {handleMessage(record, ack);}
}

3.5 服务类

3.5.1 CDC 消息处理服务接口

public interface CdcService<T> {
​/*** 插入数据** @param data 数据* @return 插入结果*/boolean insert(List<T> data);
​/*** 更新数据** @param oldData 更新前数据* @param newData 更新后数据* @return 更新结果*/boolean update(List<T> oldData, List<T> newData);
​/*** 删除数据** @param data 数据* @return 删除数据*/boolean delete(List<T> data);
}

3.5.2 用户服务实现类

@Service
public class UserServiceImpl implements CdcService<User> {
​@Overridepublic boolean insert(List<User> data) {// TODOreturn false;}
​@Overridepublic boolean update(List<User> oldData, List<User> newData) {// TODOreturn false;}
​@Overridepublic boolean delete(List<User> data) {// TODOreturn false;}
}

4.参考文档

TiCDC 简介:https://docs.pingcap.com/zh/tidb/stable/ticdc-overview

TiCDC Canal-JSON 协议:https://docs.pingcap.com/zh/tidb/stable/ticdc-canal-json

相关文章:

TiCDC Canal-JSON 消息接收示例(Java 版)

1.引言 业务程序经常会通过各式各样的缓存来提升用户的访问速度。 由于存在缓存&#xff0c;在一些实时性要求较高的场景中&#xff0c;需要在数据变更的同时将数据缓存进行更新或删除。 如果数据本身由其他业务部门提供&#xff0c;就无法在写入的同时做缓存的一致性处理。…...

SQLite、MySQL、PostgreSQL3个关系数据库之间的对比

引言 关系数据模型以行和列的表格形式组织数据&#xff0c;在数据库管理工具中占主导地位。今天还有其他数据模型&#xff0c;包括NoSQL和NewSQL&#xff0c;但是关系数据库管理系统&#xff08;RDBMS&#xff09;仍然占主导地位用于存储和管理全球数据。 本文比较了三种实现最…...

开源容灾备份软件,开源cdp备份软件

数据的安全性和完整性面临着硬件问题、黑客攻击、人为错误等各种威胁。在这种环境下&#xff0c;开源容灾备份软件应运而生&#xff0c;通过提供自动数据备份和恢复&#xff0c;有效地保证了公司的数据安全。 一、开源容灾备份软件的定义和作用 开源容灾备份软件是一种基于开源…...

Java合并区间

问题&#xff1a; 以数组 intervals 表示若干个区间的集合&#xff0c;其中单个区间为 intervals[i] [starti, endi] 。请你合并所有重叠的区间&#xff0c;并返回 一个不重叠的区间数组&#xff0c;该数组需恰好覆盖输入中的所有区间 。 示例&#xff1a; 示例 1&#xff…...

前端面试:【代码质量与工程实践】单元测试、集成测试和持续集成

在现代软件开发中&#xff0c;确保代码质量是至关重要的。单元测试、集成测试和持续集成是关键的工程实践&#xff0c;用于提高代码的可靠性和可维护性。本文将深入探讨这些概念&#xff0c;以及它们如何在软件开发中发挥作用。 1. 单元测试&#xff08;Unit Testing&#xff0…...

2023/8/17总结

项目完善&#xff1a; 算法推荐 item-CF 算法推荐我主要写的是协同过滤算法&#xff0c;然后协同过滤算法分成俩种—— 基于用户的 user-CF 基于物品的 item-CF 因为害怕用户冷启动&#xff0c;和数据量的原因 我选择了 item-CF 主要思路是——根据用户的点赞列表&…...

REDIS 7 教程 数据类型-进阶篇

⑥ *位图 bitmap 1. 理论 由0和1 状态表现的二进制位的bit 数组。 说明:用String 类型作为底层数据结构实现的一种统计二值状态的数据类型 位图本质是数组,它是基于String 数据类型的按位操作。该数组由多个二进制位组成,每个二进制位都对应一个偏…...

图文并茂:Python Tkinter从入门到高级实战全解析

目录 介绍什么是Tkinter&#xff1f;准备工作第一个Tkinter程序界面布局事件处理补充知识点 文本输入框复选框和单选框列表框弹出对话框 综合案例&#xff1a;待办事项列表总结 介绍 欢迎来到本篇文章&#xff0c;我们将带您深入了解如何在Python中使用Tkinter库来创建图形用…...

npm和yarn的区别?

文章目录 前言npm和yarn的作用和特点npm和yarn的安装的机制npm安装机制yarn安装机制检测包解析包获取包链接包构建包 总结后言 前言 这一期给大家讲解npm和yarn的一些区别 npm和yarn的作用和特点 包管理&#xff1a;npm 和 yarn 可以用于安装、更新和删除 JavaScript 包。它们提…...

微服务项目容器编排docker-compose.yml、Dockerfile文件模板、相关配置文件、shell脚本

nacos Dockerfile&#xff08;不需要特殊处理&#xff0c;使用docker conpose可以不写&#xff09; # 基础镜像 FROM nacos/nacos-server # author MAINTAINER jianglifeng<jlifengfoxmail.com> RUN ln -sf /usr/share/zoneinfo/Asia/Shanghai /etc/localtime \ &&a…...

算法通过村第三关-数组黄金笔记|数组难解

文章目录 前言数组中出现超过一半的数字数组中只出现一次的数字颜色的分类问题(荷兰国旗问题)基于冒泡排序的双指针&#xff08;快慢指针&#xff09;基于快排的双指针&#xff08;对撞指针&#xff09; 总结 前言 提示&#xff1a;苦不来自外在环境中的人、事、物&#xff0c;…...

【2023】LeetCode HOT 100——矩阵

目录 1. 矩阵置零1.1 C++实现1.2 Python实现1.3 时空分析2. 螺旋矩阵2.1 C++实现2.2 Python实现2.3 时空分析3. 旋转图像3.1 C++实现3.2 Python实现3.3 时空分析4. 搜索二维矩阵 II4.1 C++实现4.2 Python实现4.3 时空分析1. 矩阵置零 🔗 原题链接:...

springboot源码方法

利用LinkedHashSet移除List重复的数据protected final <T> List<T> removeDuplicates(List<T> list) {return new ArrayList<>(new LinkedHashSet<>(list));} SpringFactoriesLoader#loadFactoryNames 加载配置文件...

基于java街球社区网站设计与实现

摘  要 本文主要讲述了基于SpringBootVue模式的街球社区网站的设计与实现。这里所谓的街球社区网站是通过类似于百度贴吧之类的网上论坛使得所有的街球爱好者有一个可以互相交流的平台,并使所有用户可以在社区进行教学视频的观看以及相关体育运动产品的选购,平台的盈利主要靠…...

定时产生不同频率方波

/*----------------------------------------------- 内容&#xff1a;通过定时产生不同频率方波 ------------------------------------------------*/ #include<reg52.h> //包含头文件&#xff0c;一般情况不需要改动&#xff0c;头文件包含特殊功能寄存器的定义 /*-…...

Java“牵手”天猫商品sku信息API接口数据,天猫API接口申请指南

天猫平台商品sku属性信息接口是开放平台提供的一种API接口&#xff0c;通过调用API接口&#xff0c;开发者可以获取天猫商品的标题、价格、库存、月销量、总销量、库存、详情描述、图片等详细信息 。 获取商品销量接口API是一种用于获取电商平台上商品sku属性数据的接口&#…...

【⑮MySQL | 视图】概述 | 创建 | 查看 | 更新 | 修改 | 删除

前言 ✨欢迎来到小K的MySQL专栏&#xff0c;本节将为大家带来MySQL视图概述 | 创建 | 查看 | 更新 | 修改 | 删除的分享✨ 目录 前言1.视图概述2.创建视图3.查看视图4.更新视图数据5.修改视图6.删除视图总结 1.视图概述 1.1 为什么使用视图&#xff1f; 视图一方面可以帮我们使…...

Linux驱动开发一、RK3568把hello编译到Linux内核中运行。‘rk_vendor_read’未定义的引用

1、在字符设备目录下建立hello目录 ~/Linux/rk356x_linux/kernel/drivers/char/hello 2、进入hello目录&#xff0c;新建hello.c、Makefile、Kconfig三个文件 3、Kconfig是打开make menuconfig配置界面是后的选项&#xff0c;这Kconfig是在字符设备下的。 config HELLOtrist…...

enable_shared_from_this

用途&#xff1a; enable_shared_from_this 是一个基类模板&#xff0c;用于解决在类成员函数中获取类对象的 shared_ptr 的需求。它提供了一种机制&#xff0c;使类能够安全地从成员函数内部获得指向自身的 shared_ptr。 解决对象生命周期管理问题&#xff1a;在某些情况下&…...

weak_ptr是怎么探知对象生死的

weak_ptr是C智能指针中的一种。它用于解决共享所有权的问题&#xff0c;并且可以避免因循环引用而导致的内存泄漏。 weak_ptr本身并不承担对象的所有权&#xff0c;它指向由shared_ptr管理的对象。与shared_ptr不同&#xff0c;weak_ptr并不会增加计数器来计算对象的引用次数。…...

19c补丁后oracle属主变化,导致不能识别磁盘组

补丁后服务器重启&#xff0c;数据库再次无法启动 ORA01017: invalid username/password; logon denied Oracle 19c 在打上 19.23 或以上补丁版本后&#xff0c;存在与用户组权限相关的问题。具体表现为&#xff0c;Oracle 实例的运行用户&#xff08;oracle&#xff09;和集…...

Qt/C++开发监控GB28181系统/取流协议/同时支持udp/tcp被动/tcp主动

一、前言说明 在2011版本的gb28181协议中&#xff0c;拉取视频流只要求udp方式&#xff0c;从2016开始要求新增支持tcp被动和tcp主动两种方式&#xff0c;udp理论上会丢包的&#xff0c;所以实际使用过程可能会出现画面花屏的情况&#xff0c;而tcp肯定不丢包&#xff0c;起码…...

ssc377d修改flash分区大小

1、flash的分区默认分配16M、 / # df -h Filesystem Size Used Available Use% Mounted on /dev/root 1.9M 1.9M 0 100% / /dev/mtdblock4 3.0M...

YSYX学习记录(八)

C语言&#xff0c;练习0&#xff1a; 先创建一个文件夹&#xff0c;我用的是物理机&#xff1a; 安装build-essential 练习1&#xff1a; 我注释掉了 #include <stdio.h> 出现下面错误 在你的文本编辑器中打开ex1文件&#xff0c;随机修改或删除一部分&#xff0c;之后…...

微信小程序 - 手机震动

一、界面 <button type"primary" bindtap"shortVibrate">短震动</button> <button type"primary" bindtap"longVibrate">长震动</button> 二、js逻辑代码 注&#xff1a;文档 https://developers.weixin.qq…...

C# 类和继承(抽象类)

抽象类 抽象类是指设计为被继承的类。抽象类只能被用作其他类的基类。 不能创建抽象类的实例。抽象类使用abstract修饰符声明。 抽象类可以包含抽象成员或普通的非抽象成员。抽象类的成员可以是抽象成员和普通带 实现的成员的任意组合。抽象类自己可以派生自另一个抽象类。例…...

c#开发AI模型对话

AI模型 前面已经介绍了一般AI模型本地部署&#xff0c;直接调用现成的模型数据。这里主要讲述讲接口集成到我们自己的程序中使用方式。 微软提供了ML.NET来开发和使用AI模型&#xff0c;但是目前国内可能使用不多&#xff0c;至少实践例子很少看见。开发训练模型就不介绍了&am…...

2025季度云服务器排行榜

在全球云服务器市场&#xff0c;各厂商的排名和地位并非一成不变&#xff0c;而是由其独特的优势、战略布局和市场适应性共同决定的。以下是根据2025年市场趋势&#xff0c;对主要云服务器厂商在排行榜中占据重要位置的原因和优势进行深度分析&#xff1a; 一、全球“三巨头”…...

Mysql中select查询语句的执行过程

目录 1、介绍 1.1、组件介绍 1.2、Sql执行顺序 2、执行流程 2.1. 连接与认证 2.2. 查询缓存 2.3. 语法解析&#xff08;Parser&#xff09; 2.4、执行sql 1. 预处理&#xff08;Preprocessor&#xff09; 2. 查询优化器&#xff08;Optimizer&#xff09; 3. 执行器…...

Go 并发编程基础:通道(Channel)的使用

在 Go 中&#xff0c;Channel 是 Goroutine 之间通信的核心机制。它提供了一个线程安全的通信方式&#xff0c;用于在多个 Goroutine 之间传递数据&#xff0c;从而实现高效的并发编程。 本章将介绍 Channel 的基本概念、用法、缓冲、关闭机制以及 select 的使用。 一、Channel…...